package org.apache.flink.runtime.resourcemanager.active;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blocklist.BlockedResource;
import org.apache.flink.runtime.blocklist.BlocklistHandler;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.ThresholdMeter;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.rescaling.DeploymentManager;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceDeclaration;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.util.FlinkExpectedException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.class */
public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable> extends ResourceManager<WorkerType> implements ResourceEventHandler<WorkerType> {
    /** Flink configuration; read when deriving the process spec for a newly requested worker. */
    protected final Configuration flinkConfig;
    /** Length of the cool-down scheduled by {@code tryResetWorkerCreationCoolDown()}. */
    private final Duration startWorkerRetryInterval;
    /** Driver that resource requests, releases and reservations are delegated to. */
    private final ResourceManagerDriver<WorkerType> resourceManagerDriver;
    /** Workers tracked by this manager (recovered or successfully requested), keyed by resource ID. */
    private final Map<ResourceID, WorkerType> workerNodeMap;
    /** Per-spec count of workers requested in this attempt that have not registered yet. */
    private final WorkerCounter pendingWorkerCounter;
    /** Per-spec count of all workers that are requested, allocated or registered. */
    private final WorkerCounter totalWorkerCounter;
    /** Resource spec recorded for each tracked worker. */
    private final Map<ResourceID, WorkerResourceSpec> workerResourceSpecs;
    /** In-flight driver requests and the spec each was issued for; an entry is removed when its request completes. */
    private final Map<CompletableFuture<WorkerType>, WorkerResourceSpec> unallocatedWorkerFutures;
    /** Workers allocated in this attempt that have not registered yet. */
    private final Set<ResourceID> currentAttemptUnregisteredWorkers;
    /** Listeners for synchronously created workers; completed with 1 on registration and 0 on termination or timeout. */
    private final Map<ResourceID, CompletableFuture<Integer>> workerRegistrationListeners;
    /** Workers recovered from a previous attempt that have not registered yet. */
    private final Set<ResourceID> previousAttemptUnregisteredWorkers;
    /** Set by {@code onMasterBlocked}; cleared by {@code checkAndStopBlockedMasters} after it has asked the driver to act. */
    private boolean delayedMasterServiceRestart;
    /** Meter marked on every failed worker start; exceeding its threshold starts a cool-down. */
    private final ThresholdMeter startWorkerFailureRater;
    /** Time an allocated or recovered worker is given to register before it is stopped. */
    private final Duration workerRegistrationTimeout;
    /** Done while new workers may be requested; replaced by an incomplete future for the duration of a cool-down. */
    private CompletableFuture<Void> startWorkerCoolDown;
    /** Completed once no recovered worker is pending registration any more, or when the recovery timeout fires. */
    private final CompletableFuture<Void> readyToServeFuture;
    /** Maximum time to wait for recovered workers before {@code readyToServeFuture} is completed. */
    private final Duration previousWorkerRecoverTimeout;
    /** Most recent declarations passed to {@code declareResourceNeeded}. */
    private Collection<ResourceDeclaration> resourceDeclarations;
    /** Current target number of TaskManagers; initialised to {@code maxTMNum}. */
    private int targetTMNum;
    /** Configured minimum number of TaskManagers; validated to be at least 1. */
    private final int minTMNum;
    /** Configured maximum number of TaskManagers; validated to be at least {@code minTMNum}. */
    private final int maxTMNum;

    /* loaded from: input_file:org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager$GatewayMainThreadExecutor.class */
    private class GatewayMainThreadExecutor implements ScheduledExecutor {
        private GatewayMainThreadExecutor() {
        }

        public ScheduledFuture<?> schedule(Runnable runnable, long j, TimeUnit timeUnit) {
            return ActiveResourceManager.this.getMainThreadExecutor().schedule(runnable, j, timeUnit);
        }

        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long j, TimeUnit timeUnit) {
            return ActiveResourceManager.this.getMainThreadExecutor().schedule(callable, j, timeUnit);
        }

        public ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long j, long j2, TimeUnit timeUnit) {
            return ActiveResourceManager.this.getMainThreadExecutor().scheduleAtFixedRate(runnable, j, j2, timeUnit);
        }

        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long j, long j2, TimeUnit timeUnit) {
            return ActiveResourceManager.this.getMainThreadExecutor().scheduleWithFixedDelay(runnable, j, j2, timeUnit);
        }

        public void execute(Runnable runnable) {
            ActiveResourceManager.this.getMainThreadExecutor().execute(runnable);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager$ResourceAllocatorImpl.class */
    /**
     * {@link ResourceAllocator} handed to the slot manager; every operation asserts that it runs in
     * the main thread and then forwards to the enclosing resource manager.
     */
    private class ResourceAllocatorImpl implements ResourceAllocator {
        private ResourceAllocatorImpl() {}

        /** This resource manager always supports allocation. */
        @Override
        public boolean isSupported() {
            return true;
        }

        /** Stops the worker behind a disconnected resource. */
        @Override
        public void cleaningUpDisconnectedResource(ResourceID resourceId) {
            validateRunsInMainThread();
            internalStopWorker(resourceId);
        }

        /** Forwards the new declarations to the enclosing resource manager. */
        @Override
        public void declareResourceNeeded(Collection<ResourceDeclaration> declarations) {
            validateRunsInMainThread();
            // Must stay qualified: the unqualified name would resolve to this very method.
            ActiveResourceManager.this.declareResourceNeeded(declarations);
        }
    }

    public ActiveResourceManager(ResourceManagerDriver<WorkerType> resourceManagerDriver, Configuration configuration, RpcService rpcService, UUID uuid, ResourceID resourceID, HeartbeatServices heartbeatServices, DelegationTokenManager delegationTokenManager, SlotManager slotManager, ResourceManagerPartitionTrackerFactory resourceManagerPartitionTrackerFactory, BlocklistHandler.Factory factory, JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, ResourceManagerMetricGroup resourceManagerMetricGroup, ThresholdMeter thresholdMeter, Duration duration, Duration duration2, Duration duration3, Executor executor, DeploymentManager deploymentManager) {
        super(rpcService, uuid, resourceID, heartbeatServices, delegationTokenManager, slotManager, resourceManagerPartitionTrackerFactory, factory, jobLeaderIdService, clusterInformation, fatalErrorHandler, resourceManagerMetricGroup, Time.fromDuration((Duration) ((Configuration) Preconditions.checkNotNull(configuration)).get(AkkaOptions.ASK_TIMEOUT_DURATION)), executor, deploymentManager);
        this.delayedMasterServiceRestart = false;
        this.flinkConfig = configuration;
        this.resourceManagerDriver = resourceManagerDriver;
        this.workerNodeMap = new HashMap();
        this.pendingWorkerCounter = new WorkerCounter();
        this.totalWorkerCounter = new WorkerCounter();
        this.workerResourceSpecs = new HashMap();
        this.unallocatedWorkerFutures = new HashMap();
        this.currentAttemptUnregisteredWorkers = new HashSet();
        this.previousAttemptUnregisteredWorkers = new HashSet();
        this.startWorkerFailureRater = (ThresholdMeter) Preconditions.checkNotNull(thresholdMeter);
        this.startWorkerRetryInterval = duration;
        this.workerRegistrationTimeout = duration2;
        this.startWorkerCoolDown = FutureUtils.completedVoidFuture();
        this.previousWorkerRecoverTimeout = duration3;
        this.readyToServeFuture = new CompletableFuture<>();
        this.resourceDeclarations = new HashSet();
        this.workerRegistrationListeners = new HashMap();
        this.minTMNum = ((Integer) configuration.get(ClusterOptions.TASK_MANAGER_MIN_NUM)).intValue();
        Preconditions.checkArgument(this.minTMNum >= 1, String.format("'%s' should not be configured less than one.", ClusterOptions.TASK_MANAGER_MIN_NUM.key()));
        this.maxTMNum = ((Integer) configuration.get(ClusterOptions.TASK_MANAGER_MAX_NUM)).intValue();
        Preconditions.checkArgument(this.maxTMNum >= this.minTMNum, String.format("'%s' should not be configured less than '%s'.", ClusterOptions.TASK_MANAGER_MAX_NUM.key(), ClusterOptions.TASK_MANAGER_MIN_NUM.key()));
        this.targetTMNum = this.maxTMNum;
    }

    /**
     * Initializes the driver with this manager as event handler, a main-thread executor view, the
     * IO executor and a supplier of the currently blocked node ids.
     *
     * @throws ResourceManagerException wrapping any exception raised during initialization
     */
    @Override
    protected void initialize() throws ResourceManagerException {
        try {
            this.resourceManagerDriver.initialize(
                    this,
                    new GatewayMainThreadExecutor(),
                    this.ioExecutor,
                    this.blocklistHandler::getAllBlockedNodeIds);
        } catch (Exception cause) {
            throw new ResourceManagerException("Cannot initialize resource provider.", cause);
        }
    }

    /**
     * Terminates the underlying driver.
     *
     * @throws ResourceManagerException wrapping any exception thrown by the driver
     */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected void terminate() throws ResourceManagerException {
        try {
            this.resourceManagerDriver.terminate();
        } catch (Exception e) {
            throw new ResourceManagerException("Cannot terminate resource provider.", e);
        }
    }

    /**
     * Forwards the application deregistration to the driver.
     *
     * @param applicationStatus final status passed through to the driver
     * @param str optional diagnostics passed through to the driver; may be null
     * @throws ResourceManagerException wrapping any exception thrown by the driver
     */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected void internalDeregisterApplication(ApplicationStatus applicationStatus, @Nullable String str) throws ResourceManagerException {
        try {
            this.resourceManagerDriver.deregisterApplication(applicationStatus, str);
        } catch (Exception e) {
            throw new ResourceManagerException("Cannot deregister application.", e);
        }
    }

    /**
     * Looks up the tracked worker for the given resource ID.
     *
     * @return the worker from {@code workerNodeMap}, or empty if the ID is not tracked
     */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected Optional<WorkerType> getWorkerNodeIfAcceptRegistration(ResourceID resourceID) {
        return Optional.ofNullable(this.workerNodeMap.get(resourceID));
    }

    /**
     * Replaces the current resource declarations and immediately reconciles the worker counts
     * against them.
     *
     * @param collection new declarations; stored behind an unmodifiable view, not copied, so later
     *     changes to the caller's collection remain visible here
     */
    @VisibleForTesting
    public void declareResourceNeeded(Collection<ResourceDeclaration> collection) {
        this.resourceDeclarations = Collections.unmodifiableCollection(collection);
        this.log.debug("Update resource declarations to {}.", collection);
        checkResourceDeclarations();
    }

    /**
     * Requests one new worker asynchronously (no registration listener is attached).
     *
     * @return always {@code true}; the outcome of the request is not reflected in the result
     */
    public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
        requestNewWorker(workerResourceSpec, false);
        return true;
    }

    /** Delegates to the driver's {@code reserveJobManagers} and returns its result unchanged. */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected int reserveJobManagers(int i) {
        return this.resourceManagerDriver.reserveJobManagers(i);
    }

    /** Delegates to the driver's {@code updateResourceReservation} and returns its result unchanged. */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected int reserveTaskManagers(int i) {
        return this.resourceManagerDriver.updateResourceReservation(i);
    }

    /** Asks the driver to clear its resource reservation. */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected void clearResourceReservation() {
        this.resourceManagerDriver.clearResourceReservation();
    }

    /**
     * Requests {@code i} workers and blocks until each request has been resolved (registered,
     * terminated, timed out or failed).
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is restored and
     * waiting stops; the number of workers confirmed so far is returned.
     *
     * @param i number of workers to create
     * @return number of workers that registered successfully
     */
    @Override
    protected int createWorkersSynchronously(int i) {
        this.log.info("Creating {} workers.", i);
        final WorkerResourceSpec workerResourceSpec = getWorkerResourceSpec();

        final List<CompletableFuture<Integer>> registrations = new ArrayList<>(i);
        for (int requested = 0; requested < i; requested++) {
            registrations.add(requestNewWorker(workerResourceSpec, true));
        }
        this.log.info("Registered {} event(s).", registrations.size());

        int created = 0;
        for (CompletableFuture<Integer> registration : registrations) {
            try {
                created += registration.get();
            } catch (InterruptedException e) {
                // Do not lose the interruption: restore the flag and stop blocking on the rest.
                Thread.currentThread().interrupt();
                this.log.info("Captured exception: {}", e.getMessage(), e);
                break;
            } catch (Exception e) {
                // Best effort: a failed registration counts as zero created workers.
                this.log.info("Captured exception: {}", e.getMessage(), e);
            }
        }
        this.log.info("Created {} out of {} workers.", created, i);
        return created;
    }

    /**
     * Returns the number of required workers.
     *
     * <p>NOTE(review): the argument {@code i} is ignored and nothing is requested here; confirm
     * against the base-class contract that this is intended.
     */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected int requestTaskManagers(int i) {
        return getNumberOfRequiredWorkers();
    }

    /**
     * Updates the bookkeeping once a worker has registered.
     *
     * <p>A worker without a recorded spec is treated as recovered and added to the total count. A
     * worker requested in the current attempt is removed from the pending count, and its
     * registration listener, if any, is completed with 1.
     */
    @Override
    protected void onWorkerRegistered(WorkerType workertype, WorkerResourceSpec workerResourceSpec) {
        final ResourceID workerId = workertype.getResourceID();
        this.log.info("Worker {} is registered.", workerId.getStringWithMetadata());
        tryRemovePreviousPendingRecoveryTaskManager(workerId);

        final boolean specUnknown = !this.workerResourceSpecs.containsKey(workerId);
        if (specUnknown) {
            this.workerResourceSpecs.put(workerId, workerResourceSpec);
            this.totalWorkerCounter.increaseAndGet(workerResourceSpec);
            this.log.info("Recovered worker {} with resource spec {} registered", workerId.getStringWithMetadata(), workerResourceSpec);
        }

        final boolean requestedInCurrentAttempt = this.currentAttemptUnregisteredWorkers.remove(workerId);
        if (requestedInCurrentAttempt) {
            final int pendingCount = this.pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
            final CompletableFuture<Integer> listener = this.workerRegistrationListeners.remove(workerId);
            if (listener != null) {
                listener.complete(1);
            }
            this.log.info("Worker {} with resource spec {} was requested in current attempt. Current pending count after registering: {}.", workerId.getStringWithMetadata(), workerResourceSpec, pendingCount);
        }

        checkAndStopBlockedMasters();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Registers the base metrics plus the start-worker failure rate meter and a gauge for the
     * number of pending task managers.
     */
    @Override
    public void registerMetrics() {
        super.registerMetrics();
        this.resourceManagerMetricGroup.meter(MetricNames.START_WORKER_FAILURE_RATE, this.startWorkerFailureRater);
        this.resourceManagerMetricGroup.gauge(MetricNames.NUM_PENDING_TASK_MANAGERS, this.pendingWorkerCounter::getTotalNum);
    }

    /**
     * Starts tracking workers recovered from a previous attempt and arms a registration timeout
     * for each of them.
     *
     * <p>If nothing was recovered, or the recovery timeout is zero, the ready-to-serve future is
     * completed immediately; otherwise it is completed at the latest when the recovery timeout
     * elapses.
     */
    @Override
    public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> collection) {
        getMainThreadExecutor().assertRunningInMainThread();
        this.log.info("Recovered {} workers from previous attempt.", collection.size());

        for (WorkerType recovered : collection) {
            final ResourceID workerId = recovered.getResourceID();
            this.workerNodeMap.put(workerId, recovered);
            this.previousAttemptUnregisteredWorkers.add(workerId);
            scheduleWorkerRegistrationTimeoutCheck(workerId);
            this.log.info("Worker {} recovered from previous attempt.", workerId.getStringWithMetadata());
        }

        if (!collection.isEmpty() && !this.previousWorkerRecoverTimeout.isZero()) {
            scheduleRunAsync(
                    () -> {
                        this.readyToServeFuture.complete(null);
                        this.log.info("Timeout to wait recovery taskmanagers, recovery future is completed.");
                    },
                    this.previousWorkerRecoverTimeout.toMillis(),
                    TimeUnit.MILLISECONDS);
        } else {
            this.readyToServeFuture.complete(null);
        }
    }

    /**
     * Handles a worker termination reported by the driver.
     *
     * <p>A termination before registration counts as a start failure. If the worker was tracked,
     * its registration listener is completed with 0 when present; otherwise the declarations are
     * re-checked. The task manager connection is closed in every case.
     */
    @Override
    public void onWorkerTerminated(ResourceID resourceID, String str) {
        final boolean terminatedBeforeRegistration = this.currentAttemptUnregisteredWorkers.contains(resourceID);
        if (terminatedBeforeRegistration) {
            recordWorkerFailureAndPauseWorkerCreationIfNeeded();
        }

        final boolean wasTracked = clearStateForWorker(resourceID);
        if (wasTracked) {
            this.log.info("Worker {} is terminated. Diagnostics: {}", resourceID.getStringWithMetadata(), str);
            final CompletableFuture<Integer> listener = this.workerRegistrationListeners.remove(resourceID);
            if (listener == null) {
                checkResourceDeclarations();
            } else {
                listener.complete(0);
            }
        }

        closeTaskManagerConnection(resourceID, new Exception(str));
    }

    /**
     * Asks the blocklist handler to unblock heavily blocked nodes.
     *
     * <p>NOTE(review): the {@code collection} argument is not used; {@code unblockHeavyNodes()} is
     * called without it. Confirm that ignoring the given node names is intended.
     */
    @Override // org.apache.flink.runtime.resourcemanager.active.ResourceEventHandler
    public void releaseBlockedNodes(Collection<String> collection) {
        this.blocklistHandler.unblockHeavyNodes();
    }

    /**
     * Registers every given node with the blocklist handler as a heavily blocked node, using a
     * fixed reason text.
     *
     * @param collection node names to block
     */
    @Override // org.apache.flink.runtime.resourcemanager.active.ResourceEventHandler
    public void addBlockedNodes(Collection<String> collection) {
        this.blocklistHandler.addNewBlockedNodes((Collection) collection.stream().map(str -> {
            return BlockedResource.heavilyBlockNode(str, "Node was blocked by other services");
        }).collect(Collectors.toList()));
    }

    /**
     * Marks a master service restart as pending and processes it right away when at least one
     * master is reported as blocked; an empty collection is ignored.
     */
    @Override
    public void onMasterBlocked(Collection<ResourceID> collection) {
        if (!collection.isEmpty()) {
            this.delayedMasterServiceRestart = true;
            checkAndStopBlockedMasters();
        }
    }

    /** Treats every error reported by the driver as fatal by forwarding it to {@code onFatalError}. */
    @Override // org.apache.flink.runtime.resourcemanager.active.ResourceEventHandler
    public void onError(Throwable th) {
        onFatalError(th);
    }

    /**
     * Triggers a cluster redeployment for a new TaskManager target and/or blocked job managers.
     *
     * @param num new target number of TaskManagers, or null to keep the current target
     * @param collection job managers reported as blocked
     * @return false if nothing changed or the redeployment was not started, true otherwise
     */
    @Override
    public boolean callForcedRedeploying(Integer num, Collection<ResourceID> collection) {
        final boolean nothingChanged = num == null && collection.isEmpty() && !isAffectedByBlocklist();
        if (nothingChanged) {
            this.log.info("Nothing changed, skip rescaling");
            return false;
        }

        if (num != null) {
            this.targetTMNum = num;
        }
        this.log.info("Calling rescaling with taskManager num in {}-{} and blocked JMs {}/{}", this.minTMNum, num, collection.size(), getNumberOfJobManagers());

        final boolean redeploying = clusterRedeploying(collection);
        if (redeploying) {
            onMasterBlocked(collection);
        }
        // The reservation is cleared whether or not the redeployment was started.
        clearResourceReservation();
        return redeploying;
    }

    /**
     * Reconciles the number of workers per resource spec with the current declarations.
     *
     * <p>Surplus workers are released in this order: workers named as unwanted by the declaration,
     * not yet allocated requests, allocated but unregistered workers, then any remaining tracked
     * worker. Missing workers are requested unless a start-worker cool-down is active, in which
     * case this check is re-run once the cool-down completes.
     */
    private void checkResourceDeclarations() {
        validateRunsInMainThread();

        for (ResourceDeclaration declaration : this.resourceDeclarations) {
            final WorkerResourceSpec spec = declaration.getSpec();
            final int declared = declaration.getNumNeeded();
            final int surplus = this.totalWorkerCounter.getNum(spec) - declared;

            if (surplus == 0) {
                this.log.debug("current worker number {} meets the declared worker {}", this.totalWorkerCounter.getNum(spec), declared);
                continue;
            }

            if (surplus < 0) {
                if (!this.startWorkerCoolDown.isDone()) {
                    // Retry after the cool-down instead of requesting workers now.
                    this.startWorkerCoolDown.thenRun(this::checkResourceDeclarations);
                    continue;
                }
                final int missing = -surplus;
                this.log.info("need request {} new workers, current worker number {}, declared worker number {}", missing, this.totalWorkerCounter.getNum(spec), declared);
                for (int requested = 0; requested < missing; requested++) {
                    requestNewWorker(spec, false);
                }
                continue;
            }

            this.log.info("need release {} workers, current worker number {}, declared worker number {}", surplus, this.totalWorkerCounter.getNum(spec), declared);
            int remaining = releaseUnWantedResources(declaration.getUnwantedWorkers(), surplus);
            if (remaining > 0) {
                remaining = releaseUnallocatedWorkers(spec, remaining);
            }
            if (remaining > 0) {
                remaining = releaseAllocatedWorkers(this.currentAttemptUnregisteredWorkers, spec, remaining);
            }
            if (remaining > 0) {
                remaining = releaseAllocatedWorkers(this.workerNodeMap.keySet(), spec, remaining);
            }
            Preconditions.checkState(remaining == 0, "there are no more workers to release");
        }
    }

    /**
     * Releases up to {@code i} of the given unwanted instances.
     *
     * @param collection instances the slot manager no longer needs
     * @param i maximum number of workers to release
     * @return how many workers still have to be released afterwards
     */
    private int releaseUnWantedResources(Collection<InstanceID> collection, int i) {
        final Exception cause = new FlinkExpectedException("slot manager has determined that the resource is no longer needed");
        int remaining = i;
        final Iterator<InstanceID> unwanted = collection.iterator();
        while (remaining > 0 && unwanted.hasNext()) {
            if (releaseResource(unwanted.next(), cause)) {
                remaining--;
            }
        }
        return remaining;
    }

    /**
     * Cancels up to {@code i} in-flight resource requests that were made for the given spec.
     *
     * @return how many workers still have to be released afterwards
     */
    private int releaseUnallocatedWorkers(WorkerResourceSpec workerResourceSpec, int i) {
        // Snapshot the matching requests first: cancelling a request removes it from the map
        // through the completion callback installed in requestNewWorker.
        final Set<CompletableFuture<WorkerType>> matching = new HashSet<>();
        for (Map.Entry<CompletableFuture<WorkerType>, WorkerResourceSpec> pending : this.unallocatedWorkerFutures.entrySet()) {
            if (pending.getValue().equals(workerResourceSpec)) {
                matching.add(pending.getKey());
            }
        }

        int remaining = i;
        final Iterator<CompletableFuture<WorkerType>> requests = matching.iterator();
        while (remaining > 0 && requests.hasNext()) {
            if (requests.next().cancel(true)) {
                remaining--;
            }
        }
        return remaining;
    }

    /**
     * Releases up to {@code i} workers from {@code collection} whose recorded spec equals the
     * given spec.
     *
     * @return how many workers still have to be released afterwards
     */
    private int releaseAllocatedWorkers(Collection<ResourceID> collection, WorkerResourceSpec workerResourceSpec, int i) {
        // Copy the candidates: releasing a worker mutates the collections passed in by the caller.
        final List<ResourceID> candidates = new ArrayList<>();
        for (ResourceID candidate : collection) {
            if (workerResourceSpec.equals(this.workerResourceSpecs.get(candidate))) {
                candidates.add(candidate);
            }
        }

        final Exception cause = new FlinkExpectedException("resource is no longer needed");
        int remaining = i;
        final Iterator<ResourceID> workers = candidates.iterator();
        while (remaining > 0 && workers.hasNext()) {
            final ResourceID workerId = workers.next();
            if (releaseResource(workerId, cause)) {
                remaining--;
            } else {
                this.log.warn("Resource {} could not release.", workerId);
            }
        }
        return remaining;
    }

    /**
     * Releases the worker registered under the given instance id.
     *
     * @return true if the worker was found and released, false otherwise
     */
    private boolean releaseResource(InstanceID instanceID, Exception exc) {
        final WorkerType worker = getWorkerByInstanceId(instanceID);
        if (worker == null) {
            this.log.debug("Instance {} not found in ResourceManager.", instanceID);
            return false;
        }
        return releaseResource(worker.getResourceID(), exc);
    }

    /**
     * Stops a tracked worker and closes its task manager connection with the given cause.
     *
     * @return false if the worker is not tracked, true once it has been stopped
     */
    private boolean releaseResource(ResourceID resourceID, Exception exc) {
        final boolean tracked = this.workerNodeMap.containsKey(resourceID);
        if (tracked) {
            internalStopWorker(resourceID);
            closeTaskManagerConnection(resourceID, exc);
        }
        return tracked;
    }

    /**
     * Requests a new worker with the given resource spec from the driver.
     *
     * <p>The pending and total counters are increased up front and rolled back if the request
     * fails or is cancelled. On success the worker is tracked as unregistered and a registration
     * timeout is scheduled.
     *
     * @param workerResourceSpec spec of the worker to request
     * @param z whether the caller waits for the outcome; if true the returned future is completed
     *     with 1 on registration and with 0 on failure, cancellation, termination or timeout; if
     *     false it is completed with 0 right away
     * @return future reporting how many workers (0 or 1) this request produced
     */
    @VisibleForTesting
    public CompletableFuture<Integer> requestNewWorker(WorkerResourceSpec workerResourceSpec, boolean z) {
        final TaskExecutorProcessSpec processSpec = TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(this.flinkConfig, workerResourceSpec);
        final int pendingCount = this.pendingWorkerCounter.increaseAndGet(workerResourceSpec);
        this.totalWorkerCounter.increaseAndGet(workerResourceSpec);
        this.log.info("Requesting new worker with resource spec {}, current pending count: {}.", workerResourceSpec, pendingCount);

        final CompletableFuture<Integer> registrationResult = new CompletableFuture<>();
        final CompletableFuture<WorkerType> requestFuture = this.resourceManagerDriver.requestResource(processSpec);
        this.unallocatedWorkerFutures.put(requestFuture, workerResourceSpec);

        FutureUtils.assertNoException(requestFuture.handle((worker, failure) -> {
            this.unallocatedWorkerFutures.remove(requestFuture);

            if (failure == null) {
                final ResourceID resourceID = worker.getResourceID();
                this.workerNodeMap.put(resourceID, worker);
                this.workerResourceSpecs.put(resourceID, workerResourceSpec);
                this.currentAttemptUnregisteredWorkers.add(resourceID);
                if (z) {
                    // Completed later by registration, termination or the registration timeout.
                    this.workerRegistrationListeners.put(resourceID, registrationResult);
                }
                scheduleWorkerRegistrationTimeoutCheck(resourceID);
                this.log.info("Requested worker {} with resource spec {}.", resourceID.getStringWithMetadata(), workerResourceSpec);
                return null;
            }

            // The request did not produce a worker: roll back the optimistic counter updates.
            final int pendingAfterFailure = this.pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
            this.totalWorkerCounter.decreaseAndGet(workerResourceSpec);

            if (failure instanceof CancellationException) {
                this.log.info("Requesting worker with resource spec {} canceled, current pending count: {}", workerResourceSpec, pendingAfterFailure);
                // A synchronous caller blocks on this future; without completing it here a
                // cancelled request would leave that caller waiting forever.
                registrationResult.complete(0);
                return null;
            }

            this.log.warn("Failed requesting worker with resource spec {}, current pending count: {}", workerResourceSpec, pendingAfterFailure, failure);
            if (!z) {
                recordWorkerFailureAndPauseWorkerCreationIfNeeded();
                checkResourceDeclarations();
            }
            registrationResult.complete(0);
            return null;
        }));

        if (!z) {
            // Asynchronous callers do not wait for the registration outcome.
            registrationResult.complete(0);
        }
        return registrationResult;
    }

    /**
     * Schedules a check that stops the worker if it has still not registered once the worker
     * registration timeout has elapsed.
     *
     * <p>When the timed-out worker was created synchronously, its listener is completed with 0
     * and no replacement is requested here. Otherwise the resource declarations are re-checked
     * exactly once, matching the handling in {@code onWorkerTerminated}.
     *
     * @param resourceID id of the worker to watch
     */
    private void scheduleWorkerRegistrationTimeoutCheck(ResourceID resourceID) {
        scheduleRunAsync(() -> {
            final boolean stillUnregistered =
                    this.currentAttemptUnregisteredWorkers.contains(resourceID)
                            || this.previousAttemptUnregisteredWorkers.contains(resourceID);
            if (!stillUnregistered) {
                return;
            }

            this.log.warn("Worker {} did not register in {}, will stop it and request a new one if needed.", resourceID, this.workerRegistrationTimeout);
            internalStopWorker(resourceID);

            final CompletableFuture<Integer> listener = this.workerRegistrationListeners.remove(resourceID);
            if (listener != null) {
                this.log.info("Worker {} created synchronously, no requesting needed", resourceID);
                listener.complete(0);
            } else {
                checkResourceDeclarations();
            }
            checkAndStopBlockedMasters();
        }, this.workerRegistrationTimeout);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Releases the worker through the driver, if it is tracked, and clears all local state kept
     * for it.
     */
    public void internalStopWorker(ResourceID resourceID) {
        this.log.info("Stopping worker {}.", resourceID.getStringWithMetadata());
        Optional.ofNullable(this.workerNodeMap.get(resourceID))
                .ifPresent(worker -> this.resourceManagerDriver.releaseResource(worker));
        clearStateForWorker(resourceID);
    }

    /**
     * Removes all bookkeeping for the given worker and adjusts the counters.
     *
     * @return false if the worker is not tracked, true otherwise
     */
    private boolean clearStateForWorker(ResourceID resourceID) {
        final WorkerType removedWorker = this.workerNodeMap.remove(resourceID);
        if (removedWorker == null) {
            this.log.debug("Ignore unrecognized worker {}.", resourceID.getStringWithMetadata());
            return false;
        }

        final WorkerResourceSpec spec = this.workerResourceSpecs.remove(resourceID);
        tryRemovePreviousPendingRecoveryTaskManager(resourceID);

        if (spec != null) {
            this.totalWorkerCounter.decreaseAndGet(spec);
            // Only workers requested in this attempt and not yet registered count as pending.
            if (this.currentAttemptUnregisteredWorkers.remove(resourceID)) {
                final String workerName = resourceID.getStringWithMetadata();
                final int pendingCount = this.pendingWorkerCounter.decreaseAndGet(spec);
                this.log.info("Worker {} with resource spec {} was requested in current attempt and has not registered. Current pending count after removing: {}.", workerName, spec, pendingCount);
            }
        }
        return true;
    }

    /** Always returns {@code true} for this resource manager. */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected boolean canManageWorkerNumber() {
        return true;
    }

    /**
     * If a master service restart is pending, releases one master reservation, changes the
     * master service affinity through the driver and clears the pending flag.
     */
    private void checkAndStopBlockedMasters() {
        if (this.delayedMasterServiceRestart) {
            this.log.info("Attempting to change the Master Service affinity. The Service will be restarted");
            this.resourceManagerDriver.releaseOneMasterReservation();
            this.resourceManagerDriver.changeMasterServiceAffinity();
            this.delayedMasterServiceRestart = false;
        } else {
            this.log.info("There are no delayed masters to restart");
        }
    }

    /** Returns the driver's {@code getMaximumExtraSlots()} value unchanged. */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager, org.apache.flink.runtime.resourcemanager.ResourceManagerGateway
    public long getMaximumExtraSlots() {
        return this.resourceManagerDriver.getMaximumExtraSlots();
    }

    /** Returns the larger of the number of required workers and the configured maximum. */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected int getWorkersUpperLimit() {
        return Math.max(getNumberOfRequiredWorkers(), this.maxTMNum);
    }

    /**
     * Asks the driver for a scale-out when the requested limit exceeds the current target.
     *
     * @param i requested worker limit
     * @return the target number of TaskManagers; updated only if the driver returned a value
     */
    @Override
    protected int requestWorkersLimit(int i) {
        if (this.targetTMNum < i) {
            final Integer granted = this.resourceManagerDriver.requestForScaleOut(i);
            if (granted != null) {
                this.targetTMNum = granted;
            }
        }
        return this.targetTMNum;
    }

    /**
     * Reports the target and the currently registered number of task managers to the driver.
     *
     * <p>If the registered count cannot be obtained, 0 is reported and the failure is logged. An
     * interruption while waiting restores the thread's interrupt flag.
     */
    @Override
    public void notifyResourceUsage() {
        int registeredTaskManagers;
        try {
            registeredTaskManagers = getNumberOfRegisteredTaskManagers().get().intValue();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interruption for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            this.log.warn("Could not determine the number of registered task managers, reporting 0.", e);
            registeredTaskManagers = 0;
        }
        this.resourceManagerDriver.notifyResourceChange(this.targetTMNum, registeredTaskManagers);
    }

    /** Returns the current target number of TaskManagers. */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected int getNumberOfAvailableWorkers() {
        return this.targetTMNum;
    }

    /** Returns the configured minimum number of TaskManagers. */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected int getNumberOfRequiredWorkers() {
        return this.minTMNum;
    }

    /** Records a start-worker failure and starts a cool-down if the failure rate threshold is exceeded. */
    private void recordWorkerFailureAndPauseWorkerCreationIfNeeded() {
        if (recordStartWorkerFailure()) {
            tryResetWorkerCreationCoolDown();
        }
    }

    /**
     * Marks one start-worker failure on the failure meter.
     *
     * @return true if the meter reports that its threshold is exceeded, false otherwise
     */
    private boolean recordStartWorkerFailure() {
        this.startWorkerFailureRater.markEvent();

        boolean thresholdExceeded = false;
        try {
            this.startWorkerFailureRater.checkAgainstThreshold();
        } catch (ThresholdMeter.ThresholdExceedException exceeded) {
            this.log.warn("Reaching max start worker failure rate: {}", exceeded.getMessage());
            thresholdExceeded = true;
        }
        return thresholdExceeded;
    }

    /**
     * Starts a new worker-creation cool-down unless one is already running. The cool-down future
     * is completed after the start-worker retry interval.
     */
    private void tryResetWorkerCreationCoolDown() {
        if (!this.startWorkerCoolDown.isDone()) {
            // A cool-down is already in progress; do not extend it.
            return;
        }
        this.log.info("Will not retry creating worker in {}.", this.startWorkerRetryInterval);
        this.startWorkerCoolDown = new CompletableFuture<>();
        scheduleRunAsync(() -> this.startWorkerCoolDown.complete(null), this.startWorkerRetryInterval);
    }

    /**
     * Returns the future that is completed once no recovered worker is pending registration any
     * more, or when the recovery timeout fires.
     */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    public CompletableFuture<Void> getReadyToServeFuture() {
        return this.readyToServeFuture;
    }

    /** Creates a new {@code ResourceAllocatorImpl} bound to this resource manager on every call. */
    @Override // org.apache.flink.runtime.resourcemanager.ResourceManager
    protected ResourceAllocator getResourceAllocator() {
        return new ResourceAllocatorImpl();
    }

    /**
     * Removes the worker from the set of recovered workers awaiting registration and completes
     * the ready-to-serve future whenever that set is empty, including when the worker was not in
     * the set.
     */
    private void tryRemovePreviousPendingRecoveryTaskManager(ResourceID resourceID) {
        final long pendingBefore = this.previousAttemptUnregisteredWorkers.size();
        if (this.previousAttemptUnregisteredWorkers.remove(resourceID)) {
            final int pendingAfter = this.previousAttemptUnregisteredWorkers.size();
            final String readyNote = pendingAfter == 0 ? " Resource manager is ready to serve." : "";
            this.log.info("Pending recovery taskmanagers {} -> {}.{}", pendingBefore, pendingAfter, readyNote);
        }
        if (this.previousAttemptUnregisteredWorkers.isEmpty()) {
            this.readyToServeFuture.complete(null);
        }
    }

    /**
     * Test hook that submits {@code callable} via {@code callAsync} with the given timeout
     * converted to a {@code Duration}.
     *
     * @return future holding the callable's result
     */
    @VisibleForTesting
    <T> CompletableFuture<T> runInMainThread(Callable<T> callable, Time time) {
        return callAsync(callable, TimeUtils.toDuration(time));
    }
}
