package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.heartbeat.NoOpHeartbeatManager;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentReconciler;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorToJobManagerHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMaster.class */
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
    /** Base name of the JobMaster RPC endpoint; the constructor passes the same value to {@code AkkaRpcServiceUtils.createRandomName}. */
    public static final String JOB_MANAGER_NAME = "jobmanager";
    private final JobMasterConfiguration jobMasterConfiguration;
    /** Resource id of this JobMaster; reported back to TaskManagers in {@code JMTMRegistrationSuccess}. */
    private final ResourceID resourceId;
    private final JobGraph jobGraph;
    /** Taken from {@code jobMasterConfiguration.getRpcTimeout()} in the constructor. */
    private final Time rpcTimeout;
    private final HighAvailabilityServices highAvailabilityServices;
    private final BlobWriter blobWriter;
    private final HeartbeatServices heartbeatServices;
    private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;
    private final ScheduledExecutorService scheduledExecutorService;
    private final OnCompletionActions jobCompletionActions;
    private final FatalErrorHandler fatalErrorHandler;
    private final ClassLoader userCodeLoader;
    private final SlotPool slotPool;
    private final SchedulerNGFactory schedulerNGFactory;
    private final long initializationTimestamp;
    /** Read from {@code JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME}; selects the resolution mode used in {@code registerTaskManager}. */
    private final boolean retrieveTaskManagerHostName;
    private final BackPressureStatsTracker backPressureStatsTracker;
    private final LeaderRetrievalService resourceManagerLeaderRetriever;
    /** Location and gateway per TaskManager; entries are added in {@code registerTaskManager} and removed in {@code disconnectTaskManager}. */
    private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
    private final ShuffleMaster<?> shuffleMaster;
    /** Initialized to the {@code NoOpHeartbeatManager} instance by the constructor. */
    private HeartbeatManager<TaskExecutorToJobManagerHeartbeatPayload, AllocatedSlotReport> taskManagerHeartbeatManager;
    /** Initialized to the {@code NoOpHeartbeatManager} instance by the constructor. */
    private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
    private SchedulerNG schedulerNG;

    // The constructor sets this to null.
    @Nullable
    private JobManagerJobStatusListener jobStatusListener;

    @Nullable
    private JobManagerJobMetricGroup jobManagerJobMetricGroup;

    @Nullable
    private ResourceManagerAddress resourceManagerAddress;

    // The constructor sets this to null; ResourceManagerConnection compares itself against
    // this field before establishing the connection.
    @Nullable
    private ResourceManagerConnection resourceManagerConnection;

    // The constructor sets this to null; checked for null in the ResourceManager heartbeat-timeout handler.
    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;
    private Map<String, Object> accumulators;
    private final JobMasterPartitionTracker partitionTracker;
    private final ExecutionDeploymentTracker executionDeploymentTracker;
    private final ExecutionDeploymentReconciler executionDeploymentReconciler;
    // NOTE(review): looks like a compiler-generated assertion flag surfaced by the decompiler — confirm before relying on it.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMaster$JobManagerJobStatusListener.class */
    /**
     * Job status listener that forwards each reported status to
     * {@code JobMaster.jobStatusChanged} through {@code runAsync}, until {@link #stop()} is called.
     */
    public class JobManagerJobStatusListener implements JobStatusListener {
        /** {@code true} until {@link #stop()} is invoked; while set, status changes are forwarded. */
        private volatile boolean running = true;

        private JobManagerJobStatusListener() {
        }

        /**
         * Forwards the new job status to the enclosing JobMaster unless this listener was stopped.
         * Only {@code jobStatus} is used; the remaining arguments are ignored.
         */
        @Override
        public void jobStatusChanges(JobID jobID, JobStatus jobStatus, long j, Throwable th) {
            if (!running) {
                // A stopped listener drops the notification.
                return;
            }
            JobMaster.this.runAsync(() -> JobMaster.this.jobStatusChanged(jobStatus));
        }

        /** Stops forwarding of any further status changes. */
        public void stop() {
            running = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMaster$ResourceManagerConnection.class */
    /**
     * Registered RPC connection from the enclosing JobMaster to a ResourceManager. It carries the
     * identifiers that are sent with the {@code registerJobManager} call.
     */
    public class ResourceManagerConnection extends RegisteredRpcConnection<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> {
        private final JobID jobID;
        private final ResourceID jobManagerResourceID;
        private final String jobManagerRpcAddress;
        private final JobMasterId jobMasterId;

        /**
         * @param logger logger handed to the parent connection
         * @param jobID id of the job, must not be null
         * @param resourceID resource id of the job manager, must not be null
         * @param str RPC address of the job manager, must not be null
         * @param jobMasterId id of the job master, must not be null
         * @param str2 target address handed to the parent connection
         * @param resourceManagerId target leader id handed to the parent connection
         * @param executor executor handed to the parent connection
         */
        ResourceManagerConnection(Logger logger, JobID jobID, ResourceID resourceID, String str, JobMasterId jobMasterId, String str2, ResourceManagerId resourceManagerId, Executor executor) {
            super(logger, str2, resourceManagerId, executor);
            this.jobID = Preconditions.checkNotNull(jobID);
            this.jobManagerResourceID = Preconditions.checkNotNull(resourceID);
            this.jobManagerRpcAddress = Preconditions.checkNotNull(str);
            this.jobMasterId = Preconditions.checkNotNull(jobMasterId);
        }

        /** Builds the retrying registration that calls {@code registerJobManager} on the ResourceManager gateway. */
        @Override
        protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
            return new RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess>(
                    this.log,
                    JobMaster.this.getRpcService(),
                    "ResourceManager",
                    ResourceManagerGateway.class,
                    getTargetAddress(),
                    getTargetLeaderId(),
                    JobMaster.this.jobMasterConfiguration.getRetryingRegistrationConfiguration()) {

                @Override
                public CompletableFuture<RegistrationResponse> invokeRegistration(ResourceManagerGateway resourceManagerGateway, ResourceManagerId resourceManagerId, long j) {
                    // The millisecond value is wrapped into a Time for the RPC call.
                    final Time timeout = Time.milliseconds(j);
                    return resourceManagerGateway.registerJobManager(
                            ResourceManagerConnection.this.jobMasterId,
                            ResourceManagerConnection.this.jobManagerResourceID,
                            ResourceManagerConnection.this.jobManagerRpcAddress,
                            ResourceManagerConnection.this.jobID,
                            timeout);
                }
            };
        }

        /**
         * Establishes the ResourceManager connection via {@code runAsync}, but only if this object is
         * still the JobMaster's current {@code resourceManagerConnection} when the action runs.
         */
        @Override
        public void onRegistrationSuccess(JobMasterRegistrationSuccess jobMasterRegistrationSuccess) {
            JobMaster.this.runAsync(() -> {
                if (this != JobMaster.this.resourceManagerConnection) {
                    // This connection is no longer the current one; ignore the success.
                    return;
                }
                JobMaster.this.establishResourceManagerConnection(jobMasterRegistrationSuccess);
            });
        }

        /** Hands the registration failure to the JobMaster's error handling. */
        @Override
        protected void onRegistrationFailure(Throwable th) {
            JobMaster.this.handleJobMasterError(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMaster$ResourceManagerHeartbeatListener.class */
    /**
     * Heartbeat listener for the ResourceManager. A timeout triggers a reconnect when it concerns the
     * currently established ResourceManager connection; no payload is exchanged.
     */
    public class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
        private ResourceManagerHeartbeatListener() {
        }

        /**
         * Logs the timeout and reconnects to the ResourceManager if {@code resourceID} matches the
         * resource id of the established connection.
         */
        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            JobMaster.this.validateRunsInMainThread();
            JobMaster.this.log.info("The heartbeat of ResourceManager with id {} timed out.", resourceID.getStringWithMetadata());

            final EstablishedResourceManagerConnection established = JobMaster.this.establishedResourceManagerConnection;
            if (established != null && established.getResourceManagerResourceID().equals(resourceID)) {
                final String reason = String.format("The heartbeat of ResourceManager with id %s timed out.", resourceID.getStringWithMetadata());
                JobMaster.this.reconnectToResourceManager(new JobMasterException(reason));
            }
        }

        /** No-op: there is no payload to process. */
        @Override
        public void reportPayload(ResourceID resourceID, Void r3) {
        }

        /** Always returns {@code null}: there is no payload to provide. */
        @Override
        public Void retrievePayload(ResourceID resourceID) {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMaster$ResourceManagerLeaderListener.class */
    /**
     * Leader retrieval listener that passes ResourceManager leader notifications and errors on to the
     * enclosing JobMaster.
     */
    public class ResourceManagerLeaderListener implements LeaderRetrievalListener {
        private ResourceManagerLeaderListener() {
        }

        /** Converts the UUID into a {@code ResourceManagerId} and notifies the JobMaster via {@code runAsync}. */
        @Override
        public void notifyLeaderAddress(String str, UUID uuid) {
            JobMaster.this.runAsync(() -> {
                final ResourceManagerId leaderId = ResourceManagerId.fromUuidOrNull(uuid);
                JobMaster.this.notifyOfNewResourceManagerLeader(str, leaderId);
            });
        }

        /** Wraps the error and reports it through the JobMaster's error handling. */
        @Override
        public void handleError(Exception exc) {
            final Exception wrapped = new Exception("Fatal error in the ResourceManager leader service", exc);
            JobMaster.this.handleJobMasterError(wrapped);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobMaster$TaskManagerHeartbeatListener.class */
    /**
     * Heartbeat listener for TaskManagers. A timeout disconnects the TaskManager; a received payload is
     * used to reconcile execution deployments and to update accumulators.
     */
    public class TaskManagerHeartbeatListener implements HeartbeatListener<TaskExecutorToJobManagerHeartbeatPayload, AllocatedSlotReport> {
        private TaskManagerHeartbeatListener() {
        }

        /** Disconnects the TaskManager with a {@link TimeoutException} describing the timeout. */
        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            JobMaster.this.validateRunsInMainThread();
            final String message = "Heartbeat of TaskManager with id " + resourceID.getStringWithMetadata() + " timed out.";
            JobMaster.this.disconnectTaskManager(resourceID, new TimeoutException(message));
        }

        /**
         * Reconciles the reported execution deployments against the tracked ones, then passes every
         * accumulator snapshot of the payload to the scheduler.
         */
        @Override
        public void reportPayload(ResourceID resourceID, TaskExecutorToJobManagerHeartbeatPayload taskExecutorToJobManagerHeartbeatPayload) {
            JobMaster.this.validateRunsInMainThread();
            JobMaster.this.executionDeploymentReconciler.reconcileExecutionDeployments(
                    resourceID,
                    taskExecutorToJobManagerHeartbeatPayload.getExecutionDeploymentReport(),
                    JobMaster.this.executionDeploymentTracker.getExecutionsOn(resourceID));

            for (AccumulatorSnapshot snapshot : taskExecutorToJobManagerHeartbeatPayload.getAccumulatorReport().getAccumulatorSnapshots()) {
                JobMaster.this.schedulerNG.updateAccumulators(snapshot);
            }
        }

        /** Returns the slot pool's allocated slot report for the given resource. */
        @Override
        public AllocatedSlotReport retrievePayload(ResourceID resourceID) {
            JobMaster.this.validateRunsInMainThread();
            return JobMaster.this.slotPool.createAllocatedSlotReport(resourceID);
        }
    }

    /**
     * Creates the JobMaster for the given job graph: wires up the deployment reconciler, slot pool,
     * partition tracker, metric group and scheduler, and installs no-op heartbeat managers.
     *
     * @param rpcService RPC service passed to the parent endpoint
     * @param jobMasterConfiguration configuration (RPC timeout, slot request timeout, ...), must not be null
     * @param resourceID resource id of this JobMaster, must not be null
     * @param jobGraph job to execute, must not be null
     * @param highAvailabilityServices source of the ResourceManager leader retriever and checkpoint recovery factory, must not be null
     * @param slotPoolFactory factory for the slot pool, must not be null
     * @param jobManagerSharedServices provides the blob writer, scheduled executor and back pressure stats tracker
     * @param heartbeatServices heartbeat services, must not be null
     * @param jobManagerJobMetricGroupFactory factory for the job metric group, must not be null
     * @param onCompletionActions job completion actions, must not be null
     * @param fatalErrorHandler fatal error handler, must not be null
     * @param classLoader user code class loader, must not be null
     * @param schedulerNGFactory factory for the scheduler, must not be null
     * @param shuffleMaster shuffle master, must not be null
     * @param partitionTrackerFactory factory for the partition tracker, must not be null
     * @param executionDeploymentTracker tracker queried for the executions deployed on a task executor
     * @param factory factory for the execution deployment reconciler
     * @param j initialization timestamp handed to the scheduler factory
     * @throws Exception if one of the collaborators (e.g. the scheduler) cannot be created
     */
    public JobMaster(RpcService rpcService, JobMasterConfiguration jobMasterConfiguration, ResourceID resourceID, final JobGraph jobGraph, HighAvailabilityServices highAvailabilityServices, SlotPoolFactory slotPoolFactory, JobManagerSharedServices jobManagerSharedServices, HeartbeatServices heartbeatServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, OnCompletionActions onCompletionActions, FatalErrorHandler fatalErrorHandler, ClassLoader classLoader, SchedulerNGFactory schedulerNGFactory, ShuffleMaster<?> shuffleMaster, PartitionTrackerFactory partitionTrackerFactory, ExecutionDeploymentTracker executionDeploymentTracker, ExecutionDeploymentReconciler.Factory factory, long j) throws Exception {
        super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME), null);
        // Handler invoked by the reconciler when the deployments reported by a task executor
        // differ from the ones tracked by this JobMaster.
        ExecutionDeploymentReconciliationHandler executionDeploymentReconciliationHandler = new ExecutionDeploymentReconciliationHandler() {
            @Override
            public void onMissingDeploymentsOf(Collection<ExecutionAttemptID> collection, ResourceID resourceID2) {
                JobMaster.this.log.debug("Failing deployments {} due to no longer being deployed.", collection);
                for (ExecutionAttemptID executionAttemptID : collection) {
                    JobMaster.this.schedulerNG.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), executionAttemptID, ExecutionState.FAILED, new FlinkException(String.format("Execution %s is unexpectedly no longer running on task executor %s.", executionAttemptID, resourceID2))));
                }
            }

            @Override
            public void onUnknownDeploymentsOf(Collection<ExecutionAttemptID> collection, ResourceID resourceID2) {
                JobMaster.this.log.debug("Canceling left-over deployments {} on task executor {}.", collection, resourceID2);
                for (ExecutionAttemptID executionAttemptID : collection) {
                    Tuple2<TaskManagerLocation, TaskExecutorGateway> tuple2 = JobMaster.this.registeredTaskManagers.get(resourceID2);
                    if (tuple2 != null) {
                        tuple2.f1.cancelTask(executionAttemptID, JobMaster.this.rpcTimeout);
                    }
                }
            }
        };
        this.executionDeploymentTracker = executionDeploymentTracker;
        this.executionDeploymentReconciler = factory.create(executionDeploymentReconciliationHandler);
        this.jobMasterConfiguration = Preconditions.checkNotNull(jobMasterConfiguration);
        this.resourceId = Preconditions.checkNotNull(resourceID);
        this.jobGraph = Preconditions.checkNotNull(jobGraph);
        this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
        this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
        this.blobWriter = jobManagerSharedServices.getBlobWriter();
        this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
        this.jobCompletionActions = Preconditions.checkNotNull(onCompletionActions);
        this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
        this.userCodeLoader = Preconditions.checkNotNull(classLoader);
        this.schedulerNGFactory = Preconditions.checkNotNull(schedulerNGFactory);
        this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
        this.jobMetricGroupFactory = Preconditions.checkNotNull(jobManagerJobMetricGroupFactory);
        this.initializationTimestamp = j;
        this.retrieveTaskManagerHostName = jobMasterConfiguration.getConfiguration().getBoolean(JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME);
        String name = jobGraph.getName();
        JobID jobID = jobGraph.getJobID();
        this.log.info("Initializing job {} ({}).", name, jobID);
        this.resourceManagerLeaderRetriever = this.highAvailabilityServices.getResourceManagerLeaderRetriever();
        this.slotPool = Preconditions.checkNotNull(slotPoolFactory).createSlotPool(jobID);
        this.registeredTaskManagers = new HashMap<>(4);
        // The partition tracker looks up task executor gateways through the registration map.
        this.partitionTracker = Preconditions.checkNotNull(partitionTrackerFactory).create(resourceID2 -> {
            Tuple2<TaskManagerLocation, TaskExecutorGateway> tuple2 = this.registeredTaskManagers.get(resourceID2);
            return tuple2 == null ? Optional.empty() : Optional.of(tuple2.f1);
        });
        this.backPressureStatsTracker = Preconditions.checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
        this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster);
        this.jobManagerJobMetricGroup = jobManagerJobMetricGroupFactory.create(jobGraph);
        // createScheduler reads fields assigned above (slot pool, partition tracker, ...),
        // so it has to stay after those assignments.
        this.schedulerNG = createScheduler(executionDeploymentTracker, this.jobManagerJobMetricGroup);
        this.jobStatusListener = null;
        this.resourceManagerConnection = null;
        this.establishedResourceManagerConnection = null;
        this.accumulators = new HashMap<>();
        this.taskManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
        this.resourceManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
    }

    /**
     * Creates the scheduler through the configured {@code SchedulerNGFactory}, passing this
     * JobMaster's services together with the given tracker and metric group.
     *
     * @param executionDeploymentTracker deployment tracker handed to the scheduler
     * @param jobManagerJobMetricGroup metric group handed to the scheduler
     * @return the scheduler instance produced by the factory
     * @throws Exception if the factory fails to create the scheduler
     */
    private SchedulerNG createScheduler(ExecutionDeploymentTracker executionDeploymentTracker, JobManagerJobMetricGroup jobManagerJobMetricGroup) throws Exception {
        return schedulerNGFactory.createInstance(
                log,
                jobGraph,
                backPressureStatsTracker,
                scheduledExecutorService,
                jobMasterConfiguration.getConfiguration(),
                slotPool,
                scheduledExecutorService,
                userCodeLoader,
                highAvailabilityServices.getCheckpointRecoveryFactory(),
                rpcTimeout,
                blobWriter,
                jobManagerJobMetricGroup,
                jobMasterConfiguration.getSlotRequestTimeout(),
                shuffleMaster,
                partitionTracker,
                executionDeploymentTracker,
                initializationTimestamp);
    }

    /**
     * Starts this endpoint and then starts the job execution under the given JobMaster id.
     *
     * @param jobMasterId id passed to {@code startJobExecution}
     * @return future of the acknowledgement produced by {@code startJobExecution}
     * @throws Exception if starting fails
     */
    @Override
    public CompletableFuture<Acknowledge> start(JobMasterId jobMasterId) throws Exception {
        // The no-argument start() must run before the unfenced call is submitted.
        start();
        return callAsyncWithoutFencing(() -> startJobExecution(jobMasterId), RpcUtils.INF_TIMEOUT);
    }

    /**
     * Suspends the job execution with the given cause and stops this endpoint once the suspension
     * future completes, whether it succeeded or failed.
     *
     * @param exc cause passed to {@code suspendExecution}
     * @return future of the acknowledgement produced by {@code suspendExecution}
     */
    @Override
    public CompletableFuture<Acknowledge> suspend(Exception exc) {
        final CompletableFuture<Acknowledge> suspendFuture =
                callAsyncWithoutFencing(() -> suspendExecution(exc), RpcUtils.INF_TIMEOUT);
        // No cast on the callback: the decompiled cast named an undefined type variable `V`.
        return suspendFuture.whenComplete((acknowledge, th) -> stop());
    }

    /**
     * Suspends the execution with a {@code FlinkException} naming the job, then closes the slot pool.
     *
     * @return an already completed future
     */
    @Override
    public CompletableFuture<Void> onStop() {
        log.info("Stopping the JobMaster for job {}({}).", jobGraph.getName(), jobGraph.getJobID());

        final FlinkException cause = new FlinkException("Stopping JobMaster for job " + jobGraph.getName() + '(' + jobGraph.getJobID() + ").");
        suspendExecution(cause);

        slotPool.close();
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Asks the scheduler to cancel the job.
     *
     * @param time unused by this implementation
     * @return an already completed acknowledgement
     */
    @Override
    public CompletableFuture<Acknowledge> cancel(Time time) {
        schedulerNG.cancel();
        final Acknowledge ack = Acknowledge.get();
        return CompletableFuture.completedFuture(ack);
    }

    /**
     * Passes a task execution state update to the scheduler.
     *
     * @param taskExecutionState the update, must not be null
     * @return a completed acknowledgement if the scheduler returned {@code true}; otherwise a future
     *     failed with an {@code ExecutionGraphException}
     */
    @Override
    public CompletableFuture<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState) {
        Preconditions.checkNotNull(taskExecutionState, "taskExecutionState");

        if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
            return CompletableFuture.completedFuture(Acknowledge.get());
        }

        final String message = "The execution attempt " + taskExecutionState.getID() + " was not found.";
        return FutureUtils.completedExceptionally(new ExecutionGraphException(message));
    }

    /**
     * Requests the next input split for the given vertex and execution attempt from the scheduler.
     *
     * @return a completed future with the serialized split, or a future failed with the
     *     {@link IOException} thrown by the scheduler
     */
    @Override
    public CompletableFuture<SerializedInputSplit> requestNextInputSplit(JobVertexID jobVertexID, ExecutionAttemptID executionAttemptID) {
        final SerializedInputSplit nextSplit;
        try {
            nextSplit = schedulerNG.requestNextInputSplit(jobVertexID, executionAttemptID);
        } catch (IOException e) {
            log.warn("Error while requesting next input split", e);
            return FutureUtils.completedExceptionally(e);
        }
        return CompletableFuture.completedFuture(nextSplit);
    }

    /**
     * Requests the state of the given result partition from the scheduler.
     *
     * @return a completed future with the execution state, or a future failed with the
     *     {@code PartitionProducerDisposedException} thrown by the scheduler
     */
    @Override
    public CompletableFuture<ExecutionState> requestPartitionState(IntermediateDataSetID intermediateDataSetID, ResultPartitionID resultPartitionID) {
        final ExecutionState partitionState;
        try {
            partitionState = schedulerNG.requestPartitionState(intermediateDataSetID, resultPartitionID);
        } catch (PartitionProducerDisposedException e) {
            log.info("Error while requesting partition state", e);
            return FutureUtils.completedExceptionally(e);
        }
        return CompletableFuture.completedFuture(partitionState);
    }

    /**
     * Delegates to the scheduler's {@code scheduleOrUpdateConsumers} for the given result partition.
     *
     * @param time unused by this implementation
     * @return an already completed acknowledgement
     */
    @Override
    public CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(ResultPartitionID resultPartitionID, Time time) {
        schedulerNG.scheduleOrUpdateConsumers(resultPartitionID);
        final Acknowledge ack = Acknowledge.get();
        return CompletableFuture.completedFuture(ack);
    }

    /**
     * Disconnects the given TaskManager: stops heartbeat monitoring, releases it from the slot pool,
     * stops tracking its partitions, removes its registration and, if it was registered, tells it
     * that the JobManager disconnected.
     *
     * @param resourceID id of the TaskManager
     * @param exc cause of the disconnect
     * @return an already completed acknowledgement
     */
    @Override
    public CompletableFuture<Acknowledge> disconnectTaskManager(ResourceID resourceID, Exception exc) {
        log.debug("Disconnect TaskExecutor {} because: {}", resourceID.getStringWithMetadata(), exc.getMessage());

        taskManagerHeartbeatManager.unmonitorTarget(resourceID);
        slotPool.releaseTaskManager(resourceID, exc);
        partitionTracker.stopTrackingPartitionsFor(resourceID);

        final Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = registeredTaskManagers.remove(resourceID);
        if (taskManagerConnection != null) {
            final TaskExecutorGateway gateway = taskManagerConnection.f1;
            gateway.disconnectJobManager(jobGraph.getJobID(), exc);
        }

        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /** Passes a checkpoint acknowledgement to the scheduler unchanged. */
    @Override
    public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long j, CheckpointMetrics checkpointMetrics, TaskStateSnapshot taskStateSnapshot) {
        schedulerNG.acknowledgeCheckpoint(
                jobID,
                executionAttemptID,
                j,
                checkpointMetrics,
                taskStateSnapshot);
    }

    /** Passes a checkpoint decline message to the scheduler unchanged. */
    @Override
    public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) {
        schedulerNG.declineCheckpoint(declineCheckpoint);
    }

    /**
     * Deserializes the operator event with the user code class loader and delivers it to the
     * coordinator through the scheduler.
     *
     * @return a completed acknowledgement, or a future failed with whatever exception
     *     deserialization or delivery threw
     */
    @Override
    public CompletableFuture<Acknowledge> sendOperatorEventToCoordinator(ExecutionAttemptID executionAttemptID, OperatorID operatorID, SerializedValue<OperatorEvent> serializedValue) {
        try {
            final OperatorEvent event = serializedValue.deserializeValue(userCodeLoader);
            schedulerNG.deliverOperatorEventToCoordinator(executionAttemptID, operatorID, event);
            return CompletableFuture.completedFuture(Acknowledge.get());
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    /**
     * Looks up the key-value state location for the given job and name through the scheduler.
     *
     * @return a completed future with the location, or a future failed with the
     *     {@code FlinkJobNotFoundException} / {@code UnknownKvStateLocation} thrown by the scheduler
     */
    @Override
    public CompletableFuture<KvStateLocation> requestKvStateLocation(JobID jobID, String str) {
        final KvStateLocation location;
        try {
            location = schedulerNG.requestKvStateLocation(jobID, str);
        } catch (FlinkJobNotFoundException | UnknownKvStateLocation e) {
            log.info("Error while request key-value state location", e);
            return FutureUtils.completedExceptionally(e);
        }
        return CompletableFuture.completedFuture(location);
    }

    /**
     * Passes a key-value state registration to the scheduler.
     *
     * @return a completed acknowledgement, or a future failed with the
     *     {@code FlinkJobNotFoundException} thrown by the scheduler
     */
    @Override
    public CompletableFuture<Acknowledge> notifyKvStateRegistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str, KvStateID kvStateID, InetSocketAddress inetSocketAddress) {
        try {
            schedulerNG.notifyKvStateRegistered(
                    jobID,
                    jobVertexID,
                    keyGroupRange,
                    str,
                    kvStateID,
                    inetSocketAddress);
        } catch (FlinkJobNotFoundException e) {
            log.info("Error while receiving notification about key-value state registration", e);
            return FutureUtils.completedExceptionally(e);
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Passes a key-value state de-registration to the scheduler.
     *
     * @return a completed acknowledgement, or a future failed with the
     *     {@code FlinkJobNotFoundException} thrown by the scheduler
     */
    @Override
    public CompletableFuture<Acknowledge> notifyKvStateUnregistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str) {
        try {
            schedulerNG.notifyKvStateUnregistered(jobID, jobVertexID, keyGroupRange, str);
        } catch (FlinkJobNotFoundException e) {
            log.info("Error while receiving notification about key-value state de-registration", e);
            return FutureUtils.completedExceptionally(e);
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Offers the given slots of a registered TaskManager to the slot pool.
     *
     * @param resourceID id of the offering TaskManager
     * @param collection the slot offers
     * @param time unused by this implementation
     * @return a completed future holding the collection returned by the slot pool, or a future
     *     failed with an exception if the TaskManager is not registered
     */
    @Override
    public CompletableFuture<Collection<SlotOffer>> offerSlots(ResourceID resourceID, Collection<SlotOffer> collection, Time time) {
        final Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(resourceID);
        if (taskManager == null) {
            return FutureUtils.completedExceptionally(new Exception("Unknown TaskManager " + resourceID));
        }

        final TaskManagerLocation location = taskManager.f0;
        // Wrap the task executor gateway together with the current fencing token.
        final RpcTaskManagerGateway rpcGateway = new RpcTaskManagerGateway(taskManager.f1, getFencingToken());
        final Collection<SlotOffer> offerResult = slotPool.offerSlots(location, rpcGateway, collection);
        return CompletableFuture.completedFuture(offerResult);
    }

    /**
     * Fails the allocation of a slot reported by a TaskManager. Requests from TaskManagers that are
     * not registered are only logged and otherwise ignored.
     *
     * @param resourceID id of the reporting TaskManager
     * @param allocationID allocation to fail
     * @param exc cause of the failure
     */
    @Override
    public void failSlot(ResourceID resourceID, AllocationID allocationID, Exception exc) {
        if (!registeredTaskManagers.containsKey(resourceID)) {
            // Parameterized logging: the message is only assembled when WARN is enabled.
            log.warn("Cannot fail slot {} because the TaskManager {} is unknown.", allocationID, resourceID);
            return;
        }
        internalFailAllocation(allocationID, exc);
    }

    /**
     * Fails the allocation in the slot pool. If the pool returns a TaskManager id and the partition
     * tracker is not tracking partitions for it, that TaskManager is released.
     */
    private void internalFailAllocation(AllocationID allocationID, Exception exc) {
        slotPool.failAllocation(allocationID, exc).ifPresent(taskManagerId -> {
            if (!partitionTracker.isTrackingPartitionsFor(taskManagerId)) {
                releaseEmptyTaskManager(taskManagerId);
            }
        });
    }

    /** Disconnects the given TaskManager with a {@code FlinkException} stating that no slots remain registered. */
    private void releaseEmptyTaskManager(ResourceID resourceID) {
        final String reason = String.format("No more slots registered at JobMaster %s.", resourceID.getStringWithMetadata());
        disconnectTaskManager(resourceID, new FlinkException(reason));
    }

    /**
     * Registers a TaskManager at this JobMaster.
     *
     * <p>The unresolved location is resolved first (with or without host name lookup, depending
     * on {@code retrieveTaskManagerHostName}). An already registered TaskManager is acknowledged
     * immediately. Otherwise an RPC connection to the TaskManager is opened; once it is
     * available, the TaskManager is registered at the slot pool, stored in
     * {@code registeredTaskManagers} and monitored by the TaskManager heartbeat manager. That
     * follow-up work is executed with the main thread executor.
     *
     * @param str RPC address used to connect to the TaskManager
     * @param unresolvedTaskManagerLocation location reported by the TaskManager
     * @param time not used by this implementation
     * @return future with a {@link JMTMRegistrationSuccess} on success or a
     *     {@link RegistrationResponse.Decline} if connecting failed or any step threw
     */
    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<RegistrationResponse> registerTaskManager(String str, UnresolvedTaskManagerLocation unresolvedTaskManagerLocation, Time time) {
        try {
            final TaskManagerLocation.ResolutionMode resolutionMode = this.retrieveTaskManagerHostName
                    ? TaskManagerLocation.ResolutionMode.RETRIEVE_HOST_NAME
                    : TaskManagerLocation.ResolutionMode.USE_IP_ONLY;
            final TaskManagerLocation taskManagerLocation = TaskManagerLocation.fromUnresolvedLocation(unresolvedTaskManagerLocation, resolutionMode);
            final ResourceID resourceID = taskManagerLocation.getResourceID();
            if (this.registeredTaskManagers.containsKey(resourceID)) {
                // Repeated registration attempt: acknowledge without registering again.
                return CompletableFuture.completedFuture(new JMTMRegistrationSuccess(this.resourceId));
            }
            return getRpcService().connect(str, TaskExecutorGateway.class).handleAsync((taskExecutorGateway, th) -> {
                if (th != null) {
                    return new RegistrationResponse.Decline(th.getMessage());
                }
                this.slotPool.registerTaskManager(resourceID);
                this.registeredTaskManagers.put(resourceID, Tuple2.of(taskManagerLocation, taskExecutorGateway));
                this.taskManagerHeartbeatManager.monitorTarget(resourceID, new HeartbeatTarget<AllocatedSlotReport>() {
                    @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
                    public void receiveHeartbeat(ResourceID heartbeatOrigin, AllocatedSlotReport allocatedSlotReport) {
                        // Intentionally empty: nothing is sent to the TaskManager on this path.
                    }

                    @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
                    public void requestHeartbeat(ResourceID heartbeatOrigin, AllocatedSlotReport allocatedSlotReport) {
                        taskExecutorGateway.heartbeatFromJobManager(heartbeatOrigin, allocatedSlotReport);
                    }
                });
                return new JMTMRegistrationSuccess(this.resourceId);
            }, (Executor) getMainThreadExecutor());
        } catch (Throwable th2) {
            final String format = String.format("Could not accept TaskManager registration. TaskManager address %s cannot be resolved. %s", unresolvedTaskManagerLocation.getExternalAddress(), th2.getMessage());
            // Pass the throwable along so the stack trace is not lost from the log.
            this.log.error(format, th2);
            return CompletableFuture.completedFuture(new RegistrationResponse.Decline(format));
        }
    }

    /**
     * Reconnects to the ResourceManager if the given id belongs to the ResourceManager this
     * JobMaster currently knows; requests carrying any other id are ignored.
     *
     * @param resourceManagerId id of the ResourceManager requesting the disconnect
     * @param exc cause passed on to the reconnect
     */
    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public void disconnectResourceManager(ResourceManagerId resourceManagerId, Exception exc) {
        if (!isConnectingToResourceManager(resourceManagerId)) {
            return;
        }
        reconnectToResourceManager(exc);
    }

    /**
     * Tells whether a ResourceManager address is currently known and its id equals the given
     * one.
     *
     * @param resourceManagerId id to compare against the known ResourceManager address
     * @return {@code true} only if an address is set and its id equals {@code resourceManagerId}
     */
    private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) {
        if (this.resourceManagerAddress == null) {
            return false;
        }
        return this.resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
    }

    /**
     * Hands a heartbeat received from a TaskManager, together with its payload, to the
     * TaskManager heartbeat manager.
     *
     * @param resourceID id of the TaskManager that sent the heartbeat
     * @param taskExecutorToJobManagerHeartbeatPayload payload attached to the heartbeat
     */
    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public void heartbeatFromTaskManager(
            ResourceID resourceID,
            TaskExecutorToJobManagerHeartbeatPayload taskExecutorToJobManagerHeartbeatPayload) {
        taskManagerHeartbeatManager.receiveHeartbeat(resourceID, taskExecutorToJobManagerHeartbeatPayload);
    }

    /**
     * Passes a heartbeat request coming from the ResourceManager to the ResourceManager
     * heartbeat manager; no payload ({@code null}) is attached.
     *
     * @param resourceID id of the ResourceManager requesting the heartbeat
     */
    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public void heartbeatFromResourceManager(ResourceID resourceID) {
        resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

    /**
     * Returns the job details currently reported by the scheduler.
     *
     * @param time not used by this implementation
     * @return already completed future holding the scheduler's job details
     */
    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<JobDetails> requestJobDetails(Time time) {
        final JobDetails details = this.schedulerNG.requestJobDetails();
        return CompletableFuture.completedFuture(details);
    }

    /**
     * Returns the job status currently reported by the scheduler.
     *
     * @param time not used by this implementation
     * @return already completed future holding the scheduler's job status
     */
    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<JobStatus> requestJobStatus(Time time) {
        final JobStatus status = this.schedulerNG.requestJobStatus();
        return CompletableFuture.completedFuture(status);
    }

    /**
     * Returns the archived execution graph currently provided by the scheduler.
     *
     * @param time not used by this implementation
     * @return already completed future holding the archived execution graph
     */
    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<ArchivedExecutionGraph> requestJob(Time time) {
        final ArchivedExecutionGraph archivedGraph = this.schedulerNG.requestJob();
        return CompletableFuture.completedFuture(archivedGraph);
    }

    /**
     * Delegates a savepoint trigger request to the scheduler.
     *
     * @param str nullable string argument, passed through to the scheduler unchanged
     * @param z boolean flag, passed through to the scheduler unchanged
     * @param time not used by this implementation
     * @return the future returned by the scheduler
     */
    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<String> triggerSavepoint(@Nullable String str, boolean z, Time time) {
        final CompletableFuture<String> savepointFuture = schedulerNG.triggerSavepoint(str, z);
        return savepointFuture;
    }

    /**
     * Delegates a stop-with-savepoint request to the scheduler.
     *
     * @param str nullable string argument, passed through to the scheduler unchanged
     * @param z boolean flag, passed through to the scheduler unchanged
     * @param time not used by this implementation
     * @return the future returned by the scheduler
     */
    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<String> stopWithSavepoint(@Nullable String str, boolean z, Time time) {
        final CompletableFuture<String> savepointFuture = schedulerNG.stopWithSavepoint(str, z);
        return savepointFuture;
    }

    /**
     * Requests the back pressure statistics of a job vertex from the scheduler and wraps them
     * in an {@link OperatorBackPressureStatsResponse}; absent statistics are wrapped as
     * {@code null}.
     *
     * @param jobVertexID vertex whose statistics are requested
     * @return completed future with the response, or an exceptionally completed future if the
     *     scheduler threw a {@link FlinkException}
     */
    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(JobVertexID jobVertexID) {
        try {
            final OperatorBackPressureStatsResponse response =
                    OperatorBackPressureStatsResponse.of(
                            this.schedulerNG.requestOperatorBackPressureStats(jobVertexID).orElse(null));
            return CompletableFuture.completedFuture(response);
        } catch (FlinkException failure) {
            this.log.info("Error while requesting operator back pressure stats", failure);
            return FutureUtils.completedExceptionally(failure);
        }
    }

    /**
     * Reacts to an allocation failure notification by failing the allocation internally.
     * Unlike {@code failSlot}, no check against the registered TaskManagers is made here.
     *
     * @param allocationID allocation that failed
     * @param exc cause of the failure
     */
    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public void notifyAllocationFailure(AllocationID allocationID, Exception exc) {
        this.internalFailAllocation(allocationID, exc);
    }

    /**
     * Updates the named global aggregate with a new value and returns the aggregate's result.
     *
     * <p>The serialized {@link AggregateFunction} is deserialized with the user code class
     * loader. The accumulator stored under {@code str} (or a freshly created one if none is
     * stored yet) is combined with {@code obj}, written back to {@code accumulators}, and the
     * function's result for the updated accumulator is returned.
     *
     * @param str name under which the accumulator is stored
     * @param obj value to add to the aggregate
     * @param bArr serialized aggregate function
     * @return completed future with the aggregate result, or an exceptionally completed future
     *     if deserialization or any aggregate function call threw
     */
    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<Object> updateGlobalAggregate(String str, Object obj, byte[] bArr) {
        try {
            final AggregateFunction<Object, Object, Object> aggregateFunction =
                    InstantiationUtil.deserializeObject(bArr, this.userCodeLoader);
            Object accumulator = this.accumulators.get(str);
            if (accumulator == null) {
                accumulator = aggregateFunction.createAccumulator();
            }
            final Object updatedAccumulator = aggregateFunction.add(obj, accumulator);
            this.accumulators.put(str, updatedAccumulator);
            return CompletableFuture.completedFuture(aggregateFunction.getResult(updatedAccumulator));
        } catch (Exception e) {
            // Log the exception itself; the message alone hides what actually went wrong.
            this.log.error("Error while attempting to deserialize user AggregateFunction.", e);
            return FutureUtils.completedExceptionally(e);
        }
    }

    /**
     * Deserializes a coordination request with the user code class loader and hands it to the
     * scheduler for delivery to the coordinator of the given operator.
     *
     * @param operatorID operator whose coordinator should receive the request
     * @param serializedValue serialized coordination request
     * @param time not used by this implementation
     * @return the scheduler's response future, or an exceptionally completed future if
     *     deserialization or the delivery call threw
     */
    @Override // org.apache.flink.runtime.jobmaster.JobMasterGateway
    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operatorID, SerializedValue<CoordinationRequest> serializedValue, Time time) {
        try {
            final CoordinationRequest request = serializedValue.deserializeValue(this.userCodeLoader);
            return this.schedulerNG.deliverCoordinationRequestToCoordinator(operatorID, request);
        } catch (Exception failure) {
            return FutureUtils.completedExceptionally(failure);
        }
    }

    /**
     * Starts the job execution under the given JobMasterId. Must run in the main thread.
     *
     * <p>If the current fencing token already equals {@code jobMasterId}, nothing is started
     * again. Otherwise the fencing token is replaced, the JobMaster services are started and
     * the scheduler is reset and started.
     *
     * @param jobMasterId new JobMasterId; must not be null
     * @return an {@link Acknowledge} in both cases
     * @throws Exception if starting the services or the scheduler fails
     */
    private Acknowledge startJobExecution(JobMasterId jobMasterId) throws Exception {
        validateRunsInMainThread();
        Preconditions.checkNotNull(jobMasterId, "The new JobMasterId must not be null.");
        final boolean alreadyStarted = Objects.equals(getFencingToken(), jobMasterId);
        if (alreadyStarted) {
            this.log.info("Already started the job execution with JobMasterId {}.", jobMasterId);
        } else {
            setNewFencingToken(jobMasterId);
            startJobMasterServices();
            this.log.info("Starting execution of job {} ({}) under job master id {}.", this.jobGraph.getName(), this.jobGraph.getJobID(), jobMasterId);
            resetAndStartScheduler();
        }
        return Acknowledge.get();
    }

    /**
     * Starts the services this JobMaster depends on, in this order: heartbeat services, slot
     * pool, (re)connection to the ResourceManager, and the ResourceManager leader retriever.
     *
     * @throws Exception if one of the services fails to start
     */
    private void startJobMasterServices() throws Exception {
        startHeartbeatServices();

        this.slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());

        final FlinkException reconnectCause = new FlinkException("Starting JobMaster component.");
        reconnectToResourceManager(reconnectCause);

        final ResourceManagerLeaderListener leaderListener = new ResourceManagerLeaderListener();
        this.resourceManagerLeaderRetriever.start(leaderListener);
    }

    /**
     * Installs a new fencing token. If a token is already set, the running execution is
     * suspended first, with an exception naming both the old and the new JobMasterId.
     *
     * @param jobMasterId fencing token to install
     */
    private void setNewFencingToken(JobMasterId jobMasterId) {
        final JobMasterId previousToken = getFencingToken();
        if (previousToken != null) {
            this.log.info("Restarting old job with JobMasterId {}. The new JobMasterId is {}.", previousToken, jobMasterId);
            final String reason = "Old job with JobMasterId " + previousToken + " is restarted with a new JobMasterId " + jobMasterId + '.';
            suspendExecution(new FlinkException(reason));
        }
        setFencingToken(jobMasterId);
    }

    /**
     * Suspends the job execution. Must run in the main thread.
     *
     * <p>If no fencing token is set, nothing is done. Otherwise the fencing token is cleared,
     * the ResourceManager leader retriever is stopped (failures are only logged), the scheduler
     * is suspended, every registered TaskManager is disconnected, the slot pool is suspended,
     * the ResourceManager connection is closed and the heartbeat services are stopped.
     *
     * @param exc cause of the suspension, passed to the components that are shut down
     * @return an {@link Acknowledge} in both cases
     */
    private Acknowledge suspendExecution(Exception exc) {
        validateRunsInMainThread();

        if (getFencingToken() == null) {
            this.log.debug("Job has already been suspended or shutdown.");
            return Acknowledge.get();
        }

        setFencingToken(null);

        try {
            this.resourceManagerLeaderRetriever.stop();
            this.resourceManagerAddress = null;
        } catch (Throwable stopFailure) {
            this.log.warn("Failed to stop resource manager leader retriever when suspending.", stopFailure);
        }

        suspendAndClearSchedulerFields(exc);

        // Iterate over a snapshot of the key set rather than the live map view.
        final Set<ResourceID> registeredIds = new HashSet<>(this.registeredTaskManagers.keySet());
        for (ResourceID taskManagerId : registeredIds) {
            disconnectTaskManager(taskManagerId, exc);
        }

        this.slotPool.suspend();
        closeResourceManagerConnection(exc);
        stopHeartbeatServices();

        return Acknowledge.get();
    }

    /**
     * Stops the TaskManager heartbeat manager first and the ResourceManager heartbeat manager
     * second.
     */
    private void stopHeartbeatServices() {
        taskManagerHeartbeatManager.stop();
        resourceManagerHeartbeatManager.stop();
    }

    /**
     * Creates fresh heartbeat managers: a heartbeat manager sender for TaskManagers and a plain
     * heartbeat manager for the ResourceManager. Both use this JobMaster's resource id, the
     * main thread executor and this JobMaster's logger.
     */
    private void startHeartbeatServices() {
        this.taskManagerHeartbeatManager =
                this.heartbeatServices.createHeartbeatManagerSender(
                        this.resourceId,
                        new TaskManagerHeartbeatListener(),
                        getMainThreadExecutor(),
                        this.log);

        this.resourceManagerHeartbeatManager =
                this.heartbeatServices.createHeartbeatManager(
                        this.resourceId,
                        new ResourceManagerHeartbeatListener(),
                        getMainThreadExecutor(),
                        this.log);
    }

    /**
     * Replaces the current scheduler and job metric group. Must run in the main thread.
     *
     * <p>Requires that the current scheduler reports a terminal job status and that no job
     * metric group is currently set; otherwise the state checks fail.
     *
     * @param schedulerNG scheduler to install
     * @param jobManagerJobMetricGroup metric group to install
     */
    private void assignScheduler(SchedulerNG schedulerNG, JobManagerJobMetricGroup jobManagerJobMetricGroup) {
        validateRunsInMainThread();

        final JobStatus previousStatus = this.schedulerNG.requestJobStatus();
        Preconditions.checkState(previousStatus.isTerminalState());
        Preconditions.checkState(this.jobManagerJobMetricGroup == null);

        // 'this.' is required here: both parameters shadow the fields of the same name.
        this.schedulerNG = schedulerNG;
        this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
    }

    /**
     * Prepares the scheduler and then starts scheduling. Must run in the main thread.
     *
     * <p>A scheduler whose job status is {@code CREATED} is used as is and only receives the
     * main thread executor. In any other status the current scheduler is suspended, and a new
     * scheduler with a new metric group is created; it is assigned once the old scheduler's
     * termination future completes, whether normally or exceptionally. Scheduling starts after
     * this preparation, and exceptions in that chain are passed to
     * {@code FutureUtils.assertNoException}.
     *
     * @throws Exception if creating the metric group or the new scheduler fails
     */
    private void resetAndStartScheduler() throws Exception {
        validateRunsInMainThread();

        final CompletableFuture<Void> schedulerReadyFuture;
        if (this.schedulerNG.requestJobStatus() != JobStatus.CREATED) {
            suspendAndClearSchedulerFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
            final JobManagerJobMetricGroup newMetricGroup = this.jobMetricGroupFactory.create(this.jobGraph);
            final SchedulerNG newScheduler = createScheduler(this.executionDeploymentTracker, newMetricGroup);
            schedulerReadyFuture = this.schedulerNG.getTerminationFuture().handle((ignoredResult, ignoredFailure) -> {
                newScheduler.setMainThreadExecutor(getMainThreadExecutor());
                assignScheduler(newScheduler, newMetricGroup);
                return null;
            });
        } else {
            this.schedulerNG.setMainThreadExecutor(getMainThreadExecutor());
            schedulerReadyFuture = CompletableFuture.completedFuture(null);
        }

        FutureUtils.assertNoException(schedulerReadyFuture.thenRun(this::startScheduling));
    }

    /**
     * Creates a new job status listener, registers it at the scheduler and starts scheduling.
     * Requires that no job status listener is currently set.
     */
    private void startScheduling() {
        Preconditions.checkState(this.jobStatusListener == null);

        // Keep the listener in the field so suspendScheduler can stop it later.
        this.jobStatusListener = new JobManagerJobStatusListener();
        this.schedulerNG.registerJobStatusListener(this.jobStatusListener);

        this.schedulerNG.startScheduling();
    }

    /**
     * Suspends the scheduler and afterwards resets the scheduler-related fields.
     *
     * @param exc cause of the suspension
     */
    private void suspendAndClearSchedulerFields(Exception exc) {
        this.suspendScheduler(exc);
        this.clearSchedulerFields();
    }

    /**
     * Suspends the scheduler, then closes the job metric group and stops the job status
     * listener if they are set.
     *
     * @param exc cause passed to the scheduler's suspend call
     */
    private void suspendScheduler(Exception exc) {
        schedulerNG.suspend(exc);

        final boolean hasMetricGroup = jobManagerJobMetricGroup != null;
        if (hasMetricGroup) {
            jobManagerJobMetricGroup.close();
        }

        final boolean hasStatusListener = jobStatusListener != null;
        if (hasStatusListener) {
            jobStatusListener.stop();
        }
    }

    /**
     * Resets the job status listener and the job metric group fields to {@code null}.
     */
    private void clearSchedulerFields() {
        jobStatusListener = null;
        jobManagerJobMetricGroup = null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles an error of this JobMaster. JVM-fatal errors are logged and given to the fatal
     * error handler; every other error is reported to the job completion actions as a
     * JobMaster failure.
     *
     * @param th error to handle
     */
    public void handleJobMasterError(Throwable th) {
        if (ExceptionUtils.isJvmFatalError(th)) {
            this.log.error("Fatal error occurred on JobManager.", th);
            this.fatalErrorHandler.onFatalError(th);
            return;
        }
        this.jobCompletionActions.jobMasterFailed(th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reacts to a job status change. Must run in the main thread. Only globally terminal
     * states trigger any action.
     *
     * <p>For a globally terminal state, an asynchronous action is enqueued that stops partition
     * tracking for every registered TaskManager: partitions are released or promoted when the
     * job {@code FINISHED}, and released otherwise. In addition, the archived execution graph
     * is requested from the scheduler and handed to the job completion actions on the
     * scheduled executor service.
     *
     * @param jobStatus new status of the job
     */
    public void jobStatusChanged(JobStatus jobStatus) {
        validateRunsInMainThread();

        if (!jobStatus.isGloballyTerminalState()) {
            return;
        }

        runAsync(() -> {
            final Set<ResourceID> taskManagerIds = this.registeredTaskManagers.keySet();
            final Consumer<ResourceID> stopTracking =
                    jobStatus == JobStatus.FINISHED
                            ? this.partitionTracker::stopTrackingAndReleaseOrPromotePartitionsFor
                            : this.partitionTracker::stopTrackingAndReleasePartitionsFor;
            taskManagerIds.forEach(stopTracking);
        });

        final ArchivedExecutionGraph archivedGraph = this.schedulerNG.requestJob();
        this.scheduledExecutorService.execute(
                () -> this.jobCompletionActions.jobReachedGloballyTerminalState(archivedGraph));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Stores the address of the new ResourceManager leader (possibly {@code null}) and
     * reconnects to the ResourceManager.
     *
     * @param str address of the new leader
     * @param resourceManagerId id of the new leader
     */
    public void notifyOfNewResourceManagerLeader(String str, ResourceManagerId resourceManagerId) {
        this.resourceManagerAddress = createResourceManagerAddress(str, resourceManagerId);

        final String reason = String.format("ResourceManager leader changed to new address %s", this.resourceManagerAddress);
        reconnectToResourceManager(new FlinkException(reason));
    }

    /**
     * Builds a {@link ResourceManagerAddress} from an address string and a ResourceManager id.
     *
     * @param str address of the ResourceManager; may be {@code null}
     * @param resourceManagerId id of the ResourceManager; must be non-null whenever
     *     {@code str} is non-null
     * @return the combined address, or {@code null} if {@code str} is {@code null}
     */
    @Nullable
    private ResourceManagerAddress createResourceManagerAddress(@Nullable String str, @Nullable ResourceManagerId resourceManagerId) {
        if (str != null) {
            Preconditions.checkNotNull(resourceManagerId);
            return new ResourceManagerAddress(str, resourceManagerId);
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes the current ResourceManager connection and then tries to connect again.
     *
     * @param exc cause given to the connection that is closed
     */
    public void reconnectToResourceManager(Exception exc) {
        this.closeResourceManagerConnection(exc);
        this.tryConnectToResourceManager();
    }

    /**
     * Connects to the ResourceManager if an address is known; does nothing otherwise.
     */
    private void tryConnectToResourceManager() {
        if (this.resourceManagerAddress == null) {
            return;
        }
        connectToResourceManager();
    }

    /**
     * Creates and starts a new {@code ResourceManagerConnection} to the currently known
     * ResourceManager address.
     *
     * <p>When assertions are enabled, this requires a known address and neither a pending nor
     * an established ResourceManager connection.
     */
    private void connectToResourceManager() {
        if (!$assertionsDisabled) {
            // Checked in the same order as the individual assertions they replace.
            if (this.resourceManagerAddress == null) {
                throw new AssertionError();
            }
            if (this.resourceManagerConnection != null) {
                throw new AssertionError();
            }
            if (this.establishedResourceManagerConnection != null) {
                throw new AssertionError();
            }
        }

        this.log.info("Connecting to ResourceManager {}", this.resourceManagerAddress);

        this.resourceManagerConnection =
                new ResourceManagerConnection(
                        this.log,
                        this.jobGraph.getJobID(),
                        this.resourceId,
                        getAddress(),
                        getFencingToken(),
                        this.resourceManagerAddress.getAddress(),
                        this.resourceManagerAddress.getResourceManagerId(),
                        this.scheduledExecutorService);
        this.resourceManagerConnection.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Completes the connection to the ResourceManager after a successful registration.
     *
     * <p>The registration is ignored if there is no pending connection or if the pending
     * connection targets a different leader id. Otherwise the established connection is
     * recorded, the slot pool is connected to the ResourceManager gateway, and the
     * ResourceManager is monitored by the ResourceManager heartbeat manager.
     *
     * @param jobMasterRegistrationSuccess registration response of the ResourceManager
     */
    public void establishResourceManagerConnection(JobMasterRegistrationSuccess jobMasterRegistrationSuccess) {
        final ResourceManagerId resourceManagerId = jobMasterRegistrationSuccess.getResourceManagerId();

        final boolean matchesPendingConnection =
                this.resourceManagerConnection != null
                        && Objects.equals(this.resourceManagerConnection.getTargetLeaderId(), resourceManagerId);
        if (!matchesPendingConnection) {
            this.log.debug("Ignoring resource manager connection to {} because it's duplicated or outdated.", resourceManagerId);
            return;
        }

        this.log.info("JobManager successfully registered at ResourceManager, leader id: {}.", resourceManagerId);

        final ResourceManagerGateway gateway = this.resourceManagerConnection.getTargetGateway();
        final ResourceID resourceManagerResourceId = jobMasterRegistrationSuccess.getResourceManagerResourceId();

        this.establishedResourceManagerConnection = new EstablishedResourceManagerConnection(gateway, resourceManagerResourceId);
        this.slotPool.connectToResourceManager(gateway);

        this.resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() {
            @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
            public void receiveHeartbeat(ResourceID heartbeatOrigin, Void unusedPayload) {
                gateway.heartbeatFromJobManager(heartbeatOrigin);
            }

            @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
            public void requestHeartbeat(ResourceID heartbeatOrigin, Void unusedPayload) {
                // Intentionally empty: no heartbeat request is sent to the ResourceManager.
            }
        });
    }

    /**
     * Tears down the connection state towards the ResourceManager: an established connection
     * is dissolved and forgotten, then a pending connection is closed and forgotten.
     *
     * @param exc cause given to the established connection when it is dissolved
     */
    private void closeResourceManagerConnection(Exception exc) {
        final EstablishedResourceManagerConnection established = this.establishedResourceManagerConnection;
        if (established != null) {
            dissolveResourceManagerConnection(established, exc);
            this.establishedResourceManagerConnection = null;
        }

        if (this.resourceManagerConnection != null) {
            this.resourceManagerConnection.close();
            this.resourceManagerConnection = null;
        }
    }

    /**
     * Dissolves an established ResourceManager connection: logs the closing (with the full
     * exception at debug level, otherwise only its message), stops monitoring the
     * ResourceManager's heartbeat, tells the ResourceManager gateway to disconnect this job's
     * JobManager, and disconnects the slot pool from the ResourceManager.
     *
     * @param establishedResourceManagerConnection connection to dissolve
     * @param exc cause of the disconnect
     */
    private void dissolveResourceManagerConnection(EstablishedResourceManagerConnection establishedResourceManagerConnection, Exception exc) {
        final ResourceID resourceManagerResourceID = establishedResourceManagerConnection.getResourceManagerResourceID();
        final String resourceManagerDescription = resourceManagerResourceID.getStringWithMetadata();

        if (!this.log.isDebugEnabled()) {
            this.log.info("Close ResourceManager connection {}: {}.", resourceManagerDescription, exc.getMessage());
        } else {
            this.log.debug("Close ResourceManager connection {}.", resourceManagerDescription, exc);
        }

        this.resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceID);

        final ResourceManagerGateway gateway = establishedResourceManagerConnection.getResourceManagerGateway();
        gateway.disconnectJobManager(this.jobGraph.getJobID(), exc);

        this.slotPool.disconnectResourceManager();
    }

    /**
     * Returns this endpoint's self gateway, typed as {@link JobMasterGateway}.
     *
     * @return the self gateway obtained via {@code getSelfGateway}
     */
    @Override // org.apache.flink.runtime.jobmaster.JobMasterService
    public JobMasterGateway getGateway() {
        final JobMasterGateway selfGateway = (JobMasterGateway) getSelfGateway(JobMasterGateway.class);
        return selfGateway;
    }

    static {
        $assertionsDisabled = !JobMaster.class.desiredAssertionStatus();
    }
}
