package org.apache.hadoop.hdfs;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.apache.commons.configuration2.tree.DefaultExpressionEngineSymbols;
import org.apache.flink.hadoop.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.hadoop.shaded.com.google.common.cache.CacheBuilder;
import org.apache.flink.hadoop.shaded.com.google.common.cache.CacheLoader;
import org.apache.flink.hadoop.shaded.com.google.common.cache.LoadingCache;
import org.apache.flink.hadoop.shaded.com.google.common.cache.RemovalListener;
import org.apache.flink.hadoop.shaded.com.google.common.cache.RemovalNotification;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.BlockPlacementViolationException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import org.apache.htrace.core.Span;
import org.apache.htrace.core.SpanId;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hdfs/DataStreamer.class */
public class DataStreamer extends Daemon {
    static final Logger LOG;
    private volatile boolean streamerClosed;
    protected final BlockToWrite block;
    protected Token<BlockTokenIdentifier> accessToken;
    private DataOutputStream blockStream;
    private DataInputStream blockReplyStream;
    private ResponseProcessor response;
    private volatile DatanodeInfo[] nodes;
    private volatile StorageType[] storageTypes;
    private volatile String[] storageIDs;
    private final ErrorState errorState;
    private volatile BlockConstructionStage stage;
    protected long bytesSent;
    private final boolean isLazyPersistFile;
    private final List<DatanodeInfo> failed;
    private List<DatanodeInfo> restartingNodes;
    private volatile int pipelineRecoveryCount;
    private boolean isHflushed;
    private final boolean isAppend;
    private long currentSeqno;
    private long lastQueuedSeqno;
    private long lastAckedSeqno;
    private long bytesCurBlock;
    private final LastExceptionInStreamer lastException;
    private Socket s;
    protected final DFSClient dfsClient;
    protected final String src;
    final DataChecksum checksum4WriteBlock;
    final Progressable progress;
    protected final HdfsFileStatus stat;
    private volatile boolean appendChunk;
    protected final LinkedList<DFSPacket> dataQueue;
    private final Map<Long, Long> packetSendTime;
    private final LinkedList<DFSPacket> ackQueue;
    private final AtomicReference<CachingStrategy> cachingStrategy;
    private final ByteArrayManager byteArrayManager;
    private final AtomicBoolean persistBlocks;
    private boolean failPacket;
    private final long dfsclientSlowLogThresholdMs;
    private long artificialSlowdown;
    private final List<DatanodeInfo> congestedNodes;
    private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
    private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS = 50000;
    private int lastCongestionBackoffTime;
    protected final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
    private final String[] favoredNodes;
    private final EnumSet<AddBlockFlag> addBlockFlags;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* renamed from: org.apache.hadoop.hdfs.DataStreamer$1 */
    /* loaded from: input_file:org/apache/hadoop/hdfs/DataStreamer$1.class */
    public static class AnonymousClass1 extends CacheLoader<DatanodeInfo, DatanodeInfo> {
        AnonymousClass1() {
        }

        @Override // org.apache.flink.hadoop.shaded.com.google.common.cache.CacheLoader
        public DatanodeInfo load(DatanodeInfo datanodeInfo) throws Exception {
            return datanodeInfo;
        }
    }

    /* renamed from: org.apache.hadoop.hdfs.DataStreamer$2 */
    /* loaded from: input_file:org/apache/hadoop/hdfs/DataStreamer$2.class */
    public static class AnonymousClass2 implements RemovalListener<DatanodeInfo, DatanodeInfo> {
        AnonymousClass2() {
        }

        @Override // org.apache.flink.hadoop.shaded.com.google.common.cache.RemovalListener
        public void onRemoval(@Nonnull RemovalNotification<DatanodeInfo, DatanodeInfo> removalNotification) {
            DataStreamer.LOG.info("Removing node " + removalNotification.getKey() + " from the excluded nodes list");
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hdfs/DataStreamer$BlockToWrite.class */
    public static class BlockToWrite {
        private ExtendedBlock currentBlock;
        static final /* synthetic */ boolean $assertionsDisabled;

        BlockToWrite(ExtendedBlock extendedBlock) {
            setCurrentBlock(extendedBlock);
        }

        public synchronized ExtendedBlock getCurrentBlock() {
            if (this.currentBlock == null) {
                return null;
            }
            return new ExtendedBlock(this.currentBlock);
        }

        synchronized long getNumBytes() {
            if (this.currentBlock == null) {
                return 0L;
            }
            return this.currentBlock.getNumBytes();
        }

        public synchronized void setCurrentBlock(ExtendedBlock extendedBlock) {
            this.currentBlock = (extendedBlock == null || extendedBlock.getLocalBlock() == null) ? null : new ExtendedBlock(extendedBlock);
        }

        public synchronized void setNumBytes(long j) {
            if (!$assertionsDisabled && this.currentBlock == null) {
                throw new AssertionError();
            }
            this.currentBlock.setNumBytes(j);
        }

        synchronized void setGenerationStamp(long j) {
            if (!$assertionsDisabled && this.currentBlock == null) {
                throw new AssertionError();
            }
            this.currentBlock.setGenerationStamp(j);
        }

        public synchronized String toString() {
            return this.currentBlock == null ? "null" : this.currentBlock.toString();
        }

        static {
            $assertionsDisabled = !DataStreamer.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hdfs/DataStreamer$ErrorState.class */
    public static class ErrorState {
        ErrorType error = ErrorType.NONE;
        private int badNodeIndex = -1;
        private boolean waitForRestart = true;
        private int restartingNodeIndex = -1;
        private long restartingNodeDeadline = 0;
        private final long datanodeRestartTimeout;

        ErrorState(long j) {
            this.datanodeRestartTimeout = j;
        }

        synchronized void resetInternalError() {
            if (hasInternalError()) {
                this.error = ErrorType.NONE;
            }
            this.badNodeIndex = -1;
            this.restartingNodeIndex = -1;
            this.restartingNodeDeadline = 0L;
            this.waitForRestart = true;
        }

        public synchronized void reset() {
            this.error = ErrorType.NONE;
            this.badNodeIndex = -1;
            this.restartingNodeIndex = -1;
            this.restartingNodeDeadline = 0L;
            this.waitForRestart = true;
        }

        public synchronized boolean hasInternalError() {
            return this.error == ErrorType.INTERNAL;
        }

        public synchronized boolean hasExternalError() {
            return this.error == ErrorType.EXTERNAL;
        }

        synchronized boolean hasError() {
            return this.error != ErrorType.NONE;
        }

        synchronized boolean hasDatanodeError() {
            return this.error == ErrorType.INTERNAL && isNodeMarked();
        }

        public synchronized void setInternalError() {
            this.error = ErrorType.INTERNAL;
        }

        public synchronized void setExternalError() {
            if (hasInternalError()) {
                return;
            }
            this.error = ErrorType.EXTERNAL;
        }

        synchronized void setBadNodeIndex(int i) {
            this.badNodeIndex = i;
        }

        public synchronized int getBadNodeIndex() {
            return this.badNodeIndex;
        }

        synchronized int getRestartingNodeIndex() {
            return this.restartingNodeIndex;
        }

        synchronized void initRestartingNode(int i, String str, boolean z) {
            this.restartingNodeIndex = i;
            if (z) {
                this.restartingNodeDeadline = Time.monotonicNow() + this.datanodeRestartTimeout;
                this.badNodeIndex = -1;
            } else {
                this.waitForRestart = false;
            }
            DataStreamer.LOG.info(str);
        }

        synchronized boolean isRestartingNode() {
            return this.restartingNodeIndex >= 0;
        }

        synchronized boolean isNodeMarked() {
            return this.badNodeIndex >= 0 || (isRestartingNode() && doWaitForRestart());
        }

        synchronized void markFirstNodeIfNotMarked() {
            if (isNodeMarked()) {
                return;
            }
            this.badNodeIndex = 0;
        }

        synchronized void adjustState4RestartingNode() {
            if (this.restartingNodeIndex >= 0) {
                if (this.badNodeIndex > this.restartingNodeIndex) {
                    this.restartingNodeIndex = -1;
                } else if (this.badNodeIndex < this.restartingNodeIndex) {
                    this.restartingNodeIndex--;
                } else if (this.waitForRestart) {
                    throw new IllegalStateException("badNodeIndex = " + this.badNodeIndex + " = restartingNodeIndex = " + this.restartingNodeIndex);
                }
            }
            if (!isRestartingNode()) {
                this.error = ErrorType.NONE;
            }
            this.badNodeIndex = -1;
        }

        public synchronized void checkRestartingNodeDeadline(DatanodeInfo[] datanodeInfoArr) {
            if (this.restartingNodeIndex >= 0) {
                if (this.error == ErrorType.NONE) {
                    throw new IllegalStateException("error=false while checking restarting node deadline");
                }
                if (this.badNodeIndex == this.restartingNodeIndex) {
                    this.badNodeIndex = -1;
                }
                if (Time.monotonicNow() >= this.restartingNodeDeadline) {
                    this.restartingNodeDeadline = 0L;
                    int i = this.restartingNodeIndex;
                    this.restartingNodeIndex = -1;
                    DataStreamer.LOG.warn("Datanode " + i + " did not restart within " + this.datanodeRestartTimeout + "ms: " + datanodeInfoArr[i]);
                    if (this.badNodeIndex == -1) {
                        this.badNodeIndex = i;
                    }
                }
            }
        }

        boolean doWaitForRestart() {
            return this.waitForRestart;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hdfs/DataStreamer$ErrorType.class */
    public enum ErrorType {
        NONE,
        INTERNAL,
        EXTERNAL
    }

    /* loaded from: input_file:org/apache/hadoop/hdfs/DataStreamer$LastExceptionInStreamer.class */
    public class LastExceptionInStreamer extends ExceptionLastSeen {
        LastExceptionInStreamer() {
        }

        @Override // org.apache.hadoop.hdfs.ExceptionLastSeen
        public synchronized void check(boolean z) throws IOException {
            IOException iOException = get();
            if (iOException != null) {
                if (DataStreamer.LOG.isTraceEnabled()) {
                    DataStreamer.LOG.trace("Got Exception while checking, " + DataStreamer.this, new Throwable(iOException));
                }
                super.check(z);
            }
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hdfs/DataStreamer$RefetchEncryptionKeyPolicy.class */
    public class RefetchEncryptionKeyPolicy {
        private int fetchEncryptionKeyTimes = 0;
        private InvalidEncryptionKeyException lastException;
        private final DatanodeInfo src;

        RefetchEncryptionKeyPolicy(DatanodeInfo datanodeInfo) {
            this.src = datanodeInfo;
        }

        boolean continueRetryingOrThrow() throws InvalidEncryptionKeyException {
            if (this.fetchEncryptionKeyTimes >= 2) {
                throw this.lastException;
            }
            DataStreamer.LOG.info("Will fetch a new encryption key and retry, encryption key was invalid when connecting to " + this.src + ": ", this.lastException);
            DataStreamer.this.dfsClient.clearDataEncryptionKey();
            return true;
        }

        void recordFailure(InvalidEncryptionKeyException invalidEncryptionKeyException) throws InvalidEncryptionKeyException {
            this.fetchEncryptionKeyTimes++;
            this.lastException = invalidEncryptionKeyException;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hdfs/DataStreamer$ResponseProcessor.class */
    public class ResponseProcessor extends Daemon {
        private DatanodeInfo[] targets;
        static final /* synthetic */ boolean $assertionsDisabled;
        private volatile boolean responderClosed = false;
        private boolean isLastPacketInBlock = false;

        ResponseProcessor(DatanodeInfo[] datanodeInfoArr) {
            this.targets = null;
            this.targets = datanodeInfoArr;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            long seqno;
            DFSPacket dFSPacket;
            TraceScope traceScope;
            Long l;
            setName("ResponseProcessor for block " + DataStreamer.this.block);
            PipelineAck pipelineAck = new PipelineAck();
            long datanodeAckTimeout = DataStreamer.this.dfsClient.getConf().getDatanodeAckTimeout();
            while (true) {
                AutoCloseable autoCloseable = null;
                if (this.responderClosed || !DataStreamer.this.dfsClient.clientRunning || this.isLastPacketInBlock) {
                    return;
                }
                try {
                    try {
                        pipelineAck.readFields(DataStreamer.this.blockReplyStream);
                        if (pipelineAck.getSeqno() != -1 && (l = (Long) DataStreamer.this.packetSendTime.get(Long.valueOf(pipelineAck.getSeqno()))) != null) {
                            long monotonicNow = Time.monotonicNow() - l.longValue();
                            if (monotonicNow > DataStreamer.this.dfsclientSlowLogThresholdMs) {
                                DataStreamer.LOG.info("Slow ReadProcessor read fields for block " + DataStreamer.this.block + " took " + monotonicNow + "ms (threshold=" + DataStreamer.this.dfsclientSlowLogThresholdMs + "ms); ack: " + pipelineAck + ", targets: " + Arrays.asList(this.targets));
                            }
                        }
                        if (DataStreamer.LOG.isDebugEnabled()) {
                            DataStreamer.LOG.debug("DFSClient {}", pipelineAck);
                        }
                        seqno = pipelineAck.getSeqno();
                        ArrayList arrayList = new ArrayList();
                        for (int numOfReplies = pipelineAck.getNumOfReplies() - 1; numOfReplies >= 0 && DataStreamer.this.dfsClient.clientRunning; numOfReplies--) {
                            DataTransferProtos.Status statusFromHeader = PipelineAck.getStatusFromHeader(pipelineAck.getHeaderFlag(numOfReplies));
                            if (PipelineAck.getECNFromHeader(pipelineAck.getHeaderFlag(numOfReplies)) == PipelineAck.ECN.CONGESTED) {
                                arrayList.add(this.targets[numOfReplies]);
                            }
                            if (PipelineAck.isRestartOOBStatus(statusFromHeader)) {
                                String str = "Datanode " + numOfReplies + " is restarting: " + this.targets[numOfReplies];
                                DataStreamer.this.errorState.initRestartingNode(numOfReplies, str, DataStreamer.this.shouldWaitForRestart(numOfReplies));
                                throw new IOException(str);
                            }
                            if (statusFromHeader != DataTransferProtos.Status.SUCCESS) {
                                DataStreamer.this.errorState.setBadNodeIndex(numOfReplies);
                                throw new IOException("Bad response " + statusFromHeader + " for " + DataStreamer.this.block + " from datanode " + this.targets[numOfReplies]);
                            }
                        }
                        if (arrayList.isEmpty()) {
                            synchronized (DataStreamer.this.congestedNodes) {
                                DataStreamer.this.congestedNodes.clear();
                                DataStreamer.this.lastCongestionBackoffTime = 0;
                            }
                        } else {
                            synchronized (DataStreamer.this.congestedNodes) {
                                DataStreamer.this.congestedNodes.clear();
                                DataStreamer.this.congestedNodes.addAll(arrayList);
                            }
                        }
                    } catch (Exception e) {
                        if (!this.responderClosed) {
                            DataStreamer.this.lastException.set(e);
                            DataStreamer.this.errorState.setInternalError();
                            DataStreamer.this.errorState.markFirstNodeIfNotMarked();
                            synchronized (DataStreamer.this.dataQueue) {
                                DataStreamer.this.dataQueue.notifyAll();
                                if (!DataStreamer.this.errorState.isRestartingNode()) {
                                    DataStreamer.LOG.warn("Exception for " + DataStreamer.this.block, e);
                                }
                                this.responderClosed = true;
                            }
                        }
                        if (0 != 0) {
                            autoCloseable.close();
                        }
                    }
                    if (!$assertionsDisabled && seqno == -2) {
                        throw new AssertionError("Ack for unknown seqno should be a failed ack: " + pipelineAck);
                    }
                    synchronized (DataStreamer.this.dataQueue) {
                        dFSPacket = DataStreamer.this.ackQueue.isEmpty() ? null : (DFSPacket) DataStreamer.this.ackQueue.getFirst();
                    }
                    if (seqno == -1) {
                        if (dFSPacket != null) {
                            long monotonicNow2 = Time.monotonicNow() - ((Long) DataStreamer.this.packetSendTime.get(Long.valueOf(dFSPacket.getSeqno()))).longValue();
                            if (monotonicNow2 >= datanodeAckTimeout) {
                                throw new IOException(datanodeAckTimeout + " milliseconds time out while waiting for data packet acknowledegment. Waiting from last " + monotonicNow2 + " milliseconds. ");
                            }
                        }
                        if (0 != 0) {
                            autoCloseable.close();
                        }
                    } else {
                        if (dFSPacket.getSeqno() != seqno) {
                            throw new IOException("ResponseProcessor: Expecting seqno  for block " + DataStreamer.this.block + dFSPacket.getSeqno() + " but received " + seqno);
                        }
                        this.isLastPacketInBlock = dFSPacket.isLastPacketInBlock();
                        if (DFSClientFaultInjector.get().failPacket() && this.isLastPacketInBlock) {
                            DataStreamer.this.failPacket = true;
                            throw new IOException("Failing the last packet for testing.");
                        }
                        DataStreamer.this.block.setNumBytes(dFSPacket.getLastByteOffsetBlock());
                        synchronized (DataStreamer.this.dataQueue) {
                            traceScope = dFSPacket.getTraceScope();
                            if (traceScope != null) {
                                traceScope.reattach();
                                dFSPacket.setTraceScope(null);
                            }
                            DataStreamer.access$802(DataStreamer.this, seqno);
                            DataStreamer.this.pipelineRecoveryCount = 0;
                            DataStreamer.this.ackQueue.removeFirst();
                            DataStreamer.this.packetSendTime.remove(Long.valueOf(seqno));
                            DataStreamer.this.dataQueue.notifyAll();
                            dFSPacket.releaseBuffer(DataStreamer.this.byteArrayManager);
                        }
                        if (traceScope != null) {
                            traceScope.close();
                        }
                    }
                } catch (Throwable th) {
                    if (0 != 0) {
                        autoCloseable.close();
                    }
                    throw th;
                }
            }
        }

        void close() {
            this.responderClosed = true;
            interrupt();
        }

        static {
            $assertionsDisabled = !DataStreamer.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hdfs/DataStreamer$StreamerStreams.class */
    public class StreamerStreams implements Closeable {
        private Socket sock;
        private DataOutputStream out;
        private DataInputStream in;

        StreamerStreams(DatanodeInfo datanodeInfo, long j, long j2, Token<BlockTokenIdentifier> token) throws IOException {
            this.sock = null;
            this.out = null;
            this.in = null;
            this.sock = DataStreamer.createSocketForPipeline(datanodeInfo, 2, DataStreamer.this.dfsClient);
            IOStreamPair socketSend = DataStreamer.this.dfsClient.saslClient.socketSend(this.sock, NetUtils.getOutputStream(this.sock, j), NetUtils.getInputStream(this.sock, j2), DataStreamer.this.dfsClient, token, datanodeInfo);
            OutputStream outputStream = socketSend.out;
            InputStream inputStream = socketSend.in;
            this.out = new DataOutputStream(new BufferedOutputStream(outputStream, DFSUtilClient.getSmallBufferSize(DataStreamer.this.dfsClient.getConfiguration())));
            this.in = new DataInputStream(inputStream);
        }

        void sendTransferBlock(DatanodeInfo[] datanodeInfoArr, StorageType[] storageTypeArr, String[] strArr, Token<BlockTokenIdentifier> token) throws IOException {
            new Sender(this.out).transferBlock(DataStreamer.this.block.getCurrentBlock(), token, DataStreamer.this.dfsClient.clientName, datanodeInfoArr, storageTypeArr, strArr);
            this.out.flush();
            DataTransferProtos.BlockOpResponseProto parseFrom = DataTransferProtos.BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(this.in));
            if (DataTransferProtos.Status.SUCCESS != parseFrom.getStatus()) {
                throw new IOException("Failed to add a datanode. Response status: " + parseFrom.getStatus());
            }
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            IOUtils.closeStream(this.in);
            IOUtils.closeStream(this.out);
            IOUtils.closeSocket(this.sock);
        }
    }

    static Socket createSocketForPipeline(DatanodeInfo datanodeInfo, int i, DFSClient dFSClient) throws IOException {
        DfsClientConf conf = dFSClient.getConf();
        String xferAddr = datanodeInfo.getXferAddr(conf.isConnectToDnViaHostname());
        LOG.debug("Connecting to datanode {}", xferAddr);
        InetSocketAddress createSocketAddr = NetUtils.createSocketAddr(xferAddr);
        Socket createSocket = dFSClient.socketFactory.createSocket();
        int datanodeReadTimeout = dFSClient.getDatanodeReadTimeout(i);
        NetUtils.connect(createSocket, createSocketAddr, dFSClient.getRandomLocalInterfaceAddr(), conf.getSocketTimeout());
        createSocket.setTcpNoDelay(conf.getDataTransferTcpNoDelay());
        createSocket.setSoTimeout(datanodeReadTimeout);
        createSocket.setKeepAlive(true);
        if (conf.getSocketSendBufferSize() > 0) {
            createSocket.setSendBufferSize(conf.getSocketSendBufferSize());
        }
        LOG.debug("Send buf size {}", Integer.valueOf(createSocket.getSendBufferSize()));
        return createSocket;
    }

    public static boolean isLazyPersist(HdfsFileStatus hdfsFileStatus) {
        return hdfsFileStatus.getStoragePolicy() == 15;
    }

    private static void releaseBuffer(List<DFSPacket> list, ByteArrayManager byteArrayManager) {
        Iterator<DFSPacket> it = list.iterator();
        while (it.hasNext()) {
            it.next().releaseBuffer(byteArrayManager);
        }
        list.clear();
    }

    private DataStreamer(HdfsFileStatus hdfsFileStatus, ExtendedBlock extendedBlock, DFSClient dFSClient, String str, Progressable progressable, DataChecksum dataChecksum, AtomicReference<CachingStrategy> atomicReference, ByteArrayManager byteArrayManager, boolean z, String[] strArr, EnumSet<AddBlockFlag> enumSet) {
        this.streamerClosed = false;
        this.response = null;
        this.nodes = null;
        this.storageTypes = null;
        this.storageIDs = null;
        this.bytesSent = 0L;
        this.failed = new ArrayList();
        this.restartingNodes = new ArrayList();
        this.pipelineRecoveryCount = 0;
        this.isHflushed = false;
        this.currentSeqno = 0L;
        this.lastQueuedSeqno = -1L;
        this.lastAckedSeqno = -1L;
        this.bytesCurBlock = 0L;
        this.lastException = new LastExceptionInStreamer();
        this.appendChunk = false;
        this.dataQueue = new LinkedList<>();
        this.packetSendTime = new HashMap();
        this.ackQueue = new LinkedList<>();
        this.persistBlocks = new AtomicBoolean(false);
        this.failPacket = false;
        this.artificialSlowdown = 0L;
        this.congestedNodes = new ArrayList();
        this.block = new BlockToWrite(extendedBlock);
        this.dfsClient = dFSClient;
        this.src = str;
        this.progress = progressable;
        this.stat = hdfsFileStatus;
        this.checksum4WriteBlock = dataChecksum;
        this.cachingStrategy = atomicReference;
        this.byteArrayManager = byteArrayManager;
        this.isLazyPersistFile = isLazyPersist(hdfsFileStatus);
        this.isAppend = z;
        this.favoredNodes = strArr;
        DfsClientConf conf = dFSClient.getConf();
        this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs();
        this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
        this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
        this.addBlockFlags = enumSet;
    }

    public DataStreamer(HdfsFileStatus hdfsFileStatus, ExtendedBlock extendedBlock, DFSClient dFSClient, String str, Progressable progressable, DataChecksum dataChecksum, AtomicReference<CachingStrategy> atomicReference, ByteArrayManager byteArrayManager, String[] strArr, EnumSet<AddBlockFlag> enumSet) {
        this(hdfsFileStatus, extendedBlock, dFSClient, str, progressable, dataChecksum, atomicReference, byteArrayManager, false, strArr, enumSet);
        this.stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
    }

    public DataStreamer(LocatedBlock locatedBlock, HdfsFileStatus hdfsFileStatus, DFSClient dFSClient, String str, Progressable progressable, DataChecksum dataChecksum, AtomicReference<CachingStrategy> atomicReference, ByteArrayManager byteArrayManager) {
        this(hdfsFileStatus, locatedBlock.getBlock(), dFSClient, str, progressable, dataChecksum, atomicReference, byteArrayManager, true, null, null);
        this.stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
        this.bytesSent = this.block.getNumBytes();
        this.accessToken = locatedBlock.getBlockToken();
    }

    public void setPipelineInConstruction(LocatedBlock locatedBlock) throws IOException {
        setPipeline(locatedBlock);
        if (this.nodes.length < 1) {
            throw new IOException("Unable to retrieve blocks locations  for last block " + this.block + " of file " + this.src);
        }
    }

    public void setAccessToken(Token<BlockTokenIdentifier> token) {
        this.accessToken = token;
    }

    private void setPipeline(LocatedBlock locatedBlock) {
        setPipeline(locatedBlock.getLocations(), locatedBlock.getStorageTypes(), locatedBlock.getStorageIDs());
    }

    private void setPipeline(DatanodeInfo[] datanodeInfoArr, StorageType[] storageTypeArr, String[] strArr) {
        this.nodes = datanodeInfoArr;
        this.storageTypes = storageTypeArr;
        this.storageIDs = strArr;
    }

    private void initDataStreaming() {
        setName("DataStreamer for file " + this.src + " block " + this.block);
        if (LOG.isDebugEnabled()) {
            LOG.debug("nodes {} storageTypes {} storageIDs {}", new Object[]{Arrays.toString(this.nodes), Arrays.toString(this.storageTypes), Arrays.toString(this.storageIDs)});
        }
        this.response = new ResponseProcessor(this.nodes);
        this.response.start();
        this.stage = BlockConstructionStage.DATA_STREAMING;
    }

    public void endBlock() {
        LOG.debug("Closing old block {}", this.block);
        setName("DataStreamer for file " + this.src);
        closeResponder();
        closeStream();
        setPipeline(null, null, null);
        this.stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
    }

    private boolean shouldStop() {
        return this.streamerClosed || this.errorState.hasError() || !this.dfsClient.clientRunning;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        boolean z;
        boolean z2;
        AssertionError assertionError;
        DFSPacket first;
        long monotonicNow = Time.monotonicNow();
        TraceScope traceScope = null;
        while (!this.streamerClosed && this.dfsClient.clientRunning) {
            if (this.errorState.hasError()) {
                closeResponder();
            }
            try {
                try {
                    boolean processDatanodeOrExternalError = processDatanodeOrExternalError();
                    int socketTimeout = this.dfsClient.getConf().getSocketTimeout() / 2;
                    synchronized (this.dataQueue) {
                        long monotonicNow2 = Time.monotonicNow();
                        while (true) {
                            if ((shouldStop() || this.dataQueue.size() != 0 || (this.stage == BlockConstructionStage.DATA_STREAMING && monotonicNow2 - monotonicNow >= socketTimeout)) && !processDatanodeOrExternalError) {
                                break;
                            }
                            long j = socketTimeout - (monotonicNow2 - monotonicNow);
                            try {
                                this.dataQueue.wait(this.stage == BlockConstructionStage.DATA_STREAMING ? j <= 0 ? 1000L : j : 1000L);
                            } catch (InterruptedException e) {
                                LOG.warn("Caught exception", e);
                            }
                            processDatanodeOrExternalError = false;
                            monotonicNow2 = Time.monotonicNow();
                        }
                        if (!shouldStop()) {
                            if (this.dataQueue.isEmpty()) {
                                first = createHeartbeatPacket();
                            } else {
                                try {
                                    backOffIfNecessary();
                                } catch (InterruptedException e2) {
                                    LOG.warn("Caught exception", e2);
                                }
                                first = this.dataQueue.getFirst();
                                SpanId[] traceParents = first.getTraceParents();
                                if (traceParents.length > 0) {
                                    traceScope = this.dfsClient.getTracer().newScope("dataStreamer", traceParents[0]);
                                    traceScope.getSpan().setParents(traceParents);
                                }
                            }
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("stage=" + this.stage + ", " + this);
                            }
                            if (this.stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
                                LOG.debug("Allocating new block: {}", this);
                                setPipeline(nextBlockOutputStream());
                                initDataStreaming();
                            } else if (this.stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
                                LOG.debug("Append to block {}", this.block);
                                setupPipelineForAppendOrRecovery();
                                if (!this.streamerClosed) {
                                    initDataStreaming();
                                } else if (traceScope != null) {
                                    traceScope.close();
                                    traceScope = null;
                                }
                            }
                            if (first.getLastByteOffsetBlock() > this.stat.getBlockSize()) {
                                throw new IOException("BlockSize " + this.stat.getBlockSize() + " < lastByteOffsetInBlock, " + this + ", " + first);
                            }
                            if (first.isLastPacketInBlock()) {
                                synchronized (this.dataQueue) {
                                    while (!shouldStop() && this.ackQueue.size() != 0) {
                                        try {
                                            this.dataQueue.wait(1000L);
                                        } catch (InterruptedException e3) {
                                            LOG.warn("Caught exception", e3);
                                        }
                                    }
                                }
                                if (!shouldStop()) {
                                    this.stage = BlockConstructionStage.PIPELINE_CLOSE;
                                } else if (traceScope != null) {
                                    traceScope.close();
                                    traceScope = null;
                                }
                            }
                            SpanId spanId = SpanId.INVALID;
                            synchronized (this.dataQueue) {
                                if (!first.isHeartbeatPacket()) {
                                    if (traceScope != null) {
                                        spanId = traceScope.getSpanId();
                                        traceScope.detach();
                                        first.setTraceScope(traceScope);
                                    }
                                    traceScope = null;
                                    this.dataQueue.removeFirst();
                                    this.ackQueue.addLast(first);
                                    this.packetSendTime.put(Long.valueOf(first.getSeqno()), Long.valueOf(Time.monotonicNow()));
                                    this.dataQueue.notifyAll();
                                }
                            }
                            LOG.debug("{} sending {}", this, first);
                            try {
                                TraceScope newScope = this.dfsClient.getTracer().newScope("DataStreamer#writeTo", spanId);
                                Throwable th = null;
                                try {
                                    try {
                                        first.writeTo(this.blockStream);
                                        this.blockStream.flush();
                                        if (newScope != null) {
                                            if (0 != 0) {
                                                try {
                                                    newScope.close();
                                                } catch (Throwable th2) {
                                                    th.addSuppressed(th2);
                                                }
                                            } else {
                                                newScope.close();
                                            }
                                        }
                                        monotonicNow = Time.monotonicNow();
                                        long lastByteOffsetBlock = first.getLastByteOffsetBlock();
                                        if (this.bytesSent < lastByteOffsetBlock) {
                                            this.bytesSent = lastByteOffsetBlock;
                                        }
                                        if (!shouldStop()) {
                                            if (first.isLastPacketInBlock()) {
                                                synchronized (this.dataQueue) {
                                                    while (!shouldStop() && this.ackQueue.size() != 0) {
                                                        this.dataQueue.wait(1000L);
                                                    }
                                                }
                                                if (!shouldStop()) {
                                                    endBlock();
                                                } else if (traceScope != null) {
                                                    traceScope.close();
                                                    traceScope = null;
                                                }
                                            }
                                            if (this.progress != null) {
                                                this.progress.progress();
                                            }
                                            if (this.artificialSlowdown != 0 && this.dfsClient.clientRunning) {
                                                Thread.sleep(this.artificialSlowdown);
                                            }
                                            if (traceScope != null) {
                                                traceScope.close();
                                                traceScope = null;
                                            }
                                        } else if (traceScope != null) {
                                            traceScope.close();
                                            traceScope = null;
                                        }
                                    } finally {
                                    }
                                } catch (Throwable th3) {
                                    if (newScope != null) {
                                        if (th != null) {
                                            try {
                                                newScope.close();
                                            } catch (Throwable th4) {
                                                th.addSuppressed(th4);
                                            }
                                        } else {
                                            newScope.close();
                                        }
                                    }
                                    throw th3;
                                }
                            } catch (IOException e4) {
                                this.errorState.markFirstNodeIfNotMarked();
                                throw e4;
                            }
                        } else if (traceScope != null) {
                            traceScope.close();
                            traceScope = null;
                        }
                    }
                } finally {
                    if (!z) {
                        if (z2) {
                        }
                    }
                }
            } catch (Throwable th5) {
                if (traceScope != null) {
                    traceScope.close();
                }
                throw th5;
            }
        }
        closeInternal();
    }

    private void closeInternal() {
        closeResponder();
        closeStream();
        this.streamerClosed = true;
        release();
        synchronized (this.dataQueue) {
            this.dataQueue.notifyAll();
        }
    }

    public void release() {
        synchronized (this.dataQueue) {
            releaseBuffer(this.dataQueue, this.byteArrayManager);
            releaseBuffer(this.ackQueue, this.byteArrayManager);
        }
    }

    public void waitForAckedSeqno(long j) throws IOException {
        TraceScope newScope = this.dfsClient.getTracer().newScope("waitForAckedSeqno");
        Throwable th = null;
        try {
            LOG.debug("{} waiting for ack for: {}", this, Long.valueOf(j));
            long monotonicNow = Time.monotonicNow();
            try {
                synchronized (this.dataQueue) {
                    while (!this.streamerClosed) {
                        checkClosed();
                        if (this.lastAckedSeqno >= j) {
                            break;
                        }
                        try {
                            this.dataQueue.wait(1000L);
                        } catch (InterruptedException e) {
                            throw new InterruptedIOException("Interrupted while waiting for data to be acknowledged by pipeline");
                        }
                    }
                }
                checkClosed();
            } catch (ClosedChannelException e2) {
            }
            long monotonicNow2 = Time.monotonicNow() - monotonicNow;
            if (monotonicNow2 > this.dfsclientSlowLogThresholdMs) {
                LOG.warn("Slow waitForAckedSeqno took {}ms (threshold={}ms). File being written: {}, block: {}, Write pipeline datanodes: {}.", new Object[]{Long.valueOf(monotonicNow2), Long.valueOf(this.dfsclientSlowLogThresholdMs), this.src, this.block, this.nodes});
            }
            if (newScope != null) {
                if (0 == 0) {
                    newScope.close();
                    return;
                }
                try {
                    newScope.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (newScope != null) {
                if (0 != 0) {
                    try {
                        newScope.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newScope.close();
                }
            }
            throw th3;
        }
    }

    public void waitAndQueuePacket(DFSPacket dFSPacket) throws IOException {
        synchronized (this.dataQueue) {
            boolean z = true;
            while (!this.streamerClosed && this.dataQueue.size() + this.ackQueue.size() > this.dfsClient.getConf().getWriteMaxPackets()) {
                try {
                    try {
                        if (z) {
                            Span currentSpan = Tracer.getCurrentSpan();
                            if (currentSpan != null) {
                                currentSpan.addTimelineAnnotation("dataQueue.wait");
                            }
                            z = false;
                        }
                        try {
                            this.dataQueue.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    } catch (Throwable th) {
                        Span currentSpan2 = Tracer.getCurrentSpan();
                        if (currentSpan2 != null && !z) {
                            currentSpan2.addTimelineAnnotation("end.wait");
                        }
                        throw th;
                    }
                } catch (ClosedChannelException e2) {
                }
            }
            Span currentSpan3 = Tracer.getCurrentSpan();
            if (currentSpan3 != null && !z) {
                currentSpan3.addTimelineAnnotation("end.wait");
            }
            checkClosed();
            queuePacket(dFSPacket);
        }
    }

    public void close(boolean z) {
        this.streamerClosed = true;
        synchronized (this.dataQueue) {
            this.dataQueue.notifyAll();
        }
        if (z) {
            interrupt();
        }
    }

    public void setStreamerAsClosed() {
        this.streamerClosed = true;
    }

    private void checkClosed() throws IOException {
        if (this.streamerClosed) {
            this.lastException.throwException4Close();
        }
    }

    private void closeResponder() {
        try {
        } catch (InterruptedException e) {
            LOG.warn("Caught exception", e);
        } finally {
            this.response = null;
        }
        if (this.response != null) {
            this.response.close();
            this.response.join();
        }
    }

    public void closeStream() {
        MultipleIOException.Builder builder = new MultipleIOException.Builder();
        try {
        } catch (IOException e) {
            builder.add(e);
        } finally {
            this.blockStream = null;
        }
        if (this.blockStream != null) {
            this.blockStream.close();
        }
        if (this.blockReplyStream != null) {
            try {
                try {
                    this.blockReplyStream.close();
                    this.blockReplyStream = null;
                } catch (IOException e2) {
                    builder.add(e2);
                    this.blockReplyStream = null;
                }
            } catch (Throwable th) {
                this.blockReplyStream = null;
                throw th;
            }
        }
        if (null != this.s) {
            try {
                try {
                    this.s.close();
                    this.s = null;
                } catch (IOException e3) {
                    builder.add(e3);
                    this.s = null;
                }
            } catch (Throwable th2) {
                this.s = null;
                throw th2;
            }
        }
        IOException build = builder.build();
        if (build != null) {
            this.lastException.set(build);
        }
    }

    boolean shouldWaitForRestart(int i) {
        if (this.nodes.length == 1) {
            return true;
        }
        if (DFSClientFaultInjector.get().skipRollingRestartWait()) {
            return false;
        }
        InetAddress inetAddress = null;
        try {
            inetAddress = InetAddress.getByName(this.nodes[i].getIpAddr());
        } catch (UnknownHostException e) {
            if (!$assertionsDisabled) {
                throw new AssertionError();
            }
        }
        return inetAddress != null && NetUtils.isLocalAddress(inetAddress);
    }

    private boolean shouldHandleExternalError() {
        return this.errorState.hasExternalError() && this.blockStream != null;
    }

    private boolean processDatanodeOrExternalError() throws IOException {
        if (!this.errorState.hasDatanodeError() && !shouldHandleExternalError()) {
            return false;
        }
        LOG.debug("start process datanode/external error, {}", this);
        if (this.response != null) {
            LOG.info("Error Recovery for " + this.block + " waiting for responder to exit. ");
            return true;
        }
        closeStream();
        synchronized (this.dataQueue) {
            this.dataQueue.addAll(0, this.ackQueue);
            this.ackQueue.clear();
            this.packetSendTime.clear();
        }
        if (!this.errorState.isRestartingNode()) {
            int i = this.pipelineRecoveryCount + 1;
            this.pipelineRecoveryCount = i;
            if (i > 5) {
                LOG.warn("Error recovering pipeline for writing " + this.block + ". Already retried 5 times for the same packet.");
                this.lastException.set(new IOException("Failing write. Tried pipeline recovery 5 times without success."));
                this.streamerClosed = true;
                return false;
            }
        }
        setupPipelineForAppendOrRecovery();
        if (this.streamerClosed || !this.dfsClient.clientRunning) {
            return false;
        }
        if (this.stage != BlockConstructionStage.PIPELINE_CLOSE) {
            initDataStreaming();
            return false;
        }
        synchronized (this.dataQueue) {
            DFSPacket remove = this.dataQueue.remove();
            TraceScope traceScope = remove.getTraceScope();
            if (traceScope != null) {
                traceScope.reattach();
                traceScope.close();
                remove.setTraceScope(null);
            }
            if (!$assertionsDisabled && !remove.isLastPacketInBlock()) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && this.lastAckedSeqno != remove.getSeqno() - 1) {
                throw new AssertionError();
            }
            this.lastAckedSeqno = remove.getSeqno();
            this.pipelineRecoveryCount = 0;
            this.dataQueue.notifyAll();
        }
        endBlock();
        return false;
    }

    public void setHflush() {
        this.isHflushed = true;
    }

    private int[] findNewDatanode(DatanodeInfo[] datanodeInfoArr) throws IOException {
        if (this.nodes.length <= datanodeInfoArr.length) {
            throw new IOException("Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=" + Arrays.asList(this.nodes) + ", original=" + Arrays.asList(datanodeInfoArr) + "). The current failed datanode replacement policy is " + this.dfsClient.dtpReplaceDatanodeOnFailure + ", and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.");
        }
        int[] iArr = new int[this.nodes.length - datanodeInfoArr.length];
        int i = 0;
        for (int i2 = 0; i2 < this.nodes.length; i2++) {
            int i3 = 0;
            while (i3 < datanodeInfoArr.length && !this.nodes[i2].equals(datanodeInfoArr[i3])) {
                i3++;
            }
            if (i3 == datanodeInfoArr.length) {
                int i4 = i;
                i++;
                iArr[i4] = i2;
            }
        }
        if (i > 0) {
            return iArr;
        }
        throw new IOException("Failed: new datanode not found: nodes=" + Arrays.asList(this.nodes) + ", original=" + Arrays.asList(datanodeInfoArr));
    }

    private void addDatanode2ExistingPipeline() throws IOException {
        DataTransferProtocol.LOG.debug("lastAckedSeqno = {}", Long.valueOf(this.lastAckedSeqno));
        if ((!this.isAppend && this.lastAckedSeqno < 0 && this.stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) || this.stage == BlockConstructionStage.PIPELINE_CLOSE || this.stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
            return;
        }
        DatanodeInfo[] datanodeInfoArr = this.nodes;
        StorageType[] storageTypeArr = this.storageTypes;
        String[] strArr = this.storageIDs;
        IOException iOException = null;
        ArrayList arrayList = new ArrayList(this.failed);
        for (int i = 0; i < 3; i++) {
            try {
                LocatedBlock additionalDatanode = this.dfsClient.namenode.getAdditionalDatanode(this.src, this.stat.getFileId(), this.block.getCurrentBlock(), this.nodes, this.storageIDs, (DatanodeInfo[]) arrayList.toArray(new DatanodeInfo[arrayList.size()]), 1, this.dfsClient.clientName);
                setPipeline(additionalDatanode);
                try {
                    int[] findNewDatanode = findNewDatanode(datanodeInfoArr);
                    boolean z = false;
                    ArrayList<Integer> arrayList2 = new ArrayList();
                    for (int i2 = 0; i2 < findNewDatanode.length; i2++) {
                        DatanodeInfo datanodeInfo = datanodeInfoArr[i % datanodeInfoArr.length];
                        try {
                            transfer(datanodeInfo, new DatanodeInfo[]{this.nodes[findNewDatanode[i2]]}, new StorageType[]{this.storageTypes[findNewDatanode[i2]]}, new String[]{this.storageIDs[findNewDatanode[i2]]}, additionalDatanode.getBlockToken());
                            arrayList2.add(Integer.valueOf(i2));
                        } catch (IOException e) {
                            DFSClient.LOG.warn("Error transferring data from " + datanodeInfo + " to " + this.nodes[findNewDatanode[i2]] + ": " + e.getMessage());
                            iOException = e;
                            arrayList.add(this.nodes[findNewDatanode[i2]]);
                            z = true;
                        }
                    }
                    if (!z) {
                        return;
                    }
                    if (arrayList2.size() > 0) {
                        ArrayList arrayList3 = new ArrayList(Arrays.asList(datanodeInfoArr));
                        ArrayList arrayList4 = new ArrayList(Arrays.asList(storageTypeArr));
                        ArrayList arrayList5 = new ArrayList(Arrays.asList(strArr));
                        for (Integer num : arrayList2) {
                            arrayList3.add(this.nodes[findNewDatanode[num.intValue()]]);
                            arrayList4.add(this.storageTypes[findNewDatanode[num.intValue()]]);
                            arrayList5.add(this.storageIDs[findNewDatanode[num.intValue()]]);
                        }
                        datanodeInfoArr = (DatanodeInfo[]) arrayList3.toArray(new DatanodeInfo[arrayList3.size()]);
                        storageTypeArr = (StorageType[]) arrayList4.toArray(new StorageType[arrayList4.size()]);
                        strArr = (String[]) arrayList5.toArray(new String[arrayList5.size()]);
                    }
                    setPipeline(datanodeInfoArr, storageTypeArr, strArr);
                } catch (IOException e2) {
                    if (this.dfsClient.dtpReplaceDatanodeOnFailureReplication <= 0 || this.nodes.length < this.dfsClient.dtpReplaceDatanodeOnFailureReplication) {
                        throw e2;
                    }
                    DFSClient.LOG.warn("Failed to find a new datanode to add to the write pipeline,  continue to write to the pipeline with " + this.nodes.length + " nodes since it's no less than minimum replication: " + ((int) this.dfsClient.dtpReplaceDatanodeOnFailureReplication) + " configured by " + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION + ".", e2);
                    return;
                }
            } catch (RemoteException e3) {
                throw e3.unwrapRemoteException(BlockPlacementViolationException.class);
            }
        }
        if (iOException == null) {
            throw new IOException("Failed to add a node");
        }
    }

    private long computeTransferWriteTimeout() {
        return this.dfsClient.getDatanodeWriteTimeout(2);
    }

    private long computeTransferReadTimeout() {
        return this.dfsClient.getDatanodeReadTimeout(2 + (((int) (this.bytesSent / this.dfsClient.getConf().getWritePacketSize())) / 200));
    }

    private void transfer(DatanodeInfo datanodeInfo, DatanodeInfo[] datanodeInfoArr, StorageType[] storageTypeArr, String[] strArr, Token<BlockTokenIdentifier> token) throws IOException {
        RefetchEncryptionKeyPolicy refetchEncryptionKeyPolicy = new RefetchEncryptionKeyPolicy(datanodeInfo);
        do {
            StreamerStreams streamerStreams = null;
            try {
                streamerStreams = new StreamerStreams(datanodeInfo, computeTransferWriteTimeout(), computeTransferReadTimeout(), token);
                streamerStreams.sendTransferBlock(datanodeInfoArr, storageTypeArr, strArr, token);
                IOUtils.closeStream(streamerStreams);
                return;
            } catch (InvalidEncryptionKeyException e) {
                try {
                    refetchEncryptionKeyPolicy.recordFailure(e);
                    IOUtils.closeStream(streamerStreams);
                } catch (Throwable th) {
                    IOUtils.closeStream(streamerStreams);
                    throw th;
                }
            }
        } while (refetchEncryptionKeyPolicy.continueRetryingOrThrow());
    }

    private void setupPipelineForAppendOrRecovery() throws IOException {
        if (this.nodes != null && this.nodes.length != 0) {
            setupPipelineInternal(this.nodes, this.storageTypes, this.storageIDs);
            return;
        }
        String str = "Could not get block locations. Source file \"" + this.src + "\" - Aborting..." + this;
        LOG.warn(str);
        this.lastException.set(new IOException(str));
        this.streamerClosed = true;
    }

    protected void setupPipelineInternal(DatanodeInfo[] datanodeInfoArr, StorageType[] storageTypeArr, String[] strArr) throws IOException {
        boolean z = false;
        long j = 0;
        while (!z && !this.streamerClosed && this.dfsClient.clientRunning) {
            if (!handleRestartingDatanode()) {
                return;
            }
            boolean hasInternalError = this.errorState.hasInternalError();
            if (!handleBadDatanode()) {
                return;
            }
            handleDatanodeReplacement();
            LocatedBlock updateBlockForPipeline = updateBlockForPipeline();
            j = updateBlockForPipeline.getBlock().getGenerationStamp();
            this.accessToken = updateBlockForPipeline.getBlockToken();
            z = createBlockOutputStream(this.nodes, this.storageTypes, this.storageIDs, j, hasInternalError);
            failPacket4Testing();
            this.errorState.checkRestartingNodeDeadline(this.nodes);
        }
        if (z) {
            updatePipeline(j);
        }
    }

    public boolean handleRestartingDatanode() {
        if (!this.errorState.isRestartingNode()) {
            return true;
        }
        if (!this.errorState.doWaitForRestart()) {
            this.errorState.setBadNodeIndex(this.errorState.getRestartingNodeIndex());
            return true;
        }
        try {
            Thread.sleep(Math.min(this.errorState.datanodeRestartTimeout, 4000L));
            return true;
        } catch (InterruptedException e) {
            this.lastException.set(new IOException("Interrupted while waiting for restarting " + this.nodes[this.errorState.getRestartingNodeIndex()]));
            this.streamerClosed = true;
            return false;
        }
    }

    public boolean handleBadDatanode() {
        int badNodeIndex = this.errorState.getBadNodeIndex();
        if (badNodeIndex < 0) {
            return true;
        }
        if (this.nodes.length <= 1) {
            this.lastException.set(new IOException("All datanodes " + Arrays.toString(this.nodes) + " are bad. Aborting..."));
            this.streamerClosed = true;
            return false;
        }
        String str = "bad.";
        if (this.errorState.getRestartingNodeIndex() == badNodeIndex) {
            str = "restarting.";
            this.restartingNodes.add(this.nodes[badNodeIndex]);
        }
        LOG.warn("Error Recovery for " + this.block + " in pipeline " + Arrays.toString(this.nodes) + ": datanode " + badNodeIndex + DefaultExpressionEngineSymbols.DEFAULT_INDEX_START + this.nodes[badNodeIndex] + ") is " + str);
        this.failed.add(this.nodes[badNodeIndex]);
        DatanodeInfo[] datanodeInfoArr = new DatanodeInfo[this.nodes.length - 1];
        arraycopy(this.nodes, datanodeInfoArr, badNodeIndex);
        StorageType[] storageTypeArr = new StorageType[datanodeInfoArr.length];
        arraycopy(this.storageTypes, storageTypeArr, badNodeIndex);
        String[] strArr = new String[datanodeInfoArr.length];
        arraycopy(this.storageIDs, strArr, badNodeIndex);
        setPipeline(datanodeInfoArr, storageTypeArr, strArr);
        this.errorState.adjustState4RestartingNode();
        this.lastException.clear();
        return true;
    }

    private void handleDatanodeReplacement() throws IOException {
        if (this.dfsClient.dtpReplaceDatanodeOnFailure.satisfy(this.stat.getReplication(), this.nodes, this.isAppend, this.isHflushed)) {
            try {
                addDatanode2ExistingPipeline();
            } catch (BlockPlacementViolationException e) {
                LOG.error("Failed to replace datanode. Existing nodes fails to comply with minimum placement requirements. Cannot continue to write.", e);
                throw e;
            } catch (IOException e2) {
                if (!this.dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
                    throw e2;
                }
                LOG.warn("Failed to replace datanode. Continue with the remaining datanodes since dfs.client.block.write.replace-datanode-on-failure.best-effort is set to true.", e2);
            }
        }
    }

    public void failPacket4Testing() {
        if (this.failPacket) {
            this.failPacket = false;
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
            }
        }
    }

    private LocatedBlock updateBlockForPipeline() throws IOException {
        return this.dfsClient.namenode.updateBlockForPipeline(this.block.getCurrentBlock(), this.dfsClient.clientName);
    }

    public void updateBlockGS(long j) {
        this.block.setGenerationStamp(j);
    }

    @VisibleForTesting
    public void updatePipeline(long j) throws IOException {
        ExtendedBlock currentBlock = this.block.getCurrentBlock();
        updateBlockGS(j);
        this.dfsClient.namenode.updatePipeline(this.dfsClient.clientName, currentBlock, this.block.getCurrentBlock(), this.nodes, this.storageIDs);
    }

    public DatanodeInfo[] getExcludedNodes() {
        return (DatanodeInfo[]) this.excludedNodes.getAllPresent(this.excludedNodes.asMap().keySet()).keySet().toArray(new DatanodeInfo[0]);
    }

    protected LocatedBlock nextBlockOutputStream() throws IOException {
        LocatedBlock locateFollowingBlock;
        boolean createBlockOutputStream;
        int numBlockWriteRetry = this.dfsClient.getConf().getNumBlockWriteRetry();
        ExtendedBlock currentBlock = this.block.getCurrentBlock();
        do {
            this.errorState.resetInternalError();
            this.lastException.clear();
            DatanodeInfo[] excludedNodes = getExcludedNodes();
            locateFollowingBlock = locateFollowingBlock(excludedNodes.length > 0 ? excludedNodes : null, currentBlock);
            this.block.setCurrentBlock(locateFollowingBlock.getBlock());
            this.block.setNumBytes(0L);
            this.bytesSent = 0L;
            this.accessToken = locateFollowingBlock.getBlockToken();
            DatanodeInfo[] locations = locateFollowingBlock.getLocations();
            createBlockOutputStream = createBlockOutputStream(locations, locateFollowingBlock.getStorageTypes(), locateFollowingBlock.getStorageIDs(), 0L, false);
            if (!createBlockOutputStream) {
                LOG.warn("Abandoning " + this.block);
                this.dfsClient.namenode.abandonBlock(this.block.getCurrentBlock(), this.stat.getFileId(), this.src, this.dfsClient.clientName);
                this.block.setCurrentBlock(null);
                DatanodeInfo datanodeInfo = locations[this.errorState.getBadNodeIndex()];
                LOG.warn("Excluding datanode " + datanodeInfo);
                this.excludedNodes.put(datanodeInfo, datanodeInfo);
            }
            if (createBlockOutputStream) {
                break;
            }
            numBlockWriteRetry--;
        } while (numBlockWriteRetry >= 0);
        if (createBlockOutputStream) {
            return locateFollowingBlock;
        }
        throw new IOException("Unable to create new block.");
    }

    /* JADX WARN: Removed duplicated region for block: B:70:0x0320  */
    /* JADX WARN: Removed duplicated region for block: B:81:0x0377 A[Catch: all -> 0x03e0, TryCatch #0 {all -> 0x03e0, blocks: (B:12:0x0073, B:14:0x0079, B:16:0x0081, B:17:0x008b, B:18:0x008c, B:20:0x0092, B:22:0x009a, B:23:0x00a4, B:24:0x00a5, B:26:0x013e, B:27:0x014c, B:29:0x01b0, B:32:0x01bc, B:34:0x01e7, B:36:0x01f1, B:37:0x01fe, B:38:0x01ff, B:40:0x0223, B:42:0x022b, B:43:0x0235, B:44:0x0236, B:50:0x0148, B:53:0x028b, B:55:0x0295, B:56:0x02b3, B:60:0x02c0, B:68:0x0318, B:71:0x0323, B:73:0x032a, B:77:0x0339, B:75:0x0345, B:79:0x0369, B:81:0x0377, B:82:0x03a8, B:86:0x034e, B:90:0x0359, B:91:0x0360, B:92:0x0361), top: B:52:0x028b, inners: #1 }] */
    /* JADX WARN: Removed duplicated region for block: B:84:0x03c0  */
    /* JADX WARN: Removed duplicated region for block: B:86:0x034e A[Catch: all -> 0x03e0, TryCatch #0 {all -> 0x03e0, blocks: (B:12:0x0073, B:14:0x0079, B:16:0x0081, B:17:0x008b, B:18:0x008c, B:20:0x0092, B:22:0x009a, B:23:0x00a4, B:24:0x00a5, B:26:0x013e, B:27:0x014c, B:29:0x01b0, B:32:0x01bc, B:34:0x01e7, B:36:0x01f1, B:37:0x01fe, B:38:0x01ff, B:40:0x0223, B:42:0x022b, B:43:0x0235, B:44:0x0236, B:50:0x0148, B:53:0x028b, B:55:0x0295, B:56:0x02b3, B:60:0x02c0, B:68:0x0318, B:71:0x0323, B:73:0x032a, B:77:0x0339, B:75:0x0345, B:79:0x0369, B:81:0x0377, B:82:0x03a8, B:86:0x034e, B:90:0x0359, B:91:0x0360, B:92:0x0361), top: B:52:0x028b, inners: #1 }] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public boolean createBlockOutputStream(org.apache.hadoop.hdfs.protocol.DatanodeInfo[] r25, org.apache.hadoop.fs.StorageType[] r26, java.lang.String[] r27, long r28, boolean r30) {
        /*
            Method dump skipped, instructions count: 1034
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(org.apache.hadoop.hdfs.protocol.DatanodeInfo[], org.apache.hadoop.fs.StorageType[], java.lang.String[], long, boolean):boolean");
    }

    private boolean[] getPinnings(DatanodeInfo[] datanodeInfoArr) {
        if (this.favoredNodes == null) {
            return null;
        }
        boolean[] zArr = new boolean[datanodeInfoArr.length];
        HashSet hashSet = new HashSet(Arrays.asList(this.favoredNodes));
        for (int i = 0; i < datanodeInfoArr.length; i++) {
            zArr[i] = hashSet.remove(datanodeInfoArr[i].getXferAddrWithHostname());
            LOG.debug("{} was chosen by name node (favored={}).", datanodeInfoArr[i].getXferAddrWithHostname(), Boolean.valueOf(zArr[i]));
        }
        if (!hashSet.isEmpty()) {
            LOG.warn("These favored nodes were specified but not chosen: " + hashSet + " Specified favored nodes: " + Arrays.toString(this.favoredNodes));
        }
        return zArr;
    }

    private LocatedBlock locateFollowingBlock(DatanodeInfo[] datanodeInfoArr, ExtendedBlock extendedBlock) throws IOException {
        return DFSOutputStream.addBlock(datanodeInfoArr, this.dfsClient, this.src, extendedBlock, this.stat.getFileId(), this.favoredNodes, this.addBlockFlags);
    }

    private void backOffIfNecessary() throws InterruptedException {
        int i = 0;
        synchronized (this.congestedNodes) {
            if (!this.congestedNodes.isEmpty()) {
                StringBuilder sb = new StringBuilder("DataNode");
                Iterator<DatanodeInfo> it = this.congestedNodes.iterator();
                while (it.hasNext()) {
                    sb.append(' ').append(it.next());
                }
                i = Math.min(CONGESTION_BACK_OFF_MAX_TIME_IN_MS, (int) (Math.min(this.lastCongestionBackoffTime * 3, 5000) + (Math.random() * Math.abs((this.lastCongestionBackoffTime * 3) - 5000))));
                this.lastCongestionBackoffTime = i;
                sb.append(" are congested. Backing off for ").append(i).append(" ms");
                LOG.info(sb.toString());
                this.congestedNodes.clear();
            }
        }
        if (i != 0) {
            Thread.sleep(i);
        }
    }

    public ExtendedBlock getBlock() {
        return this.block.getCurrentBlock();
    }

    public DatanodeInfo[] getNodes() {
        return this.nodes;
    }

    public String[] getStorageIDs() {
        return this.storageIDs;
    }

    public BlockConstructionStage getStage() {
        return this.stage;
    }

    public Token<BlockTokenIdentifier> getBlockToken() {
        return this.accessToken;
    }

    public ErrorState getErrorState() {
        return this.errorState;
    }

    public void queuePacket(DFSPacket dFSPacket) {
        synchronized (this.dataQueue) {
            if (dFSPacket == null) {
                return;
            }
            dFSPacket.addTraceParent(Tracer.getCurrentSpanId());
            this.dataQueue.addLast(dFSPacket);
            this.lastQueuedSeqno = dFSPacket.getSeqno();
            LOG.debug("Queued {}, {}", dFSPacket, this);
            this.dataQueue.notifyAll();
        }
    }

    private DFSPacket createHeartbeatPacket() {
        return new DFSPacket(new byte[PacketHeader.PKT_MAX_HEADER_LEN], 0, 0L, -1L, 0, false);
    }

    private static LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes(long j) {
        return CacheBuilder.newBuilder().expireAfterWrite(j, TimeUnit.MILLISECONDS).removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() { // from class: org.apache.hadoop.hdfs.DataStreamer.2
            AnonymousClass2() {
            }

            @Override // org.apache.flink.hadoop.shaded.com.google.common.cache.RemovalListener
            public void onRemoval(@Nonnull RemovalNotification<DatanodeInfo, DatanodeInfo> removalNotification) {
                DataStreamer.LOG.info("Removing node " + removalNotification.getKey() + " from the excluded nodes list");
            }
        }).build(new CacheLoader<DatanodeInfo, DatanodeInfo>() { // from class: org.apache.hadoop.hdfs.DataStreamer.1
            AnonymousClass1() {
            }

            @Override // org.apache.flink.hadoop.shaded.com.google.common.cache.CacheLoader
            public DatanodeInfo load(DatanodeInfo datanodeInfo) throws Exception {
                return datanodeInfo;
            }
        });
    }

    private static <T> void arraycopy(T[] tArr, T[] tArr2, int i) {
        System.arraycopy(tArr, 0, tArr2, 0, i);
        System.arraycopy(tArr, i + 1, tArr2, i, tArr2.length - i);
    }

    public AtomicBoolean getPersistBlocks() {
        return this.persistBlocks;
    }

    public void setAppendChunk(boolean z) {
        this.appendChunk = z;
    }

    public boolean getAppendChunk() {
        return this.appendChunk;
    }

    public LastExceptionInStreamer getLastException() {
        return this.lastException;
    }

    public void setSocketToNull() {
        this.s = null;
    }

    public long getAndIncCurrentSeqno() {
        long j = this.currentSeqno;
        this.currentSeqno++;
        return j;
    }

    public long getLastQueuedSeqno() {
        return this.lastQueuedSeqno;
    }

    public long getBytesCurBlock() {
        return this.bytesCurBlock;
    }

    public void setBytesCurBlock(long j) {
        this.bytesCurBlock = j;
    }

    public void incBytesCurBlock(long j) {
        this.bytesCurBlock += j;
    }

    public void setArtificialSlowdown(long j) {
        this.artificialSlowdown = j;
    }

    public boolean streamerClosed() {
        return this.streamerClosed;
    }

    @VisibleForTesting
    int getPipelineRecoveryCount() {
        return this.pipelineRecoveryCount;
    }

    public void closeSocket() throws IOException {
        if (this.s != null) {
            this.s.close();
        }
    }

    @Override // java.lang.Thread
    public String toString() {
        ExtendedBlock currentBlock = this.block.getCurrentBlock();
        return currentBlock == null ? "block==null" : "" + currentBlock.getLocalBlock();
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: org.apache.hadoop.hdfs.DataStreamer.access$802(org.apache.hadoop.hdfs.DataStreamer, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$802(org.apache.hadoop.hdfs.DataStreamer r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.lastAckedSeqno = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.hadoop.hdfs.DataStreamer.access$802(org.apache.hadoop.hdfs.DataStreamer, long):long");
    }

    static {
        $assertionsDisabled = !DataStreamer.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(DataStreamer.class);
    }
}
