package org.apache.flink.mesos.runtime.clusterframework;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.dispatch.OnComplete;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.mesos.entrypoint.MesosEntrypointUtils;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
import org.apache.flink.mesos.util.MesosArtifactServer;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.class */
public class MesosApplicationMasterRunner {
    private static final int INIT_ERROR_EXIT_CODE = 31;
    private static final int ACTOR_DIED_EXIT_CODE = 32;
    protected static final Logger LOG = LoggerFactory.getLogger((Class<?>) MesosApplicationMasterRunner.class);
    private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    private static final Map<String, String> ENV = System.getenv();
    private static final Options ALL_OPTIONS = new Options().addOption(BootstrapTools.newDynamicPropertiesOption());

    public static void main(String[] strArr) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Mesos AppMaster", strArr);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);
        System.exit(new MesosApplicationMasterRunner().run(strArr));
    }

    protected int run(String[] strArr) {
        try {
            LOG.debug("All environment variables: {}", ENV);
            final Configuration parseDynamicProperties = BootstrapTools.parseDynamicProperties(new PosixParser().parse(ALL_OPTIONS, strArr));
            final Configuration loadConfiguration = MesosEntrypointUtils.loadConfiguration(parseDynamicProperties, LOG);
            try {
                FileSystem.initialize(loadConfiguration);
                SecurityUtils.install(new SecurityConfiguration(loadConfiguration));
                return ((Integer) SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() { // from class: org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Integer call() throws Exception {
                        return Integer.valueOf(MesosApplicationMasterRunner.this.runPrivileged(loadConfiguration, parseDynamicProperties));
                    }
                })).intValue();
            } catch (IOException e) {
                throw new IOException("Error while configuring the filesystems.", e);
            }
        } catch (Throwable th) {
            LOG.error("Mesos AppMaster initialization failed", th);
            return 31;
        }
    }

    protected int runPrivileged(Configuration configuration, Configuration configuration2) {
        ActorSystem actorSystem = null;
        WebMonitor webMonitor = null;
        ScheduledExecutorService scheduledExecutorService = null;
        ExecutorService executorService = null;
        MesosServices mesosServices = null;
        try {
            String string = configuration.getString(JobManagerOptions.ADDRESS, InetAddress.getLocalHost().getHostName());
            LOG.info("App Master Hostname to use: {}", string);
            MesosConfiguration createMesosSchedulerConfiguration = MesosEntrypointUtils.createMesosSchedulerConfiguration(configuration, string);
            int numberCPUCores = Hardware.getNumberCPUCores();
            scheduledExecutorService = Executors.newScheduledThreadPool(numberCPUCores, new ExecutorThreadFactory("mesos-jobmanager-future"));
            executorService = Executors.newFixedThreadPool(numberCPUCores, new ExecutorThreadFactory("mesos-jobmanager-io"));
            mesosServices = MesosServicesUtils.createMesosServices(configuration, string);
            MesosTaskManagerParameters createTmParameters = MesosEntrypointUtils.createTmParameters(configuration, LOG);
            int integer = configuration.getInteger(JobManagerOptions.PORT);
            Preconditions.checkState(integer >= 0 && integer <= 65536, "Config parameter \"" + JobManagerOptions.PORT.key() + "\" is invalid, it must be between 0 and 65536");
            actorSystem = BootstrapTools.startActorSystem(configuration, string, integer, LOG);
            Address address = AkkaUtils.getAddress(actorSystem);
            String str = address.host().get();
            int intValue = ((Integer) address.port().get()).intValue();
            LOG.info("Actor system bound to hostname {}.", str);
            LOG.debug("Starting Artifact Server");
            MesosArtifactServer artifactServer = mesosServices.getArtifactServer();
            ContainerSpecification containerSpecification = new ContainerSpecification();
            containerSpecification.getDynamicConfiguration().addAll(configuration2);
            containerSpecification.getDynamicConfiguration().addAll(BootstrapTools.generateTaskManagerConfiguration(new Configuration(), str, intValue, createTmParameters.containeredParameters().numSlots(), TASKMANAGER_REGISTRATION_TIMEOUT));
            MesosEntrypointUtils.applyOverlays(configuration, containerSpecification);
            LaunchableMesosWorker.configureArtifactServer(artifactServer, containerSpecification);
            HighAvailabilityServices createHighAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration, executorService, HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
            LOG.debug("Starting Web Frontend");
            Time milliseconds = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
            webMonitor = BootstrapTools.startWebMonitorIfConfigured(configuration, createHighAvailabilityServices, new AkkaJobManagerRetriever(actorSystem, milliseconds, 10, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(actorSystem, milliseconds), milliseconds, new ScheduledExecutorServiceAdapter(scheduledExecutorService), LOG);
            if (webMonitor != null) {
                createMesosSchedulerConfiguration.frameworkInfo().setWebuiUrl(new URL(webMonitor.getRestAddress()).toExternalForm());
            }
            LOG.debug("Starting JobManager actor");
            MetricRegistryImpl metricRegistryImpl = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
            metricRegistryImpl.startQueryService(actorSystem, null);
            ActorRef mo5859_1 = JobManager.startJobManagerActors(configuration, actorSystem, scheduledExecutorService, executorService, createHighAvailabilityServices, metricRegistryImpl, (Option<String>) (webMonitor != null ? Option.apply(webMonitor.getRestAddress()) : Option.empty()), (Option<String>) Option.apply("jobmanager"), (Option<String>) Option.apply(JobMaster.ARCHIVE_NAME), getJobManagerClass(), getArchivistClass()).mo5859_1();
            LOG.debug("Starting Mesos Flink Resource Manager");
            ActorRef actorOf = actorSystem.actorOf(MesosFlinkResourceManager.createActorProps(getResourceManagerClass(), configuration, createMesosSchedulerConfiguration, mesosServices.createMesosWorkerStore(configuration, executorService), createHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), createTmParameters, containerSpecification, artifactServer, LOG), "Mesos_Resource_Master");
            LOG.debug("Starting process reapers for JobManager");
            actorSystem.actorOf(Props.create((Class<?>) ProcessReaper.class, actorOf, LOG, 32), "Mesos_Resource_Master_Process_Reaper");
            actorSystem.actorOf(Props.create((Class<?>) ProcessReaper.class, mo5859_1, LOG, 32), "JobManager_Process_Reaper");
            LOG.info("Mesos JobManager started");
            try {
                Await.ready(actorSystem.whenTerminated(), Duration.Inf());
            } catch (InterruptedException | TimeoutException e) {
                LOG.error("Error shutting down actor system", e);
            }
            if (webMonitor != null) {
                try {
                    webMonitor.stop();
                } catch (Throwable th) {
                    LOG.error("Failed to stop the web frontend", th);
                }
            }
            if (createHighAvailabilityServices != null) {
                try {
                    createHighAvailabilityServices.close();
                } catch (Throwable th2) {
                    LOG.error("Could not properly stop the high availability services.");
                }
            }
            if (metricRegistryImpl != null) {
                try {
                    metricRegistryImpl.shutdown().get();
                } catch (Throwable th3) {
                    LOG.error("Could not shut down metric registry.", th3);
                }
            }
            ExecutorUtils.gracefulShutdown(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS, scheduledExecutorService, executorService);
            try {
                mesosServices.close(true);
                return 0;
            } catch (Throwable th4) {
                LOG.error("Failed to clean up and close MesosServices.", th4);
                return 0;
            }
        } catch (Throwable th5) {
            LOG.error("Mesos JobManager initialization failed", th5);
            if (webMonitor != null) {
                try {
                    webMonitor.stop();
                } catch (Throwable th6) {
                    LOG.warn("Failed to stop the web frontend", th6);
                }
            }
            if (actorSystem != null) {
                actorSystem.terminate().onComplete(new OnComplete<Terminated>() { // from class: org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.2
                    @Override // akka.dispatch.OnComplete
                    public void onComplete(Throwable th7, Terminated terminated) {
                        if (th7 != null) {
                            MesosApplicationMasterRunner.LOG.error("Error shutting down actor system", th7);
                        }
                    }
                }, org.apache.flink.runtime.concurrent.Executors.directExecutionContext());
            }
            if (scheduledExecutorService != null) {
                try {
                    scheduledExecutorService.shutdownNow();
                } catch (Throwable th7) {
                    LOG.error("Error shutting down future executor", th7);
                }
            }
            if (executorService != null) {
                try {
                    executorService.shutdownNow();
                } catch (Throwable th8) {
                    LOG.error("Error shutting down io executor", th8);
                }
            }
            if (mesosServices == null) {
                return 31;
            }
            try {
                mesosServices.close(false);
                return 31;
            } catch (Throwable th9) {
                LOG.error("Error closing the mesos services.", th9);
                return 31;
            }
        }
    }

    protected Class<? extends MesosFlinkResourceManager> getResourceManagerClass() {
        return MesosFlinkResourceManager.class;
    }

    protected Class<? extends JobManager> getJobManagerClass() {
        return MesosJobManager.class;
    }

    protected Class<? extends MemoryArchivist> getArchivistClass() {
        return MemoryArchivist.class;
    }
}
