package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests slot allocation through the {@code testingSlotProvider} inherited from
 * {@link SchedulerTestBase} for tasks that share a {@link SlotSharingGroup} and are tied
 * together by {@link CoLocationConstraint}s.
 *
 * <p>Every test registers a number of task managers, allocates slots for synthetic executions
 * and then checks the task manager locations of the returned slots together with the provider's
 * assignment counters (localized / non-localized / unconstrained).
 */
public class ScheduleWithCoLocationHintTest extends SchedulerTestBase {

    /** Returns the main-thread executor adapter that the base class uses for these tests. */
    @Override
    protected ComponentMainThreadExecutor getComponentMainThreadExecutor() {
        return ComponentMainThreadExecutorServiceAdapter.forMainThread();
    }

    /**
     * Allocates twelve tasks (two vertices, six subtasks each) on three task managers with two
     * slots each. Subtasks with the same index share a co-location constraint and must end up on
     * the same task manager. Afterwards slots are released and the bookkeeping is re-checked.
     */
    @Test
    public void scheduleAllSharedAndCoLocated() throws Exception {
        JobVertexID firstVertex = new JobVertexID();
        JobVertexID secondVertex = new JobVertexID();

        this.testingSlotProvider.addTaskManager(2);
        this.testingSlotProvider.addTaskManager(2);
        this.testingSlotProvider.addTaskManager(2);
        Assert.assertEquals(6L, this.testingSlotProvider.getNumberOfAvailableSlots());

        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        CoLocationGroup coLocationGroup = new CoLocationGroup();
        CoLocationConstraint c1 = new CoLocationConstraint(coLocationGroup);
        CoLocationConstraint c2 = new CoLocationConstraint(coLocationGroup);
        CoLocationConstraint c3 = new CoLocationConstraint(coLocationGroup);
        CoLocationConstraint c4 = new CoLocationConstraint(coLocationGroup);
        CoLocationConstraint c5 = new CoLocationConstraint(coLocationGroup);
        CoLocationConstraint c6 = new CoLocationConstraint(coLocationGroup);

        // Four tasks of the first vertex, then three of the second, then the remaining ones.
        LogicalSlot s1 = allocateCoLocated(firstVertex, 0, 6, sharingGroup, c1, SlotProfile.noRequirements());
        LogicalSlot s2 = allocateCoLocated(firstVertex, 1, 6, sharingGroup, c2, SlotProfile.noRequirements());
        LogicalSlot s3 = allocateCoLocated(firstVertex, 2, 6, sharingGroup, c3, SlotProfile.noRequirements());
        LogicalSlot s4 = allocateCoLocated(firstVertex, 3, 6, sharingGroup, c4, SlotProfile.noRequirements());
        LogicalSlot s5 = allocateCoLocated(secondVertex, 0, 6, sharingGroup, c1, SlotProfile.noRequirements());
        LogicalSlot s6 = allocateCoLocated(secondVertex, 1, 6, sharingGroup, c2, SlotProfile.noRequirements());
        LogicalSlot s7 = allocateCoLocated(secondVertex, 2, 6, sharingGroup, c3, SlotProfile.noRequirements());
        LogicalSlot s8 = allocateCoLocated(firstVertex, 4, 6, sharingGroup, c5, SlotProfile.noRequirements());
        LogicalSlot s9 = allocateCoLocated(firstVertex, 5, 6, sharingGroup, c6, SlotProfile.noRequirements());
        LogicalSlot s10 = allocateCoLocated(secondVertex, 3, 6, sharingGroup, c4, SlotProfile.noRequirements());
        LogicalSlot s11 = allocateCoLocated(secondVertex, 4, 6, sharingGroup, c5, SlotProfile.noRequirements());
        LogicalSlot s12 = allocateCoLocated(secondVertex, 5, 6, sharingGroup, c6, SlotProfile.noRequirements());

        for (LogicalSlot slot : new LogicalSlot[] {s1, s2, s3, s4, s5, s6, s7, s8, s9, s10, s11, s12}) {
            Assert.assertNotNull(slot);
        }

        // Tasks that share a constraint must report the same task manager location.
        Assert.assertEquals(s1.getTaskManagerLocation(), s5.getTaskManagerLocation());
        Assert.assertEquals(s2.getTaskManagerLocation(), s6.getTaskManagerLocation());
        Assert.assertEquals(s3.getTaskManagerLocation(), s7.getTaskManagerLocation());
        Assert.assertEquals(s4.getTaskManagerLocation(), s10.getTaskManagerLocation());
        Assert.assertEquals(s8.getTaskManagerLocation(), s11.getTaskManagerLocation());
        Assert.assertEquals(s9.getTaskManagerLocation(), s12.getTaskManagerLocation());

        // Each constraint must have recorded the location of its first task.
        Assert.assertEquals(c1.getLocation(), s1.getTaskManagerLocation());
        Assert.assertEquals(c2.getLocation(), s2.getTaskManagerLocation());
        Assert.assertEquals(c3.getLocation(), s3.getTaskManagerLocation());
        Assert.assertEquals(c4.getLocation(), s4.getTaskManagerLocation());
        Assert.assertEquals(c5.getLocation(), s8.getTaskManagerLocation());
        Assert.assertEquals(c6.getLocation(), s9.getTaskManagerLocation());

        Assert.assertEquals(0L, this.testingSlotProvider.getNumberOfAvailableSlots());
        assertAssignmentCounts(6, 0, 6);

        // Release a subset of the slots; at least one physical slot has to become available.
        s1.releaseSlot();
        s2.releaseSlot();
        s3.releaseSlot();
        s4.releaseSlot();
        s7.releaseSlot();
        s10.releaseSlot();
        s11.releaseSlot();
        s12.releaseSlot();
        int availableAfterPartialRelease = this.testingSlotProvider.getNumberOfAvailableSlots();
        Assert.assertTrue(
                "Expected at least one available slot but found " + availableAfterPartialRelease,
                availableAfterPartialRelease >= 1);

        Assert.assertNotNull(allocateUnshared(new JobVertexID(), 0, 1));

        // Release the rest; some slots are released a second time, exactly as listed here.
        s1.releaseSlot();
        s2.releaseSlot();
        s3.releaseSlot();
        s5.releaseSlot();
        s6.releaseSlot();
        s7.releaseSlot();
        s8.releaseSlot();
        s9.releaseSlot();
        s11.releaseSlot();
        s12.releaseSlot();

        // The extra unshared slot is never released, so five of the six slots are free.
        Assert.assertEquals(5L, this.testingSlotProvider.getNumberOfAvailableSlots());
        assertAssignmentCounts(6, 0, 7);
    }

    /**
     * Allocates two co-located tasks and one unshared task, releases all of them and then checks
     * that a further task using the same constraint lands on the originally chosen task manager.
     */
    @Test
    public void scheduleWithIntermediateRelease() throws Exception {
        JobVertexID firstVertex = new JobVertexID();
        JobVertexID secondVertex = new JobVertexID();
        JobVertexID lateVertex = new JobVertexID();
        JobVertexID soloVertex = new JobVertexID();

        this.testingSlotProvider.addTaskManager(1);
        this.testingSlotProvider.addTaskManager(1);
        Assert.assertEquals(2L, this.testingSlotProvider.getNumberOfAvailableSlots());

        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        CoLocationConstraint constraint = new CoLocationConstraint(new CoLocationGroup());

        LogicalSlot firstSlot = allocateCoLocated(firstVertex, 0, 1, sharingGroup, constraint, SlotProfile.noRequirements());
        LogicalSlot secondSlot = allocateCoLocated(secondVertex, 0, 1, sharingGroup, constraint, SlotProfile.noRequirements());
        LogicalSlot soloSlot = allocateUnshared(soloVertex, 0, 1);

        ResourceID originalTaskManager = firstSlot.getTaskManagerLocation().getResourceID();

        firstSlot.releaseSlot();
        secondSlot.releaseSlot();
        soloSlot.releaseSlot();

        LogicalSlot lateSlot = allocateCoLocated(lateVertex, 0, 1, sharingGroup, constraint, SlotProfile.noRequirements());
        Assert.assertEquals(originalTaskManager, lateSlot.getTaskManagerLocation().getResourceID());

        assertAssignmentCounts(2, 0, 2);
    }

    /**
     * Occupies both task managers with unshared tasks after a co-located task was released and
     * checks that a new task for the same constraint fails with
     * {@link NoResourceAvailableException}.
     */
    @Test
    public void scheduleWithReleaseNoResource() throws Exception {
        JobVertexID firstVertex = new JobVertexID();
        JobVertexID blockingVertex = new JobVertexID();
        JobVertexID lateVertex = new JobVertexID();

        this.testingSlotProvider.addTaskManager(1);
        this.testingSlotProvider.addTaskManager(1);
        Assert.assertEquals(2L, this.testingSlotProvider.getNumberOfAvailableSlots());

        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        CoLocationConstraint constraint = new CoLocationConstraint(new CoLocationGroup());

        allocateCoLocated(firstVertex, 0, 1, sharingGroup, constraint, SlotProfile.noRequirements()).releaseSlot();

        // These two unshared allocations take both slots and are never released.
        allocateUnshared(blockingVertex, 0, 1);
        allocateUnshared(blockingVertex, 1, 2);

        try {
            allocateCoLocated(lateVertex, 0, 1, sharingGroup, constraint, SlotProfile.noRequirements());
            Assert.fail("Scheduled even though no resource was available.");
        } catch (ExecutionException e) {
            Assert.assertTrue(
                    "Expected NoResourceAvailableException as cause but was: " + e.getCause(),
                    e.getCause() instanceof NoResourceAvailableException);
        }

        assertAssignmentCounts(0, 0, 3);
    }

    /**
     * Mixes tasks that only use slot sharing with tasks that additionally carry a co-location
     * constraint, and checks that the tasks of each constraint share a task manager.
     */
    @Test
    public void scheduleMixedCoLocationSlotSharing() throws Exception {
        JobVertexID sharedOnlyFirst = new JobVertexID();
        JobVertexID coLocatedFirst = new JobVertexID();
        JobVertexID coLocatedSecond = new JobVertexID();
        JobVertexID sharedOnlyLast = new JobVertexID();

        this.testingSlotProvider.addTaskManager(1);
        this.testingSlotProvider.addTaskManager(1);
        this.testingSlotProvider.addTaskManager(1);
        this.testingSlotProvider.addTaskManager(1);
        Assert.assertEquals(4L, this.testingSlotProvider.getNumberOfAvailableSlots());

        CoLocationGroup coLocationGroup = new CoLocationGroup();
        CoLocationConstraint c1 = new CoLocationConstraint(coLocationGroup);
        CoLocationConstraint c2 = new CoLocationConstraint(coLocationGroup);
        CoLocationConstraint c3 = new CoLocationConstraint(coLocationGroup);
        CoLocationConstraint c4 = new CoLocationConstraint(coLocationGroup);
        SlotSharingGroup sharingGroup = new SlotSharingGroup();

        // Slot-sharing-only tasks first; note the subtask order 0, 2, 1, 3.
        allocateShared(sharedOnlyFirst, 0, 4, sharingGroup, SlotProfile.noRequirements());
        allocateShared(sharedOnlyFirst, 2, 4, sharingGroup, SlotProfile.noRequirements());
        allocateShared(sharedOnlyFirst, 1, 4, sharingGroup, SlotProfile.noRequirements());
        allocateShared(sharedOnlyFirst, 3, 4, sharingGroup, SlotProfile.noRequirements());

        LogicalSlot firstWithC1 = allocateCoLocated(coLocatedFirst, 0, 4, sharingGroup, c1, SlotProfile.noRequirements());
        LogicalSlot firstWithC2 = allocateCoLocated(coLocatedFirst, 2, 4, sharingGroup, c2, SlotProfile.noRequirements());
        LogicalSlot firstWithC3 = allocateCoLocated(coLocatedFirst, 1, 4, sharingGroup, c3, SlotProfile.noRequirements());
        LogicalSlot firstWithC4 = allocateCoLocated(coLocatedFirst, 3, 4, sharingGroup, c4, SlotProfile.noRequirements());

        LogicalSlot secondWithC2 = allocateCoLocated(coLocatedSecond, 1, 4, sharingGroup, c2, SlotProfile.noRequirements());
        LogicalSlot secondWithC3 = allocateCoLocated(coLocatedSecond, 2, 4, sharingGroup, c3, SlotProfile.noRequirements());
        LogicalSlot secondWithC4 = allocateCoLocated(coLocatedSecond, 3, 4, sharingGroup, c4, SlotProfile.noRequirements());
        LogicalSlot secondWithC1 = allocateCoLocated(coLocatedSecond, 0, 4, sharingGroup, c1, SlotProfile.noRequirements());

        allocateShared(sharedOnlyLast, 0, 4, sharingGroup, SlotProfile.noRequirements());
        allocateShared(sharedOnlyLast, 1, 4, sharingGroup, SlotProfile.noRequirements());
        allocateShared(sharedOnlyLast, 2, 4, sharingGroup, SlotProfile.noRequirements());
        allocateShared(sharedOnlyLast, 3, 4, sharingGroup, SlotProfile.noRequirements());

        Assert.assertEquals(firstWithC1.getTaskManagerLocation(), secondWithC1.getTaskManagerLocation());
        Assert.assertEquals(firstWithC2.getTaskManagerLocation(), secondWithC2.getTaskManagerLocation());
        Assert.assertEquals(firstWithC3.getTaskManagerLocation(), secondWithC3.getTaskManagerLocation());
        Assert.assertEquals(firstWithC4.getTaskManagerLocation(), secondWithC4.getTaskManagerLocation());

        assertAssignmentCounts(4, 0, 12);
    }

    /**
     * Allocates with explicit location preferences, one of which conflicts with the location
     * already taken by the constraint's slot, and checks that the co-located tasks follow the
     * slots of the sharing group.
     */
    @Test
    public void testGetsNonLocalFromSharingGroupFirst() throws Exception {
        JobVertexID plainVertex = new JobVertexID();
        JobVertexID coLocatedFirst = new JobVertexID();
        JobVertexID coLocatedSecond = new JobVertexID();

        TaskManagerLocation loc1 = this.testingSlotProvider.addTaskManager(1);
        TaskManagerLocation loc2 = this.testingSlotProvider.addTaskManager(1);
        Assert.assertEquals(2L, this.testingSlotProvider.getNumberOfAvailableSlots());

        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        CoLocationGroup coLocationGroup = new CoLocationGroup();
        CoLocationConstraint cc1 = new CoLocationConstraint(coLocationGroup);
        CoLocationConstraint cc2 = new CoLocationConstraint(coLocationGroup);

        LogicalSlot plain1 = allocateShared(plainVertex, 0, 2, sharingGroup, slotProfileForLocation(loc1));
        LogicalSlot plain2 = allocateShared(plainVertex, 1, 2, sharingGroup, slotProfileForLocation(loc2));

        LogicalSlot firstCc1 = allocateCoLocated(coLocatedFirst, 0, 2, sharingGroup, cc1, slotProfileForLocation(loc1));
        // Prefers loc1 although the assertions below expect it next to plain2.
        LogicalSlot firstCc2 = allocateCoLocated(coLocatedFirst, 1, 2, sharingGroup, cc2, slotProfileForLocation(loc1));

        LogicalSlot secondCc1 = allocateCoLocated(coLocatedSecond, 0, 2, sharingGroup, cc1, slotProfileForLocation(loc2));
        LogicalSlot secondCc2 = allocateCoLocated(coLocatedSecond, 1, 2, sharingGroup, cc2, slotProfileForLocation(loc1));

        Assert.assertEquals(plain1.getTaskManagerLocation(), firstCc1.getTaskManagerLocation());
        Assert.assertEquals(plain2.getTaskManagerLocation(), firstCc2.getTaskManagerLocation());
        Assert.assertEquals(plain1.getTaskManagerLocation(), secondCc1.getTaskManagerLocation());
        Assert.assertEquals(plain2.getTaskManagerLocation(), secondCc2.getTaskManagerLocation());

        Assert.assertEquals(0L, this.testingSlotProvider.getNumberOfAvailableSlots());
        Assert.assertEquals(5L, this.testingSlotProvider.getNumberOfLocalizedAssignments());
        assertOneNonLocalOrHostLocalAssignment();
        Assert.assertEquals(0L, this.testingSlotProvider.getNumberOfUnconstrainedAssignments());

        plain1.releaseSlot();
        plain2.releaseSlot();
        firstCc1.releaseSlot();
        firstCc2.releaseSlot();
        secondCc1.releaseSlot();
        secondCc2.releaseSlot();
        Assert.assertEquals(2L, this.testingSlotProvider.getNumberOfAvailableSlots());
    }

    /**
     * Releases the first tasks of two constraints before their partners are allocated and checks
     * that the partners still land on the task managers of the first tasks, even though the
     * partners' own location preferences are swapped.
     */
    @Test
    public void testSlotReleasedInBetween() throws Exception {
        JobVertexID firstVertex = new JobVertexID();
        JobVertexID secondVertex = new JobVertexID();

        TaskManagerLocation loc1 = this.testingSlotProvider.addTaskManager(1);
        TaskManagerLocation loc2 = this.testingSlotProvider.addTaskManager(1);
        Assert.assertEquals(2L, this.testingSlotProvider.getNumberOfAvailableSlots());

        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        CoLocationGroup coLocationGroup = new CoLocationGroup();
        CoLocationConstraint cc1 = new CoLocationConstraint(coLocationGroup);
        CoLocationConstraint cc2 = new CoLocationConstraint(coLocationGroup);

        LogicalSlot firstCc1 = allocateCoLocated(firstVertex, 0, 2, sharingGroup, cc1, slotProfileForLocation(loc1));
        LogicalSlot firstCc2 = allocateCoLocated(firstVertex, 1, 2, sharingGroup, cc2, slotProfileForLocation(loc2));

        firstCc1.releaseSlot();
        firstCc2.releaseSlot();
        Assert.assertEquals(2L, this.testingSlotProvider.getNumberOfAvailableSlots());

        // The preferences are deliberately swapped relative to the first round.
        LogicalSlot secondCc1 = allocateCoLocated(secondVertex, 0, 2, sharingGroup, cc1, slotProfileForLocation(loc2));
        LogicalSlot secondCc2 = allocateCoLocated(secondVertex, 1, 2, sharingGroup, cc2, slotProfileForLocation(loc1));

        Assert.assertEquals(loc1, secondCc1.getTaskManagerLocation());
        Assert.assertEquals(loc2, secondCc2.getTaskManagerLocation());

        secondCc1.releaseSlot();
        secondCc2.releaseSlot();
        Assert.assertEquals(2L, this.testingSlotProvider.getNumberOfAvailableSlots());

        assertAssignmentCounts(4, 0, 0);
    }

    /**
     * Releases the first tasks of two constraints, occupies both task managers with unshared
     * tasks and checks that allocating a partner task then fails with
     * {@link NoResourceAvailableException}.
     */
    @Test
    public void testSlotReleasedInBetweenAndNoNewLocal() throws Exception {
        JobVertexID firstVertex = new JobVertexID();
        JobVertexID secondVertex = new JobVertexID();
        JobVertexID blockingVertex = new JobVertexID();

        TaskManagerLocation loc1 = this.testingSlotProvider.addTaskManager(1);
        TaskManagerLocation loc2 = this.testingSlotProvider.addTaskManager(1);
        Assert.assertEquals(2L, this.testingSlotProvider.getNumberOfAvailableSlots());

        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        CoLocationGroup coLocationGroup = new CoLocationGroup();
        CoLocationConstraint cc1 = new CoLocationConstraint(coLocationGroup);
        CoLocationConstraint cc2 = new CoLocationConstraint(coLocationGroup);

        LogicalSlot firstCc1 = allocateCoLocated(firstVertex, 0, 2, sharingGroup, cc1, slotProfileForLocation(loc1));
        LogicalSlot firstCc2 = allocateCoLocated(firstVertex, 1, 2, sharingGroup, cc2, slotProfileForLocation(loc2));

        firstCc1.releaseSlot();
        firstCc2.releaseSlot();
        Assert.assertEquals(2L, this.testingSlotProvider.getNumberOfAvailableSlots());

        LogicalSlot blocker1 = allocateUnshared(blockingVertex, 0, 2);
        LogicalSlot blocker2 = allocateUnshared(blockingVertex, 1, 2);

        // Only ExecutionException is handled here; any other exception propagates with its
        // original stack trace instead of being replaced by a generic failure message.
        try {
            allocateCoLocated(secondVertex, 0, 2, sharingGroup, cc1, slotProfileForLocation(loc2));
            Assert.fail("should not be able to find a resource");
        } catch (ExecutionException e) {
            Assert.assertTrue(
                    "Expected NoResourceAvailableException as cause but was: " + e.getCause(),
                    e.getCause() instanceof NoResourceAvailableException);
        }

        blocker1.releaseSlot();
        blocker2.releaseSlot();
        Assert.assertEquals(2L, this.testingSlotProvider.getNumberOfAvailableSlots());

        assertAssignmentCounts(2, 0, 2);
    }

    /**
     * Allocates the tasks of two constraints in interleaved order, all preferring the first task
     * manager, and checks that the tasks of each constraint still share a location.
     */
    @Test
    public void testScheduleOutOfOrder() throws Exception {
        JobVertexID firstVertex = new JobVertexID();
        JobVertexID secondVertex = new JobVertexID();

        TaskManagerLocation loc1 = this.testingSlotProvider.addTaskManager(1);
        this.testingSlotProvider.addTaskManager(1);
        Assert.assertEquals(2L, this.testingSlotProvider.getNumberOfAvailableSlots());

        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        CoLocationGroup coLocationGroup = new CoLocationGroup();
        CoLocationConstraint cc1 = new CoLocationConstraint(coLocationGroup);
        CoLocationConstraint cc2 = new CoLocationConstraint(coLocationGroup);

        // Every request prefers loc1, but only one slot exists there.
        LogicalSlot cc1First = allocateCoLocated(firstVertex, 0, 2, sharingGroup, cc1, slotProfileForLocation(loc1));
        LogicalSlot cc2First = allocateCoLocated(secondVertex, 0, 2, sharingGroup, cc2, slotProfileForLocation(loc1));
        LogicalSlot cc1Second = allocateCoLocated(secondVertex, 1, 2, sharingGroup, cc1, slotProfileForLocation(loc1));
        LogicalSlot cc2Second = allocateCoLocated(firstVertex, 1, 2, sharingGroup, cc2, slotProfileForLocation(loc1));

        Assert.assertEquals(cc1First.getTaskManagerLocation(), cc1Second.getTaskManagerLocation());
        Assert.assertEquals(cc2First.getTaskManagerLocation(), cc2Second.getTaskManagerLocation());

        Assert.assertEquals(0L, this.testingSlotProvider.getNumberOfAvailableSlots());
        Assert.assertEquals(3L, this.testingSlotProvider.getNumberOfLocalizedAssignments());
        assertOneNonLocalOrHostLocalAssignment();
        Assert.assertEquals(0L, this.testingSlotProvider.getNumberOfUnconstrainedAssignments());

        cc1First.releaseSlot();
        cc2First.releaseSlot();
        cc1Second.releaseSlot();
        cc2Second.releaseSlot();
        Assert.assertEquals(2L, this.testingSlotProvider.getNumberOfAvailableSlots());
    }

    /**
     * Allocates two co-located tasks on different task managers and then two slot-sharing-only
     * tasks, checking that the latter end up on the same task managers as the former.
     */
    @Test
    public void nonColocationFollowsCoLocation() throws Exception {
        JobVertexID coLocatedVertex = new JobVertexID();
        JobVertexID plainVertex = new JobVertexID();

        TaskManagerLocation loc1 = this.testingSlotProvider.addTaskManager(1);
        TaskManagerLocation loc2 = this.testingSlotProvider.addTaskManager(1);
        Assert.assertEquals(2L, this.testingSlotProvider.getNumberOfAvailableSlots());

        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        CoLocationGroup coLocationGroup = new CoLocationGroup();
        CoLocationConstraint cc1 = new CoLocationConstraint(coLocationGroup);
        CoLocationConstraint cc2 = new CoLocationConstraint(coLocationGroup);

        LogicalSlot coLocated1 = allocateCoLocated(coLocatedVertex, 0, 2, sharingGroup, cc1, slotProfileForLocation(loc1));
        LogicalSlot coLocated2 = allocateCoLocated(coLocatedVertex, 1, 2, sharingGroup, cc2, slotProfileForLocation(loc2));

        LogicalSlot plain1 = allocateShared(plainVertex, 0, 2, sharingGroup, slotProfileForLocation(loc1));
        LogicalSlot plain2 = allocateShared(plainVertex, 1, 2, sharingGroup, slotProfileForLocation(loc1));

        Assert.assertEquals(coLocated1.getTaskManagerLocation(), plain1.getTaskManagerLocation());
        Assert.assertEquals(coLocated2.getTaskManagerLocation(), plain2.getTaskManagerLocation());

        coLocated1.releaseSlot();
        coLocated2.releaseSlot();
        plain1.releaseSlot();
        plain2.releaseSlot();
        Assert.assertEquals(2L, this.testingSlotProvider.getNumberOfAvailableSlots());
    }

    /**
     * Allocates a slot for a task that is part of {@code sharingGroup} and bound to
     * {@code constraint}, blocking until the allocation future completes.
     *
     * @param vertexId job vertex the synthetic execution belongs to
     * @param subtaskIndex subtask index passed to the execution factory
     * @param parallelism number of tasks passed to the execution factory
     * @param sharingGroup slot sharing group of the execution
     * @param constraint co-location constraint attached to the scheduled unit
     * @param slotProfile requirements and preferences for the slot
     * @return the allocated slot
     * @throws ExecutionException if the allocation future completes exceptionally
     */
    private LogicalSlot allocateCoLocated(
            JobVertexID vertexId,
            int subtaskIndex,
            int parallelism,
            SlotSharingGroup sharingGroup,
            CoLocationConstraint constraint,
            SlotProfile slotProfile) throws Exception {
        ScheduledUnit unit = new ScheduledUnit(
                ExecutionGraphTestUtils.getExecution(vertexId, subtaskIndex, parallelism, sharingGroup),
                sharingGroup.getSlotSharingGroupId(),
                constraint);
        return awaitSlot(unit, slotProfile);
    }

    /**
     * Allocates a slot for a task that is part of {@code sharingGroup} but has no co-location
     * constraint, blocking until the allocation future completes.
     *
     * @throws ExecutionException if the allocation future completes exceptionally
     */
    private LogicalSlot allocateShared(
            JobVertexID vertexId,
            int subtaskIndex,
            int parallelism,
            SlotSharingGroup sharingGroup,
            SlotProfile slotProfile) throws Exception {
        ScheduledUnit unit = new ScheduledUnit(
                ExecutionGraphTestUtils.getExecution(vertexId, subtaskIndex, parallelism, sharingGroup),
                sharingGroup.getSlotSharingGroupId());
        return awaitSlot(unit, slotProfile);
    }

    /**
     * Allocates a slot for a task without slot sharing group or co-location constraint, using
     * {@link SlotProfile#noRequirements()}, and blocks until the allocation future completes.
     *
     * @throws ExecutionException if the allocation future completes exceptionally
     */
    private LogicalSlot allocateUnshared(JobVertexID vertexId, int subtaskIndex, int parallelism)
            throws Exception {
        ScheduledUnit unit = new ScheduledUnit(
                ExecutionGraphTestUtils.getExecution(vertexId, subtaskIndex, parallelism, null));
        return awaitSlot(unit, SlotProfile.noRequirements());
    }

    /** Requests a slot for {@code unit} with an infinite timeout and waits for the result. */
    private LogicalSlot awaitSlot(ScheduledUnit unit, SlotProfile slotProfile) throws Exception {
        return (LogicalSlot) this.testingSlotProvider
                .allocateSlot(unit, slotProfile, TestingUtils.infiniteTime())
                .get();
    }

    /**
     * Asserts the provider's localized, non-localized and unconstrained assignment counters, in
     * that order.
     */
    private void assertAssignmentCounts(int localized, int nonLocalized, int unconstrained) {
        Assert.assertEquals(
                "localized assignments",
                localized,
                this.testingSlotProvider.getNumberOfLocalizedAssignments());
        Assert.assertEquals(
                "non-localized assignments",
                nonLocalized,
                this.testingSlotProvider.getNumberOfNonLocalizedAssignments());
        Assert.assertEquals(
                "unconstrained assignments",
                unconstrained,
                this.testingSlotProvider.getNumberOfUnconstrainedAssignments());
    }

    /**
     * Asserts that either the non-localized or the host-localized assignment counter equals one.
     */
    private void assertOneNonLocalOrHostLocalAssignment() {
        int nonLocalized = this.testingSlotProvider.getNumberOfNonLocalizedAssignments();
        int hostLocalized = this.testingSlotProvider.getNumberOfHostLocalizedAssignments();
        Assert.assertTrue(
                "Expected one non-localized or one host-localized assignment but found "
                        + nonLocalized + " non-localized and " + hostLocalized + " host-localized",
                1 == nonLocalized || 1 == hostLocalized);
    }

    /** Builds a slot profile with unknown resources that prefers the given task manager. */
    private static SlotProfile slotProfileForLocation(TaskManagerLocation taskManagerLocation) {
        return SlotProfile.preferredLocality(ResourceProfile.UNKNOWN, Collections.singletonList(taskManagerLocation));
    }
}
