package org.apache.flink.runtime.scheduler;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/SchedulerTestingUtils.class */
public class SchedulerTestingUtils {
    private static final Logger LOG;
    private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 600000;
    private static final Time DEFAULT_TIMEOUT;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/SchedulerTestingUtils$DefaultSchedulerBuilder.class */
    public static class DefaultSchedulerBuilder {
        private final JobGraph jobGraph;
        private SchedulingStrategyFactory schedulingStrategyFactory;
        private Logger log;
        private BackPressureStatsTracker backPressureStatsTracker;
        private Executor ioExecutor;
        private Configuration jobMasterConfiguration;
        private ScheduledExecutorService futureExecutor;
        private ScheduledExecutor delayExecutor;
        private ClassLoader userCodeLoader;
        private CheckpointRecoveryFactory checkpointRecoveryFactory;
        private Time rpcTimeout;
        private BlobWriter blobWriter;
        private JobManagerJobMetricGroup jobManagerJobMetricGroup;
        private ShuffleMaster<?> shuffleMaster;
        private JobMasterPartitionTracker partitionTracker;
        private FailoverStrategy.Factory failoverStrategyFactory;
        private RestartBackoffTimeStrategy restartBackoffTimeStrategy;
        private ExecutionVertexOperations executionVertexOperations;
        private ExecutionVertexVersioner executionVertexVersioner;
        private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory;

        private DefaultSchedulerBuilder(JobGraph jobGraph) {
            this.schedulingStrategyFactory = new PipelinedRegionSchedulingStrategy.Factory();
            this.log = SchedulerTestingUtils.LOG;
            this.backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
            this.ioExecutor = TestingUtils.defaultExecutor();
            this.jobMasterConfiguration = new Configuration();
            this.futureExecutor = TestingUtils.defaultExecutor();
            this.delayExecutor = new ScheduledExecutorServiceAdapter(this.futureExecutor);
            this.userCodeLoader = ClassLoader.getSystemClassLoader();
            this.checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
            this.rpcTimeout = SchedulerTestingUtils.DEFAULT_TIMEOUT;
            this.blobWriter = VoidBlobWriter.getInstance();
            this.jobManagerJobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
            this.shuffleMaster = NettyShuffleMaster.INSTANCE;
            this.partitionTracker = NoOpJobMasterPartitionTracker.INSTANCE;
            this.failoverStrategyFactory = new RestartPipelinedRegionFailoverStrategy.Factory();
            this.restartBackoffTimeStrategy = NoRestartBackoffTimeStrategy.INSTANCE;
            this.executionVertexOperations = new DefaultExecutionVertexOperations();
            this.executionVertexVersioner = new ExecutionVertexVersioner();
            this.executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
            this.jobGraph = jobGraph;
        }

        public DefaultSchedulerBuilder setLogger(Logger logger) {
            this.log = logger;
            return this;
        }

        public DefaultSchedulerBuilder setBackPressureStatsTracker(BackPressureStatsTracker backPressureStatsTracker) {
            this.backPressureStatsTracker = backPressureStatsTracker;
            return this;
        }

        public DefaultSchedulerBuilder setIoExecutor(Executor executor) {
            this.ioExecutor = executor;
            return this;
        }

        public DefaultSchedulerBuilder setJobMasterConfiguration(Configuration configuration) {
            this.jobMasterConfiguration = configuration;
            return this;
        }

        public DefaultSchedulerBuilder setFutureExecutor(ScheduledExecutorService scheduledExecutorService) {
            this.futureExecutor = scheduledExecutorService;
            return this;
        }

        public DefaultSchedulerBuilder setDelayExecutor(ScheduledExecutor scheduledExecutor) {
            this.delayExecutor = scheduledExecutor;
            return this;
        }

        public DefaultSchedulerBuilder setUserCodeLoader(ClassLoader classLoader) {
            this.userCodeLoader = classLoader;
            return this;
        }

        public DefaultSchedulerBuilder setCheckpointRecoveryFactory(CheckpointRecoveryFactory checkpointRecoveryFactory) {
            this.checkpointRecoveryFactory = checkpointRecoveryFactory;
            return this;
        }

        public DefaultSchedulerBuilder setRpcTimeout(Time time) {
            this.rpcTimeout = time;
            return this;
        }

        public DefaultSchedulerBuilder setBlobWriter(BlobWriter blobWriter) {
            this.blobWriter = blobWriter;
            return this;
        }

        public DefaultSchedulerBuilder setJobManagerJobMetricGroup(JobManagerJobMetricGroup jobManagerJobMetricGroup) {
            this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
            return this;
        }

        public DefaultSchedulerBuilder setShuffleMaster(ShuffleMaster<?> shuffleMaster) {
            this.shuffleMaster = shuffleMaster;
            return this;
        }

        public DefaultSchedulerBuilder setPartitionTracker(JobMasterPartitionTracker jobMasterPartitionTracker) {
            this.partitionTracker = jobMasterPartitionTracker;
            return this;
        }

        public DefaultSchedulerBuilder setSchedulingStrategyFactory(SchedulingStrategyFactory schedulingStrategyFactory) {
            this.schedulingStrategyFactory = schedulingStrategyFactory;
            return this;
        }

        public DefaultSchedulerBuilder setFailoverStrategyFactory(FailoverStrategy.Factory factory) {
            this.failoverStrategyFactory = factory;
            return this;
        }

        public DefaultSchedulerBuilder setRestartBackoffTimeStrategy(RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
            this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
            return this;
        }

        public DefaultSchedulerBuilder setExecutionVertexOperations(ExecutionVertexOperations executionVertexOperations) {
            this.executionVertexOperations = executionVertexOperations;
            return this;
        }

        public DefaultSchedulerBuilder setExecutionVertexVersioner(ExecutionVertexVersioner executionVertexVersioner) {
            this.executionVertexVersioner = executionVertexVersioner;
            return this;
        }

        public DefaultSchedulerBuilder setExecutionSlotAllocatorFactory(ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) {
            this.executionSlotAllocatorFactory = executionSlotAllocatorFactory;
            return this;
        }

        public DefaultScheduler build() throws Exception {
            return new DefaultScheduler(this.log, this.jobGraph, this.backPressureStatsTracker, this.ioExecutor, this.jobMasterConfiguration, componentMainThreadExecutor -> {
            }, this.futureExecutor, this.delayExecutor, this.userCodeLoader, this.checkpointRecoveryFactory, this.rpcTimeout, this.blobWriter, this.jobManagerJobMetricGroup, this.shuffleMaster, this.partitionTracker, this.schedulingStrategyFactory, this.failoverStrategyFactory, this.restartBackoffTimeStrategy, this.executionVertexOperations, this.executionVertexVersioner, this.executionSlotAllocatorFactory, new DefaultExecutionDeploymentTracker(), System.currentTimeMillis());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/SchedulerTestingUtils$TaskExecutorOperatorEventGatewayAdapter.class */
    private static final class TaskExecutorOperatorEventGatewayAdapter extends SimpleAckingTaskManagerGateway {
        private final TaskExecutorOperatorEventGateway operatorGateway;

        TaskExecutorOperatorEventGatewayAdapter(TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway) {
            this.operatorGateway = taskExecutorOperatorEventGateway;
        }

        @Override // org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway
        public CompletableFuture<Acknowledge> sendOperatorEventToTask(ExecutionAttemptID executionAttemptID, OperatorID operatorID, SerializedValue<OperatorEvent> serializedValue) {
            return this.operatorGateway.sendOperatorEventToTask(executionAttemptID, operatorID, serializedValue);
        }
    }

    private SchedulerTestingUtils() {
    }

    public static DefaultSchedulerBuilder newSchedulerBuilder(JobGraph jobGraph) {
        return new DefaultSchedulerBuilder(jobGraph);
    }

    public static DefaultSchedulerBuilder newSchedulerBuilderWithDefaultSlotAllocator(JobGraph jobGraph, SlotProvider slotProvider) {
        return newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider, DEFAULT_TIMEOUT);
    }

    public static DefaultSchedulerBuilder newSchedulerBuilderWithDefaultSlotAllocator(JobGraph jobGraph, SlotProvider slotProvider, Time time) {
        return new DefaultSchedulerBuilder(jobGraph).setExecutionSlotAllocatorFactory(createDefaultExecutionSlotAllocatorFactory(jobGraph.getScheduleMode(), slotProvider, time));
    }

    public static DefaultScheduler createScheduler(JobGraph jobGraph, SlotProvider slotProvider) throws Exception {
        return createScheduler(jobGraph, slotProvider, DEFAULT_TIMEOUT);
    }

    public static DefaultScheduler createScheduler(JobGraph jobGraph, SlotProvider slotProvider, Time time) throws Exception {
        return newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider, time).build();
    }

    public static DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph, ManuallyTriggeredScheduledExecutorService manuallyTriggeredScheduledExecutorService) throws Exception {
        return createScheduler(jobGraph, manuallyTriggeredScheduledExecutorService, new SimpleAckingTaskManagerGateway());
    }

    public static DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph, ManuallyTriggeredScheduledExecutorService manuallyTriggeredScheduledExecutorService, TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway) throws Exception {
        return createScheduler(jobGraph, manuallyTriggeredScheduledExecutorService, taskExecutorOperatorEventGateway instanceof TaskManagerGateway ? (TaskManagerGateway) taskExecutorOperatorEventGateway : new TaskExecutorOperatorEventGatewayAdapter(taskExecutorOperatorEventGateway));
    }

    public static DefaultSchedulerBuilder createScheduler(JobGraph jobGraph, ManuallyTriggeredScheduledExecutorService manuallyTriggeredScheduledExecutorService, TaskManagerGateway taskManagerGateway) throws Exception {
        return newSchedulerBuilder(jobGraph).setFutureExecutor(manuallyTriggeredScheduledExecutorService).setDelayExecutor(manuallyTriggeredScheduledExecutorService).setSchedulingStrategyFactory(new EagerSchedulingStrategy.Factory()).setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0L)).setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway));
    }

    public static DefaultExecutionSlotAllocatorFactory createDefaultExecutionSlotAllocatorFactory(ScheduleMode scheduleMode, SlotProvider slotProvider, Time time) {
        return new DefaultExecutionSlotAllocatorFactory(SlotProviderStrategy.from(scheduleMode, slotProvider, time));
    }

    public static void enableCheckpointing(JobGraph jobGraph) {
        enableCheckpointing(jobGraph, null);
    }

    public static void enableCheckpointing(JobGraph jobGraph, @Nullable StateBackend stateBackend) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            if (jobVertex.isInputVertex()) {
                arrayList.add(jobVertex.getID());
            }
            arrayList2.add(jobVertex.getID());
        }
        CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration(Long.MAX_VALUE, DEFAULT_CHECKPOINT_TIMEOUT_MS, 0L, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, false, false, false, 0);
        SerializedValue serializedValue = null;
        if (stateBackend != null) {
            try {
                serializedValue = new SerializedValue(stateBackend);
            } catch (IOException e) {
                throw new RuntimeException("could not serialize state backend", e);
            }
        }
        jobGraph.setSnapshotSettings(new JobCheckpointingSettings(arrayList, arrayList2, arrayList2, checkpointCoordinatorConfiguration, serializedValue));
    }

    public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts(DefaultScheduler defaultScheduler) {
        return (Collection) StreamSupport.stream(defaultScheduler.requestJob().getAllExecutionVertices().spliterator(), false).map(archivedExecutionVertex -> {
            return archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        }).collect(Collectors.toList());
    }

    public static ExecutionState getExecutionState(DefaultScheduler defaultScheduler, JobVertexID jobVertexID, int i) {
        return getJobVertex(defaultScheduler, jobVertexID).getTaskVertices()[i].getCurrentExecutionAttempt().getState();
    }

    public static void failExecution(DefaultScheduler defaultScheduler, JobVertexID jobVertexID, int i) {
        defaultScheduler.updateTaskExecutionState(new TaskExecutionState(defaultScheduler.getJobId(), getAttemptId(defaultScheduler, jobVertexID, i), ExecutionState.FAILED, new Exception("test task failure")));
    }

    public static void canceledExecution(DefaultScheduler defaultScheduler, JobVertexID jobVertexID, int i) {
        defaultScheduler.updateTaskExecutionState(new TaskExecutionState(defaultScheduler.getJobId(), getAttemptId(defaultScheduler, jobVertexID, i), ExecutionState.CANCELED, new Exception("test task failure")));
    }

    public static void setExecutionToState(ExecutionState executionState, DefaultScheduler defaultScheduler, JobVertexID jobVertexID, int i) {
        defaultScheduler.updateTaskExecutionState(new TaskExecutionState(defaultScheduler.getJobId(), getAttemptId(defaultScheduler, jobVertexID, i), executionState));
    }

    public static void setAllExecutionsToRunning(DefaultScheduler defaultScheduler) {
        JobID jobId = defaultScheduler.getJobId();
        getAllCurrentExecutionAttempts(defaultScheduler).forEach(executionAttemptID -> {
            defaultScheduler.updateTaskExecutionState(new TaskExecutionState(jobId, executionAttemptID, ExecutionState.INITIALIZING));
            defaultScheduler.updateTaskExecutionState(new TaskExecutionState(jobId, executionAttemptID, ExecutionState.RUNNING));
        });
    }

    public static void setAllExecutionsToCancelled(DefaultScheduler defaultScheduler) {
        JobID jobId = defaultScheduler.getJobId();
        Iterator<ExecutionAttemptID> it = getAllCurrentExecutionAttempts(defaultScheduler).iterator();
        while (it.hasNext()) {
            Assert.assertTrue("could not switch task to RUNNING", defaultScheduler.updateTaskExecutionState(new TaskExecutionState(jobId, it.next(), ExecutionState.CANCELED)));
        }
    }

    public static void acknowledgePendingCheckpoint(DefaultScheduler defaultScheduler, long j) throws CheckpointException {
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(defaultScheduler);
        JobID jobId = defaultScheduler.getJobId();
        Iterator<ExecutionAttemptID> it = getAllCurrentExecutionAttempts(defaultScheduler).iterator();
        while (it.hasNext()) {
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, it.next(), j), "Unknown location");
        }
    }

    public static CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler defaultScheduler) throws Exception {
        return getCheckpointCoordinator(defaultScheduler).triggerCheckpoint(false);
    }

    public static void acknowledgeCurrentCheckpoint(DefaultScheduler defaultScheduler) {
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(defaultScheduler);
        Assert.assertEquals("Coordinator has not ", 1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
        while (pendingCheckpoint.getNumberOfNonAcknowledgedOperatorCoordinators() > 0) {
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                Assert.fail("interrupted");
            }
        }
        getAllCurrentExecutionAttempts(defaultScheduler).forEach(executionAttemptID -> {
            defaultScheduler.acknowledgeCheckpoint(pendingCheckpoint.getJobId(), executionAttemptID, pendingCheckpoint.getCheckpointId(), new CheckpointMetrics(), (TaskStateSnapshot) null);
        });
    }

    public static CompletedCheckpoint takeCheckpoint(DefaultScheduler defaultScheduler) throws Exception {
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(defaultScheduler);
        checkpointCoordinator.triggerCheckpoint(false);
        Assert.assertEquals("test setup inconsistent", 1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
        CompletableFuture completionFuture = pendingCheckpoint.getCompletionFuture();
        acknowledgePendingCheckpoint(defaultScheduler, pendingCheckpoint.getCheckpointId());
        CompletedCheckpoint completedCheckpoint = (CompletedCheckpoint) completionFuture.getNow(null);
        Assert.assertNotNull("checkpoint not complete", completedCheckpoint);
        return completedCheckpoint;
    }

    public static CheckpointCoordinator getCheckpointCoordinator(SchedulerBase schedulerBase) {
        return schedulerBase.getCheckpointCoordinator();
    }

    private static ExecutionJobVertex getJobVertex(DefaultScheduler defaultScheduler, JobVertexID jobVertexID) {
        return defaultScheduler.getExecutionVertex(new ExecutionVertexID(jobVertexID, 0)).getJobVertex();
    }

    public static ExecutionAttemptID getAttemptId(DefaultScheduler defaultScheduler, JobVertexID jobVertexID, int i) {
        ExecutionJobVertex jobVertex = getJobVertex(defaultScheduler, jobVertexID);
        if ($assertionsDisabled || jobVertex != null) {
            return jobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getAttemptId();
        }
        throw new AssertionError();
    }

    static {
        $assertionsDisabled = !SchedulerTestingUtils.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(SchedulerTestingUtils.class);
        DEFAULT_TIMEOUT = Time.seconds(300L);
    }
}
