package org.apache.flink.runtime.rescale;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.RuntimeRescaleCoordinatorConfiguration;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rescale.plan.RuntimeRescalePlanCalculator;
import org.apache.flink.runtime.state.rescale.RuntimeRescaleStorage;
import org.apache.flink.runtime.state.rescale.RuntimeRescaleStorageCoordinatorView;
import org.apache.flink.runtime.state.rescale.RuntimeRescaleStorageLocation;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rescale/RuntimeRescaleCoordinator.class */
public class RuntimeRescaleCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(RuntimeRescaleCoordinator.class);
    private final JobID jobID;
    private final boolean runtimeRescalingEnabled;
    private final Executor executor;
    private final RuntimeRescalePlanCalculator runtimeRescalePlanCalculator;
    private final RuntimeRescaleStorageCoordinatorView runtimeRescaleStorageView;
    private final RuntimeRescaleIDCounter counter;
    private final RuntimeRescaleTasksHandler runtimeRescaleTasksHandler;
    private final RuntimeRescaleSupporting supporting;
    private final Object enableActionLock = new Object();
    private volatile boolean isActiveAction = false;
    private CompletableFuture<Void> runtimeRescaleActionCompleteFuture = new CompletableFuture<>();
    private boolean baseLocationsForRuntimeRescaleInitialized = false;
    private final ScheduledExecutorService timeoutControlExecutorService;
    private final long timeoutMs;

    public RuntimeRescaleCoordinator(JobID jobID, RuntimeRescaleCoordinatorConfiguration runtimeRescaleCoordinatorConfiguration, RuntimeRescaleStorage runtimeRescaleStorage, Executor executor, RuntimeRescalePlanCalculator runtimeRescalePlanCalculator, RuntimeRescaleIDCounter runtimeRescaleIDCounter, RuntimeRescaleSupporting runtimeRescaleSupporting) {
        this.jobID = (JobID) Preconditions.checkNotNull(jobID);
        this.runtimeRescalingEnabled = runtimeRescaleCoordinatorConfiguration.isRuntimeRescalingEnabled();
        this.timeoutMs = runtimeRescaleCoordinatorConfiguration.getRuntimeRescalingTimeout().toMillis();
        this.executor = (Executor) Preconditions.checkNotNull(executor);
        this.runtimeRescalePlanCalculator = (RuntimeRescalePlanCalculator) Preconditions.checkNotNull(runtimeRescalePlanCalculator);
        try {
            this.runtimeRescaleStorageView = ((RuntimeRescaleStorage) Preconditions.checkNotNull(runtimeRescaleStorage)).createRuntimeRescaleStorage(jobID);
            this.counter = (RuntimeRescaleIDCounter) Preconditions.checkNotNull(runtimeRescaleIDCounter);
            try {
                runtimeRescaleIDCounter.start();
                this.runtimeRescaleTasksHandler = new RuntimeRescaleTasksHandler(this);
                this.supporting = (RuntimeRescaleSupporting) Preconditions.checkNotNull(runtimeRescaleSupporting);
                this.timeoutControlExecutorService = Executors.newScheduledThreadPool(1);
            } catch (Throwable th) {
                throw new RuntimeException("Failed to start runtime rescale ID counter: " + th.getMessage(), th);
            }
        } catch (IOException e) {
            throw new FlinkRuntimeException("Failed to create runtime rescale storage at runtime rescale coordinator side.", e);
        }
    }

    public CompletableFuture<Void> triggerRuntimeRescaleAction(RuntimeRescaleSchedulerOperations runtimeRescaleSchedulerOperations, Map<JobVertexID, Integer> map, ComponentMainThreadExecutor componentMainThreadExecutor) {
        Preconditions.checkState(hasActiveAction());
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
            runtimeRescaleSchedulerOperations.startRuntimeRescale(map);
        }, componentMainThreadExecutor);
        Objects.requireNonNull(runtimeRescaleSchedulerOperations);
        CompletableFuture thenCompose = runAsync.thenRun(runtimeRescaleSchedulerOperations::updateSchedulingTopologyByUpscaledVertices).thenApply(r3 -> {
            return runtimeRescaleSchedulerOperations.rescheduleByRescale();
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) this::triggerRuntimeRescaleTasksHandler);
        Objects.requireNonNull(runtimeRescaleSchedulerOperations);
        CompletableFuture<Void> thenRun = thenCompose.thenRun(runtimeRescaleSchedulerOperations::updateSchedulingTopologyByDownscaledVertices);
        Objects.requireNonNull(runtimeRescaleSchedulerOperations);
        CompletableFuture<Void> thenRun2 = thenRun.thenRun(runtimeRescaleSchedulerOperations::finishRuntimeRescale);
        scheduleTimeoutControlFuture(thenRun2);
        thenRun2.whenComplete((r4, th) -> {
            if (th != null) {
                this.runtimeRescaleActionCompleteFuture.completeExceptionally(th);
            } else {
                this.runtimeRescaleActionCompleteFuture.complete(null);
            }
        });
        return this.runtimeRescaleActionCompleteFuture;
    }

    public CompletableFuture<Void> triggerRuntimeRescaleAction(List<RescalingExecutionDeploymentHandle> list, ComponentMainThreadExecutor componentMainThreadExecutor) {
        componentMainThreadExecutor.assertRunningInMainThread();
        Preconditions.checkState(hasActiveAction());
        CompletableFuture<Acknowledge> triggerRuntimeRescaleTasksHandler = triggerRuntimeRescaleTasksHandler(Collections.singletonList(new RescaledPipelinedRegion(CompletableFuture.completedFuture(list))));
        scheduleTimeoutControlFuture(triggerRuntimeRescaleTasksHandler);
        triggerRuntimeRescaleTasksHandler.whenComplete((acknowledge, th) -> {
            if (th != null) {
                this.runtimeRescaleActionCompleteFuture.completeExceptionally(th);
            } else {
                this.runtimeRescaleActionCompleteFuture.complete(null);
            }
        });
        return this.runtimeRescaleActionCompleteFuture;
    }

    private void scheduleTimeoutControlFuture(CompletableFuture<?> completableFuture) {
        this.timeoutControlExecutorService.schedule(() -> {
            return Boolean.valueOf(completableFuture.completeExceptionally(new TimeoutException(String.format("Timeout (%s ms) occurred during Runtime Rescaling", Long.valueOf(this.timeoutMs)))));
        }, this.timeoutMs, TimeUnit.MILLISECONDS);
    }

    public void acknowledgeRuntimeRescale(JobID jobID, ExecutionAttemptID executionAttemptID, long j) throws RuntimeRescaleException {
        if (!this.jobID.equals(jobID)) {
            failRuntimeRescaleEvent(new IllegalArgumentException("Received runtime rescale acknowledge message for job " + jobID + " and attempt " + executionAttemptID + " while this coordinator handles job " + this.jobID));
            return;
        }
        if (j != getRuntimeRescaleActionId()) {
            failRuntimeRescaleEvent(new IllegalArgumentException("Received runtime rescale acknowledge message for runtime rescale id " + j + " and attempt " + this + " while this coordinator handles runtime rescale id " + executionAttemptID));
            return;
        }
        LOG.info("Runtime rescale event with id {} for job {} was acknowledged by execution {}.", new Object[]{Long.valueOf(j), jobID, executionAttemptID});
        if (this.runtimeRescaleTasksHandler.acknowledgeTask(executionAttemptID)) {
            this.runtimeRescaleTasksHandler.finishAcknowledging();
        }
    }

    public void declineRuntimeRescale(JobID jobID, ExecutionAttemptID executionAttemptID, long j, Throwable th) throws RuntimeRescaleException {
        if (!this.jobID.equals(jobID)) {
            failRuntimeRescaleEvent(new IllegalArgumentException("Received DeclineRuntimeRescaleMessage message for job " + jobID + " and attempt " + executionAttemptID + " while this coordinator handles job " + this.jobID, th));
        } else if (j != getRuntimeRescaleActionId()) {
            failRuntimeRescaleEvent(new IllegalArgumentException("Received DeclineRuntimeRescaleMessage message for runtime rescale id " + j + " and attempt " + this + " while this coordinator handles runtime rescale id " + executionAttemptID, th));
        } else {
            LOG.info("Runtime rescale event with id {} for job {} was declined by execution {} with next error.", new Object[]{Long.valueOf(j), jobID, executionAttemptID, th});
            failRuntimeRescaleEvent(th);
        }
    }

    public void failRuntimeRescaleEvent(Throwable th) {
        this.runtimeRescaleTasksHandler.failAcknowledging(th);
        this.runtimeRescaleActionCompleteFuture.completeExceptionally(th);
    }

    public long getRuntimeRescaleActionId() {
        return this.counter.get();
    }

    public boolean isRuntimeRescalingEnabled() {
        return this.runtimeRescalingEnabled;
    }

    public RuntimeRescaleSupporting supporting() {
        return this.supporting;
    }

    public boolean hasActiveAction() {
        return this.isActiveAction;
    }

    public boolean enableAction() {
        if (hasActiveAction()) {
            return false;
        }
        synchronized (this.enableActionLock) {
            if (hasActiveAction()) {
                return false;
            }
            this.runtimeRescaleTasksHandler.reset();
            this.runtimeRescaleActionCompleteFuture = new CompletableFuture<>();
            setNewRuntimeRescaleId();
            this.isActiveAction = true;
            return true;
        }
    }

    public void disableAction() {
        this.isActiveAction = false;
    }

    private CompletableFuture<Acknowledge> triggerRuntimeRescaleTasksHandler(List<RescaledPipelinedRegion> list) {
        long currentTimeMillis = System.currentTimeMillis();
        return this.runtimeRescalePlanCalculator.calculateRuntimeRescalePlan().thenCompose(runtimeRescalePlan -> {
            try {
                this.runtimeRescaleTasksHandler.initTasksHandler(runtimeRescalePlan);
                long runtimeRescaleActionId = getRuntimeRescaleActionId();
                ArrayList arrayList = new ArrayList(list.size());
                list.forEach(rescaledPipelinedRegion -> {
                    arrayList.add(rescaledPipelinedRegion.deployOrRedeploy(runtimeRescaleActionId, currentTimeMillis));
                });
                return FutureUtils.completeAll(arrayList).thenComposeAsync(r12 -> {
                    ArrayList arrayList2 = new ArrayList();
                    Iterator<Execution> it = runtimeRescalePlan.getTasksToTrigger().iterator();
                    while (it.hasNext()) {
                        arrayList2.add(it.next().triggerRuntimeRescale(runtimeRescaleActionId, currentTimeMillis));
                    }
                    return FutureUtils.completeAll(arrayList2);
                }, this.executor).thenCompose(r3 -> {
                    return this.runtimeRescaleTasksHandler.getAllTasksAcknowledgedFuture();
                }).whenComplete((acknowledge, th) -> {
                    this.runtimeRescaleTasksHandler.cleanupTasksHandler();
                });
            } catch (Throwable th2) {
                throw new CompletionException(th2);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RuntimeRescaleStorageLocation initializeRuntimeRescaleLocation() throws IOException {
        long runtimeRescaleActionId = getRuntimeRescaleActionId();
        boolean z = !this.baseLocationsForRuntimeRescaleInitialized;
        this.baseLocationsForRuntimeRescaleInitialized = true;
        if (z) {
            this.runtimeRescaleStorageView.initializeBaseLocationsForRuntimeRescaling();
        }
        return this.runtimeRescaleStorageView.initializeLocationForRuntimeRescaleEvent(runtimeRescaleActionId);
    }

    private void setNewRuntimeRescaleId() {
        try {
            this.counter.getAndIncrement();
        } catch (Exception e) {
            throw new RuntimeException("Error with runtime rescale id counter.", e);
        }
    }
}
