package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/allocator/RuntimeRescalingSlotAssigner.class */
public class RuntimeRescalingSlotAssigner implements SlotAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(RuntimeRescalingSlotAssigner.class);

    @Override // org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAssigner
    public Collection<JobSchedulingPlan.SlotAssignment> assignSlots(JobInformation jobInformation, Collection<? extends SlotInfo> collection, VertexParallelism vertexParallelism, JobAllocationsInformation jobAllocationsInformation) {
        ArrayList arrayList = new ArrayList();
        Iterator<SlotSharingGroup> it = jobInformation.getSlotSharingGroups().iterator();
        while (it.hasNext()) {
            arrayList.addAll(DefaultSlotAssigner.createExecutionSlotSharingGroups(vertexParallelism, it.next()));
        }
        Map<ExecutionVertexID, AllocationID> executionVertexAllocations = jobAllocationsInformation.getExecutionVertexAllocations();
        HashSet hashSet = new HashSet(executionVertexAllocations.values());
        Map map = (Map) collection.stream().filter(slotInfo -> {
            return hashSet.contains(slotInfo.getAllocationId());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getAllocationId();
        }, Function.identity()));
        List list = (List) collection.stream().filter(slotInfo2 -> {
            return !hashSet.contains(slotInfo2.getAllocationId());
        }).sorted(Comparator.comparing(slotInfo3 -> {
            return slotInfo3.getTaskManagerLocation().getResourceID().toString();
        })).collect(Collectors.toList());
        LOG.info("{} oldSlots:    {}", "Assigning slots for Runtime Rescaling,", map);
        LOG.info("{} allocations: {}", "Assigning slots for Runtime Rescaling,", executionVertexAllocations);
        LOG.info("{} newSlots:    {}", "Assigning slots for Runtime Rescaling,", list);
        LOG.info("{} parallelism: {}", "Assigning slots for Runtime Rescaling,", vertexParallelism);
        LOG.info("{} slot groups: {}", "Assigning slots for Runtime Rescaling,", describeGroups(arrayList));
        ArrayList arrayList2 = new ArrayList();
        for (SlotSharingSlotAllocator.ExecutionSlotSharingGroup executionSlotSharingGroup : arrayList) {
            Stream<ExecutionVertexID> stream = executionSlotSharingGroup.getContainedExecutionVertices().stream();
            Objects.requireNonNull(executionVertexAllocations);
            Stream<ExecutionVertexID> filter = stream.filter((v1) -> {
                return r1.containsKey(v1);
            });
            Objects.requireNonNull(executionVertexAllocations);
            Set set = (Set) filter.map((v1) -> {
                return r1.get(v1);
            }).collect(Collectors.toSet());
            SlotInfo slotInfo4 = null;
            if (!set.isEmpty()) {
                slotInfo4 = (SlotInfo) map.get(set.iterator().next());
                if (slotInfo4 != null) {
                    LOG.info("{} old slot {} assigned for {}", new Object[]{"Assigning slots for Runtime Rescaling,", slotInfo4, describeGroup(executionSlotSharingGroup)});
                }
            } else if (!list.isEmpty()) {
                slotInfo4 = (SlotInfo) list.remove(0);
                LOG.info("{} new slot {} assigned for {}", new Object[]{"Assigning slots for Runtime Rescaling,", slotInfo4, describeGroup(executionSlotSharingGroup)});
            }
            if (slotInfo4 == null) {
                LOG.warn("{} Cannot find a slot for the group {}", "Assigning slots for Runtime Rescaling,", describeGroup(executionSlotSharingGroup));
                return null;
            }
            arrayList2.add(new JobSchedulingPlan.SlotAssignment(slotInfo4, executionSlotSharingGroup));
        }
        return arrayList2;
    }

    private String describeGroup(SlotSharingSlotAllocator.ExecutionSlotSharingGroup executionSlotSharingGroup) {
        return executionSlotSharingGroup == null ? "null" : executionSlotSharingGroup.getId() + ": " + executionSlotSharingGroup.getContainedExecutionVertices();
    }

    private String describeGroups(List<SlotSharingSlotAllocator.ExecutionSlotSharingGroup> list) {
        return ((List) list.stream().map(this::describeGroup).collect(Collectors.toList())).toString();
    }
}
