package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.types.Either;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionJobVertex.class */
public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable<ArchivedExecutionJobVertex> {
    private static final Logger LOG = ExecutionGraph.LOG;
    public static final int VALUE_NOT_SET = -1;
    private final ExecutionGraph graph;
    private final JobVertex jobVertex;
    private final ExecutionVertex[] taskVertices;
    private final IntermediateResult[] producedDataSets;
    private final List<IntermediateResult> inputs;
    private final int parallelism;
    private final SlotSharingGroup slotSharingGroup;

    @Nullable
    private final CoLocationGroup coLocationGroup;
    private final InputSplit[] inputSplits;
    private final boolean maxParallelismConfigured;
    private int maxParallelism;
    private final ResourceProfile resourceProfile;
    private final Collection<OperatorCoordinatorHolder> operatorCoordinators;
    private InputSplitAssigner splitAssigner;
    private final Object stateMonitor = new Object();
    private Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Finally extract failed */
    public ExecutionJobVertex(ExecutionGraph executionGraph, JobVertex jobVertex, int i, int i2, Time time, long j, long j2) throws JobException {
        if (executionGraph == null || jobVertex == null) {
            throw new NullPointerException();
        }
        this.graph = executionGraph;
        this.jobVertex = jobVertex;
        int parallelism = jobVertex.getParallelism();
        int i3 = parallelism > 0 ? parallelism : i;
        int maxParallelism = jobVertex.getMaxParallelism();
        this.maxParallelismConfigured = -1 != maxParallelism;
        setMaxParallelismInternal(this.maxParallelismConfigured ? maxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(i3));
        if (i3 > this.maxParallelism) {
            throw new JobException(String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.", jobVertex.getName(), Integer.valueOf(i3), Integer.valueOf(this.maxParallelism)));
        }
        this.parallelism = i3;
        this.resourceProfile = ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);
        this.taskVertices = new ExecutionVertex[i3];
        this.inputs = new ArrayList(jobVertex.getInputs().size());
        this.slotSharingGroup = (SlotSharingGroup) Preconditions.checkNotNull(jobVertex.getSlotSharingGroup());
        this.coLocationGroup = jobVertex.getCoLocationGroup();
        this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
        for (int i4 = 0; i4 < jobVertex.getProducedDataSets().size(); i4++) {
            IntermediateDataSet intermediateDataSet = jobVertex.getProducedDataSets().get(i4);
            this.producedDataSets[i4] = new IntermediateResult(intermediateDataSet.getId(), this, i3, intermediateDataSet.getResultType());
        }
        for (int i5 = 0; i5 < i3; i5++) {
            this.taskVertices[i5] = new ExecutionVertex(this, i5, this.producedDataSets, time, j, j2, i2);
        }
        for (IntermediateResult intermediateResult : this.producedDataSets) {
            if (intermediateResult.getNumberOfAssignedPartitions() != this.parallelism) {
                throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
            }
        }
        List<SerializedValue<OperatorCoordinator.Provider>> operatorCoordinators = getJobVertex().getOperatorCoordinators();
        if (operatorCoordinators.isEmpty()) {
            this.operatorCoordinators = Collections.emptyList();
        } else {
            ArrayList arrayList = new ArrayList(operatorCoordinators.size());
            try {
                Iterator<SerializedValue<OperatorCoordinator.Provider>> it = operatorCoordinators.iterator();
                while (it.hasNext()) {
                    arrayList.add(OperatorCoordinatorHolder.create(it.next(), this, executionGraph.getUserClassLoader()));
                }
                this.operatorCoordinators = Collections.unmodifiableList(arrayList);
            } catch (Exception | LinkageError e) {
                IOUtils.closeAllQuietly(arrayList);
                throw new JobException("Cannot instantiate the coordinator for operator " + getName(), e);
            }
        }
        try {
            InputSplitSource<?> inputSplitSource = jobVertex.getInputSplitSource();
            if (inputSplitSource != null) {
                Thread currentThread = Thread.currentThread();
                ClassLoader contextClassLoader = currentThread.getContextClassLoader();
                currentThread.setContextClassLoader(executionGraph.getUserClassLoader());
                try {
                    this.inputSplits = inputSplitSource.createInputSplits(i3);
                    if (this.inputSplits != null) {
                        this.splitAssigner = inputSplitSource.getInputSplitAssigner(this.inputSplits);
                    }
                    currentThread.setContextClassLoader(contextClassLoader);
                } catch (Throwable th) {
                    currentThread.setContextClassLoader(contextClassLoader);
                    throw th;
                }
            } else {
                this.inputSplits = null;
            }
        } catch (Throwable th2) {
            throw new JobException("Creating the input splits caused an error: " + th2.getMessage(), th2);
        }
    }

    public List<OperatorIDPair> getOperatorIDs() {
        return this.jobVertex.getOperatorIDs();
    }

    public void setMaxParallelism(int i) {
        Preconditions.checkState(!this.maxParallelismConfigured, "Attempt to override a configured max parallelism. Configured: " + this.maxParallelism + ", argument: " + i);
        setMaxParallelismInternal(i);
    }

    private void setMaxParallelismInternal(int i) {
        if (i == Integer.MAX_VALUE) {
            i = 32768;
        }
        Preconditions.checkArgument(i > 0 && i <= 32768, "Overriding max parallelism is not in valid bounds (1..%s), found: %s", new Object[]{32768, Integer.valueOf(i)});
        this.maxParallelism = i;
    }

    public ExecutionGraph getGraph() {
        return this.graph;
    }

    public JobVertex getJobVertex() {
        return this.jobVertex;
    }

    @Override // org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex
    public String getName() {
        return getJobVertex().getName();
    }

    @Override // org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex
    public int getParallelism() {
        return this.parallelism;
    }

    @Override // org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex
    public int getMaxParallelism() {
        return this.maxParallelism;
    }

    @Override // org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex
    public ResourceProfile getResourceProfile() {
        return this.resourceProfile;
    }

    public boolean isMaxParallelismConfigured() {
        return this.maxParallelismConfigured;
    }

    public JobID getJobId() {
        return this.graph.getJobID();
    }

    @Override // org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex
    public JobVertexID getJobVertexId() {
        return this.jobVertex.getID();
    }

    @Override // org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex
    public ExecutionVertex[] getTaskVertices() {
        return this.taskVertices;
    }

    public IntermediateResult[] getProducedDataSets() {
        return this.producedDataSets;
    }

    public InputSplitAssigner getSplitAssigner() {
        return this.splitAssigner;
    }

    public SlotSharingGroup getSlotSharingGroup() {
        return this.slotSharingGroup;
    }

    @Nullable
    public CoLocationGroup getCoLocationGroup() {
        return this.coLocationGroup;
    }

    public List<IntermediateResult> getInputs() {
        return this.inputs;
    }

    public InputDependencyConstraint getInputDependencyConstraint() {
        return getJobVertex().getInputDependencyConstraint();
    }

    public Collection<OperatorCoordinatorHolder> getOperatorCoordinators() {
        return this.operatorCoordinators;
    }

    public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
        Either<SerializedValue<TaskInformation>, PermanentBlobKey> either;
        synchronized (this.stateMonitor) {
            if (this.taskInformationOrBlobKey == null) {
                this.taskInformationOrBlobKey = BlobWriter.serializeAndTryOffload(new TaskInformation(this.jobVertex.getID(), this.jobVertex.getName(), this.parallelism, this.maxParallelism, this.jobVertex.getInvokableClassName(), this.jobVertex.getConfiguration()), getJobId(), this.graph.getBlobWriter());
            }
            either = this.taskInformationOrBlobKey;
        }
        return either;
    }

    @Override // org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex
    public ExecutionState getAggregateState() {
        int[] iArr = new int[ExecutionState.values().length];
        for (ExecutionVertex executionVertex : this.taskVertices) {
            int ordinal = executionVertex.getExecutionState().ordinal();
            iArr[ordinal] = iArr[ordinal] + 1;
        }
        return getAggregateJobVertexState(iArr, this.parallelism);
    }

    public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> map) throws JobException {
        List<JobEdge> inputs = this.jobVertex.getInputs();
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", this.jobVertex.getID(), this.jobVertex.getName(), Integer.valueOf(inputs.size())));
        }
        for (int i = 0; i < inputs.size(); i++) {
            JobEdge jobEdge = inputs.get(i);
            if (LOG.isDebugEnabled()) {
                if (jobEdge.getSource() == null) {
                    LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.", Integer.valueOf(i), this.jobVertex.getID(), this.jobVertex.getName(), jobEdge.getSourceId()));
                } else {
                    LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).", Integer.valueOf(i), this.jobVertex.getID(), this.jobVertex.getName(), jobEdge.getSource().getProducer().getID(), jobEdge.getSource().getProducer().getName()));
                }
            }
            IntermediateResult intermediateResult = map.get(jobEdge.getSourceId());
            if (intermediateResult == null) {
                throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID " + jobEdge.getSourceId());
            }
            this.inputs.add(intermediateResult);
            int registerConsumer = intermediateResult.registerConsumer();
            for (int i2 = 0; i2 < this.parallelism; i2++) {
                this.taskVertices[i2].connectSource(i, intermediateResult, jobEdge, registerConsumer);
            }
        }
    }

    public void cancel() {
        for (ExecutionVertex executionVertex : getTaskVertices()) {
            executionVertex.cancel();
        }
    }

    public CompletableFuture<Void> cancelWithFuture() {
        return FutureUtils.waitForAll(mapExecutionVertices((v0) -> {
            return v0.cancel();
        }));
    }

    public CompletableFuture<Void> suspend() {
        return FutureUtils.waitForAll(mapExecutionVertices((v0) -> {
            return v0.suspend();
        }));
    }

    @Nonnull
    private Collection<CompletableFuture<?>> mapExecutionVertices(Function<ExecutionVertex, CompletableFuture<?>> function) {
        return (Collection) Arrays.stream(getTaskVertices()).map(function).collect(Collectors.toList());
    }

    public void fail(Throwable th) {
        for (ExecutionVertex executionVertex : getTaskVertices()) {
            executionVertex.fail(th);
        }
    }

    public void resetForNewExecution(long j, long j2) throws GlobalModVersionMismatch {
        synchronized (this.stateMonitor) {
            for (int i = 0; i < this.parallelism; i++) {
                this.taskVertices[i].resetForNewExecution(j, j2);
            }
            try {
                if (this.inputSplits != null) {
                    this.splitAssigner = this.jobVertex.getInputSplitSource().getInputSplitAssigner(this.inputSplits);
                }
            } catch (Throwable th) {
                throw new RuntimeException("Re-creating the input split assigner failed: " + th.getMessage(), th);
            }
        }
    }

    @Override // org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex
    public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() {
        HashMap hashMap = new HashMap();
        for (ExecutionVertex executionVertex : this.taskVertices) {
            Map<String, Accumulator<?, ?>> userAccumulators = executionVertex.getCurrentExecutionAttempt().getUserAccumulators();
            if (userAccumulators != null) {
                AccumulatorHelper.mergeInto(hashMap, userAccumulators);
            }
        }
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(hashMap);
    }

    /* renamed from: archive, reason: merged with bridge method [inline-methods] */
    public ArchivedExecutionJobVertex m2106archive() {
        return new ArchivedExecutionJobVertex(this);
    }

    public static ExecutionState getAggregateJobVertexState(int[] iArr, int i) {
        if (iArr == null || iArr.length != ExecutionState.values().length) {
            throw new IllegalArgumentException("Must provide an array as large as there are execution states.");
        }
        return iArr[ExecutionState.FAILED.ordinal()] > 0 ? ExecutionState.FAILED : iArr[ExecutionState.CANCELING.ordinal()] > 0 ? ExecutionState.CANCELING : iArr[ExecutionState.CANCELED.ordinal()] > 0 ? ExecutionState.CANCELED : iArr[ExecutionState.RUNNING.ordinal()] > 0 ? ExecutionState.RUNNING : iArr[ExecutionState.FINISHED.ordinal()] > 0 ? iArr[ExecutionState.FINISHED.ordinal()] == i ? ExecutionState.FINISHED : ExecutionState.RUNNING : ExecutionState.CREATED;
    }
}
