package org.apache.flink.runtime.taskexecutor;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationRejection;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService.class */
public class DefaultJobLeaderService implements JobLeaderService {
    /** Logger used by this service and handed to the connections and registrations it creates. */
    private static final Logger LOG = LoggerFactory.getLogger(DefaultJobLeaderService.class);

    /** Location of the owning task manager; passed along with every registration request. */
    private final UnresolvedTaskManagerLocation ownLocation;

    /** Retry settings handed to every registration attempt created by this service. */
    private final RetryingRegistrationConfiguration retryingRegistrationConfiguration;

    /** Random id generated once per service instance and passed along with every registration request. */
    private final UUID taskManagerSession = UUID.randomUUID();

    /** Per job: the leader retrieval service together with the listener that is started on it. */
    private final Map<JobID, Tuple2<LeaderRetrievalService, JobManagerLeaderListener>> jobLeaderServices =
            new ConcurrentHashMap<>(4);

    /** Lifecycle state of the service; checked by the public methods before they act. */
    private volatile State state = State.CREATED;

    // The remaining fields stay null until start(...) assigns them.

    /** Address passed to start(...); used when building the registration information. */
    private String ownerAddress = null;

    /** RPC service passed to start(...); used for registrations and as the source of the connection executor. */
    private RpcService rpcService = null;

    /** Source of the per-job leader retrieval services; passed to start(...). */
    private HighAvailabilityServices highAvailabilityServices = null;

    /** Receiver of leadership, rejection and error notifications; passed to start(...). */
    private JobLeaderListener jobLeaderListener = null;

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService$JobManagerLeaderListener.class */
    public final class JobManagerLeaderListener implements LeaderRetrievalListener {
        /** Monitor guarding {@code rpcConnection} and {@code currentJobMasterId}. */
        private final Object lock;

        /** Job whose leader this listener follows. */
        private final JobID jobId;

        /** Connection to the currently known job leader; {@code null} while no connection is open. */
        @GuardedBy("lock")
        @Nullable
        private RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess, JMTMRegistrationRejection> rpcConnection;

        /** Leader id the open connection targets; set and cleared together with {@code rpcConnection}. */
        @GuardedBy("lock")
        @Nullable
        private JobMasterId currentJobMasterId;

        /** Set to {@code true} by {@code stop()}; volatile because {@code handleError} reads it without the lock. */
        private volatile boolean stopped;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService$JobManagerLeaderListener$JobManagerRegisteredRpcConnection.class */
        /**
         * Connection to one job master, identified by its address and leader id.
         *
         * <p>Registration outcomes are forwarded to the enclosing service's {@code jobLeaderListener}
         * only while this connection's target leader id still equals the listener's current job
         * master id. Outcomes that arrive after the leader changed are merely logged at debug level.
         */
        public final class JobManagerRegisteredRpcConnection extends RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess, JMTMRegistrationRejection> {

            JobManagerRegisteredRpcConnection(Logger connectionLog, String targetAddress, JobMasterId jobMasterId, Executor executor) {
                super(connectionLog, targetAddress, jobMasterId, executor);
            }

            /** Builds the registration that announces the owning task manager to this connection's target. */
            @Override
            protected RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess, JMTMRegistrationRejection> generateRegistration() {
                final TaskManagerRegistrationInformation registrationInformation =
                        TaskManagerRegistrationInformation.create(
                                DefaultJobLeaderService.this.ownerAddress,
                                DefaultJobLeaderService.this.ownLocation,
                                DefaultJobLeaderService.this.taskManagerSession);

                return new JobManagerRetryingRegistration(
                        DefaultJobLeaderService.LOG,
                        DefaultJobLeaderService.this.rpcService,
                        "JobManager",
                        JobMasterGateway.class,
                        getTargetAddress(),
                        getTargetLeaderId(),
                        DefaultJobLeaderService.this.retryingRegistrationConfiguration,
                        JobManagerLeaderListener.this.jobId,
                        registrationInformation);
            }

            /** Reports a successful registration as gained leadership, unless the attempt is obsolete. */
            @Override
            public void onRegistrationSuccess(JMTMRegistrationSuccess success) {
                runIfValidRegistrationAttemptOrElse(
                        () -> {
                            log.info("Successful registration at job manager {} for job {}.", getTargetAddress(), JobManagerLeaderListener.this.jobId);
                            DefaultJobLeaderService.this.jobLeaderListener.jobManagerGainedLeadership(
                                    JobManagerLeaderListener.this.jobId, getTargetGateway(), success);
                        },
                        () -> log.debug(
                                "Encountered obsolete JobManager registration success from {} with leader session ID {}.",
                                getTargetAddress(),
                                getTargetLeaderId()));
            }

            /** Reports a rejected registration to the job leader listener, unless the attempt is obsolete. */
            @Override
            public void onRegistrationRejection(JMTMRegistrationRejection rejection) {
                runIfValidRegistrationAttemptOrElse(
                        () -> {
                            log.info("Rejected registration at job manager {} for job {}.", getTargetAddress(), JobManagerLeaderListener.this.jobId);
                            DefaultJobLeaderService.this.jobLeaderListener.jobManagerRejectedRegistration(
                                    JobManagerLeaderListener.this.jobId, getTargetAddress(), rejection);
                        },
                        () -> log.debug(
                                "Encountered obsolete JobManager registration rejection {} from {} with leader session ID {}.",
                                rejection,
                                getTargetAddress(),
                                getTargetLeaderId()));
            }

            /** Hands a registration failure to the job leader listener, unless the attempt is obsolete. */
            @Override
            protected void onRegistrationFailure(Throwable failure) {
                runIfValidRegistrationAttemptOrElse(
                        () -> {
                            log.info("Failed to register at job  manager {} for job {}.", getTargetAddress(), JobManagerLeaderListener.this.jobId);
                            DefaultJobLeaderService.this.jobLeaderListener.handleError(failure);
                        },
                        () -> log.debug(
                                "Obsolete JobManager registration failure from {} with leader session ID {}.",
                                getTargetAddress(),
                                getTargetLeaderId(),
                                failure));
            }

            /**
             * Runs {@code ifValid} when this connection still targets the listener's current leader,
             * and {@code ifObsolete} otherwise.
             */
            private void runIfValidRegistrationAttemptOrElse(Runnable ifValid, Runnable ifObsolete) {
                final boolean targetsCurrentLeader =
                        Objects.equals(getTargetLeaderId(), JobManagerLeaderListener.this.getCurrentJobMasterId());
                final Runnable action = targetsCurrentLeader ? ifValid : ifObsolete;
                action.run();
            }
        }

        /**
         * Creates a running listener for the given job that has no connection and no known leader yet.
         *
         * @param jobID job whose leader is followed; must not be {@code null}
         */
        private JobManagerLeaderListener(JobID jobID) {
            this.jobId = Preconditions.checkNotNull(jobID);
            this.lock = new Object();
            this.currentJobMasterId = null;
            this.rpcConnection = null;
            this.stopped = false;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Reads the current job master id while holding the lock.
         *
         * @return the leader id of the open connection, or {@code null} if none is set
         */
        public JobMasterId getCurrentJobMasterId() {
            synchronized (this.lock) {
                return this.currentJobMasterId;
            }
        }

        /** Marks the listener as stopped and closes the open connection; later calls do nothing. */
        public void stop() {
            synchronized (this.lock) {
                if (this.stopped) {
                    return;
                }
                this.stopped = true;
                closeRpcConnection();
            }
        }

        /**
         * Asks the open connection to reconnect.
         *
         * <p>Only logs at debug level when the listener is stopped or no connection exists.
         *
         * @throws IllegalStateException if the connection reports that it could not reconnect
         */
        public void reconnect() {
            synchronized (this.lock) {
                if (this.stopped) {
                    LOG.debug("Cannot reconnect because the JobManagerLeaderListener has already been stopped.");
                    return;
                }
                if (this.rpcConnection == null) {
                    LOG.debug("Cannot reconnect to an unknown JobMaster.");
                    return;
                }
                final boolean reconnectStarted = this.rpcConnection.tryReconnect();
                Preconditions.checkState(reconnectStarted, "Illegal concurrent modification of the JobManagerLeaderListener rpc connection.");
            }
        }

        /**
         * Reacts to new leader information for this listener's job.
         *
         * <p>While holding the lock:
         * <ul>
         *   <li>a stopped listener only logs the notification;
         *   <li>a {@code null} or empty address closes the open connection and remembers the leader
         *       id that was current, if any;
         *   <li>a leader id equal to the current one is ignored as a duplicate;
         *   <li>any other leader closes the open connection and opens one to the new leader.
         * </ul>
         *
         * <p>If a leader id was remembered, {@code jobManagerLostLeadership} is invoked on the
         * service's job leader listener after the lock has been released, so that callback code never
         * runs while the lock is held.
         *
         * @param str address of the new leader, or {@code null}/empty if there is no leader
         * @param uuid leader session id of the new leader, may be {@code null}
         */
        @Override
        public void notifyLeaderAddress(@Nullable String str, @Nullable UUID uuid) {
            // Typed holder for the leader that was dropped; a raw Optional would turn the callback
            // argument into Object and no longer match jobManagerLostLeadership's parameter type.
            JobMasterId lostLeaderId = null;

            synchronized (this.lock) {
                if (this.stopped) {
                    LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. However, the service is no longer running.", DefaultJobLeaderService.class.getSimpleName(), this.jobId);
                } else {
                    final JobMasterId newLeaderId = JobMasterId.fromUuidOrNull(uuid);
                    // NOTE(review): a placeholder constant is logged instead of the leader id;
                    // presumably an intentional redaction - confirm before changing.
                    LOG.debug("New leader information for job {}. Address: {}, leader id: {}.", this.jobId, str, WebMonitorEndpoint.HIDDEN_CONTENT);

                    if (str == null || str.isEmpty()) {
                        // Capture the id before closeRpcConnection() clears it.
                        lostLeaderId = this.currentJobMasterId;
                        closeRpcConnection();
                    } else if (Objects.equals(newLeaderId, this.currentJobMasterId)) {
                        LOG.debug("Ongoing attempt to connect to leader of job {}. Ignoring duplicate leader information.", this.jobId);
                    } else {
                        closeRpcConnection();
                        openRpcConnectionTo(str, newLeaderId);
                    }
                }
            }

            if (lostLeaderId != null) {
                DefaultJobLeaderService.this.jobLeaderListener.jobManagerLostLeadership(this.jobId, lostLeaderId);
            }
        }

        /**
         * Records the new leader id, then creates and starts a connection to it.
         *
         * @param leaderAddress address of the job master to connect to
         * @param jobMasterId leader id of that job master
         * @throws IllegalStateException if a previous connection or leader id is still set
         */
        @GuardedBy("lock")
        private void openRpcConnectionTo(String leaderAddress, JobMasterId jobMasterId) {
            final boolean previousConnectionClosed = this.currentJobMasterId == null && this.rpcConnection == null;
            Preconditions.checkState(previousConnectionClosed, "Cannot open a new rpc connection if the previous connection has not been closed.");

            this.currentJobMasterId = jobMasterId;
            final JobManagerRegisteredRpcConnection connection =
                    new JobManagerRegisteredRpcConnection(
                            LOG,
                            leaderAddress,
                            jobMasterId,
                            DefaultJobLeaderService.this.rpcService.getScheduledExecutor());
            this.rpcConnection = connection;

            LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, jobMasterId.toUUID());
            connection.start();
        }

        /** Closes the open connection, if any, and clears both the connection and the leader id. */
        @GuardedBy("lock")
        private void closeRpcConnection() {
            if (this.rpcConnection == null) {
                return;
            }
            this.rpcConnection.close();
            this.rpcConnection = null;
            this.currentJobMasterId = null;
        }

        /**
         * Forwards a leader retrieval error to the service's job leader listener, or only logs it at
         * debug level when this listener has already been stopped.
         *
         * @param exc error reported by the leader retrieval service
         */
        @Override
        public void handleError(Exception exc) {
            if (!this.stopped) {
                DefaultJobLeaderService.this.jobLeaderListener.handleError(exc);
                return;
            }
            LOG.debug(
                    "{}'s leader retrieval listener reported an exception for job {}. However, the service is no longer running.",
                    DefaultJobLeaderService.class.getSimpleName(),
                    this.jobId,
                    exc);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService$JobManagerRetryingRegistration.class */
    private static final class JobManagerRetryingRegistration extends RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess, JMTMRegistrationRejection> {
        private final JobID jobId;
        private final TaskManagerRegistrationInformation taskManagerRegistrationInformation;

        /* JADX WARN: Multi-variable type inference failed */
        JobManagerRetryingRegistration(Logger logger, RpcService rpcService, String str, Class<JobMasterGateway> cls, String str2, JobMasterId jobMasterId, RetryingRegistrationConfiguration retryingRegistrationConfiguration, JobID jobID, TaskManagerRegistrationInformation taskManagerRegistrationInformation) {
            super(logger, rpcService, str, cls, str2, jobMasterId, retryingRegistrationConfiguration);
            this.jobId = (JobID) Preconditions.checkNotNull(jobID);
            this.taskManagerRegistrationInformation = taskManagerRegistrationInformation;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.flink.runtime.registration.RetryingRegistration
        public CompletableFuture<RegistrationResponse> invokeRegistration(JobMasterGateway jobMasterGateway, JobMasterId jobMasterId, long j) {
            return jobMasterGateway.registerTaskManager(this.jobId, this.taskManagerRegistrationInformation, Time.milliseconds(j));
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/DefaultJobLeaderService$State.class */
    /** Lifecycle states of the service, stored in the {@code state} field. */
    private enum State {
        /** Initial state; the only state in which {@code start(...)} is accepted. */
        CREATED,
        /** Set at the end of {@code start(...)}; required by addJob, removeJob and containsJob. */
        STARTED,
        /** Set by {@code stop()}. */
        STOPPED
    }

    /**
     * Creates the service in the {@code CREATED} state; {@code start(...)} must be called before use.
     *
     * @param ownLocation location of the owning task manager; must not be {@code null}
     * @param retryingRegistrationConfiguration retry settings for registrations; must not be {@code null}
     */
    public DefaultJobLeaderService(
            UnresolvedTaskManagerLocation ownLocation,
            RetryingRegistrationConfiguration retryingRegistrationConfiguration) {
        this.ownLocation = Preconditions.checkNotNull(ownLocation);
        this.retryingRegistrationConfiguration = Preconditions.checkNotNull(retryingRegistrationConfiguration);
    }

    /**
     * Stores the collaborators and moves the service from {@code CREATED} to {@code STARTED}.
     *
     * @param str address of the owner, later sent with every registration
     * @param rpcService RPC service used for registrations
     * @param highAvailabilityServices source of the per-job leader retrieval services
     * @param jobLeaderListener receiver of leadership, rejection and error notifications
     * @throws IllegalStateException if the service is not in the {@code CREATED} state
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public void start(String str, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, JobLeaderListener jobLeaderListener) {
        Preconditions.checkState(State.CREATED == this.state, "The service has already been started.");

        LOG.info("Start job leader service.");

        this.ownerAddress = Preconditions.checkNotNull(str);
        this.rpcService = Preconditions.checkNotNull(rpcService);
        this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
        this.jobLeaderListener = Preconditions.checkNotNull(jobLeaderListener);

        // Written last: the state only becomes STARTED once every collaborator is in place.
        this.state = State.STARTED;
    }

    /**
     * Stops every registered listener and leader retrieval service and moves the service to
     * {@code STOPPED}.
     *
     * <p>A failure while stopping one job's components does not prevent the remaining jobs from
     * being stopped. All entries are attempted, the map is cleared and the state is updated before
     * the first failure is rethrown, with any later failures attached as suppressed exceptions.
     *
     * @throws Exception the first failure encountered while stopping the per-job components
     */
    @Override
    public void stop() throws Exception {
        LOG.info("Stop job leader service.");

        Exception firstFailure = null;

        if (State.STARTED == this.state) {
            for (Tuple2<LeaderRetrievalService, JobManagerLeaderListener> entry : this.jobLeaderServices.values()) {
                try {
                    // Listener first, so that it ignores any notification still in flight.
                    entry.f1.stop();
                    entry.f0.stop();
                } catch (Exception e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else if (e != firstFailure) {
                        // Guard above: addSuppressed rejects self-suppression.
                        firstFailure.addSuppressed(e);
                    }
                }
            }
            this.jobLeaderServices.clear();
        }

        this.state = State.STOPPED;

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Stops following the leader of the given job; does nothing if the job is not registered.
     *
     * <p>A failure to stop the job's leader retrieval service is logged and not propagated.
     *
     * @param jobID job to remove
     * @throws IllegalStateException if the service is not in the {@code STARTED} state
     */
    @Override
    public void removeJob(JobID jobID) {
        Preconditions.checkState(State.STARTED == this.state, "The service is currently not running.");

        final Tuple2<LeaderRetrievalService, JobManagerLeaderListener> removed = this.jobLeaderServices.remove(jobID);
        if (removed == null) {
            return;
        }

        LOG.info("Remove job {} from job leader monitoring.", jobID);
        final LeaderRetrievalService retrievalService = removed.f0;
        final JobManagerLeaderListener listener = removed.f1;

        listener.stop();
        try {
            retrievalService.stop();
        } catch (Exception e) {
            LOG.info("Could not properly stop the LeaderRetrievalService for job {}.", jobID, e);
        }
    }

    /**
     * Starts following the leader of the given job.
     *
     * <p>A new leader retrieval service and listener are registered for the job. If the job was
     * already registered, the previous listener and retrieval service are stopped before the new
     * retrieval service is started.
     *
     * @param jobID job to follow
     * @param str address passed to the high availability services when obtaining the job's leader
     *     retriever
     * @throws IllegalStateException if the service is not in the {@code STARTED} state
     * @throws Exception if stopping a previous retrieval service or starting the new one fails
     */
    @Override
    public void addJob(JobID jobID, String str) throws Exception {
        Preconditions.checkState(State.STARTED == this.state, "The service is currently not running.");
        LOG.info("Add job {} for job leader monitoring.", jobID);

        final LeaderRetrievalService leaderRetrievalService = this.highAvailabilityServices.getJobManagerLeaderRetriever(jobID, str);
        final JobManagerLeaderListener leaderListener = new JobManagerLeaderListener(jobID);

        final Tuple2<LeaderRetrievalService, JobManagerLeaderListener> previous =
                this.jobLeaderServices.put(jobID, Tuple2.of(leaderRetrievalService, leaderListener));

        if (previous != null) {
            // Stop the listener before the retrieval service, as stop() and removeJob() do: if the
            // retrieval service fails to stop, the old listener and its rpc connection are already
            // shut down instead of being leaked.
            previous.f1.stop();
            previous.f0.stop();
        }

        leaderRetrievalService.start(leaderListener);
    }

    /**
     * Triggers a reconnect of the given job's listener; only logs if the job is not registered.
     *
     * @param jobID job whose connection should be re-established; must not be {@code null}
     */
    @Override
    public void reconnect(JobID jobID) {
        Preconditions.checkNotNull(jobID, "JobID must not be null.");

        final Tuple2<LeaderRetrievalService, JobManagerLeaderListener> entry = this.jobLeaderServices.get(jobID);
        if (entry == null) {
            LOG.info("Cannot reconnect to job {} because it is not registered.", jobID);
            return;
        }
        entry.f1.reconnect();
    }

    /**
     * Tells whether the given job is currently registered with this service.
     *
     * @param jobID job to look up
     * @return {@code true} if an entry exists for the job
     * @throws IllegalStateException if the service is not in the {@code STARTED} state
     */
    @Override
    @VisibleForTesting
    public boolean containsJob(JobID jobID) {
        final boolean running = State.STARTED == this.state;
        Preconditions.checkState(running, "The service is currently not running.");
        return this.jobLeaderServices.containsKey(jobID);
    }
}
