package org.apache.hadoop.hbase.io.asyncfs;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.LocatedBlockHelper;
import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.class */
public final class FanOutOneBlockAsyncDFSOutputHelper {
    /** Configuration key for the maximum number of retries when creating the output. */
    public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";
    /** Retry count paired with {@link #ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES} as its default. */
    public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
    // NOTE(review): presumably the sequence number that marks heartbeat packets; not used in this part of the file.
    public static final long HEART_BEAT_SEQNO = -1;
    /** Fallback, in milliseconds, for "dfs.client.socket-timeout" when connecting to datanodes. */
    public static final int READ_TIMEOUT = 60000;
    // The four blank finals below are assigned elsewhere in the class (presumably a static
    // initializer using the create* factory methods) -- TODO confirm.
    private static final LeaseManager LEASE_MANAGER;
    private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
    private static final FileCreator FILE_CREATOR;
    // May be null: loadShouldReplicateFlag() returns null when the enum constant does not exist.
    private static final CreateFlag SHOULD_REPLICATE_FLAG;
    private static final Logger LOG = LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);
    /** Shared pooled buffer allocator handed to the created output. */
    private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;

    /* loaded from: input_file:org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper$CancelOnClose.class */
    /**
     * A {@link CancelableProgressable} bound to one {@link DFSClient}; its progress check
     * reports whether that client is still running.
     */
    static final class CancelOnClose implements CancelableProgressable {
        private final DFSClient client;

        public CancelOnClose(DFSClient dFSClient) {
            client = dFSClient;
        }

        /**
         * @return the result of the reflective "is client running" check for the wrapped client
         */
        @Override
        public boolean progress() {
            final boolean stillRunning = DFS_CLIENT_ADAPTOR.isClientRunning(client);
            return stillRunning;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper$DFSClientAdaptor.class */
    /**
     * Indirection over {@link DFSClient} calls that this class can only reach reflectively;
     * the implementation is built by {@code createDFSClientAdaptor()}.
     */
    private interface DFSClientAdaptor {
        /** @return whatever {@code DFSClient.isClientRunning()} reports for the given client */
        boolean isClientRunning(DFSClient dFSClient);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper$FileCreator.class */
    /**
     * Version-independent wrapper around {@code ClientProtocol.create}. Implementations supply
     * {@link #createObject}, which performs the reflective call for one specific overload;
     * callers use {@link #create}, which casts the result and unwraps reflection failures.
     */
    public interface FileCreator {
        /**
         * Delegates to {@link #createObject} and casts the result to {@link HdfsFileStatus}.
         *
         * @throws Exception the cause of an {@link InvocationTargetException} when that cause is
         *         an {@link Exception}; otherwise a {@link RuntimeException} wrapping the cause.
         *         Any other exception from {@link #createObject} propagates unchanged.
         */
        default HdfsFileStatus create(ClientProtocol clientProtocol, String str, FsPermission fsPermission, String str2, EnumSetWritable<CreateFlag> enumSetWritable, boolean z, short s, long j, CryptoProtocolVersion[] cryptoProtocolVersionArr) throws Exception {
            try {
                Object status = createObject(clientProtocol, str, fsPermission, str2, enumSetWritable, z, s, j, cryptoProtocolVersionArr);
                return (HdfsFileStatus) status;
            } catch (InvocationTargetException e) {
                final Throwable cause = e.getCause();
                if (!(cause instanceof Exception)) {
                    // Not an Exception (e.g. an Error or null): surface it unchecked.
                    throw new RuntimeException(cause);
                }
                throw (Exception) cause;
            }
        }

        /**
         * Performs the actual create call and returns its raw result.
         */
        Object createObject(ClientProtocol clientProtocol, String str, FsPermission fsPermission, String str2, EnumSetWritable<CreateFlag> enumSetWritable, boolean z, short s, long j, CryptoProtocolVersion[] cryptoProtocolVersionArr) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper$LeaseManager.class */
    /**
     * Abstraction over the reflective {@code DFSClient.beginFileLease} / {@code endFileLease}
     * calls, whose signatures differ between Hadoop versions (see the createLeaseManager*
     * factories).
     */
    public interface LeaseManager {
        /** Invokes the client's {@code beginFileLease} for the given file status. */
        void begin(DFSClient dFSClient, HdfsFileStatus hdfsFileStatus);

        /** Invokes the client's {@code endFileLease} for the given file status. */
        void end(DFSClient dFSClient, HdfsFileStatus hdfsFileStatus);
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper$NameNodeException.class */
    /**
     * {@link IOException} that only carries a cause.
     * NOTE(review): presumably signals a failure reported by the NameNode; the throwing sites
     * are not visible in this part of the file -- confirm.
     */
    public static class NameNodeException extends IOException {
        private static final long serialVersionUID = 3143237406477095390L;

        /** @param th the underlying failure, stored as this exception's cause */
        public NameNodeException(Throwable th) {
            super(th);
        }
    }

    /** Static-only helper class; the private constructor prevents instantiation. */
    private FanOutOneBlockAsyncDFSOutputHelper() {
    }

    /**
     * Builds a {@link DFSClientAdaptor} that calls the non-public
     * {@code DFSClient.isClientRunning()} through reflection.
     *
     * @throws NoSuchMethodException if {@link DFSClient} declares no such method
     */
    private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
        final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
        isClientRunningMethod.setAccessible(true);
        return client -> {
            try {
                return (Boolean) isClientRunningMethod.invoke(client);
            } catch (IllegalAccessException | InvocationTargetException e) {
                // Reflection failures are not expected at runtime; surface them unchecked.
                throw new RuntimeException(e);
            }
        };
    }

    /**
     * Builds a {@link LeaseManager} for the {@link DFSClient} variant whose lease methods are
     * keyed by a String id: {@code beginFileLease(String, DFSOutputStream)} and
     * {@code endFileLease(String)}.
     *
     * @throws NoSuchMethodException if any of the required methods is missing, which the caller
     *         treats as "older Hadoop" and falls back on
     */
    private static LeaseManager createLeaseManager3_4() throws NoSuchMethodException {
        final Method beginLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class);
        beginLeaseMethod.setAccessible(true);
        final Method endLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", String.class);
        endLeaseMethod.setAccessible(true);
        final Method getConfigurationMethod = DFSClient.class.getDeclaredMethod("getConfiguration");
        getConfigurationMethod.setAccessible(true);
        final Method getNamespaceMethod = HdfsFileStatus.class.getDeclaredMethod("getNamespace");
        return new LeaseManager() {
            private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY = "dfs.client.output.stream.uniq.default.key";
            private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";

            /**
             * Builds the lease key "<prefix>_<fileId>", where the prefix is the file status's
             * namespace or, when that is null, a value read from the client's configuration.
             */
            private String getUniqId(DFSClient client, HdfsFileStatus status) throws IllegalAccessException, InvocationTargetException {
                final long fileId = status.getFileId();
                String prefix = (String) getNamespaceMethod.invoke(status);
                if (prefix == null) {
                    Configuration clientConf = (Configuration) getConfigurationMethod.invoke(client);
                    prefix = clientConf.get(DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY, DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
                }
                return prefix + "_" + fileId;
            }

            @Override
            public void begin(DFSClient dFSClient, HdfsFileStatus hdfsFileStatus) {
                try {
                    final String uniqId = getUniqId(dFSClient, hdfsFileStatus);
                    // No DFSOutputStream exists for this output, so null is passed for it.
                    beginLeaseMethod.invoke(dFSClient, uniqId, null);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }

            @Override
            public void end(DFSClient dFSClient, HdfsFileStatus hdfsFileStatus) {
                try {
                    final String uniqId = getUniqId(dFSClient, hdfsFileStatus);
                    endLeaseMethod.invoke(dFSClient, uniqId);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    /**
     * Builds a {@link LeaseManager} for the {@link DFSClient} variant whose lease methods are
     * keyed by the numeric file id: {@code beginFileLease(long, DFSOutputStream)} and
     * {@code endFileLease(long)}.
     *
     * @throws NoSuchMethodException if either method is missing
     */
    private static LeaseManager createLeaseManager3() throws NoSuchMethodException {
        final Method beginLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
        beginLeaseMethod.setAccessible(true);
        final Method endLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
        endLeaseMethod.setAccessible(true);
        return new LeaseManager() {
            @Override
            public void begin(DFSClient dFSClient, HdfsFileStatus hdfsFileStatus) {
                try {
                    final long fileId = hdfsFileStatus.getFileId();
                    // No DFSOutputStream exists for this output, so null is passed for it.
                    beginLeaseMethod.invoke(dFSClient, fileId, null);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }

            @Override
            public void end(DFSClient dFSClient, HdfsFileStatus hdfsFileStatus) {
                try {
                    final long fileId = hdfsFileStatus.getFileId();
                    endLeaseMethod.invoke(dFSClient, fileId);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    /**
     * Picks the {@link LeaseManager} matching the Hadoop version on the classpath: the
     * String-keyed variant is tried first, the long-keyed variant is the fallback.
     *
     * @throws NoSuchMethodException if the fallback variant's methods are missing as well
     */
    private static LeaseManager createLeaseManager() throws NoSuchMethodException {
        LeaseManager manager;
        try {
            manager = createLeaseManager3_4();
        } catch (NoSuchMethodException ignored) {
            // Expected on older Hadoop; fall back to the long-keyed lease methods.
            LOG.debug("DFSClient::beginFileLease wrong arguments, should be hadoop 3.3 or below");
            manager = createLeaseManager3();
        }
        return manager;
    }

    /**
     * Builds a {@link FileCreator} bound to the {@code ClientProtocol.create} overload that
     * takes two trailing {@code String} parameters after the crypto versions.
     *
     * @throws NoSuchMethodException if that overload does not exist
     */
    private static FileCreator createFileCreator3_3() throws NoSuchMethodException {
        final Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class, String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class, String.class, String.class);
        // Both trailing String parameters of this overload are passed as null.
        return (namenode, src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions) ->
            (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions, null, null);
    }

    /**
     * Builds a {@link FileCreator} bound to the {@code ClientProtocol.create} overload that
     * takes one trailing {@code String} parameter after the crypto versions.
     *
     * @throws NoSuchMethodException if that overload does not exist
     */
    private static FileCreator createFileCreator3() throws NoSuchMethodException {
        final Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class, String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class, String.class);
        // The single trailing String parameter of this overload is passed as null.
        return (namenode, src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions) ->
            (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions, null);
    }

    /**
     * Builds a {@link FileCreator} bound to the shortest {@code ClientProtocol.create}
     * overload, whose last parameter is the crypto versions array.
     *
     * @throws NoSuchMethodException if that overload does not exist
     */
    private static FileCreator createFileCreator2() throws NoSuchMethodException {
        final Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class, String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class);
        return (namenode, src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions) ->
            (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions);
    }

    /**
     * Picks the {@link FileCreator} matching the {@code ClientProtocol.create} overload that
     * is present, trying the longest signature first and falling back to shorter ones.
     *
     * @throws NoSuchMethodException if even the shortest overload is missing
     */
    private static FileCreator createFileCreator() throws NoSuchMethodException {
        try {
            return createFileCreator3_3();
        } catch (NoSuchMethodException ignored) {
            LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 3.2 or below");
        }
        try {
            return createFileCreator3();
        } catch (NoSuchMethodException ignored) {
            LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
        }
        // Last resort: a failure here propagates to the caller.
        return createFileCreator2();
    }

    /**
     * Looks up the {@code SHOULD_REPLICATE} create flag by name so the class still loads when
     * the enum constant does not exist.
     *
     * @return the flag, or {@code null} when {@link CreateFlag} has no such constant
     */
    private static CreateFlag loadShouldReplicateFlag() {
        CreateFlag flag = null;
        try {
            flag = CreateFlag.valueOf("SHOULD_REPLICATE");
        } catch (IllegalArgumentException e) {
            LOG.debug("can not find SHOULD_REPLICATE flag, should be hadoop 2.x", e);
        }
        return flag;
    }

    /** Starts the file lease for the given file status through the version-specific {@code LEASE_MANAGER}. */
    static void beginFileLease(DFSClient dFSClient, HdfsFileStatus hdfsFileStatus) {
        LEASE_MANAGER.begin(dFSClient, hdfsFileStatus);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Ends the file lease for the given file status through the version-specific {@code LEASE_MANAGER}. */
    public static void endFileLease(DFSClient dFSClient, HdfsFileStatus hdfsFileStatus) {
        LEASE_MANAGER.end(dFSClient, hdfsFileStatus);
    }

    /**
     * Creates a {@link DataChecksum} from the client's configuration.
     * A null {@code ChecksumOpt} is passed, i.e. no per-call checksum option is supplied.
     */
    static DataChecksum createChecksum(DFSClient dFSClient) {
        return dFSClient.getConf().createChecksum((Options.ChecksumOpt) null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Extracts the status of a pipeline ack.
     *
     * <p>When the ack carries a flag list, its first entry is used as the header. Otherwise a
     * header is synthesized from the first reply with ECN disabled.
     *
     * @param pipelineAckProto the ack received from the datanode
     * @return the status decoded from the header
     */
    public static DataTransferProtos.Status getStatus(DataTransferProtos.PipelineAckProto pipelineAckProto) {
        final List<?> flags = pipelineAckProto.getFlagList();
        final int header = flags.isEmpty()
            ? PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, pipelineAckProto.getReply(0))
            : (Integer) flags.get(0);
        return PipelineAck.getStatusFromHeader(header);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Installs the handlers that wait for the datanode's response to a write-block request and
     * complete {@code promise} with the outcome.
     *
     * <p>On a SUCCESS response the handlers added here are removed again, auto-read is turned
     * off and the promise is completed with the channel. Error responses are thrown from the
     * handler and reach the promise through {@code exceptionCaught}; a closed connection or a
     * read-idle timeout also fail the promise.
     *
     * @param channel connected channel to the datanode
     * @param datanodeInfo the datanode, used in error messages only
     * @param promise completed with the channel on success, failed otherwise
     * @param i read-idle timeout in milliseconds
     */
    public static void processWriteBlockResponse(Channel channel, final DatanodeInfo datanodeInfo, final Promise<Channel> promise, final int i) {
        SimpleChannelInboundHandler<DataTransferProtos.BlockOpResponseProto> responseHandler = new SimpleChannelInboundHandler<DataTransferProtos.BlockOpResponseProto>() {
            @Override
            public void channelRead0(ChannelHandlerContext ctx, DataTransferProtos.BlockOpResponseProto resp) throws Exception {
                if (PipelineAck.isRestartOOBStatus(resp.getStatus())) {
                    throw new IOException("datanode " + datanodeInfo + " is restarting");
                }
                final String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
                final DataTransferProtos.Status status = resp.getStatus();
                if (status != DataTransferProtos.Status.SUCCESS) {
                    if (status == DataTransferProtos.Status.ERROR_ACCESS_TOKEN) {
                        throw new InvalidBlockTokenException("Got access token error, status message " + resp.getMessage() + ", " + logInfo);
                    }
                    throw new IOException("Got error, status=" + status.name() + ", status message " + resp.getMessage() + ", " + logInfo);
                }
                // Strip the handlers from the tail of the pipeline back to and including the
                // IdleStateHandler, leaving anything that sits in front of it untouched.
                final ChannelPipeline pipeline = ctx.pipeline();
                for (ChannelHandler removed = pipeline.removeLast(); removed != null; removed = pipeline.removeLast()) {
                    if (removed instanceof IdleStateHandler) {
                        break;
                    }
                }
                ctx.channel().config().setAutoRead(false);
                promise.trySuccess(ctx.channel());
            }

            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                promise.tryFailure(new IOException("connection to " + datanodeInfo + " is closed"));
            }

            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                final boolean readerIdle = (evt instanceof IdleStateEvent) && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE;
                if (!readerIdle) {
                    super.userEventTriggered(ctx, evt);
                    return;
                }
                promise.tryFailure(new IOException("Timeout(" + i + "ms) waiting for response"));
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                promise.tryFailure(cause);
            }
        };
        channel.pipeline().addLast(
            new IdleStateHandler(i, 0L, 0L, TimeUnit.MILLISECONDS),
            new ProtobufVarint32FrameDecoder(),
            new ProtobufDecoder(DataTransferProtos.BlockOpResponseProto.getDefaultInstance()),
            responseHandler);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Serializes a write-block request for the given storage type and sends it on the channel.
     *
     * <p>Wire layout written here: a 2-byte short, the 1-byte {@code WRITE_BLOCK} op code, then
     * the varint-length-delimited {@code OpWriteBlockProto}.
     *
     * @param channel channel to the datanode
     * @param storageType storage type set on the request before it is built
     * @param builder request builder; its storage type field is set by this call
     * @throws IOException if writing the message into the buffer fails
     */
    public static void requestWriteBlock(Channel channel, StorageType storageType, DataTransferProtos.OpWriteBlockProto.Builder builder) throws IOException {
        builder.setStorageType(PBHelperClient.convertStorageType(storageType));
        final DataTransferProtos.OpWriteBlockProto request = builder.build();
        final int protoLen = request.getSerializedSize();
        // 3 = 2 bytes for the leading short + 1 byte for the op code.
        final int capacity = 3 + CodedOutputStream.computeUInt32SizeNoTag(protoLen) + protoLen;
        final ByteBuf out = channel.alloc().buffer(capacity);
        // NOTE(review): 28 is presumably the HDFS data transfer protocol version -- confirm.
        out.writeShort(28);
        out.writeByte(Op.WRITE_BLOCK.code);
        request.writeDelimitedTo(new ByteBufOutputStream(out));
        NettyFutureUtils.safeWriteAndFlush(channel, out);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs SASL negotiation on a freshly connected channel and, once it succeeds, installs the
     * response handlers and sends the write-block request. A negotiation failure is forwarded
     * to {@code promise}.
     *
     * @param i timeout in milliseconds, passed to both the negotiation and the response handler
     * @param promise completed by the response handlers, or failed with the negotiation cause
     */
    public static void initialize(Configuration configuration, final Channel channel, final DatanodeInfo datanodeInfo, final StorageType storageType, final DataTransferProtos.OpWriteBlockProto.Builder builder, final int i, DFSClient dFSClient, Token<BlockTokenIdentifier> token, final Promise<Channel> promise) throws IOException {
        final Promise<Void> saslPromise = channel.eventLoop().newPromise();
        FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate(configuration, channel, datanodeInfo, i, dFSClient, token, saslPromise);
        NettyFutureUtils.addListener(saslPromise, new FutureListener<Void>() {
            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                if (future.isSuccess()) {
                    // Register the response handlers before the request goes out.
                    processWriteBlockResponse(channel, datanodeInfo, promise, i);
                    requestWriteBlock(channel, storageType, builder);
                } else {
                    promise.tryFailure(future.cause());
                }
            }
        });
    }

    /**
     * Starts one asynchronous connection per datanode location of {@code locatedBlock} and
     * returns a future per datanode, in location order.
     *
     * <p>Each future completes with the channel once the connection, SASL negotiation and
     * write-block handshake have succeeded (see {@code initialize}); it fails with the connect
     * error otherwise.
     *
     * @param str client name placed in the request header
     * @param j value for the request's {@code maxBytesRcvd}
     * @param j2 value for the request's {@code latestGenerationStamp}
     * @return futures for the datanode channels; this method does not wait for them
     */
    private static List<Future<Channel>> connectToDataNodes(final Configuration configuration, final DFSClient dFSClient, String str, final LocatedBlock locatedBlock, long j, long j2, BlockConstructionStage blockConstructionStage, DataChecksum dataChecksum, EventLoopGroup eventLoopGroup, Class<? extends Channel> cls) {
        final StorageType[] storageTypes = locatedBlock.getStorageTypes();
        final DatanodeInfo[] datanodes = LocatedBlockHelper.getLocatedBlockLocations(locatedBlock);
        final boolean connectToDnViaHostname = configuration.getBoolean("dfs.client.use.datanode.hostname", false);
        final int timeoutMs = configuration.getInt("dfs.client.socket-timeout", READ_TIMEOUT);

        // Work on a copy of the block so setting the byte count does not touch locatedBlock's own block.
        final ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
        blockCopy.setNumBytes(locatedBlock.getBlockSize());

        final DataTransferProtos.ClientOperationHeaderProto header = DataTransferProtos.ClientOperationHeaderProto.newBuilder()
            .setBaseHeader(DataTransferProtos.BaseHeaderProto.newBuilder()
                .setBlock(PBHelperClient.convert(blockCopy))
                .setToken(PBHelperClient.convert(locatedBlock.getBlockToken())))
            .setClientName(str)
            .build();
        // The same builder is shared by every datanode; only the storage type is set per
        // datanode later, in requestWriteBlock.
        final DataTransferProtos.OpWriteBlockProto.Builder writeBlockProtoBuilder = DataTransferProtos.OpWriteBlockProto.newBuilder()
            .setHeader(header)
            .setStage(DataTransferProtos.OpWriteBlockProto.BlockConstructionStage.valueOf(blockConstructionStage.name()))
            .setPipelineSize(1)
            .setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
            .setMaxBytesRcvd(j)
            .setLatestGenerationStamp(j2)
            .setRequestedChecksum(DataTransferProtoUtil.toProto(dataChecksum))
            .setCachingStrategy(DataTransferProtos.CachingStrategyProto.newBuilder().setDropBehind(true).build());

        final List<Future<Channel>> futureList = new ArrayList<>(datanodes.length);
        for (int idx = 0; idx < datanodes.length; idx++) {
            final DatanodeInfo datanode = datanodes[idx];
            final StorageType storageType = storageTypes[idx];
            final Promise<Channel> promise = eventLoopGroup.next().newPromise();
            futureList.add(promise);
            final ChannelFuture connectFuture = new Bootstrap()
                .group(eventLoopGroup)
                .channel(cls)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMs)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        // Intentionally empty: the pipeline is populated only after the
                        // connection succeeds, by the listener registered below.
                    }
                })
                .connect(NetUtils.createSocketAddr(datanode.getXferAddr(connectToDnViaHostname)));
            NettyFutureUtils.addListener(connectFuture, new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        promise.tryFailure(future.cause());
                        return;
                    }
                    initialize(configuration, future.channel(), datanode, storageType, writeBlockProtoBuilder, timeoutMs, dFSClient, locatedBlock.getBlockToken(), promise);
                }
            });
        }
        return futureList;
    }

    /**
     * Assembles the create flags for a new file.
     *
     * <p>{@code CREATE} is always present. {@code SHOULD_REPLICATE} is added whenever that
     * flag could be loaded.
     *
     * @param z when true, {@code OVERWRITE} is added
     * @param z2 when true, {@code NO_LOCAL_WRITE} is added
     * @return the flags wrapped in an {@link EnumSetWritable}
     */
    private static EnumSetWritable<CreateFlag> getCreateFlags(boolean z, boolean z2) {
        final EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE);
        if (z) {
            flags.add(CreateFlag.OVERWRITE);
        }
        if (SHOULD_REPLICATE_FLAG != null) {
            flags.add(SHOULD_REPLICATE_FLAG);
        }
        if (z2) {
            flags.add(CreateFlag.NO_LOCAL_WRITE);
        }
        return new EnumSetWritable<>(flags);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem distributedFileSystem, String str, boolean z, boolean z2, short s, long j, EventLoopGroup eventLoopGroup, Class<? extends Channel> cls, StreamSlowMonitor streamSlowMonitor, boolean z3) throws IOException {
        Configuration conf = distributedFileSystem.getConf();
        DFSClient client = distributedFileSystem.getClient();
        String clientName = client.getClientName();
        ClientProtocol namenode = client.getNamenode();
        int i = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, 10);
        ExcludeDatanodeManager excludeDatanodeManager = streamSlowMonitor.getExcludeDatanodeManager();
        HashSet<DatanodeInfo> hashSet = new HashSet(excludeDatanodeManager.getExcludeDNs().keySet());
        if (conf.getBoolean(ExcludeDatanodeManager.HBASE_REGIONSERVER_ASYNC_WAL_WRITE_WITH_EXCLUDED_DATANODES, false)) {
            DatanodeInfo[] datanodeReport = client.datanodeReport(HdfsConstants.DatanodeReportType.LIVE);
            ArrayList<DatanodeInfo> arrayList = new ArrayList();
            for (DatanodeInfo datanodeInfo : datanodeReport) {
                if (datanodeInfo.getAdminState() == DatanodeInfo.AdminStates.DECOMMISSIONED || datanodeInfo.getAdminState() == DatanodeInfo.AdminStates.DECOMMISSION_INPROGRESS || datanodeInfo.getAdminState() == DatanodeInfo.AdminStates.ENTERING_MAINTENANCE || datanodeInfo.getAdminState() == DatanodeInfo.AdminStates.IN_MAINTENANCE) {
                    arrayList.add(datanodeInfo);
                }
            }
            for (DatanodeInfo datanodeInfo2 : arrayList) {
                if (hashSet.contains(datanodeInfo2)) {
                    hashSet.remove(datanodeInfo2);
                }
            }
            if (datanodeReport.length == hashSet.size() || (datanodeReport.length - arrayList.size()) - hashSet.size() < s) {
                ArrayList arrayList2 = new ArrayList(hashSet.size());
                for (DatanodeInfo datanodeInfo3 : hashSet) {
                    if (!ExcludeDatanodeManager.ExcludeCause.CONNECT_ERROR.getName().equals(excludeDatanodeManager.getExcludeDNs().get(datanodeInfo3).getFirst()) && !arrayList.contains(datanodeInfo3)) {
                        arrayList2.add(datanodeInfo3);
                    }
                }
                Logger logger = LOG;
                Object[] objArr = new Object[3];
                objArr[0] = arrayList;
                objArr[1] = hashSet;
                objArr[2] = arrayList2.isEmpty() ? "empty" : arrayList2;
                logger.info("Maintenance DNs: {}, excluded DNs: {} and possible candidate DNs: {}", objArr);
                Random random = new Random();
                while ((datanodeReport.length - arrayList.size()) - hashSet.size() < s && !arrayList2.isEmpty()) {
                    DatanodeInfo datanodeInfo4 = (DatanodeInfo) arrayList2.get(random.nextInt(arrayList2.size()));
                    hashSet.remove(datanodeInfo4);
                    arrayList2.remove(datanodeInfo4);
                }
                hashSet.addAll(arrayList);
                LOG.info("Datanodes {} will be excluded while creating the WAL output stream", hashSet);
            }
        }
        int i2 = 0;
        while (true) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("When create output stream for {}, exclude list is {}, retry={}", new Object[]{str, getDataNodeInfo(hashSet), Integer.valueOf(i2)});
            }
            try {
                HdfsFileStatus create = FILE_CREATOR.create(namenode, str, FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName, getCreateFlags(z, z3), z2, s, j, CryptoProtocolVersion.supported());
                beginFileLease(client, create);
                List list = null;
                try {
                    try {
                        DataChecksum createChecksum = createChecksum(client);
                        LocatedBlock addBlock = namenode.addBlock(str, client.getClientName(), (ExtendedBlock) null, (DatanodeInfo[]) hashSet.toArray(new DatanodeInfo[0]), create.getFileId(), (String[]) null, (EnumSet) null);
                        IdentityHashMap identityHashMap = new IdentityHashMap();
                        List<Future<Channel>> connectToDataNodes = connectToDataNodes(conf, client, clientName, addBlock, 0L, 0L, BlockConstructionStage.PIPELINE_SETUP_CREATE, createChecksum, eventLoopGroup, cls);
                        int size = connectToDataNodes.size();
                        for (int i3 = 0; i3 < size; i3++) {
                            DatanodeInfo datanodeInfo5 = LocatedBlockHelper.getLocatedBlockLocations(addBlock)[i3];
                            try {
                                identityHashMap.put((Channel) connectToDataNodes.get(i3).syncUninterruptibly().getNow(), datanodeInfo5);
                            } catch (Exception e) {
                                hashSet.add(datanodeInfo5);
                                excludeDatanodeManager.tryAddExcludeDN(datanodeInfo5, ExcludeDatanodeManager.ExcludeCause.CONNECT_ERROR.getName());
                                throw e;
                            }
                        }
                        FanOutOneBlockAsyncDFSOutput fanOutOneBlockAsyncDFSOutput = new FanOutOneBlockAsyncDFSOutput(conf, distributedFileSystem, client, namenode, clientName, str, create, addBlock, FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor(conf, create, client), identityHashMap, createChecksum, ALLOC, streamSlowMonitor);
                        if (1 == 0) {
                            if (connectToDataNodes != null) {
                                Iterator<Future<Channel>> it = connectToDataNodes.iterator();
                                while (it.hasNext()) {
                                    NettyFutureUtils.addListener(it.next(), new FutureListener<Channel>() { // from class: org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.8
                                        public void operationComplete(Future<Channel> future) throws Exception {
                                            if (future.isSuccess()) {
                                                NettyFutureUtils.safeClose((ChannelOutboundInvoker) future.getNow());
                                            }
                                        }
                                    });
                                }
                            }
                            endFileLease(client, create);
                        }
                        return fanOutOneBlockAsyncDFSOutput;
                    } catch (RemoteException e2) {
                        try {
                            LOG.warn("create fan-out dfs output {} failed, retry = {}", new Object[]{str, Integer.valueOf(i2), e2});
                            if (!shouldRetryCreate(e2)) {
                                throw e2.unwrapRemoteException();
                            }
                            if (i2 >= i) {
                                throw e2.unwrapRemoteException();
                            }
                            if (0 == 0) {
                                if (0 != 0) {
                                    Iterator it2 = list.iterator();
                                    while (it2.hasNext()) {
                                        NettyFutureUtils.addListener((Future) it2.next(), new FutureListener<Channel>() { // from class: org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.8
                                            public void operationComplete(Future<Channel> future) throws Exception {
                                                if (future.isSuccess()) {
                                                    NettyFutureUtils.safeClose((ChannelOutboundInvoker) future.getNow());
                                                }
                                            }
                                        });
                                    }
                                }
                                endFileLease(client, create);
                            }
                        } catch (Throwable th) {
                            if (0 == 0) {
                                if (0 != 0) {
                                    Iterator it3 = list.iterator();
                                    while (it3.hasNext()) {
                                        NettyFutureUtils.addListener((Future) it3.next(), new FutureListener<Channel>() { // from class: org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.8
                                            public void operationComplete(Future<Channel> future) throws Exception {
                                                if (future.isSuccess()) {
                                                    NettyFutureUtils.safeClose((ChannelOutboundInvoker) future.getNow());
                                                }
                                            }
                                        });
                                    }
                                }
                                endFileLease(client, create);
                            }
                            throw th;
                        }
                    }
                } catch (IOException e3) {
                    LOG.warn("create fan-out dfs output {} failed, retry = {}", new Object[]{str, Integer.valueOf(i2), e3});
                    if (i2 >= i) {
                        throw e3;
                    }
                    z = true;
                    try {
                        Thread.sleep(ConnectionUtils.getPauseTime(100L, i2));
                        if (0 == 0) {
                            if (0 != 0) {
                                Iterator it4 = list.iterator();
                                while (it4.hasNext()) {
                                    NettyFutureUtils.addListener((Future) it4.next(), new FutureListener<Channel>() { // from class: org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.8
                                        public void operationComplete(Future<Channel> future) throws Exception {
                                            if (future.isSuccess()) {
                                                NettyFutureUtils.safeClose((ChannelOutboundInvoker) future.getNow());
                                            }
                                        }
                                    });
                                }
                            }
                            endFileLease(client, create);
                        }
                    } catch (InterruptedException e4) {
                        throw new InterruptedIOException();
                    }
                }
                i2++;
            } catch (Exception e5) {
                if (e5 instanceof RemoteException) {
                    throw e5;
                }
                throw new NameNodeException(e5);
            }
        }
    }

    /**
     * Creates a {@link FanOutOneBlockAsyncDFSOutput} for {@code path}, resolving the path through a
     * {@link FileSystemLinkResolver} before delegating to the overload that takes a path string.
     *
     * <p>The resolver callbacks must be named {@code doCall} and {@code next}: those are the
     * abstract methods of {@link FileSystemLinkResolver} that {@code resolve} dispatches to.
     *
     * @param distributedFileSystem file system the output is created on; also handed to {@code resolve}
     * @param path path to resolve; the resolved path's URI path component is passed on
     * @param z forwarded unchanged to the path-string overload (NOTE(review): presumably the
     *        overwrite flag - confirm against that overload)
     * @param z2 forwarded unchanged (NOTE(review): presumably "create parent" - confirm)
     * @param s forwarded unchanged (NOTE(review): presumably the replication factor - confirm)
     * @param j forwarded unchanged (NOTE(review): presumably the block size - confirm)
     * @param eventLoopGroup forwarded unchanged to the path-string overload
     * @param cls channel class, forwarded unchanged to the path-string overload
     * @param streamSlowMonitor forwarded unchanged to the path-string overload
     * @param z3 forwarded unchanged (NOTE(review): presumably "no local write" - confirm)
     * @return the output returned by the path-string overload
     * @throws IOException if resolution or the delegated creation fails
     */
    public static FanOutOneBlockAsyncDFSOutput createOutput(final DistributedFileSystem distributedFileSystem, Path path, final boolean z, final boolean z2, final short s, final long j, final EventLoopGroup eventLoopGroup, final Class<? extends Channel> cls, final StreamSlowMonitor streamSlowMonitor, final boolean z3) throws IOException {
        return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
            @Override
            public FanOutOneBlockAsyncDFSOutput doCall(Path path2) throws IOException, UnresolvedLinkException {
                // Only the path component of the URI is handed to the path-string overload.
                return FanOutOneBlockAsyncDFSOutputHelper.createOutput(distributedFileSystem, path2.toUri().getPath(), z, z2, s, j, eventLoopGroup, cls, streamSlowMonitor, z3);
            }

            @Override
            public FanOutOneBlockAsyncDFSOutput next(FileSystem fileSystem, Path path2) throws IOException {
                // Continuing the call on another file system is not supported by this helper.
                throw new UnsupportedOperationException();
            }
        }.resolve(distributedFileSystem, path);
    }

    /**
     * Tells whether a create that failed with the given remote exception should be retried.
     *
     * <p>The decision is made purely on the class name carried by the exception.
     *
     * @param remoteException the exception received from the remote side
     * @return {@code true} if the remote class name ends with {@code RetryStartFileException}
     */
    public static boolean shouldRetryCreate(RemoteException remoteException) {
        final String remoteClassName = remoteException.getClassName();
        return remoteClassName.endsWith("RetryStartFileException");
    }

    /**
     * Asks the namenode to complete a file, retrying with a pause while it reports "not finished".
     *
     * <p>The number of attempts comes from the client configuration
     * ({@code getNumBlockWriteLocateFollowingRetry()}). On success the file lease is ended and the
     * method returns. A {@code RemoteException} is unwrapped and rethrown immediately without
     * further retries.
     *
     * <p>NOTE(review): the decompiler reported that this method was originally package-private.
     *
     * @param dFSClient client supplying the retry configuration and used to end the lease
     * @param clientProtocol namenode protocol on which {@code complete} is invoked
     * @param str first argument to {@code complete}; also used in the log message as the file
     * @param str2 second argument to {@code complete} (NOTE(review): presumably the client name)
     * @param extendedBlock block passed to {@code complete}
     * @param hdfsFileStatus status supplying the file id and used to end the lease
     * @throws IOException the unwrapped remote exception, or a generic one once all attempts are used
     */
    public static void completeFile(DFSClient dFSClient, ClientProtocol clientProtocol, String str, String str2, ExtendedBlock extendedBlock, HdfsFileStatus hdfsFileStatus) throws IOException {
        final int maxAttempts = dFSClient.getConf().getNumBlockWriteLocateFollowingRetry();
        int attempt = 0;
        while (attempt < maxAttempts) {
            try {
                final boolean completed = clientProtocol.complete(str, str2, extendedBlock, hdfsFileStatus.getFileId());
                if (completed) {
                    endFileLease(dFSClient, hdfsFileStatus);
                    return;
                }
                // Not finished yet: log, pause for a duration derived from the attempt number, retry.
                LOG.warn("complete file " + str + " not finished, retry = " + attempt);
                sleepIgnoreInterrupt(attempt);
            } catch (RemoteException remoteFailure) {
                throw remoteFailure.unwrapRemoteException();
            }
            attempt++;
        }
        throw new IOException("can not complete file after retrying " + maxAttempts + " times");
    }

    /**
     * Sleeps for the pause computed by {@code ConnectionUtils.getPauseTime(100L, i)}.
     *
     * <p>An interrupt during the sleep ends it early and is otherwise discarded: the exception is
     * not propagated and the interrupt flag is not restored.
     *
     * @param i retry counter handed to {@code getPauseTime}
     */
    static void sleepIgnoreInterrupt(int i) {
        final long pauseMillis = ConnectionUtils.getPauseTime(100L, i);
        try {
            Thread.sleep(pauseMillis);
        } catch (InterruptedException ignored) {
            // Intentionally ignored, as the method name states.
        }
    }

    /**
     * Renders datanodes as {@code [(host/infoAddr:infoPort),(host/infoAddr:infoPort),...]}.
     *
     * @param collection datanodes to describe, rendered in iteration order
     * @return the bracketed, comma-separated description; {@code "[]"} for an empty collection
     */
    public static String getDataNodeInfo(Collection<DatanodeInfo> collection) {
        if (collection.isEmpty()) {
            return "[]";
        }
        StringBuilder rendered = new StringBuilder("[");
        boolean needsSeparator = false;
        for (DatanodeInfo node : collection) {
            if (needsSeparator) {
                rendered.append(",");
            }
            needsSeparator = true;
            rendered.append("(")
                .append(node.getHostName())
                .append("/")
                .append(node.getInfoAddr())
                .append(":")
                .append(node.getInfoPort())
                .append(")");
        }
        return rendered.append("]").toString();
    }

    static {
        try {
            LEASE_MANAGER = createLeaseManager();
            DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
            FILE_CREATOR = createFileCreator();
            SHOULD_REPLICATE_FLAG = loadShouldReplicateFlag();
        } catch (Exception e) {
            LOG.error("Couldn't properly initialize access to HDFS internals. Please update your WAL Provider to not make use of the 'asyncfs' provider. See HBASE-16110 for more information.", e);
            throw new Error("Couldn't properly initialize access to HDFS internals. Please update your WAL Provider to not make use of the 'asyncfs' provider. See HBASE-16110 for more information.", e);
        }
    }
}
