package org.apache.flink.runtime.scheduler;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.util.DualKeyLinkedMap;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/SharedSlot.class */
/**
 * Shares a single physical slot request between the logical slot requests of the execution
 * vertices that belong to one {@link ExecutionSlotSharingGroup}.
 *
 * <p>The instance registers itself as the payload of the physical slot once that slot becomes
 * available, and it is passed as the {@link SlotOwner} of every {@link SingleLogicalSlot} it
 * creates. Pending and fulfilled logical slot requests are tracked in a map keyed by both
 * {@link ExecutionVertexID} and logical {@link SlotRequestId}. When the last tracked request is
 * removed, the external release callback is invoked exactly once and the state becomes
 * {@link State#RELEASED}.
 */
class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
    private static final Logger LOG = LoggerFactory.getLogger(SharedSlot.class);

    /** Id of the physical slot request backing this shared slot. */
    private final SlotRequestId physicalSlotRequestId;

    /** Resource profile supplied for the physical slot at construction time. */
    private final ResourceProfile physicalSlotResourceProfile;

    /** Group whose execution vertices are allowed to request logical slots from this slot. */
    private final ExecutionSlotSharingGroup executionSlotSharingGroup;

    /** Future of the physical slot; completes only after this instance was set as its payload. */
    private final CompletableFuture<PhysicalSlot> slotContextFuture;

    /** Logical slot futures, addressable by execution vertex id and by logical slot request id. */
    private final DualKeyLinkedMap<ExecutionVertexID, SlotRequestId, CompletableFuture<SingleLogicalSlot>> requestedLogicalSlots;

    /** Flag forwarded to created logical slots and reported by {@link #willOccupySlotIndefinitely()}. */
    private final boolean slotWillBeOccupiedIndefinitely;

    /** Callback invoked with the sharing group when this shared slot is released externally. */
    private final Consumer<ExecutionSlotSharingGroup> externalReleaseCallback;

    /** Lifecycle state; flips to {@link State#RELEASED} in {@link #releaseExternally()}. */
    private State state = State.ALLOCATED;

    /** Lifecycle of a {@link SharedSlot}. */
    public enum State {
        ALLOCATED,
        RELEASED
    }

    /**
     * Creates a shared slot on top of a (possibly still pending) physical slot.
     *
     * @param physicalSlotRequestId id of the physical slot request
     * @param physicalSlotResourceProfile resource profile of the physical slot request
     * @param executionSlotSharingGroup group of execution vertices sharing the slot
     * @param slotContextFuture future of the physical slot; once it completes, this instance is
     *     assigned as the slot's payload and a failed assignment fails the derived future
     * @param slotWillBeOccupiedIndefinitely value passed on to every created logical slot
     * @param externalReleaseCallback invoked once when the last logical slot request is removed
     */
    public SharedSlot(
            SlotRequestId physicalSlotRequestId,
            ResourceProfile physicalSlotResourceProfile,
            ExecutionSlotSharingGroup executionSlotSharingGroup,
            CompletableFuture<PhysicalSlot> slotContextFuture,
            boolean slotWillBeOccupiedIndefinitely,
            Consumer<ExecutionSlotSharingGroup> externalReleaseCallback) {
        this.physicalSlotRequestId = physicalSlotRequestId;
        this.physicalSlotResourceProfile = physicalSlotResourceProfile;
        this.executionSlotSharingGroup = executionSlotSharingGroup;
        this.slotContextFuture = slotContextFuture.thenApply(physicalSlot -> {
            Preconditions.checkState(physicalSlot.tryAssignPayload(this), "Unexpected physical slot payload assignment failure!");
            return physicalSlot;
        });
        this.requestedLogicalSlots = new DualKeyLinkedMap<>(executionSlotSharingGroup.getExecutionVertexIds().size());
        this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
        this.externalReleaseCallback = externalReleaseCallback;
    }

    /** Returns the id of the physical slot request backing this shared slot. */
    public SlotRequestId getPhysicalSlotRequestId() {
        return this.physicalSlotRequestId;
    }

    /** Returns the resource profile this shared slot was created with. */
    public ResourceProfile getPhysicalSlotResourceProfile() {
        return this.physicalSlotResourceProfile;
    }

    /** Returns the physical slot future (completed only after the payload assignment succeeded). */
    public CompletableFuture<PhysicalSlot> getSlotContextFuture() {
        return this.slotContextFuture;
    }

    /**
     * Returns the logical slot future for the given execution vertex, creating the request if it
     * does not exist yet.
     *
     * @param executionVertexId execution vertex that needs a logical slot
     * @return a new dependent future of the logical slot for the vertex
     * @throws IllegalStateException if this shared slot has already been released
     * @throws IllegalArgumentException if the vertex is not part of the sharing group
     */
    public CompletableFuture<LogicalSlot> allocateLogicalSlot(ExecutionVertexID executionVertexId) {
        Preconditions.checkState(this.state == State.ALLOCATED, "SharedSlot (physical request %s) has been released", this.physicalSlotRequestId);
        Preconditions.checkArgument(this.executionSlotSharingGroup.getExecutionVertexIds().contains(executionVertexId), "Trying to allocate a logical slot for execution %s which is not in the ExecutionSlotSharingGroup", executionVertexId);
        CompletableFuture<SingleLogicalSlot> logicalSlotFuture = this.requestedLogicalSlots.getValueByKeyA(executionVertexId);
        if (logicalSlotFuture != null) {
            LOG.debug("Request for {} already exists", getLogicalSlotString(executionVertexId));
        } else {
            logicalSlotFuture = allocateNonExistentLogicalSlot(executionVertexId);
        }
        // Hand out a dependent future widened to LogicalSlot rather than the tracked future itself.
        return logicalSlotFuture.thenApply(Function.identity());
    }

    /**
     * Registers a new logical slot request for the vertex and returns its future. The future is
     * derived from the physical slot future; if it completes exceptionally the request is removed
     * from the tracking map again.
     */
    private CompletableFuture<SingleLogicalSlot> allocateNonExistentLogicalSlot(ExecutionVertexID executionVertexId) {
        SlotRequestId logicalSlotRequestId = new SlotRequestId();
        String logicalSlotString = getLogicalSlotString(logicalSlotRequestId, executionVertexId);
        LOG.debug("Request a {}", logicalSlotString);
        CompletableFuture<SingleLogicalSlot> logicalSlotFuture = this.slotContextFuture.thenApply(physicalSlot -> {
            LOG.debug("Allocated {}", logicalSlotString);
            return createLogicalSlot(physicalSlot, logicalSlotRequestId);
        });
        // Register before attaching the failure handler so the handler finds the entry to remove.
        this.requestedLogicalSlots.put(executionVertexId, logicalSlotRequestId, logicalSlotFuture);
        logicalSlotFuture.exceptionally(cause -> {
            LOG.debug("Failed {}", logicalSlotString, cause);
            removeLogicalSlotRequest(logicalSlotRequestId);
            return null;
        });
        return logicalSlotFuture;
    }

    /** Creates a logical slot on the physical slot with this instance as its {@link SlotOwner}. */
    private SingleLogicalSlot createLogicalSlot(PhysicalSlot physicalSlot, SlotRequestId logicalSlotRequestId) {
        return new SingleLogicalSlot(logicalSlotRequestId, physicalSlot, null, Locality.UNKNOWN, this, this.slotWillBeOccupiedIndefinitely);
    }

    /**
     * Cancels the logical slot request of the given execution vertex, if one is tracked.
     *
     * @param executionVertexId execution vertex whose request is cancelled
     * @param cause failure to complete the future with; if {@code null} the future is cancelled
     * @throws IllegalStateException if this shared slot has already been released
     */
    public void cancelLogicalSlotRequest(ExecutionVertexID executionVertexId, @Nullable Throwable cause) {
        Preconditions.checkState(this.state == State.ALLOCATED, "SharedSlot (physical request %s) has been released", this.physicalSlotRequestId);
        CompletableFuture<SingleLogicalSlot> logicalSlotFuture = this.requestedLogicalSlots.getValueByKeyA(executionVertexId);
        SlotRequestId logicalSlotRequestId = this.requestedLogicalSlots.getKeyBByKeyA(executionVertexId);
        if (logicalSlotFuture == null) {
            LOG.debug("No SlotExecutionVertexAssignment for logical {} from physical {}", logicalSlotRequestId, this.physicalSlotRequestId);
            return;
        }
        LOG.debug("Cancel {} from {}", getLogicalSlotString(logicalSlotRequestId), executionVertexId);
        // If the future was still pending, failing it here triggers the exceptionally-handler
        // installed in allocateNonExistentLogicalSlot, which removes the request from the map.
        if (cause == null) {
            logicalSlotFuture.cancel(false);
        } else {
            logicalSlotFuture.completeExceptionally(cause);
        }
    }

    /** Removes the request of the returned logical slot from this shared slot. */
    @Override // org.apache.flink.runtime.jobmaster.SlotOwner
    public void returnLogicalSlot(LogicalSlot logicalSlot) {
        removeLogicalSlotRequest(logicalSlot.getSlotRequestId());
    }

    /**
     * Drops the logical slot request with the given id and releases this shared slot externally
     * if it was the last one.
     *
     * @throws IllegalStateException if no request with that id is tracked
     */
    private void removeLogicalSlotRequest(SlotRequestId logicalSlotRequestId) {
        LOG.debug("Remove {}", getLogicalSlotString(logicalSlotRequestId));
        Preconditions.checkState(this.requestedLogicalSlots.removeKeyB(logicalSlotRequestId) != null, "Trying to remove a logical slot request which has been either already removed or never created.");
        releaseExternally();
    }

    /**
     * Releases every tracked logical slot with the given cause, clears the tracking map and then
     * releases this shared slot externally.
     *
     * @param cause reason forwarded to each logical slot's release
     * @throws IllegalStateException if the physical slot future or any tracked logical slot
     *     future is not done yet
     */
    @Override // org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot.Payload
    public void release(Throwable cause) {
        Preconditions.checkState(this.slotContextFuture.isDone(), "Releasing of the shared slot is expected only from its successfully allocated physical slot (%s)", this.physicalSlotRequestId);
        LOG.debug("Release shared slot ({})", this.physicalSlotRequestId);
        // Iterate over a snapshot: releasing a logical slot presumably can call back into
        // returnLogicalSlot (this instance is the slot's owner) and mutate requestedLogicalSlots.
        Map<ExecutionVertexID, CompletableFuture<SingleLogicalSlot>> logicalSlotFutures =
                this.requestedLogicalSlots.keySetA().stream()
                        .collect(Collectors.toMap(executionVertexId -> executionVertexId, this.requestedLogicalSlots::getValueByKeyA));
        for (Map.Entry<ExecutionVertexID, CompletableFuture<SingleLogicalSlot>> entry : logicalSlotFutures.entrySet()) {
            LOG.debug("Release {}", getLogicalSlotString(entry.getKey()));
            CompletableFuture<SingleLogicalSlot> logicalSlotFuture = entry.getValue();
            Preconditions.checkNotNull(logicalSlotFuture);
            Preconditions.checkState(logicalSlotFuture.isDone(), "Logical slot future must already done when release call comes from the successfully allocated physical slot (%s)", this.physicalSlotRequestId);
            logicalSlotFuture.thenAccept(logicalSlot -> logicalSlot.release(cause));
        }
        this.requestedLogicalSlots.clear();
        releaseExternally();
    }

    /**
     * Marks this shared slot as released and notifies the external callback, but only if it has
     * not been released before and no logical slot requests remain.
     */
    private void releaseExternally() {
        if (this.state == State.RELEASED || !this.requestedLogicalSlots.values().isEmpty()) {
            return;
        }
        this.state = State.RELEASED;
        LOG.debug("Release shared slot externally ({})", this.physicalSlotRequestId);
        this.externalReleaseCallback.accept(this.executionSlotSharingGroup);
    }

    /** Returns the flag this shared slot was constructed with. */
    @Override // org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot.Payload
    public boolean willOccupySlotIndefinitely() {
        return this.slotWillBeOccupiedIndefinitely;
    }

    /** Describes the logical slot with the given request id, looking up its execution vertex. */
    private String getLogicalSlotString(SlotRequestId logicalSlotRequestId) {
        return getLogicalSlotString(logicalSlotRequestId, this.requestedLogicalSlots.getKeyAByKeyB(logicalSlotRequestId));
    }

    /** Describes the logical slot of the given execution vertex, looking up its request id. */
    private String getLogicalSlotString(ExecutionVertexID executionVertexId) {
        return getLogicalSlotString(this.requestedLogicalSlots.getKeyBByKeyA(executionVertexId), executionVertexId);
    }

    /** Formats the log description of a logical slot of this shared slot. */
    private String getLogicalSlotString(SlotRequestId logicalSlotRequestId, ExecutionVertexID executionVertexId) {
        return String.format("logical slot (%s) for execution vertex (id %s) from the physical slot (%s)", logicalSlotRequestId, executionVertexId, this.physicalSlotRequestId);
    }

    /** Returns {@code true} if no logical slot request is currently tracked. */
    public boolean isEmpty() {
        return this.requestedLogicalSlots.size() == 0;
    }
}
