package org.apache.flink.runtime.scheduler;

import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.class */
class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator {
    private static final Logger LOG = LoggerFactory.getLogger(SlotSharingExecutionSlotAllocator.class);
    private final PhysicalSlotProvider slotProvider;
    private final boolean slotWillBeOccupiedIndefinitely;
    private final SlotSharingStrategy slotSharingStrategy;
    private final Map<ExecutionSlotSharingGroup, SharedSlot> sharedSlots = new IdentityHashMap();
    private final SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory;
    private final PhysicalSlotRequestBulkChecker bulkChecker;
    private final Time allocationTimeout;
    private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SlotSharingExecutionSlotAllocator(PhysicalSlotProvider physicalSlotProvider, boolean z, SlotSharingStrategy slotSharingStrategy, SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory, PhysicalSlotRequestBulkChecker physicalSlotRequestBulkChecker, Time time, Function<ExecutionVertexID, ResourceProfile> function) {
        this.slotProvider = (PhysicalSlotProvider) Preconditions.checkNotNull(physicalSlotProvider);
        this.slotWillBeOccupiedIndefinitely = z;
        this.slotSharingStrategy = (SlotSharingStrategy) Preconditions.checkNotNull(slotSharingStrategy);
        this.sharedSlotProfileRetrieverFactory = (SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory) Preconditions.checkNotNull(sharedSlotProfileRetrieverFactory);
        this.bulkChecker = (PhysicalSlotRequestBulkChecker) Preconditions.checkNotNull(physicalSlotRequestBulkChecker);
        this.allocationTimeout = (Time) Preconditions.checkNotNull(time);
        this.resourceProfileRetriever = (Function) Preconditions.checkNotNull(function);
    }

    @Override // org.apache.flink.runtime.scheduler.ExecutionSlotAllocator
    public List<SlotExecutionVertexAssignment> allocateSlotsFor(List<ExecutionVertexSchedulingRequirements> list) {
        List list2 = (List) list.stream().map((v0) -> {
            return v0.getExecutionVertexId();
        }).collect(Collectors.toList());
        SharedSlotProfileRetriever createFromBulk = this.sharedSlotProfileRetrieverFactory.createFromBulk(new HashSet(list2));
        Stream stream = list2.stream();
        SlotSharingStrategy slotSharingStrategy = this.slotSharingStrategy;
        slotSharingStrategy.getClass();
        Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> map = (Map) stream.collect(Collectors.groupingBy(slotSharingStrategy::getExecutionSlotSharingGroup));
        Map<ExecutionVertexID, SlotExecutionVertexAssignment> allocateLogicalSlotsFromSharedSlots = allocateLogicalSlotsFromSharedSlots(createFromBulk, map);
        this.bulkChecker.schedulePendingRequestBulkTimeoutCheck(createBulk(map), this.allocationTimeout);
        Stream stream2 = list2.stream();
        allocateLogicalSlotsFromSharedSlots.getClass();
        return (List) stream2.map((v1) -> {
            return r1.get(v1);
        }).collect(Collectors.toList());
    }

    @Override // org.apache.flink.runtime.scheduler.ExecutionSlotAllocator
    public void cancel(ExecutionVertexID executionVertexID) {
        cancelLogicalSlotRequest(executionVertexID, null);
    }

    private void cancelLogicalSlotRequest(ExecutionVertexID executionVertexID, Throwable th) {
        ExecutionSlotSharingGroup executionSlotSharingGroup = this.slotSharingStrategy.getExecutionSlotSharingGroup(executionVertexID);
        Preconditions.checkNotNull(executionSlotSharingGroup, "There is no ExecutionSlotSharingGroup for ExecutionVertexID " + executionVertexID);
        SharedSlot sharedSlot = this.sharedSlots.get(executionSlotSharingGroup);
        if (sharedSlot != null) {
            sharedSlot.cancelLogicalSlotRequest(executionVertexID, th);
        } else {
            LOG.debug("There is no SharedSlot for ExecutionSlotSharingGroup of ExecutionVertexID {}", executionVertexID);
        }
    }

    private Map<ExecutionVertexID, SlotExecutionVertexAssignment> allocateLogicalSlotsFromSharedSlots(SharedSlotProfileRetriever sharedSlotProfileRetriever, Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<ExecutionSlotSharingGroup, List<ExecutionVertexID>> entry : map.entrySet()) {
            ExecutionSlotSharingGroup key = entry.getKey();
            List<ExecutionVertexID> value = entry.getValue();
            SharedSlot orAllocateSharedSlot = getOrAllocateSharedSlot(key, sharedSlotProfileRetriever);
            for (ExecutionVertexID executionVertexID : value) {
                hashMap.put(executionVertexID, new SlotExecutionVertexAssignment(executionVertexID, orAllocateSharedSlot.allocateLogicalSlot(executionVertexID)));
            }
        }
        return hashMap;
    }

    private SharedSlot getOrAllocateSharedSlot(ExecutionSlotSharingGroup executionSlotSharingGroup, SharedSlotProfileRetriever sharedSlotProfileRetriever) {
        return this.sharedSlots.computeIfAbsent(executionSlotSharingGroup, executionSlotSharingGroup2 -> {
            SlotRequestId slotRequestId = new SlotRequestId();
            ResourceProfile physicalSlotResourceProfile = getPhysicalSlotResourceProfile(executionSlotSharingGroup2);
            return new SharedSlot(slotRequestId, physicalSlotResourceProfile, executionSlotSharingGroup2, this.slotProvider.allocatePhysicalSlot(new PhysicalSlotRequest(slotRequestId, sharedSlotProfileRetriever.getSlotProfile(executionSlotSharingGroup2, physicalSlotResourceProfile), this.slotWillBeOccupiedIndefinitely)).thenApply((v0) -> {
                return v0.getPhysicalSlot();
            }), this.slotWillBeOccupiedIndefinitely, this::releaseSharedSlot);
        });
    }

    private void releaseSharedSlot(ExecutionSlotSharingGroup executionSlotSharingGroup) {
        SharedSlot remove = this.sharedSlots.remove(executionSlotSharingGroup);
        Preconditions.checkNotNull(remove);
        Preconditions.checkState(remove.isEmpty(), "Trying to remove a shared slot with physical request id %s which has assigned logical slots", remove.getPhysicalSlotRequestId());
        this.slotProvider.cancelSlotRequest(remove.getPhysicalSlotRequestId(), new FlinkException("Slot is being returned from SlotSharingExecutionSlotAllocator."));
    }

    private ResourceProfile getPhysicalSlotResourceProfile(ExecutionSlotSharingGroup executionSlotSharingGroup) {
        return (ResourceProfile) executionSlotSharingGroup.getExecutionVertexIds().stream().reduce(ResourceProfile.ZERO, (resourceProfile, executionVertexID) -> {
            return resourceProfile.merge(this.resourceProfileRetriever.apply(executionVertexID));
        }, (v0, v1) -> {
            return v0.merge(v1);
        });
    }

    private SharingPhysicalSlotRequestBulk createBulk(Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> map) {
        SharingPhysicalSlotRequestBulk sharingPhysicalSlotRequestBulk = new SharingPhysicalSlotRequestBulk(map, (Map) map.keySet().stream().collect(Collectors.toMap(executionSlotSharingGroup -> {
            return executionSlotSharingGroup;
        }, executionSlotSharingGroup2 -> {
            return this.sharedSlots.get(executionSlotSharingGroup2).getPhysicalSlotResourceProfile();
        })), this::cancelLogicalSlotRequest);
        registerPhysicalSlotRequestBulkCallbacks(map.keySet(), sharingPhysicalSlotRequestBulk);
        return sharingPhysicalSlotRequestBulk;
    }

    private void registerPhysicalSlotRequestBulkCallbacks(Iterable<ExecutionSlotSharingGroup> iterable, SharingPhysicalSlotRequestBulk sharingPhysicalSlotRequestBulk) {
        for (ExecutionSlotSharingGroup executionSlotSharingGroup : iterable) {
            CompletableFuture<PhysicalSlot> slotContextFuture = this.sharedSlots.get(executionSlotSharingGroup).getSlotContextFuture();
            slotContextFuture.thenAccept(physicalSlot -> {
                sharingPhysicalSlotRequestBulk.markFulfilled(executionSlotSharingGroup, physicalSlot.getAllocationId());
            });
            slotContextFuture.exceptionally(th -> {
                sharingPhysicalSlotRequestBulk.clearPendingRequests();
                return null;
            });
        }
    }
}
