package org.apache.flink.kubernetes;

import java.io.File;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.availability.ResourceReservationProvider;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerDriverConfiguration;
import org.apache.flink.kubernetes.dli.NodeOperator;
import org.apache.flink.kubernetes.dli.QuotaOperator;
import org.apache.flink.kubernetes.dli.ResourceUpdateEvent;
import org.apache.flink.kubernetes.dli.ResourceUpdateTask;
import org.apache.flink.kubernetes.dli.ResourceUpdateTaskFactory;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesResourceSpec;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.resourcemanager.ComputationUnitUtils;
import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.util.ResourceManagerUtils;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/kubernetes/KubernetesResourceManagerDriver.class */
public class KubernetesResourceManagerDriver extends AbstractResourceManagerDriver<KubernetesWorkerNode> {
    // Pod name pattern; createKubernetesTaskManagerParameters fills it with
    // (clusterId, attempt id, pod id, random UUID).
    private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d-%s";
    // NOTE(review): not referenced in the visible part of this class; presumably the key under
    // which the target CU value is stored — confirm against the rest of the file.
    private static final String TARGET_CU_NAME = "target-cu";
    // NOTE(review): both delays equal the 1000 ms values passed to scheduleWithFixedDelay in
    // initializeInternal; presumably they are the source of those literals — confirm.
    private static final long INIT_QUEUE_EVENT_PROC_DELAY = 1000;
    private static final long QUEUE_EVENT_PROC_DELAY = 1000;
    // Cluster id; used for pod naming, label selectors and service updates.
    private final String clusterId;
    // Web interface URL; the REST port is parsed from it when host networking is enabled.
    private final String webInterfaceUrl;
    private final FlinkKubeClient flinkKubeClient;
    private final MemorySize jmMemorySize;
    private final MemorySize tmMemorySize;
    // Pending resource requests keyed by pod name; an entry is removed once the pod is
    // scheduled, terminated, or its creation fails.
    private final Map<String, CompletableFuture<KubernetesWorkerNode>> requestResourceFutures;
    // Highest attempt id seen among recovered pods, incremented once after recovery.
    private long currentMaxAttemptId;
    // Counter incremented for every TaskManager pod name that is generated.
    private long currentMaxPodId;
    // Watch on TaskManager pods; assigned in initializeInternal.
    private Optional<KubernetesWatch> podsWatchOpt;
    private final String affinityConfigMapName;
    // Watch on the affinity ConfigMap; only assigned in ConfigMap blacklist mode, otherwise null.
    private Optional<KubernetesSharedWatcher.Watch> configmapWatchOpt;
    // True between a successful initializeInternal and terminate.
    private volatile boolean running;
    private FlinkPod taskManagerPodTemplate;
    private ResourceReservationProvider resourceReservationProvider;
    // Per-pod CPU / memory (bytes) requirements used for capacity calculations and reservations.
    private final double jmCpuRequired;
    private final BigDecimal jmMemoryRequired;
    private final double tmCpuRequired;
    private final BigDecimal tmMemoryRequired;
    // Only initialized in NodeOperator blacklist mode; null otherwise.
    private List<String> blacklistedNodes;
    private final KubernetesConfigOptions.BlacklistMode blacklistMode;
    // Only initialized in NodeOperator blacklist mode; null otherwise.
    private ResourceUpdateTask resourceUpdateTask;
    // Blacklist / quota events pushed by the watch and operator callbacks (always via addFirst).
    private final Deque<ResourceUpdateEvent> resourceUpdateEventQueue;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/kubernetes/KubernetesResourceManagerDriver$ConfigMapCallbackHandlerImpl.class */
    public class ConfigMapCallbackHandlerImpl implements FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {
        private ConfigMapCallbackHandlerImpl() {
        }

        @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler
        public void onAdded(List<KubernetesConfigMap> list) {
            KubernetesResourceManagerDriver.this.log.info("Configmap {} was created", KubernetesResourceManagerDriver.this.affinityConfigMapName);
            KubernetesResourceManagerDriver.this.resourceUpdateEventQueue.addFirst(ResourceUpdateEvent.createBlacklistEvent(KubernetesUtils.getBlacklistedNodes(KubernetesUtils.checkConfigMaps(list, KubernetesResourceManagerDriver.this.affinityConfigMapName))));
        }

        @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler
        public void onModified(List<KubernetesConfigMap> list) {
            KubernetesResourceManagerDriver.this.log.info("Configmap {} was modified", KubernetesResourceManagerDriver.this.affinityConfigMapName);
            KubernetesResourceManagerDriver.this.resourceUpdateEventQueue.addFirst(ResourceUpdateEvent.createBlacklistEvent(KubernetesUtils.getBlacklistedNodes(KubernetesUtils.checkConfigMaps(list, KubernetesResourceManagerDriver.this.affinityConfigMapName))));
        }

        @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler
        public void onDeleted(List<KubernetesConfigMap> list) {
            KubernetesResourceManagerDriver.this.log.info("Configmap {} was deleted", KubernetesResourceManagerDriver.this.affinityConfigMapName);
        }

        @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler
        public void onError(List<KubernetesConfigMap> list) {
            KubernetesResourceManagerDriver.this.log.error("Error while watching the ConfigMap " + KubernetesResourceManagerDriver.this.affinityConfigMapName);
        }

        @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler
        public void handleError(Throwable th) {
            if (th instanceof KubernetesTooOldResourceVersionException) {
                KubernetesResourceManagerDriver.this.getMainThreadExecutor().execute(() -> {
                    if (KubernetesResourceManagerDriver.this.running) {
                        KubernetesResourceManagerDriver.this.configmapWatchOpt.ifPresent((v0) -> {
                            v0.close();
                        });
                        KubernetesResourceManagerDriver.this.log.info("Creating a new watch on affinity ConfigMap.");
                        KubernetesResourceManagerDriver.this.configmapWatchOpt = KubernetesResourceManagerDriver.this.watchAffinityConfigMap();
                    }
                });
            } else {
                KubernetesResourceManagerDriver.this.getResourceEventHandler().onError(th);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/kubernetes/KubernetesResourceManagerDriver$DliNodesCallbackHandlerImpl.class */
    /** Node operator callback that queues received node lists as blacklist events. */
    private class DliNodesCallbackHandlerImpl implements NodeOperator {
        private DliNodesCallbackHandlerImpl() {
        }

        /**
         * Logs the received node list and pushes it to the front of the resource update queue
         * as a blacklist event.
         */
        @Override
        public void decommissioningNode(List<String> list) {
            final KubernetesResourceManagerDriver driver = KubernetesResourceManagerDriver.this;
            driver.log.info("Blacklist event {} was received", list);
            driver.resourceUpdateEventQueue.addFirst(ResourceUpdateEvent.createBlacklistEvent(list));
        }
    }

    /* loaded from: input_file:org/apache/flink/kubernetes/KubernetesResourceManagerDriver$DliQuotaCallbackHandlerImpl.class */
    /** Quota operator callback that queues quota changes and exposes the stored target CU. */
    private class DliQuotaCallbackHandlerImpl implements QuotaOperator {
        private DliQuotaCallbackHandlerImpl() {
        }

        /** Logs the new quota and pushes it to the front of the resource update queue. */
        @Override
        public void updateQuota(int i) {
            final KubernetesResourceManagerDriver driver = KubernetesResourceManagerDriver.this;
            final Integer quota = Integer.valueOf(i);
            driver.log.info("Quota event {} was received", quota);
            driver.resourceUpdateEventQueue.addFirst(ResourceUpdateEvent.createQuotaEvent(quota));
        }

        /** Returns the target CU as read by the driver from the Flink ConfigMap. */
        @Override
        public int getTargetCU() {
            final KubernetesResourceManagerDriver driver = KubernetesResourceManagerDriver.this;
            return driver.getTargetCUFromFlinkConfigMap().intValue();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/kubernetes/KubernetesResourceManagerDriver$PodCallbackHandlerImpl.class */
    /**
     * Watch callback for TaskManager pods. Every pod event, regardless of its kind, is handed
     * to {@code handlePodEventsInMainThread}.
     */
    public class PodCallbackHandlerImpl implements FlinkKubeClient.WatchCallbackHandler<KubernetesPod> {
        private PodCallbackHandlerImpl() {
        }

        @Override
        public void onAdded(List<KubernetesPod> list) {
            handlePodEventsInMainThread(list);
        }

        @Override
        public void onModified(List<KubernetesPod> list) {
            handlePodEventsInMainThread(list);
        }

        @Override
        public void onDeleted(List<KubernetesPod> list) {
            handlePodEventsInMainThread(list);
        }

        @Override
        public void onError(List<KubernetesPod> list) {
            handlePodEventsInMainThread(list);
        }

        /**
         * Re-creates the pod watch on the main thread when the resource version is too old
         * (and the driver is still running). Any other error, as well as a failure to create
         * the new watch, is forwarded to the resource event handler.
         */
        @Override
        public void handleError(Throwable th) {
            if (!(th instanceof KubernetesTooOldResourceVersionException)) {
                getResourceEventHandler().onError(th);
                return;
            }
            getMainThreadExecutor().execute(() -> {
                if (!running) {
                    return;
                }
                podsWatchOpt.ifPresent(KubernetesWatch::close);
                log.info("Creating a new watch on TaskManager pods.");
                try {
                    podsWatchOpt = watchTaskManagerPods();
                } catch (Exception watchFailure) {
                    getResourceEventHandler().onError(watchFailure);
                }
            });
        }
    }

    public KubernetesResourceManagerDriver(Configuration configuration, FlinkKubeClient flinkKubeClient, KubernetesResourceManagerDriverConfiguration kubernetesResourceManagerDriverConfiguration) {
        super(configuration, GlobalConfiguration.loadConfiguration());
        this.currentMaxAttemptId = 0L;
        this.currentMaxPodId = 0L;
        this.clusterId = (String) Preconditions.checkNotNull(kubernetesResourceManagerDriverConfiguration.getClusterId());
        this.webInterfaceUrl = kubernetesResourceManagerDriverConfiguration.getWebInterfaceUrl();
        this.flinkKubeClient = (FlinkKubeClient) Preconditions.checkNotNull(flinkKubeClient);
        this.requestResourceFutures = new HashMap();
        this.affinityConfigMapName = configuration.getString(KubernetesConfigOptions.AFFINITY_CONFIGMAP_NAME);
        this.running = false;
        this.jmMemorySize = (MemorySize) Preconditions.checkNotNull(kubernetesResourceManagerDriverConfiguration.getJmMemorySize());
        this.tmMemorySize = (MemorySize) Preconditions.checkNotNull(kubernetesResourceManagerDriverConfiguration.getTmMemorySize());
        this.jmCpuRequired = configuration.getDouble(KubernetesConfigOptions.JOB_MANAGER_CPU);
        this.jmMemoryRequired = BigDecimal.valueOf(this.jmMemorySize.multiply(configuration.getDouble(KubernetesConfigOptions.JOB_MANAGER_CPU_LIMIT_FACTOR)).getBytes());
        this.tmCpuRequired = configuration.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU);
        this.tmMemoryRequired = BigDecimal.valueOf(this.tmMemorySize.multiply(configuration.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU_LIMIT_FACTOR)).getBytes());
        ComputationUnitUtils.init(configuration.getDouble(ClusterOptions.CU_CPU), BigDecimal.valueOf(((MemorySize) configuration.get(ClusterOptions.CU_MEMORY)).getBytes()), this.tmCpuRequired, this.tmMemoryRequired, this.jmCpuRequired, this.jmMemoryRequired);
        this.blacklistMode = KubernetesConfigOptions.BlacklistMode.valueOf((String) configuration.get(KubernetesConfigOptions.KUBERNETES_BLACKLIST_NODE_MODE));
        if (KubernetesConfigOptions.BlacklistMode.NodeOperator.equals(this.blacklistMode)) {
            this.blacklistedNodes = new CopyOnWriteArrayList();
            this.resourceUpdateTask = ResourceUpdateTaskFactory.create(configuration);
        }
        this.resourceUpdateEventQueue = new ConcurrentLinkedDeque();
    }

    /**
     * Starts the driver: releases stale reservations, starts the pod watch, loads the
     * TaskManager pod template, sets up blacklist handling for the configured mode, recovers
     * pods of previous attempts and schedules the periodic {@code adaptPodsForCurrentResources}
     * task. The driver is marked as running only after all of this succeeded.
     *
     * @throws IllegalStateException if the blacklist mode is neither ConfigMap nor NodeOperator
     */
    @Override
    protected void initializeInternal() throws Exception {
        this.resourceReservationProvider = new ResourceReservationProvider(this.flinkConfig, this.flinkKubeClient);
        this.resourceReservationProvider.tryReleaseAllResources();
        this.podsWatchOpt = watchTaskManagerPods();

        // Use the template file when it exists, otherwise fall back to an empty pod.
        final File podTemplateFile = KubernetesUtils.getTaskManagerPodTemplateFileInPod();
        this.taskManagerPodTemplate = podTemplateFile.exists()
                ? KubernetesUtils.loadPodFromTemplateFile(this.flinkKubeClient, podTemplateFile, (String) this.flinkConfig.get(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE_MAIN_CONTAINER))
                : new FlinkPod.Builder().build();
        updateKubernetesServiceTargetPortIfNecessary();

        if (this.blacklistMode == KubernetesConfigOptions.BlacklistMode.ConfigMap) {
            this.configmapWatchOpt = watchAffinityConfigMap();
        } else if (this.blacklistMode == KubernetesConfigOptions.BlacklistMode.NodeOperator) {
            this.blacklistedNodes.addAll(this.flinkKubeClient.getBlacklistedNodes());
            this.resourceUpdateTask.start(new DliNodesCallbackHandlerImpl(), new DliQuotaCallbackHandlerImpl(), this.flinkConfig, getCurrentCU().intValue());
        } else {
            throw new IllegalStateException(String.format("Blacklist Mode should be %s or %s", KubernetesConfigOptions.BlacklistMode.ConfigMap, KubernetesConfigOptions.BlacklistMode.NodeOperator));
        }

        recoverWorkerNodesFromPreviousAttempts();
        getMainThreadExecutor().scheduleWithFixedDelay(this::adaptPodsForCurrentResources, INIT_QUEUE_EVENT_PROC_DELAY, QUEUE_EVENT_PROC_DELAY, TimeUnit.MILLISECONDS);
        this.running = true;
    }

    /**
     * Stops the driver if it is running: closes the TaskManager pod watch, the affinity
     * ConfigMap watch (if one was created) and finally the Kubernetes client. All three steps
     * are attempted even if an earlier one fails; the first failure is rethrown with later
     * failures attached as suppressed exceptions.
     *
     * @throws Exception the first failure encountered while closing resources
     */
    @Override
    public void terminate() throws Exception {
        if (!this.running) {
            return;
        }
        this.running = false;
        Exception firstFailure = null;
        try {
            this.podsWatchOpt.ifPresent(KubernetesWatch::close);
        } catch (Exception e) {
            firstFailure = e;
        }
        try {
            // The ConfigMap watch only exists in ConfigMap blacklist mode; the field stays null
            // otherwise, so it has to be null-checked before use.
            if (this.configmapWatchOpt != null) {
                this.configmapWatchOpt.ifPresent(KubernetesSharedWatcher.Watch::close);
            }
        } catch (Exception e) {
            firstFailure = ExceptionUtils.firstOrSuppressed(e, firstFailure);
        }
        try {
            this.flinkKubeClient.close();
        } catch (Exception e) {
            firstFailure = ExceptionUtils.firstOrSuppressed(e, firstFailure);
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Logs the deregistration and asks the Kubernetes client to stop and clean up the cluster.
     *
     * @param applicationStatus final application status (not used by this implementation)
     * @param str optional diagnostics; logged as an empty string when null
     */
    @Override
    public void deregisterApplication(ApplicationStatus applicationStatus, @Nullable String str) {
        final String diagnostics;
        if (str == null) {
            diagnostics = "";
        } else {
            diagnostics = str;
        }
        this.log.info("Deregistering Flink Kubernetes cluster, clusterId: {}, diagnostics: {}", this.clusterId, diagnostics);
        this.flinkKubeClient.stopAndCleanupCluster(this.clusterId);
    }

    /**
     * Builds a TaskManager pod for the given process spec and asks Kubernetes to create it.
     *
     * <p>The returned future is registered under the pod name. It is completed elsewhere once
     * the pod is scheduled or terminated, and completed exceptionally here if the creation
     * request itself fails.
     *
     * @param taskExecutorProcessSpec resource profile of the requested TaskManager
     * @return future of the worker node backing the new pod
     */
    @Override
    public CompletableFuture<KubernetesWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
        final KubernetesTaskManagerParameters parameters = createKubernetesTaskManagerParameters(taskExecutorProcessSpec);
        final KubernetesPod taskManagerPod = KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(
                this.taskManagerPodTemplate,
                parameters,
                KubernetesUtils.getAvailableNodes(this.flinkKubeClient),
                getCurrentBlacklistedNodes(() -> KubernetesUtils.getBlacklistedNodes(this.flinkKubeClient, parameters)));
        final String podName = taskManagerPod.getName();

        final CompletableFuture<KubernetesWorkerNode> workerFuture = new CompletableFuture<>();
        this.requestResourceFutures.put(podName, workerFuture);
        this.log.info("Creating new TaskManager pod with name {} and resource <{},{}>.", podName, Integer.valueOf(parameters.getTaskManagerMemoryMB()), Double.valueOf(parameters.getTaskManagerCPU()));

        // The creation result is handled on the main thread because it touches requestResourceFutures.
        FutureUtils.assertNoException(this.flinkKubeClient.createTaskManagerPod(taskManagerPod).handleAsync((ignored, failure) -> {
            if (failure != null) {
                this.log.warn("Could not create pod {}, exception: {}", podName, failure);
                final CompletableFuture<KubernetesWorkerNode> pending = this.requestResourceFutures.remove(taskManagerPod.getName());
                if (pending != null) {
                    pending.completeExceptionally(failure);
                }
            } else {
                this.log.info("Pod {} is created.", podName);
            }
            return null;
        }, (Executor) getMainThreadExecutor()));
        return workerFuture;
    }

    /**
     * Stops the pod backing the given worker; the worker's resource id is used as the pod name.
     *
     * @param kubernetesWorkerNode worker to release
     */
    @Override
    public void releaseResource(KubernetesWorkerNode kubernetesWorkerNode) {
        final String podName = kubernetesWorkerNode.getResourceID().toString();
        this.log.info("Stopping TaskManager pod {}.", podName);
        stopPod(podName);
    }

    /**
     * Checks whether the cluster currently has room for enough TaskManagers to provide the
     * requested number of slots. No resources are reserved by this check.
     *
     * @param i number of slots that are needed
     * @return true if no TaskManager is needed or the placeable TaskManagers provide at least
     *     {@code i} slots
     */
    @Override
    public boolean checkResourceAvailabilityForSlots(int i) {
        final int slotsPerTaskManager = this.flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
        // Divide in floating point: an int division would truncate before Math.ceil, so a
        // partially used TaskManager would not be counted (e.g. 3 slots / 2 per TM must be 2).
        final int taskManagersNeeded = (int) Math.ceil((double) i / slotsPerTaskManager);
        if (taskManagersNeeded <= 0) {
            return true;
        }
        this.log.info("TaskManagers to create:" + taskManagersNeeded);
        return calcServicesPool(0, taskManagersNeeded, false) * ((long) slotsPerTaskManager) >= ((long) i);
    }

    /**
     * Computes how many additional slots the cluster could host: the number of TaskManagers
     * that can still be placed (without reserving anything) times the slots per TaskManager.
     *
     * @return maximum number of extra slots
     */
    @Override
    public long getMaximumExtraSlots() {
        final long placeableTaskManagers = calcServicesPool(0, Integer.MAX_VALUE, false);
        final int slotsPerTaskManager = this.flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
        return placeableTaskManagers * slotsPerTaskManager;
    }

    /**
     * Updates the JobManager component affinity using the currently available nodes and the
     * current blacklist (read from the affinity ConfigMap in ConfigMap mode).
     */
    @Override
    public void changeMasterServiceAffinity() {
        this.flinkKubeClient.changeJobManagerComponentAffinity(
                KubernetesUtils.getAvailableNodes(this.flinkKubeClient),
                getCurrentBlacklistedNodes(() -> KubernetesUtils.getBlacklistedNodes(this.flinkKubeClient, this.affinityConfigMapName)));
    }

    /**
     * Tries to reserve resources for the given number of JobManagers.
     *
     * @param i number of JobManagers to reserve
     * @return number of JobManagers that could be placed
     */
    @Override
    public int reserveJobManagers(int i) {
        this.log.info("Reserving {} JobManagers", Integer.valueOf(i));
        final long placed = calcServicesPool(i, 0, true);
        return (int) placed;
    }

    /**
     * Tries to reserve resources for the given number of TaskManagers.
     *
     * @param i number of TaskManagers to reserve
     * @return number of TaskManagers that could be placed
     */
    @Override
    public int updateResourceReservation(int i) {
        this.log.info("Reserving {} TaskManagers", Integer.valueOf(i));
        final long placed = calcServicesPool(0, i, true);
        return (int) placed;
    }

    /** Asks the reservation provider to release a reservation sized for one TaskManager. */
    @Override
    public void releaseOneWorkerReservation() {
        final KubernetesResourceSpec taskManagerSpec = new KubernetesResourceSpec(this.tmCpuRequired, this.tmMemoryRequired);
        this.resourceReservationProvider.tryReleaseResources(taskManagerSpec);
    }

    /** Asks the reservation provider to release a reservation sized for one JobManager. */
    @Override
    public void releaseOneMasterReservation() {
        final KubernetesResourceSpec jobManagerSpec = new KubernetesResourceSpec(this.jmCpuRequired, this.jmMemoryRequired);
        this.resourceReservationProvider.tryReleaseResources(jobManagerSpec);
    }

    /** Asks the reservation provider to release all resource reservations. */
    @Override // org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver
    public void clearResourceReservation() {
        this.resourceReservationProvider.tryReleaseAllResources();
    }

    /**
     * Reports application info to the resource update task. Does nothing unless the driver
     * runs in NodeOperator blacklist mode (the task only exists in that mode).
     *
     * @param i forwarded as the third argument of {@code reportApplicationInfo}
     * @param i2 forwarded as the second argument of {@code reportApplicationInfo}
     */
    @Override
    public void notifyResourceChange(int i, int i2) {
        if (this.blacklistMode != KubernetesConfigOptions.BlacklistMode.NodeOperator) {
            return;
        }
        this.resourceUpdateTask.reportApplicationInfo(this.flinkConfig, i2, i);
    }

    /**
     * Applies for a scale-out through the resource update task and stores the result as the
     * target CU in the Flink ConfigMap.
     *
     * @param i value passed to {@code applyScaleOut}
     * @return the result of {@code applyScaleOut}, or null when not in NodeOperator mode
     */
    @Override
    public Integer requestForScaleOut(int i) {
        if (this.blacklistMode != KubernetesConfigOptions.BlacklistMode.NodeOperator) {
            return null;
        }
        final Integer scaleOutResult = this.resourceUpdateTask.applyScaleOut(i);
        updateTargetCUInFlinkConfigMap(scaleOutResult);
        return scaleOutResult;
    }

    /**
     * Walks over the free resources of all non-blacklisted nodes and counts how many of the
     * requested JobManagers and TaskManagers can be placed. On each node TaskManagers are
     * placed first, then JobManagers on what is left.
     *
     * @param i number of JobManagers to place
     * @param i2 number of TaskManagers to place
     * @param z whether resources should be reserved while placing
     * @return total number of pods (JobManagers plus TaskManagers) that could be placed
     */
    private long calcServicesPool(int i, int i2, boolean z) {
        this.log.info("CPU needed to create 1 JM:" + this.jmCpuRequired);
        this.log.info("Memory needed to create 1 JM: {} (bytes)", Long.valueOf(this.jmMemoryRequired.longValue()));
        this.log.info("CPU needed to create 1 TM:" + this.tmCpuRequired);
        this.log.info("Memory needed to create 1 TM: {} (bytes)", Long.valueOf(this.tmMemoryRequired.longValue()));
        this.log.info("Count of JM to be created:" + i);
        this.log.info("Count of TM to be created:" + i2);

        final Map<String, KubernetesResourceSpec> freeResourcesByNode = this.flinkKubeClient.getClusterResourcesInfo(
                getCurrentBlacklistedNodes(() -> KubernetesUtils.getBlacklistedNodes(this.flinkKubeClient, this.affinityConfigMapName)));

        int jobManagersLeft = i;
        int taskManagersLeft = i2;
        int placedTotal = 0;
        for (Map.Entry<String, KubernetesResourceSpec> nodeEntry : freeResourcesByNode.entrySet()) {
            final String nodeName = nodeEntry.getKey();
            final KubernetesResourceSpec freeResources = nodeEntry.getValue();
            this.log.info("Trying to check resources and place pods on node {}", nodeName);
            this.log.info("Node free resources: {}", freeResources);
            // Snapshot taken before any placement; handed on as the node's original free resources.
            final KubernetesResourceSpec originalFreeResources = freeResources.copy();

            final int taskManagersStillLeft = placeServicesOnNode(nodeName, "TaskManager", taskManagersLeft, new KubernetesResourceSpec(this.tmCpuRequired, this.tmMemoryRequired), freeResources, originalFreeResources, z);
            final int jobManagersStillLeft = placeServicesOnNode(nodeName, "JobManager", jobManagersLeft, new KubernetesResourceSpec(this.jmCpuRequired, this.jmMemoryRequired), freeResources, originalFreeResources, z);

            placedTotal += (taskManagersLeft - taskManagersStillLeft) + (jobManagersLeft - jobManagersStillLeft);
            taskManagersLeft = taskManagersStillLeft;
            jobManagersLeft = jobManagersStillLeft;
            this.log.info("{} TaskManagers might be created additionally for this node", Long.valueOf(calculatePodsCapacityForResources(freeResources, this.tmCpuRequired, this.tmMemoryRequired)));
        }
        return placedTotal;
    }

    /**
     * Places as many pods of one service type as possible on a single node.
     *
     * @param str node name
     * @param str2 service label used in log messages ("TaskManager" / "JobManager" at the visible call sites)
     * @param i number of pods that should be placed
     * @param kubernetesResourceSpec resources required by one pod
     * @param kubernetesResourceSpec2 current free resources of the node; reduced via
     *     {@code subtract} for every placed pod (presumably mutated in place, since the
     *     return value of {@code subtract} is ignored — confirm)
     * @param kubernetesResourceSpec3 original free resources of the node, passed on to the
     *     reservation provider
     * @param z whether resources should be reserved through the reservation provider
     * @return number of pods that could NOT be placed on this node
     */
    private int placeServicesOnNode(String str, String str2, int i, KubernetesResourceSpec kubernetesResourceSpec, KubernetesResourceSpec kubernetesResourceSpec2, KubernetesResourceSpec kubernetesResourceSpec3, boolean z) {
        this.log.info("Trying to place {} pods of {} on node {} with current free resources = {} and original free resources = {}. The required resources for every pod: {}", Integer.valueOf(i), str2, str, kubernetesResourceSpec2, kubernetesResourceSpec3, kubernetesResourceSpec);
        // Number of pods placed on this node; only used for logging.
        int i2 = 0;
        while (kubernetesResourceSpec2.suitableFor(kubernetesResourceSpec) && i > 0) {
            if (z) {
                if (this.resourceReservationProvider.tryOccupyResources(str, kubernetesResourceSpec, kubernetesResourceSpec3)) {
                    this.log.info("Resources for 1 {} ({}) is reserved on the node {}", str2, kubernetesResourceSpec, str);
                } else {
                    // NOTE(review): when the reservation fails the requirement is subtracted here AND
                    // again below, and the pod is still counted as placed. This may be a decompilation
                    // artifact of a `continue`, or an actual bug — confirm against the original source.
                    kubernetesResourceSpec2.subtract(kubernetesResourceSpec);
                }
            }
            kubernetesResourceSpec2.subtract(kubernetesResourceSpec);
            i--;
            i2++;
        }
        this.log.info("{} pods for {} should be placed on this node", Integer.valueOf(i2), str2);
        return i;
    }

    /**
     * Computes how many pods with the given requirements fit into the given free resources.
     * The result is the smaller of the memory-bound and the CPU-bound pod counts.
     *
     * @param kubernetesResourceSpec free resources
     * @param d CPU required by one pod
     * @param bigDecimal memory required by one pod
     * @return number of pods that fit; 0 if either memory or CPU is insufficient for one pod
     */
    private long calculatePodsCapacityForResources(KubernetesResourceSpec kubernetesResourceSpec, double d, BigDecimal bigDecimal) {
        final BigDecimal freeMemory = kubernetesResourceSpec.getMemory();
        final double freeCpu = kubernetesResourceSpec.getCpu();

        final long podsByMemory = freeMemory.compareTo(bigDecimal) >= 0
                ? freeMemory.divide(bigDecimal, RoundingMode.DOWN).longValue()
                : 0L;
        final long podsByCpu = freeCpu >= d
                ? (long) Math.floor(freeCpu / d)
                : 0L;
        return Math.min(podsByMemory, podsByCpu);
    }

    /**
     * Recovers TaskManager pods left over from previous attempts. Pods that are terminated or
     * not yet scheduled are stopped; all others are reported to the resource event handler as
     * recovered workers. The attempt id is set to one above the highest attempt seen.
     *
     * @throws ResourceManagerException declared by the original signature
     */
    private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerException {
        final List<KubernetesPod> existingPods = this.flinkKubeClient.getPodsWithLabels(KubernetesUtils.getTaskManagerSelectors(this.clusterId));
        final List<KubernetesWorkerNode> recoveredWorkers = new ArrayList<>();
        for (KubernetesPod pod : existingPods) {
            final KubernetesWorkerNode worker = new KubernetesWorkerNode(new ResourceID(pod.getName()));
            this.currentMaxAttemptId = Math.max(this.currentMaxAttemptId, worker.getAttempt());
            if (!pod.isTerminated() && pod.isScheduled()) {
                recoveredWorkers.add(worker);
            } else {
                stopPod(pod.getName());
            }
        }
        this.currentMaxAttemptId++;
        this.log.info("Recovered {} pods from previous attempts, current attempt id is {}.", Integer.valueOf(recoveredWorkers.size()), Long.valueOf(this.currentMaxAttemptId));
        getResourceEventHandler().onPreviousAttemptWorkersRecovered(recoveredWorkers);
    }

    /**
     * When host networking is enabled, points the REST service at the REST port parsed from
     * the web interface URL. If high availability is not activated, the blob server and
     * JobManager RPC ports of the internal service are updated as well. Each update is awaited.
     *
     * @throws Exception if the REST port cannot be parsed or a service update fails
     */
    private void updateKubernetesServiceTargetPortIfNecessary() throws Exception {
        if (!KubernetesUtils.isHostNetwork(this.flinkConfig)) {
            return;
        }
        final int restPort = ResourceManagerUtils.parseRestBindPortFromWebInterfaceUrl(this.webInterfaceUrl).intValue();
        Preconditions.checkArgument(restPort > 0, "Failed to parse rest port from " + this.webInterfaceUrl);
        this.flinkKubeClient.updateServiceTargetPort(KubernetesService.ServiceType.REST_SERVICE, this.clusterId, Constants.REST_PORT_NAME, restPort).get();

        // With HA activated the internal service ports are left untouched.
        if (!HighAvailabilityMode.isHighAvailabilityModeActivated(this.flinkConfig)) {
            final int blobServerPort = Integer.parseInt(this.flinkConfig.getString(BlobServerOptions.PORT));
            this.flinkKubeClient.updateServiceTargetPort(KubernetesService.ServiceType.INTERNAL_SERVICE, this.clusterId, Constants.BLOB_SERVER_PORT_NAME, blobServerPort).get();
            final int rpcPort = this.flinkConfig.getInteger(JobManagerOptions.PORT);
            this.flinkKubeClient.updateServiceTargetPort(KubernetesService.ServiceType.INTERNAL_SERVICE, this.clusterId, Constants.JOB_MANAGER_RPC_PORT_NAME, rpcPort).get();
        }
    }

    /**
     * Generates a new pod name and builds the parameters for a TaskManager pod.
     *
     * <p>The pod name is built from the cluster id, the current attempt id, an incremented pod
     * id and a random UUID, and is also set as the TaskManager resource id in the dynamic
     * properties passed to the pod.
     *
     * @param taskExecutorProcessSpec resource profile of the TaskManager
     * @return parameters describing the TaskManager pod to create
     */
    private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(TaskExecutorProcessSpec taskExecutorProcessSpec) {
        final long podId = ++this.currentMaxPodId;
        final String podName = String.format(TASK_MANAGER_POD_FORMAT, this.clusterId, Long.valueOf(this.currentMaxAttemptId), Long.valueOf(podId), UUID.randomUUID());
        final ContaineredTaskManagerParameters containeredParameters = ContaineredTaskManagerParameters.create(this.flinkConfig, taskExecutorProcessSpec);
        // Work on a copy so the resource id does not leak into the shared flinkConfig.
        final Configuration taskManagerConfig = new Configuration(this.flinkConfig);
        // Plain typed set: the decompiled casts of the String value to ConfigOption were invalid.
        taskManagerConfig.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, podName);
        return new KubernetesTaskManagerParameters(
                this.flinkConfig,
                podName,
                BootstrapTools.getDynamicPropertiesAsString(this.flinkClientConfig, taskManagerConfig),
                ProcessMemoryUtils.generateJvmParametersStr(taskExecutorProcessSpec),
                containeredParameters,
                ExternalResourceUtils.getExternalResourceConfigurationKeys(this.flinkConfig, KubernetesConfigOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX));
    }

    /**
     * Returns the resource update task.
     *
     * @return the task, or null when the driver is not in NodeOperator blacklist mode (the
     *     constructor only creates it in that mode)
     */
    public ResourceUpdateTask getResourceUpdateTask() {
        return this.resourceUpdateTask;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Processes pod events on the main thread: terminated pods go to {@code onPodTerminated},
     * scheduled (and not terminated) pods go to {@code onPodScheduled}, all others are ignored.
     *
     * @param list pods reported by the watch
     */
    public void handlePodEventsInMainThread(List<KubernetesPod> list) {
        getMainThreadExecutor().execute(() -> {
            for (KubernetesPod pod : list) {
                if (pod.isTerminated()) {
                    onPodTerminated(pod);
                    continue;
                }
                if (pod.isScheduled()) {
                    onPodScheduled(pod);
                }
            }
        });
    }

    /**
     * Completes the pending resource request of a scheduled pod with a new worker node. Pods
     * without a pending request are ignored.
     *
     * @param kubernetesPod the scheduled pod
     */
    private void onPodScheduled(KubernetesPod kubernetesPod) {
        final String podName = kubernetesPod.getName();
        final CompletableFuture<KubernetesWorkerNode> pendingRequest = this.requestResourceFutures.remove(podName);
        if (pendingRequest != null) {
            this.log.info("Received new TaskManager pod: {}", podName);
            pendingRequest.complete(new KubernetesWorkerNode(new ResourceID(podName)));
            return;
        }
        this.log.debug("Ignore TaskManager pod that is already added: {}", podName);
    }

    /**
     * Handles a pod that reports itself as terminated. Any request future still registered
     * under the pod's name is removed and failed with a {@code FlinkException}. Afterwards the
     * resource event handler is told about the terminated worker (with the pod's termination
     * diagnostics) and the pod is stopped through {@code stopPod}.
     *
     * @param kubernetesPod the terminated pod
     */
    private void onPodTerminated(KubernetesPod kubernetesPod) {
        final String podName = kubernetesPod.getName();
        this.log.debug("TaskManager pod {} is terminated.", podName);

        // Fail any request future that is still registered under this pod name.
        final CompletableFuture<KubernetesWorkerNode> pendingRequest =
                this.requestResourceFutures.remove(podName);
        if (pendingRequest != null) {
            this.log.warn("Pod {} is terminated before being scheduled.", podName);
            pendingRequest.completeExceptionally(new FlinkException("Pod is terminated."));
        }

        // The handler is notified before the pod is asked to stop.
        getResourceEventHandler()
                .onWorkerTerminated(new ResourceID(podName), kubernetesPod.getTerminatedDiagnostics());
        stopPod(podName);
    }

    /**
     * Asks the Kubernetes client to stop the named pod. The call is not awaited; a failure
     * reported by the returned future is only logged as a warning.
     *
     * @param str name of the pod to stop
     */
    private void stopPod(String str) {
        this.flinkKubeClient.stopPod(str).whenComplete((ignored, failure) -> {
            if (failure == null) {
                return;
            }
            this.log.warn("Could not remove TaskManager pod {}, exception: {}", str, failure);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts watching the pods matched by this cluster's TaskManager selectors, delivering
     * events to a new {@code PodCallbackHandlerImpl}.
     *
     * @return the watch returned by the Kubernetes client, wrapped in an {@code Optional}
     * @throws Exception if the client fails to set up the watch
     */
    public Optional<KubernetesWatch> watchTaskManagerPods() throws Exception {
        final KubernetesWatch podWatch = this.flinkKubeClient.watchPodsAndDoCallback(
                KubernetesUtils.getTaskManagerSelectors(this.clusterId),
                new PodCallbackHandlerImpl());
        // Optional.of (not ofNullable): a null watch from the client surfaces as an NPE here.
        return Optional.of(podWatch);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates a shared config map watcher for this cluster's common labels and registers a new
     * {@code ConfigMapCallbackHandlerImpl} on the config map named by
     * {@code affinityConfigMapName}.
     *
     * @return the resulting watch, wrapped in an {@code Optional}
     */
    public Optional<KubernetesSharedWatcher.Watch> watchAffinityConfigMap() {
        // NOTE(review): the third argument to watch(...) is null; presumably an optional
        // executor for the callbacks - confirm against the watcher's signature.
        final KubernetesSharedWatcher.Watch configMapWatch = this.flinkKubeClient
                .createConfigMapSharedWatcher(KubernetesUtils.getCommonLabels(this.clusterId))
                .watch(this.affinityConfigMapName, new ConfigMapCallbackHandlerImpl(), null);
        return Optional.of(configMapWatch);
    }

    /**
     * Picks the list of blacklisted nodes according to the configured blacklist mode.
     *
     * @param supplier source of the node list, consulted only in {@code ConfigMap} mode
     * @return in {@code ConfigMap} mode whatever the supplier returns; in {@code NodeOperator}
     *     mode the {@code blacklistedNodes} field itself (not a copy)
     * @throws IllegalStateException if the mode is neither of the two
     */
    private List<String> getCurrentBlacklistedNodes(Supplier<List<String>> supplier) {
        final List<String> nodes;
        switch (this.blacklistMode) {
            case ConfigMap:
                nodes = supplier.get();
                break;
            case NodeOperator:
                nodes = this.blacklistedNodes;
                break;
            default:
                throw new IllegalStateException(
                        String.format(
                                "Blacklist Mode should be %s or %s",
                                KubernetesConfigOptions.BlacklistMode.ConfigMap,
                                KubernetesConfigOptions.BlacklistMode.NodeOperator));
        }
        return nodes;
    }

    /**
     * Stores the given target CU under the {@code TARGET_CU_NAME} key of this cluster's
     * flink-conf config map. A {@code null} value is rejected with a warning and nothing is
     * written. The result of the client's update call is not inspected.
     *
     * @param num the target CU to store, may be {@code null}
     */
    private void updateTargetCUInFlinkConfigMap(Integer num) {
        if (num == null) {
            this.log.warn("Received target CU is null");
            return;
        }
        this.log.info("Updating targetCU to {} in flink-conf config map", num);
        final String configMapName = FlinkConfMountDecorator.getFlinkConfConfigMapName(this.clusterId);
        this.flinkKubeClient.checkAndUpdateConfigMap(configMapName, configMap -> {
            configMap.getData().put(TARGET_CU_NAME, String.valueOf(num));
            return Optional.of(configMap);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reads the target CU stored under {@code TARGET_CU_NAME} in this cluster's flink-conf
     * config map.
     *
     * @return the stored value parsed as an integer, or the configured
     *     {@code ClusterOptions.MAX_CU} when the config map or the entry is absent
     * @throws NumberFormatException if the stored value is not a parsable integer
     */
    public Integer getTargetCUFromFlinkConfigMap() {
        final String configMapName = FlinkConfMountDecorator.getFlinkConfConfigMapName(this.clusterId);
        final Optional<Integer> storedTargetCU = this.flinkKubeClient.getConfigMap(configMapName)
                // A missing key yields null here, which Optional.map turns into an empty Optional.
                .map(configMap -> configMap.getData().get(TARGET_CU_NAME))
                .map(Integer::parseInt);
        final Integer maxCU = Integer.valueOf(this.flinkConfig.getInteger(ClusterOptions.MAX_CU));
        return storedTargetCU.orElse(maxCU);
    }

    /**
     * Computes the current CU from the pods the Kubernetes client reports for this cluster.
     *
     * @return the result of {@code ComputationUnitUtils.calcCU}, called with the number of pods
     *     carrying the TaskManager labels followed by the number carrying the JobManager labels
     */
    private Integer getCurrentCU() {
        final int taskManagerPodCount = this.flinkKubeClient
                .getPodsWithLabels(KubernetesUtils.getTaskManagerLabels(this.clusterId))
                .size();
        final int jobManagerPodCount = this.flinkKubeClient
                .getPodsWithLabels(KubernetesUtils.getJobManagerLabels(this.clusterId))
                .size();
        return Integer.valueOf(ComputationUnitUtils.calcCU(taskManagerPodCount, jobManagerPodCount));
    }

    /**
     * Processes the resource update events that are currently queued.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Return immediately if the queue is empty; otherwise snapshot the queue and remove
     *       the snapshotted events from it.</li>
     *   <li>Take the blacklisted nodes from the first {@code BlacklistEvent} and the target CU
     *       from the first {@code QuotaEvent} in the snapshot; later events of the same type
     *       are ignored.</li>
     *   <li>In {@code NodeOperator} blacklist mode, replace the contents of
     *       {@code blacklistedNodes} with the nodes from the event (if there was one).</li>
     *   <li>Resolve the blacklisted JobManager and TaskManager pods for those nodes.</li>
     *   <li>If the requested target CU is larger than the one stored in the flink-conf config
     *       map, store it before redeploying.</li>
     *   <li>Call {@code callForcedRedeploying} on the resource event handler.</li>
     *   <li>If the requested target CU is smaller than the stored one, store it only when
     *       {@code callForcedRedeploying} returned {@code true} or the requested value still
     *       exceeds the CU computed from the current pods.</li>
     * </ol>
     */
    private void adaptPodsForCurrentResources() {
        if (this.resourceUpdateEventQueue.isEmpty()) {
            return;
        }

        // Snapshot first, then remove exactly the snapshotted events from the queue.
        final List<ResourceUpdateEvent> events = new LinkedList<>(this.resourceUpdateEventQueue);
        this.resourceUpdateEventQueue.removeAll(events);
        this.log.info("Event processing with {}", events);

        List<String> blacklistNodes = events.stream()
                .filter(event -> event.getEventType().equals(ResourceUpdateEvent.EventType.BlacklistEvent))
                .findFirst()
                .map(ResourceUpdateEvent::getBlacklistNodes)
                .orElse(null);
        final Integer targetCU = events.stream()
                .filter(event -> event.getEventType().equals(ResourceUpdateEvent.EventType.QuotaEvent))
                .findFirst()
                .map(ResourceUpdateEvent::getTargetCU)
                .orElse(null);

        // Only an actual blacklist event may overwrite the locally tracked node list.
        if (this.blacklistMode.equals(KubernetesConfigOptions.BlacklistMode.NodeOperator)
                && blacklistNodes != null) {
            this.blacklistedNodes.clear();
            this.blacklistedNodes.addAll(blacklistNodes);
        }
        if (blacklistNodes == null) {
            blacklistNodes = Collections.emptyList();
        }

        final Set<ResourceID> blacklistedJobManagerPods = KubernetesUtils.getBlacklistedPods(
                this.flinkKubeClient, blacklistNodes, KubernetesUtils.getJobManagerLabels(this.clusterId));
        final Set<ResourceID> blacklistedTaskManagerPods = KubernetesUtils.getBlacklistedPods(
                this.flinkKubeClient, blacklistNodes, KubernetesUtils.getTaskManagerLabels(this.clusterId));

        final int storedTargetCU = getTargetCUFromFlinkConfigMap().intValue();
        this.log.debug("CU from config map: {}", Integer.valueOf(storedTargetCU));

        // An increase is persisted before the redeploy is triggered.
        if (targetCU != null && targetCU.intValue() > storedTargetCU) {
            updateTargetCUInFlinkConfigMap(targetCU);
        }

        this.log.info("Start redeploying with {}, {}, {}", targetCU, blacklistedTaskManagerPods, blacklistedJobManagerPods);
        final boolean forcedRedeployResult = getResourceEventHandler()
                .callForcedRedeploying(targetCU, blacklistedTaskManagerPods, blacklistedJobManagerPods);

        // Nothing left to do unless the request lowers the stored target CU.
        if (targetCU == null || targetCU.intValue() >= storedTargetCU) {
            return;
        }
        // A decrease is persisted only after the redeploy call reported true, or when the
        // requested value is still above the CU computed from the current pods. The current
        // CU is only queried when the redeploy call returned false.
        if (forcedRedeployResult || targetCU.intValue() > getCurrentCU().intValue()) {
            updateTargetCUInFlinkConfigMap(targetCU);
        }
    }
}
