package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.RuntimeRescaleConfigurationOptions;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.rescale.ModifiedRescalingExecutionDeploymentHandle;
import org.apache.flink.runtime.rescale.NewAllocatedRescalingExecutionDeploymentHandle;
import org.apache.flink.runtime.rescale.NoModifiedRescalingExecutionDeploymentHandle;
import org.apache.flink.runtime.rescale.RescalingExecutionDeploymentHandle;
import org.apache.flink.runtime.rescale.RuntimeRescaleCoordinator;
import org.apache.flink.runtime.rescale.RuntimeRescaleRuntimeException;
import org.apache.flink.runtime.rescale.RuntimeRescaleSupporting;
import org.apache.flink.runtime.rescale.RuntimeRescaleUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitions;
import org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.shaded.guava31.com.google.common.collect.Streams;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/RuntimeRescaling.class */
public class RuntimeRescaling extends StateWithExecutionGraph {
    public static final String NOTHING_TO_RESCALE = "Nothing to rescale for runtime rescaling.";
    public static final String FAILED_TO_PLAN = "Failed to calculate assignments";
    public static final String CANNOT_RELOCATE = "Cannot runtime rescale when some running execution should be relocated.";
    public static final String CANNOT_RESCALE_IN_RUNTIME = "Cannot runtime rescale some vertices.";
    public static final String NOT_READY_TO_RESCALE_IN_RUNTIME = "The execution graph is not ready to be rescaled in run time.";
    private final Context context;
    private final JobGraphJobInformation jobInformation;
    private final SlotAllocator slotAllocator;
    private final DeclarativeSlotPool declarativeSlotPool;
    private final JobAllocationsInformation prevAllocation;
    private final ArchivedExecutionGraph archivedExecutionGraph;
    private final JobGraph jobGraph;
    private final CompletableFuture<JobGraph> ack;
    private final RuntimeRescaleConfigurationOptions.RuntimeRescaleCheckpointBehavior runtimeRescaleCheckpointBehavior;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/RuntimeRescaling$ActionManager.class */
    public class ActionManager implements Manager {
        private RuntimeRescaleCoordinator coordinator = null;
        private ComponentMainThreadExecutor executor = null;

        private ActionManager() {
        }

        public void init() {
            this.coordinator = (RuntimeRescaleCoordinator) Preconditions.checkNotNull(RuntimeRescaling.this.getRuntimeRescaleCoordinator());
            this.executor = RuntimeRescaling.this.context.mo697getMainThreadExecutor();
            RuntimeRescaleSupporting supporting = this.coordinator.supporting();
            if (!supporting.supported()) {
                throw new RuntimeRescaleRuntimeException("Current job doesn't support runtime rescale " + supporting.reason().toFailureReason().message());
            }
            if (!this.coordinator.enableAction()) {
                throw new RuntimeRescaleRuntimeException("Failed to enable action.");
            }
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.RuntimeRescaling.Manager
        public void finalizeOnSuccess() {
            this.coordinator.disableAction();
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.RuntimeRescaling.Manager
        public void restoreOnFailure() {
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/RuntimeRescaling$CheckpointingManager.class */
    private class CheckpointingManager implements Manager {
        private CheckpointCoordinator coordinator = null;
        private boolean stopped;

        private CheckpointingManager() {
        }

        public void init() {
            this.coordinator = RuntimeRescaling.this.getExecutionGraph().getCheckpointCoordinator();
            if (this.coordinator == null || !this.coordinator.isPeriodicCheckpointingConfigured()) {
                return;
            }
            assertNotTriggering();
            this.coordinator.stopCheckpointScheduler(CheckpointFailureReason.RUNTIME_RESCALE_SUSPEND);
            this.stopped = true;
            assertNotTriggering();
        }

        private void assertNotTriggering() {
            if (this.coordinator.isTriggering()) {
                throw new RuntimeRescaleRuntimeException("Cannot start runtime rescale action because checkpoint in progress");
            }
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.RuntimeRescaling.Manager
        public void finalizeOnSuccess() {
            if (this.coordinator != null) {
                this.coordinator.updateAfterRescale(RuntimeRescaling.this.getExecutionGraph());
                if (this.stopped) {
                    this.coordinator.startCheckpointScheduler();
                }
            }
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.RuntimeRescaling.Manager
        public void restoreOnFailure() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/RuntimeRescaling$Context.class */
    public interface Context extends StateWithExecutionGraph.Context, StateTransitions.ToCancelling, StateTransitions.ToWaitingForResources, StateTransitions.ToRestarting, StateTransitions.ToFailing, StateTransitions.ToExecuting {
        ScheduledFuture<?> runIfState(State state, Runnable runnable, Duration duration);

        FailureResult howToHandleFailure(Throwable th, CompletableFuture<Map<String, String>> completableFuture);
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/RuntimeRescaling$Factory.class */
    static class Factory implements StateFactory<RuntimeRescaling> {
        private final Context context;
        private final Logger log;
        private final ExecutionGraph executionGraph;
        private final JobGraphJobInformation jobInformation;
        private final ExecutionGraphHandler executionGraphHandler;
        private final OperatorCoordinatorHandler operatorCoordinatorHandler;
        private final ClassLoader userCodeClassLoader;
        private final List<ExceptionHistoryEntry> failureCollection;
        private final SlotAllocator slotAllocator;
        private final DeclarativeSlotPool declarativeSlotPool;
        private final JobGraph jobGraph;
        private final CompletableFuture<JobGraph> ack;
        private final RuntimeRescaleConfigurationOptions.RuntimeRescaleCheckpointBehavior runtimeRescaleCheckpointBehavior;

        public Factory(Context context, ExecutionGraph executionGraph, JobGraphJobInformation jobGraphJobInformation, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Logger logger, ClassLoader classLoader, List<ExceptionHistoryEntry> list, SlotAllocator slotAllocator, DeclarativeSlotPool declarativeSlotPool, JobGraph jobGraph, CompletableFuture<JobGraph> completableFuture, RuntimeRescaleConfigurationOptions.RuntimeRescaleCheckpointBehavior runtimeRescaleCheckpointBehavior) {
            this.context = context;
            this.log = logger;
            this.executionGraph = executionGraph;
            this.jobInformation = jobGraphJobInformation;
            this.executionGraphHandler = executionGraphHandler;
            this.operatorCoordinatorHandler = operatorCoordinatorHandler;
            this.userCodeClassLoader = classLoader;
            this.failureCollection = list;
            this.slotAllocator = slotAllocator;
            this.declarativeSlotPool = declarativeSlotPool;
            this.jobGraph = jobGraph;
            this.ack = completableFuture;
            this.runtimeRescaleCheckpointBehavior = runtimeRescaleCheckpointBehavior;
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.StateFactory
        public Class<RuntimeRescaling> getStateClass() {
            return RuntimeRescaling.class;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.flink.runtime.scheduler.adaptive.StateFactory
        public RuntimeRescaling getState() {
            return new RuntimeRescaling(this.context, this.executionGraph, this.jobInformation, this.executionGraphHandler, this.operatorCoordinatorHandler, this.log, this.userCodeClassLoader, this.failureCollection, this.slotAllocator, this.declarativeSlotPool, this.jobGraph, this.ack, this.runtimeRescaleCheckpointBehavior);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/RuntimeRescaling$GraphManager.class */
    private class GraphManager implements Manager {
        private ExecutionGraph executionGraph;
        private JobSchedulingPlan plan;

        private GraphManager() {
        }

        public void init(Map<JobVertexID, Integer> map, JobSchedulingPlan jobSchedulingPlan) {
            this.plan = (JobSchedulingPlan) Preconditions.checkNotNull(jobSchedulingPlan);
            this.executionGraph = RuntimeRescaling.this.getExecutionGraph();
            if (RuntimeRescaling.this.slotAllocator.hasRelocations(jobSchedulingPlan, RuntimeRescaling.this.prevAllocation)) {
                throw new RuntimeRescaleRuntimeException(RuntimeRescaling.CANNOT_RELOCATE);
            }
            if (!map.equals(this.executionGraph.getVerticesToRescale(map))) {
                throw new RuntimeRescaleRuntimeException(RuntimeRescaling.CANNOT_RESCALE_IN_RUNTIME);
            }
            if (!isReadyToRescale(this.executionGraph, map.keySet())) {
                throw new RuntimeRescaleRuntimeException(RuntimeRescaling.NOT_READY_TO_RESCALE_IN_RUNTIME);
            }
        }

        private boolean isReadyToRescale(ExecutionGraph executionGraph, Set<JobVertexID> set) {
            try {
                return executionGraph.isReadyToRescaleInRunTime(set);
            } catch (Exception e) {
                RuntimeRescaling.this.getLogger().warn("Failed to check isReadyToRescaleInRunTime with :" + e);
                return false;
            }
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.RuntimeRescaling.Manager
        public void finalizeOnSuccess() {
            this.executionGraph.updateSchedulingTopologyByDownscaledVertices();
            this.executionGraph.finishRuntimeRescale();
            Iterator it = IterableUtils.toStream(this.executionGraph.getVerticesTopologically()).map((v0) -> {
                return v0.getJobVertex();
            }).iterator();
            this.executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(this.executionGraph.getJobID(), this.executionGraph.getJobName(), JobType.STREAMING, () -> {
                return it;
            }, this.plan.getVertexParallelism()));
            this.executionGraph.cleanupAfterRuntimeRescale();
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.RuntimeRescaling.Manager
        public void restoreOnFailure() {
            if (this.executionGraph != null) {
                this.executionGraph.cleanupAfterRuntimeRescale();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/RuntimeRescaling$Manager.class */
    public interface Manager {
        void finalizeOnSuccess();

        void restoreOnFailure();
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/RuntimeRescaling$SlotsManager.class */
    private class SlotsManager implements Manager {
        private ReservedSlots slots = null;

        private SlotsManager() {
        }

        public void init(JobSchedulingPlan jobSchedulingPlan, List<Execution> list) {
            this.slots = RuntimeRescaling.this.slotAllocator.reserveAdditionalResources(jobSchedulingPlan, list);
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.RuntimeRescaling.Manager
        public void finalizeOnSuccess() {
        }

        @Override // org.apache.flink.runtime.scheduler.adaptive.RuntimeRescaling.Manager
        public void restoreOnFailure() {
            if (this.slots != null) {
                this.slots.releaseAll(null);
            }
        }
    }

    RuntimeRescaling(Context context, ExecutionGraph executionGraph, JobGraphJobInformation jobGraphJobInformation, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Logger logger, ClassLoader classLoader, List<ExceptionHistoryEntry> list, SlotAllocator slotAllocator, DeclarativeSlotPool declarativeSlotPool, JobGraph jobGraph, CompletableFuture<JobGraph> completableFuture, RuntimeRescaleConfigurationOptions.RuntimeRescaleCheckpointBehavior runtimeRescaleCheckpointBehavior) {
        super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger, classLoader, list);
        this.jobInformation = jobGraphJobInformation;
        this.context = context;
        this.slotAllocator = slotAllocator;
        this.declarativeSlotPool = declarativeSlotPool;
        this.prevAllocation = getAllocationInformation();
        this.archivedExecutionGraph = ArchivedExecutionGraph.createFrom(executionGraph, getJobStatus());
        this.jobGraph = jobGraph;
        this.ack = (CompletableFuture) Preconditions.checkNotNull(completableFuture);
        this.runtimeRescaleCheckpointBehavior = runtimeRescaleCheckpointBehavior;
        context.runIfState(this, this::runtimeRescale, Duration.ZERO);
    }

    public void runtimeRescale() {
        ExecutionGraph executionGraph = getExecutionGraph();
        Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignmentForRuntimeRescaling = this.slotAllocator.determineParallelismAndCalculateAssignmentForRuntimeRescaling(this.jobInformation, this.declarativeSlotPool.getAllSlotsInformation(), this.prevAllocation);
        if (determineParallelismAndCalculateAssignmentForRuntimeRescaling.isEmpty()) {
            fallback(FAILED_TO_PLAN);
            return;
        }
        JobSchedulingPlan jobSchedulingPlan = determineParallelismAndCalculateAssignmentForRuntimeRescaling.get();
        Map<JobVertexID, Integer> parallelismUpdates = jobSchedulingPlan.getParallelismUpdates(executionGraph);
        if (parallelismUpdates.isEmpty()) {
            getLogger().info(NOTHING_TO_RESCALE);
            succeed(new ArrayList());
            return;
        }
        GraphManager graphManager = new GraphManager();
        ActionManager actionManager = new ActionManager();
        SlotsManager slotsManager = new SlotsManager();
        CheckpointingManager checkpointingManager = new CheckpointingManager();
        List<Manager> of = List.of(graphManager, slotsManager, actionManager, checkpointingManager);
        try {
            graphManager.init(parallelismUpdates, jobSchedulingPlan);
            actionManager.init();
            logParallelismChanges(executionGraph, parallelismUpdates);
            executionGraph.modifyByRescale(parallelismUpdates);
            executionGraph.updateSchedulingTopologyByUpscaledVertices();
            List<Execution> executionsCreatedByRescale = executionGraph.getExecutionsCreatedByRescale();
            slotsManager.init(jobSchedulingPlan, executionsCreatedByRescale);
            for (Execution execution : executionsCreatedByRescale) {
                ExecutionVertex vertex = execution.getVertex();
                LogicalSlot slotFor = slotsManager.slots.getSlotFor(vertex.getID());
                Preconditions.checkState(execution.registerProducedPartitions(slotFor.getTaskManagerLocation()).isDone(), "Should be done");
                vertex.tryAssignResource(slotFor);
                execution.transitionState(ExecutionState.SCHEDULED);
            }
            List list = (List) Streams.stream(getExecutionGraph().getAllExecutionVertices()).map(this::buildHandle).collect(Collectors.toList());
            CompletableFuture<?> checkpointCheck = RuntimeRescaleUtils.checkpointCheck(executionGraph.getCheckpointCoordinator(), this.runtimeRescaleCheckpointBehavior, getLogger());
            Objects.requireNonNull(checkpointingManager);
            checkpointCheck.thenRun(checkpointingManager::init).thenCompose(r6 -> {
                return actionManager.coordinator.triggerRuntimeRescaleAction(list, actionManager.executor);
            }).handleAsync((BiFunction<? super U, Throwable, ? extends U>) (r62, th) -> {
                if (th == null) {
                    succeed(of);
                    return null;
                }
                fallback(th, of);
                return null;
            }, (Executor) actionManager.executor);
        } catch (Throwable th2) {
            fallback(th2, of);
        }
    }

    private void logParallelismChanges(ExecutionGraph executionGraph, Map<JobVertexID, Integer> map) {
        for (ExecutionJobVertex executionJobVertex : executionGraph.getVerticesTopologically()) {
            JobVertexID jobVertexId = executionJobVertex.getJobVertexId();
            if (map.containsKey(jobVertexId)) {
                getLogger().info("Runtime Rescaling Job Vertex: {} {} -> {}", new Object[]{executionJobVertex.getName(), Integer.valueOf(executionJobVertex.getParallelism()), map.get(jobVertexId)});
            }
        }
    }

    private RescalingExecutionDeploymentHandle buildHandle(ExecutionVertex executionVertex) {
        return executionVertex.isCreatedByRescale() ? new NewAllocatedRescalingExecutionDeploymentHandle(executionVertex) : executionVertex.getJobVertex().isModifiedInputsOrOutputs() ? new ModifiedRescalingExecutionDeploymentHandle(executionVertex) : new NoModifiedRescalingExecutionDeploymentHandle();
    }

    private void fallback(String str) {
        fallback(new RuntimeRescaleRuntimeException(str));
    }

    private void fallback(Throwable th) {
        fallback(th, new ArrayList());
    }

    private void fallback(Throwable th, List<Manager> list) {
        try {
            list.forEach((v0) -> {
                v0.restoreOnFailure();
            });
        } catch (Throwable th2) {
            getLogger().warn("Failed to restoreOnFailure", th2);
        }
        getLogger().info("Failed to runtime rescale. Restarting.", th);
        this.ack.completeExceptionally(th);
        this.context.goToRestarting(getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler(), Duration.ofMillis(0L), getFailures(), this.prevAllocation);
    }

    private void succeed(List<Manager> list) {
        try {
            list.forEach((v0) -> {
                v0.finalizeOnSuccess();
            });
            getLogger().info("Succeed to runtime rescale. Executing.");
            this.ack.complete(this.jobGraph);
            this.context.goToExecuting(getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler(), getFailures());
        } catch (Throwable th) {
            fallback(th, list);
        }
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph, org.apache.flink.runtime.scheduler.adaptive.State
    public ArchivedExecutionGraph getJob() {
        return this.archivedExecutionGraph;
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public void cancel() {
        this.context.goToCanceling(getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler(), getFailures());
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.State
    public JobStatus getJobStatus() {
        return JobStatus.RUNNING;
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph
    void onFailure(Throwable th, CompletableFuture<Map<String, String>> completableFuture) {
        FailureResultUtil.restartOrFail(this.context.howToHandleFailure(th, completableFuture), this.context, this, this.prevAllocation);
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph
    void onGloballyTerminalState(JobStatus jobStatus) {
        this.context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph, org.apache.flink.runtime.scheduler.adaptive.LabeledGlobalFailureHandler
    public /* bridge */ /* synthetic */ void handleGlobalFailure(Throwable th, CompletableFuture completableFuture) {
        super.handleGlobalFailure(th, completableFuture);
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph, org.apache.flink.runtime.scheduler.adaptive.State
    public /* bridge */ /* synthetic */ Logger getLogger() {
        return super.getLogger();
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph, org.apache.flink.runtime.scheduler.adaptive.State
    public /* bridge */ /* synthetic */ void suspend(Throwable th) {
        super.suspend(th);
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph, org.apache.flink.runtime.scheduler.adaptive.State
    public /* bridge */ /* synthetic */ void onLeave(Class cls) {
        super.onLeave(cls);
    }
}
