package org.apache.flink.runtime.io.network;

import java.io.IOException;
import java.util.Optional;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/NetworkEnvironment.class */
/**
 * Holds the network-related components used by tasks: the {@link NetworkBufferPool}, the
 * {@link ConnectionManager}, the {@link ResultPartitionManager}, the {@link TaskEventDispatcher}
 * and the optional queryable-state ({@link KvStateServer} / {@link KvStateClientProxy}) services.
 *
 * <p>It wires the result partitions and input gates of a {@link Task} to buffer pools when the
 * task is registered and releases them again when the task is unregistered.
 *
 * <p>{@link #registerTask}, {@link #unregisterTask}, {@link #start}, {@link #shutdown} and
 * {@link #isShutdown} all synchronize on one private lock, so these lifecycle operations are
 * mutually exclusive.
 */
public class NetworkEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);

    /** Guards {@link #isShutdown} and serializes the lifecycle / registration methods. */
    private final Object lock = new Object();

    private final NetworkBufferPool networkBufferPool;
    private final ConnectionManager connectionManager;
    private final ResultPartitionManager resultPartitionManager;
    private final TaskEventDispatcher taskEventDispatcher;

    /** Optional; reset to {@code null} when starting it fails. */
    private KvStateServer kvStateServer;

    /** Optional; reset to {@code null} when starting it fails. */
    private KvStateClientProxy kvStateProxy;

    private final KvStateRegistry kvStateRegistry;
    private final IOManager.IOMode defaultIOMode;
    private final int partitionRequestInitialBackoff;
    private final int partitionRequestMaxBackoff;

    /** Buffers accounted per channel / subpartition when sizing bounded buffer pools. */
    private final int networkBuffersPerChannel;

    /** Additional buffers accounted per gate / partition when sizing bounded buffer pools. */
    private final int extraNetworkBuffersPerGate;

    private final boolean enableCreditBased;

    /** Set once {@link #shutdown()} has run; guarded by {@link #lock}. */
    private boolean isShutdown;

    /**
     * Creates an environment with a {@link LocalConnectionManager}, freshly created partition
     * manager, event dispatcher and KvState registry, no KvState server or proxy, and
     * {@link IOManager.IOMode#SYNC} as default I/O mode.
     *
     * @param i first argument passed to {@code new NetworkBufferPool(i, i2)}
     *     (presumably the number of buffers - confirm against NetworkBufferPool)
     * @param i2 second argument passed to {@code new NetworkBufferPool(i, i2)}
     *     (presumably the segment size - confirm against NetworkBufferPool)
     * @param i3 initial backoff for partition requests
     * @param i4 maximum backoff for partition requests
     * @param i5 network buffers per channel
     * @param i6 extra network buffers per gate
     * @param z whether credit-based buffer assignment is enabled for input gates
     */
    public NetworkEnvironment(int i, int i2, int i3, int i4, int i5, int i6, boolean z) {
        this(
                new NetworkBufferPool(i, i2),
                new LocalConnectionManager(),
                new ResultPartitionManager(),
                new TaskEventDispatcher(),
                new KvStateRegistry(),
                null,
                null,
                IOManager.IOMode.SYNC,
                i3,
                i4,
                i5,
                i6,
                z);
    }

    /**
     * Creates an environment from explicitly supplied components.
     *
     * @param networkBufferPool global buffer pool; must not be {@code null}
     * @param connectionManager connection manager; must not be {@code null}
     * @param resultPartitionManager partition manager; must not be {@code null}
     * @param taskEventDispatcher event dispatcher; must not be {@code null}
     * @param kvStateRegistry KvState registry; must not be {@code null}
     * @param kvStateServer queryable-state server, or {@code null} if not used
     * @param kvStateClientProxy queryable-state client proxy, or {@code null} if not used
     * @param iOMode default I/O mode returned by {@link #getDefaultIOMode()}
     * @param i initial backoff for partition requests
     * @param i2 maximum backoff for partition requests
     * @param i3 network buffers per channel
     * @param i4 extra network buffers per gate
     * @param z whether credit-based buffer assignment is enabled for input gates
     */
    public NetworkEnvironment(NetworkBufferPool networkBufferPool, ConnectionManager connectionManager, ResultPartitionManager resultPartitionManager, TaskEventDispatcher taskEventDispatcher, KvStateRegistry kvStateRegistry, KvStateServer kvStateServer, KvStateClientProxy kvStateClientProxy, IOManager.IOMode iOMode, int i, int i2, int i3, int i4, boolean z) {
        this.networkBufferPool = (NetworkBufferPool) Preconditions.checkNotNull(networkBufferPool);
        this.connectionManager = (ConnectionManager) Preconditions.checkNotNull(connectionManager);
        this.resultPartitionManager = (ResultPartitionManager) Preconditions.checkNotNull(resultPartitionManager);
        this.taskEventDispatcher = (TaskEventDispatcher) Preconditions.checkNotNull(taskEventDispatcher);
        this.kvStateRegistry = (KvStateRegistry) Preconditions.checkNotNull(kvStateRegistry);
        this.kvStateServer = kvStateServer;
        this.kvStateProxy = kvStateClientProxy;
        this.defaultIOMode = iOMode;
        this.partitionRequestInitialBackoff = i;
        this.partitionRequestMaxBackoff = i2;
        this.networkBuffersPerChannel = i3;
        this.extraNetworkBuffersPerGate = i4;
        this.enableCreditBased = z;
        this.isShutdown = false;
    }

    public ResultPartitionManager getResultPartitionManager() {
        return this.resultPartitionManager;
    }

    public TaskEventDispatcher getTaskEventDispatcher() {
        return this.taskEventDispatcher;
    }

    public ConnectionManager getConnectionManager() {
        return this.connectionManager;
    }

    public NetworkBufferPool getNetworkBufferPool() {
        return this.networkBufferPool;
    }

    public IOManager.IOMode getDefaultIOMode() {
        return this.defaultIOMode;
    }

    public int getPartitionRequestInitialBackoff() {
        return this.partitionRequestInitialBackoff;
    }

    public int getPartitionRequestMaxBackoff() {
        return this.partitionRequestMaxBackoff;
    }

    public boolean isCreditBased() {
        return this.enableCreditBased;
    }

    public KvStateRegistry getKvStateRegistry() {
        return this.kvStateRegistry;
    }

    /** Returns the queryable-state server, or {@code null} if none is configured or it failed to start. */
    public KvStateServer getKvStateServer() {
        return this.kvStateServer;
    }

    /** Returns the queryable-state client proxy, or {@code null} if none is configured or it failed to start. */
    public KvStateClientProxy getKvStateProxy() {
        return this.kvStateProxy;
    }

    /** Creates a task-scoped KvState registry by delegating to the {@link KvStateRegistry}. */
    public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobID, JobVertexID jobVertexID) {
        return this.kvStateRegistry.createTaskRegistry(jobID, jobVertexID);
    }

    /**
     * Sets up all produced partitions and all input gates of the given task.
     *
     * @param task the task whose partitions and gates are wired to buffer pools
     * @throws IOException if setting up a partition or an input gate fails
     * @throws IllegalStateException if this environment has already been shut down
     */
    public void registerTask(Task task) throws IOException {
        ResultPartition[] producedPartitions = task.getProducedPartitions();
        synchronized (this.lock) {
            if (this.isShutdown) {
                throw new IllegalStateException("NetworkEnvironment is shut down");
            }
            for (ResultPartition partition : producedPartitions) {
                setupPartition(partition);
            }
            for (SingleInputGate gate : task.getAllInputGates()) {
                setupInputGate(gate);
            }
        }
    }

    /**
     * Creates a buffer pool for the partition, hands it to the partition and registers the
     * partition with the partition manager and the task event dispatcher.
     *
     * <p>If any step fails, a buffer pool that was already created is lazily destroyed again.
     *
     * @param resultPartition the partition to set up
     * @throws IOException if any step fails; every non-{@code IOException} failure is wrapped
     *     in an {@code IOException} carrying the original throwable as cause
     */
    @VisibleForTesting
    public void setupPartition(ResultPartition resultPartition) throws IOException {
        BufferPool bufferPool = null;
        try {
            int numSubpartitions = resultPartition.getNumberOfSubpartitions();
            // Bounded partition types get a hard upper limit; all others are effectively unlimited.
            int maxBuffers = resultPartition.getPartitionType().isBounded()
                    ? (numSubpartitions * this.networkBuffersPerChannel) + this.extraNetworkBuffersPerGate
                    : Integer.MAX_VALUE;
            // The partition is passed as pool owner only when its type has no back pressure.
            bufferPool = this.networkBufferPool.createBufferPool(
                    numSubpartitions,
                    maxBuffers,
                    resultPartition.getPartitionType().hasBackPressure() ? Optional.empty() : Optional.of(resultPartition));
            resultPartition.registerBufferPool(bufferPool);
            this.resultPartitionManager.registerResultPartition(resultPartition);
            this.taskEventDispatcher.registerPartition(resultPartition.getPartitionId());
        } catch (Throwable th) {
            if (bufferPool != null) {
                bufferPool.lazyDestroy();
            }
            if (th instanceof IOException) {
                throw (IOException) th;
            }
            throw new IOException(th.getMessage(), th);
        }
    }

    /**
     * Creates a buffer pool for the input gate and assigns it to the gate.
     *
     * <p>With credit-based mode enabled, exclusive segments are assigned to the gate first
     * ({@code networkBuffersPerChannel} is passed to the gate) and the pool is created with zero
     * required buffers. Otherwise the pool requires one buffer per input channel.
     *
     * <p>If any step fails, a buffer pool that was already created is lazily destroyed again and
     * the failure is rethrown via {@link ExceptionUtils#rethrowIOException(Throwable)}.
     *
     * @param singleInputGate the gate to set up
     * @throws IOException if the set-up fails
     */
    @VisibleForTesting
    public void setupInputGate(SingleInputGate singleInputGate) throws IOException {
        BufferPool bufferPool = null;
        try {
            if (this.enableCreditBased) {
                int maxFloatingBuffers = singleInputGate.getConsumedPartitionType().isBounded()
                        ? this.extraNetworkBuffersPerGate
                        : Integer.MAX_VALUE;
                singleInputGate.assignExclusiveSegments(this.networkBufferPool, this.networkBuffersPerChannel);
                bufferPool = this.networkBufferPool.createBufferPool(0, maxFloatingBuffers);
            } else {
                int numChannels = singleInputGate.getNumberOfInputChannels();
                int maxBuffers = singleInputGate.getConsumedPartitionType().isBounded()
                        ? (numChannels * this.networkBuffersPerChannel) + this.extraNetworkBuffersPerGate
                        : Integer.MAX_VALUE;
                bufferPool = this.networkBufferPool.createBufferPool(numChannels, maxBuffers);
            }
            singleInputGate.setBufferPool(bufferPool);
        } catch (Throwable th) {
            if (bufferPool != null) {
                bufferPool.lazyDestroy();
            }
            ExceptionUtils.rethrowIOException(th);
        }
    }

    /**
     * Releases the network resources of the given task. Does nothing (apart from a debug log
     * line) when this environment is already shut down.
     *
     * <p>For a cancelled or failed task the partitions it produced are released with the task's
     * failure cause. Afterwards every produced partition is unregistered from the event
     * dispatcher and its buffer pool destroyed, and all input gates release their resources;
     * an {@code IOException} while releasing a gate is logged and does not abort the loop.
     *
     * @param task the task to unregister
     */
    public void unregisterTask(Task task) {
        LOG.debug("Unregister task {} from network environment (state: {}).", task.getTaskInfo().getTaskNameWithSubtasks(), task.getExecutionState());
        ExecutionAttemptID executionId = task.getExecutionId();
        synchronized (this.lock) {
            if (this.isShutdown) {
                return;
            }
            if (task.isCanceledOrFailed()) {
                this.resultPartitionManager.releasePartitionsProducedBy(executionId, task.getFailureCause());
            }
            for (ResultPartition partition : task.getProducedPartitions()) {
                this.taskEventDispatcher.unregisterPartition(partition.getPartitionId());
                partition.destroyBufferPool();
            }
            SingleInputGate[] inputGates = task.getAllInputGates();
            if (inputGates == null) {
                return;
            }
            for (SingleInputGate gate : inputGates) {
                if (gate == null) {
                    continue;
                }
                try {
                    gate.releaseAllResources();
                } catch (IOException e) {
                    LOG.error("Error during release of reader resources: " + e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Starts the connection manager and, if configured, the queryable-state server and proxy.
     *
     * <p>Each component failure is reported with a message naming that component. A KvState
     * component that fails to start is shut down again and its field is reset to {@code null}.
     *
     * @throws IOException if the connection manager, the KvState server or the KvState proxy
     *     cannot be started
     * @throws IllegalStateException if this environment has already been shut down
     */
    public void start() throws IOException {
        synchronized (this.lock) {
            Preconditions.checkState(!this.isShutdown, "The NetworkEnvironment has already been shut down.");
            LOG.info("Starting the network environment and its components.");

            // Only the connection manager start is covered by this handler, so that failures of
            // the KvState components below are not mislabelled as connection manager failures.
            try {
                LOG.debug("Starting network connection manager");
                this.connectionManager.start(this.resultPartitionManager, this.taskEventDispatcher);
            } catch (IOException e) {
                throw new IOException("Failed to instantiate network connection manager.", e);
            }

            if (this.kvStateServer != null) {
                try {
                    this.kvStateServer.start();
                } catch (Throwable startFailure) {
                    try {
                        this.kvStateServer.shutdown();
                    } catch (Throwable cleanupFailure) {
                        // Keep the start-up failure as the primary error.
                        startFailure.addSuppressed(cleanupFailure);
                    }
                    this.kvStateServer = null;
                    throw new IOException("Failed to start the Queryable State Data Server.", startFailure);
                }
            }

            if (this.kvStateProxy != null) {
                try {
                    this.kvStateProxy.start();
                } catch (Throwable startFailure) {
                    try {
                        this.kvStateProxy.shutdown();
                    } catch (Throwable cleanupFailure) {
                        // Keep the start-up failure as the primary error.
                        startFailure.addSuppressed(cleanupFailure);
                    }
                    this.kvStateProxy = null;
                    throw new IOException("Failed to start the Queryable State Client Proxy.", startFailure);
                }
            }
        }
    }

    /**
     * Shuts down all components on a best-effort basis and marks this environment as shut down.
     * Calling it again after a completed shutdown is a no-op.
     *
     * <p>A failure of any single step is logged as a warning and does not prevent the remaining
     * steps from running, so the network buffer pool is always destroyed and the shut-down flag
     * is always set.
     */
    public void shutdown() {
        synchronized (this.lock) {
            if (this.isShutdown) {
                return;
            }
            LOG.info("Shutting down the network environment and its components.");

            if (this.kvStateProxy != null) {
                try {
                    LOG.debug("Shutting down Queryable State Client Proxy.");
                    this.kvStateProxy.shutdown();
                } catch (Throwable t) {
                    LOG.warn("Cannot shut down Queryable State Client Proxy.", t);
                }
            }

            if (this.kvStateServer != null) {
                try {
                    LOG.debug("Shutting down Queryable State Data Server.");
                    this.kvStateServer.shutdown();
                } catch (Throwable t) {
                    LOG.warn("Cannot shut down Queryable State Data Server.", t);
                }
            }

            try {
                LOG.debug("Shutting down network connection manager");
                this.connectionManager.shutdown();
            } catch (Throwable t) {
                LOG.warn("Cannot shut down the network connection manager.", t);
            }

            try {
                LOG.debug("Shutting down intermediate result partition manager");
                this.resultPartitionManager.shutdown();
            } catch (Throwable t) {
                LOG.warn("Cannot shut down the result partition manager.", t);
            }

            // The next two steps are guarded like the others: an exception here must not skip
            // destroying the network buffer pool or leave the environment not marked as shut down.
            try {
                this.taskEventDispatcher.clearAll();
            } catch (Throwable t) {
                LOG.warn("Cannot clear the task event dispatcher.", t);
            }

            try {
                this.networkBufferPool.destroyAllBufferPools();
            } catch (Throwable t) {
                LOG.warn("Cannot destroy the buffer pools of the network buffer pool.", t);
            }

            try {
                this.networkBufferPool.destroy();
            } catch (Throwable t) {
                LOG.warn("Network buffer pool did not shut down properly.", t);
            }

            this.isShutdown = true;
        }
    }

    /** Returns whether {@link #shutdown()} has completed; reads the flag under the lock. */
    public boolean isShutdown() {
        synchronized (this.lock) {
            return this.isShutdown;
        }
    }
}
