package org.apache.flink.yarn;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.dispatch.OnComplete;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/yarn/YarnApplicationMasterRunner.class */
/**
 * Process entry point that starts, inside one JVM, an Akka actor system hosting the
 * JobManager actors and the {@link YarnFlinkResourceManager} actor, plus the optional web
 * monitor, and then blocks until the actor system has terminated.
 *
 * <p>All inputs are taken from the process environment ({@link System#getenv()}) and from the
 * Flink configuration found in the container's working directory. Subclasses can swap the
 * actor implementations by overriding the {@code get*Class()} hooks.
 */
public class YarnApplicationMasterRunner {

    /** Logger used by this class; also handed to the helpers and actors it starts. */
    protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class);

    /** Registration timeout passed to {@code BootstrapTools.generateTaskManagerConfiguration}. */
    private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);

    /** The process environment, read once when the class is initialized. */
    private static final Map<String, String> ENV = System.getenv();

    /** Value returned by {@link #run} and {@link #runApplicationMaster} when start-up fails. */
    private static final int INIT_ERROR_EXIT_CODE = 31;

    /** Exit code handed to the {@link ProcessReaper} actors watching the JobManager and resource manager. */
    private static final int ACTOR_DIED_EXIT_CODE = 32;

    /**
     * JVM entry point: logs environment information, registers the signal handler and the
     * shutdown safeguard, runs the application master and exits the JVM with its return value.
     *
     * @param strArr the command line arguments; only logged and forwarded to {@link #run}
     */
    public static void main(String[] strArr) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / ResourceManager / JobManager", strArr);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        int exitCode = new YarnApplicationMasterRunner().run(strArr);
        System.exit(exitCode);
    }

    /**
     * Loads the configuration, initializes the file systems, installs the security context and
     * then executes {@link #runApplicationMaster(Configuration)} inside that context.
     *
     * @param strArr the command line arguments (not evaluated by this method)
     * @return the value returned by {@link #runApplicationMaster(Configuration)}, or
     *     {@code INIT_ERROR_EXIT_CODE} (31) if anything in this method throws
     */
    protected int run(String[] strArr) {
        try {
            LOG.debug("All environment variables: {}", ENV);

            final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
            Utils.require(yarnClientUsername != null, "YARN client user name environment variable (%s) not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);

            final String workingDirectory = ENV.get(ApplicationConstants.Environment.PWD.key());
            Utils.require(workingDirectory != null, "Current working directory variable (%s) not set", ApplicationConstants.Environment.PWD.key());
            LOG.debug("Current working Directory: {}", workingDirectory);

            final String keytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
            LOG.info("remoteKeytabPrincipal obtained {}", keytabPrincipal);
            LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}", UserGroupInformation.getCurrentUser().getShortUserName(), yarnClientUsername);

            final Map<String, String> dynamicProperties = FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
            LOG.debug("YARN dynamic properties: {}", dynamicProperties);

            final Configuration flinkConfig = createConfiguration(workingDirectory, dynamicProperties, LOG);

            // Only the file system initialization is wrapped, so that I/O failures of the
            // security set-up below are not reported as file system problems.
            try {
                FileSystem.initialize(flinkConfig);
            } catch (IOException e) {
                throw new IOException("Error while configuring the filesystems.", e);
            }

            configureKerberosKeytab(flinkConfig, workingDirectory, keytabPrincipal);
            SecurityUtils.install(new SecurityConfiguration(flinkConfig));

            return ((Integer) SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
                @Override
                public Integer call() {
                    return Integer.valueOf(YarnApplicationMasterRunner.this.runApplicationMaster(flinkConfig));
                }
            })).intValue();
        } catch (Throwable th) {
            // Top-level boundary: whatever went wrong must end up in the log.
            LOG.error("YARN Application Master initialization failed", th);
            return INIT_ERROR_EXIT_CODE;
        }
    }

    /**
     * Starts the actor system, web monitor, metric registry, JobManager actors, resource
     * manager actor and process reapers, then blocks until the actor system has terminated and
     * finally shuts the started services down.
     *
     * <p>If start-up fails, the web monitor and actor system that were already started are
     * stopped, both executors are shut down immediately and {@code INIT_ERROR_EXIT_CODE} is
     * returned. Failures while stopping individual services after termination are logged and
     * do not change the return value.
     *
     * @param configuration the Flink configuration; this method writes the keytab settings, the
     *     JobManager address/port and (if a web monitor runs) the web port into it
     * @return {@code 0} after the actor system terminated, or {@code INIT_ERROR_EXIT_CODE} (31)
     *     if start-up failed
     */
    protected int runApplicationMaster(Configuration configuration) {
        // Declared outside the try block so the failure path can see what was already started.
        ActorSystem actorSystem = null;
        WebMonitor webMonitor = null;
        HighAvailabilityServices highAvailabilityServices = null;
        MetricRegistryImpl metricRegistry = null;

        final int numberOfCores = Hardware.getNumberCPUCores();
        final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(numberOfCores, new ExecutorThreadFactory("yarn-jobmanager-future"));
        final ExecutorService ioExecutor = Executors.newFixedThreadPool(numberOfCores, new ExecutorThreadFactory("yarn-jobmanager-io"));

        try {
            final String workingDirectory = ENV.get(ApplicationConstants.Environment.PWD.key());
            Utils.require(workingDirectory != null, "Current working directory variable (%s) not set", ApplicationConstants.Environment.PWD.key());

            final String appMasterHostname = ENV.get(ApplicationConstants.Environment.NM_HOST.key());
            Utils.require(appMasterHostname != null, "ApplicationMaster hostname variable %s not set", ApplicationConstants.Environment.NM_HOST.key());
            LOG.info("YARN assigned hostname for application master: {}", appMasterHostname);

            configureKerberosKeytab(configuration, workingDirectory, ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL));

            final YarnConfiguration yarnConfig = new YarnConfiguration();

            // Each value is parsed on its own so a malformed number is attributed to the right variable.
            final int taskManagerContainerMemory = parseIntFromEnv(YarnConfigKeys.ENV_TM_MEMORY);
            final int numInitialTaskManagers = parseIntFromEnv(YarnConfigKeys.ENV_TM_COUNT);
            final int slotsPerTaskManager = parseIntFromEnv(YarnConfigKeys.ENV_SLOTS);

            final ContaineredTaskManagerParameters taskManagerParameters = ContaineredTaskManagerParameters.create(configuration, taskManagerContainerMemory, slotsPerTaskManager);
            LOG.info("TaskManagers will be created with {} task slots", taskManagerParameters.numSlots());
            LOG.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, JVM direct memory limit {} MB",
                    taskManagerParameters.taskManagerTotalMemoryMB(),
                    taskManagerParameters.taskManagerHeapSizeMB(),
                    taskManagerParameters.taskManagerDirectMemoryLimitMB());

            actorSystem = BootstrapTools.startActorSystem(configuration, appMasterHostname, configuration.getString(YarnConfigOptions.APPLICATION_MASTER_PORT), LOG);
            final String akkaHostname = (String) AkkaUtils.getAddress(actorSystem).host().get();
            final int akkaPort = ((Integer) AkkaUtils.getAddress(actorSystem).port().get()).intValue();
            LOG.info("Actor system bound to hostname {}.", akkaHostname);

            final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(configuration, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
            final ContainerLaunchContext taskManagerContext = Utils.createTaskExecutorContext(configuration, yarnConfig, ENV, taskManagerParameters, taskManagerConfig, workingDirectory, getTaskManagerClass(), LOG);

            // Publish the address the actor system actually bound to.
            configuration.setString(JobManagerOptions.ADDRESS, akkaHostname);
            configuration.setInteger(JobManagerOptions.PORT, akkaPort);

            highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration, ioExecutor, HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

            LOG.debug("Starting Web Frontend");
            final Time webMonitorTimeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
            webMonitor = BootstrapTools.createWebMonitorIfConfigured(
                    configuration,
                    highAvailabilityServices,
                    new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)),
                    new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
                    webMonitorTimeout,
                    new ScheduledExecutorServiceAdapter(futureExecutor),
                    LOG);

            final boolean useHttps = configuration.getBoolean(WebOptions.SSL_ENABLED, true) && SSLUtils.isRestSSLEnabled(configuration);
            final String protocol = useHttps ? "https://" : "http://";

            final String webMonitorURL;
            if (webMonitor == null) {
                webMonitorURL = null;
            } else {
                webMonitorURL = protocol + appMasterHostname + ":" + webMonitor.getServerPort();
                configuration.setString(WebOptions.PORT, String.valueOf(webMonitor.getServerPort()));
            }

            metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
            metricRegistry.startQueryService(actorSystem, (ResourceID) null);

            LOG.debug("Starting JobManager actor");
            final ActorRef jobManager = (ActorRef) JobManager.startJobManagerActors(
                    configuration,
                    actorSystem,
                    futureExecutor,
                    ioExecutor,
                    highAvailabilityServices,
                    metricRegistry,
                    webMonitor == null ? Option.empty() : Option.apply(webMonitor.getRestAddress()),
                    new Some<>("jobmanager"),
                    Option.empty(),
                    getJobManagerClass(),
                    getArchivistClass())._1();

            if (webMonitor != null) {
                webMonitor.start();
            }

            LOG.debug("Starting YARN Flink Resource Manager");
            final Props resourceManagerProps = YarnFlinkResourceManager.createActorProps(
                    getResourceManagerClass(),
                    configuration,
                    yarnConfig,
                    highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                    appMasterHostname,
                    webMonitorURL,
                    taskManagerParameters,
                    taskManagerContext,
                    numInitialTaskManagers,
                    LOG);
            final ActorRef resourceManager = actorSystem.actorOf(resourceManagerProps);

            LOG.debug("Starting process reapers for JobManager and YARN Application Master");
            actorSystem.actorOf(Props.create(ProcessReaper.class, resourceManager, LOG, ACTOR_DIED_EXIT_CODE), "YARN_Resource_Master_Process_Reaper");
            actorSystem.actorOf(Props.create(ProcessReaper.class, jobManager, LOG, ACTOR_DIED_EXIT_CODE), "JobManager_Process_Reaper");

            LOG.info("YARN Application Master started");
        } catch (Throwable th) {
            LOG.error("YARN Application Master initialization failed", th);

            // Best-effort clean-up of whatever had been started before the failure.
            if (webMonitor != null) {
                try {
                    webMonitor.stop();
                } catch (Throwable stopFailure) {
                    LOG.warn("Failed to stop the web frontend", stopFailure);
                }
            }
            if (actorSystem != null) {
                actorSystem.terminate().onComplete(new OnComplete<Terminated>() {
                    @Override
                    public void onComplete(Throwable failure, Terminated terminated) {
                        if (failure != null) {
                            LOG.error("Error shutting down actor system", failure);
                        }
                    }
                }, org.apache.flink.runtime.concurrent.Executors.directExecutionContext());
            }
            futureExecutor.shutdownNow();
            ioExecutor.shutdownNow();
            return INIT_ERROR_EXIT_CODE;
        }

        // Everything is running: block until the actor system is gone.
        try {
            Await.ready(actorSystem.whenTerminated(), Duration.Inf());
        } catch (InterruptedException | TimeoutException e) {
            // The interrupt flag is intentionally left cleared so that the blocking
            // shutdown steps below can still complete.
            LOG.error("Error shutting down actor system", e);
        }

        if (webMonitor != null) {
            try {
                webMonitor.stop();
            } catch (Throwable th) {
                LOG.error("Failed to stop the web frontend", th);
            }
        }
        if (highAvailabilityServices != null) {
            try {
                highAvailabilityServices.close();
            } catch (Throwable th) {
                LOG.error("Failed to stop the high availability services.", th);
            }
        }
        try {
            metricRegistry.shutdown().get();
        } catch (Throwable th) {
            LOG.error("Could not properly shut down the metric registry.", th);
        }

        ExecutorUtils.gracefulShutdown(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS, futureExecutor, ioExecutor);
        return 0;
    }

    /**
     * Returns the resource manager actor class passed to
     * {@code YarnFlinkResourceManager.createActorProps}.
     *
     * @return {@code YarnFlinkResourceManager.class} unless overridden
     */
    protected Class<? extends YarnFlinkResourceManager> getResourceManagerClass() {
        return YarnFlinkResourceManager.class;
    }

    /**
     * Returns the JobManager actor class passed to {@code JobManager.startJobManagerActors}.
     *
     * @return {@code YarnJobManager.class} unless overridden
     */
    protected Class<? extends JobManager> getJobManagerClass() {
        return YarnJobManager.class;
    }

    /**
     * Returns the archivist actor class passed to {@code JobManager.startJobManagerActors}.
     *
     * @return {@code MemoryArchivist.class} unless overridden
     */
    protected Class<? extends MemoryArchivist> getArchivistClass() {
        return MemoryArchivist.class;
    }

    /**
     * Returns the TaskManager class passed to {@code Utils.createTaskExecutorContext}.
     *
     * @return {@code YarnTaskManager.class} unless overridden
     */
    protected Class<? extends TaskManager> getTaskManagerClass() {
        return YarnTaskManager.class;
    }

    /**
     * Loads the Flink configuration from a directory and overlays the dynamic properties, the
     * cluster id from the environment (if set and non-empty), the renamed deprecated
     * environment-variable prefixes and the temp directories from the YARN local dirs variable.
     *
     * @param baseDirectory directory handed to {@code GlobalConfiguration.loadConfiguration}
     * @param additional key/value pairs written on top of the loaded configuration
     * @param logger logger used for the progress message
     * @return the resulting configuration
     */
    private static Configuration createConfiguration(String baseDirectory, Map<String, String> additional, Logger logger) {
        logger.info("Loading config from directory {}", baseDirectory);
        final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(baseDirectory);

        for (Map.Entry<String, String> property : additional.entrySet()) {
            flinkConfig.setString(property.getKey(), property.getValue());
        }

        final String clusterId = ENV.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
        if (clusterId != null && !clusterId.isEmpty()) {
            flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
        }

        BootstrapTools.substituteDeprecatedConfigPrefix(flinkConfig, "yarn.application-master.env.", "containerized.master.env.");
        BootstrapTools.substituteDeprecatedConfigPrefix(flinkConfig, "yarn.taskmanager.env.", "containerized.taskmanager.env.");
        BootstrapTools.updateTmpDirectoriesInConfiguration(flinkConfig, ENV.get(ApplicationConstants.Environment.LOCAL_DIRS.key()));
        return flinkConfig;
    }

    /**
     * Writes the Kerberos keytab path and principal into the configuration when a principal is
     * given and the keytab file exists in the working directory; otherwise does nothing.
     *
     * @param config configuration to update
     * @param workingDirectory directory expected to contain {@code Utils.KEYTAB_FILE_NAME}
     * @param principal the Kerberos principal, or {@code null} if none was provided
     */
    private static void configureKerberosKeytab(Configuration config, String workingDirectory, String principal) {
        final File keytabFile = new File(workingDirectory, Utils.KEYTAB_FILE_NAME);
        if (principal == null || !keytabFile.exists()) {
            return;
        }
        final String keytabPath = keytabFile.getAbsolutePath();
        LOG.debug("keytabPath: {}", keytabPath);
        config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
        config.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, principal);
    }

    /**
     * Reads an environment variable and parses it as a decimal {@code int}.
     *
     * @param key name of the environment variable
     * @return the parsed value
     * @throws RuntimeException if the variable is missing or not a valid integer; the
     *     {@link NumberFormatException} is attached as the cause
     */
    private static int parseIntFromEnv(String key) {
        try {
            return Integer.parseInt(ENV.get(key));
        } catch (NumberFormatException e) {
            throw new RuntimeException("Invalid value for " + key + " : " + e.getMessage(), e);
        }
    }
}
