package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategyFactoryLoader;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLoader;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.class */
/**
 * Static utility that assembles a {@link DefaultExecutionGraph} from a {@link JobGraph}.
 *
 * <p>The class only exposes static methods and cannot be instantiated.
 */
public class DefaultExecutionGraphBuilder {

    /**
     * Builds a {@link DefaultExecutionGraph} for the given job graph.
     *
     * <p>The steps are, in order: construct the graph, attach a JSON plan (falling back to
     * {@code "{}"} if plan generation fails), run {@code initializeOnMaster} on every job vertex,
     * attach the topologically sorted vertices, configure checkpointing (unless {@code z} is set
     * or the job has no checkpointing settings) and finally enable early stopping if the job
     * graph asks for it.
     *
     * @param jobGraph the job graph to translate; must not be {@code null}
     * @param configuration read for {@link JobManagerOptions#MAX_ATTEMPTS_HISTORY_SIZE}, the
     *     partition group release strategy, state backend / checkpoint storage loading and
     *     {@link StateChangelogOptions#STATE_CHANGE_LOG_STORAGE}
     * @param scheduledExecutorService forwarded to the {@link DefaultExecutionGraph} constructor
     * @param executor forwarded to the {@link DefaultExecutionGraph} constructor
     * @param classLoader forwarded to the graph constructor; also used to deserialize the
     *     application-defined state backend, checkpoint storage and master hook factories, handed
     *     to each vertex's master-side initialization, and installed as the thread context class
     *     loader while master hooks are created
     * @param completedCheckpointStore passed to {@code enableCheckpointing} when checkpointing is
     *     configured
     * @param checkpointsCleaner passed to {@code enableCheckpointing} when checkpointing is
     *     configured
     * @param checkpointIDCounter passed to {@code enableCheckpointing} when checkpointing is
     *     configured
     * @param time forwarded to the {@link DefaultExecutionGraph} constructor
     * @param blobWriter forwarded to the {@link DefaultExecutionGraph} constructor
     * @param logger receives progress and warning messages; also handed to the state backend and
     *     checkpoint storage loaders
     * @param shuffleMaster forwarded to the {@link DefaultExecutionGraph} constructor
     * @param jobMasterPartitionTracker forwarded to the {@link DefaultExecutionGraph} constructor
     * @param partitionLocationConstraint forwarded to the {@link DefaultExecutionGraph} constructor
     * @param executionDeploymentListener forwarded to the {@link DefaultExecutionGraph} constructor
     * @param executionStateUpdateListener forwarded to the {@link DefaultExecutionGraph}
     *     constructor
     * @param j forwarded to the {@link DefaultExecutionGraph} constructor; its meaning is not
     *     visible in this class (NOTE(review): presumably a timestamp - confirm against the
     *     constructor)
     * @param vertexAttemptNumberStore forwarded to the {@link DefaultExecutionGraph} constructor
     * @param vertexParallelismStore forwarded to the graph constructor and queried for the
     *     parallelism of each vertex during master-side initialization
     * @param supplier invoked once, and only when checkpointing is actually set up, to obtain the
     *     {@link CheckpointStatsTracker}
     * @param z forwarded to the graph constructor; when {@code true} checkpointing setup is
     *     skipped with a "dynamic graph" warning
     * @param factory forwarded to the {@link DefaultExecutionGraph} constructor
     * @param markPartitionFinishedStrategy forwarded to the {@link DefaultExecutionGraph}
     *     constructor
     * @param z2 forwarded to the {@link DefaultExecutionGraph} constructor; its meaning is not
     *     visible in this class
     * @return the fully attached execution graph
     * @throws JobExecutionException if a vertex has no invokable class, a vertex fails to
     *     initialize, or the state backend, checkpoint storage or master hooks cannot be
     *     deserialized / instantiated
     * @throws JobException if constructing the graph fails with an {@link IOException}, or if
     *     attaching the job graph fails
     */
    public static DefaultExecutionGraph buildGraph(JobGraph jobGraph, Configuration configuration, ScheduledExecutorService scheduledExecutorService, Executor executor, ClassLoader classLoader, CompletedCheckpointStore completedCheckpointStore, CheckpointsCleaner checkpointsCleaner, CheckpointIDCounter checkpointIDCounter, Time time, BlobWriter blobWriter, Logger logger, ShuffleMaster<?> shuffleMaster, JobMasterPartitionTracker jobMasterPartitionTracker, TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint, ExecutionDeploymentListener executionDeploymentListener, ExecutionStateUpdateListener executionStateUpdateListener, long j, VertexAttemptNumberStore vertexAttemptNumberStore, VertexParallelismStore vertexParallelismStore, Supplier<CheckpointStatsTracker> supplier, boolean z, ExecutionJobVertex.Factory factory, MarkPartitionFinishedStrategy markPartitionFinishedStrategy, boolean z2) throws JobExecutionException, JobException {
        Preconditions.checkNotNull(jobGraph, "job graph cannot be null");

        final String jobName = jobGraph.getName();
        final JobID jobId = jobGraph.getJobID();

        // Only graph construction is guarded here: the error message below is specific to it.
        final DefaultExecutionGraph executionGraph;
        try {
            executionGraph =
                    new DefaultExecutionGraph(
                            new JobInformation(
                                    jobId,
                                    jobName,
                                    jobGraph.getSerializedExecutionConfig(),
                                    jobGraph.getJobConfiguration(),
                                    jobGraph.getUserJarBlobKeys(),
                                    jobGraph.getClasspaths()),
                            scheduledExecutorService,
                            executor,
                            time,
                            configuration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE),
                            classLoader,
                            blobWriter,
                            PartitionGroupReleaseStrategyFactoryLoader
                                    .loadPartitionGroupReleaseStrategyFactory(configuration),
                            shuffleMaster,
                            jobMasterPartitionTracker,
                            partitionLocationConstraint,
                            executionDeploymentListener,
                            executionStateUpdateListener,
                            j,
                            vertexAttemptNumberStore,
                            vertexParallelismStore,
                            z,
                            factory,
                            jobGraph.getJobStatusHooks(),
                            markPartitionFinishedStrategy,
                            z2);
        } catch (IOException e) {
            throw new JobException("Could not create the ExecutionGraph.", e);
        }

        // The JSON plan is best-effort: any failure is logged and replaced by an empty plan so
        // that it never prevents the job from being built.
        try {
            executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
        } catch (Throwable t) {
            logger.warn("Cannot create JSON plan for job", t);
            executionGraph.setJsonPlan("{}");
        }

        initializeVerticesOnMaster(jobGraph, jobName, jobId, classLoader, vertexParallelismStore, logger);

        final List<JobVertex> sortedVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        logger.debug("Adding {} vertices from job graph {} ({}).", sortedVertices.size(), jobName, jobId);
        executionGraph.attachJobGraph(sortedVertices);
        logger.debug("Successfully created execution graph from job graph {} ({}).", jobName, jobId);

        if (z) {
            logger.warn("Skip setting up checkpointing for a job with dynamic graph.");
        } else if (isCheckpointingEnabled(jobGraph)) {
            setUpCheckpointing(
                    executionGraph,
                    jobGraph.getCheckpointingSettings(),
                    jobId,
                    configuration,
                    classLoader,
                    completedCheckpointStore,
                    checkpointsCleaner,
                    checkpointIDCounter,
                    supplier,
                    logger);
        }

        if (isEarlyStoppingEnabled(jobGraph)) {
            executionGraph.enableEarlyStopping();
        }
        return executionGraph;
    }

    /**
     * Tells whether the job graph carries checkpointing settings.
     *
     * @param jobGraph the job graph to inspect
     * @return {@code true} if {@code jobGraph.getCheckpointingSettings()} is non-null
     */
    public static boolean isCheckpointingEnabled(JobGraph jobGraph) {
        return jobGraph.getCheckpointingSettings() != null;
    }

    /**
     * Tells whether the job graph carries early-stopping settings that are switched on.
     *
     * @param jobGraph the job graph to inspect
     * @return {@code true} if early-stopping settings are present and report themselves enabled
     */
    public static boolean isEarlyStoppingEnabled(JobGraph jobGraph) {
        return jobGraph.getEarlyStoppingSettings() != null && jobGraph.getEarlyStoppingSettings().isEarlyStoppingEnabled();
    }

    /** Validates every job vertex and runs its master-side initialization, logging the elapsed time. */
    private static void initializeVerticesOnMaster(JobGraph jobGraph, String jobName, JobID jobId, ClassLoader classLoader, VertexParallelismStore vertexParallelismStore, Logger logger) throws JobExecutionException {
        final long startNanos = System.nanoTime();
        logger.info("Running initialization on master for job {} ({}).", jobName, jobId);

        for (JobVertex vertex : jobGraph.getVertices()) {
            final String invokableClassName = vertex.getInvokableClassName();
            if (invokableClassName == null || invokableClassName.isEmpty()) {
                throw new JobSubmissionException(jobId, "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
            }
            try {
                vertex.initializeOnMaster(
                        new SimpleInitializeOnMasterContext(
                                classLoader,
                                vertexParallelismStore.getParallelismInfo(vertex.getID()).getParallelism()));
            } catch (Throwable t) {
                // Whatever the vertex initialization throws is reported against the job.
                throw new JobExecutionException(jobId, "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
            }
        }

        logger.info("Successfully ran initialization on master in {} ms.", (System.nanoTime() - startNanos) / 1_000_000);
    }

    /** Resolves state backend, checkpoint storage and master hooks, then enables checkpointing on the graph. */
    private static void setUpCheckpointing(DefaultExecutionGraph executionGraph, JobCheckpointingSettings settings, JobID jobId, Configuration configuration, ClassLoader classLoader, CompletedCheckpointStore completedCheckpointStore, CheckpointsCleaner checkpointsCleaner, CheckpointIDCounter checkpointIDCounter, Supplier<CheckpointStatsTracker> statsTrackerSupplier, Logger logger) throws JobExecutionException {
        final StateBackend rootBackend = loadStateBackend(settings, jobId, configuration, classLoader, logger);
        final CheckpointStorage rootStorage = loadCheckpointStorage(settings, rootBackend, jobId, configuration, classLoader, logger);
        final List<MasterTriggerRestoreHook<?>> hooks = createMasterHooks(settings, jobId, classLoader);
        final CheckpointCoordinatorConfiguration coordinatorConfig = settings.getCheckpointCoordinatorConfiguration();

        // Deliberately outside any try/catch: a failure here is not a storage or backend loading
        // problem and must not be reported as one.
        executionGraph.enableCheckpointing(
                coordinatorConfig,
                hooks,
                checkpointIDCounter,
                completedCheckpointStore,
                rootBackend,
                rootStorage,
                statsTrackerSupplier.get(),
                checkpointsCleaner,
                configuration.getString(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE));
    }

    /** Deserializes the application-defined state backend (if any) and resolves the effective backend. */
    private static StateBackend loadStateBackend(JobCheckpointingSettings settings, JobID jobId, Configuration configuration, ClassLoader classLoader, Logger logger) throws JobExecutionException {
        final SerializedValue<StateBackend> serializedBackend = settings.getDefaultStateBackend();
        final StateBackend applicationBackend;
        if (serializedBackend == null) {
            applicationBackend = null;
        } else {
            try {
                applicationBackend = serializedBackend.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(jobId, "Could not deserialize application-defined state backend.", e);
            }
        }

        try {
            return StateBackendLoader.fromApplicationOrConfigOrDefault(applicationBackend, settings.isChangelogStateBackendEnabled(), configuration, classLoader, logger);
        } catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
        }
    }

    /** Deserializes the application-defined checkpoint storage (if any) and resolves the effective storage. */
    private static CheckpointStorage loadCheckpointStorage(JobCheckpointingSettings settings, StateBackend rootBackend, JobID jobId, Configuration configuration, ClassLoader classLoader, Logger logger) throws JobExecutionException {
        final SerializedValue<CheckpointStorage> serializedStorage = settings.getDefaultCheckpointStorage();
        final CheckpointStorage applicationStorage;
        if (serializedStorage == null) {
            applicationStorage = null;
        } else {
            try {
                applicationStorage = serializedStorage.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(jobId, "Could not deserialize application-defined checkpoint storage.", e);
            }
        }

        try {
            // The second argument is passed as null, exactly as before.
            // NOTE(review): presumably the default savepoint directory - confirm against the loader.
            return CheckpointStorageLoader.load(applicationStorage, null, rootBackend, configuration, classLoader, logger);
        } catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(jobId, "Could not instantiate configured checkpoint storage", e);
        }
    }

    /** Deserializes the master hook factories (if any) and creates the wrapped hooks from them. */
    private static List<MasterTriggerRestoreHook<?>> createMasterHooks(JobCheckpointingSettings settings, JobID jobId, ClassLoader classLoader) throws JobExecutionException {
        final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = settings.getMasterHooks();
        if (serializedHooks == null) {
            return Collections.emptyList();
        }

        final MasterTriggerRestoreHook.Factory[] hookFactories;
        try {
            hookFactories = serializedHooks.deserializeValue(classLoader);
        } catch (IOException | ClassNotFoundException e) {
            throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);
        }

        // Hook factories run with the supplied class loader as the thread context class loader;
        // the previous one is restored even if a factory throws.
        final Thread thread = Thread.currentThread();
        final ClassLoader originalClassLoader = thread.getContextClassLoader();
        thread.setContextClassLoader(classLoader);
        try {
            final List<MasterTriggerRestoreHook<?>> hooks = new ArrayList<>(hookFactories.length);
            for (MasterTriggerRestoreHook.Factory hookFactory : hookFactories) {
                hooks.add(MasterHooks.wrapHook(hookFactory.create(), classLoader));
            }
            return hooks;
        } finally {
            thread.setContextClassLoader(originalClassLoader);
        }
    }

    /** Not instantiable: this class only hosts static helpers. */
    private DefaultExecutionGraphBuilder() {
    }
}
