package org.apache.flink.runtime.minicluster;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.ReporterSetup;
import org.apache.flink.runtime.metrics.TraceReporterSetup;
import org.apache.flink.runtime.metrics.groups.ProcessMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.Reference;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.ExponentialBackoffRetryStrategy;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster.class */
public class MiniCluster implements AutoCloseableAsync {
    /** Logger shared by the mini cluster and its nested helper classes. */
    private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
    /** Monitor object that the {@code @GuardedBy("lock")} members of this class are protected by. */
    private final Object lock;
    /** Configuration handed to the constructor; source of all settings read by this class. */
    private final MiniClusterConfiguration miniClusterConfiguration;
    /** Timeout passed to dispatcher gateway calls; set to {@code RpcUtils.INF_TIMEOUT} by the constructor. */
    private final Time rpcTimeout;

    /** Task executors in the order they were started by {@link #startTaskManager()}. */
    @GuardedBy("lock")
    private final List<TaskExecutor> taskManagers;
    /** Creates the per-TaskManager fatal error handlers, keyed by the TaskManager's list index. */
    private final TerminatingFatalErrorHandlerFactory taskManagerTerminatingFatalErrorHandlerFactory;
    /** Queried once in {@link #start()} to obtain the {@link RpcSystem} reference. */
    private final Supplier<Reference<RpcSystem>> rpcSystemSupplier;
    /**
     * Future returned by the close methods. Starts out already completed and is replaced by a
     * fresh future at the end of a successful {@link #start()}.
     */
    private CompletableFuture<Void> terminationFuture;

    @GuardedBy("lock")
    private MetricRegistryImpl metricRegistry;

    @GuardedBy("lock")
    private ProcessMetricGroup processMetricGroup;

    @GuardedBy("lock")
    private RpcService commonRpcService;

    @GuardedBy("lock")
    private ExecutorService ioExecutor;

    /** RPC services created by {@link DedicatedRpcServiceFactory}. */
    @GuardedBy("lock")
    private final Collection<RpcService> rpcServices;

    @GuardedBy("lock")
    private HighAvailabilityServicesFactory haServicesFactory;

    @GuardedBy("lock")
    private HighAvailabilityServices haServices;

    @GuardedBy("lock")
    private BlobServer blobServer;

    @GuardedBy("lock")
    private HeartbeatServices heartbeatServices;

    @GuardedBy("lock")
    private DelegationTokenManager delegationTokenManager;

    @GuardedBy("lock")
    private DelegationTokenReceiverRepository delegationTokenReceiverRepository;

    @GuardedBy("lock")
    private BlobCacheService blobCacheService;

    @GuardedBy("lock")
    private LeaderRetrievalService resourceManagerLeaderRetriever;

    @GuardedBy("lock")
    private LeaderRetrievalService dispatcherLeaderRetriever;

    @GuardedBy("lock")
    private LeaderRetrievalService clusterRestEndpointLeaderRetrievalService;

    @GuardedBy("lock")
    private Collection<DispatcherResourceManagerComponent> dispatcherResourceManagerComponents;

    @GuardedBy("lock")
    private RpcGatewayRetriever<DispatcherId, DispatcherGateway> dispatcherGatewayRetriever;

    @GuardedBy("lock")
    private RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever;

    /** Listener whose leader future yields the REST endpoint address (see {@link #getRestAddress()}). */
    @GuardedBy("lock")
    private LeaderRetriever webMonitorLeaderRetriever;

    /** Factory used by {@link #startTaskManager()} to obtain the RPC service of each TaskManager. */
    @GuardedBy("lock")
    private RpcServiceFactory taskManagerRpcServiceFactory;

    @GuardedBy("lock")
    private WorkingDirectory workingDirectory;
    /** Set to {@code true} as the last step of {@link #start()} and reset when the cluster is closed. */
    private volatile boolean running;

    @GuardedBy("lock")
    private Reference<RpcSystem> rpcSystem;

    /** Flag switched on by {@link #overrideRestoreModeForChangelogStateBackend()}; its readers are outside this excerpt. */
    @Internal
    private boolean overrideRestoreModeForChangelogStateBackend;

    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$CommonRpcServiceFactory.class */
    /**
     * {@link RpcServiceFactory} that never creates anything: every call returns the single
     * {@link RpcService} instance it was constructed with.
     */
    protected static class CommonRpcServiceFactory implements RpcServiceFactory {
        private final RpcService commonRpcService;

        CommonRpcServiceFactory(RpcService sharedRpcService) {
            commonRpcService = sharedRpcService;
        }

        /** Returns the shared RPC service passed to the constructor. */
        @Override
        public RpcService createRpcService() {
            return commonRpcService;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$DedicatedRpcServiceFactory.class */
    /**
     * {@link RpcServiceFactory} that creates a new remote {@link RpcService} on every call and
     * registers it in the enclosing cluster's {@code rpcServices} collection.
     */
    protected class DedicatedRpcServiceFactory implements RpcServiceFactory {
        private final Configuration configuration;
        private final String externalAddress;
        private final String externalPortRange;
        private final String bindAddress;
        private final RpcSystem rpcSystem;

        DedicatedRpcServiceFactory(
                Configuration configuration,
                String externalAddress,
                String externalPortRange,
                String bindAddress,
                RpcSystem rpcSystem) {
            this.configuration = configuration;
            this.externalAddress = externalAddress;
            this.externalPortRange = externalPortRange;
            this.bindAddress = bindAddress;
            this.rpcSystem = rpcSystem;
        }

        /**
         * Creates a remote RPC service from the stored settings and records it, under the
         * cluster lock, in the enclosing cluster's {@code rpcServices}.
         *
         * @return the newly created RPC service
         * @throws Exception if the remote RPC service cannot be created
         */
        @Override
        public RpcService createRpcService() throws Exception {
            final RpcService dedicatedRpcService =
                    MiniCluster.this.createRemoteRpcService(
                            configuration, externalAddress, externalPortRange, bindAddress, rpcSystem);
            // The service is created outside the lock; only the registration is synchronized.
            synchronized (MiniCluster.this.lock) {
                MiniCluster.this.rpcServices.add(dedicatedRpcService);
            }
            return dedicatedRpcService;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$HaServices.class */
    /** Selects which kind of high-availability services factory the mini cluster builds on start. */
    public enum HaServices {
        /** Derive the HA services from the configured {@code HighAvailabilityMode}. */
        CONFIGURED,
        /** Use {@code EmbeddedHaServicesWithLeadershipControl}. */
        WITH_LEADERSHIP_CONTROL
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$RegularHighAvailabilityServicesFactory.class */
    /**
     * Factory that delegates to
     * {@code HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices}, wiring in a
     * {@link ShutDownFatalErrorHandler} bound to the enclosing cluster.
     */
    public class RegularHighAvailabilityServicesFactory implements HighAvailabilityServicesFactory {
        private RegularHighAvailabilityServicesFactory() {
        }

        @Override
        public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) throws Exception {
            final FatalErrorHandler shutDownOnFatalError = new ShutDownFatalErrorHandler();
            return HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                    configuration, executor, shutDownOnFatalError);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$RpcServiceFactory.class */
    /** Factory abstraction used by the mini cluster to obtain {@link RpcService} instances. */
    public interface RpcServiceFactory {
        /**
         * Provides an RPC service; whether it is a new or a shared instance depends on the implementation.
         *
         * @return the RPC service to use
         * @throws Exception if the RPC service cannot be provided
         */
        RpcService createRpcService() throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$ShutDownFatalErrorHandler.class */
    public class ShutDownFatalErrorHandler implements FatalErrorHandler {
        private ShutDownFatalErrorHandler() {
        }

        public void onFatalError(Throwable th) {
            MiniCluster.LOG.warn("Error in MiniCluster. Shutting the MiniCluster down.", th);
            MiniCluster.this.closeAsync();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$SingletonHighAvailabilityServicesFactory.class */
    /**
     * Factory that creates the {@link HighAvailabilityServices} through the supplied callback on
     * first use and hands out that same instance on every later call.
     */
    public static class SingletonHighAvailabilityServicesFactory implements HighAvailabilityServicesFactory {
        private final BiFunctionWithException<Configuration, Executor, HighAvailabilityServices, Exception> creationCallback;

        /** Cached instance; {@code null} until the callback has produced a non-null result. */
        @Nullable
        private HighAvailabilityServices haServices;

        public SingletonHighAvailabilityServicesFactory(BiFunctionWithException<Configuration, Executor, HighAvailabilityServices, Exception> creationCallback) {
            this.creationCallback = creationCallback;
        }

        /**
         * Returns the cached services, invoking the creation callback only while nothing is cached.
         *
         * @throws Exception if the creation callback fails
         */
        @Override
        public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) throws Exception {
            HighAvailabilityServices cached = haServices;
            if (cached != null) {
                return cached;
            }
            cached = (HighAvailabilityServices) creationCallback.apply(configuration, executor);
            haServices = cached;
            return cached;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$TerminatingFatalErrorHandler.class */
    public class TerminatingFatalErrorHandler implements FatalErrorHandler {
        private final int index;

        private TerminatingFatalErrorHandler(int i) {
            this.index = i;
        }

        public void onFatalError(Throwable th) {
            if (MiniCluster.this.running) {
                MiniCluster.LOG.error("TaskManager #{} failed.", Integer.valueOf(this.index), th);
                synchronized (MiniCluster.this.lock) {
                    MiniCluster.this.taskManagers.get(this.index).closeAsync();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$TerminatingFatalErrorHandlerFactory.class */
    /** Creates {@link TerminatingFatalErrorHandler} instances bound to the enclosing cluster. */
    public class TerminatingFatalErrorHandlerFactory {
        private TerminatingFatalErrorHandlerFactory() {
        }

        /**
         * Builds the handler for the TaskManager at the given list index.
         *
         * @param taskManagerIndex index of the TaskManager in the cluster's TaskManager list
         * @return a handler that closes that TaskManager on fatal errors
         */
        @GuardedBy("lock")
        private TerminatingFatalErrorHandler create(int taskManagerIndex) {
            return new TerminatingFatalErrorHandler(taskManagerIndex);
        }
    }

    /**
     * Creates a mini cluster whose {@link RpcSystem} is loaded lazily from the given
     * configuration and wrapped in an owned {@link Reference}.
     *
     * @param miniClusterConfiguration the configuration of the mini cluster
     */
    public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
        this(
                miniClusterConfiguration,
                () -> Reference.owned(RpcSystem.load(miniClusterConfiguration.getConfiguration())));
    }

    public MiniCluster(MiniClusterConfiguration miniClusterConfiguration, Supplier<Reference<RpcSystem>> supplier) {
        this.lock = new Object();
        this.taskManagerTerminatingFatalErrorHandlerFactory = new TerminatingFatalErrorHandlerFactory();
        this.miniClusterConfiguration = (MiniClusterConfiguration) Preconditions.checkNotNull(miniClusterConfiguration, "config may not be null");
        this.rpcServices = new ArrayList(3 + miniClusterConfiguration.getNumTaskManagers());
        this.dispatcherResourceManagerComponents = new ArrayList(1);
        this.rpcTimeout = RpcUtils.INF_TIMEOUT;
        this.terminationFuture = CompletableFuture.completedFuture(null);
        this.running = false;
        this.taskManagers = new ArrayList(miniClusterConfiguration.getNumTaskManagers());
        this.rpcSystemSupplier = supplier;
    }

    /**
     * Returns a future with the REST endpoint address, taken from the first element of the web
     * monitor leader tuple and parsed as a {@link URI}.
     *
     * <p>Fails the {@code Preconditions.checkState} check when the cluster is not running.
     *
     * @return future completing with the REST address
     */
    public CompletableFuture<URI> getRestAddress() {
        synchronized (this.lock) {
            Preconditions.checkState(this.running, "MiniCluster is not yet running or has already been shut down.");
            return this.webMonitorLeaderRetriever
                    .getLeaderFuture()
                    .thenApply(
                            FunctionUtils.uncheckedFunction(
                                    addressAndLeaderId -> new URI((String) addressAndLeaderId.f0)));
        }
    }

    /**
     * Builds the cluster information from the fixed host name {@code "localhost"} and the port
     * of the blob server.
     *
     * <p>Fails the {@code Preconditions.checkState} check when the cluster is not running.
     *
     * @return a new {@link ClusterInformation} instance
     */
    public ClusterInformation getClusterInformation() {
        synchronized (this.lock) {
            Preconditions.checkState(this.running, "MiniCluster is not yet running or has already been shut down.");
            final int blobServerPort = this.blobServer.getPort();
            return new ClusterInformation("localhost", blobServerPort);
        }
    }

    /**
     * Returns the I/O executor created in {@link #start()}; {@code null} before the first start.
     *
     * <p>NOTE(review): the field is annotated {@code @GuardedBy("lock")} but is read here
     * without synchronizing on the lock — confirm this is intended.
     */
    protected Executor getIOExecutor() {
        return this.ioExecutor;
    }

    /** Returns the current value of the volatile {@code running} flag. */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Starts the mini cluster: working directory, metric registry, RPC service(s), I/O executor,
     * delegation tokens, HA services, blob server and cache, TaskManagers, the
     * dispatcher/resource-manager components and finally the leader retrievers. The whole
     * sequence runs while holding {@code lock}.
     *
     * <p>Fails the {@code Preconditions.checkState} check when the cluster is already running.
     *
     * @throws Exception if any start-up step fails; {@link #close()} is attempted first and a
     *     failure of that call is attached as a suppressed exception
     */
    public void start() throws Exception {
        RpcServiceFactory dedicatedRpcServiceFactory;
        RpcService startRemoteMetricsRpcService;
        synchronized (this.lock) {
            Preconditions.checkState(!this.running, "MiniCluster is already running");
            LOG.info("Starting Flink Mini Cluster");
            LOG.debug("Using configuration {}", this.miniClusterConfiguration);
            UnmodifiableConfiguration configuration = this.miniClusterConfiguration.getConfiguration();
            // z == true: all components share one local RPC service.
            boolean z = this.miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
            try {
                this.workingDirectory = WorkingDirectory.create(ClusterEntrypointUtils.generateWorkingDirectoryFile(configuration, Optional.of(ClusterOptions.PROCESS_WORKING_DIR_BASE), "minicluster_" + ResourceID.generate()));
                initializeIOFormatClasses(configuration);
                this.rpcSystem = this.rpcSystemSupplier.get();
                LOG.info("Starting Metrics Registry");
                this.metricRegistry = createMetricRegistry(configuration, ((RpcSystem) this.rpcSystem.deref()).getMaximumMessageSizeInBytes(configuration));
                LOG.info("Starting RPC Service(s)");
                if (z) {
                    // Shared mode: the same factory (and thus the same RPC service) serves the
                    // dispatcher/resource-manager components and the TaskManagers.
                    this.commonRpcService = createLocalRpcService(configuration, (RpcSystem) this.rpcSystem.deref());
                    CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(this.commonRpcService);
                    this.taskManagerRpcServiceFactory = commonRpcServiceFactory;
                    dedicatedRpcServiceFactory = commonRpcServiceFactory;
                    startRemoteMetricsRpcService = MetricUtils.startLocalMetricsRpcService(configuration, (RpcSystem) this.rpcSystem.deref());
                } else {
                    // Dedicated mode: separate factories built from the JobManager and
                    // TaskManager address settings, plus a remote common RPC service.
                    String jobManagerExternalAddress = this.miniClusterConfiguration.getJobManagerExternalAddress();
                    String taskManagerExternalAddress = this.miniClusterConfiguration.getTaskManagerExternalAddress();
                    String jobManagerExternalPortRange = this.miniClusterConfiguration.getJobManagerExternalPortRange();
                    String taskManagerExternalPortRange = this.miniClusterConfiguration.getTaskManagerExternalPortRange();
                    String jobManagerBindAddress = this.miniClusterConfiguration.getJobManagerBindAddress();
                    String taskManagerBindAddress = this.miniClusterConfiguration.getTaskManagerBindAddress();
                    dedicatedRpcServiceFactory = new DedicatedRpcServiceFactory(configuration, jobManagerExternalAddress, jobManagerExternalPortRange, jobManagerBindAddress, (RpcSystem) this.rpcSystem.deref());
                    this.taskManagerRpcServiceFactory = new DedicatedRpcServiceFactory(configuration, taskManagerExternalAddress, taskManagerExternalPortRange, taskManagerBindAddress, (RpcSystem) this.rpcSystem.deref());
                    this.commonRpcService = createRemoteRpcService(configuration, jobManagerBindAddress, 0, (RpcSystem) this.rpcSystem.deref());
                    startRemoteMetricsRpcService = MetricUtils.startRemoteMetricsRpcService(configuration, this.commonRpcService.getAddress(), null, (RpcSystem) this.rpcSystem.deref());
                }
                this.metricRegistry.startQueryService(startRemoteMetricsRpcService, null);
                this.processMetricGroup = MetricUtils.instantiateProcessMetricGroup(this.metricRegistry, RpcUtils.getHostname(this.commonRpcService), ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
                this.ioExecutor = Executors.newFixedThreadPool(ClusterEntrypointUtils.getPoolSize(configuration), new ExecutorThreadFactory("mini-cluster-io"));
                this.delegationTokenManager = DefaultDelegationTokenManagerFactory.create(configuration, this.miniClusterConfiguration.getPluginManager(), this.commonRpcService.getScheduledExecutor(), this.ioExecutor);
                this.delegationTokenManager.obtainDelegationTokens();
                this.delegationTokenReceiverRepository = new DelegationTokenReceiverRepository(configuration, this.miniClusterConfiguration.getPluginManager());
                // The factory must be assigned before createHighAvailabilityServices, which reads it.
                this.haServicesFactory = createHighAvailabilityServicesFactory(configuration);
                this.haServices = createHighAvailabilityServices(configuration, this.ioExecutor);
                this.blobServer = BlobUtils.createBlobServer(configuration, Reference.borrowed(this.workingDirectory.getBlobStorageDirectory()), this.haServices.createBlobStore());
                this.blobServer.start();
                this.heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
                this.blobCacheService = BlobUtils.createBlobCacheService(configuration, Reference.borrowed(this.workingDirectory.getBlobStorageDirectory()), this.haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), this.blobServer.getPort()));
                startTaskManagers();
                setupDispatcherResourceManagerComponents(configuration, dedicatedRpcServiceFactory, new RpcMetricQueryServiceRetriever(this.metricRegistry.getMetricQueryServiceRpcService()));
                this.resourceManagerLeaderRetriever = this.haServices.getResourceManagerLeaderRetriever();
                this.dispatcherLeaderRetriever = this.haServices.getDispatcherLeaderRetriever();
                this.clusterRestEndpointLeaderRetrievalService = this.haServices.getClusterRestEndpointLeaderRetriever();
                this.dispatcherGatewayRetriever = new RpcGatewayRetriever<>(this.commonRpcService, DispatcherGateway.class, DispatcherId::fromUuid, new ExponentialBackoffRetryStrategy(21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
                this.resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(this.commonRpcService, ResourceManagerGateway.class, ResourceManagerId::fromUuid, new ExponentialBackoffRetryStrategy(21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
                this.webMonitorLeaderRetriever = new LeaderRetriever();
                // Attach the listeners only after all three retrievers exist.
                this.resourceManagerLeaderRetriever.start(this.resourceManagerGatewayRetriever);
                this.dispatcherLeaderRetriever.start(this.dispatcherGatewayRetriever);
                this.clusterRestEndpointLeaderRetrievalService.start(this.webMonitorLeaderRetriever);
                // Fresh future for this run; it is completed by the shutdown chain in closeInternal.
                this.terminationFuture = new CompletableFuture<>();
                this.running = true;
                LOG.info("Flink Mini Cluster started successfully");
            } catch (Exception e) {
                // NOTE(review): close() presumably delegates to closeAsync(); `running` is still
                // false on this path, so closeInternal would skip the teardown entirely —
                // confirm whether partially started services are leaked here.
                try {
                    close();
                } catch (Exception e2) {
                    e.addSuppressed(e2);
                }
                throw e;
            }
        }
    }

    /**
     * Creates the dispatcher/resource-manager components, stores them, and arranges for the
     * mini cluster to be closed once the combined shut-down future of all stored components
     * completes (normally or exceptionally).
     *
     * @param configuration the cluster configuration
     * @param rpcServiceFactory factory providing the RPC service for the components
     * @param metricQueryServiceRetriever retriever handed to the component factory
     * @throws Exception if the components cannot be created
     */
    @GuardedBy("lock")
    private void setupDispatcherResourceManagerComponents(Configuration configuration, RpcServiceFactory rpcServiceFactory, MetricQueryServiceRetriever metricQueryServiceRetriever) throws Exception {
        this.dispatcherResourceManagerComponents.addAll(
                createDispatcherResourceManagerComponents(
                        configuration,
                        rpcServiceFactory,
                        this.blobServer,
                        this.heartbeatServices,
                        this.delegationTokenManager,
                        this.metricRegistry,
                        metricQueryServiceRetriever,
                        new ShutDownFatalErrorHandler()));

        final Collection<? extends CompletableFuture<?>> shutDownFutures =
                this.dispatcherResourceManagerComponents.stream()
                        .map(DispatcherResourceManagerComponent::getShutDownFuture)
                        .collect(Collectors.toList());

        // The outcome is irrelevant: any completion triggers the cluster shutdown.
        FutureUtils.completeAll(shutDownFutures).whenComplete((ignored, failure) -> closeAsync());
    }

    /**
     * Creates a single {@link DispatcherResourceManagerComponent} and chains
     * {@code stopApplication} onto its shut-down future.
     *
     * @return a singleton collection holding the created component
     * @throws Exception if the RPC service or the component cannot be created
     */
    @VisibleForTesting
    protected Collection<? extends DispatcherResourceManagerComponent> createDispatcherResourceManagerComponents(Configuration configuration, RpcServiceFactory rpcServiceFactory, BlobServer blobServer, HeartbeatServices heartbeatServices, DelegationTokenManager delegationTokenManager, MetricRegistry metricRegistry, MetricQueryServiceRetriever metricQueryServiceRetriever, FatalErrorHandler fatalErrorHandler) throws Exception {
        final DispatcherResourceManagerComponentFactory componentFactory =
                createDispatcherResourceManagerComponentFactory();

        final DispatcherResourceManagerComponent component =
                componentFactory.create(
                        configuration,
                        ResourceID.generate(),
                        this.ioExecutor,
                        rpcServiceFactory.createRpcService(),
                        this.haServices,
                        blobServer,
                        heartbeatServices,
                        delegationTokenManager,
                        metricRegistry,
                        new MemoryExecutionGraphInfoStore(),
                        metricQueryServiceRetriever,
                        Collections.emptySet(),
                        fatalErrorHandler);

        // Once the component signals shut-down, stop the application with the reported status.
        FutureUtils.assertNoException(
                component
                        .getShutDownFuture()
                        .thenCompose(applicationStatus -> component.stopApplication(applicationStatus, null)));

        return Collections.singleton(component);
    }

    /**
     * Returns the session component factory backed by the
     * {@link StandaloneResourceManagerFactory} singleton. Protected so subclasses can override it.
     */
    protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory() {
        return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(StandaloneResourceManagerFactory.getInstance());
    }

    /**
     * Picks the HA services factory according to the configured {@link HaServices} mode.
     *
     * <ul>
     *   <li>{@code WITH_LEADERSHIP_CONTROL}: singleton factory for
     *       {@link EmbeddedHaServicesWithLeadershipControl}.
     *   <li>{@code CONFIGURED}: singleton factory for {@link EmbeddedHaServices} when the
     *       configuration's {@link HighAvailabilityMode} is {@code NONE}, otherwise a
     *       {@link RegularHighAvailabilityServicesFactory}.
     * </ul>
     *
     * @param configuration the configuration the HA mode is read from
     * @return the factory to create the HA services with
     * @throws IllegalConfigurationException for any other mode value
     */
    private HighAvailabilityServicesFactory createHighAvailabilityServicesFactory(Configuration configuration) {
        final HaServices haServicesMode = this.miniClusterConfiguration.getHaServices();

        if (haServicesMode == HaServices.WITH_LEADERSHIP_CONTROL) {
            return new SingletonHighAvailabilityServicesFactory(
                    (ignoredConfig, executor) -> new EmbeddedHaServicesWithLeadershipControl(executor));
        }

        if (haServicesMode == HaServices.CONFIGURED) {
            if (HighAvailabilityMode.fromConfig(configuration) == HighAvailabilityMode.NONE) {
                return new SingletonHighAvailabilityServicesFactory(
                        (ignoredConfig, executor) -> new EmbeddedHaServices(executor));
            }
            return new RegularHighAvailabilityServicesFactory();
        }

        throw new IllegalConfigurationException("Unknown HA Services Mode configured in MiniCluster configuration: " + haServicesMode);
    }

    /**
     * Creates the HA services by delegating to the {@code haServicesFactory} field, which
     * therefore has to be assigned before this method is called.
     *
     * @param configuration the cluster configuration
     * @param executor executor passed through to the factory
     * @return the HA services produced by the factory
     * @throws Exception if the factory fails
     */
    @VisibleForTesting
    protected HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor) throws Exception {
        return this.haServicesFactory.createHAServices(configuration, executor);
    }

    /**
     * Returns the HA services viewed as {@link HaLeadershipControl} when they implement that
     * interface.
     *
     * @return the leadership control, or an empty optional if the current HA services (possibly
     *     {@code null}) do not implement it
     */
    public Optional<HaLeadershipControl> getHaLeadershipControl() {
        synchronized (this.lock) {
            if (this.haServices instanceof HaLeadershipControl) {
                return Optional.of((HaLeadershipControl) this.haServices);
            }
            return Optional.empty();
        }
    }

    /**
     * Returns the HA services created in {@link #start()}; {@code null} before the first start.
     *
     * <p>NOTE(review): the field is annotated {@code @GuardedBy("lock")} but is read here
     * without synchronizing on the lock — confirm this is intended.
     */
    protected HighAvailabilityServices getHaServices() {
        return this.haServices;
    }

    /**
     * Shuts the mini cluster down asynchronously via {@code closeInternal(true)}.
     *
     * @return the termination future of the cluster
     */
    public CompletableFuture<Void> closeAsync() {
        return closeInternal(true);
    }

    /**
     * Shuts the mini cluster down asynchronously via {@code closeInternal(false)}; judging by
     * the method name the flag controls whether high-availability data is cleaned up.
     *
     * @return the termination future of the cluster
     */
    public CompletableFuture<Void> closeAsyncWithoutCleaningHighAvailabilityData() {
        return closeInternal(false);
    }

    /**
     * Triggers the shutdown chain if the cluster is running and returns the termination future.
     * When the cluster is not running, the current termination future is returned unchanged.
     *
     * @param z passed through to {@code terminateMiniClusterServices}
     * @return the cluster's termination future
     */
    private CompletableFuture<Void> closeInternal(boolean z) {
        CompletableFuture<Void> completableFuture;
        synchronized (this.lock) {
            if (this.running) {
                LOG.info("Shutting down Flink Mini Cluster");
                try {
                    long millis = ((Duration) this.miniClusterConfiguration.getConfiguration().get(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT)).toMillis();
                    // First stage: terminate all TaskManagers and the dispatcher/resource-manager components.
                    ArrayList arrayList = new ArrayList(2 + this.miniClusterConfiguration.getNumTaskManagers());
                    arrayList.addAll(terminateTaskManagers());
                    arrayList.add(shutDownResourceManagerComponents());
                    // Chain, innermost first: component terminations -> closeMetricSystem ->
                    // terminateRpcServices -> terminateMiniClusterServices(z) ->
                    // terminateExecutors(millis) -> deleteDirectories. The final callback
                    // forwards the outcome to terminationFuture.
                    // (Presumably each *Afterwards stage runs regardless of the previous
                    // stage's outcome — see FutureUtils to confirm.)
                    FutureUtils.runAfterwards(FutureUtils.composeAfterwards(FutureUtils.runAfterwards(FutureUtils.composeAfterwards(FutureUtils.composeAfterwards(FutureUtils.completeAll(arrayList), this::closeMetricSystem), this::terminateRpcServices), () -> {
                        terminateMiniClusterServices(z);
                    }), () -> {
                        return terminateExecutors(millis);
                    }), this::deleteDirectories).whenComplete((r4, th) -> {
                        if (th != null) {
                            this.terminationFuture.completeExceptionally(ExceptionUtils.stripCompletionException(th));
                        } else {
                            this.terminationFuture.complete(null);
                        }
                    });
                    this.running = false;
                } catch (Throwable th2) {
                    // The flag is cleared on the failure path as well, so a later close call
                    // does not re-enter the shutdown sequence.
                    this.running = false;
                    throw th2;
                }
            }
            completableFuture = this.terminationFuture;
        }
        return completableFuture;
    }

    /**
     * Closes the process metric group and starts closing the metric registry, clearing both
     * fields so that a repeated call is a no-op.
     *
     * @return a future over the pending termination(s); trivially over an empty list when no
     *     registry was set
     */
    private CompletableFuture<Void> closeMetricSystem() {
        synchronized (this.lock) {
            final List<CompletableFuture<Void>> pendingTerminations = new ArrayList<>(2);

            // The metric group is closed before the registry it was created from.
            if (this.processMetricGroup != null) {
                this.processMetricGroup.close();
                this.processMetricGroup = null;
            }

            if (this.metricRegistry != null) {
                pendingTerminations.add(this.metricRegistry.closeAsync());
                this.metricRegistry = null;
            }

            return FutureUtils.completeAll(pendingTerminations);
        }
    }

    /**
     * Starts as many TaskManagers as the mini cluster configuration asks for, one after another.
     *
     * @throws Exception if starting any TaskManager fails
     */
    @GuardedBy("lock")
    private void startTaskManagers() throws Exception {
        final int configuredTaskManagers = this.miniClusterConfiguration.getNumTaskManagers();
        LOG.info("Starting {} TaskManager(s)", configuredTaskManagers);

        for (int started = 0; started < configuredTaskManagers; started++) {
            startTaskManager();
        }
    }

    /**
     * Creates, starts and registers one additional TaskManager. The new TaskManager gets a
     * random UUID as resource id, a sub working directory named {@code tm_<index>} and a fatal
     * error handler bound to the same index, where the index is its position in the
     * TaskManager list.
     *
     * @throws Exception if the RPC service or the TaskManager cannot be created or started
     */
    public void startTaskManager() throws Exception {
        synchronized (this.lock) {
            final int taskManagerIndex = this.taskManagers.size();
            final Configuration configuration = this.miniClusterConfiguration.getConfiguration();
            final ResourceID resourceId = new ResourceID(UUID.randomUUID().toString());
            final RpcService rpcService = this.taskManagerRpcServiceFactory.createRpcService();

            final TaskExecutor taskExecutor =
                    TaskManagerRunner.startTaskManager(
                            configuration,
                            resourceId,
                            rpcService,
                            this.haServices,
                            this.heartbeatServices,
                            this.metricRegistry,
                            this.blobCacheService,
                            useLocalCommunication(),
                            ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                            this.workingDirectory.createSubWorkingDirectory("tm_" + taskManagerIndex),
                            this.taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagerIndex),
                            this.delegationTokenReceiverRepository);

            // Register only after a successful start so the list never holds a dead entry.
            taskExecutor.start();
            this.taskManagers.add(taskExecutor);
        }
    }

    /**
     * Tells whether TaskManagers should be started with local communication.
     *
     * @return {@code true} exactly when the configuration asks for a single TaskManager
     */
    @VisibleForTesting
    protected boolean useLocalCommunication() {
        final int configuredTaskManagers = this.miniClusterConfiguration.getNumTaskManagers();
        return configuredTaskManagers == 1;
    }

    /**
     * Exposes the Flink configuration held by the mini cluster configuration.
     *
     * @return the object returned by {@code miniClusterConfiguration.getConfiguration()}
     */
    @VisibleForTesting
    public Configuration getConfiguration() {
        final Configuration configuration = this.miniClusterConfiguration.getConfiguration();
        return configuration;
    }

    /**
     * Switches on the restore-mode override that {@code checkRestoreModeForChangelogStateBackend}
     * applies to submitted jobs. There is no counterpart in this section that switches it off
     * again.
     */
    @Internal
    public void overrideRestoreModeForChangelogStateBackend() {
        this.overrideRestoreModeForChangelogStateBackend = true;
    }

    /**
     * Requests termination of every registered TaskManager, in index order.
     *
     * @return one termination future per TaskManager, as returned by
     *     {@link #terminateTaskManager(int)}
     */
    @GuardedBy("lock")
    private Collection<? extends CompletableFuture<Void>> terminateTaskManagers() {
        final List<CompletableFuture<Void>> terminations =
                new ArrayList<>(this.taskManagers.size());

        int index = 0;
        while (index < this.taskManagers.size()) {
            terminations.add(terminateTaskManager(index));
            index++;
        }
        return terminations;
    }

    /**
     * Asynchronously closes the TaskManager stored at the given position. The entry is not
     * removed from {@code taskManagers}.
     *
     * @param i index into {@code taskManagers}
     * @return the future returned by the TaskExecutor's {@code closeAsync()}
     */
    public CompletableFuture<Void> terminateTaskManager(int i) {
        synchronized (this.lock) {
            final TaskExecutor taskExecutor = this.taskManagers.get(i);
            return taskExecutor.closeAsync();
        }
    }

    /**
     * Requests the execution graph info of a job from the dispatcher and extracts its archived
     * execution graph.
     *
     * @param jobID job to look up
     * @return future with the archived execution graph
     */
    public CompletableFuture<ArchivedExecutionGraph> getArchivedExecutionGraph(JobID jobID) {
        return runDispatcherCommand(
                gateway ->
                        gateway.requestExecutionGraphInfo(jobID, this.rpcTimeout)
                                .thenApply(graphInfo -> graphInfo.getArchivedExecutionGraph()));
    }

    /**
     * Requests the details of multiple jobs from the dispatcher and converts each entry into a
     * {@code JobStatusMessage} (job id, name, status and start time).
     *
     * @return future with one status message per job reported by the dispatcher
     */
    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
        return runDispatcherCommand(
                gateway ->
                        gateway.requestMultipleJobDetails(this.rpcTimeout)
                                .thenApply(
                                        overview -> {
                                            final Collection<JobStatusMessage> statuses =
                                                    overview.getJobs().stream()
                                                            .map(
                                                                    details ->
                                                                            new JobStatusMessage(
                                                                                    details.getJobId(),
                                                                                    details.getJobName(),
                                                                                    details.getStatus(),
                                                                                    details.getStartTime()))
                                                            .collect(Collectors.toList());
                                            return statuses;
                                        }));
    }

    /**
     * Asks the dispatcher for the current status of a job, using {@code rpcTimeout}.
     *
     * @param jobID job to query
     * @return future with the job status
     */
    public CompletableFuture<JobStatus> getJobStatus(JobID jobID) {
        return runDispatcherCommand(
                gateway -> gateway.requestJobStatus(jobID, this.rpcTimeout));
    }

    /**
     * Asks the dispatcher to cancel a job, using {@code rpcTimeout}.
     *
     * @param jobID job to cancel
     * @return future with the dispatcher's acknowledgement
     */
    public CompletableFuture<Acknowledge> cancelJob(JobID jobID) {
        return runDispatcherCommand(gateway -> gateway.cancelJob(jobID, this.rpcTimeout));
    }

    /**
     * Triggers a savepoint through the dispatcher and returns the location it reports.
     *
     * @param jobID job to take the savepoint of
     * @param str forwarded unchanged as the second argument of
     *     {@code triggerSavepointAndGetLocation} (presumably the target directory; confirm against
     *     the gateway)
     * @param z {@code true} selects {@code CANCEL_WITH_SAVEPOINT}, {@code false} selects
     *     {@code SAVEPOINT}
     * @param savepointFormatType format forwarded to the dispatcher
     * @return future with the string returned by the dispatcher
     */
    public CompletableFuture<String> triggerSavepoint(JobID jobID, String str, boolean z, SavepointFormatType savepointFormatType) {
        final TriggerSavepointMode mode =
                z ? TriggerSavepointMode.CANCEL_WITH_SAVEPOINT : TriggerSavepointMode.SAVEPOINT;

        return runDispatcherCommand(
                gateway ->
                        gateway.triggerSavepointAndGetLocation(
                                jobID, str, savepointFormatType, mode, this.rpcTimeout));
    }

    /**
     * Triggers a savepoint through the dispatcher without waiting for it.
     *
     * <p>The future returned by the dispatcher is discarded. The result completes with an empty
     * string as soon as the request has been issued, so savepoint failures are not visible to
     * the caller.
     *
     * @param jobID job to take the savepoint of
     * @param str forwarded unchanged as the second argument of
     *     {@code triggerSavepointAndGetLocation} (presumably the target directory; confirm against
     *     the gateway)
     * @param z {@code true} selects {@code CANCEL_WITH_SAVEPOINT}, {@code false} selects
     *     {@code SAVEPOINT}
     * @param savepointFormatType format forwarded to the dispatcher
     * @return future completing with {@code ""} once the request was sent
     */
    public CompletableFuture<String> triggerDetachedSavepoint(JobID jobID, String str, boolean z, SavepointFormatType savepointFormatType) {
        final TriggerSavepointMode mode =
                z ? TriggerSavepointMode.CANCEL_WITH_SAVEPOINT : TriggerSavepointMode.SAVEPOINT;

        return runDispatcherCommand(
                gateway -> {
                    // Detached: the dispatcher's future is dropped on purpose.
                    gateway.triggerSavepointAndGetLocation(
                            jobID, str, savepointFormatType, mode, this.rpcTimeout);
                    return CompletableFuture.completedFuture("");
                });
    }

    /**
     * Asks the dispatcher to trigger a checkpoint for a job, using {@code rpcTimeout}.
     *
     * @param jobID job to checkpoint
     * @return future with the string returned by the dispatcher's {@code triggerCheckpoint}
     */
    public CompletableFuture<String> triggerCheckpoint(JobID jobID) {
        return runDispatcherCommand(
                gateway -> gateway.triggerCheckpoint(jobID, this.rpcTimeout));
    }

    /**
     * Asks the dispatcher to trigger a checkpoint of the given type and report its id.
     *
     * @param jobID job to checkpoint
     * @param checkpointType type of checkpoint, forwarded to the dispatcher
     * @return future with the checkpoint id returned by the dispatcher
     */
    public CompletableFuture<Long> triggerCheckpoint(JobID jobID, CheckpointType checkpointType) {
        return runDispatcherCommand(
                gateway ->
                        gateway.triggerCheckpointAndGetCheckpointID(
                                jobID, checkpointType, this.rpcTimeout));
    }

    /**
     * Forwards a runtime rescale request to the dispatcher, using {@code rpcTimeout}.
     *
     * @param jobID job to rescale
     * @param map forwarded unchanged to the dispatcher (string keys mapped to integer values;
     *     their meaning is defined by the gateway)
     * @return future with the dispatcher's acknowledgement
     */
    public CompletableFuture<Acknowledge> triggerRuntimeRescale(JobID jobID, Map<String, Integer> map) {
        return runDispatcherCommand(
                gateway -> gateway.triggerRuntimeRescale(jobID, map, this.rpcTimeout));
    }

    /**
     * Stops a job with a savepoint through the dispatcher and returns the location it reports.
     *
     * @param jobID job to stop
     * @param str forwarded unchanged as the second argument of
     *     {@code stopWithSavepointAndGetLocation} (presumably the target directory; confirm
     *     against the gateway)
     * @param z {@code true} selects {@code TERMINATE_WITH_SAVEPOINT}, {@code false} selects
     *     {@code SUSPEND_WITH_SAVEPOINT}
     * @param savepointFormatType format forwarded to the dispatcher
     * @return future with the string returned by the dispatcher
     */
    public CompletableFuture<String> stopWithSavepoint(JobID jobID, String str, boolean z, SavepointFormatType savepointFormatType) {
        final TriggerSavepointMode mode =
                z
                        ? TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT
                        : TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT;

        return runDispatcherCommand(
                gateway ->
                        gateway.stopWithSavepointAndGetLocation(
                                jobID, str, savepointFormatType, mode, this.rpcTimeout));
    }

    /**
     * Requests a stop-with-savepoint through the dispatcher without waiting for it.
     *
     * <p>The future returned by the dispatcher is discarded. The result completes with an empty
     * string as soon as the request has been issued, so failures of the operation are not
     * visible to the caller.
     *
     * @param jobID job to stop
     * @param str forwarded unchanged as the second argument of
     *     {@code stopWithSavepointAndGetLocation} (presumably the target directory; confirm
     *     against the gateway)
     * @param z {@code true} selects {@code TERMINATE_WITH_SAVEPOINT}, {@code false} selects
     *     {@code SUSPEND_WITH_SAVEPOINT}
     * @param savepointFormatType format forwarded to the dispatcher
     * @return future completing with {@code ""} once the request was sent
     */
    public CompletableFuture<String> stopWithDetachedSavepoint(JobID jobID, String str, boolean z, SavepointFormatType savepointFormatType) {
        final TriggerSavepointMode mode =
                z
                        ? TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT
                        : TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT;

        return runDispatcherCommand(
                gateway -> {
                    // Detached: the dispatcher's future is dropped on purpose.
                    gateway.stopWithSavepointAndGetLocation(
                            jobID, str, savepointFormatType, mode, this.rpcTimeout);
                    return CompletableFuture.completedFuture("");
                });
    }

    /**
     * Asks the dispatcher to dispose a savepoint, using {@code rpcTimeout}.
     *
     * @param str forwarded unchanged to the dispatcher's {@code disposeSavepoint} (presumably the
     *     savepoint path; confirm against the gateway)
     * @return future with the dispatcher's acknowledgement
     */
    public CompletableFuture<Acknowledge> disposeSavepoint(String str) {
        return runDispatcherCommand(
                gateway -> gateway.disposeSavepoint(str, this.rpcTimeout));
    }

    /**
     * Requests a job's execution graph from the dispatcher via {@code requestJob}.
     *
     * @param jobID job to look up
     * @return future with the graph returned by the dispatcher
     */
    public CompletableFuture<? extends AccessExecutionGraph> getExecutionGraph(JobID jobID) {
        return runDispatcherCommand(gateway -> gateway.requestJob(jobID, this.rpcTimeout));
    }

    /**
     * Forwards a serialized coordination request for one operator of a job to the dispatcher.
     *
     * @param jobID job that owns the operator
     * @param operatorID operator the request is addressed to
     * @param serializedValue the serialized request, forwarded unchanged
     * @return future with the coordination response returned by the dispatcher
     */
    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(JobID jobID, OperatorID operatorID, SerializedValue<CoordinationRequest> serializedValue) {
        return runDispatcherCommand(
                gateway ->
                        gateway.deliverCoordinationRequestToCoordinator(
                                jobID, operatorID, serializedValue, this.rpcTimeout));
    }

    /**
     * Requests the resource overview from the resource manager, using {@code rpcTimeout}.
     *
     * @return future with the resource overview
     */
    public CompletableFuture<ResourceOverview> getResourceOverview() {
        return runResourceManagerCommand(
                gateway -> gateway.requestResourceOverview(this.rpcTimeout));
    }

    /**
     * Applies a command to the dispatcher gateway once it is available and flattens the nested
     * future the command returns.
     *
     * <p>The gateway future comes from {@link #getDispatcherGatewayFuture()}, which fails its
     * running-state check when the mini cluster is not running.
     *
     * @param function command to run against the dispatcher gateway
     * @param <T> result type of the command
     * @return future completing with the command's result
     */
    private <T> CompletableFuture<T> runDispatcherCommand(Function<DispatcherGateway, CompletableFuture<T>> function) {
        // No casts: the previous ones named a type variable that is not declared anywhere.
        return getDispatcherGatewayFuture()
                .thenApply(function)
                .thenCompose(Function.identity());
    }

    /**
     * Applies a command to the resource manager gateway once it is available and flattens the
     * nested future the command returns.
     *
     * <p>The gateway future comes from {@link #getResourceManagerGatewayFuture()}, which fails
     * its running-state check when the mini cluster is not running.
     *
     * @param function command to run against the resource manager gateway
     * @param <T> result type of the command
     * @return future completing with the command's result
     */
    private <T> CompletableFuture<T> runResourceManagerCommand(Function<ResourceManagerGateway, CompletableFuture<T>> function) {
        // No casts: the previous ones named a type variable that is not declared anywhere.
        return getResourceManagerGatewayFuture()
                .thenApply(function)
                .thenCompose(Function.identity());
    }

    /**
     * Submits a job and blocks only until the submission itself has completed. The job's result
     * is not awaited.
     *
     * @param jobGraph job to submit; must not be {@code null}
     * @throws JobExecutionException if the submission future completes exceptionally; carries
     *     the cause with the {@code ExecutionException} wrapper stripped
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void runDetached(JobGraph jobGraph) throws JobExecutionException, InterruptedException {
        Preconditions.checkNotNull(jobGraph, "job is null");

        final CompletableFuture<JobSubmissionResult> submission = submitJob(jobGraph);
        try {
            submission.get();
        } catch (ExecutionException failure) {
            throw new JobExecutionException(
                    jobGraph.getJobID(), ExceptionUtils.stripExecutionException(failure));
        }
    }

    /**
     * Submits a job, blocks until its {@code JobResult} is available and converts it into a
     * {@code JobExecutionResult} using the calling thread's context class loader.
     *
     * @param jobGraph job to run; must not be {@code null}
     * @return the execution result of the finished job
     * @throws JobExecutionException if submission or result retrieval fails (message
     *     {@code "Could not retrieve JobResult."}), or if converting the result raises an
     *     {@code IOException} or {@code ClassNotFoundException}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public JobExecutionResult executeJobBlocking(JobGraph jobGraph) throws JobExecutionException, InterruptedException {
        Preconditions.checkNotNull(jobGraph, "job is null");

        final CompletableFuture<JobResult> resultFuture =
                submitJob(jobGraph).thenCompose(ignored -> requestJobResult(jobGraph.getJobID()));

        final JobResult jobResult;
        try {
            jobResult = resultFuture.get();
        } catch (ExecutionException failure) {
            throw new JobExecutionException(
                    jobGraph.getJobID(),
                    "Could not retrieve JobResult.",
                    ExceptionUtils.stripExecutionException(failure));
        }

        try {
            return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
        } catch (IOException | ClassNotFoundException failure) {
            throw new JobExecutionException(jobGraph.getJobID(), failure);
        }
    }

    /**
     * Submits a job to the dispatcher.
     *
     * <p>The method works on a clone of the given graph, so the restore-mode adjustment made by
     * {@code checkRestoreModeForChangelogStateBackend} and the file upload never touch the
     * caller's instance. The job files are uploaded to the blob server address reported by the
     * dispatcher. Only once the upload and the gateway lookup have both completed is the clone
     * handed to {@code DispatcherGateway.submitJob}.
     *
     * @param jobGraph job to submit
     * @return future completing with a {@code JobSubmissionResult} carrying the job id once the
     *     dispatcher has acknowledged the submission
     */
    public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
        final JobGraph clonedJobGraph = (JobGraph) InstantiationUtil.cloneUnchecked(jobGraph);
        checkRestoreModeForChangelogStateBackend(clonedJobGraph);

        final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture =
                getDispatcherGatewayFuture();
        final CompletableFuture<InetSocketAddress> blobServerAddressFuture =
                createBlobServerAddress(dispatcherGatewayFuture);
        final CompletableFuture<Void> jobFilesUploadFuture =
                uploadAndSetJobFiles(blobServerAddressFuture, clonedJobGraph);

        // Explicit lambda parameter types replace the former raw / undeclared-type casts.
        final CompletableFuture<Acknowledge> acknowledgeFuture =
                jobFilesUploadFuture
                        .thenCombine(
                                dispatcherGatewayFuture,
                                (Void uploaded, DispatcherGateway dispatcherGateway) ->
                                        dispatcherGateway.submitJob(
                                                clonedJobGraph, this.rpcTimeout))
                        .thenCompose(Function.identity());

        return acknowledgeFuture.thenApply(
                (Acknowledge ignored) -> new JobSubmissionResult(clonedJobGraph.getJobID()));
    }

    /**
     * Rewrites the job's savepoint restore settings from {@code NO_CLAIM} to {@code LEGACY} when
     * the override flag is set.
     *
     * <p>The existing settings are written into a fresh {@code Configuration}, the restore mode
     * is overwritten there, and new settings are read back from it. The job graph is left
     * untouched when the flag is off or the mode is anything other than {@code NO_CLAIM}.
     *
     * @param jobGraph job graph whose restore settings may be replaced in place
     */
    private void checkRestoreModeForChangelogStateBackend(JobGraph jobGraph) {
        final SavepointRestoreSettings currentSettings = jobGraph.getSavepointRestoreSettings();

        if (!this.overrideRestoreModeForChangelogStateBackend
                || currentSettings.getRestoreMode() != RestoreMode.NO_CLAIM) {
            return;
        }

        final Configuration rewritten = new Configuration();
        SavepointRestoreSettings.toConfiguration(currentSettings, rewritten);
        rewritten.set(StateRecoveryOptions.RESTORE_MODE, RestoreMode.LEGACY);
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.fromConfiguration(rewritten));
    }

    /**
     * Requests the result of a job from the dispatcher. The request uses
     * {@code RpcUtils.INF_TIMEOUT} rather than {@code rpcTimeout}.
     *
     * @param jobID job whose result is requested
     * @return future with the job result
     */
    public CompletableFuture<JobResult> requestJobResult(JobID jobID) {
        return runDispatcherCommand(
                gateway -> gateway.requestJobResult(jobID, RpcUtils.INF_TIMEOUT));
    }

    /**
     * Requests the cluster overview from the dispatcher. The request uses
     * {@code RpcUtils.INF_TIMEOUT} rather than {@code rpcTimeout}.
     *
     * @return future with the cluster overview
     */
    public CompletableFuture<ClusterOverview> requestClusterOverview() {
        return runDispatcherCommand(
                gateway -> gateway.requestClusterOverview(RpcUtils.INF_TIMEOUT));
    }

    /**
     * Returns the dispatcher gateway future after checking, under {@code lock}, that the mini
     * cluster is running.
     *
     * @return the future held by {@code dispatcherGatewayRetriever}
     * @throws IllegalStateException if the {@code running} flag is not set (raised by
     *     {@code Preconditions.checkState})
     */
    @VisibleForTesting
    protected CompletableFuture<DispatcherGateway> getDispatcherGatewayFuture() {
        synchronized (this.lock) {
            Preconditions.checkState(this.running, "MiniCluster is not yet running or has already been shut down.");
            return this.dispatcherGatewayRetriever.getFuture();
        }
    }

    /**
     * Returns the resource manager gateway future after checking, under {@code lock}, that the
     * mini cluster is running.
     *
     * @return the future held by {@code resourceManagerGatewayRetriever}
     * @throws IllegalStateException if the {@code running} flag is not set (raised by
     *     {@code Preconditions.checkState})
     */
    private CompletableFuture<ResourceManagerGateway> getResourceManagerGatewayFuture() {
        synchronized (this.lock) {
            Preconditions.checkState(this.running, "MiniCluster is not yet running or has already been shut down.");
            return this.resourceManagerGatewayRetriever.getFuture();
        }
    }

    /**
     * Uploads the job graph's files once the blob server address is known.
     *
     * <p>The upload is delegated to {@code ClientUtils.extractAndUploadJobGraphFiles}, which is
     * given a supplier that opens a {@code BlobClient} against the resolved address with the
     * mini cluster's configuration.
     *
     * @param completableFuture future providing the blob server address
     * @param jobGraph job graph whose files are uploaded
     * @return future completing when the upload call returns; a {@code FlinkException} from the
     *     upload is rethrown wrapped in a {@code CompletionException}
     */
    private CompletableFuture<Void> uploadAndSetJobFiles(CompletableFuture<InetSocketAddress> completableFuture, JobGraph jobGraph) {
        return completableFuture.thenAccept(
                blobServerAddress -> {
                    try {
                        ClientUtils.extractAndUploadJobGraphFiles(
                                jobGraph,
                                () ->
                                        new BlobClient(
                                                blobServerAddress,
                                                this.miniClusterConfiguration.getConfiguration()));
                    } catch (FlinkException uploadFailure) {
                        throw new CompletionException(uploadFailure);
                    }
                });
    }

    /**
     * Resolves the blob server's socket address from the dispatcher gateway.
     *
     * <p>The port is requested from the gateway with {@code rpcTimeout} and combined with the
     * gateway's host name.
     *
     * @param completableFuture future providing the dispatcher gateway
     * @return future with the blob server address
     */
    private CompletableFuture<InetSocketAddress> createBlobServerAddress(CompletableFuture<DispatcherGateway> completableFuture) {
        // No casts: the previous one named a type variable that is not declared anywhere.
        return completableFuture
                .thenApply(
                        dispatcherGateway ->
                                dispatcherGateway
                                        .getBlobServerPort(this.rpcTimeout)
                                        .thenApply(
                                                blobServerPort ->
                                                        new InetSocketAddress(
                                                                dispatcherGateway.getHostname(),
                                                                blobServerPort.intValue())))
                .thenCompose(Function.identity());
    }

    /**
     * Builds the metric registry from the given configuration.
     *
     * <p>Metric reporters and trace reporters are both set up from the same configuration and
     * the plugin manager of the mini cluster configuration.
     *
     * @param configuration configuration to read registry and reporter settings from
     * @param j forwarded unchanged to {@code MetricRegistryConfiguration.fromConfiguration}; its
     *     meaning is not visible in this section
     * @return a new {@code MetricRegistryImpl}
     */
    protected MetricRegistryImpl createMetricRegistry(Configuration configuration, long j) {
        final MetricRegistryConfiguration registryConfiguration =
                MetricRegistryConfiguration.fromConfiguration(configuration, j);

        return new MetricRegistryImpl(
                registryConfiguration,
                ReporterSetup.fromConfiguration(
                        configuration, this.miniClusterConfiguration.getPluginManager()),
                TraceReporterSetup.fromConfiguration(
                        configuration, this.miniClusterConfiguration.getPluginManager()));
    }

    /**
     * Creates and starts a remote RPC service bound to a fixed address and port.
     *
     * <p>The same address and port are used both as the builder's address/port arguments and as
     * the bind address/port.
     *
     * @param configuration configuration handed to the service builder
     * @param str address, used for the builder and as bind address
     * @param i port, passed to the builder as a string and used as bind port
     * @param rpcSystem RPC system that provides the builder
     * @return the started RPC service
     * @throws Exception propagated from creating or starting the service
     */
    protected RpcService createRemoteRpcService(Configuration configuration, String str, int i, RpcSystem rpcSystem) throws Exception {
        final String portAsString = String.valueOf(i);

        return rpcSystem
                .remoteServiceBuilder(configuration, str, portAsString)
                .withBindAddress(str)
                .withBindPort(i)
                .withExecutorConfiguration(RpcUtils.getTestForkJoinExecutorConfiguration())
                .createAndStart();
    }

    /**
     * Creates and starts a remote RPC service whose bind address is given separately from the
     * address and port arguments passed to the builder.
     *
     * @param configuration configuration handed to the service builder
     * @param str second argument of {@code remoteServiceBuilder} (presumably the external
     *     address; confirm against {@code RpcSystem})
     * @param str2 third argument of {@code remoteServiceBuilder} (presumably the external port
     *     or port range; confirm against {@code RpcSystem})
     * @param str3 address the service binds to
     * @param rpcSystem RPC system that provides the builder
     * @return the started RPC service
     * @throws Exception propagated from creating or starting the service
     */
    protected RpcService createRemoteRpcService(Configuration configuration, String str, String str2, String str3, RpcSystem rpcSystem) throws Exception {
        return rpcSystem
                .remoteServiceBuilder(configuration, str, str2)
                .withBindAddress(str3)
                .withExecutorConfiguration(RpcUtils.getTestForkJoinExecutorConfiguration())
                .createAndStart();
    }

    /**
     * Creates and starts a local RPC service using the test fork-join executor configuration.
     *
     * @param configuration configuration handed to the service builder
     * @param rpcSystem RPC system that provides the builder
     * @return the started RPC service
     * @throws Exception propagated from creating or starting the service
     */
    protected RpcService createLocalRpcService(Configuration configuration, RpcSystem rpcSystem) throws Exception {
        return rpcSystem
                .localServiceBuilder(configuration)
                .withExecutorConfiguration(RpcUtils.getTestForkJoinExecutorConfiguration())
                .createAndStart();
    }

    /**
     * Closes all dispatcher/resource-manager components and afterwards stops the three leader
     * retrieval services.
     *
     * <p>The retrieval services (resource manager, dispatcher, REST endpoint) are stopped under
     * {@code lock} after every component's close future has completed. Each one is stopped and
     * nulled independently: the first exception is kept, later ones are attached to it as
     * suppressed, and the combined exception is thrown at the end of the follow-up action.
     *
     * @return future returned by {@code FutureUtils.runAfterwards} for the follow-up action
     */
    @GuardedBy("lock")
    private CompletableFuture<Void> shutDownResourceManagerComponents() {
        final ArrayList<CompletableFuture<Void>> componentCloses =
                new ArrayList<>(this.dispatcherResourceManagerComponents.size());
        for (DispatcherResourceManagerComponent component : this.dispatcherResourceManagerComponents) {
            componentCloses.add(component.closeAsync());
        }

        return FutureUtils.runAfterwards(
                FutureUtils.completeAll(componentCloses),
                () -> {
                    Exception collected = null;

                    synchronized (this.lock) {
                        if (this.resourceManagerLeaderRetriever != null) {
                            try {
                                this.resourceManagerLeaderRetriever.stop();
                            } catch (Exception stopFailure) {
                                collected = ExceptionUtils.firstOrSuppressed(stopFailure, collected);
                            }
                            this.resourceManagerLeaderRetriever = null;
                        }

                        if (this.dispatcherLeaderRetriever != null) {
                            try {
                                this.dispatcherLeaderRetriever.stop();
                            } catch (Exception stopFailure) {
                                collected = ExceptionUtils.firstOrSuppressed(stopFailure, collected);
                            }
                            this.dispatcherLeaderRetriever = null;
                        }

                        if (this.clusterRestEndpointLeaderRetrievalService != null) {
                            try {
                                this.clusterRestEndpointLeaderRetrievalService.stop();
                            } catch (Exception stopFailure) {
                                collected = ExceptionUtils.firstOrSuppressed(stopFailure, collected);
                            }
                            this.clusterRestEndpointLeaderRetrievalService = null;
                        }
                    }

                    if (collected != null) {
                        throw collected;
                    }
                });
    }

    /**
     * Shuts down the blob cache, the blob server, the high-availability services and, if owned,
     * the RPC system, all while holding {@code lock}.
     *
     * <p>Every service is closed independently. The first exception is remembered, later ones
     * are attached to it as suppressed, and the combined exception is thrown only after all
     * services have been attempted. Closing the high-availability services follows the same
     * rule as the others, so a failure there no longer skips the RPC system shutdown or discards
     * an exception collected earlier.
     *
     * @param z forwarded to {@code haServices.closeWithOptionalClean}
     * @throws Exception the first failure encountered, with any later failures suppressed
     */
    private void terminateMiniClusterServices(boolean z) throws Exception {
        Exception collected = null;

        synchronized (this.lock) {
            if (this.blobCacheService != null) {
                try {
                    this.blobCacheService.close();
                } catch (Exception e) {
                    collected = ExceptionUtils.firstOrSuppressed(e, collected);
                }
                this.blobCacheService = null;
            }

            if (this.blobServer != null) {
                try {
                    this.blobServer.close();
                } catch (Exception e) {
                    collected = ExceptionUtils.firstOrSuppressed(e, collected);
                }
                this.blobServer = null;
            }

            if (this.haServices != null) {
                try {
                    this.haServices.closeWithOptionalClean(z);
                } catch (Exception e) {
                    // Keep going: the reference must still be cleared and the RPC system closed.
                    collected = ExceptionUtils.firstOrSuppressed(e, collected);
                }
                this.haServices = null;
            }

            try {
                if (this.rpcSystem.isOwned()) {
                    ((RpcSystem) this.rpcSystem.deref()).close();
                }
            } catch (Exception e) {
                collected = ExceptionUtils.firstOrSuppressed(e, collected);
            }

            if (collected != null) {
                throw collected;
            }
        }
    }

    /**
     * Asynchronously closes the common RPC service and every additional RPC service, then
     * forgets them ({@code commonRpcService} is nulled, {@code rpcServices} is cleared).
     *
     * <p>Runs while holding {@code lock}. {@code commonRpcService} is dereferenced without a
     * null check.
     *
     * @return the future produced by {@code FutureUtils.completeAll} over all close futures
     */
    @Nonnull
    private CompletableFuture<Void> terminateRpcServices() {
        synchronized (this.lock) {
            final ArrayList<CompletableFuture<?>> pendingCloses =
                    new ArrayList<>(1 + this.rpcServices.size());

            pendingCloses.add(this.commonRpcService.closeAsync());
            for (RpcService rpcService : this.rpcServices) {
                pendingCloses.add(rpcService.closeAsync());
            }

            this.commonRpcService = null;
            this.rpcServices.clear();

            return FutureUtils.completeAll(pendingCloses);
        }
    }

    /**
     * Shuts down the I/O executor without blocking the caller.
     *
     * @param j shutdown timeout in milliseconds, passed to
     *     {@code ExecutorUtils.nonBlockingShutdown}
     * @return the shutdown future, or an already completed future when no I/O executor is set
     */
    private CompletableFuture<Void> terminateExecutors(long j) {
        synchronized (this.lock) {
            if (this.ioExecutor == null) {
                return CompletableFuture.completedFuture(null);
            }
            return ExecutorUtils.nonBlockingShutdown(j, TimeUnit.MILLISECONDS, this.ioExecutor);
        }
    }

    /**
     * Deletes the working directory, if one is set, while holding {@code lock}. The field itself
     * is not cleared.
     *
     * @throws IOException propagated from {@code workingDirectory.delete()}
     */
    private void deleteDirectories() throws IOException {
        synchronized (this.lock) {
            if (this.workingDirectory == null) {
                return;
            }
            this.workingDirectory.delete();
        }
    }

    /**
     * Asks the resource manager to release the cluster partitions of a data set.
     *
     * <p>The gateway future is read directly from {@code resourceManagerGatewayRetriever}. Unlike
     * {@code getResourceManagerGatewayFuture()}, this neither takes {@code lock} nor checks that
     * the mini cluster is running.
     *
     * @param abstractID id wrapped into an {@code IntermediateDataSetID} for the request
     * @return future completing when the resource manager's release future completes
     */
    public CompletableFuture<Void> invalidateClusterDataset(AbstractID abstractID) {
        final CompletableFuture<CompletableFuture<Void>> releaseRequest =
                this.resourceManagerGatewayRetriever
                        .getFuture()
                        .thenApply(
                                gateway ->
                                        gateway.releaseClusterPartitions(
                                                new IntermediateDataSetID(abstractID)));

        return releaseRequest.thenCompose(Function.identity());
    }

    /**
     * Lists the ids of the data sets reported by the resource manager's {@code listDataSets}.
     *
     * <p>The gateway future is read directly from {@code resourceManagerGatewayRetriever}. Unlike
     * {@code getResourceManagerGatewayFuture()}, this neither takes {@code lock} nor checks that
     * the mini cluster is running.
     *
     * @return future with a new {@code HashSet} holding the keys of the reported data set map
     */
    public CompletableFuture<Set<AbstractID>> listCompletedClusterDatasetIds() {
        return this.resourceManagerGatewayRetriever
                .getFuture()
                .thenApply(gateway -> gateway.listDataSets())
                .thenCompose(
                        dataSetsFuture ->
                                dataSetsFuture.thenApply(
                                        dataSets -> {
                                            final Set<AbstractID> ids =
                                                    new HashSet<>(dataSets.keySet());
                                            return ids;
                                        }));
    }

    /**
     * Reports a job client heartbeat to the dispatcher, using {@code rpcTimeout}.
     *
     * @param jobID job the heartbeat belongs to
     * @param j forwarded unchanged as the second argument of {@code reportJobClientHeartbeat}
     *     (presumably an expiry timestamp; confirm against the gateway)
     * @return future completing when the dispatcher has processed the heartbeat
     */
    public CompletableFuture<Void> reportHeartbeat(JobID jobID, long j) {
        return runDispatcherCommand(
                gateway -> gateway.reportJobClientHeartbeat(jobID, j, this.rpcTimeout));
    }

    /**
     * Initializes the defaults of {@code FileOutputFormat} from the given configuration.
     *
     * @param configuration configuration handed to
     *     {@code FileOutputFormat.initDefaultsFromConfiguration}
     */
    private void initializeIOFormatClasses(Configuration configuration) {
        FileOutputFormat.initDefaultsFromConfiguration(configuration);
    }
}
