package org.apache.flink.runtime.rescaling;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rescaling.controller.ScheduleController;
import org.apache.flink.runtime.rescaling.provider.RescaleProvider;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rescaling/ScalableJob.class */
/**
 * Drives automatic rescaling of a single running job.
 *
 * <p>A worker thread (see {@link #start()}) repeatedly asks the {@link ScheduleController}
 * whether the job is ready to rescale, obtains a rescaled {@link JobGraph} from the
 * {@link RescaleProvider}, optionally verifies that enough slots are available, and then
 * executes the resulting {@link JobTransition}.
 *
 * <p>NOTE(review): apart from the two {@link AtomicBoolean} flags, the mutable state of this
 * class is not synchronized. Confirm that the getters and {@link #rescale(JobGraph)} are only
 * used from the worker thread, or add synchronization.
 */
public class ScalableJob implements Runnable {
    /** Number of failed rescale attempts after which the worker loop terminates itself. */
    private static final int MAX_FAILED_RESCALES = 3;

    /** Timeout passed to resource manager and dispatcher requests. */
    private static final Time RESOURCE_REQUEST_TIMEOUT = Time.seconds(30L);

    private JobGraph runningJob;
    private final ExecutionConfig executionConfig;
    private final RescaleManager rescaleManager;
    private long latestRescale;
    private long rescalesFailed;
    private Thread worker;
    private final ScheduleController scheduleController;
    private final RescaleProvider rescaleProvider;
    private final Semaphore rescalingSemaphore;
    private final boolean checkFreeResources;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicBoolean forceRescaling = new AtomicBoolean(false);
    private JobState jobState = JobState.STABLE;
    private final List<JobGraph> jobHistory = new ArrayList<>();
    private JobTransition currentTransition = null;

    /** Lifecycle states of a {@link ScalableJob}. */
    public enum JobState {
        /** No transition is pending; a new rescale may be requested. */
        STABLE,
        /** A transition has been created by {@link ScalableJob#rescale(JobGraph)}. */
        SCALING,
        /** The worker gave up after too many failed rescale attempts. */
        TERMINATED
    }

    /**
     * Returns the execution config that was extracted from the job graph at construction time.
     *
     * @return the job's execution config
     */
    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    /**
     * Creates a scalable job for the given, already running job graph.
     *
     * @param jobGraph the currently running job graph; also becomes the first history entry
     * @param rescaleManager provides access to the dispatcher and resource manager gateway
     * @param scheduleController decides when the job is ready to be rescaled
     * @param rescaleProvider computes the rescaled job graph
     * @param checkFreeResources whether slot availability is verified before rescaling
     * @param rescalingSemaphore semaphore guarding the execution of transitions
     * @throws FlinkException if the execution config cannot be obtained from the job graph
     */
    public ScalableJob(JobGraph jobGraph, RescaleManager rescaleManager, ScheduleController scheduleController, RescaleProvider rescaleProvider, boolean checkFreeResources, Semaphore rescalingSemaphore) throws FlinkException {
        this.runningJob = jobGraph;
        this.executionConfig = RescaleUtils.getJobExecutionConfig(jobGraph);
        this.rescaleManager = rescaleManager;
        this.jobHistory.add(this.runningJob);
        this.scheduleController = scheduleController;
        this.rescaleProvider = rescaleProvider;
        this.checkFreeResources = checkFreeResources;
        this.rescalingSemaphore = rescalingSemaphore;
        this.latestRescale = System.currentTimeMillis();
        this.rescalesFailed = 0L;
    }

    /** Starts the worker thread that executes {@link #run()}. */
    public void start() {
        this.worker = new Thread(this);
        this.worker.start();
    }

    /** Asks the worker loop to exit; the loop checks the flag at the start of each iteration. */
    public void stop() {
        this.running.set(false);
    }

    /**
     * Worker loop: while running, plans a rescale when the job is stable and executes any
     * pending transition.
     *
     * <p>A {@link FlinkException} counts as a failed rescale (see {@link #registerError()}),
     * an {@link IllegalStateException} is only logged, and an interrupt ends the loop.
     */
    @Override // java.lang.Runnable
    public void run() {
        this.running.set(true);
        while (this.running.get()) {
            try {
                if (this.currentTransition == null && this.jobState == JobState.STABLE) {
                    planRescale();
                }
                if (this.currentTransition != null) {
                    executeTransition();
                }
            } catch (FlinkException e) {
                this.log.error(e.getMessage(), e);
                registerError();
            } catch (IllegalStateException e) {
                this.log.error(e.getMessage(), e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.log.error("Thread was interrupted, Failed to complete operation");
                // With the interrupt flag restored every further blocking call would fail
                // immediately, so continuing would only spin; leave the loop instead.
                this.running.set(false);
            }
        }
    }

    /**
     * Creates a transition via {@link #rescale(JobGraph)} if a rescale is forced or due, the
     * provider returns a rescaled job, and (when enabled) the resource check passes.
     * When a rescale is skipped, the latest-rescale timestamp is reset to now.
     */
    private void planRescale() throws FlinkException {
        if (!this.forceRescaling.get() && !this.scheduleController.readyToRescale(this)) {
            return;
        }
        this.forceRescaling.set(false);
        Optional<JobGraph> rescaledJob = this.rescaleProvider.getRescaledJob(this.runningJob, this.jobHistory, expectedFreeSlots(this.runningJob));
        if (!rescaledJob.isPresent()) {
            this.log.info("Skip rescale for Job {}", this.runningJob.getJobID());
            setLatestRescale(System.currentTimeMillis());
        } else if (!this.checkFreeResources || resourcesCheck(rescaledJob.get())) {
            rescale(rescaledJob.get());
        } else {
            this.log.info("Not enough resources to rescale Job {}", this.runningJob.getJobID());
            setLatestRescale(System.currentTimeMillis());
        }
    }

    /**
     * Runs the pending transition under the rescaling semaphore and applies its outcome.
     *
     * <p>Scale-downs wait for the semaphore; other transitions are only started if the
     * semaphore is immediately available. If starting the transition throws, the transition
     * stays pending and is attempted again on the next loop iteration.
     */
    private void executeTransition() throws FlinkException, InterruptedException {
        if (this.currentTransition.isScaleDown()) {
            // Acquire first so an interrupted wait does not leave a stale history entry.
            this.rescalingSemaphore.acquire();
            startTransitionAndRelease();
        } else if (this.rescalingSemaphore.tryAcquire()) {
            startTransitionAndRelease();
        }
        this.log.info("Job Transition result: \n{}", this.currentTransition);
        if (this.currentTransition.isTransitionSuccessful()) {
            this.runningJob = this.currentTransition.to;
            this.latestRescale = System.currentTimeMillis();
            this.rescaleProvider.notifyParallelismChanged(this.runningJob);
        } else {
            this.jobHistory.remove(this.currentTransition.to);
        }
        this.jobState = JobState.STABLE;
        this.currentTransition = null;
    }

    /**
     * Records the transition target in the history and starts the transition; the semaphore
     * permit held by the caller is always released, even if starting fails.
     */
    private void startTransitionAndRelease() throws FlinkException {
        try {
            // NOTE(review): if start() throws, this entry is not rolled back and a retry adds
            // it again — confirm whether duplicate history entries matter to the provider.
            this.jobHistory.add(this.currentTransition.to);
            this.currentTransition.start();
        } finally {
            this.rescalingSemaphore.release();
        }
    }

    /**
     * Estimates how many slots would be available to the given job: the cluster's free slots
     * plus the slots the job itself requires, plus the resource manager's maximum extra slots
     * when the detailed resource check is supported, capped by the job's task manager slot
     * limit if one is set.
     *
     * @param jobGraph the job to estimate for
     * @return the estimate, or just the job's own slot requirement if the resource manager
     *     could not be queried
     */
    private int expectedFreeSlots(JobGraph jobGraph) {
        int requiredSlots = RescaleUtils.calculateSlotRequirements(jobGraph).intValue();
        int slotLimit = Integer.MAX_VALUE;
        if (jobGraph.getJobResourceLimitation().isPresent()) {
            slotLimit = jobGraph.getJobResourceLimitation().get().getTmSlotsLimit();
        }
        try {
            boolean detailedCheck = this.rescaleManager.resourceManagerGatewayRetriever.getFuture()
                    .thenApply(gateway -> gateway.isDetailedResourceCheckSupported())
                    .get();
            long availableSlots = (long) requestNumberFreeSlots() + requiredSlots;
            if (detailedCheck) {
                availableSlots += this.rescaleManager.resourceManagerGatewayRetriever.getFuture()
                        .thenApply(gateway -> gateway.getMaximumExtraSlots())
                        .get();
            }
            return (int) Math.min(availableSlots, slotLimit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return requiredSlots;
        } catch (Exception e) {
            // Best effort: fall back to the slots the job already occupies.
            this.log.warn("Could not determine free slots for Job {}", jobGraph.getJobID(), e);
            return requiredSlots;
        }
    }

    /** Requests the resource overview from the resource manager and returns its free slot count. */
    private int requestNumberFreeSlots() throws InterruptedException, ExecutionException {
        ResourceOverview overview = this.rescaleManager.resourceManagerGatewayRetriever.getFuture()
                .thenCompose(gateway -> gateway.requestResourceOverview(RESOURCE_REQUEST_TIMEOUT))
                .get();
        return overview.getNumberFreeSlots();
    }

    /**
     * Counts a failed rescale and terminates this scalable job once
     * {@link #MAX_FAILED_RESCALES} failures have been reached.
     */
    private void registerError() {
        this.rescalesFailed++;
        if (this.rescalesFailed >= MAX_FAILED_RESCALES) {
            stop();
            this.jobState = JobState.TERMINATED;
            this.log.info("ScalableJob was terminated because of too many errors.");
        }
    }

    /**
     * Checks whether enough slots are available to run the given rescaled job.
     *
     * <p>A job that needs no more slots than the running one always passes. Otherwise the
     * resource manager is asked about the additional slots if it supports the detailed check;
     * if not, the requirement is compared against the free slots plus the slots of the running
     * job, and the check fails while any job is in a pending state.
     *
     * @param jobGraph the rescaled job graph
     * @return {@code true} if the rescale may proceed; {@code false} if resources are
     *     insufficient or could not be determined
     */
    private boolean resourcesCheck(JobGraph jobGraph) {
        int currentSlots = RescaleUtils.calculateSlotRequirements(this.runningJob).intValue();
        int requiredSlots = RescaleUtils.calculateSlotRequirements(jobGraph).intValue();
        int additionalSlots = requiredSlots - currentSlots;
        if (additionalSlots <= 0) {
            return true;
        }
        try {
            boolean detailedCheck = this.rescaleManager.resourceManagerGatewayRetriever.getFuture()
                    .thenApply(gateway -> gateway.isDetailedResourceCheckSupported())
                    .get();
            this.log.info("Detailed resource check is {}", detailedCheck ? "enabled" : "disabled");
            if (detailedCheck) {
                return this.rescaleManager.resourceManagerGatewayRetriever.getFuture()
                        .thenApply(gateway -> gateway.checkResourceAvailabilityForSlots(additionalSlots))
                        .get();
            }
            // The running job's slots are counted as free because it stops during the rescale.
            int freeSlotsAfterStop = requestNumberFreeSlots() + currentSlots;
            boolean pendingJobs = hasPendingJobs();
            this.log.info("scaled job slot requirements: {}", requiredSlots);
            this.log.info("free slots after job stops: {}", freeSlotsAfterStop);
            return requiredSlots <= freeSlotsAfterStop && !pendingJobs;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            this.log.warn("Resource check failed for Job {}", this.runningJob.getJobID(), e);
            return false;
        }
    }

    /**
     * Returns whether any job known to the dispatcher is CREATED, RESTARTING or SUSPENDED.
     * A job whose status cannot be retrieved is conservatively treated as pending.
     */
    private boolean hasPendingJobs() throws InterruptedException, ExecutionException {
        return this.rescaleManager.dispatcher.listJobs(RESOURCE_REQUEST_TIMEOUT).thenApply(jobIds -> {
            try {
                for (JobID jobId : jobIds) {
                    JobStatus status = this.rescaleManager.dispatcher.requestJobStatus(jobId, RESOURCE_REQUEST_TIMEOUT).get();
                    if (status == JobStatus.CREATED || status == JobStatus.RESTARTING || status == JobStatus.SUSPENDED) {
                        return true;
                    }
                }
                return false;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return true;
            } catch (ExecutionException e) {
                return true;
            }
        }).get();
    }

    /** Requests that the next loop iteration evaluates a rescale regardless of the schedule. */
    public void forceRescale() {
        this.forceRescaling.set(true);
    }

    /**
     * Registers a transition from the running job to the given job graph; the worker loop
     * executes it.
     *
     * @param jobGraph the target job graph
     * @throws IllegalStateException if this job is not {@link JobState#STABLE}
     * @throws FlinkException declared for callers; may be raised while creating the transition
     */
    public void rescale(JobGraph jobGraph) throws FlinkException {
        if (this.jobState != JobState.STABLE) {
            throw new IllegalStateException("Rescaling job " + this.runningJob.getJobID() + " is not possible. ScalableJob is not STABLE, but " + this.jobState);
        }
        this.currentTransition = new JobTransition(this.runningJob, jobGraph, this.rescaleManager.dispatcher);
        this.jobState = JobState.SCALING;
    }

    /**
     * Returns the timestamp of the latest rescale or skipped rescale attempt.
     *
     * @return milliseconds as returned by {@link System#currentTimeMillis()}
     */
    public long getLatestRescale() {
        return this.latestRescale;
    }

    /**
     * Overrides the latest-rescale timestamp.
     *
     * @param timestampMillis the new timestamp in milliseconds
     */
    public void setLatestRescale(long timestampMillis) {
        this.latestRescale = timestampMillis;
    }

    /**
     * Returns the job graph that is currently considered running.
     *
     * @return the running job graph
     */
    public JobGraph getRunningJob() {
        return this.runningJob;
    }

    /**
     * Returns the pending transition.
     *
     * @return the current transition, or {@code null} if none is pending
     */
    public JobTransition getCurrentTransition() {
        return this.currentTransition;
    }

    /**
     * Returns the history of job graphs, starting with the initial one.
     *
     * @return the live, mutable history list (not a copy)
     */
    public List<JobGraph> getJobHistory() {
        return this.jobHistory;
    }

    /**
     * Returns the current lifecycle state.
     *
     * @return the job state
     */
    public JobState getJobState() {
        return this.jobState;
    }
}
