package org.apache.flink.yarn;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.ActorSystemLoader;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.YarnMessages;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/yarn/YarnClusterClient.class */
public class YarnClusterClient extends ClusterClient<ApplicationId> {
    private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClient.class);
    private final AbstractYarnClusterDescriptor clusterDescriptor;
    private final int numberTaskManagers;
    private final int slotsPerTaskManager;
    private final LazApplicationClientLoader applicationClient;
    private final FiniteDuration akkaDuration;
    private final ApplicationId appId;
    private final String trackingURL;
    private final boolean newlyCreatedCluster;

    /* loaded from: input_file:org/apache/flink/yarn/YarnClusterClient$LazApplicationClientLoader.class */
    private static class LazApplicationClientLoader {
        private final Configuration flinkConfig;
        private final ActorSystemLoader actorSystemLoader;
        private final HighAvailabilityServices highAvailabilityServices;
        private ActorRef applicationClient;

        private LazApplicationClientLoader(Configuration configuration, ActorSystemLoader actorSystemLoader, HighAvailabilityServices highAvailabilityServices) {
            this.flinkConfig = (Configuration) Preconditions.checkNotNull(configuration, "flinkConfig");
            this.actorSystemLoader = (ActorSystemLoader) Preconditions.checkNotNull(actorSystemLoader, "actorSystemLoader");
            this.highAvailabilityServices = (HighAvailabilityServices) Preconditions.checkNotNull(highAvailabilityServices, "highAvailabilityServices");
        }

        public ActorRef get() throws FlinkException {
            if (this.applicationClient == null) {
                YarnClusterClient.LOG.info("Start application client.");
                try {
                    try {
                        this.applicationClient = this.actorSystemLoader.get().actorOf(Props.create(ApplicationClient.class, new Object[]{this.flinkConfig, this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID)}), "applicationClient");
                    } catch (Exception e) {
                        throw new FlinkException("Could not start the ApplicationClient.", e);
                    }
                } catch (FlinkException e2) {
                    throw new FlinkException("Could not start the ClusterClient's ActorSystem.", e2);
                }
            }
            return this.applicationClient;
        }
    }

    public YarnClusterClient(AbstractYarnClusterDescriptor abstractYarnClusterDescriptor, int i, int i2, ApplicationReport applicationReport, Configuration configuration, boolean z) throws Exception {
        super(configuration);
        this.akkaDuration = AkkaUtils.getTimeout(configuration);
        this.clusterDescriptor = abstractYarnClusterDescriptor;
        this.numberTaskManagers = i;
        this.slotsPerTaskManager = i2;
        this.appId = applicationReport.getApplicationId();
        this.trackingURL = applicationReport.getTrackingUrl();
        this.newlyCreatedCluster = z;
        this.applicationClient = new LazApplicationClientLoader(configuration, this.actorSystemLoader, this.highAvailabilityServices);
    }

    private void stopAfterJob(JobID jobID) {
        Preconditions.checkNotNull(jobID, "The job id must not be null");
        try {
            Await.ready(getJobManagerGateway().ask(new ShutdownClusterAfterJob(jobID), this.akkaDuration), this.akkaDuration);
        } catch (Exception e) {
            throw new RuntimeException("Unable to tell application master to stop once the specified job has been finised", e);
        }
    }

    public Configuration getFlinkConfiguration() {
        return this.flinkConfig;
    }

    public int getMaxSlots() {
        int i = this.numberTaskManagers * this.slotsPerTaskManager;
        if (i > 0) {
            return i;
        }
        return -1;
    }

    public boolean hasUserJarsInClassPath(List<URL> list) {
        return this.clusterDescriptor.hasUserJarFiles(list);
    }

    public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        if (!isDetached()) {
            return super.run(jobGraph, classLoader);
        }
        if (this.newlyCreatedCluster) {
            stopAfterJob(jobGraph.getJobID());
        }
        return super.runDetached(jobGraph, classLoader);
    }

    public String getWebInterfaceURL() {
        return (this.trackingURL.startsWith("http://") || this.trackingURL.startsWith("https://")) ? this.trackingURL : "http://" + this.trackingURL;
    }

    public GetClusterStatusResponse getClusterStatus() {
        try {
            return (GetClusterStatusResponse) Await.result(getJobManagerGateway().ask(GetClusterStatus.getInstance(), this.akkaDuration), this.akkaDuration);
        } catch (Exception e) {
            throw new RuntimeException("Unable to get ClusterClient status from Application Client", e);
        }
    }

    public List<String> getNewMessages() {
        ArrayList arrayList = new ArrayList();
        while (true) {
            try {
                Object result = Await.result(Patterns.ask(this.applicationClient.get(), YarnMessages.getLocalGetYarnMessage(), new Timeout(this.akkaDuration)), this.akkaDuration);
                if (!(result instanceof Option)) {
                    throw new RuntimeException("LocalGetYarnMessage requires a response of type Option. Instead the response is of type " + result.getClass() + ".");
                }
                Option option = (Option) result;
                LOG.debug("Received message option {}", option);
                if (option.isEmpty()) {
                    break;
                }
                Object obj = option.get();
                if (obj instanceof InfoMessage) {
                    InfoMessage infoMessage = (InfoMessage) obj;
                    arrayList.add("[" + infoMessage.date() + "] " + infoMessage.message());
                } else {
                    LOG.warn("LocalGetYarnMessage returned unexpected type: " + option);
                }
            } catch (Exception e) {
                LOG.warn("Error retrieving the YARN messages locally", e);
            }
        }
        return arrayList;
    }

    /* renamed from: getClusterId, reason: merged with bridge method [inline-methods] */
    public ApplicationId m25getClusterId() {
        return this.appId;
    }

    public boolean isDetached() {
        return super.isDetached() || this.clusterDescriptor.isDetachedMode();
    }

    public void waitForClusterToBeReady() {
        logAndSysout("Waiting until all TaskManagers have connected");
        GetClusterStatusResponse getClusterStatusResponse = null;
        while (true) {
            GetClusterStatusResponse getClusterStatusResponse2 = getClusterStatusResponse;
            GetClusterStatusResponse clusterStatus = getClusterStatus();
            if (clusterStatus != null && !clusterStatus.equals(getClusterStatusResponse2)) {
                logAndSysout("TaskManager status (" + clusterStatus.numRegisteredTaskManagers() + "/" + this.numberTaskManagers + ")");
                if (clusterStatus.numRegisteredTaskManagers() >= this.numberTaskManagers) {
                    logAndSysout("All TaskManagers are connected");
                    return;
                }
            } else if (getClusterStatusResponse2 == null) {
                logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
            }
            try {
                Thread.sleep(250L);
                getClusterStatusResponse = clusterStatus;
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted while waiting for TaskManagers", e);
            }
        }
    }

    public void shutDownCluster() {
        LOG.info("Sending shutdown request to the Application Master");
        try {
            Await.ready(Patterns.ask(this.applicationClient.get(), new YarnMessages.LocalStopYarnSession(ApplicationStatus.SUCCEEDED, "Flink YARN Client requested shutdown"), new Timeout(this.akkaDuration)), this.akkaDuration);
        } catch (Exception e) {
            LOG.warn("Error while stopping YARN cluster.", e);
        }
    }

    public ApplicationId getApplicationId() {
        return this.appId;
    }
}
