package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.class */
public class StateLocalitySlotAssigner implements SlotAssigner {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner$AllocationScore.class */
    public static class AllocationScore implements Comparable<AllocationScore> {
        private final String groupId;
        private final AllocationID allocationId;
        private final long score;

        public AllocationScore(String str, AllocationID allocationID, long j) {
            this.groupId = str;
            this.allocationId = allocationID;
            this.score = j;
        }

        public String getGroupId() {
            return this.groupId;
        }

        public AllocationID getAllocationId() {
            return this.allocationId;
        }

        public long getScore() {
            return this.score;
        }

        @Override // java.lang.Comparable
        public int compareTo(AllocationScore allocationScore) {
            int compare = Long.compare(this.score, allocationScore.score);
            if (compare != 0) {
                return compare;
            }
            int compareTo = allocationScore.allocationId.compareTo(this.allocationId);
            return compareTo != 0 ? compareTo : allocationScore.groupId.compareTo(this.groupId);
        }
    }

    @Override // org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAssigner
    public Collection<JobSchedulingPlan.SlotAssignment> assignSlots(JobInformation jobInformation, Collection<? extends SlotInfo> collection, VertexParallelism vertexParallelism, JobAllocationsInformation jobAllocationsInformation) {
        Preconditions.checkState(collection.size() >= jobInformation.getSlotSharingGroups().size(), "Not enough slots to allocate all the slot sharing groups (have: %s, need: %s)", new Object[]{Integer.valueOf(collection.size()), Integer.valueOf(jobInformation.getSlotSharingGroups().size())});
        ArrayList arrayList = new ArrayList();
        Iterator<SlotSharingGroup> it = jobInformation.getSlotSharingGroups().iterator();
        while (it.hasNext()) {
            arrayList.addAll(DefaultSlotAssigner.createExecutionSlotSharingGroups(vertexParallelism, it.next()));
        }
        PriorityQueue<AllocationScore> calculateScores = calculateScores(jobInformation, jobAllocationsInformation, arrayList, getParallelism(arrayList));
        Map map = (Map) arrayList.stream().collect(Collectors.toMap((v0) -> {
            return v0.getId();
        }, Function.identity()));
        Map map2 = (Map) collection.stream().collect(Collectors.toMap((v0) -> {
            return v0.getAllocationId();
        }, Function.identity()));
        ArrayList arrayList2 = new ArrayList();
        while (true) {
            AllocationScore poll = calculateScores.poll();
            if (poll == null) {
                break;
            }
            if (map2.containsKey(poll.getAllocationId()) && map.containsKey(poll.getGroupId())) {
                arrayList2.add(new JobSchedulingPlan.SlotAssignment((SlotInfo) map2.remove(poll.getAllocationId()), map.remove(poll.getGroupId())));
            }
        }
        Iterator it2 = ((List) map2.values().stream().sorted(Comparator.comparing(slotInfo -> {
            return slotInfo.getTaskManagerLocation().getResourceID().toString();
        })).collect(Collectors.toList())).iterator();
        for (SlotSharingSlotAllocator.ExecutionSlotSharingGroup executionSlotSharingGroup : map.values()) {
            Preconditions.checkState(it2.hasNext(), "No slots available for group %s (%s more in total). This is likely a bug.", new Object[]{executionSlotSharingGroup, Integer.valueOf(map.size())});
            arrayList2.add(new JobSchedulingPlan.SlotAssignment((SlotInfo) it2.next(), executionSlotSharingGroup));
            it2.remove();
        }
        return arrayList2;
    }

    @Nonnull
    private PriorityQueue<AllocationScore> calculateScores(JobInformation jobInformation, JobAllocationsInformation jobAllocationsInformation, List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup> list, Map<JobVertexID, Integer> map) {
        PriorityQueue<AllocationScore> priorityQueue = new PriorityQueue<>((Comparator<? super AllocationScore>) Comparator.reverseOrder());
        Iterator<SlotSharingSlotAllocator.ExecutionSlotSharingGroup> it = list.iterator();
        while (it.hasNext()) {
            priorityQueue.addAll(calculateScore(it.next(), map, jobInformation, jobAllocationsInformation));
        }
        return priorityQueue;
    }

    private static Map<JobVertexID, Integer> getParallelism(List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup> list) {
        HashMap hashMap = new HashMap();
        Iterator<SlotSharingSlotAllocator.ExecutionSlotSharingGroup> it = list.iterator();
        while (it.hasNext()) {
            Iterator<ExecutionVertexID> it2 = it.next().getContainedExecutionVertices().iterator();
            while (it2.hasNext()) {
                hashMap.merge(it2.next().getJobVertexId(), 1, (v0, v1) -> {
                    return Integer.sum(v0, v1);
                });
            }
        }
        return hashMap;
    }

    public Collection<AllocationScore> calculateScore(SlotSharingSlotAllocator.ExecutionSlotSharingGroup executionSlotSharingGroup, Map<JobVertexID, Integer> map, JobInformation jobInformation, JobAllocationsInformation jobAllocationsInformation) {
        HashMap hashMap = new HashMap();
        for (ExecutionVertexID executionVertexID : executionSlotSharingGroup.getContainedExecutionVertices()) {
            KeyGroupRange computeKeyGroupRangeForOperatorIndex = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(jobInformation.getVertexInformation(executionVertexID.getJobVertexId()).getMaxParallelism(), map.get(executionVertexID.getJobVertexId()).intValue(), executionVertexID.getSubtaskIndex());
            jobAllocationsInformation.getAllocations(executionVertexID.getJobVertexId()).forEach(vertexAllocationInformation -> {
                long estimateSize = estimateSize(computeKeyGroupRangeForOperatorIndex, vertexAllocationInformation);
                if (estimateSize > 0) {
                    hashMap.merge(vertexAllocationInformation.getAllocationID(), Long.valueOf(estimateSize), (v0, v1) -> {
                        return Long.sum(v0, v1);
                    });
                }
            });
        }
        return (Collection) hashMap.entrySet().stream().map(entry -> {
            return new AllocationScore(executionSlotSharingGroup.getId(), (AllocationID) entry.getKey(), ((Long) entry.getValue()).longValue());
        }).collect(Collectors.toList());
    }

    private static long estimateSize(KeyGroupRange keyGroupRange, JobAllocationsInformation.VertexAllocationInformation vertexAllocationInformation) {
        KeyGroupRange keyGroupRange2 = vertexAllocationInformation.getKeyGroupRange();
        return keyGroupRange2.getIntersection(keyGroupRange).getNumberOfKeyGroups() * Math.max(vertexAllocationInformation.averageKeyGroupSizeInBytes, 1L);
    }
}
