package org.apache.iotdb.db.mpp.execution.exchange;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.Validate;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus;
import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.class */
public class SinkHandle implements ISinkHandle {
    private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
    // Presumably the maximum number of attempts for a remote call; the retry loop in
    // sendEndOfDataBlockEvent() is bounded by the same value — confirm against the original source.
    public static final int MAX_ATTEMPT_TIMES = 3;
    // Initial value of retryIntervalInMs.
    private static final long DEFAULT_RETRY_INTERVAL_IN_MS = 1000;
    // Endpoint used to borrow an RPC client when notifying the remote side.
    private final TEndPoint remoteEndpoint;
    // Identifiers of the remote fragment instance / plan node placed into outgoing events.
    private final TFragmentInstanceId remoteFragmentInstanceId;
    private final String remotePlanNodeId;
    // Identifier of the local fragment instance; its query id keys all memory pool reserve/free calls.
    private final TFragmentInstanceId localFragmentInstanceId;
    private final LocalMemoryManager localMemoryManager;
    // Runs the SendNewDataBlockEventTask instances submitted by send().
    private final ExecutorService executorService;
    // Serializes buffered TsBlocks in getSerializedTsBlock(int).
    private final TsBlockSerde serde;
    // Receives onFinish / onEndOfBlocks / onAborted callbacks.
    private final MPPDataExchangeManager.SinkHandleListener sinkHandleListener;
    // Built in the constructor via MPPDataExchangeManager.createFullIdFrom(..., "SinkHandle");
    // not read by any decompiled method in this view.
    private final String threadName;
    private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mppDataExchangeServiceClientManager;
    // Future returned by the most recent memory pool reservation; send() refuses to run until it is done.
    private volatile ListenableFuture<Void> blocked;
    // Buffered blocks keyed by sequence id, in insertion order. The Long in each pair is the number of
    // bytes released back to the pool when that entry is acknowledged.
    private final LinkedHashMap<Integer, Pair<TsBlock, Long>> sequenceIdToTsBlock = new LinkedHashMap<>();
    // Sequence id that will be assigned to the next block passed to send().
    private int nextSequenceId = 0;
    private boolean aborted = false;
    private boolean closed = false;
    // Set by setNoMoreTsBlocks(); once true, send() silently ignores further blocks.
    private boolean noMoreTsBlocks = false;
    // Adjustable through setRetryIntervalInMs(); no decompiled method in this view reads it
    // (SendNewDataBlockEventTask.run() was not decompiled).
    private long retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
    // Bytes currently accounted to this handle. Starts at the amount reserved in the constructor,
    // grows in send() and shrinks in acknowledgeTsBlock().
    private long bufferRetainedSizeInBytes = TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
    // Size stored alongside the next block put into sequenceIdToTsBlock; afterwards send() replaces
    // it with the retained size of the block it just buffered.
    private long currentTsBlockSize = TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/SinkHandle$SendNewDataBlockEventTask.class */
    /**
     * Task handed to the executor service by {@code submitSendNewDataBlockEventTask}. It carries
     * the first sequence id and the sizes of the blocks it refers to.
     *
     * <p>NOTE(review): the decompiler could not recover {@link #run()}; the placeholder body below
     * unconditionally throws {@link UnsupportedOperationException}. The real behaviour must be taken
     * from the original source or a bytecode dump before this class is compiled and used.
     */
    public class SendNewDataBlockEventTask implements Runnable {
        // Sequence id of the first block covered by this task; validated to be >= 0.
        private final int startSequenceId;
        // Sizes of the covered blocks; validated to be non-null.
        private final List<Long> blockSizes;

        /**
         * @param i first sequence id, must be greater than or equal to zero
         * @param list block sizes, must not be null
         * @throws IllegalArgumentException if {@code i} is negative
         * @throws NullPointerException if {@code list} is null
         */
        SendNewDataBlockEventTask(int i, List<Long> list) {
            Validate.isTrue(i >= 0, "Start sequence ID should be greater than or equal to zero, but was: " + i + ".", new Object[0]);
            this.startSequenceId = i;
            this.blockSizes = (List) Validate.notNull(list);
        }

        /* JADX WARN: Code restructure failed: missing block: B:22:0x0134, code lost:
        
            if (r0 == null) goto L61;
         */
        /* JADX WARN: Code restructure failed: missing block: B:24:0x0138, code lost:
        
            if (0 == 0) goto L46;
         */
        /* JADX WARN: Code restructure failed: missing block: B:25:0x014b, code lost:
        
            r0.close();
         */
        /* JADX WARN: Code restructure failed: missing block: B:26:?, code lost:
        
            return;
         */
        /* JADX WARN: Code restructure failed: missing block: B:29:0x013b, code lost:
        
            r0.close();
         */
        /* JADX WARN: Code restructure failed: missing block: B:31:?, code lost:
        
            return;
         */
        /* JADX WARN: Code restructure failed: missing block: B:32:0x0142, code lost:
        
            r11 = move-exception;
         */
        /* JADX WARN: Code restructure failed: missing block: B:33:0x0143, code lost:
        
            r0.addSuppressed(r11);
         */
        /* JADX WARN: Code restructure failed: missing block: B:34:?, code lost:
        
            return;
         */
        /* JADX WARN: Code restructure failed: missing block: B:35:0x017a, code lost:
        
            return;
         */
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 379
                To view this dump add '--comments-level debug' option
            */
            // Decompiler placeholder: the original logic is missing, so every invocation fails here.
            throw new UnsupportedOperationException("Method not decompiled: org.apache.iotdb.db.mpp.execution.exchange.SinkHandle.SendNewDataBlockEventTask.run():void");
        }
    }

    /**
     * Creates a sink handle and immediately reserves
     * {@code TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES} bytes from the query memory
     * pool; the future of that reservation becomes the initial {@code blocked} state.
     *
     * @param tEndPoint endpoint of the remote side, must not be null
     * @param tFragmentInstanceId remote fragment instance id, must not be null
     * @param str remote plan node id, must not be null
     * @param tFragmentInstanceId2 local fragment instance id, must not be null
     * @param localMemoryManager memory manager providing the query pool, must not be null
     * @param executorService executor that runs the notification tasks, must not be null
     * @param tsBlockSerde serializer for buffered blocks, must not be null
     * @param sinkHandleListener lifecycle callback receiver, must not be null
     * @param iClientManager source of RPC clients for the remote endpoint
     * @throws NullPointerException if any argument documented as "must not be null" is null
     */
    public SinkHandle(
            TEndPoint tEndPoint,
            TFragmentInstanceId tFragmentInstanceId,
            String str,
            TFragmentInstanceId tFragmentInstanceId2,
            LocalMemoryManager localMemoryManager,
            ExecutorService executorService,
            TsBlockSerde tsBlockSerde,
            MPPDataExchangeManager.SinkHandleListener sinkHandleListener,
            IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> iClientManager) {
        // Validation order is significant: the first null argument decides where the NPE originates.
        this.remoteEndpoint = Validate.notNull(tEndPoint);
        this.remoteFragmentInstanceId = Validate.notNull(tFragmentInstanceId);
        this.remotePlanNodeId = Validate.notNull(str);
        this.localFragmentInstanceId = Validate.notNull(tFragmentInstanceId2);
        this.localMemoryManager = Validate.notNull(localMemoryManager);
        this.executorService = Validate.notNull(executorService);
        this.serde = Validate.notNull(tsBlockSerde);
        this.sinkHandleListener = Validate.notNull(sinkHandleListener);
        // NOTE(review): unlike the other arguments, the client manager is stored without a null check.
        this.mppDataExchangeServiceClientManager = iClientManager;
        this.threadName = MPPDataExchangeManager.createFullIdFrom(tFragmentInstanceId2, "SinkHandle");
        // Up-front reservation of one default-sized block; matches the initial bufferRetainedSizeInBytes.
        this.blocked =
                (ListenableFuture)
                        localMemoryManager
                                .getQueryPool()
                                .reserve(
                                        tFragmentInstanceId2.getQueryId(),
                                        TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES)
                                .left;
    }

    /**
     * Returns a view of the current memory reservation future that callers can wait on.
     *
     * @return a future derived from {@code blocked} whose cancellation does not propagate back to it
     * @throws IllegalStateException if this handle has been aborted or closed
     */
    @Override
    public synchronized ListenableFuture<?> isFull() {
        checkState();
        final ListenableFuture<Void> pendingReservation = blocked;
        return Futures.nonCancellationPropagating(pendingReservation);
    }

    /**
     * Wraps the given range description in a {@link SendNewDataBlockEventTask} and hands it to the
     * executor service.
     *
     * @param i sequence id of the first block covered by the task
     * @param list sizes of the covered blocks
     */
    private void submitSendNewDataBlockEventTask(int i, List<Long> list) {
        final SendNewDataBlockEventTask task = new SendNewDataBlockEventTask(i, list);
        executorService.submit(task);
    }

    /**
     * Buffers one block under the next sequence id, reserves memory for it and schedules a
     * notification task for it.
     *
     * <p>The call is a silent no-op once {@code setNoMoreTsBlocks()} has completed.
     *
     * @param tsBlock block to buffer, must not be null
     * @throws NullPointerException if {@code tsBlock} is null
     * @throws IllegalStateException if the handle is aborted or closed, or if the previous memory
     *     reservation has not completed yet
     */
    @Override
    public synchronized void send(TsBlock tsBlock) {
        Validate.notNull(tsBlock, "tsBlocks is null");
        checkState();
        if (!blocked.isDone()) {
            throw new IllegalStateException("Sink handle is blocked.");
        }
        if (noMoreTsBlocks) {
            return;
        }

        final long blockSizeInBytes = tsBlock.getRetainedSizeInBytes();
        final int startSequenceId = nextSequenceId;

        // Reserve memory sized after this block; the resulting future gates the next send().
        blocked =
                (ListenableFuture)
                        localMemoryManager
                                .getQueryPool()
                                .reserve(localFragmentInstanceId.getQueryId(), blockSizeInBytes)
                                .left;
        bufferRetainedSizeInBytes += blockSizeInBytes;

        // The entry records the PREVIOUS currentTsBlockSize, i.e. the amount released on acknowledge.
        sequenceIdToTsBlock.put(startSequenceId, new Pair<>(tsBlock, currentTsBlockSize));
        nextSequenceId = startSequenceId + 1;
        currentTsBlockSize = blockSizeInBytes;

        submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(blockSizeInBytes));
    }

    /**
     * Variant taking an int and a list of blocks; not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle
    public synchronized void send(int i, List<TsBlock> list) {
        throw new UnsupportedOperationException();
    }

    /**
     * Notifies the remote endpoint that the block with sequence id {@code nextSequenceId - 1} was
     * the last one.
     *
     * <p>The call is attempted up to {@link #MAX_ATTEMPT_TIMES} times. After a failed attempt that
     * is not the last one, the thread sleeps for {@code retryIntervalInMs} milliseconds before
     * retrying. The RPC client is borrowed per attempt and always closed again.
     *
     * <p>This method is invoked from the synchronized {@code setNoMoreTsBlocks()}, so the monitor
     * of this handle is held for the whole duration, including the back-off sleeps.
     *
     * @throws InterruptedException if the thread is interrupted while waiting between attempts; the
     *     interrupt flag is restored before rethrowing
     * @throws Exception the failure of the final attempt if every attempt failed
     */
    private void sendEndOfDataBlockEvent() throws Exception {
        logger.debug("send end of data block event");
        final TEndOfDataBlockEvent endOfDataBlockEvent =
                new TEndOfDataBlockEvent(
                        remoteFragmentInstanceId,
                        remotePlanNodeId,
                        localFragmentInstanceId,
                        nextSequenceId - 1);

        for (int attempt = 1; attempt <= MAX_ATTEMPT_TIMES; attempt++) {
            // try-with-resources returns the borrowed client whether or not the call succeeds.
            try (SyncDataNodeMPPDataExchangeServiceClient client =
                    mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
                client.onEndOfDataBlockEvent(endOfDataBlockEvent);
                return;
            } catch (Exception e) {
                logger.warn(
                        "Failed to send end of data block event, attempt {} of {}",
                        attempt,
                        MAX_ATTEMPT_TIMES,
                        e);
                if (attempt == MAX_ATTEMPT_TIMES) {
                    // Out of attempts: surface the last failure to the caller.
                    throw e;
                }
            }

            try {
                Thread.sleep(retryIntervalInMs);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers that only see a wrapped exception.
                Thread.currentThread().interrupt();
                throw e;
            }
        }
    }

    /**
     * Marks the end of the stream: sends the end-of-data event to the remote side, sets
     * {@code noMoreTsBlocks} and fires the listener callbacks.
     *
     * <p>Does nothing (apart from a debug log) when the handle is already aborted or closed.
     * {@code onFinish} is fired only if no blocks remain buffered; {@code onEndOfBlocks} is fired
     * in either case.
     *
     * @throws RuntimeException wrapping any exception raised while sending the event or while
     *     running the listener callbacks
     */
    @Override
    public synchronized void setNoMoreTsBlocks() {
        logger.debug("start to set no-more-tsblocks");
        final boolean alreadyTerminated = aborted || closed;
        if (alreadyTerminated) {
            return;
        }
        try {
            sendEndOfDataBlockEvent();

            logger.debug("set noMoreTsBlocks to true");
            noMoreTsBlocks = true;

            final boolean nothingBuffered = isFinished();
            if (nothingBuffered) {
                logger.debug("revoke onFinish() of sinkHandleListener");
                sinkHandleListener.onFinish(this);
            }

            logger.debug("revoke onEndOfBlocks() of sinkHandleListener");
            sinkHandleListener.onEndOfBlocks(this);
        } catch (Exception cause) {
            throw new RuntimeException("Send EndOfDataBlockEvent failed", cause);
        }
    }

    /**
     * Aborts this handle: drops all buffered blocks, marks the handle aborted, gives the accounted
     * memory back to the query pool and fires {@code onAborted} on the listener.
     */
    @Override
    public synchronized void abort() {
        logger.debug("SinkHandle is being aborted.");
        sequenceIdToTsBlock.clear();
        aborted = true;

        // Subtract whatever tryCancel reports for the pending reservation
        // (presumably the bytes it never granted — confirm against the memory pool).
        bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);

        final long stillRetained = bufferRetainedSizeInBytes;
        if (stillRetained > 0) {
            localMemoryManager
                    .getQueryPool()
                    .free(localFragmentInstanceId.getQueryId(), stillRetained);
            bufferRetainedSizeInBytes = 0L;
        }

        sinkHandleListener.onAborted(this);
        logger.info("SinkHandle is aborted");
    }

    /**
     * Closes this handle: drops all buffered blocks, marks the handle closed, gives the accounted
     * memory back to the query pool and fires {@code onFinish} on the listener.
     */
    @Override
    public synchronized void close() {
        logger.debug("SinkHandle is being closed.");
        sequenceIdToTsBlock.clear();
        closed = true;

        // Subtract whatever tryComplete reports for the pending reservation
        // (presumably the bytes it never granted — confirm against the memory pool).
        bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);

        final long stillRetained = bufferRetainedSizeInBytes;
        if (stillRetained > 0) {
            localMemoryManager
                    .getQueryPool()
                    .free(localFragmentInstanceId.getQueryId(), stillRetained);
            bufferRetainedSizeInBytes = 0L;
        }

        sinkHandleListener.onFinish(this);
        logger.info("SinkHandle is closed");
    }

    /**
     * @return {@code true} once {@code abort()} has been called on this handle
     */
    @Override
    public synchronized boolean isAborted() {
        return aborted;
    }

    /**
     * @return {@code true} when the end of the stream has been declared and no block is buffered
     *     any more
     */
    @Override
    public synchronized boolean isFinished() {
        if (!noMoreTsBlocks) {
            return false;
        }
        return sequenceIdToTsBlock.isEmpty();
    }

    /**
     * @return the number of bytes currently accounted to this handle
     */
    @Override
    public synchronized long getBufferRetainedSizeInBytes() {
        return bufferRetainedSizeInBytes;
    }

    /**
     * @return the number of blocks currently held in the buffer
     */
    public int getNumOfBufferedTsBlocks() {
        // NOTE(review): reads the map without holding the monitor, unlike the other accessors of
        // sequenceIdToTsBlock — the value may be stale under concurrent use; confirm this is intended.
        return sequenceIdToTsBlock.size();
    }

    /**
     * Two-argument variant; not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    ByteBuffer getSerializedTsBlock(int i, int i2) {
        throw new UnsupportedOperationException();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Serializes the buffered block stored under the given sequence id.
     *
     * @param i sequence id of the requested block
     * @return the serialized form produced by the configured {@code TsBlockSerde}
     * @throws IllegalStateException if the handle is aborted or closed, or if no block is buffered
     *     under {@code i}
     * @throws IOException declared for the serialization step
     */
    public synchronized ByteBuffer getSerializedTsBlock(int i) throws IOException {
        if (aborted || closed) {
            logger.warn("SinkHandle still receive getting TsBlock request after being aborted={} or closed={}", aborted, closed);
            throw new IllegalStateException("Sink handle is aborted or closed. ");
        }

        final Pair<TsBlock, Long> buffered = sequenceIdToTsBlock.get(i);
        final boolean missing = buffered == null || buffered.left == null;
        if (missing) {
            logger.error("The TsBlock doesn't exist. Sequence ID is {}, remaining map is {}", i, sequenceIdToTsBlock.entrySet());
            throw new IllegalStateException("The data block doesn't exist. Sequence ID: " + i);
        }
        return serde.serialize(buffered.left);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes every buffered block whose sequence id lies in {@code [i, i2)} and returns the bytes
     * recorded for those entries to the query memory pool.
     *
     * <p>Ignored entirely when the handle is aborted or closed. If the handle becomes finished as a
     * result, {@code onFinish} is fired on the listener before the memory is freed.
     *
     * @param i first sequence id to acknowledge (inclusive)
     * @param i2 end of the acknowledged range (exclusive)
     */
    public void acknowledgeTsBlock(int i, int i2) {
        synchronized (this) {
            if (aborted || closed) {
                return;
            }

            long freedBytes = 0L;
            final Iterator<Map.Entry<Integer, Pair<TsBlock, Long>>> entries =
                    sequenceIdToTsBlock.entrySet().iterator();
            while (entries.hasNext()) {
                final Map.Entry<Integer, Pair<TsBlock, Long>> entry = entries.next();
                final int sequenceId = entry.getKey();
                if (sequenceId < i) {
                    // Below the acknowledged range: leave it buffered.
                    continue;
                }
                if (sequenceId >= i2) {
                    // The map iterates in insertion order, so iteration stops at the first id past the range.
                    break;
                }
                final long entryBytes = entry.getValue().right;
                freedBytes += entryBytes;
                bufferRetainedSizeInBytes -= entryBytes;
                entries.remove();
                logger.debug("ack TsBlock {}.", sequenceId);
            }

            if (isFinished()) {
                sinkHandleListener.onFinish(this);
            }
            // Nothing is freed when no entry matched (freedBytes stays zero).
            if (freedBytes > 0) {
                localMemoryManager
                        .getQueryPool()
                        .free(localFragmentInstanceId.getQueryId(), freedBytes);
            }
        }
    }

    /**
     * @return the endpoint of the remote side given at construction
     */
    public TEndPoint getRemoteEndpoint() {
        return remoteEndpoint;
    }

    /**
     * @return the remote fragment instance id given at construction
     */
    public TFragmentInstanceId getRemoteFragmentInstanceId() {
        return remoteFragmentInstanceId;
    }

    /**
     * @return the remote plan node id given at construction
     */
    public String getRemotePlanNodeId() {
        return remotePlanNodeId;
    }

    /**
     * @return the local fragment instance id given at construction
     */
    @Override
    public TFragmentInstanceId getLocalFragmentInstanceId() {
        return localFragmentInstanceId;
    }

    /**
     * @return a label built from the local fragment instance's query id, fragment id and instance
     *     id, in the form {@code Query[q]-[f-i-SinkHandle]:}
     */
    @Override
    public String toString() {
        final TFragmentInstanceId local = localFragmentInstanceId;
        return String.format(
                "Query[%s]-[%s-%s-SinkHandle]:", local.queryId, local.fragmentId, local.instanceId);
    }

    /**
     * Rejects use of a terminated handle. The aborted flag is checked before the closed flag, so an
     * aborted-and-closed handle reports the abort.
     *
     * @throws IllegalStateException if the handle is aborted or closed
     */
    private void checkState() {
        final String failure;
        if (aborted) {
            failure = "Sink handle is aborted.";
        } else if (closed) {
            failure = "SinkHandle is closed.";
        } else {
            return;
        }
        throw new IllegalStateException(failure);
    }

    /**
     * Overrides the retry interval, which defaults to {@code DEFAULT_RETRY_INTERVAL_IN_MS}.
     *
     * @param j new retry interval in milliseconds
     */
    public void setRetryIntervalInMs(long j) {
        retryIntervalInMs = j;
    }
}
