package org.apache.flink.runtime.scheduler;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RuntimeRescaleConfigurationOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.FailureHandlingResult;
import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.failure.DefaultFailureEnricherContext;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.io.network.partition.PartitionException;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rescale.RescaledPipelinedRegion;
import org.apache.flink.runtime.rescale.RescalingExecutionDeploymentHandle;
import org.apache.flink.runtime.rescale.RuntimeRescaleCoordinator;
import org.apache.flink.runtime.rescale.RuntimeRescaleException;
import org.apache.flink.runtime.rescale.RuntimeRescaleFailureReason;
import org.apache.flink.runtime.rescale.RuntimeRescaleIDCounter;
import org.apache.flink.runtime.rescale.RuntimeRescaleRuntimeException;
import org.apache.flink.runtime.rescale.RuntimeRescaleSchedulerOperations;
import org.apache.flink.runtime.rescale.RuntimeRescaleSupporting;
import org.apache.flink.runtime.rescale.RuntimeRescaleUtils;
import org.apache.flink.runtime.scheduler.ExecutionDeployer;
import org.apache.flink.runtime.scheduler.adaptive.JobGraphJobInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultScheduler.class */
public class DefaultScheduler extends SchedulerBase implements SchedulerOperations, RuntimeRescaleSchedulerOperations {
    /** Logger supplied to the constructor; used for the log output of this class. */
    protected final Logger log;
    /** Class loader used to deserialize the failure cause of a failed execution. */
    private final ClassLoader userCodeLoader;
    /** Slot allocator created by the constructor from the supplied factory and a {@code DefaultExecutionSlotAllocationContext}. */
    protected final ExecutionSlotAllocator executionSlotAllocator;
    /** Produces the {@link FailureHandlingResult} for task and global failures and reports the restart count. */
    private final ExecutionFailureHandler executionFailureHandler;
    /** Executor on which delayed task restarts are scheduled. */
    private final ScheduledExecutor delayExecutor;
    /** Strategy that is notified about scheduling start, execution state changes, restarts and rescale events. */
    protected final SchedulingStrategy schedulingStrategy;
    /** Execution operations handed to the execution deployer factory in the constructor. */
    private final ExecutionOperations executionOperations;
    /** Vertices that were cancelled for a restart that has not started yet; the job is moved back to RUNNING once this set is empty. */
    private final Set<ExecutionVertexID> verticesWaitingForRestart;
    /** Shuffle master supplied to the constructor (null-checked there). */
    protected final ShuffleMaster<?> shuffleMaster;
    /** Counters keyed by allocation id; the key set is exposed as the set of reserved allocations. */
    private final Map<AllocationID, Long> reservedAllocationRefCounters;
    /** NOTE(review): not referenced in the visible part of this class; presumably the reserved allocation per vertex — confirm against start/stopReserveAllocation. */
    private final Map<ExecutionVertexID, AllocationID> reservedAllocationByExecutionVertex;
    /** Deployer created by the constructor from the supplied {@code ExecutionDeployer.Factory}. */
    protected final ExecutionDeployer executionDeployer;
    /** Job information whose parallelism store is rewritten when falling back from a failed runtime rescale. */
    private JobGraphJobInformation jobInformation;
    /** Timeout used when waiting for the rescale-readiness result in {@code prepareRuntimeRescale}. */
    private Time rpcTimeout;
    /** Failover strategy created by the constructor and passed to the execution failure handler. */
    protected final FailoverStrategy failoverStrategy;
    /** Value of {@code RUNTIME_RESCALING_CHECKPOINT_WAIT_BEHAVIOR} read from the configuration; passed to the checkpoint check before a rescale. */
    private final RuntimeRescaleConfigurationOptions.RuntimeRescaleCheckpointBehavior runtimeRescaleCheckpointBehavior;

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultScheduler$DefaultExecutionSlotAllocationContext.class */
    /**
     * Slot allocation context backed by the enclosing scheduler.
     *
     * <p>Every method forwards to the enclosing {@link DefaultScheduler}: vertex data comes from
     * the execution vertices, group data from the job graph, and location data from the
     * scheduler's location retrievers. The outer instance is always referenced explicitly
     * because this class declares methods with the same names as the scheduler's own.
     */
    private class DefaultExecutionSlotAllocationContext implements ExecutionSlotAllocationContext {
        private DefaultExecutionSlotAllocationContext() {
        }

        /** Returns the resource profile of the execution vertex with the given id. */
        @Override
        public ResourceProfile getResourceProfile(ExecutionVertexID executionVertexID) {
            final ExecutionVertex vertex = DefaultScheduler.this.getExecutionVertex(executionVertexID);
            return vertex.getResourceProfile();
        }

        /** Returns the last allocation recorded on the execution vertex, if there is one. */
        @Override
        public Optional<AllocationID> findPriorAllocationId(ExecutionVertexID executionVertexID) {
            final ExecutionVertex vertex = DefaultScheduler.this.getExecutionVertex(executionVertexID);
            return vertex.findLastAllocation();
        }

        /** Returns the scheduling topology of the enclosing scheduler. */
        @Override
        public SchedulingTopology getSchedulingTopology() {
            // Must stay qualified: an unqualified call would resolve to this very method.
            return DefaultScheduler.this.getSchedulingTopology();
        }

        /** Returns the slot sharing groups declared on the job graph. */
        @Override
        public Set<SlotSharingGroup> getLogicalSlotSharingGroups() {
            final JobGraph graph = DefaultScheduler.this.getJobGraph();
            return graph.getSlotSharingGroups();
        }

        /** Returns the co-location groups declared on the job graph. */
        @Override
        public Set<CoLocationGroup> getCoLocationGroups() {
            final JobGraph graph = DefaultScheduler.this.getJobGraph();
            return graph.getCoLocationGroups();
        }

        /** Forwards to the scheduler's inputs-locations retriever. */
        @Override
        public Collection<ConsumedPartitionGroup> getConsumedPartitionGroups(ExecutionVertexID executionVertexID) {
            final InputsLocationsRetriever retriever = DefaultScheduler.this.inputsLocationsRetriever;
            return retriever.getConsumedPartitionGroups(executionVertexID);
        }

        /** Forwards to the scheduler's inputs-locations retriever. */
        @Override
        public Collection<ExecutionVertexID> getProducersOfConsumedPartitionGroup(ConsumedPartitionGroup consumedPartitionGroup) {
            final InputsLocationsRetriever retriever = DefaultScheduler.this.inputsLocationsRetriever;
            return retriever.getProducersOfConsumedPartitionGroup(consumedPartitionGroup);
        }

        /** Forwards to the scheduler's inputs-locations retriever. */
        @Override
        public Optional<CompletableFuture<TaskManagerLocation>> getTaskManagerLocation(ExecutionVertexID executionVertexID) {
            final InputsLocationsRetriever retriever = DefaultScheduler.this.inputsLocationsRetriever;
            return retriever.getTaskManagerLocation(executionVertexID);
        }

        /** Forwards to the scheduler's state-location retriever. */
        @Override
        public Optional<TaskManagerLocation> getStateLocation(ExecutionVertexID executionVertexID) {
            final StateLocationRetriever retriever = DefaultScheduler.this.stateLocationRetriever;
            return retriever.getStateLocation(executionVertexID);
        }

        /** Returns the key set of the scheduler's reserved-allocation counters (a live view, not a copy). */
        @Override
        public Set<AllocationID> getReservedAllocations() {
            final Map<AllocationID, Long> counters = DefaultScheduler.this.reservedAllocationRefCounters;
            return counters.keySet();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the scheduler and wires up its collaborators.
     *
     * <p>The superclass is initialised first, with a parallelism store that has already been
     * adjusted by {@code changeVertexParallelismStoreByJobResourceRequirements} (which also
     * writes the resulting parallelism into the job graph's vertices). After that, in order:
     * the failover strategy, the execution failure handler, the scheduling strategy, the slot
     * allocator, and the execution deployer are created.
     *
     * <p>Notable decompiler-named parameters, described by how this constructor uses them:
     * {@code consumer} is invoked once with {@code componentMainThreadExecutor};
     * {@code function} and {@code j} are only forwarded to the superclass;
     * {@code factory} creates the failover strategy; {@code factory2} creates the execution
     * deployer; {@code collection} holds the failure enrichers given to the failure handler;
     * {@code time} is passed to the deployer factory and stored as the RPC timeout.
     *
     * <p>NOTE(review): {@code jobInformation} is built from the original
     * {@code vertexParallelismStore} argument, not from the adjusted store passed to the
     * superclass — confirm this is intended.
     *
     * @throws Exception if the superclass constructor or the parallelism-store adjustment fails
     */
    public DefaultScheduler(Logger logger, JobGraph jobGraph, Executor executor, Configuration configuration, Consumer<ComponentMainThreadExecutor> consumer, ScheduledExecutor scheduledExecutor, ClassLoader classLoader, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, Function<JobID, RuntimeRescaleIDCounter> function, JobManagerJobMetricGroup jobManagerJobMetricGroup, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory factory, RestartBackoffTimeStrategy restartBackoffTimeStrategy, ExecutionOperations executionOperations, ExecutionVertexVersioner executionVertexVersioner, ExecutionSlotAllocatorFactory executionSlotAllocatorFactory, long j, ComponentMainThreadExecutor componentMainThreadExecutor, JobStatusListener jobStatusListener, Collection<FailureEnricher> collection, ExecutionGraphFactory executionGraphFactory, ShuffleMaster<?> shuffleMaster, Time time, VertexParallelismStore vertexParallelismStore, ExecutionDeployer.Factory factory2) throws Exception {
        super(logger, jobGraph, executor, configuration, checkpointsCleaner, checkpointRecoveryFactory, function, jobManagerJobMetricGroup, executionVertexVersioner, j, componentMainThreadExecutor, jobStatusListener, executionGraphFactory, changeVertexParallelismStoreByJobResourceRequirements(jobGraph, vertexParallelismStore));
        this.log = logger;
        this.delayExecutor = (ScheduledExecutor) Preconditions.checkNotNull(scheduledExecutor);
        this.userCodeLoader = (ClassLoader) Preconditions.checkNotNull(classLoader);
        this.executionOperations = (ExecutionOperations) Preconditions.checkNotNull(executionOperations);
        this.shuffleMaster = (ShuffleMaster) Preconditions.checkNotNull(shuffleMaster);
        this.reservedAllocationRefCounters = new HashMap();
        this.reservedAllocationByExecutionVertex = new HashMap();
        // The failover strategy is built on the topology created by the superclass constructor.
        this.failoverStrategy = factory.create(getSchedulingTopology(), getResultPartitionAvailabilityChecker());
        logger.info("Using failover strategy {} for {} ({}).", new Object[]{this.failoverStrategy, jobGraph.getName(), jobGraph.getJobID()});
        // Two enricher contexts are created: one for task failures and one for global failures.
        this.executionFailureHandler = new ExecutionFailureHandler(configuration, getSchedulingTopology(), this.failoverStrategy, restartBackoffTimeStrategy, componentMainThreadExecutor, collection, DefaultFailureEnricherContext.forTaskFailure(this.jobInfo, jobManagerJobMetricGroup, executor, classLoader), DefaultFailureEnricherContext.forGlobalFailure(this.jobInfo, jobManagerJobMetricGroup, executor, classLoader), jobManagerJobMetricGroup);
        // The scheduler passes itself to the strategy factory as the SchedulerOperations instance.
        this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology());
        this.executionSlotAllocator = ((ExecutionSlotAllocatorFactory) Preconditions.checkNotNull(executionSlotAllocatorFactory)).createInstance(new DefaultExecutionSlotAllocationContext());
        this.verticesWaitingForRestart = new HashSet();
        // Hand the main-thread executor to the caller-supplied callback before building the deployer.
        consumer.accept(componentMainThreadExecutor);
        this.executionDeployer = factory2.createInstance(logger, this.executionSlotAllocator, executionOperations, executionVertexVersioner, time, this::startReserveAllocation, componentMainThreadExecutor);
        this.rpcTimeout = time;
        this.jobInformation = new JobGraphJobInformation(jobGraph, vertexParallelismStore);
        this.runtimeRescaleCheckpointBehavior = (RuntimeRescaleConfigurationOptions.RuntimeRescaleCheckpointBehavior) configuration.get(RuntimeRescaleConfigurationOptions.RUNTIME_RESCALING_CHECKPOINT_WAIT_BEHAVIOR);
    }

    /**
     * Resolves the parallelism store to use for the job and writes it back into the job graph.
     *
     * <p>If the job graph carries job resource requirements and applying them yields a store,
     * that store is used; otherwise the given store is used unchanged. Every vertex of the job
     * graph then has its parallelism set to the value found in the chosen store.
     *
     * @param jobGraph job graph to read requirements from and to update
     * @param vertexParallelismStore store to fall back to
     * @return the store that was applied to the job graph
     * @throws IOException if the resource requirements cannot be read from the job graph
     */
    private static VertexParallelismStore changeVertexParallelismStoreByJobResourceRequirements(JobGraph jobGraph, VertexParallelismStore vertexParallelismStore) throws IOException {
        final VertexParallelismStore effectiveStore = (VertexParallelismStore) JobResourceRequirements.readFromJobGraph(jobGraph)
                .flatMap(requirements -> DefaultVertexParallelismStore.applyJobResourceRequirements(vertexParallelismStore, requirements, jobGraph))
                .orElse(vertexParallelismStore);

        jobGraph.getVertices().forEach(vertex -> {
            final JobVertexID vertexId = vertex.getID();
            vertex.setParallelism(effectiveStore.getParallelismInfo(vertexId).getParallelism());
        });
        return effectiveStore;
    }

    /**
     * Begins a runtime rescale on the execution graph.
     *
     * <p>A fallback state is created on the execution graph first, and only then is the graph
     * modified with the requested parallelism.
     *
     * @param map requested parallelism per job vertex
     * @throws RuntimeRescaleRuntimeException wrapping any {@link JobException} raised by either step
     */
    @Override // org.apache.flink.runtime.rescale.RuntimeRescaleSchedulerOperations
    public void startRuntimeRescale(Map<JobVertexID, Integer> map) {
        try {
            // Order matters: the fallback state has to exist before the graph is changed.
            getExecutionGraph().createFallbackState();
            getExecutionGraph().modifyByRescale(map);
        } catch (JobException e) {
            throw new RuntimeRescaleRuntimeException("Cannot start runtime rescale event.", e);
        }
    }

    /** Delegates to the execution graph's method of the same name. */
    @Override // org.apache.flink.runtime.rescale.RuntimeRescaleSchedulerOperations
    public void updateSchedulingTopologyByUpscaledVertices() {
        getExecutionGraph().updateSchedulingTopologyByUpscaledVertices();
    }

    /**
     * Asks every current execution of the given job vertices whether it is ready to rescale.
     *
     * <p>Only execution vertices whose job vertex id is contained in {@code set} are queried.
     * The individual answers are combined into one future.
     *
     * @param set ids of the job vertices to check
     * @return a future that yields {@code true} when every queried execution reported ready,
     *     or when no execution was queried at all; {@code false} otherwise
     */
    public CompletableFuture<Boolean> isReadyToRescaling(Set<JobVertexID> set) {
        // Parameterised on purpose: a raw list would erase the element type through combineAll.
        final List<CompletableFuture<Boolean>> readinessFutures = new LinkedList<>();
        for (ExecutionVertex executionVertex : getExecutionGraph().getAllExecutionVertices()) {
            if (set.contains(executionVertex.getID().getJobVertexId())) {
                for (Execution execution : executionVertex.getCurrentExecutions()) {
                    readinessFutures.add(execution.isReadyToRescale());
                }
            }
        }
        // allMatch is true for an empty collection, matching the former reduce(...).orElse(true).
        return FutureUtils.combineAll(readinessFutures)
                .thenApply(results -> results.stream().allMatch(Boolean::booleanValue));
    }

    /**
     * Logs the scheduling strategy's class name and delegates the reschedule to it.
     *
     * @return the regions returned by the scheduling strategy's {@code rescheduleByRescale}
     */
    @Override // org.apache.flink.runtime.rescale.RuntimeRescaleSchedulerOperations
    public List<RescaledPipelinedRegion> rescheduleByRescale() {
        this.log.info("Starting rescheduling by rescale with scheduling strategy [{}]", this.schedulingStrategy.getClass().getName());
        return this.schedulingStrategy.rescheduleByRescale();
    }

    /** Delegates to the execution graph's method of the same name. */
    @Override // org.apache.flink.runtime.rescale.RuntimeRescaleSchedulerOperations
    public void updateSchedulingTopologyByDownscaledVertices() {
        getExecutionGraph().updateSchedulingTopologyByDownscaledVertices();
    }

    /**
     * Completes a runtime rescale.
     *
     * <p>Steps, in order: drop the downscaled execution vertices from the vertex versioner;
     * re-initialise the scheduling strategy if anything was downscaled; finish the rescale on
     * the execution graph; regenerate the JSON plan (falling back to {@code "{}"} on any error);
     * and update the vertex end-of-data listener.
     */
    @Override
    public void finishRuntimeRescale() {
        final List<ExecutionVertex> removedVertices = getExecutionGraph().getDownscaledExecutionVertices();
        for (ExecutionVertex removedVertex : removedVertices) {
            this.executionVertexVersioner.removeExecutionVertex(removedVertex.getID());
        }
        final boolean anyDownscaled = !removedVertices.isEmpty();
        if (anyDownscaled) {
            this.schedulingStrategy.reinitOnDownscale();
        }

        getExecutionGraph().finishRuntimeRescale();

        try {
            getExecutionGraph().setJsonPlan(JsonPlanGenerator.generatePlan(getJobGraph()));
        } catch (Throwable planFailure) {
            // Plan generation is best effort; an empty plan is stored instead.
            this.log.warn("Cannot create JSON plan for job", planFailure);
            getExecutionGraph().setJsonPlan("{}");
        }

        getVertexEndOfDataListener().updateAfterRescale();
    }

    /**
     * Reacts to a failed runtime rescale by cancelling all tasks and restoring the fallback state.
     *
     * <p>All vertices of the scheduling topology are cancelled; once cancellation completes the
     * fallback is performed on the main thread executor.
     *
     * @param th the failure that aborted the rescale
     * @return a future holding the result of {@code fallbackAfterRuntimeRescaleFailure()}
     */
    public CompletableFuture<Boolean> handleRuntimeRescaleFailure(Throwable th) {
        this.log.error("RuntimeRescale attempt failed with exception", th);

        final Set<ExecutionVertexID> allVertexIds = new HashSet<>();
        getSchedulingTopology().getVertices().forEach(vertex -> allVertexIds.add(vertex.getId()));

        this.log.info("{} tasks will be canceled to fallback from a runtime rescale error.", allVertexIds.size());

        final Executor mainThread = getMainThreadExecutor();
        return cancelTasksAsync(allVertexIds)
                .thenApplyAsync(ignored -> fallbackAfterRuntimeRescaleFailure(), mainThread);
    }

    /**
     * Restores scheduler and execution-graph state to what it was before the failed rescale.
     *
     * <p>After the execution graph restored its fallback state, each job vertex gets its
     * parallelism re-applied to the job graph vertex, its coordinators reset, and (when the
     * store is mutable) its parallelism info written into the job information's store. The
     * vertex versioner is then filtered down to the vertices that exist now and the scheduling
     * strategy is told to fall back as well.
     *
     * @return {@code true} if the fallback succeeded; {@code false} if any step threw, in which
     *     case the job has been failed with that throwable
     */
    private boolean fallbackAfterRuntimeRescaleFailure() {
        try {
            getExecutionGraph().restoreFromFallbackState();

            for (ExecutionJobVertex jobVertex : getExecutionGraph().getVerticesTopologically()) {
                jobVertex.getJobVertex().setParallelism(jobVertex.getParallelism());
                jobVertex.resetCoordinatorsAfterRescaleFailure(jobVertex.getParallelism());

                final VertexParallelismStore store = this.jobInformation.getVertexParallelismStore();
                if (store instanceof MutableVertexParallelismStore) {
                    final MutableVertexParallelismStore mutableStore = (MutableVertexParallelismStore) store;
                    mutableStore.setParallelismInfo(jobVertex.getJobVertexId(), jobVertex.getParallelismInfo());
                }
            }

            final Set<ExecutionVertexID> currentVertexIds =
                    IterableUtils.toStream(getExecutionGraph().getAllExecutionVertices())
                            .map(ExecutionVertex::getID)
                            .collect(Collectors.toSet());
            this.executionVertexVersioner.filterByExecutionVertices(currentVertexIds);

            this.schedulingStrategy.fallbackAfterRuntimeRescaleFailure();
            return true;
        } catch (Throwable fallbackFailure) {
            failJob(fallbackFailure, System.currentTimeMillis(), FailureEnricherUtils.EMPTY_FAILURE_LABELS);
            return false;
        }
    }

    /**
     * Validates a runtime rescale request and acquires the runtime rescale action for it.
     *
     * <p>Checks, in order: every requested parallelism is positive; the call runs in the main
     * thread; runtime rescaling is enabled; the execution graph reports at least one vertex to
     * rescale; all affected executions report ready within the RPC timeout; and the runtime
     * rescale action can be acquired.
     *
     * @param map requested parallelism per job vertex
     * @return the vertices that actually need rescaling, as computed by the execution graph;
     *     empty when there is nothing to do (no action is acquired in that case)
     * @throws RuntimeRescaleException if any check fails. The failure reason of the check that
     *     failed is preserved; only unexpected errors while waiting for the readiness result
     *     (timeout, interruption, execution failure) are reported as
     *     {@code RUNTIME_RESCALE_IN_PROGRESS}
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public Map<JobVertexID, Integer> prepareRuntimeRescale(Map<JobVertexID, Integer> map) throws RuntimeRescaleException {
        for (Map.Entry<JobVertexID, Integer> entry : map.entrySet()) {
            if (entry.getValue().intValue() <= 0) {
                throw new RuntimeRescaleException(String.format("Incorrect parallelism (%s) is set for vertex %s", entry.getValue(), entry.getKey()), RuntimeRescaleFailureReason.INCORRECT_PARALLELISM);
            }
        }
        this.log.info("Triggering a runtime rescaling for job {} and new parallelism = {}.", getJobId(), map);
        getMainThreadExecutor().assertRunningInMainThread();
        if (!runtimeRescaleEnabled()) {
            throw new RuntimeRescaleException(String.format("Runtime rescaling is disabled. Please, enable rescaling using '%s'", RuntimeRescaleConfigurationOptions.ENABLE_RUNTIME_RESCALING.key()), RuntimeRescaleFailureReason.RUNTIME_RESCALE_DISABLED);
        }
        Map<JobVertexID, Integer> verticesToRescale = getExecutionGraph().getVerticesToRescale(map);
        if (verticesToRescale.isEmpty()) {
            this.log.info("Rescale event was finished because of nothing to rescale");
            return verticesToRescale;
        }
        try {
            if (!isReadyToRescaling(map.keySet()).get(this.rpcTimeout.getSize(), this.rpcTimeout.getUnit()).booleanValue()) {
                throw new RuntimeRescaleException("Async compaction is not done.", RuntimeRescaleFailureReason.RUNTIME_RESCALE_IN_PROGRESS);
            }
            Optional<RuntimeRescaleException> acquireFailure = acquireRuntimeRescaleAction(map);
            if (!acquireFailure.isPresent()) {
                return verticesToRescale;
            }
            this.log.info("Cannot acquire runtime rescale action for job {} with reason", getJobId(), acquireFailure.get());
            throw acquireFailure.get();
        } catch (RuntimeRescaleException e) {
            // Already carries the precise failure reason; re-wrapping would replace it
            // with RUNTIME_RESCALE_IN_PROGRESS.
            throw e;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            throw new RuntimeRescaleException(RuntimeRescaleFailureReason.RUNTIME_RESCALE_IN_PROGRESS, e);
        } catch (Exception e) {
            throw new RuntimeRescaleException(RuntimeRescaleFailureReason.RUNTIME_RESCALE_IN_PROGRESS, e);
        }
    }

    /**
     * Runs a runtime rescale and reports its outcome through the returned future.
     *
     * <p>The pipeline is: checkpoint check, stop the periodic checkpoint scheduler (if one is
     * configured), trigger the rescale on the coordinator, then handle the result on the main
     * thread executor.
     *
     * <p>On success the runtime rescale action is released and the future completes with the
     * current job graph. On failure the scheduler first falls back via
     * {@code handleRuntimeRescaleFailure}; afterwards the action is released, a global failure
     * is raised if the fallback succeeded, and the future completes exceptionally with the
     * original cause.
     *
     * <p>NOTE(review): if the future returned by {@code handleRuntimeRescaleFailure} itself
     * completes exceptionally, the {@code thenAccept} callback is skipped, so the action is
     * not released and the returned future is never completed — confirm whether that path
     * can occur.
     *
     * @param map requested parallelism per job vertex
     * @return future completed with the job graph after a successful rescale
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public CompletableFuture<JobGraph> runtimeRescale(Map<JobVertexID, Integer> map) {
        RuntimeRescaleCoordinator runtimeRescaleCoordinator = getRuntimeRescaleCoordinator();
        Preconditions.checkNotNull(runtimeRescaleCoordinator, "Runtime rescale coordinator must be not null");
        CompletableFuture<JobGraph> completableFuture = new CompletableFuture<>();
        RuntimeRescaleUtils.checkpointCheck(getCheckpointCoordinator(), this.runtimeRescaleCheckpointBehavior, this.log).thenRun(() -> {
            CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
            if (checkpointCoordinator == null || !checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
                return;
            }
            // Periodic checkpoints are suspended for the duration of the rescale.
            checkpointCoordinator.stopCheckpointScheduler(CheckpointFailureReason.RUNTIME_RESCALE_SUSPEND);
        }).thenCompose(ignored -> {
            return runtimeRescaleCoordinator.triggerRuntimeRescaleAction(this, map, getMainThreadExecutor());
        }).handleAsync((ignored, th) -> {
            // No explicit BiFunction cast here: the lambda's target type is inferred.
            if (th != null) {
                handleRuntimeRescaleFailure(th).thenAccept(fallbackSucceeded -> {
                    releaseRuntimeRescaleAction();
                    if (fallbackSucceeded.booleanValue()) {
                        handleGlobalFailure(th);
                    } else {
                        this.log.error("Job was failed due runtime rescale and couldn't fallback from previous state.");
                    }
                    completableFuture.completeExceptionally(th);
                });
                return null;
            }
            this.log.info("Runtime rescaling process was successfully finished");
            releaseRuntimeRescaleAction();
            completableFuture.complete(getJobGraph());
            return null;
        }, (Executor) getMainThreadExecutor());
        return completableFuture;
    }

    /**
     * Tries to acquire the runtime rescale action for the given rescale request.
     *
     * <p>Acquisition is refused, in this order, when: there is no runtime rescale coordinator;
     * the coordinator reports rescaling as unsupported; any current execution attempt of a
     * vertex to be rescaled is not RUNNING; no current execution attempt of an input vertex is
     * RUNNING; the coordinator cannot enable the action; or the checkpoint coordinator is
     * currently triggering (in which case the just-enabled action is disabled again).
     *
     * @param map requested parallelism per job vertex
     * @return empty if the action was acquired, otherwise the exception describing the refusal
     */
    private Optional<RuntimeRescaleException> acquireRuntimeRescaleAction(Map<JobVertexID, Integer> map) {
        this.log.info("Trying to acquire runtime rescale action for job {}", getJobId());

        final RuntimeRescaleCoordinator coordinator = getRuntimeRescaleCoordinator();
        if (coordinator == null) {
            return Optional.of(new RuntimeRescaleException("Current job doesn't support runtime rescale", RuntimeRescaleFailureReason.RUNTIME_RESCALE_COORDINATOR_IS_NULL));
        }

        final RuntimeRescaleSupporting support = coordinator.supporting();
        if (!support.supported()) {
            return Optional.of(new RuntimeRescaleException("Current job doesn't support runtime rescale", support.reason().toFailureReason()));
        }

        // Every task of a vertex that is about to be rescaled has to be RUNNING.
        boolean scaledTaskNotRunning = false;
        for (ExecutionVertex vertex : getExecutionGraph().getAllExecutionVertices()) {
            if (!map.containsKey(vertex.getJobVertex().getJobVertex().getID())) {
                continue;
            }
            if (vertex.getCurrentExecutionAttempt().getState() != ExecutionState.RUNNING) {
                scaledTaskNotRunning = true;
                break;
            }
        }
        if (scaledTaskNotRunning) {
            return Optional.of(new RuntimeRescaleException("All scaled tasks should be in RUNNING state", RuntimeRescaleFailureReason.NOT_ALL_TASKS_IN_RUNNING_STATE));
        }

        // At least one input-vertex task has to be RUNNING.
        boolean noSourceRunning = true;
        for (ExecutionVertex vertex : getExecutionGraph().getAllExecutionVertices()) {
            if (!vertex.getJobVertex().getJobVertex().isInputVertex()) {
                continue;
            }
            if (vertex.getCurrentExecutionAttempt().getState() == ExecutionState.RUNNING) {
                noSourceRunning = false;
                break;
            }
        }
        if (noSourceRunning) {
            return Optional.of(new RuntimeRescaleException("Any source tasks should be in RUNNING state", RuntimeRescaleFailureReason.NOT_ALL_TASKS_IN_RUNNING_STATE));
        }

        final CheckpointCoordinator checkpoints = getCheckpointCoordinator();
        if (!coordinator.enableAction()) {
            return Optional.of(new RuntimeRescaleException("Cannot start runtime rescale action because another runtime rescale action is active.", RuntimeRescaleFailureReason.RUNTIME_RESCALE_IN_PROGRESS));
        }
        if (checkpoints != null && checkpoints.isTriggering()) {
            // Undo the enableAction() above before refusing.
            coordinator.disableAction();
            return Optional.of(new RuntimeRescaleException("Cannot start runtime rescale action because checkpoint in progress", RuntimeRescaleFailureReason.CHECKPOINT_IN_PROGRESS));
        }

        this.log.info("Runtime rescale action was successfully acquired for job {}", getJobId());
        return Optional.empty();
    }

    /**
     * Releases the runtime rescale action after a rescale attempt has ended.
     *
     * <p>Cleans up the execution graph, refreshes the checkpoint coordinator (if there is one)
     * with the current execution graph, restarts the periodic checkpoint scheduler when
     * periodic checkpointing is configured, and finally disables the action on the coordinator.
     */
    private void releaseRuntimeRescaleAction() {
        getExecutionGraph().cleanupAfterRuntimeRescale();

        final RuntimeRescaleCoordinator coordinator = getRuntimeRescaleCoordinator();
        Preconditions.checkNotNull(coordinator);

        final CheckpointCoordinator checkpoints = getCheckpointCoordinator();
        final boolean hasCheckpointCoordinator = checkpoints != null;
        if (hasCheckpointCoordinator) {
            checkpoints.updateAfterRescale(getExecutionGraph());
            final boolean periodic = checkpoints.isPeriodicCheckpointingConfigured();
            if (periodic) {
                checkpoints.startCheckpointScheduler();
            }
        }

        coordinator.disableAction();
    }

    /**
     * Returns the restart count as reported by the execution failure handler.
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerBase
    protected long getNumberOfRestarts() {
        return this.executionFailureHandler.getNumberOfRestarts();
    }

    /**
     * Calls {@code cancelAllPendingSlotRequestsForVertex} once for every vertex of the
     * scheduling topology.
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerBase
    protected void cancelAllPendingSlotRequestsInternal() {
        getSchedulingTopology().getVertices().forEach(schedulingExecutionVertex -> {
            cancelAllPendingSlotRequestsForVertex(schedulingExecutionVertex.getId());
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts scheduling: logs the strategy's class name, calls {@code transitionToRunning()},
     * and then lets the scheduling strategy start.
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerBase
    public void startSchedulingInternal() {
        this.log.info("Starting scheduling with scheduling strategy [{}]", this.schedulingStrategy.getClass().getName());
        transitionToRunning();
        this.schedulingStrategy.startScheduling();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Handles a finished execution: stops reserving the allocation of its vertex and tells the
     * scheduling strategy that the vertex reached FINISHED.
     *
     * @param execution the execution that finished; must be in state FINISHED
     * @param iOMetrics not used by this implementation
     */
    @Override
    public void onTaskFinished(Execution execution, IOMetrics iOMetrics) {
        final boolean isFinished = execution.getState() == ExecutionState.FINISHED;
        Preconditions.checkState(isFinished);

        final ExecutionVertexID finishedVertexId = execution.getVertex().getID();
        stopReserveAllocation(finishedVertexId);
        this.schedulingStrategy.onExecutionStateChange(finishedVertexId, ExecutionState.FINISHED);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Handles a failed execution.
     *
     * <p>The failure cause is deserialized with the user code class loader, possibly translated
     * by {@code maybeTranslateToClusterDatasetException}, and passed on to
     * {@code handleTaskFailure}.
     *
     * @param execution the failed execution; must be in state FAILED and carry failure info
     */
    @Override
    public void onTaskFailed(Execution execution) {
        Preconditions.checkState(execution.getState() == ExecutionState.FAILED);
        Preconditions.checkState(execution.getFailureInfo().isPresent());

        final Throwable deserializedCause =
                execution.getFailureInfo().get().getException().deserializeError(this.userCodeLoader);
        final ExecutionVertexID failedVertexId = execution.getVertex().getID();
        final Throwable effectiveCause = maybeTranslateToClusterDatasetException(deserializedCause, failedVertexId);

        handleTaskFailure(execution, effectiveCause);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Routes a task failure.
     *
     * <p>While a runtime rescale action is active the failure is reported to the runtime
     * rescale coordinator; otherwise it is recorded and the affected tasks are restarted (or
     * the job is failed).
     *
     * @param execution the failed execution
     * @param th the failure cause, may be {@code null}
     */
    public void handleTaskFailure(Execution execution, @Nullable Throwable th) {
        if (!hasActiveRuntimeRescaleAction()) {
            final FailureHandlingResult handlingResult = recordTaskFailure(execution, th);
            maybeRestartTasks(handlingResult);
            return;
        }
        getRuntimeRescaleCoordinator().failRuntimeRescaleEvent(th);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records a task failure and asks the failure handler how to react.
     *
     * <p>One timestamp is taken up front and used both for the global failure cause and for
     * the failure handling result. Operator coordinators of the failed execution's job vertex
     * are notified before the failure handler is consulted.
     *
     * @param execution the failed execution
     * @param th the failure cause, may be {@code null}
     * @return the failure handler's decision
     */
    public FailureHandlingResult recordTaskFailure(Execution execution, @Nullable Throwable th) {
        final long failureTimestamp = System.currentTimeMillis();

        setGlobalFailureCause(th, failureTimestamp);
        notifyCoordinatorsAboutTaskFailure(execution, th);

        final FailureHandlingResult decision =
                this.executionFailureHandler.getFailureHandlingResult(execution, th, failureTimestamp);
        return decision;
    }

    /**
     * Wraps a {@link PartitionException} into a {@code ClusterDatasetCorruptedException} when
     * the failing partition belongs to a data set listed in the job vertex's
     * {@code getIntermediateDataSetIdsToConsume()}.
     *
     * @param th the failure cause, may be {@code null}
     * @param executionVertexID the vertex whose execution failed
     * @return the wrapping exception, or {@code th} itself when it is not a partition
     *     exception or its data set is not in that list
     */
    private Throwable maybeTranslateToClusterDatasetException(@Nullable Throwable th, ExecutionVertexID executionVertexID) {
        if (!(th instanceof PartitionException)) {
            return th;
        }

        final List<IntermediateDataSetID> consumedDataSets =
                getExecutionJobVertex(executionVertexID.getJobVertexId()).getJobVertex().getIntermediateDataSetIdsToConsume();
        final IntermediateResultPartitionID failedPartition = ((PartitionException) th).getPartitionId().getPartitionId();
        final IntermediateDataSetID failedDataSet = failedPartition.getIntermediateDataSetID();

        if (consumedDataSets.contains(failedDataSet)) {
            return new ClusterDatasetCorruptedException(th, Collections.singletonList(failedDataSet));
        }
        return th;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Tells every operator coordinator of the failed execution's job vertex that the attempt
     * failed.
     *
     * @param execution the failed execution; supplies subtask index and attempt number
     * @param th the failure cause, may be {@code null}
     */
    public void notifyCoordinatorsAboutTaskFailure(Execution execution, @Nullable Throwable th) {
        final ExecutionJobVertex owningJobVertex = execution.getVertex().getJobVertex();
        final int subtaskIndex = execution.getParallelSubtaskIndex();
        final int attempt = execution.getAttemptNumber();

        owningJobVertex.getOperatorCoordinators()
                .forEach(coordinator -> coordinator.executionAttemptFailed(subtaskIndex, attempt, th));
    }

    /**
     * Handles a global failure.
     *
     * <p>The cause is always recorded and logged. While a runtime rescale action is active the
     * failure is reported to the runtime rescale coordinator; otherwise the failure handler
     * decides whether tasks are restarted or the job fails.
     *
     * @param th the global failure cause
     */
    @Override
    public void handleGlobalFailure(Throwable th) {
        final long failureTimestamp = System.currentTimeMillis();
        setGlobalFailureCause(th, failureTimestamp);
        this.log.info("Trying to recover from a global failure.", th);

        if (!hasActiveRuntimeRescaleAction()) {
            final FailureHandlingResult decision =
                    this.executionFailureHandler.getGlobalFailureHandlingResult(th, failureTimestamp);
            maybeRestartTasks(decision);
            return;
        }
        getRuntimeRescaleCoordinator().failRuntimeRescaleEvent(th);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Acts on a failure handling decision: fails the job when a restart is not possible,
     * otherwise restarts the affected tasks after the decided delay.
     *
     * @param failureHandlingResult the decision to act on
     */
    public void maybeRestartTasks(FailureHandlingResult failureHandlingResult) {
        if (!failureHandlingResult.canRestart()) {
            failJob(failureHandlingResult.getError(), failureHandlingResult.getTimestamp(), failureHandlingResult.getFailureLabels());
            return;
        }
        restartTasksWithDelay(failureHandlingResult);
    }

    /**
     * Cancels the vertices named by the failure handling result and schedules their restart.
     *
     * <p>Vertex modifications are recorded first so that the later restart can detect
     * vertices that were modified again in the meantime. The vertices are then marked as
     * waiting for restart, cancelled, and the failure is archived. After the restart delay has
     * elapsed, and once cancellation has completed, the restart runs on the main thread
     * executor.
     *
     * @param failureHandlingResult a decision for which {@code canRestart()} is true
     */
    private void restartTasksWithDelay(FailureHandlingResult failureHandlingResult) {
        final Set<ExecutionVertexID> restartSet = failureHandlingResult.getVerticesToRestart();
        final Set<ExecutionVertexVersion> recordedVersions =
                new HashSet<>(this.executionVertexVersioner.recordVertexModifications(restartSet).values());
        final boolean globalFailure = failureHandlingResult.isGlobalFailure();

        if (!globalFailure) {
            Preconditions.checkArgument(failureHandlingResult.getFailedExecution().isPresent());
            this.log.info("{} tasks will be restarted to recover the failed task {}.", Integer.valueOf(restartSet.size()), failureHandlingResult.getFailedExecution().get().getAttemptId());
        } else {
            this.log.info("{} tasks will be restarted to recover from a global failure.", Integer.valueOf(restartSet.size()));
        }

        addVerticesToRestartPending(restartSet);
        final CompletableFuture<?> cancellation = cancelTasksAsync(restartSet);
        archiveFromFailureHandlingResult(createFailureHandlingResultSnapshot(failureHandlingResult));

        // Runs when the delay expires; the restart itself still waits for the cancellation.
        final Runnable restartAfterCancellation = () -> FutureUtils.assertNoException(
                cancellation.thenRunAsync(() -> restartTasks(recordedVersions, globalFailure), (Executor) getMainThreadExecutor()));
        this.delayExecutor.schedule(restartAfterCancellation, failureHandlingResult.getRestartDelayMS(), TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a snapshot of the given failure handling result.
     *
     * <p>The snapshot factory is given a lookup that maps an execution vertex id to the
     * current executions of that vertex.
     *
     * @param failureHandlingResult the result to snapshot
     * @return the snapshot created by {@code FailureHandlingResultSnapshot.create}
     */
    public FailureHandlingResultSnapshot createFailureHandlingResultSnapshot(FailureHandlingResult failureHandlingResult) {
        return FailureHandlingResultSnapshot.create(
                failureHandlingResult,
                vertexId -> getExecutionVertex(vertexId).getCurrentExecutions());
    }

    /**
     * Registers the given vertices as waiting for restart and requests the execution graph
     * state transition from {@code RUNNING} to {@code RESTARTING}.
     *
     * @param set ids of the vertices that are about to be restarted
     */
    private void addVerticesToRestartPending(Set<ExecutionVertexID> set) {
        verticesWaitingForRestart.addAll(set);
        // Requested on every call, regardless of how many vertices were already pending.
        transitionExecutionGraphState(JobStatus.RUNNING, JobStatus.RESTARTING);
    }

    /**
     * Removes the given vertices from the set waiting for restart and, once that set is
     * empty, requests the execution graph state transition from {@code RESTARTING} back to
     * {@code RUNNING}.
     *
     * @param set ids of the vertices that are no longer pending a restart
     */
    private void removeVerticesFromRestartPending(Set<ExecutionVertexID> set) {
        verticesWaitingForRestart.removeAll(set);
        if (!verticesWaitingForRestart.isEmpty()) {
            // Other restarts are still outstanding; stay in the current state.
            return;
        }
        transitionExecutionGraphState(JobStatus.RESTARTING, JobStatus.RUNNING);
    }

    /**
     * Restarts those vertices whose recorded versions are reported as unmodified by the
     * execution vertex versioner.
     *
     * <p>If no vertex is unmodified, nothing happens. Otherwise the vertices are removed from
     * the restart-pending set and reset for new executions, then state is restored and the
     * scheduling strategy is asked to restart them. Any {@link Throwable} raised by the last
     * two steps is routed to {@link #handleGlobalFailure}.
     *
     * @param set the vertex versions recorded when the restart was scheduled
     * @param z forwarded to {@code restoreState}; callers in this file pass whether the
     *     restart was caused by a global failure
     */
    private void restartTasks(Set<ExecutionVertexVersion> set, boolean z) {
        final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(set);
        if (!verticesToRestart.isEmpty()) {
            removeVerticesFromRestartPending(verticesToRestart);
            resetForNewExecutions(verticesToRestart);
            try {
                restoreState(verticesToRestart, z);
                schedulingStrategy.restartTasks(verticesToRestart);
            } catch (Throwable failure) {
                handleGlobalFailure(failure);
            }
        }
    }

    /**
     * Cancels the pending slot requests of the given vertices and then starts cancelling the
     * vertices themselves.
     *
     * @param set ids of the vertices to cancel
     * @return a future combining the per-vertex cancellation futures
     */
    private CompletableFuture<?> cancelTasksAsync(Set<ExecutionVertexID> set) {
        // Pending slot requests are cancelled before the executions are.
        cancelAllPendingSlotRequestsForVertices(set);

        final List<CompletableFuture<?>> cancellations = set.stream()
                .map(this::cancelExecutionVertex)
                .collect(Collectors.toList());
        return FutureUtils.combineAll(cancellations);
    }

    /**
     * Cancels every current execution of the given vertex.
     *
     * @param executionVertexID id of the vertex whose current executions are cancelled
     * @return a future combining the cancellation futures of those executions
     */
    private CompletableFuture<?> cancelExecutionVertex(ExecutionVertexID executionVertexID) {
        final List<CompletableFuture<?>> cancellations = getExecutionVertex(executionVertexID).getCurrentExecutions().stream()
                .map(this::cancelExecution)
                .collect(Collectors.toList());
        return FutureUtils.combineAll(cancellations);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Cancels a single execution.
     *
     * <p>{@link #notifyCoordinatorOfCancellation} is invoked first, then the cancellation is
     * delegated to the execution operations.
     *
     * @param execution the execution to cancel
     * @return the future returned by the execution operations' {@code cancel}
     */
    public CompletableFuture<?> cancelExecution(Execution execution) {
        notifyCoordinatorOfCancellation(execution);
        final CompletableFuture<?> cancellation = executionOperations.cancel(execution);
        return cancellation;
    }

    /**
     * Cancels the pending slot requests of each given vertex, one vertex at a time.
     *
     * @param set ids of the vertices whose pending slot requests are cancelled
     */
    private void cancelAllPendingSlotRequestsForVertices(Set<ExecutionVertexID> set) {
        for (ExecutionVertexID vertexId : set) {
            cancelAllPendingSlotRequestsForVertex(vertexId);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asks the execution slot allocator to cancel, by attempt id, each current execution of
     * the given vertex.
     *
     * @param executionVertexID id of the vertex whose current executions are looked up
     */
    public void cancelAllPendingSlotRequestsForVertex(ExecutionVertexID executionVertexID) {
        getExecutionVertex(executionVertexID)
                .getCurrentExecutions()
                .forEach(attempt -> executionSlotAllocator.cancel(attempt.getAttemptId()));
    }

    /**
     * Looks up the current execution attempt of a vertex.
     *
     * @param executionVertexID id of the vertex to look up
     * @return the value of {@code getCurrentExecutionAttempt()} on that vertex
     */
    private Execution getCurrentExecutionOfVertex(ExecutionVertexID executionVertexID) {
        final Execution currentAttempt = getExecutionVertex(executionVertexID).getCurrentExecutionAttempt();
        return currentAttempt;
    }

    /**
     * Records a new version for each given vertex and hands the vertices' current execution
     * attempts, together with those versions, to the execution deployer.
     *
     * @param list ids of the vertices to allocate slots for and deploy
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerOperations
    public void allocateSlotsAndDeploy(List<ExecutionVertexID> list) {
        // Versions are recorded before the current attempts are looked up.
        final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersions = executionVertexVersioner.recordVertexModifications(list);
        executionDeployer.allocateSlotsAndDeploy(
                list.stream().map(this::getCurrentExecutionOfVertex).collect(Collectors.toList()),
                requiredVersions);
    }

    /**
     * Records a new version for each given vertex and asks the execution deployer to allocate
     * the vertices' current execution attempts by rescale.
     *
     * @param list ids of the vertices to allocate
     * @return the future of rescaling deployment handles returned by the execution deployer
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerOperations
    public CompletableFuture<List<RescalingExecutionDeploymentHandle>> allocateSlotsByRescaleAndGetRegionRescalingHandles(List<ExecutionVertexID> list) {
        // Versions are recorded before the current attempts are looked up.
        final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersions = executionVertexVersioner.recordVertexModifications(list);
        return executionDeployer.allocateExecutionsByRescaleAndGetRescalingHandles(
                list.stream().map(this::getCurrentExecutionOfVertex).collect(Collectors.toList()),
                requiredVersions);
    }

    /**
     * Reserves an allocation for a vertex.
     *
     * <p>Any reservation the vertex already holds is released first. The vertex is then mapped
     * to the allocation, and the allocation's reference counter is set to 1 if absent or
     * incremented by 1 otherwise.
     *
     * @param executionVertexID id of the vertex that reserves the allocation
     * @param allocationID the allocation being reserved
     */
    private void startReserveAllocation(ExecutionVertexID executionVertexID, AllocationID allocationID) {
        stopReserveAllocation(executionVertexID);
        reservedAllocationByExecutionVertex.put(executionVertexID, allocationID);
        reservedAllocationRefCounters.merge(allocationID, 1L, Long::sum);
    }

    /**
     * Releases the allocation reservation held by a vertex, if it holds one.
     *
     * <p>The allocation's reference counter is decremented; when the counter is not greater
     * than 1 its entry is removed instead.
     *
     * @param executionVertexID id of the vertex whose reservation is released
     */
    private void stopReserveAllocation(ExecutionVertexID executionVertexID) {
        final AllocationID released = reservedAllocationByExecutionVertex.remove(executionVertexID);
        if (released == null) {
            return;
        }
        // Returning null from the remapping function removes the counter entry.
        // NOTE(review): this dereferences the counter without a null check, so it relies on
        // a counter existing for every reserved allocation.
        reservedAllocationRefCounters.compute(
                released,
                (ignored, refCount) -> refCount > 1 ? refCount - 1 : null);
    }

    /**
     * Notifies the coordinators that an execution is being cancelled, unless the execution is
     * already in state {@code FAILED}, {@code CANCELING} or {@code CANCELED}.
     *
     * <p>The notification goes through {@code notifyCoordinatorsAboutTaskFailure} with a
     * {@code null} second argument.
     *
     * @param execution the execution that is about to be cancelled
     */
    private void notifyCoordinatorOfCancellation(Execution execution) {
        final ExecutionState current = execution.getState();
        final boolean failedOrCancelling = current == ExecutionState.FAILED
                || current == ExecutionState.CANCELING
                || current == ExecutionState.CANCELED;
        if (!failedOrCancelling) {
            notifyCoordinatorsAboutTaskFailure(execution, null);
        }
    }

    /**
     * Builds the job's resource requirements from the current job information.
     *
     * <p>For every vertex, {@code setParallelismForJobVertex} is called with the vertex id,
     * the constant {@code 1} and the vertex's parallelism.
     *
     * @return the requirements built from all vertices
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public JobResourceRequirements requestJobResourceRequirements() {
        final JobResourceRequirements.Builder requirements = JobResourceRequirements.newBuilder();
        for (JobInformation.VertexInformation vertex : jobInformation.getVertices()) {
            // NOTE(review): 1 is presumably the lower parallelism bound and the vertex's
            // parallelism the upper bound; confirm against the builder's signature.
            requirements.setParallelismForJobVertex(vertex.getJobVertexID(), 1, vertex.getParallelism());
        }
        return requirements.build();
    }

    /**
     * Applies new resource requirements by delegating to
     * {@link #updateJobResourceRequirementsAsync}.
     *
     * <p>This method does not wait for the returned future and does not rethrow its failure.
     * A failure is logged so that it is not lost silently.
     *
     * @param jobResourceRequirements the requirements to apply
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void updateJobResourceRequirements(JobResourceRequirements jobResourceRequirements) {
        updateJobResourceRequirementsAsync(jobResourceRequirements)
                .whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        log.warn("Failed to update resource requirements with {}", jobResourceRequirements, failure);
                    }
                });
    }

    /**
     * Applies new resource requirements and, if vertex parallelisms change, starts a runtime
     * rescale.
     *
     * <p>The requirements are applied to the current vertex parallelism store. If that yields
     * no store, the method acknowledges immediately. Otherwise the parallelism differences
     * between the current and the new job information are passed through
     * {@code prepareRuntimeRescale}; a non-empty result starts {@code runtimeRescale}, and the
     * new job information replaces the current one only when that rescale completes without a
     * failure. The rescale is not awaited here.
     *
     * @param jobResourceRequirements the requirements to apply
     * @return a completed future holding an acknowledgement, or a future completed
     *     exceptionally with the {@link Exception} thrown while preparing or starting the
     *     rescale
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public CompletableFuture<Acknowledge> updateJobResourceRequirementsAsync(JobResourceRequirements jobResourceRequirements) {
        log.info("Update resource requirements with {}", jobResourceRequirements);
        final Optional<VertexParallelismStore> updatedStore = DefaultVertexParallelismStore.applyJobResourceRequirements(jobInformation.getVertexParallelismStore(), jobResourceRequirements, getJobGraph());
        if (!updatedStore.isPresent()) {
            return CompletableFuture.completedFuture(Acknowledge.get());
        }

        final JobGraphJobInformation updatedJobInformation = new JobGraphJobInformation(getJobGraph(), updatedStore.get());
        try {
            final Map<JobVertexID, Integer> changedParallelisms = prepareRuntimeRescale(parallelismDifference(jobInformation, updatedJobInformation, jobResourceRequirements.getJobVertices()));
            if (!changedParallelisms.isEmpty()) {
                runtimeRescale(changedParallelisms).whenComplete((ignored, failure) -> {
                    // The new job information is adopted only after a successful rescale.
                    if (failure == null) {
                        this.jobInformation = updatedJobInformation;
                    }
                });
            }
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(e);
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Determines which of the given vertices have a different parallelism in the second job
     * information than in the first.
     *
     * @param jobGraphJobInformation the job information providing the current parallelisms
     * @param jobGraphJobInformation2 the job information providing the new parallelisms
     * @param set ids of the vertices to compare
     * @return a map from each vertex whose parallelism differs to its parallelism in
     *     {@code jobGraphJobInformation2}; vertices with equal parallelism are omitted
     */
    private Map<JobVertexID, Integer> parallelismDifference(JobGraphJobInformation jobGraphJobInformation, JobGraphJobInformation jobGraphJobInformation2, Set<JobVertexID> set) {
        final VertexParallelismStore currentStore = jobGraphJobInformation.getVertexParallelismStore();
        final VertexParallelismStore updatedStore = jobGraphJobInformation2.getVertexParallelismStore();
        return set.stream()
                .filter(vertexId -> currentStore.getParallelismInfo(vertexId).getParallelism()
                        != updatedStore.getParallelismInfo(vertexId).getParallelism())
                .collect(Collectors.toMap(
                        vertexId -> vertexId,
                        vertexId -> updatedStore.getParallelismInfo(vertexId).getParallelism()));
    }
}
