package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.class */
public class ExecutionGraphBuilder {
    public static final String PARALLELISM_AUTO_MAX_ERROR_MESSAGE = "PARALLELISM_AUTO_MAX is no longer supported. Please specify a concrete value for the parallelism.";

    public static ExecutionGraph buildGraph(@Nullable ExecutionGraph executionGraph, JobGraph jobGraph, Configuration configuration, ScheduledExecutorService scheduledExecutorService, Executor executor, SlotProvider slotProvider, ClassLoader classLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, Time time, RestartStrategy restartStrategy, MetricGroup metricGroup, BlobWriter blobWriter, Time time2, Logger logger) throws JobExecutionException, JobException {
        return buildGraph(executionGraph, jobGraph, configuration, scheduledExecutorService, executor, slotProvider, classLoader, checkpointRecoveryFactory, time, restartStrategy, metricGroup, -1, blobWriter, time2, logger);
    }

    @Deprecated
    public static ExecutionGraph buildGraph(@Nullable ExecutionGraph executionGraph, JobGraph jobGraph, Configuration configuration, ScheduledExecutorService scheduledExecutorService, Executor executor, SlotProvider slotProvider, ClassLoader classLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, Time time, RestartStrategy restartStrategy, MetricGroup metricGroup, int i, BlobWriter blobWriter, Time time2, Logger logger) throws JobExecutionException, JobException {
        ExecutionGraph executionGraph2;
        StateBackend deserializeValue;
        List<MasterTriggerRestoreHook<?>> arrayList;
        Preconditions.checkNotNull(jobGraph, "job graph cannot be null");
        String name = jobGraph.getName();
        JobID jobID = jobGraph.getJobID();
        FailoverStrategy.Factory loadFailoverStrategy = FailoverStrategyLoader.loadFailoverStrategy(configuration, logger);
        JobInformation jobInformation = new JobInformation(jobID, name, jobGraph.getSerializedExecutionConfig(), jobGraph.getJobConfiguration(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
        if (executionGraph != null) {
            executionGraph2 = executionGraph;
        } else {
            try {
                executionGraph2 = new ExecutionGraph(jobInformation, scheduledExecutorService, executor, time, restartStrategy, loadFailoverStrategy, slotProvider, classLoader, blobWriter, time2);
            } catch (IOException e) {
                throw new JobException("Could not create the ExecutionGraph.", e);
            }
        }
        ExecutionGraph executionGraph3 = executionGraph2;
        executionGraph3.setScheduleMode(jobGraph.getScheduleMode());
        executionGraph3.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
        try {
            executionGraph3.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
        } catch (Throwable th) {
            logger.warn("Cannot create JSON plan for job", th);
            executionGraph3.setJsonPlan("{}");
        }
        long nanoTime = System.nanoTime();
        logger.info("Running initialization on master for job {} ({}).", name, jobID);
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            String invokableClassName = jobVertex.getInvokableClassName();
            if (invokableClassName == null || invokableClassName.isEmpty()) {
                throw new JobSubmissionException(jobID, "The vertex " + jobVertex.getID() + " (" + jobVertex.getName() + ") has no invokable class.");
            }
            if (jobVertex.getParallelism() == Integer.MAX_VALUE) {
                if (i < 0) {
                    throw new JobSubmissionException(jobID, PARALLELISM_AUTO_MAX_ERROR_MESSAGE);
                }
                jobVertex.setParallelism(i);
            }
            try {
                jobVertex.initializeOnMaster(classLoader);
            } catch (Throwable th2) {
                throw new JobExecutionException(jobID, "Cannot initialize task '" + jobVertex.getName() + "': " + th2.getMessage(), th2);
            }
        }
        logger.info("Successfully ran initialization on master in {} ms.", Long.valueOf((System.nanoTime() - nanoTime) / 1000000));
        List<JobVertex> verticesSortedTopologicallyFromSources = jobGraph.getVerticesSortedTopologicallyFromSources();
        if (logger.isDebugEnabled()) {
            logger.debug("Adding {} vertices from job graph {} ({}).", Integer.valueOf(verticesSortedTopologicallyFromSources.size()), name, jobID);
        }
        executionGraph3.attachJobGraph(verticesSortedTopologicallyFromSources);
        if (logger.isDebugEnabled()) {
            logger.debug("Successfully created execution graph from job graph {} ({}).", name, jobID);
        }
        JobCheckpointingSettings checkpointingSettings = jobGraph.getCheckpointingSettings();
        if (checkpointingSettings != null) {
            List<ExecutionJobVertex> idToVertex = idToVertex(checkpointingSettings.getVerticesToTrigger(), executionGraph3);
            List<ExecutionJobVertex> idToVertex2 = idToVertex(checkpointingSettings.getVerticesToAcknowledge(), executionGraph3);
            List<ExecutionJobVertex> idToVertex3 = idToVertex(checkpointingSettings.getVerticesToConfirm(), executionGraph3);
            try {
                int integer = configuration.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);
                if (integer <= 0) {
                    logger.warn("The setting for '{} : {}' is invalid. Using default value of {}", CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(), Integer.valueOf(integer), CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
                    integer = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue();
                }
                CompletedCheckpointStore createCheckpointStore = checkpointRecoveryFactory.createCheckpointStore(jobID, integer, classLoader);
                CheckpointIDCounter createCheckpointIDCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobID);
                CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(configuration.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE), idToVertex2, checkpointingSettings.getCheckpointCoordinatorConfiguration(), metricGroup);
                configuration.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
                SerializedValue<StateBackend> defaultStateBackend = checkpointingSettings.getDefaultStateBackend();
                if (defaultStateBackend == null) {
                    deserializeValue = null;
                } else {
                    try {
                        deserializeValue = defaultStateBackend.deserializeValue(classLoader);
                    } catch (IOException | ClassNotFoundException e2) {
                        throw new JobExecutionException(jobID, "Could not deserialize application-defined state backend.", e2);
                    }
                }
                try {
                    StateBackend fromApplicationOrConfigOrDefault = StateBackendLoader.fromApplicationOrConfigOrDefault(deserializeValue, configuration, classLoader, logger);
                    SerializedValue<MasterTriggerRestoreHook.Factory[]> masterHooks = checkpointingSettings.getMasterHooks();
                    if (masterHooks == null) {
                        arrayList = Collections.emptyList();
                    } else {
                        try {
                            MasterTriggerRestoreHook.Factory[] deserializeValue2 = masterHooks.deserializeValue(classLoader);
                            Thread currentThread = Thread.currentThread();
                            ClassLoader contextClassLoader = currentThread.getContextClassLoader();
                            currentThread.setContextClassLoader(classLoader);
                            try {
                                arrayList = new ArrayList(deserializeValue2.length);
                                for (MasterTriggerRestoreHook.Factory factory : deserializeValue2) {
                                    arrayList.add(MasterHooks.wrapHook(factory.create(), classLoader));
                                }
                            } finally {
                                currentThread.setContextClassLoader(contextClassLoader);
                            }
                        } catch (IOException | ClassNotFoundException e3) {
                            throw new JobExecutionException(jobID, "Could not instantiate user-defined checkpoint hooks", e3);
                        }
                    }
                    CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = checkpointingSettings.getCheckpointCoordinatorConfiguration();
                    executionGraph3.enableCheckpointing(checkpointCoordinatorConfiguration.getCheckpointInterval(), checkpointCoordinatorConfiguration.getCheckpointTimeout(), checkpointCoordinatorConfiguration.getMinPauseBetweenCheckpoints(), checkpointCoordinatorConfiguration.getMaxConcurrentCheckpoints(), checkpointCoordinatorConfiguration.getCheckpointRetentionPolicy(), idToVertex, idToVertex2, idToVertex3, arrayList, createCheckpointIDCounter, createCheckpointStore, fromApplicationOrConfigOrDefault, checkpointStatsTracker);
                } catch (IOException | IllegalConfigurationException | DynamicCodeLoadingException e4) {
                    throw new JobExecutionException(jobID, "Could not instantiate configured state backend", e4);
                }
            } catch (Exception e5) {
                throw new JobExecutionException(jobID, "Failed to initialize high-availability checkpoint handler", e5);
            }
        }
        metricGroup.gauge(RestartTimeGauge.METRIC_NAME, (String) new RestartTimeGauge(executionGraph3));
        metricGroup.gauge(DownTimeGauge.METRIC_NAME, (String) new DownTimeGauge(executionGraph3));
        metricGroup.gauge(UpTimeGauge.METRIC_NAME, (String) new UpTimeGauge(executionGraph3));
        metricGroup.gauge(NumberOfFullRestartsGauge.METRIC_NAME, (String) new NumberOfFullRestartsGauge(executionGraph3));
        executionGraph3.getFailoverStrategy().registerMetrics(metricGroup);
        return executionGraph3;
    }

    private static List<ExecutionJobVertex> idToVertex(List<JobVertexID> list, ExecutionGraph executionGraph) throws IllegalArgumentException {
        ArrayList arrayList = new ArrayList(list.size());
        for (JobVertexID jobVertexID : list) {
            ExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
            if (jobVertex == null) {
                throw new IllegalArgumentException("The snapshot checkpointing settings refer to non-existent vertex " + jobVertexID);
            }
            arrayList.add(jobVertex);
        }
        return arrayList;
    }

    private ExecutionGraphBuilder() {
    }
}
