package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
import org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.util.SlotSelectionStrategyUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.class */
public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory {
    @Override // org.apache.flink.runtime.scheduler.SchedulerNGFactory
    public SchedulerNG createInstance(Logger logger, JobGraph jobGraph, Executor executor, Configuration configuration, SlotPoolService slotPoolService, ScheduledExecutorService scheduledExecutorService, ClassLoader classLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, Time time, BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, Time time2, ShuffleMaster<?> shuffleMaster, JobMasterPartitionTracker jobMasterPartitionTracker, ExecutionDeploymentTracker executionDeploymentTracker, long j, ComponentMainThreadExecutor componentMainThreadExecutor, FatalErrorHandler fatalErrorHandler, JobStatusListener jobStatusListener) throws Exception {
        Preconditions.checkState(jobGraph.getJobType() == JobType.BATCH, "Adaptive batch scheduler only supports batch jobs");
        checkAllExchangesBlocking(jobGraph);
        SlotPool slotPool = (SlotPool) slotPoolService.castInto(SlotPool.class).orElseThrow(() -> {
            return new IllegalStateException("The AdaptiveBatchScheduler requires a SlotPool.");
        });
        SlotSelectionStrategy selectSlotSelectionStrategy = SlotSelectionStrategyUtils.selectSlotSelectionStrategy(JobType.BATCH, configuration);
        PhysicalSlotRequestBulkCheckerImpl createFromSlotPool = PhysicalSlotRequestBulkCheckerImpl.createFromSlotPool(slotPool, SystemClock.getInstance());
        SlotSharingExecutionSlotAllocatorFactory slotSharingExecutionSlotAllocatorFactory = new SlotSharingExecutionSlotAllocatorFactory(new PhysicalSlotProviderImpl(selectSlotSelectionStrategy, slotPool), false, createFromSlotPool, time2);
        RestartBackoffTimeStrategy create = RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(jobGraph.getSerializedExecutionConfig().deserializeValue(classLoader).getRestartStrategy(), configuration, jobGraph.isCheckpointingEnabled()).create();
        logger.info("Using restart back off time strategy {} for {} ({}).", create, jobGraph.getName(), jobGraph.getJobID());
        DefaultExecutionGraphFactory defaultExecutionGraphFactory = new DefaultExecutionGraphFactory(configuration, classLoader, executionDeploymentTracker, scheduledExecutorService, executor, time, jobManagerJobMetricGroup, blobWriter, shuffleMaster, jobMasterPartitionTracker, true);
        createFromSlotPool.getClass();
        return new AdaptiveBatchScheduler(logger, jobGraph, executor, configuration, createFromSlotPool::start, new ScheduledExecutorServiceAdapter(scheduledExecutorService), classLoader, new CheckpointsCleaner(), checkpointRecoveryFactory, jobManagerJobMetricGroup, new VertexwiseSchedulingStrategy.Factory(), FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(configuration), create, new DefaultExecutionVertexOperations(), new ExecutionVertexVersioner(), slotSharingExecutionSlotAllocatorFactory, j, componentMainThreadExecutor, jobStatusListener, defaultExecutionGraphFactory, shuffleMaster, time, DefaultVertexParallelismDecider.from(configuration), DefaultVertexParallelismDecider.getNormalizedMaxParallelism(configuration));
    }

    private void checkAllExchangesBlocking(JobGraph jobGraph) {
        Iterator<JobVertex> it = jobGraph.getVertices().iterator();
        while (it.hasNext()) {
            Iterator<IntermediateDataSet> it2 = it.next().getProducedDataSets().iterator();
            while (it2.hasNext()) {
                Preconditions.checkState(it2.next().getResultType().isBlocking(), String.format("At the moment, adaptive batch scheduler requires batch workloads to be executed with types of all edges being BLOCKING. To do that, you need to configure '%s' to '%s'.", ExecutionOptions.BATCH_SHUFFLE_MODE.key(), BatchShuffleMode.ALL_EXCHANGES_BLOCKING));
            }
        }
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNGFactory
    public JobManagerOptions.SchedulerType getSchedulerType() {
        return JobManagerOptions.SchedulerType.AdaptiveBatch;
    }
}
