package org.apache.flink.kubernetes.highavailability;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.highavailability.AbstractHaServices;
import org.apache.flink.runtime.highavailability.FileSystemJobResultStore;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService;
import org.apache.flink.runtime.leaderelection.DefaultMultipleComponentLeaderElectionService;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.class */
/**
 * Kubernetes based high-availability services in which all components share a single leader
 * election service.
 *
 * <p>The shared {@link DefaultMultipleComponentLeaderElectionService} is created lazily on first
 * use and is backed by the cluster ConfigMap ({@code <clusterId>-cluster-config-map}). Job
 * specific data is kept in per-job ConfigMaps ({@code <clusterId>-<jobId>-config-map}).
 *
 * <p>Access to the lazily created leader election service is guarded by an internal lock.
 */
public class KubernetesMultipleComponentLeaderElectionHaServices extends AbstractHaServices {

    /** Label value used to select the ConfigMaps that belong to the HA services. */
    private static final String HIGH_AVAILABILITY_LABEL = "high-availability";

    /** Time (in seconds) granted to the watch executor to terminate on close. */
    private static final long WATCH_EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 5L;

    private final Object lock;
    private final String clusterId;
    private final FlinkKubeClient kubeClient;
    private final KubernetesConfigMapSharedWatcher configMapSharedWatcher;
    private final ExecutorService watchExecutorService;
    private final String lockIdentity;
    private final FatalErrorHandler fatalErrorHandler;

    /** Shared leader election service; {@code null} until first requested and after closing. */
    @GuardedBy("lock")
    @Nullable
    private DefaultMultipleComponentLeaderElectionService multipleComponentLeaderElectionService;

    /**
     * Creates the HA services.
     *
     * @param flinkKubeClient client used to talk to Kubernetes; must not be {@code null}
     * @param executor executor handed to {@link AbstractHaServices}
     * @param configuration configuration; must contain {@link KubernetesConfigOptions#CLUSTER_ID}
     * @param blobStoreService blob store handed to {@link AbstractHaServices}
     * @param fatalErrorHandler handler passed on to the leader election service and its driver;
     *     must not be {@code null}
     * @throws IOException declared by the original signature (see {@link
     *     FileSystemJobResultStore#fromConfiguration})
     */
    public KubernetesMultipleComponentLeaderElectionHaServices(FlinkKubeClient flinkKubeClient, Executor executor, Configuration configuration, BlobStoreService blobStoreService, FatalErrorHandler fatalErrorHandler) throws IOException {
        super(configuration, executor, blobStoreService, FileSystemJobResultStore.fromConfiguration(configuration));
        this.lock = new Object();
        this.multipleComponentLeaderElectionService = null;
        this.kubeClient = (FlinkKubeClient) Preconditions.checkNotNull(flinkKubeClient);
        this.clusterId = (String) Preconditions.checkNotNull(configuration.get(KubernetesConfigOptions.CLUSTER_ID));
        this.fatalErrorHandler = (FatalErrorHandler) Preconditions.checkNotNull(fatalErrorHandler);
        this.configMapSharedWatcher = this.kubeClient.createConfigMapSharedWatcher(KubernetesUtils.getConfigMapLabels(this.clusterId, HIGH_AVAILABILITY_LABEL));
        this.watchExecutorService = Executors.newCachedThreadPool(new ExecutorThreadFactory("config-map-watch-handler"));
        // A random identity distinguishes this instance from other contenders.
        this.lockIdentity = UUID.randomUUID().toString();
    }

    /**
     * Creates a leader election service for the given component on top of the shared
     * multiple-component leader election service.
     *
     * @param str name of the component that takes part in the leader election
     */
    @Override
    protected LeaderElectionService createLeaderElectionService(String str) {
        return new DefaultLeaderElectionService(getOrInitializeSingleLeaderElectionService().createDriverFactory(str));
    }

    /**
     * Returns the shared leader election service, creating it on first use.
     *
     * @throws FlinkRuntimeException if the service could not be created
     */
    private DefaultMultipleComponentLeaderElectionService getOrInitializeSingleLeaderElectionService() {
        synchronized (this.lock) {
            if (this.multipleComponentLeaderElectionService == null) {
                try {
                    final KubernetesLeaderElectionConfiguration leaderElectionConfiguration =
                            new KubernetesLeaderElectionConfiguration(getClusterConfigMap(), this.lockIdentity, this.configuration);
                    final KubernetesMultipleComponentLeaderElectionDriverFactory driverFactory =
                            new KubernetesMultipleComponentLeaderElectionDriverFactory(
                                    this.kubeClient,
                                    leaderElectionConfiguration,
                                    this.configMapSharedWatcher,
                                    this.watchExecutorService,
                                    this.fatalErrorHandler);
                    this.multipleComponentLeaderElectionService =
                            new DefaultMultipleComponentLeaderElectionService(this.fatalErrorHandler, driverFactory);
                } catch (Exception e) {
                    throw new FlinkRuntimeException("Could not initialize the default single leader election service.", e);
                }
            }
            return this.multipleComponentLeaderElectionService;
        }
    }

    /**
     * Creates a leader retrieval service for the given component that reads from the cluster
     * ConfigMap.
     *
     * @param str name of the component whose leader should be retrieved
     */
    @Override
    protected LeaderRetrievalService createLeaderRetrievalService(String str) {
        return new DefaultLeaderRetrievalService(new KubernetesMultipleComponentLeaderRetrievalDriverFactory(this.kubeClient, this.configMapSharedWatcher, this.watchExecutorService, getClusterConfigMap(), str));
    }

    /** Creates the checkpoint recovery factory, which stores data in job specific ConfigMaps. */
    @Override
    protected CheckpointRecoveryFactory createCheckpointRecoveryFactory() {
        return KubernetesCheckpointRecoveryFactory.withoutLeadershipValidation(this.kubeClient, this.configuration, this.ioExecutor, this.clusterId, this::getJobSpecificConfigMap);
    }

    /** Returns the name of the ConfigMap holding the data of the given job. */
    private String getJobSpecificConfigMap(JobID jobID) {
        return this.clusterId + "-" + jobID.toString() + "-config-map";
    }

    /** Creates the job graph store, which is backed by the cluster ConfigMap. */
    @Override
    protected JobGraphStore createJobGraphStore() throws Exception {
        return KubernetesUtils.createJobGraphStore(this.configuration, this.kubeClient, getClusterConfigMap(), this.lockIdentity);
    }

    /** Returns the name of the ConfigMap shared by all components of this cluster. */
    private String getClusterConfigMap() {
        return this.clusterId + "-cluster-config-map";
    }

    /**
     * Closes the Kubernetes backed services, the Kubernetes client and the watch executor.
     *
     * <p>Every step is attempted even if an earlier one failed, so that a failing step cannot
     * leak the remaining resources. The first failure is rethrown with later ones attached as
     * suppressed exceptions.
     *
     * @throws Exception the first failure encountered while closing
     */
    @Override
    public void internalClose() throws Exception {
        Exception exception = null;
        try {
            closeK8sServices();
        } catch (Exception e) {
            exception = e;
        }
        try {
            this.kubeClient.close();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        } finally {
            // Must run even if closing the client failed; otherwise the watch threads leak.
            ExecutorUtils.gracefulShutdown(WATCH_EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS, this.watchExecutorService);
        }
        ExceptionUtils.tryRethrowException(exception);
    }

    /**
     * Closes the shared leader election service (if it was created) and the ConfigMap watcher.
     *
     * <p>Both are closed regardless of whether the other one fails; failures are aggregated.
     *
     * @throws Exception the first failure encountered while closing
     */
    private void closeK8sServices() throws Exception {
        Exception exception = null;
        synchronized (this.lock) {
            if (this.multipleComponentLeaderElectionService != null) {
                try {
                    this.multipleComponentLeaderElectionService.close();
                } catch (Exception e) {
                    exception = e;
                }
                this.multipleComponentLeaderElectionService = null;
            }
        }
        try {
            this.configMapSharedWatcher.close();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }
        ExceptionUtils.tryRethrowException(exception);
    }

    /**
     * Closes the Kubernetes backed services and deletes all ConfigMaps carrying the
     * high-availability label of this cluster.
     *
     * <p>The deletion is attempted even if closing failed. The first failure is rethrown with
     * later ones attached as suppressed exceptions.
     *
     * @throws Exception the first failure encountered while closing or deleting
     */
    @Override
    public void internalCleanup() throws Exception {
        Exception exception = null;
        try {
            closeK8sServices();
        } catch (Exception e) {
            exception = e;
        }
        try {
            this.kubeClient.deleteConfigMapsByLabels(KubernetesUtils.getConfigMapLabels(this.clusterId, HIGH_AVAILABILITY_LABEL)).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The interruption may end up as a suppressed exception; keep the flag set.
                Thread.currentThread().interrupt();
            }
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }
        ExceptionUtils.tryRethrowException(exception);
    }

    /**
     * Deletes the ConfigMap of the given job and waits for the deletion to complete.
     *
     * @param jobID job whose ConfigMap should be removed
     */
    @Override
    public void internalCleanupJobData(JobID jobID) throws Exception {
        this.kubeClient.deleteConfigMap(getJobSpecificConfigMap(jobID)).get();
    }

    /** Returns the leader name used for the resource manager. */
    @Override
    protected String getLeaderPathForResourceManager() {
        return ResourceManager.RESOURCE_MANAGER_NAME;
    }

    /** Returns the leader name used for the dispatcher. */
    @Override
    protected String getLeaderPathForDispatcher() {
        return Dispatcher.DISPATCHER_NAME;
    }

    /** Returns the leader name used for the job manager of the given job (the job id). */
    @Override
    protected String getLeaderPathForJobManager(JobID jobID) {
        return jobID.toString();
    }

    /** Returns the leader name used for the REST server. */
    @Override
    protected String getLeaderPathForRestServer() {
        return "restserver";
    }
}
