package org.apache.iotdb.db.mpp.execution.exchange;

import io.airlift.concurrent.SetThreadName;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.apache.commons.lang3.Validate;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockResponse;
import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.class */
public class MPPDataExchangeManager implements IMPPDataExchangeManager {
    private static final Logger logger = LoggerFactory.getLogger(MPPDataExchangeManager.class);
    private final LocalMemoryManager localMemoryManager;
    private final Supplier<TsBlockSerde> tsBlockSerdeFactory;
    private final ExecutorService executorService;
    private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mppDataExchangeServiceClientManager;
    private final Map<TFragmentInstanceId, Map<String, ISourceHandle>> sourceHandles = new ConcurrentHashMap();
    private final Map<TFragmentInstanceId, ISinkHandle> sinkHandles = new ConcurrentHashMap();
    private MPPDataExchangeServiceImpl mppDataExchangeService;

    /* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager$MPPDataExchangeServiceImpl.class */
    class MPPDataExchangeServiceImpl implements MPPDataExchangeService.Iface {
        /**
         * Creates the service. The class declares no fields of its own; its methods operate on
         * the enclosing manager's {@code sinkHandles} and {@code sourceHandles} maps.
         */
        MPPDataExchangeServiceImpl() {
        }

        /**
         * Returns the serialized data blocks with sequence IDs in
         * {@code [startSequenceId, endSequenceId)} taken from the sink handle registered for the
         * request's source fragment instance.
         *
         * <p>The {@link SetThreadName} resource is closed when the method exits, normally or
         * exceptionally.
         *
         * @param tGetDataBlockRequest carries the source fragment instance ID and the sequence ID range
         * @return a response holding one serialized block per requested sequence ID, in ascending order
         * @throws TException if no sink handle is registered for the source fragment instance, or if
         *     fetching a serialized block fails with an {@link IOException}
         */
        @Override
        public TGetDataBlockResponse getDataBlock(TGetDataBlockRequest tGetDataBlockRequest) throws TException {
            try (SetThreadName ignored =
                    new SetThreadName(createFullIdFrom(tGetDataBlockRequest.sourceFragmentInstanceId, "SinkHandle"))) {
                logger.debug(
                        "Get data block request received, for data blocks whose sequence ID in [{}, {}) from {}.",
                        tGetDataBlockRequest.getStartSequenceId(),
                        tGetDataBlockRequest.getEndSequenceId(),
                        tGetDataBlockRequest.getSourceFragmentInstanceId());

                // One lookup instead of containsKey + get: the handle may be deregistered concurrently,
                // and a second lookup could then return null and fail with an NPE instead of a TException.
                ISinkHandle registered = sinkHandles.get(tGetDataBlockRequest.getSourceFragmentInstanceId());
                if (registered == null) {
                    throw new TException("Source fragment instance not found. Fragment instance ID: " + tGetDataBlockRequest.getSourceFragmentInstanceId() + ".");
                }
                SinkHandle sinkHandle = (SinkHandle) registered;

                TGetDataBlockResponse response = new TGetDataBlockResponse();
                final int endSequenceId = tGetDataBlockRequest.getEndSequenceId();
                for (int sequenceId = tGetDataBlockRequest.getStartSequenceId(); sequenceId < endSequenceId; sequenceId++) {
                    try {
                        response.addToTsBlocks(sinkHandle.getSerializedTsBlock(sequenceId));
                    } catch (IOException e) {
                        // Surface I/O failures through the Thrift exception type, keeping the cause.
                        throw new TException(e);
                    }
                }
                return response;
            }
        }

        /**
         * Forwards an acknowledgement for the sequence ID range
         * {@code [startSequenceId, endSequenceId)} to the sink handle registered for the event's
         * source fragment instance.
         *
         * <p>If no sink handle is registered, a warning is logged and the event is dropped. Any
         * {@link Throwable} raised while handling the event is logged at error level and rethrown.
         *
         * @param tAcknowledgeDataBlockEvent carries the source fragment instance ID and the
         *     acknowledged sequence ID range
         * @throws TException declared by the service interface
         */
        @Override
        public void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent tAcknowledgeDataBlockEvent) throws TException {
            try (SetThreadName ignored =
                    new SetThreadName(createFullIdFrom(tAcknowledgeDataBlockEvent.sourceFragmentInstanceId, "SinkHandle"))) {
                logger.debug(
                        "Acknowledge data block event received, for data blocks whose sequence ID in [{}, {}) from {}.",
                        tAcknowledgeDataBlockEvent.getStartSequenceId(),
                        tAcknowledgeDataBlockEvent.getEndSequenceId(),
                        tAcknowledgeDataBlockEvent.getSourceFragmentInstanceId());

                // One lookup instead of containsKey + get, so a concurrent deregistration cannot
                // slip in between the check and the use.
                ISinkHandle registered = sinkHandles.get(tAcknowledgeDataBlockEvent.getSourceFragmentInstanceId());
                if (registered == null) {
                    logger.warn("received ACK event but target FragmentInstance[{}] is not found.", tAcknowledgeDataBlockEvent.getSourceFragmentInstanceId());
                    return;
                }
                ((SinkHandle) registered).acknowledgeTsBlock(
                        tAcknowledgeDataBlockEvent.getStartSequenceId(),
                        tAcknowledgeDataBlockEvent.getEndSequenceId());
            } catch (Throwable t) {
                // The trailing Throwable argument is logged by SLF4J as the exception, not as a placeholder value.
                logger.error(
                        "ack TsBlock [{}, {}) failed.",
                        tAcknowledgeDataBlockEvent.getStartSequenceId(),
                        tAcknowledgeDataBlockEvent.getEndSequenceId(),
                        t);
                throw t;
            }
        }

        /**
         * Passes the start sequence ID and block sizes announced by the event to the source handle
         * registered for the event's target fragment instance and plan node.
         *
         * <p>If no such source handle is registered, or it reports itself aborted or finished, a
         * warning is logged and the event is dropped.
         *
         * @param tNewDataBlockEvent carries the target fragment instance ID, target plan node ID,
         *     start sequence ID and block sizes
         * @throws TException declared by the service interface
         */
        @Override
        public void onNewDataBlockEvent(TNewDataBlockEvent tNewDataBlockEvent) throws TException {
            try (SetThreadName ignored =
                    new SetThreadName(createFullIdFrom(tNewDataBlockEvent.targetFragmentInstanceId, tNewDataBlockEvent.targetPlanNodeId + ".SourceHandle"))) {
                logger.debug(
                        "New data block event received, for plan node {} of {} from {}.",
                        tNewDataBlockEvent.getTargetPlanNodeId(),
                        tNewDataBlockEvent.getTargetFragmentInstanceId(),
                        tNewDataBlockEvent.getSourceFragmentInstanceId());

                // Resolve the handle once. Repeating containsKey/get for every check would let a
                // concurrent deregistration turn a later lookup into a null dereference.
                Map<String, ISourceHandle> handlesOfInstance = sourceHandles.get(tNewDataBlockEvent.getTargetFragmentInstanceId());
                ISourceHandle registered =
                        handlesOfInstance == null ? null : handlesOfInstance.get(tNewDataBlockEvent.getTargetPlanNodeId());
                if (registered == null || registered.isAborted() || registered.isFinished()) {
                    logger.warn("received NewDataBlockEvent but the downstream FragmentInstance[{}] is not found", tNewDataBlockEvent.getTargetFragmentInstanceId());
                    return;
                }
                // Cast only after the state checks, so an aborted or finished handle is still dropped with a warning.
                ((SourceHandle) registered).updatePendingDataBlockInfo(
                        tNewDataBlockEvent.getStartSequenceId(), tNewDataBlockEvent.getBlockSizes());
            }
        }

        /**
         * Passes the last sequence ID announced by the event to the source handle registered for
         * the event's target fragment instance and plan node, via {@code setNoMoreTsBlocks}.
         *
         * <p>If no such source handle is registered, or it reports itself aborted or finished, a
         * warning is logged and the event is dropped.
         *
         * @param tEndOfDataBlockEvent carries the target fragment instance ID, target plan node ID
         *     and last sequence ID
         * @throws TException declared by the service interface
         */
        @Override
        public void onEndOfDataBlockEvent(TEndOfDataBlockEvent tEndOfDataBlockEvent) throws TException {
            try (SetThreadName ignored =
                    new SetThreadName(createFullIdFrom(tEndOfDataBlockEvent.targetFragmentInstanceId, tEndOfDataBlockEvent.targetPlanNodeId + ".SourceHandle"))) {
                logger.debug(
                        "End of data block event received, for plan node {} of {} from {}.",
                        tEndOfDataBlockEvent.getTargetPlanNodeId(),
                        tEndOfDataBlockEvent.getTargetFragmentInstanceId(),
                        tEndOfDataBlockEvent.getSourceFragmentInstanceId());

                // Resolve the handle once. A repeated lookup after the checks could return null
                // if the handle is deregistered concurrently.
                Map<String, ISourceHandle> handlesOfInstance = sourceHandles.get(tEndOfDataBlockEvent.getTargetFragmentInstanceId());
                ISourceHandle registered =
                        handlesOfInstance == null ? null : handlesOfInstance.get(tEndOfDataBlockEvent.getTargetPlanNodeId());
                if (registered == null || registered.isAborted() || registered.isFinished()) {
                    logger.warn("received onEndOfDataBlockEvent but the downstream FragmentInstance[{}] is not found", tEndOfDataBlockEvent.getTargetFragmentInstanceId());
                    return;
                }
                // Cast only after the state checks, so an aborted or finished handle is still dropped with a warning.
                ((SourceHandle) registered).setNoMoreTsBlocks(tEndOfDataBlockEvent.getLastSequenceId());
            }
        }
    }

    /* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager$SinkHandleListener.class */
    /**
     * Callbacks for lifecycle notifications about an {@link ISinkHandle}. The behavior notes
     * below describe {@code SinkHandleListenerImpl}, the implementation declared in this file.
     */
    public interface SinkHandleListener {
        /**
         * Finish notification. {@code SinkHandleListenerImpl} removes the handle from the
         * manager's sink handle map and calls {@code finished()} on its fragment instance context.
         */
        void onFinish(ISinkHandle iSinkHandle);

        /**
         * End-of-blocks notification. {@code SinkHandleListenerImpl} calls
         * {@code transitionToFlushing()} on its fragment instance context.
         */
        void onEndOfBlocks(ISinkHandle iSinkHandle);

        /**
         * Abort notification. {@code SinkHandleListenerImpl} removes the handle from the manager's
         * sink handle map.
         */
        void onAborted(ISinkHandle iSinkHandle);

        /**
         * Failure notification. {@code SinkHandleListenerImpl} logs the throwable and passes it to
         * its failure callback when one was supplied.
         */
        void onFailure(ISinkHandle iSinkHandle, Throwable th);
    }

    /* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager$SinkHandleListenerImpl.class */
    class SinkHandleListenerImpl implements SinkHandleListener {
        private final FragmentInstanceContext context;
        private final IMPPDataExchangeManagerCallback<Throwable> onFailureCallback;

        public SinkHandleListenerImpl(FragmentInstanceContext fragmentInstanceContext, IMPPDataExchangeManagerCallback<Throwable> iMPPDataExchangeManagerCallback) {
            this.context = fragmentInstanceContext;
            this.onFailureCallback = iMPPDataExchangeManagerCallback;
        }

        @Override // org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener
        public void onFinish(ISinkHandle iSinkHandle) {
            MPPDataExchangeManager.logger.debug("onFinish is invoked");
            removeFromMPPDataExchangeManager(iSinkHandle);
            this.context.finished();
        }

        @Override // org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener
        public void onEndOfBlocks(ISinkHandle iSinkHandle) {
            this.context.transitionToFlushing();
        }

        @Override // org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener
        public void onAborted(ISinkHandle iSinkHandle) {
            MPPDataExchangeManager.logger.debug("onAborted is invoked");
            removeFromMPPDataExchangeManager(iSinkHandle);
        }

        private void removeFromMPPDataExchangeManager(ISinkHandle iSinkHandle) {
            MPPDataExchangeManager.logger.debug("release resources of finished sink handle");
            if (!MPPDataExchangeManager.this.sinkHandles.containsKey(iSinkHandle.getLocalFragmentInstanceId())) {
                MPPDataExchangeManager.logger.info("resources already been released");
            }
            MPPDataExchangeManager.this.sinkHandles.remove(iSinkHandle.getLocalFragmentInstanceId());
        }

        @Override // org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener
        public void onFailure(ISinkHandle iSinkHandle, Throwable th) {
            MPPDataExchangeManager.logger.error("Sink handle failed due to", th);
            if (this.onFailureCallback != null) {
                this.onFailureCallback.call(th);
            }
        }
    }

    /* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager$SourceHandleListener.class */
    /**
     * Callbacks for lifecycle notifications about an {@link ISourceHandle}. The behavior notes
     * below describe {@code SourceHandleListenerImpl}, the implementation declared in this file.
     */
    public interface SourceHandleListener {
        /**
         * Finish notification. {@code SourceHandleListenerImpl} removes the handle from the
         * manager's source handle map.
         */
        void onFinished(ISourceHandle iSourceHandle);

        /**
         * Abort notification. {@code SourceHandleListenerImpl} treats it like
         * {@link #onFinished(ISourceHandle)}.
         */
        void onAborted(ISourceHandle iSourceHandle);

        /**
         * Failure notification. {@code SourceHandleListenerImpl} logs the throwable and passes it
         * to its failure callback when one was supplied.
         */
        void onFailure(ISourceHandle iSourceHandle, Throwable th);
    }

    /* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager$SourceHandleListenerImpl.class */
    class SourceHandleListenerImpl implements SourceHandleListener {
        private final IMPPDataExchangeManagerCallback<Throwable> onFailureCallback;

        public SourceHandleListenerImpl(IMPPDataExchangeManagerCallback<Throwable> iMPPDataExchangeManagerCallback) {
            this.onFailureCallback = iMPPDataExchangeManagerCallback;
        }

        @Override // org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener
        public void onFinished(ISourceHandle iSourceHandle) {
            MPPDataExchangeManager.logger.debug("finished and release resources");
            if (MPPDataExchangeManager.this.sourceHandles.containsKey(iSourceHandle.getLocalFragmentInstanceId()) && ((Map) MPPDataExchangeManager.this.sourceHandles.get(iSourceHandle.getLocalFragmentInstanceId())).containsKey(iSourceHandle.getLocalPlanNodeId())) {
                ((Map) MPPDataExchangeManager.this.sourceHandles.get(iSourceHandle.getLocalFragmentInstanceId())).remove(iSourceHandle.getLocalPlanNodeId());
            } else {
                MPPDataExchangeManager.logger.debug("resources has already been released");
            }
            if (MPPDataExchangeManager.this.sourceHandles.containsKey(iSourceHandle.getLocalFragmentInstanceId()) && ((Map) MPPDataExchangeManager.this.sourceHandles.get(iSourceHandle.getLocalFragmentInstanceId())).isEmpty()) {
                MPPDataExchangeManager.this.sourceHandles.remove(iSourceHandle.getLocalFragmentInstanceId());
            }
        }

        @Override // org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener
        public void onAborted(ISourceHandle iSourceHandle) {
            MPPDataExchangeManager.logger.debug("onAborted is invoked");
            onFinished(iSourceHandle);
        }

        @Override // org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener
        public void onFailure(ISourceHandle iSourceHandle, Throwable th) {
            MPPDataExchangeManager.logger.error("Source handle failed due to: ", th);
            if (this.onFailureCallback != null) {
                this.onFailureCallback.call(th);
            }
        }
    }

    /**
     * Builds a manager from its collaborators. Each argument is checked with
     * {@code Validate.notNull}, in declaration order, before being stored.
     *
     * @param localMemoryManager memory manager handed to the handles and shared queues created here
     * @param supplier produces the {@link TsBlockSerde} given to each remote sink/source handle
     * @param executorService executor handed to remote sink/source handles
     * @param iClientManager client manager handed to remote sink/source handles
     */
    public MPPDataExchangeManager(
            LocalMemoryManager localMemoryManager,
            Supplier<TsBlockSerde> supplier,
            ExecutorService executorService,
            IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> iClientManager) {
        // Validate.notNull returns its argument, so no cast is needed on assignment.
        this.localMemoryManager = Validate.notNull(localMemoryManager);
        this.tsBlockSerdeFactory = Validate.notNull(supplier);
        this.executorService = Validate.notNull(executorService);
        this.mppDataExchangeServiceClientManager = Validate.notNull(iClientManager);
    }

    /**
     * Returns the service implementation bound to this manager, creating and caching it on first
     * use. The method performs no synchronization.
     *
     * @return the cached (or newly created) service implementation
     */
    public MPPDataExchangeServiceImpl getOrCreateMPPDataExchangeServiceImpl() {
        MPPDataExchangeServiceImpl service = mppDataExchangeService;
        if (service != null) {
            return service;
        }
        service = new MPPDataExchangeServiceImpl();
        mppDataExchangeService = service;
        return service;
    }

    /**
     * Creates and registers a {@link LocalSinkHandle} for {@code tFragmentInstanceId}. The handle
     * uses the shared queue of an already registered local source handle for
     * ({@code tFragmentInstanceId2}, {@code str}) when there is one, and a new queue otherwise.
     *
     * @param tFragmentInstanceId local fragment instance the sink handle is registered under
     * @param tFragmentInstanceId2 remote (consuming) fragment instance
     * @param str plan node ID on the remote fragment instance
     * @param fragmentInstanceContext context notified by the handle's listener; must not be null
     * @return the newly registered sink handle
     * @throws IllegalStateException if a sink handle is already registered for {@code tFragmentInstanceId}
     */
    @Override
    public synchronized ISinkHandle createLocalSinkHandle(
            TFragmentInstanceId tFragmentInstanceId,
            TFragmentInstanceId tFragmentInstanceId2,
            String str,
            FragmentInstanceContext fragmentInstanceContext) {
        if (sinkHandles.containsKey(tFragmentInstanceId)) {
            throw new IllegalStateException("Local sink handle for " + tFragmentInstanceId + " exists.");
        }
        logger.debug("Create local sink handle to plan node {} of {} for {}", str, tFragmentInstanceId2, tFragmentInstanceId);

        final boolean localSourceRegistered =
                sourceHandles.containsKey(tFragmentInstanceId2) && sourceHandles.get(tFragmentInstanceId2).containsKey(str);
        final SharedTsBlockQueue queue;
        if (!localSourceRegistered) {
            logger.debug("Create shared tsblock queue");
            queue = new SharedTsBlockQueue(tFragmentInstanceId2, localMemoryManager);
        } else {
            logger.debug("Get shared tsblock queue from local source handle");
            queue = ((LocalSourceHandle) sourceHandles.get(tFragmentInstanceId2).get(str)).getSharedTsBlockQueue();
        }

        // Evaluating the method reference throws NullPointerException when the context is null.
        LocalSinkHandle handle =
                new LocalSinkHandle(
                        tFragmentInstanceId2,
                        str,
                        tFragmentInstanceId,
                        queue,
                        new SinkHandleListenerImpl(fragmentInstanceContext, fragmentInstanceContext::failed));
        sinkHandles.put(tFragmentInstanceId, handle);
        return handle;
    }

    /**
     * Creates and registers a remote {@link SinkHandle} for {@code tFragmentInstanceId} that
     * targets plan node {@code str} of {@code tFragmentInstanceId2} at {@code tEndPoint}.
     *
     * @param tFragmentInstanceId local fragment instance the sink handle is registered under
     * @param tEndPoint endpoint of the remote side
     * @param tFragmentInstanceId2 remote (consuming) fragment instance
     * @param str plan node ID on the remote fragment instance
     * @param fragmentInstanceContext context notified by the handle's listener; must not be null
     * @return the newly registered sink handle
     * @throws IllegalStateException if a sink handle is already registered for {@code tFragmentInstanceId}
     */
    @Override
    public ISinkHandle createSinkHandle(
            TFragmentInstanceId tFragmentInstanceId,
            TEndPoint tEndPoint,
            TFragmentInstanceId tFragmentInstanceId2,
            String str,
            FragmentInstanceContext fragmentInstanceContext) {
        if (sinkHandles.containsKey(tFragmentInstanceId)) {
            throw new IllegalStateException("Sink handle for " + tFragmentInstanceId + " exists.");
        }
        logger.debug("Create sink handle to plan node {} of {} for {}", str, tFragmentInstanceId2, tFragmentInstanceId);

        // Arguments are evaluated left to right: the serde is obtained before the listener is built,
        // and the method reference throws NullPointerException when the context is null.
        SinkHandle handle =
                new SinkHandle(
                        tEndPoint,
                        tFragmentInstanceId2,
                        str,
                        tFragmentInstanceId,
                        localMemoryManager,
                        executorService,
                        tsBlockSerdeFactory.get(),
                        new SinkHandleListenerImpl(fragmentInstanceContext, fragmentInstanceContext::failed),
                        mppDataExchangeServiceClientManager);
        sinkHandles.put(tFragmentInstanceId, handle);
        return handle;
    }

    /**
     * Creates and registers a {@link LocalSourceHandle} for plan node {@code str} of
     * {@code tFragmentInstanceId}. The handle uses the shared queue of an already registered sink
     * handle for {@code tFragmentInstanceId2} when there is one, and a new queue otherwise.
     *
     * @param tFragmentInstanceId local fragment instance the source handle is registered under
     * @param str local plan node ID the source handle is registered under
     * @param tFragmentInstanceId2 remote (producing) fragment instance
     * @param iMPPDataExchangeManagerCallback failure callback given to the handle's listener
     * @return the newly registered source handle
     * @throws IllegalStateException if a source handle is already registered for that plan node
     */
    @Override
    public synchronized ISourceHandle createLocalSourceHandle(
            TFragmentInstanceId tFragmentInstanceId,
            String str,
            TFragmentInstanceId tFragmentInstanceId2,
            IMPPDataExchangeManagerCallback<Throwable> iMPPDataExchangeManagerCallback) {
        final boolean alreadyRegistered =
                sourceHandles.containsKey(tFragmentInstanceId) && sourceHandles.get(tFragmentInstanceId).containsKey(str);
        if (alreadyRegistered) {
            throw new IllegalStateException("Source handle for plan node " + str + " of " + tFragmentInstanceId + " exists.");
        }
        logger.debug("Create local source handle from {} for plan node {} of {}", tFragmentInstanceId2, str, tFragmentInstanceId);

        final SharedTsBlockQueue queue;
        if (!sinkHandles.containsKey(tFragmentInstanceId2)) {
            logger.debug("Create shared tsblock queue");
            queue = new SharedTsBlockQueue(tFragmentInstanceId, localMemoryManager);
        } else {
            logger.debug("Get shared tsblock queue from local sink handle");
            queue = ((LocalSinkHandle) sinkHandles.get(tFragmentInstanceId2)).getSharedTsBlockQueue();
        }

        LocalSourceHandle handle =
                new LocalSourceHandle(
                        tFragmentInstanceId2,
                        tFragmentInstanceId,
                        str,
                        queue,
                        new SourceHandleListenerImpl(iMPPDataExchangeManagerCallback));
        sourceHandles.computeIfAbsent(tFragmentInstanceId, key -> new ConcurrentHashMap<>()).put(str, handle);
        return handle;
    }

    /**
     * Creates and registers a remote {@link SourceHandle} for plan node {@code str} of
     * {@code tFragmentInstanceId}, reading from {@code tFragmentInstanceId2} at {@code tEndPoint}.
     *
     * @param tFragmentInstanceId local fragment instance the source handle is registered under
     * @param str local plan node ID the source handle is registered under
     * @param tEndPoint endpoint of the remote side
     * @param tFragmentInstanceId2 remote (producing) fragment instance
     * @param iMPPDataExchangeManagerCallback failure callback given to the handle's listener
     * @return the newly registered source handle
     * @throws IllegalStateException if a source handle is already registered for that plan node
     */
    @Override
    public ISourceHandle createSourceHandle(
            TFragmentInstanceId tFragmentInstanceId,
            String str,
            TEndPoint tEndPoint,
            TFragmentInstanceId tFragmentInstanceId2,
            IMPPDataExchangeManagerCallback<Throwable> iMPPDataExchangeManagerCallback) {
        final boolean alreadyRegistered =
                sourceHandles.containsKey(tFragmentInstanceId) && sourceHandles.get(tFragmentInstanceId).containsKey(str);
        if (alreadyRegistered) {
            throw new IllegalStateException("Source handle for plan node " + str + " of " + tFragmentInstanceId + " exists.");
        }
        logger.debug("Create source handle from {} for plan node {} of {}", tFragmentInstanceId2, str, tFragmentInstanceId);

        SourceHandle handle =
                new SourceHandle(
                        tEndPoint,
                        tFragmentInstanceId2,
                        tFragmentInstanceId,
                        str,
                        localMemoryManager,
                        executorService,
                        tsBlockSerdeFactory.get(),
                        new SourceHandleListenerImpl(iMPPDataExchangeManagerCallback),
                        mppDataExchangeServiceClientManager);
        sourceHandles.computeIfAbsent(tFragmentInstanceId, key -> new ConcurrentHashMap<>()).put(str, handle);
        return handle;
    }

    /**
     * Aborts and deregisters every handle registered under the given fragment instance: its sink
     * handle, if any, and all of its source handles.
     *
     * @param tFragmentInstanceId fragment instance whose handles are aborted and removed
     */
    @Override
    public void forceDeregisterFragmentInstance(TFragmentInstanceId tFragmentInstanceId) {
        logger.debug("Force deregister fragment instance");

        // Single lookups: a handle's own listener may remove its entry while we work, so a second
        // get() after containsKey() could return null.
        ISinkHandle sinkHandle = sinkHandles.get(tFragmentInstanceId);
        if (sinkHandle != null) {
            sinkHandle.abort();
            sinkHandles.remove(tFragmentInstanceId);
        }

        Map<String, ISourceHandle> handlesOfInstance = sourceHandles.get(tFragmentInstanceId);
        if (handlesOfInstance != null) {
            for (Map.Entry<String, ISourceHandle> entry : handlesOfInstance.entrySet()) {
                // Log the plan node whose handle is being closed, not the whole registry map.
                logger.info("Close source handle {}", entry.getKey());
                entry.getValue().abort();
            }
            sourceHandles.remove(tFragmentInstanceId);
        }
    }

    /**
     * Builds an identifier of the form {@code <full fragment instance id>.<str>}, where the first
     * part comes from {@code FragmentInstanceId.createFullId} applied to the query, fragment and
     * instance IDs of the given Thrift fragment instance ID.
     *
     * @param tFragmentInstanceId Thrift fragment instance ID supplying the three ID components
     * @param str suffix appended after a dot
     * @return the combined identifier
     */
    public static String createFullIdFrom(TFragmentInstanceId tFragmentInstanceId, String str) {
        final String fullInstanceId =
                FragmentInstanceId.createFullId(
                        tFragmentInstanceId.queryId,
                        tFragmentInstanceId.fragmentId,
                        tFragmentInstanceId.instanceId);
        return fullInstanceId + "." + str;
    }
}
