package org.apache.flink.runtime.resourcemanager;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rescaling.DeploymentManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.class */
/**
 * {@link ResourceManagerService} implementation that takes part in leader election (as a
 * {@link LeaderContender}) and keeps at most one leader {@link ResourceManager} instance alive.
 *
 * <p>All mutable state ({@code running}, {@code leaderResourceManager}, {@code leaderSessionID}
 * and {@code previousResourceManagerTerminationFuture}) is guarded by {@code lock}. Leadership
 * grant/revoke events are processed one at a time on a dedicated single-threaded executor.
 */
public class ResourceManagerServiceImpl implements ResourceManagerService, LeaderContender {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerServiceImpl.class);

    private final ResourceManagerFactory<?> resourceManagerFactory;
    private final ResourceManagerProcessContext rmProcessContext;
    private final LeaderElectionService leaderElectionService;
    private final FatalErrorHandler fatalErrorHandler;
    private final Executor ioExecutor;

    /** Monitor guarding every field annotated with {@code @GuardedBy("lock")}. */
    private final Object lock = new Object();

    /** Single thread on which leadership grant/revoke events are handled in submission order. */
    private final ExecutorService handleLeaderEventExecutor = Executors.newSingleThreadExecutor();

    /** Future handed out by {@link #getTerminationFuture()}. */
    private final CompletableFuture<Void> serviceTerminationFuture = new CompletableFuture<>();

    @GuardedBy("lock")
    private boolean running = false;

    @GuardedBy("lock")
    @Nullable
    private ResourceManager<?> leaderResourceManager = null;

    @GuardedBy("lock")
    @Nullable
    private UUID leaderSessionID = null;

    /** Combined termination of every resource manager this service has stopped so far. */
    @GuardedBy("lock")
    private CompletableFuture<Void> previousResourceManagerTerminationFuture = FutureUtils.completedVoidFuture();

    /**
     * Creates the service; use {@link #create} from outside this class.
     *
     * @param resourceManagerFactory factory used to create a resource manager per leader session
     * @param resourceManagerProcessContext context providing the leader election service, the
     *     fatal error handler and the I/O executor
     * @throws NullPointerException if either argument is {@code null}
     */
    private ResourceManagerServiceImpl(ResourceManagerFactory<?> resourceManagerFactory, ResourceManagerProcessContext resourceManagerProcessContext) {
        this.resourceManagerFactory = Preconditions.checkNotNull(resourceManagerFactory);
        this.rmProcessContext = Preconditions.checkNotNull(resourceManagerProcessContext);
        this.leaderElectionService = resourceManagerProcessContext.getHighAvailabilityServices().getResourceManagerLeaderElectionService();
        this.fatalErrorHandler = resourceManagerProcessContext.getFatalErrorHandler();
        this.ioExecutor = resourceManagerProcessContext.getIoExecutor();
    }

    /**
     * Marks the service as running and registers it as a contender with the leader election
     * service. Calling this on an already running service only logs a debug message.
     *
     * @throws Exception if starting the leader election service fails
     */
    @Override
    public void start() throws Exception {
        synchronized (this.lock) {
            if (this.running) {
                LOG.debug("Resource manager service has already started.");
                return;
            }
            this.running = true;
            LOG.info("Starting resource manager service.");
            this.leaderElectionService.start(this);
        }
    }

    /**
     * Returns the termination future of this service.
     *
     * @return the service termination future
     */
    @Override
    public CompletableFuture<Void> getTerminationFuture() {
        return this.serviceTerminationFuture;
    }

    /**
     * Deregisters the application through the current leader resource manager.
     *
     * <p>If the service is not running, or there is no leader resource manager (either now or
     * by the time the captured one has finished starting), a warning is logged and an already
     * completed future is returned instead.
     *
     * @param applicationStatus status passed on to the resource manager gateway
     * @param str optional text passed on to the resource manager gateway as second argument
     * @return future completed once the deregistration call has completed
     */
    @Override
    public CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String str) {
        synchronized (this.lock) {
            if (!this.running || this.leaderResourceManager == null) {
                return deregisterWithoutLeaderRm();
            }
            final ResourceManager<?> currentLeaderRm = this.leaderResourceManager;
            return currentLeaderRm.getStartedFuture().thenCompose(ignored -> {
                synchronized (this.lock) {
                    // Leadership may have changed while waiting for the start to finish.
                    if (isLeader(currentLeaderRm)) {
                        return currentLeaderRm.getSelfGateway(ResourceManagerGateway.class).deregisterApplication(applicationStatus, str).thenApply(acknowledge -> null);
                    }
                    return deregisterWithoutLeaderRm();
                }
            });
        }
    }

    /**
     * Delegates to the current leader resource manager's {@code isJmMigrationWasStarted()}.
     *
     * <p>The leader reference is read once through the synchronized accessor so that a
     * concurrent leadership revocation cannot null the field between the null check and the
     * call. The delegate is invoked outside the lock.
     *
     * @return {@code false} when there is no leader resource manager, otherwise the value
     *     reported by the leader resource manager
     */
    @Override
    public boolean isJmMigrationWasStarted() {
        final ResourceManager<?> currentLeaderRm = getLeaderResourceManager();
        return currentLeaderRm != null && currentLeaderRm.isJmMigrationWasStarted();
    }

    /** Logs that deregistration is impossible and returns an already completed future. */
    private static CompletableFuture<Void> deregisterWithoutLeaderRm() {
        LOG.warn("Cannot deregister application. Resource manager service is not available.");
        return FutureUtils.completedVoidFuture();
    }

    /**
     * Stops the service: if it is running, stops the leader election service and the current
     * leader resource manager. In every case the termination of previously stopped resource
     * managers is forwarded to the service termination future and the leader event executor is
     * shut down.
     *
     * @return the service termination future
     */
    public CompletableFuture<Void> closeAsync() {
        synchronized (this.lock) {
            if (this.running) {
                LOG.info("Stopping resource manager service.");
                this.running = false;
                stopLeaderElectionService();
                stopLeaderResourceManager();
            } else {
                LOG.debug("Resource manager service is not running.");
            }
            FutureUtils.forward(this.previousResourceManagerTerminationFuture, this.serviceTerminationFuture);
        }
        this.handleLeaderEventExecutor.shutdownNow();
        return this.serviceTerminationFuture;
    }

    /**
     * Handles a leadership grant on the leader event executor by starting a new leader resource
     * manager for the given session. Any failure while doing so is reported to the fatal error
     * handler. The grant is ignored when the service is not running.
     *
     * @param uuid leader session id of the granted leadership
     */
    @Override
    public void grantLeadership(UUID uuid) {
        this.handleLeaderEventExecutor.execute(() -> {
            synchronized (this.lock) {
                if (!this.running) {
                    LOG.info("Resource manager service is not running. Ignore granting leadership with session ID {}.", uuid);
                    return;
                }
                LOG.info("Resource manager service is granted leadership with session id {}.", uuid);
                try {
                    startNewLeaderResourceManager(uuid);
                } catch (Throwable t) {
                    this.fatalErrorHandler.onFatalError(new FlinkException("Cannot start resource manager.", t));
                }
            }
        });
    }

    /**
     * Handles a leadership revocation on the leader event executor by stopping the current
     * leader resource manager. If the factory does not support multiple leader sessions, the
     * whole service is closed as well. The revocation is ignored when the service is not running.
     */
    @Override
    public void revokeLeadership() {
        this.handleLeaderEventExecutor.execute(() -> {
            synchronized (this.lock) {
                if (!this.running) {
                    LOG.info("Resource manager service is not running. Ignore revoking leadership.");
                    return;
                }
                LOG.info("Resource manager service is revoked leadership with session id {}.", this.leaderSessionID);
                stopLeaderResourceManager();
                if (!this.resourceManagerFactory.supportMultiLeaderSession()) {
                    closeAsync();
                }
            }
        });
    }

    /**
     * Reports a leader election error to the fatal error handler.
     *
     * @param exc the error raised during leader election
     */
    @Override
    public void handleError(Exception exc) {
        this.fatalErrorHandler.onFatalError(new FlinkException("Exception during leader election of resource manager occurred.", exc));
    }

    /**
     * Stops the current leader resource manager (if any), creates a new one for the given
     * session and schedules its start after all previously stopped resource managers have
     * terminated. Leadership is confirmed on the I/O executor once the new resource manager
     * has started and is still the leader.
     *
     * @param uuid leader session id for the new resource manager
     * @throws Exception if the factory fails to create the resource manager
     */
    @GuardedBy("lock")
    private void startNewLeaderResourceManager(UUID uuid) throws Exception {
        stopLeaderResourceManager();
        this.leaderSessionID = uuid;
        this.leaderResourceManager = this.resourceManagerFactory.createResourceManager(this.rmProcessContext, uuid);
        final ResourceManager<?> newLeaderRm = this.leaderResourceManager;
        this.previousResourceManagerTerminationFuture.thenComposeAsync(ignored -> {
            synchronized (this.lock) {
                return startResourceManagerIfIsLeader(newLeaderRm);
            }
        }, this.handleLeaderEventExecutor).thenAcceptAsync(isStillLeader -> {
            if (isStillLeader) {
                this.leaderElectionService.confirmLeadership(uuid, newLeaderRm.getAddress());
            }
        }, this.ioExecutor);
    }

    /**
     * Starts the given resource manager if it is still the leader of a running service.
     *
     * @param resourceManager candidate resource manager
     * @return future yielding {@code true} once the resource manager has started, or an already
     *     completed future yielding {@code false} if it is no longer the leader
     */
    @GuardedBy("lock")
    private CompletableFuture<Boolean> startResourceManagerIfIsLeader(ResourceManager<?> resourceManager) {
        if (!isLeader(resourceManager)) {
            return CompletableFuture.completedFuture(false);
        }
        resourceManager.start();
        forwardTerminationFuture(resourceManager);
        return resourceManager.getStartedFuture().thenApply(ignored -> true);
    }

    /**
     * Completes the service termination future with the outcome of the given resource manager's
     * termination, provided that resource manager is still the leader at that time.
     *
     * @param resourceManager resource manager whose termination is observed
     */
    private void forwardTerminationFuture(ResourceManager<?> resourceManager) {
        resourceManager.getTerminationFuture().whenComplete((ignored, failure) -> {
            synchronized (this.lock) {
                if (isLeader(resourceManager)) {
                    if (failure != null) {
                        this.serviceTerminationFuture.completeExceptionally(failure);
                    } else {
                        this.serviceTerminationFuture.complete(null);
                    }
                }
            }
        });
    }

    /**
     * Tells whether the service is running and the given instance is the current leader
     * resource manager (compared by identity).
     */
    @GuardedBy("lock")
    private boolean isLeader(ResourceManager<?> resourceManager) {
        return this.running && this.leaderResourceManager == resourceManager;
    }

    /**
     * Closes the current leader resource manager, if any, chains its termination onto
     * {@code previousResourceManagerTerminationFuture} and clears the leader state.
     */
    @GuardedBy("lock")
    private void stopLeaderResourceManager() {
        if (this.leaderResourceManager != null) {
            this.previousResourceManagerTerminationFuture = this.previousResourceManagerTerminationFuture.thenCombine(this.leaderResourceManager.closeAsync(), (ignored1, ignored2) -> null);
            this.leaderResourceManager = null;
            this.leaderSessionID = null;
        }
    }

    /**
     * Stops the leader election service; a failure completes the service termination future
     * exceptionally instead of being thrown.
     */
    private void stopLeaderElectionService() {
        try {
            this.leaderElectionService.stop();
        } catch (Exception e) {
            this.serviceTerminationFuture.completeExceptionally(new FlinkException("Cannot stop leader election service.", e));
        }
    }

    /**
     * Returns a snapshot of the current leader resource manager, read under the lock.
     *
     * @return the current leader resource manager, or {@code null} if there is none
     */
    @VisibleForTesting
    @Nullable
    public ResourceManager<?> getLeaderResourceManager() {
        synchronized (this.lock) {
            return this.leaderResourceManager;
        }
    }

    /**
     * Creates a new service. All arguments except {@code resourceManagerFactory} are passed
     * unchanged, in the same order, to
     * {@code resourceManagerFactory.createResourceManagerProcessContext(...)}; the resulting
     * process context and the factory are then used to construct the service.
     *
     * @param resourceManagerFactory factory for the process context and the resource managers
     * @return the newly created, not yet started service
     * @throws ConfigurationException declared by this method; presumably raised while the
     *     factory builds the process context (not visible here, confirm against the factory)
     */
    public static ResourceManagerServiceImpl create(ResourceManagerFactory<?> resourceManagerFactory, Configuration configuration, ResourceID resourceID, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, DelegationTokenManager delegationTokenManager, FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String str, MetricRegistry metricRegistry, String str2, Executor executor, DeploymentManager deploymentManager) throws ConfigurationException {
        return new ResourceManagerServiceImpl(resourceManagerFactory, resourceManagerFactory.createResourceManagerProcessContext(configuration, resourceID, rpcService, highAvailabilityServices, heartbeatServices, delegationTokenManager, fatalErrorHandler, clusterInformation, str, metricRegistry, str2, executor, deploymentManager));
    }
}
