package org.apache.flink.runtime.scheduler.strategy;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.class */
public class PipelinedRegionSchedulingStrategyTest extends TestLogger {
    private TestingSchedulerOperations testingSchedulerOperation;
    private static final int PARALLELISM = 2;
    private TestingSchedulingTopology testingSchedulingTopology;
    private List<TestingSchedulingExecutionVertex> source;
    private List<TestingSchedulingExecutionVertex> map;
    private List<TestingSchedulingExecutionVertex> sink;

    @Before
    public void setUp() {
        this.testingSchedulerOperation = new TestingSchedulerOperations();
        buildTopology();
    }

    private void buildTopology() {
        this.testingSchedulingTopology = new TestingSchedulingTopology();
        this.source = this.testingSchedulingTopology.addExecutionVertices().withParallelism(PARALLELISM).finish();
        this.map = this.testingSchedulingTopology.addExecutionVertices().withParallelism(PARALLELISM).finish();
        this.sink = this.testingSchedulingTopology.addExecutionVertices().withParallelism(PARALLELISM).finish();
        this.testingSchedulingTopology.connectPointwise(this.source, this.map).withResultPartitionState(ResultPartitionState.CREATED).withResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED).finish();
        this.testingSchedulingTopology.connectAllToAll(this.map, this.sink).withResultPartitionState(ResultPartitionState.CREATED).withResultPartitionType(ResultPartitionType.BLOCKING).finish();
    }

    @Test
    public void testStartScheduling() {
        startScheduling(this.testingSchedulingTopology);
        ArrayList arrayList = new ArrayList();
        arrayList.add(Arrays.asList(this.source.get(0), this.map.get(0)));
        arrayList.add(Arrays.asList(this.source.get(1), this.map.get(1)));
        assertLatestScheduledVerticesAreEqualTo(arrayList);
    }

    @Test
    public void testRestartTasks() {
        startScheduling(this.testingSchedulingTopology).restartTasks((Set) Stream.of((Object[]) new List[]{this.source, this.map, this.sink}).flatMap((v0) -> {
            return v0.stream();
        }).map((v0) -> {
            return v0.m452getId();
        }).collect(Collectors.toSet()));
        ArrayList arrayList = new ArrayList();
        arrayList.add(Arrays.asList(this.source.get(0), this.map.get(0)));
        arrayList.add(Arrays.asList(this.source.get(1), this.map.get(1)));
        assertLatestScheduledVerticesAreEqualTo(arrayList);
    }

    @Test
    public void testNotifyingBlockingResultPartitionProducerFinished() {
        PipelinedRegionSchedulingStrategy startScheduling = startScheduling(this.testingSchedulingTopology);
        TestingSchedulingExecutionVertex testingSchedulingExecutionVertex = this.map.get(0);
        testingSchedulingExecutionVertex.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
        startScheduling.onExecutionStateChange(testingSchedulingExecutionVertex.m452getId(), ExecutionState.FINISHED);
        Assert.assertThat(this.testingSchedulerOperation.getScheduledVertices(), Matchers.hasSize(PARALLELISM));
        TestingSchedulingExecutionVertex testingSchedulingExecutionVertex2 = this.map.get(1);
        testingSchedulingExecutionVertex2.getProducedResults().iterator().next().setState(ResultPartitionState.CONSUMABLE);
        startScheduling.onExecutionStateChange(testingSchedulingExecutionVertex2.m452getId(), ExecutionState.FINISHED);
        Assert.assertThat(this.testingSchedulerOperation.getScheduledVertices(), Matchers.hasSize(4));
        ArrayList arrayList = new ArrayList();
        arrayList.add(Arrays.asList(this.sink.get(0)));
        arrayList.add(Arrays.asList(this.sink.get(1)));
        assertLatestScheduledVerticesAreEqualTo(arrayList);
    }

    @Test
    public void testFinishedBlockingResultPartitionProducerDoNotScheduleNonCreatedRegions() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> finish = testingSchedulingTopology.addExecutionVertices().withParallelism(PARALLELISM).finish();
        List<TestingSchedulingExecutionVertex> finish2 = testingSchedulingTopology.addExecutionVertices().withParallelism(PARALLELISM).finish();
        testingSchedulingTopology.connectPointwise(finish, finish2).withResultPartitionState(ResultPartitionState.CONSUMABLE).withResultPartitionType(ResultPartitionType.BLOCKING).finish();
        PipelinedRegionSchedulingStrategy startScheduling = startScheduling(testingSchedulingTopology);
        finish2.get(0).setState(ExecutionState.SCHEDULED);
        startScheduling.onExecutionStateChange(finish.get(0).m452getId(), ExecutionState.FINISHED);
        Assert.assertThat(this.testingSchedulerOperation.getScheduledVertices(), Matchers.hasSize(3));
        ArrayList arrayList = new ArrayList();
        arrayList.add(Collections.singletonList(finish.get(0)));
        arrayList.add(Collections.singletonList(finish.get(1)));
        arrayList.add(Collections.singletonList(finish2.get(1)));
        assertLatestScheduledVerticesAreEqualTo(arrayList);
    }

    @Test
    public void testSchedulingTopologyWithPersistentBlockingEdges() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> finish = testingSchedulingTopology.addExecutionVertices().withParallelism(1).finish();
        testingSchedulingTopology.connectPointwise(finish, testingSchedulingTopology.addExecutionVertices().withParallelism(1).finish()).withResultPartitionState(ResultPartitionState.CREATED).withResultPartitionType(ResultPartitionType.BLOCKING_PERSISTENT).finish();
        startScheduling(testingSchedulingTopology);
        ArrayList arrayList = new ArrayList();
        arrayList.add(Arrays.asList(finish.get(0)));
        assertLatestScheduledVerticesAreEqualTo(arrayList);
    }

    private PipelinedRegionSchedulingStrategy startScheduling(TestingSchedulingTopology testingSchedulingTopology) {
        PipelinedRegionSchedulingStrategy pipelinedRegionSchedulingStrategy = new PipelinedRegionSchedulingStrategy(this.testingSchedulerOperation, testingSchedulingTopology);
        pipelinedRegionSchedulingStrategy.startScheduling();
        return pipelinedRegionSchedulingStrategy;
    }

    private void assertLatestScheduledVerticesAreEqualTo(List<List<TestingSchedulingExecutionVertex>> list) {
        List<List<ExecutionVertexDeploymentOption>> scheduledVertices = this.testingSchedulerOperation.getScheduledVertices();
        int size = list.size();
        Assert.assertThat(Integer.valueOf(size), Matchers.lessThanOrEqualTo(Integer.valueOf(scheduledVertices.size())));
        for (int i = 0; i < size; i++) {
            Assert.assertEquals(idsFromVertices(list.get((size - i) - 1)), idsFromDeploymentOptions(scheduledVertices.get((scheduledVertices.size() - i) - 1)));
        }
    }

    private static List<ExecutionVertexID> idsFromVertices(List<TestingSchedulingExecutionVertex> list) {
        return (List) list.stream().map((v0) -> {
            return v0.m452getId();
        }).collect(Collectors.toList());
    }

    private static List<ExecutionVertexID> idsFromDeploymentOptions(List<ExecutionVertexDeploymentOption> list) {
        return (List) list.stream().map((v0) -> {
            return v0.getExecutionVertexId();
        }).collect(Collectors.toList());
    }
}
