package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.util.ResourceCounter;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.class */
public class SlotSharingSlotAllocator implements SlotAllocator {
    /** Reserves the physical slot behind an allocation id when a {@link SharedSlot} is created. */
    private final ReserveSlotFunction reserveSlotFunction;

    /** Invoked from the release callback of every {@link SharedSlot} created by this allocator. */
    private final FreeSlotFunction freeSlotFunction;

    /** Checks whether the slot behind an allocation id is available and free before it is reserved. */
    private final IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction;

    /**
     * When {@code true} and non-empty previous allocation information is supplied, slot assignment
     * uses {@code StateLocalitySlotAssigner} instead of {@code DefaultSlotAssigner}.
     */
    private final boolean localRecoveryEnabled;

    /**
     * Shared slots created by this allocator, keyed by allocation id. Cleared and refilled by
     * {@code tryReserveResources} and extended by {@code reserveAdditionalResources}.
     */
    private final Map<AllocationID, SharedSlot> sharedSlots = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator$ExecutionSlotSharingGroup.class */
    /**
     * A group of execution vertices that is assigned to one slot as a unit, identified by a string
     * id.
     *
     * <p>The vertex set handed to the constructor is stored and returned as-is; no defensive copy
     * is made.
     */
    public static class ExecutionSlotSharingGroup {
        private final String id;
        private final Set<ExecutionVertexID> containedExecutionVertices;

        /**
         * Creates a group whose id is a freshly generated random UUID string.
         *
         * @param executionVertices the vertices contained in this group
         */
        public ExecutionSlotSharingGroup(Set<ExecutionVertexID> executionVertices) {
            this(executionVertices, UUID.randomUUID().toString());
        }

        /**
         * Creates a group with an explicit id.
         *
         * @param executionVertices the vertices contained in this group
         * @param groupId the id reported by {@link #getId()}
         */
        public ExecutionSlotSharingGroup(Set<ExecutionVertexID> executionVertices, String groupId) {
            this.id = groupId;
            this.containedExecutionVertices = executionVertices;
        }

        /** @return the id of this group */
        public String getId() {
            return id;
        }

        /** @return the vertex set passed at construction time (same instance) */
        public Collection<ExecutionVertexID> getContainedExecutionVertices() {
            return containedExecutionVertices;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator$SlotSharingGroupMetaInfo.class */
    /**
     * Parallelism bounds aggregated over all vertices of one slot sharing group.
     *
     * <p>For a single vertex both lower bounds are its minimum parallelism and the upper bound is
     * its parallelism. When vertices of the same group are merged, {@code minLowerBound} is the
     * smallest lower bound, {@code maxLowerBound} the largest lower bound and {@code maxUpperBound}
     * the largest upper bound seen.
     */
    public static class SlotSharingGroupMetaInfo {
        private final int minLowerBound;
        private final int maxLowerBound;
        private final int maxUpperBound;

        private SlotSharingGroupMetaInfo(int minLowerBound, int maxLowerBound, int maxUpperBound) {
            this.minLowerBound = minLowerBound;
            this.maxLowerBound = maxLowerBound;
            this.maxUpperBound = maxUpperBound;
        }

        /** @return the smallest minimum parallelism of any vertex in the group */
        public int getMinLowerBound() {
            return this.minLowerBound;
        }

        /** @return the largest minimum parallelism of any vertex in the group */
        public int getMaxLowerBound() {
            return this.maxLowerBound;
        }

        /** @return the largest parallelism of any vertex in the group */
        public int getMaxUpperBound() {
            return this.maxUpperBound;
        }

        /** @return {@code maxUpperBound - maxLowerBound} */
        public int getMaxLowerUpperBoundRange() {
            return this.maxUpperBound - this.maxLowerBound;
        }

        /**
         * Builds the aggregated bounds for every slot sharing group referenced by the given
         * vertices.
         *
         * @param iterable the vertices to aggregate
         * @return a new map from slot sharing group id to the merged bounds of its vertices
         */
        public static Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> from(Iterable<JobInformation.VertexInformation> iterable) {
            return getPerSlotSharingGroups(
                    iterable,
                    vertex ->
                            new SlotSharingGroupMetaInfo(
                                    vertex.getMinParallelism(),
                                    vertex.getMinParallelism(),
                                    vertex.getParallelism()),
                    (left, right) ->
                            new SlotSharingGroupMetaInfo(
                                    Math.min(left.getMinLowerBound(), right.getMinLowerBound()),
                                    Math.max(left.getMaxLowerBound(), right.getMaxLowerBound()),
                                    Math.max(left.getMaxUpperBound(), right.getMaxUpperBound())));
        }

        /**
         * Folds the vertices into one value per slot sharing group.
         *
         * @param iterable the vertices to process, in iteration order
         * @param function extracts the per-vertex value
         * @param biFunction merges the value accumulated so far (first argument) with the value of
         *     the next vertex of the same group (second argument)
         * @return a new {@link HashMap} keyed by slot sharing group id
         */
        private static <T> Map<SlotSharingGroupId, T> getPerSlotSharingGroups(Iterable<JobInformation.VertexInformation> iterable, Function<JobInformation.VertexInformation, T> function, BiFunction<T, T, T> biFunction) {
            // The map must be typed: with a raw HashMap the lambda's accumulator is Object and
            // cannot be handed to BiFunction<T, T, T>.
            final Map<SlotSharingGroupId, T> perGroup = new HashMap<>();
            for (JobInformation.VertexInformation vertex : iterable) {
                final SlotSharingGroupId groupId = vertex.getSlotSharingGroup().getSlotSharingGroupId();
                perGroup.compute(
                        groupId,
                        (ignoredKey, accumulated) -> {
                            final T extracted = function.apply(vertex);
                            return accumulated == null ? extracted : biFunction.apply(accumulated, extracted);
                        });
            }
            return perGroup;
        }
    }

    /**
     * Stores the collaborators used for reserving, freeing and checking slots.
     *
     * @param reserveSlotFunction reserves the slot behind an allocation id
     * @param freeSlotFunction frees a previously reserved slot
     * @param isSlotAvailableAndFreeFunction tells whether a slot is available and free
     * @param localRecoveryEnabled whether local recovery is enabled
     */
    private SlotSharingSlotAllocator(
            ReserveSlotFunction reserveSlotFunction,
            FreeSlotFunction freeSlotFunction,
            IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
            boolean localRecoveryEnabled) {
        this.localRecoveryEnabled = localRecoveryEnabled;
        this.isSlotAvailableAndFreeFunction = isSlotAvailableAndFreeFunction;
        this.freeSlotFunction = freeSlotFunction;
        this.reserveSlotFunction = reserveSlotFunction;
    }

    /**
     * Static factory for {@link SlotSharingSlotAllocator}; forwards all arguments unchanged to the
     * private constructor.
     *
     * @param reserveSlotFunction reserves the slot behind an allocation id
     * @param freeSlotFunction frees a previously reserved slot
     * @param isSlotAvailableAndFreeFunction tells whether a slot is available and free
     * @param localRecoveryEnabled whether local recovery is enabled
     * @return a new allocator instance
     */
    public static SlotSharingSlotAllocator createSlotSharingSlotAllocator(
            ReserveSlotFunction reserveSlotFunction,
            FreeSlotFunction freeSlotFunction,
            IsSlotAvailableAndFreeFunction isSlotAvailableAndFreeFunction,
            boolean localRecoveryEnabled) {
        final SlotSharingSlotAllocator allocator =
                new SlotSharingSlotAllocator(
                        reserveSlotFunction,
                        freeSlotFunction,
                        isSlotAvailableAndFreeFunction,
                        localRecoveryEnabled);
        return allocator;
    }

    /**
     * Sums the maximum upper parallelism bound of every slot sharing group referenced by the given
     * vertices.
     *
     * @param iterable the vertices of the job
     * @return a counter holding that sum for {@link ResourceProfile#UNKNOWN}
     */
    @Override
    public ResourceCounter calculateRequiredSlots(Iterable<JobInformation.VertexInformation> iterable) {
        final int totalRequiredSlots =
                SlotSharingGroupMetaInfo.from(iterable).values().stream()
                        .mapToInt(SlotSharingGroupMetaInfo::getMaxUpperBound)
                        .sum();
        return ResourceCounter.withResource(ResourceProfile.UNKNOWN, totalRequiredSlots);
    }

    /**
     * Determines the parallelism of every job vertex for the given number of slots.
     *
     * <p>The minimum number of slots is the sum of the largest lower bound of each slot sharing
     * group. If fewer slots are offered, no parallelism can be determined.
     *
     * @param jobInformation the job's vertices and slot sharing groups
     * @param collection the slots that may be used; only its size is consulted here
     * @return the per-vertex parallelism, or empty if the slots do not cover the minimum
     */
    @Override
    public Optional<VertexParallelism> determineParallelism(JobInformation jobInformation, Collection<? extends SlotInfo> collection) {
        final Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> metaInfoByGroup =
                SlotSharingGroupMetaInfo.from(jobInformation.getVertices());

        final int minimumRequiredSlots =
                metaInfoByGroup.values().stream()
                        .mapToInt(SlotSharingGroupMetaInfo::getMaxLowerBound)
                        .sum();
        if (collection.size() < minimumRequiredSlots) {
            return Optional.empty();
        }

        final Map<SlotSharingGroupId, Integer> slotsByGroup =
                determineSlotsPerSharingGroup(
                        jobInformation, collection.size(), minimumRequiredSlots, metaInfoByGroup);

        final Map<JobVertexID, Integer> parallelismByVertex = new HashMap<>();
        for (SlotSharingGroup group : jobInformation.getSlotSharingGroups()) {
            final List<JobInformation.VertexInformation> groupVertices =
                    group.getJobVertexIds().stream()
                            .map(jobInformation::getVertexInformation)
                            .collect(Collectors.toList());
            final int groupSlots = slotsByGroup.get(group.getSlotSharingGroupId()).intValue();
            parallelismByVertex.putAll(determineVertexParallelism(groupVertices, groupSlots));
        }
        return Optional.of(new VertexParallelism(parallelismByVertex));
    }

    /**
     * Determines the vertex parallelism and, if one exists, assigns slots for it.
     *
     * <p>{@code StateLocalitySlotAssigner} is used only when local recovery is enabled and
     * non-empty previous allocation information is supplied; otherwise {@code DefaultSlotAssigner}
     * is used.
     *
     * @param jobInformation the job's vertices and slot sharing groups
     * @param collection the slots that may be used
     * @param jobAllocationsInformation previous allocations, may be {@code null}
     * @return the scheduling plan, or empty if no parallelism could be determined
     */
    @Override
    public Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment(JobInformation jobInformation, Collection<? extends SlotInfo> collection, @Nullable JobAllocationsInformation jobAllocationsInformation) {
        return determineParallelism(jobInformation, collection)
                .map(
                        parallelism -> {
                            // Short-circuits in the same order as before: isEmpty() is only
                            // consulted when local recovery is on and the information is non-null.
                            if (this.localRecoveryEnabled
                                    && jobAllocationsInformation != null
                                    && !jobAllocationsInformation.isEmpty()) {
                                return new JobSchedulingPlan(
                                        parallelism,
                                        new StateLocalitySlotAssigner()
                                                .assignSlots(jobInformation, collection, parallelism, jobAllocationsInformation));
                            }
                            return new JobSchedulingPlan(
                                    parallelism,
                                    new DefaultSlotAssigner()
                                            .assignSlots(jobInformation, collection, parallelism, jobAllocationsInformation));
                        });
    }

    /**
     * Determines the vertex parallelism and assigns slots with a {@code
     * RuntimeRescalingSlotAssigner}.
     *
     * @param jobInformation the job's vertices and slot sharing groups
     * @param collection the slots that may be used
     * @param jobAllocationsInformation previous allocations, may be {@code null}
     * @return the scheduling plan; empty if no parallelism could be determined or the assigner
     *     returned {@code null} or no assignments
     */
    @Override
    public Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignmentForRuntimeRescaling(JobInformation jobInformation, Collection<? extends SlotInfo> collection, @Nullable JobAllocationsInformation jobAllocationsInformation) {
        return determineParallelism(jobInformation, collection)
                .flatMap(
                        parallelism -> {
                            final Collection<JobSchedulingPlan.SlotAssignment> assignments =
                                    new RuntimeRescalingSlotAssigner()
                                            .assignSlots(jobInformation, collection, parallelism, jobAllocationsInformation);
                            if (assignments != null && !assignments.isEmpty()) {
                                return Optional.of(new JobSchedulingPlan(parallelism, assignments));
                            }
                            return Optional.<JobSchedulingPlan>empty();
                        });
    }

    /**
     * Distributes the available slots over the slot sharing groups.
     *
     * <p>Every group receives its largest lower bound. The slots not needed for the lower bounds of
     * the groups still to be processed are split evenly (integer division) among those groups,
     * capped by what each group can use beyond its lower bound. Groups are processed in the order
     * given by {@link #sortSlotSharingGroupsByHighestParallelismRange(Map)}.
     *
     * @param jobInformation supplies the number of slot sharing groups used as the divisor
     * @param i the number of available slots
     * @param i2 the sum of the largest lower bounds of all groups
     * @param map the aggregated bounds per slot sharing group
     * @return the number of slots granted to each group
     */
    private static Map<SlotSharingGroupId, Integer> determineSlotsPerSharingGroup(JobInformation jobInformation, int i, int i2, Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> map) {
        int unassignedSlots = i;
        int unassignedGroups = jobInformation.getSlotSharingGroups().size();
        int minSlotsNeededByRemainingGroups = i2;

        final Map<SlotSharingGroupId, Integer> slotsByGroup = new HashMap<>();
        for (SlotSharingGroupId groupId : sortSlotSharingGroupsByHighestParallelismRange(map)) {
            final SlotSharingGroupMetaInfo metaInfo = map.get(groupId);
            final int minParallelism = metaInfo.getMaxLowerBound();

            // Slots this group could use on top of its minimum.
            final int maxOptionalSlots = metaInfo.getMaxUpperBound() - minParallelism;
            // This group's share of the slots that are not needed for any remaining minimum.
            final int optionalSlotShare =
                    (unassignedSlots - minSlotsNeededByRemainingGroups) / unassignedGroups;

            final int groupParallelism = minParallelism + Math.min(maxOptionalSlots, optionalSlotShare);
            slotsByGroup.put(groupId, groupParallelism);

            minSlotsNeededByRemainingGroups -= minParallelism;
            unassignedSlots -= groupParallelism;
            unassignedGroups--;
        }
        return slotsByGroup;
    }

    /**
     * Orders the slot sharing group ids by {@link
     * SlotSharingGroupMetaInfo#getMaxLowerUpperBoundRange()} in ascending order, so the group with
     * the smallest range comes first. The sort is stable.
     *
     * @param map the aggregated bounds per slot sharing group
     * @return a new list of the group ids in that order
     */
    private static List<SlotSharingGroupId> sortSlotSharingGroupsByHighestParallelismRange(Map<SlotSharingGroupId, SlotSharingGroupMetaInfo> map) {
        final List<Map.Entry<SlotSharingGroupId, SlotSharingGroupMetaInfo>> entries =
                new ArrayList<>(map.entrySet());
        entries.sort(Comparator.comparingInt(entry -> entry.getValue().getMaxLowerUpperBoundRange()));

        final List<SlotSharingGroupId> orderedGroupIds = new ArrayList<>(entries.size());
        for (Map.Entry<SlotSharingGroupId, SlotSharingGroupMetaInfo> entry : entries) {
            orderedGroupIds.add(entry.getKey());
        }
        return orderedGroupIds;
    }

    /**
     * Caps the parallelism of every given vertex at the number of slots available to its group.
     *
     * @param collection the vertices of one slot sharing group
     * @param i the number of slots granted to that group
     * @return a new map from vertex id to {@code min(vertex parallelism, i)}
     */
    private static Map<JobVertexID, Integer> determineVertexParallelism(Collection<JobInformation.VertexInformation> collection, int i) {
        final Map<JobVertexID, Integer> parallelismByVertex = new HashMap<>();
        collection.forEach(
                vertex ->
                        parallelismByVertex.put(
                                vertex.getJobVertexID(), Math.min(vertex.getParallelism(), i)));
        return parallelismByVertex;
    }

    /**
     * Reserves one shared slot per slot assignment of the plan and allocates a logical slot from it
     * for every execution vertex of the assigned group.
     *
     * <p>The allocator's shared-slot bookkeeping is cleared first, even when the reservation is
     * then refused because a slot is not available and free.
     *
     * @param jobSchedulingPlan the plan whose slot assignments are reserved
     * @return the reserved slots, or empty if any planned slot is not available and free
     */
    @Override
    public Optional<ReservedSlots> tryReserveResources(JobSchedulingPlan jobSchedulingPlan) {
        this.sharedSlots.clear();

        final Collection<AllocationID> expectedSlots =
                calculateExpectedSlots(jobSchedulingPlan.getSlotAssignments());
        if (!areAllExpectedSlotsAvailableAndFree(expectedSlots)) {
            return Optional.empty();
        }

        // Kept raw: the value type returned by allocateLogicalSlot() is not imported in this file.
        final HashMap assignedSlots = new HashMap();
        for (JobSchedulingPlan.SlotAssignment assignment : jobSchedulingPlan.getSlotAssignments()) {
            final SlotInfo slotInfo = assignment.getSlotInfo();
            final SharedSlot sharedSlot = reserveSharedSlot(slotInfo);
            this.sharedSlots.put(slotInfo.getAllocationId(), sharedSlot);

            final ExecutionSlotSharingGroup group =
                    (ExecutionSlotSharingGroup) assignment.getTargetAs(ExecutionSlotSharingGroup.class);
            for (ExecutionVertexID vertexId : group.getContainedExecutionVertices()) {
                assignedSlots.put(vertexId, sharedSlot.allocateLogicalSlot());
            }
        }
        // NOTE(review): the live bookkeeping map is handed over, not a copy — confirm intended.
        return Optional.of(ReservedSlots.create(assignedSlots, this.sharedSlots));
    }

    /**
     * Tells whether the plan moves any execution vertex away from its previously recorded
     * allocation.
     *
     * <p>A vertex counts as relocated when the previous allocations contain an entry for it and
     * that entry differs from the allocation id of the slot the plan assigns it to. Vertices
     * without a previous entry are ignored.
     *
     * @param jobSchedulingPlan the plan to inspect
     * @param jobAllocationsInformation the previous allocations per execution vertex
     * @return {@code true} if at least one vertex is relocated
     */
    @Override
    public boolean hasRelocations(JobSchedulingPlan jobSchedulingPlan, JobAllocationsInformation jobAllocationsInformation) {
        final Map<ExecutionVertexID, AllocationID> previousAllocations =
                jobAllocationsInformation.getExecutionVertexAllocations();

        for (JobSchedulingPlan.SlotAssignment assignment : jobSchedulingPlan.getSlotAssignments()) {
            final AllocationID plannedAllocation = assignment.getSlotInfo().getAllocationId();
            final ExecutionSlotSharingGroup group =
                    (ExecutionSlotSharingGroup) assignment.getTargetAs(ExecutionSlotSharingGroup.class);

            final boolean anyVertexMoved =
                    group.getContainedExecutionVertices().stream()
                            .anyMatch(
                                    vertexId ->
                                            previousAllocations.containsKey(vertexId)
                                                    && !plannedAllocation.equals(previousAllocations.get(vertexId)));
            if (anyVertexMoved) {
                return true;
            }
        }
        return false;
    }

    /**
     * Allocates logical slots for the given executions according to the plan, reusing shared slots
     * this allocator already tracks and reserving new ones where needed.
     *
     * <p>Only execution vertices belonging to one of the given executions are considered. A new
     * shared slot is reserved when none is tracked for the allocation id or the tracked one is no
     * longer allocated. Newly reserved shared slots are added to the allocator's bookkeeping only
     * after all assignments were processed successfully. On any exception everything collected so
     * far is released and the exception is rethrown.
     *
     * @param jobSchedulingPlan the plan describing which group goes into which slot
     * @param list the executions that need a slot
     * @return the logical slots per execution vertex together with the newly reserved shared slots
     * @throws RuntimeException if a slot that must be newly reserved is not available and free
     */
    @Override
    public ReservedSlots reserveAdditionalResources(JobSchedulingPlan jobSchedulingPlan, List<Execution> list) {
        // Kept raw: the value type returned by allocateLogicalSlot() is not imported in this file.
        final HashMap assignedSlots = new HashMap();
        final Set<ExecutionVertexID> requestedVertices =
                list.stream()
                        .map(execution -> execution.getVertex().getID())
                        .collect(Collectors.toSet());
        final Map<AllocationID, SharedSlot> newlyReservedSlots = new HashMap<>();

        try {
            for (JobSchedulingPlan.SlotAssignment assignment : jobSchedulingPlan.getSlotAssignments()) {
                final SlotInfo slotInfo = assignment.getSlotInfo();
                final AllocationID allocationId = slotInfo.getAllocationId();
                final ExecutionSlotSharingGroup group =
                        (ExecutionSlotSharingGroup) assignment.getTargetAs(ExecutionSlotSharingGroup.class);

                SharedSlot sharedSlot = this.sharedSlots.get(allocationId);
                for (ExecutionVertexID vertexId : group.getContainedExecutionVertices()) {
                    if (!requestedVertices.contains(vertexId)) {
                        continue;
                    }
                    final boolean needsNewSharedSlot = sharedSlot == null || !sharedSlot.isAllocated();
                    if (needsNewSharedSlot) {
                        if (!this.isSlotAvailableAndFreeFunction.isSlotAvailableAndFree(allocationId)) {
                            throw new RuntimeException("The slot should be available and free");
                        }
                        sharedSlot = reserveSharedSlot(slotInfo);
                        newlyReservedSlots.put(allocationId, sharedSlot);
                    }
                    assignedSlots.put(vertexId, sharedSlot.allocateLogicalSlot());
                }
            }
            this.sharedSlots.putAll(newlyReservedSlots);
            return ReservedSlots.create(assignedSlots, newlyReservedSlots);
        } catch (Exception e) {
            // Roll back what was handed out in this call before propagating the failure.
            ReservedSlots.create(assignedSlots, newlyReservedSlots).releaseAll(e);
            throw e;
        }
    }

    /**
     * Collects the allocation id of the slot of every given assignment, in iteration order.
     *
     * @param iterable the slot assignments
     * @return a new list of allocation ids (duplicates are kept)
     */
    @Nonnull
    private Collection<AllocationID> calculateExpectedSlots(Iterable<JobSchedulingPlan.SlotAssignment> iterable) {
        final List<AllocationID> allocationIds = new ArrayList<>();
        for (JobSchedulingPlan.SlotAssignment assignment : iterable) {
            final AllocationID allocationId = assignment.getSlotInfo().getAllocationId();
            allocationIds.add(allocationId);
        }
        return allocationIds;
    }

    /**
     * Checks every given allocation id with the configured availability function, stopping at the
     * first id that is not available and free.
     *
     * @param iterable the allocation ids to check
     * @return {@code true} if all ids are available and free (also for an empty input)
     */
    private boolean areAllExpectedSlotsAvailableAndFree(Iterable<? extends AllocationID> iterable) {
        for (AllocationID allocationId : iterable) {
            final boolean usable = this.isSlotAvailableAndFreeFunction.isSlotAvailableAndFree(allocationId);
            if (!usable) {
                return false;
            }
        }
        return true;
    }

    /**
     * Reserves the slot described by {@code slotInfo} and wraps it in a new {@link SharedSlot}.
     *
     * <p>The slot is reserved with {@link ResourceProfile#UNKNOWN}. The release callback passed to
     * the shared slot frees the slot through the configured free function, with a {@code null}
     * cause and the current system time.
     *
     * @param slotInfo the slot to reserve
     * @return the shared slot wrapping the reserved slot
     */
    private SharedSlot reserveSharedSlot(SlotInfo slotInfo) {
        final SlotRequestId slotRequestId = new SlotRequestId();
        return new SharedSlot(
                slotRequestId,
                this.reserveSlotFunction.reserveSlot(slotInfo.getAllocationId(), ResourceProfile.UNKNOWN),
                slotInfo.willBeOccupiedIndefinitely(),
                () -> this.freeSlotFunction.freeSlot(slotInfo.getAllocationId(), null, System.currentTimeMillis()));
    }
}
