package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.class */
/**
 * Bookkeeping for task executors, pending task manager slots and blacklisted task executors.
 *
 * <p>The manager keeps three maps: registered task executors, slots that were requested through
 * {@link ResourceActions#allocateResource} but not yet reported, and registrations that were moved
 * aside by {@link #blacklistTaskManager(InstanceID)}. A periodic check (scheduled in the
 * constructor) allocates additional workers when too few free slots are available for the
 * configured redundancy, and otherwise releases task executors whose idle time reached the timeout.
 *
 * <p>Only accesses to the blacklist map are guarded by a monitor (the map itself); the other maps
 * are plain {@link HashMap}s without synchronization in this class.
 */
public class TaskExecutorManager implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorManager.class);

    /** Slot profile generated from the default worker spec and the slots-per-worker count. */
    private final ResourceProfile defaultSlotResourceProfile;

    /** Worker spec passed to {@link ResourceActions#allocateResource} for every new worker. */
    private final WorkerResourceSpec defaultWorkerResourceSpec;

    /** Number of pending slots recorded for every allocated worker. */
    private final int numSlotsPerWorker;

    /** Upper bound for registered + blacklisted + pending slots. */
    private final int maxSlotNum;

    /** If set, an idle task executor is only released after it confirmed it can be released. */
    private final boolean waitResultConsumedBeforeRelease;

    /** Number of workers whose worth of free slots the periodic check tries to keep available. */
    private final int redundantTaskManagerNum;

    /** Idle duration after which a task executor becomes a release candidate; also the check period. */
    private final Time taskManagerTimeout;

    private final ResourceActions resourceActions;

    /** Executor that runs the periodic check and the release callbacks. */
    private final Executor mainThreadExecutor;

    /** Handle of the periodic check; cancelled in {@link #close()}. */
    private final ScheduledFuture<?> taskManagerTimeoutsAndRedundancyCheck;

    /** A permit must be obtainable for the periodic check to run; it is held for the check's duration. */
    private final Semaphore redeployingSemaphore;

    private final Map<InstanceID, TaskManagerRegistration> taskManagerRegistrations = new HashMap<>();
    private final Map<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots = new HashMap<>();

    /** Registrations moved aside by blacklisting; every access synchronizes on this map. */
    private final Map<InstanceID, TaskManagerRegistration> blacklistedTaskManagerRegistrations = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the manager and schedules the periodic timeout/redundancy check.
     *
     * <p>The check is scheduled with no initial delay and a fixed delay of {@code
     * taskManagerTimeout}; every tick hands the actual work to {@code mainThreadExecutor}.
     *
     * @param defaultWorkerResourceSpec spec used when allocating workers
     * @param numSlotsPerWorker number of slots recorded as pending per allocated worker
     * @param maxSlotNum maximum number of registered, blacklisted and pending slots
     * @param waitResultConsumedBeforeRelease whether to ask a task executor if it can be released
     *     before releasing it
     * @param redundantTaskManagerNum number of redundant workers to keep free slots for
     * @param taskManagerTimeout idle timeout and period of the check
     * @param scheduledExecutor executor that triggers the periodic check
     * @param mainThreadExecutor executor that runs the check and release callbacks
     * @param resourceActions callbacks used to allocate and release resources; must not be null
     * @param redeployingSemaphore semaphore gating the periodic check; must not be null
     */
    public TaskExecutorManager(
            WorkerResourceSpec defaultWorkerResourceSpec,
            int numSlotsPerWorker,
            int maxSlotNum,
            boolean waitResultConsumedBeforeRelease,
            int redundantTaskManagerNum,
            Time taskManagerTimeout,
            ScheduledExecutor scheduledExecutor,
            Executor mainThreadExecutor,
            ResourceActions resourceActions,
            Semaphore redeployingSemaphore) {
        this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
        this.numSlotsPerWorker = numSlotsPerWorker;
        this.maxSlotNum = maxSlotNum;
        this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
        this.redundantTaskManagerNum = redundantTaskManagerNum;
        this.taskManagerTimeout = taskManagerTimeout;
        this.defaultSlotResourceProfile =
                SlotManagerUtils.generateDefaultSlotResourceProfile(defaultWorkerResourceSpec, numSlotsPerWorker);
        this.resourceActions = Preconditions.checkNotNull(resourceActions);
        this.redeployingSemaphore = Preconditions.checkNotNull(redeployingSemaphore);
        this.mainThreadExecutor = mainThreadExecutor;

        // Schedule last: the first run has no initial delay, so every field the check reads
        // (in particular the semaphore) must already be assigned and validated at this point.
        this.taskManagerTimeoutsAndRedundancyCheck =
                scheduledExecutor.scheduleWithFixedDelay(
                        () -> mainThreadExecutor.execute(this::checkTaskManagerTimeoutsAndRedundancy),
                        0L,
                        taskManagerTimeout.toMilliseconds(),
                        TimeUnit.MILLISECONDS);
    }

    /** Cancels the periodic check without interrupting a run that is already in progress. */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.taskManagerTimeoutsAndRedundancyCheck.cancel(false);
    }

    /**
     * @param instanceID instance to look up
     * @return whether the instance is currently in the registered (non-blacklisted) map
     */
    public boolean isTaskManagerRegistered(InstanceID instanceID) {
        return this.taskManagerRegistrations.containsKey(instanceID);
    }

    /**
     * Registers a task executor unless doing so would exceed the slot limit.
     *
     * <p>On rejection the task executor is released through {@link ResourceActions}. On success,
     * every reported slot without a job id consumes one pending slot with an equal resource
     * profile, if such a pending slot exists.
     *
     * @param taskExecutorConnection connection of the task executor being registered
     * @param slotReport slots reported by the task executor
     * @param resourceProfile third argument of the created {@code TaskManagerRegistration}
     *     (presumably the total resources of the task executor; confirm against that class)
     * @param resourceProfile2 fourth argument of the created {@code TaskManagerRegistration}
     *     (presumably its default slot profile; confirm against that class)
     * @return {@code true} if registered, {@code false} if rejected because of the slot limit
     */
    public boolean registerTaskManager(TaskExecutorConnection taskExecutorConnection, SlotReport slotReport, ResourceProfile resourceProfile, ResourceProfile resourceProfile2) {
        if (isMaxSlotNumExceededAfterRegistration(slotReport)) {
            LOG.info("The total number of slots exceeds the max limitation {}, releasing the excess task executor.", this.maxSlotNum);
            this.resourceActions.releaseResource(
                    taskExecutorConnection.getInstanceID(),
                    new FlinkException("The total number of slots exceeds the max limitation."));
            return false;
        }

        final Collection<SlotID> reportedSlotIds =
                StreamSupport.stream(slotReport.spliterator(), false)
                        .map(SlotStatus::getSlotID)
                        .collect(Collectors.toList());
        this.taskManagerRegistrations.put(
                taskExecutorConnection.getInstanceID(),
                new TaskManagerRegistration(taskExecutorConnection, reportedSlotIds, resourceProfile, resourceProfile2));

        for (SlotStatus slotStatus : slotReport) {
            if (slotStatus.getJobID() == null) {
                findAndRemoveExactlyMatchingPendingTaskManagerSlot(slotStatus.getResourceProfile());
            }
        }
        return true;
    }

    /**
     * Checks the slot limit for a registration: first against the raw number of reported slots,
     * and only if that exceeds the limit, against the number left after matching pending slots.
     */
    private boolean isMaxSlotNumExceededAfterRegistration(SlotReport slotReport) {
        return isMaxSlotNumExceededAfterAdding(slotReport.getNumSlotStatus())
                && isMaxSlotNumExceededAfterAdding(getNumNonPendingReportedNewSlots(slotReport));
    }

    /**
     * Counts the reported slots that do not fulfil a pending slot.
     *
     * <p>Only slots without an allocation id are matched, each against at most one pending slot
     * with an equal resource profile, and each pending slot is matched at most once.
     *
     * @return number of reported slots minus the number of matched pending slots
     */
    private int getNumNonPendingReportedNewSlots(SlotReport slotReport) {
        final HashSet<TaskManagerSlotId> matchedPendingSlotIds = new HashSet<>();
        for (SlotStatus slotStatus : slotReport) {
            if (slotStatus.getAllocationID() != null) {
                // Allocated slots never consume a pending slot.
                continue;
            }
            // Bounded scan: stops at the first unmatched pending slot with an equal profile, or
            // when all pending slots have been inspected without finding one.
            for (PendingTaskManagerSlot pendingSlot : this.pendingSlots.values()) {
                final TaskManagerSlotId pendingSlotId = pendingSlot.getTaskManagerSlotId();
                if (!matchedPendingSlotIds.contains(pendingSlotId)
                        && isPendingSlotExactlyMatchingResourceProfile(pendingSlot, slotStatus.getResourceProfile())) {
                    matchedPendingSlotIds.add(pendingSlotId);
                    break;
                }
            }
        }
        return slotReport.getNumSlotStatus() - matchedPendingSlotIds.size();
    }

    /** Removes the first pending slot whose resource profile equals the given one, if any. */
    private void findAndRemoveExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
        for (PendingTaskManagerSlot pendingTaskManagerSlot : this.pendingSlots.values()) {
            if (isPendingSlotExactlyMatchingResourceProfile(pendingTaskManagerSlot, resourceProfile)) {
                // Returning right after the removal keeps the invalidated iterator unused.
                this.pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
                return;
            }
        }
    }

    /** @return whether the pending slot's profile {@code equals} the given profile */
    private boolean isPendingSlotExactlyMatchingResourceProfile(PendingTaskManagerSlot pendingTaskManagerSlot, ResourceProfile resourceProfile) {
        return pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile);
    }

    /** Removes the instance from the registered map; the blacklist map is not touched. */
    public void unregisterTaskExecutor(InstanceID instanceID) {
        this.taskManagerRegistrations.remove(instanceID);
    }

    /** @return a new list holding the ids of all registered (non-blacklisted) task executors */
    public Collection<InstanceID> getTaskExecutors() {
        return new ArrayList<>(this.taskManagerRegistrations.keySet());
    }

    /**
     * Tries to allocate one worker with the default worker spec.
     *
     * @param resourceProfile profile that the default slot profile has to match
     * @return the default slot profile with {@code numSlotsPerWorker} slots if a worker was
     *     requested; empty if the slot limit would be exceeded, the profile does not match, or
     *     {@link ResourceActions#allocateResource} returned {@code false}
     */
    public Optional<ResourceRequirement> allocateWorker(ResourceProfile resourceProfile) {
        if (isMaxSlotNumExceededAfterAdding(this.numSlotsPerWorker)) {
            final int registeredAndPendingSlots = getNumberPendingTaskManagerSlots() + getNumberRegisteredSlots();
            LOG.warn(
                    "Could not allocate {} more slots. The number of registered and pending slots is {}, while the maximum is {}.",
                    this.numSlotsPerWorker,
                    registeredAndPendingSlots,
                    this.maxSlotNum);
            return Optional.empty();
        }
        if (!this.defaultSlotResourceProfile.isMatching(resourceProfile)) {
            return Optional.empty();
        }
        if (!this.resourceActions.allocateResource(this.defaultWorkerResourceSpec)) {
            return Optional.empty();
        }

        // Record one pending slot per slot the new worker is expected to report.
        for (int slot = 0; slot < this.numSlotsPerWorker; slot++) {
            final PendingTaskManagerSlot pendingTaskManagerSlot = new PendingTaskManagerSlot(this.defaultSlotResourceProfile);
            this.pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot);
        }
        return Optional.of(ResourceRequirement.create(this.defaultSlotResourceProfile, this.numSlotsPerWorker));
    }

    /** @return whether registered (incl. blacklisted) + pending + {@code i} slots exceed the limit */
    private boolean isMaxSlotNumExceededAfterAdding(int i) {
        return getNumberRegisteredSlots() + getNumberPendingTaskManagerSlots() + i > this.maxSlotNum;
    }

    /**
     * @return a single-entry map from the default worker spec to the number of workers needed to
     *     cover all pending slots (rounded up), or an empty map if there are no pending slots
     */
    public Map<WorkerResourceSpec, Integer> getRequiredWorkers() {
        final int pendingWorkers = MathUtils.divideRoundUp(getNumberPendingTaskManagerSlots(), this.numSlotsPerWorker);
        if (pendingWorkers > 0) {
            return Collections.singletonMap(this.defaultWorkerResourceSpec, pendingWorkers);
        }
        return Collections.emptyMap();
    }

    public WorkerResourceSpec getWorkerResourceSpec() {
        return this.defaultWorkerResourceSpec;
    }

    public int getNumSlotsPerWorker() {
        return this.numSlotsPerWorker;
    }

    @VisibleForTesting
    int getNumberPendingTaskManagerSlots() {
        return this.pendingSlots.size();
    }

    /**
     * Moves the registration of the given instance from the registered map to the blacklist map.
     * Logs and does nothing else if the instance is not registered.
     *
     * @param instanceID instance to blacklist; must not be null
     */
    public void blacklistTaskManager(InstanceID instanceID) {
        synchronized (this.blacklistedTaskManagerRegistrations) {
            Preconditions.checkNotNull(instanceID);
            final TaskManagerRegistration registration = this.taskManagerRegistrations.remove(instanceID);
            if (registration == null) {
                LOG.info("Task Manager with id {} doesn't exist", instanceID);
                return;
            }
            this.blacklistedTaskManagerRegistrations.put(instanceID, registration);
        }
    }

    /**
     * Moves all blacklisted registrations back into the registered map.
     *
     * @return number of registrations that were blacklisted before the call
     */
    public int releaseBlacklistedTaskManagers() {
        synchronized (this.blacklistedTaskManagerRegistrations) {
            final int numBlacklisted = this.blacklistedTaskManagerRegistrations.size();
            this.taskManagerRegistrations.putAll(this.blacklistedTaskManagerRegistrations);
            this.blacklistedTaskManagerRegistrations.clear();
            return numBlacklisted;
        }
    }

    /** Runs the timeout/redundancy check immediately on the calling thread. */
    public void callRedundancyCheck() {
        checkTaskManagerTimeoutsAndRedundancy();
    }

    /**
     * One round of the periodic check. Skipped entirely if no semaphore permit is available.
     *
     * <p>If task executors are registered: when the free slots do not cover the configured
     * redundancy, additional workers are allocated; otherwise timed-out task executors are
     * released, limited to the number of whole workers' worth of surplus free slots. Afterwards
     * every blacklisted task executor is released and the blacklist is cleared.
     */
    private void checkTaskManagerTimeoutsAndRedundancy() {
        if (!this.redeployingSemaphore.tryAcquire()) {
            return;
        }
        try {
            if (!this.taskManagerRegistrations.isEmpty()) {
                final long currentTimeMillis = System.currentTimeMillis();
                final long timeoutMillis = this.taskManagerTimeout.toMilliseconds();

                // Snapshot the candidates first; releasing may modify the registration map.
                final ArrayList<TaskManagerRegistration> timedOutTaskManagers =
                        new ArrayList<>(this.taskManagerRegistrations.size());
                for (TaskManagerRegistration registration : this.taskManagerRegistrations.values()) {
                    if (currentTimeMillis - registration.getIdleSince() >= timeoutMillis) {
                        timedOutTaskManagers.add(registration);
                    }
                }

                // Positive: free slots missing for the redundancy. Non-positive: surplus free slots.
                final int slotsDiff = this.redundantTaskManagerNum * this.numSlotsPerWorker - getNumberFreeSlots();
                if (slotsDiff > 0) {
                    allocateRedundantTaskManagers(MathUtils.divideRoundUp(slotsDiff, this.numSlotsPerWorker));
                } else {
                    final int maxReleaseNum = (-slotsDiff) / this.numSlotsPerWorker;
                    releaseIdleTaskExecutors(timedOutTaskManagers, Math.min(maxReleaseNum, timedOutTaskManagers.size()));
                }
            }
            synchronized (this.blacklistedTaskManagerRegistrations) {
                for (InstanceID blacklistedInstance : this.blacklistedTaskManagerRegistrations.keySet()) {
                    releaseIdleTaskExecutor(blacklistedInstance);
                }
                this.blacklistedTaskManagerRegistrations.clear();
            }
        } finally {
            // Always hand the permit back; otherwise one failing round would disable all later rounds.
            this.redeployingSemaphore.release();
        }
    }

    /** Allocates up to {@code i} workers and warns if fewer could be allocated. */
    private void allocateRedundantTaskManagers(int i) {
        LOG.debug("Allocating {} task executors for redundancy.", i);
        final int allocatedNum = allocateWorkers(i);
        if (i != allocatedNum) {
            LOG.warn("Expect to allocate {} taskManagers. Actually allocate {} taskManagers.", i, allocatedNum);
        }
    }

    /**
     * Allocates workers one by one, stopping at the first failed allocation.
     *
     * @param i number of workers wanted
     * @return number of workers actually allocated
     */
    private int allocateWorkers(int i) {
        int allocatedNum = 0;
        while (allocatedNum < i && allocateWorker(this.defaultSlotResourceProfile).isPresent()) {
            allocatedNum++;
        }
        return allocatedNum;
    }

    /**
     * Releases the first {@code i} entries of the list, either directly or after asking the task
     * executor whether it can be released, depending on {@code waitResultConsumedBeforeRelease}.
     */
    private void releaseIdleTaskExecutors(ArrayList<TaskManagerRegistration> arrayList, int i) {
        for (int index = 0; index < i; index++) {
            final TaskManagerRegistration registration = arrayList.get(index);
            if (this.waitResultConsumedBeforeRelease) {
                releaseIdleTaskExecutorIfPossible(registration);
            } else {
                releaseIdleTaskExecutor(registration.getInstanceId());
            }
        }
    }

    /**
     * Asks the task executor whether it can be released and, on the main thread executor, releases
     * it if the answer is positive and its idle timestamp has not changed since the question.
     */
    private void releaseIdleTaskExecutorIfPossible(TaskManagerRegistration taskManagerRegistration) {
        final long idleSinceAtRequest = taskManagerRegistration.getIdleSince();
        taskManagerRegistration
                .getTaskManagerConnection()
                .getTaskExecutorGateway()
                .canBeReleased()
                .thenAcceptAsync(
                        canBeReleased -> {
                            final InstanceID instanceId = taskManagerRegistration.getInstanceId();
                            final boolean idleSinceUnchanged = idleSinceAtRequest == taskManagerRegistration.getIdleSince();
                            if (idleSinceUnchanged && canBeReleased.booleanValue()) {
                                releaseIdleTaskExecutor(instanceId);
                            }
                        },
                        this.mainThreadExecutor);
    }

    /** Releases the instance through {@link ResourceActions} with an idle-timeout cause. */
    private void releaseIdleTaskExecutor(InstanceID instanceID) {
        final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout.");
        LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", instanceID);
        this.resourceActions.releaseResource(instanceID, cause);
    }

    /** @return merged total resources of all registered (non-blacklisted) task executors */
    public ResourceProfile getTotalRegisteredResources() {
        return this.taskManagerRegistrations.values().stream()
                .map(TaskManagerRegistration::getTotalResource)
                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
    }

    /** @return total resources of the instance, or {@link ResourceProfile#ZERO} if not registered */
    public ResourceProfile getTotalRegisteredResourcesOf(InstanceID instanceID) {
        return Optional.ofNullable(this.taskManagerRegistrations.get(instanceID))
                .map(TaskManagerRegistration::getTotalResource)
                .orElse(ResourceProfile.ZERO);
    }

    /**
     * @return merged free resources of all registered (non-blacklisted) task executors, where a
     *     task executor's free resources are its default slot profile times its free slot count
     */
    public ResourceProfile getTotalFreeResources() {
        return this.taskManagerRegistrations.values().stream()
                .map(TaskExecutorManager::freeResourcesOf)
                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
    }

    /** @return free resources of the instance, or {@link ResourceProfile#ZERO} if not registered */
    public ResourceProfile getTotalFreeResourcesOf(InstanceID instanceID) {
        return Optional.ofNullable(this.taskManagerRegistrations.get(instanceID))
                .map(TaskExecutorManager::freeResourcesOf)
                .orElse(ResourceProfile.ZERO);
    }

    /** Default slot profile of the registration multiplied by its number of free slots. */
    private static ResourceProfile freeResourcesOf(TaskManagerRegistration registration) {
        return registration.getDefaultSlotResourceProfile().multiply(registration.getNumberFreeSlots());
    }

    /**
     * @return the slots of the given registered instance
     * @throws NullPointerException if the instance is not in the registered map
     */
    public Iterable<SlotID> getSlotsOf(InstanceID instanceID) {
        final TaskManagerRegistration registration = this.taskManagerRegistrations.get(instanceID);
        return registration.getSlots();
    }

    /** @return number of slots of all registered task executors plus all blacklisted ones */
    public int getNumberRegisteredSlots() {
        final int registeredSlots =
                this.taskManagerRegistrations.values().stream()
                        .mapToInt(TaskManagerRegistration::getNumberRegisteredSlots)
                        .sum();
        return registeredSlots + getNumberBlacklistedSlots();
    }

    /** @return number of slots of the instance, or 0 if it is not in the registered map */
    public int getNumberRegisteredSlotsOf(InstanceID instanceID) {
        final TaskManagerRegistration registration = this.taskManagerRegistrations.get(instanceID);
        return registration == null ? 0 : registration.getNumberRegisteredSlots();
    }

    /** @return number of slots of all blacklisted task executors */
    public int getNumberBlacklistedSlots() {
        synchronized (this.blacklistedTaskManagerRegistrations) {
            return this.blacklistedTaskManagerRegistrations.values().stream()
                    .mapToInt(TaskManagerRegistration::getNumberRegisteredSlots)
                    .sum();
        }
    }

    /** @return number of free slots of all registered (non-blacklisted) task executors */
    public int getNumberFreeSlots() {
        return this.taskManagerRegistrations.values().stream()
                .mapToInt(TaskManagerRegistration::getNumberFreeSlots)
                .sum();
    }

    /** @return number of free slots of the instance, or 0 if it is not in the registered map */
    public int getNumberFreeSlotsOf(InstanceID instanceID) {
        final TaskManagerRegistration registration = this.taskManagerRegistrations.get(instanceID);
        return registration == null ? 0 : registration.getNumberFreeSlots();
    }

    /** @return live view of the pending slots (backed by the internal map, not a copy) */
    public Collection<PendingTaskManagerSlot> getPendingTaskManagerSlots() {
        return this.pendingSlots.values();
    }

    /**
     * Forwards to {@code occupySlot()} of the instance's registration.
     *
     * @throws NullPointerException if the instance is not in the registered map
     */
    public void occupySlot(InstanceID instanceID) {
        this.taskManagerRegistrations.get(instanceID).occupySlot();
    }

    /**
     * Forwards to {@code freeSlot()} of the instance's registration.
     *
     * @throws NullPointerException if the instance is not in the registered map
     */
    public void freeSlot(InstanceID instanceID) {
        this.taskManagerRegistrations.get(instanceID).freeSlot();
    }

    /**
     * Forwards to {@code markUsed()} of the instance's registration.
     *
     * @throws NullPointerException if the instance is not in the registered map
     */
    public void markUsed(InstanceID instanceID) {
        this.taskManagerRegistrations.get(instanceID).markUsed();
    }
}
