package org.apache.flink.runtime.rescaling;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rescaling.controller.ScheduleController;
import org.apache.flink.runtime.rescaling.provider.RescaleProvider;
import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rescaling/RedeployableJob.class */
/**
 * Tracks one running {@link JobGraph} and drives its redeployment (rescaling) through a
 * {@link JobTransition}.
 *
 * <p>The instance registers itself with the supplied {@link ScheduleController} on construction and
 * asks it to schedule. When run (directly or via {@link #start()}), it asks the resource manager
 * gateway to redeploy the job; the actual switch to a new job graph happens in
 * {@link #internalRedeployment(int, int, boolean)} / {@link #processRedeployment(JobGraph)}.
 *
 * <p>This class performs no synchronization of its own: {@code runningJob}, {@code jobState},
 * {@code currentTransition} and {@code jobHistory} are plain fields.
 */
public class RedeployableJob implements Runnable {
    private JobGraph runningJob;
    private final ExecutionConfig executionConfig;
    private final DeploymentManager deploymentManager;
    private Thread worker;
    private final ScheduleController scheduleController;
    private final RescaleProvider rescaleProvider;
    private final Semaphore rescalingSemaphore;
    private final ComponentMainThreadExecutor dispatcherExecutor;
    private final int redeployingJobIndex;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    private JobState jobState = JobState.STABLE;
    private final List<JobGraph> jobHistory = new ArrayList<>();
    private JobTransition currentTransition = null;

    /** Lifecycle state of a {@link RedeployableJob}. */
    public enum JobState {
        /** No transition is in progress; this is the initial state. */
        STABLE,
        /** Set for the duration of {@link RedeployableJob#processRedeployment(JobGraph)}. */
        SCALING,
        /** Never assigned inside {@link RedeployableJob} itself. */
        TERMINATED
    }

    /**
     * Returns the execution config extracted from the initial job graph at construction time.
     *
     * @return the execution config of the job
     */
    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    /**
     * Creates the job wrapper, records the initial graph as the first history entry, registers this
     * instance with the schedule controller and triggers scheduling.
     *
     * @param jobGraph the currently running job graph
     * @param deploymentManager manager notified about redeployments; also supplies the dispatcher
     *     and the resource manager gateway retriever
     * @param scheduleController controller this job registers with
     * @param rescaleProvider provider that computes rescaled job graphs
     * @param semaphore rescaling semaphore (stored; not used by the methods of this class)
     * @param componentMainThreadExecutor executor handed to every {@link JobTransition}
     * @param i index passed to the resource manager gateway when requesting a redeployment
     * @throws FlinkException if the execution config cannot be obtained from {@code jobGraph}
     */
    public RedeployableJob(JobGraph jobGraph, DeploymentManager deploymentManager, ScheduleController scheduleController, RescaleProvider rescaleProvider, Semaphore semaphore, ComponentMainThreadExecutor componentMainThreadExecutor, int i) throws FlinkException {
        this.runningJob = jobGraph;
        this.executionConfig = RedeploymentUtils.getJobExecutionConfig(jobGraph);
        this.deploymentManager = deploymentManager;
        this.jobHistory.add(this.runningJob);
        this.scheduleController = scheduleController;
        this.rescaleProvider = rescaleProvider;
        this.rescalingSemaphore = semaphore;
        this.redeployingJobIndex = i;
        this.dispatcherExecutor = componentMainThreadExecutor;
        this.scheduleController.registerScalableJob(this);
        this.scheduleController.schedule();
    }

    /** Runs {@link #run()} on a newly created thread; every call creates a new thread. */
    public void start() {
        this.worker = new Thread(this);
        this.worker.start();
    }

    /** Cancels the schedule controller. A thread started by {@link #start()} is not touched. */
    public void stop() {
        this.scheduleController.cancel();
    }

    /**
     * Asks the rescale provider for a rescaled version of the running job.
     *
     * @param i first limit forwarded to the provider (logged elsewhere as the lower range bound)
     * @param i2 second limit forwarded to the provider (logged elsewhere as the upper range bound)
     * @return the provider's result, or an empty optional if the provider failed with a
     *     {@link FlinkException}
     */
    public Optional<JobGraph> generateJobGraphWithLimitation(int i, int i2) {
        try {
            return this.rescaleProvider.getRescaledJob(this.runningJob, this.jobHistory, i, i2);
        } catch (FlinkException e) {
            // Never use the exception message as the format string; keep the stack trace.
            this.log.info("Unable to generate a rescaled job graph.", e);
            return Optional.empty();
        }
    }

    /**
     * Generates a job graph using {@code -1} for both limits.
     *
     * @return the rescaled job graph, or the currently running graph if none was produced
     */
    public JobGraph generateOptimalJobGraph() {
        return generateJobGraphWithLimitation(-1, -1).orElse(this.runningJob);
    }

    /**
     * Performs the transition from the running job graph to {@code jobGraph}.
     *
     * <p>The state is {@link JobState#SCALING} while the transition runs. Whatever the outcome,
     * including an exception, the job is put back into {@link JobState#STABLE} and the current
     * transition is cleared, so a failed attempt can never block later redeployments. The target
     * graph is removed from the history again unless the running job was actually switched.
     *
     * @param jobGraph the job graph to switch to
     * @return {@code true} if the transition succeeded and all follow-up notifications completed;
     *     {@code false} if the transition was unsuccessful or any step threw. Note that a failure
     *     of a notification after the switch also yields {@code false} although
     *     {@link #getRunningJob()} already returns the new graph.
     */
    public boolean processRedeployment(JobGraph jobGraph) {
        final JobGraph previousJob = this.runningJob;
        final JobTransition transition = new JobTransition(previousJob, jobGraph, this.deploymentManager.dispatcher, this.dispatcherExecutor);
        this.currentTransition = transition;
        this.jobState = JobState.SCALING;
        boolean addedToHistory = false;
        boolean switched = false;
        try {
            this.jobHistory.add(transition.to);
            addedToHistory = true;
            this.deploymentManager.notifyRedeployingStarted(previousJob.getJobID());
            try {
                transition.start();
            } finally {
                // Once "started" was announced, always announce "finished", even on failure.
                this.deploymentManager.notifyRedeployingFinished(previousJob.getJobID());
            }
            if (!transition.isTransitionSuccessful()) {
                return false;
            }
            this.runningJob = transition.to;
            switched = true;
            this.rescaleProvider.notifyParallelismChanged(this.runningJob);
            // Leave the SCALING state before notifying the resource manager, as before.
            this.jobState = JobState.STABLE;
            this.currentTransition = null;
            this.deploymentManager.resourceManagerGatewayRetriever.getFuture().get().notifyResourceUsage();
            return true;
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            this.log.warn("Redeployment of job {} failed.", previousJob.getJobID(), e);
            return false;
        } finally {
            if (!switched && addedToHistory) {
                // Roll back the speculative history entry of a transition that did not happen.
                this.jobHistory.remove(transition.to);
            }
            // Only reset the state that belongs to this transition.
            if (this.currentTransition == transition) {
                this.currentTransition = null;
                this.jobState = JobState.STABLE;
            }
        }
    }

    /**
     * Asks the resource manager gateway to redeploy this job if it is currently
     * {@link JobState#STABLE}, then triggers the schedule controller again.
     *
     * <p>Failures are logged and do not prevent the schedule call.
     */
    public void silentRedeployment() {
        try {
            if (this.jobState == JobState.STABLE) {
                this.deploymentManager.resourceManagerGatewayRetriever.getFuture().get().jobRedeploying(this.redeployingJobIndex);
            }
        } catch (IllegalStateException | InterruptedException | ExecutionException e) {
            restoreInterruptIfNeeded(e);
            this.log.error("Failed to request redeployment of job {}.", this.runningJob.getJobID(), e);
        }
        this.scheduleController.schedule();
    }

    /**
     * Generates a rescaled job graph within the given limits and redeploys the job with it.
     *
     * <p>Nothing happens unless the job is {@link JobState#STABLE}.
     *
     * @param i first limit forwarded to {@link #generateJobGraphWithLimitation(int, int)}
     * @param i2 second limit forwarded to the provider; the redeployment is rejected if the
     *     candidate graph requires more slots than this value
     * @param z if {@code true}, the job is redeployed with the current graph even when the provider
     *     produced no rescaled graph; if {@code false}, such an attempt is skipped
     * @return the result of {@link #processRedeployment(JobGraph)}, or {@code false} if the
     *     redeployment was skipped, rejected or failed
     */
    public boolean internalRedeployment(int i, int i2, boolean z) {
        boolean redeployed = false;
        if (this.jobState != JobState.STABLE) {
            return redeployed;
        }
        try {
            Optional<JobGraph> rescaled = generateJobGraphWithLimitation(i, i2);
            if (!z && !rescaled.isPresent()) {
                this.log.info("Skip silent rescaling as it changes nothing.");
                return false;
            }
            JobGraph candidate = rescaled.orElse(this.runningJob);
            if (i2 < RedeploymentUtils.calculateSlotRequirements(candidate).intValue()) {
                this.log.info("Job cannot be run on the existing resources.");
                return false;
            }
            this.log.info("Rescaled job {} needs {} slots", candidate.getJobID(), RedeploymentUtils.calculateSlotRequirements(candidate));
            redeployed = processRedeployment(candidate);
            this.log.info("Job rescaled: {}", Boolean.valueOf(redeployed));
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            this.log.warn("Rescaling attempt failed unexpectedly.", e);
        }
        return redeployed;
    }

    /**
     * Computes how many slots the job would need if it were rescaled within the given limits.
     *
     * <p>If the provider fails or yields no graph, the currently running graph is used.
     *
     * @param i first limit forwarded to the rescale provider
     * @param i2 second limit forwarded to the rescale provider
     * @return the slot requirement of the rescaled (or, as a fallback, the running) job graph
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public int getSlotsRequiredForLimitation(int i, int i2) {
        JobGraph candidate = this.runningJob;
        try {
            candidate = this.rescaleProvider.getRescaledJob(this.runningJob, this.jobHistory, i, i2).orElse(this.runningJob);
        } catch (FlinkException e) {
            this.log.warn("Exception caught during job scaling: {}", e.getMessage());
        }
        this.log.info("Rescaled job {} in diapason [{}, {}]. Expected slots: {}", candidate.getJobID(), Integer.valueOf(i), Integer.valueOf(i2), RedeploymentUtils.calculateSlotRequirements(candidate));
        return RedeploymentUtils.calculateSlotRequirements(candidate).intValue();
    }

    /** Delegates to {@link #silentRedeployment()}. */
    @Override // java.lang.Runnable
    public void run() {
        silentRedeployment();
    }

    /**
     * @return the job graph currently considered to be running
     */
    public JobGraph getRunningJob() {
        return this.runningJob;
    }

    /**
     * @return the transition in progress, or {@code null} if there is none
     */
    public JobTransition getCurrentTransition() {
        return this.currentTransition;
    }

    /**
     * @return the live, mutable list of job graphs recorded for this job (not a copy)
     */
    public List<JobGraph> getJobHistory() {
        return this.jobHistory;
    }

    /**
     * @return the current lifecycle state
     */
    public JobState getJobState() {
        return this.jobState;
    }

    /** Re-asserts the current thread's interrupt flag if {@code error} is an interruption. */
    private static void restoreInterruptIfNeeded(Throwable error) {
        if (error instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}
