package org.apache.flink.runtime.dispatcher.runner;

import java.util.Arrays;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.class */
/**
 * {@link DispatcherRunner} that registers itself as a {@link LeaderContender} and runs one
 * {@link DispatcherLeaderProcess} per granted leadership session.
 *
 * <p>When leadership is granted, the currently tracked process is closed and a new one is created
 * through the {@link DispatcherLeaderProcessFactory}; the new process is only started after the
 * termination futures of all previously tracked processes have completed. When leadership is
 * revoked, or the runner is closed, the current process is closed.
 *
 * <p>The mutable fields {@code running}, {@code dispatcherLeaderProcess} and
 * {@code previousDispatcherLeaderProcessTerminationFuture} are only modified while holding
 * {@code lock}.
 */
public final class DefaultDispatcherRunner implements DispatcherRunner, LeaderContender {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultDispatcherRunner.class);

    private final LeaderElection leaderElection;
    private final FatalErrorHandler fatalErrorHandler;
    private final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory;

    /** Monitor guarding the mutable state of this runner. */
    private final Object lock = new Object();

    /** Receives the outcome of the dispatcher leader process termination chain on close. */
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    /** Completed with the shut-down result of the current leader process while running. */
    private final CompletableFuture<ApplicationStatus> shutDownFuture = new CompletableFuture<>();

    /** Set to {@code false} by the first call to {@link #closeAsync()}. */
    private boolean running = true;

    /** The currently tracked leader process; a stopped placeholder until leadership is granted. */
    private DispatcherLeaderProcess dispatcherLeaderProcess = StoppedDispatcherLeaderProcess.INSTANCE;

    /** Accumulated termination of every leader process that has been asked to close so far. */
    private CompletableFuture<Void> previousDispatcherLeaderProcessTerminationFuture = CompletableFuture.completedFuture(null);

    private DefaultDispatcherRunner(LeaderElection leaderElection, FatalErrorHandler fatalErrorHandler, DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) {
        this.leaderElection = leaderElection;
        this.fatalErrorHandler = fatalErrorHandler;
        this.dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactory;
    }

    /**
     * Registers this runner as contender with the leader election.
     *
     * @throws Exception if starting the leader election fails
     */
    void start() throws Exception {
        this.leaderElection.startLeaderElection(this);
    }

    /**
     * Returns the future that is completed with the result of the shut-down future of a leader
     * process, provided the runner is still running and that process is still the current one at
     * the time its future completes.
     */
    @Override // org.apache.flink.runtime.dispatcher.runner.DispatcherRunner
    public CompletableFuture<ApplicationStatus> getShutDownFuture() {
        return this.shutDownFuture;
    }

    /**
     * Stops the runner: closes the leader election, closes the current leader process and forwards
     * the accumulated process termination to the termination future.
     *
     * <p>Only the first call performs the shutdown work; later calls return the termination future
     * directly.
     *
     * @return on the first call, the combination (via {@code FutureUtils.completeAll}) of the
     *     termination future and the leader election close result; on later calls, the
     *     termination future only
     */
    public CompletableFuture<Void> closeAsync() {
        synchronized (this.lock) {
            if (!this.running) {
                return this.terminationFuture;
            }
            this.running = false;

            // NOTE(review): leaderElection.close() is invoked while holding the lock that
            // grantLeadership/revokeLeadership also acquire. Confirm that the leader election
            // implementation cannot block here waiting on a thread that is calling back into
            // this contender.
            final CompletableFuture<Void> leaderElectionCloseFuture = stopLeaderElectionService();
            stopDispatcherLeaderProcess();
            FutureUtils.forward(this.previousDispatcherLeaderProcessTerminationFuture, this.terminationFuture);
            return FutureUtils.completeAll(Arrays.asList(this.terminationFuture, leaderElectionCloseFuture));
        }
    }

    /**
     * Leadership callback: replaces the current leader process with a new one for the given
     * session. Ignored if the runner has already been closed.
     *
     * @param uuid leader session id of the granted leadership
     */
    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void grantLeadership(UUID uuid) {
        runActionIfRunning(() -> {
            LOG.info(
                    "{} was granted leadership with leader id {}. Creating new {}.",
                    getClass().getSimpleName(),
                    uuid,
                    DispatcherLeaderProcess.class.getSimpleName());
            startNewDispatcherLeaderProcess(uuid);
        });
    }

    /**
     * Closes the current leader process, installs a freshly created one and schedules its start
     * for after all previously tracked processes have terminated.
     */
    private void startNewDispatcherLeaderProcess(UUID uuid) {
        stopDispatcherLeaderProcess();

        final DispatcherLeaderProcess newProcess = createNewDispatcherLeaderProcess(uuid);
        this.dispatcherLeaderProcess = newProcess;

        // The termination future was just extended by stopDispatcherLeaderProcess(), so the new
        // process does not start before the old one has finished closing.
        FutureUtils.assertNoException(
                this.previousDispatcherLeaderProcessTerminationFuture.thenRun(newProcess::start));
    }

    /**
     * Asks the current leader process to close and folds its close future into the accumulated
     * termination future.
     */
    private void stopDispatcherLeaderProcess() {
        this.previousDispatcherLeaderProcessTerminationFuture =
                FutureUtils.completeAll(
                        Arrays.asList(
                                this.previousDispatcherLeaderProcessTerminationFuture,
                                this.dispatcherLeaderProcess.closeAsync()));
    }

    /**
     * Creates a leader process for the given session and wires its shut-down and leader-address
     * futures to this runner.
     *
     * @param uuid leader session id the process is created for
     * @return the newly created (not yet started) leader process
     */
    private DispatcherLeaderProcess createNewDispatcherLeaderProcess(UUID uuid) {
        // Log the actual session id, matching what grantLeadership/revokeLeadership report.
        LOG.debug("Create new {} with leader session id {}.", DispatcherLeaderProcess.class.getSimpleName(), uuid);

        final DispatcherLeaderProcess newProcess = this.dispatcherLeaderProcessFactory.create(uuid);
        forwardShutDownFuture(newProcess);
        forwardConfirmLeaderSessionFuture(uuid, newProcess);
        return newProcess;
    }

    /**
     * Propagates the outcome of the given process' shut-down future to this runner's shut-down
     * future, but only if the runner is still running and the process is still the current one.
     */
    private void forwardShutDownFuture(DispatcherLeaderProcess dispatcherLeaderProcess) {
        dispatcherLeaderProcess.getShutDownFuture().whenComplete((applicationStatus, th) -> {
            synchronized (this.lock) {
                // Results of stale processes, or results arriving after close, are dropped.
                if (this.running && this.dispatcherLeaderProcess == dispatcherLeaderProcess) {
                    if (th != null) {
                        this.shutDownFuture.completeExceptionally(th);
                    } else {
                        this.shutDownFuture.complete(applicationStatus);
                    }
                }
            }
        });
    }

    /**
     * Confirms leadership for the given session with the leader election once the process has
     * published its leader address.
     */
    private void forwardConfirmLeaderSessionFuture(UUID uuid, DispatcherLeaderProcess dispatcherLeaderProcess) {
        FutureUtils.assertNoException(
                dispatcherLeaderProcess
                        .getLeaderAddressFuture()
                        .thenCompose(leaderAddress -> this.leaderElection.confirmLeadershipAsync(uuid, leaderAddress)));
    }

    /**
     * Leadership callback: closes the current leader process. Ignored if the runner has already
     * been closed.
     */
    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void revokeLeadership() {
        runActionIfRunning(() -> {
            LOG.info(
                    "{} was revoked the leadership with leader id {}. Stopping the {}.",
                    getClass().getSimpleName(),
                    this.dispatcherLeaderProcess.getLeaderSessionId(),
                    DispatcherLeaderProcess.class.getSimpleName());
            stopDispatcherLeaderProcess();
        });
    }

    /**
     * Closes the leader election and reports the outcome as a future instead of throwing.
     *
     * @return a completed future on success, or an exceptionally completed future carrying the
     *     exception thrown by {@code close()}
     */
    private CompletableFuture<Void> stopLeaderElectionService() {
        try {
            this.leaderElection.close();
            return FutureUtils.completedVoidFuture();
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    /**
     * Runs the given action under the lock if the runner has not been closed; otherwise only logs
     * that the action was ignored.
     */
    private void runActionIfRunning(Runnable runnable) {
        synchronized (this.lock) {
            if (this.running) {
                runnable.run();
            } else {
                LOG.debug("Ignoring action because {} has already been stopped.", getClass().getSimpleName());
            }
        }
    }

    /**
     * Leader election error callback: wraps the error in a {@link FlinkException} and reports it
     * to the fatal error handler.
     *
     * @param exc the error raised by the leader election
     */
    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void handleError(Exception exc) {
        this.fatalErrorHandler.onFatalError(new FlinkException(String.format("Exception during leader election of %s occurred.", getClass().getSimpleName()), exc));
    }

    /**
     * Creates a runner and immediately starts its leader election.
     *
     * @param leaderElection leader election the runner participates in
     * @param fatalErrorHandler receiver of leader election errors
     * @param dispatcherLeaderProcessFactory factory for per-session leader processes
     * @return the started runner
     * @throws Exception if starting the leader election fails
     */
    public static DispatcherRunner create(LeaderElection leaderElection, FatalErrorHandler fatalErrorHandler, DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) throws Exception {
        DefaultDispatcherRunner defaultDispatcherRunner = new DefaultDispatcherRunner(leaderElection, fatalErrorHandler, dispatcherLeaderProcessFactory);
        defaultDispatcherRunner.start();
        return defaultDispatcherRunner;
    }
}
