package org.apache.flink.runtime.dispatcher.runner;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionUtils;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.class */
/**
 * Dispatcher leader process that recovers the job graphs persisted in a {@link JobGraphStore},
 * creates the dispatcher service with them, and afterwards reacts to job graphs being added to
 * or removed from the store.
 *
 * <p>All recovery-related work is chained onto {@link #onGoingRecoveryOperation}, so each
 * operation only starts once the previously registered one has completed.
 */
public class SessionDispatcherLeaderProcess extends AbstractDispatcherLeaderProcess implements JobGraphStore.JobGraphListener {
    /** Factory used to create the dispatcher service once the persisted jobs have been recovered. */
    private final AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory;

    /** Store from which persisted job graphs are recovered; this process registers itself as its listener. */
    private final JobGraphStore jobGraphStore;

    /** Executor on which the job graph store calls (recover, stop) are run. */
    private final Executor ioExecutor;

    /** Tail of the chain of recovery operations; starts out as an already completed future. */
    private CompletableFuture<Void> onGoingRecoveryOperation;

    private SessionDispatcherLeaderProcess(UUID leaderSessionId, AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory, JobGraphStore jobGraphStore, Executor ioExecutor, FatalErrorHandler fatalErrorHandler) {
        super(leaderSessionId, fatalErrorHandler);
        this.onGoingRecoveryOperation = FutureUtils.completedVoidFuture();
        this.dispatcherGatewayServiceFactory = dispatcherGatewayServiceFactory;
        this.jobGraphStore = jobGraphStore;
        this.ioExecutor = ioExecutor;
    }

    /**
     * Starts the job graph store, then asynchronously recovers all persisted jobs and creates the
     * dispatcher with them. The outcome of that chain is passed to {@code onErrorIfRunning}.
     */
    @Override // org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess
    protected void onStart() {
        startServices();
        this.onGoingRecoveryOperation = recoverJobsAsync()
                .thenAccept(this::createDispatcherIfRunning)
                .handle(this::onErrorIfRunning);
    }

    /**
     * Starts the job graph store with this process registered as its listener.
     *
     * @throws FlinkRuntimeException if the store fails to start
     */
    private void startServices() {
        try {
            this.jobGraphStore.start(this);
        } catch (Exception e) {
            throw new FlinkRuntimeException(String.format("Could not start %s when trying to start the %s.", this.jobGraphStore.getClass().getSimpleName(), getClass().getSimpleName()), e);
        }
    }

    /** Creates the dispatcher with the given job graphs, guarded by the {@code RUNNING} state check. */
    private void createDispatcherIfRunning(Collection<JobGraph> jobGraphs) {
        runIfStateIs(AbstractDispatcherLeaderProcess.State.RUNNING, () -> createDispatcher(jobGraphs));
    }

    /** Creates the dispatcher service for the current leader session and completes the dispatcher setup. */
    private void createDispatcher(Collection<JobGraph> jobGraphs) {
        completeDispatcherSetup(
                this.dispatcherGatewayServiceFactory.create(
                        DispatcherId.fromUuid(getLeaderSessionId()), jobGraphs, this.jobGraphStore));
    }

    /** Schedules the recovery of all persisted jobs on the I/O executor. */
    private CompletableFuture<Collection<JobGraph>> recoverJobsAsync() {
        return CompletableFuture.supplyAsync(this::recoverJobsIfRunning, this.ioExecutor);
    }

    /** Recovers all persisted jobs, or yields an empty collection if the guarded supplier produced nothing. */
    private Collection<JobGraph> recoverJobsIfRunning() {
        return supplyUnsynchronizedIfRunning(this::recoverJobs).orElse(Collections.emptyList());
    }

    /**
     * Recovers the job graph of every job id currently known to the job graph store.
     *
     * @return the recovered job graphs, in the iteration order of the retrieved job ids
     * @throws FlinkRuntimeException if the job ids or any single job graph cannot be retrieved
     */
    private Collection<JobGraph> recoverJobs() {
        this.log.info("Recover all persisted job graphs.");
        final Collection<JobID> jobIds = getJobIds();
        final Collection<JobGraph> recoveredJobGraphs = new ArrayList<>();
        for (JobID jobId : jobIds) {
            recoveredJobGraphs.add(recoverJob(jobId));
        }
        this.log.info("Successfully recovered {} persisted job graphs.", recoveredJobGraphs.size());
        return recoveredJobGraphs;
    }

    /**
     * Fetches the ids of all persisted jobs from the job graph store.
     *
     * @throws FlinkRuntimeException if the store fails to provide the ids
     */
    private Collection<JobID> getJobIds() {
        try {
            return this.jobGraphStore.getJobIds();
        } catch (Exception e) {
            throw new FlinkRuntimeException("Could not retrieve job ids of persisted jobs.", e);
        }
    }

    /**
     * Recovers a single job graph from the job graph store.
     *
     * @param jobID id of the job to recover
     * @return whatever the store returns for that id
     * @throws FlinkRuntimeException if the store throws while recovering the job
     */
    private JobGraph recoverJob(JobID jobID) {
        this.log.info("Trying to recover job with job id {}.", jobID);
        try {
            // NOTE(review): the result is passed on unchecked; if the store can return null
            // (e.g. for a job removed in the meantime), callers receive that null - confirm
            // against the JobGraphStore contract.
            return this.jobGraphStore.recoverJobGraph(jobID);
        } catch (Exception e) {
            throw new FlinkRuntimeException(String.format("Could not recover job with job id %s.", jobID), e);
        }
    }

    /** Stops the job graph store on the I/O executor and returns the future of that operation. */
    @Override // org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess
    protected CompletableFuture<Void> onClose() {
        return CompletableFuture.runAsync(this::stopServices, this.ioExecutor);
    }

    /** Stops the job graph store, rethrowing any failure via {@code ExceptionUtils.rethrow}. */
    private void stopServices() {
        try {
            this.jobGraphStore.stop();
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }
    }

    /** Listener callback for an added job graph; handled only under the {@code RUNNING} state check. */
    @Override // org.apache.flink.runtime.jobmanager.JobGraphStore.JobGraphListener
    public void onAddedJobGraph(JobID jobID) {
        runIfStateIs(AbstractDispatcherLeaderProcess.State.RUNNING, () -> handleAddedJobGraph(jobID));
    }

    /**
     * Appends "recover the job, then submit it to the dispatcher" to the chain of recovery
     * operations. The recovery step runs on the I/O executor.
     */
    private void handleAddedJobGraph(JobID jobID) {
        this.log.debug("Job {} has been added to the {} by another process.", jobID, this.jobGraphStore.getClass().getSimpleName());
        // Chaining onto the previous operation keeps recovery operations strictly sequential.
        this.onGoingRecoveryOperation = this.onGoingRecoveryOperation
                .thenApplyAsync(ignored -> recoverJobIfRunning(jobID), this.ioExecutor)
                .thenCompose(
                        optionalJobGraph ->
                                optionalJobGraph
                                        .flatMap(this::submitAddedJobIfRunning)
                                        .orElse(FutureUtils.completedVoidFuture()))
                .handle(this::onErrorIfRunning);
    }

    /** Submits the job through the {@code supplyIfRunning} guard; empty if the guard produced nothing. */
    private Optional<CompletableFuture<Void>> submitAddedJobIfRunning(JobGraph jobGraph) {
        return supplyIfRunning(() -> submitAddedJob(jobGraph));
    }

    /**
     * Submits the job graph to the dispatcher gateway with an infinite RPC timeout. A
     * {@link DuplicateJobSubmissionException} is swallowed; any other failure is propagated.
     */
    private CompletableFuture<Void> submitAddedJob(JobGraph jobGraph) {
        final DispatcherGateway dispatcherGateway = getDispatcherGatewayInternal();
        return dispatcherGateway
                .submitJob(jobGraph, RpcUtils.INF_TIMEOUT)
                .thenApply(FunctionUtils.nullFn())
                .exceptionally(this::filterOutDuplicateJobSubmissionException);
    }

    /**
     * Exception handler for job submission.
     *
     * @param th failure reported by the submission future
     * @return {@code null} if the (unwrapped) failure is a {@link DuplicateJobSubmissionException}
     * @throws CompletionException wrapping {@code th} for every other failure
     */
    private Void filterOutDuplicateJobSubmissionException(Throwable th) {
        final Throwable strippedException = ExceptionUtils.stripCompletionException(th);
        if (strippedException instanceof DuplicateJobSubmissionException) {
            final DuplicateJobSubmissionException duplicateSubmission = (DuplicateJobSubmissionException) strippedException;
            // The exception is passed as the trailing argument so the logger can attach it.
            this.log.debug("Ignore recovered job {} because the job is currently being executed.", duplicateSubmission.getJobID(), duplicateSubmission);
            return null;
        }
        throw new CompletionException(th);
    }

    /**
     * Returns the dispatcher gateway if its future has already completed.
     *
     * @throws NullPointerException if the gateway future has not completed yet
     */
    private DispatcherGateway getDispatcherGatewayInternal() {
        return Preconditions.checkNotNull(getDispatcherGateway().getNow(null));
    }

    /** Recovers a single job through the {@code supplyUnsynchronizedIfRunning} guard. */
    private Optional<JobGraph> recoverJobIfRunning(JobID jobID) {
        return supplyUnsynchronizedIfRunning(() -> recoverJob(jobID));
    }

    /** Listener callback for a removed job graph; handled only under the {@code RUNNING} state check. */
    @Override // org.apache.flink.runtime.jobmanager.JobGraphStore.JobGraphListener
    public void onRemovedJobGraph(JobID jobID) {
        runIfStateIs(AbstractDispatcherLeaderProcess.State.RUNNING, () -> handleRemovedJobGraph(jobID));
    }

    /** Appends the removal of the job from the dispatcher service to the chain of recovery operations. */
    private void handleRemovedJobGraph(JobID jobID) {
        this.log.debug("Job {} has been removed from the {} by another process.", jobID, this.jobGraphStore.getClass().getSimpleName());
        this.onGoingRecoveryOperation = this.onGoingRecoveryOperation
                .thenCompose(ignored -> removeJobGraphIfRunning(jobID).orElse(FutureUtils.completedVoidFuture()))
                .handle(this::onErrorIfRunning);
    }

    /** Removes the job through the {@code supplyIfRunning} guard; empty if the guard produced nothing. */
    private Optional<CompletableFuture<Void>> removeJobGraphIfRunning(JobID jobID) {
        return supplyIfRunning(() -> removeJobGraph(jobID));
    }

    /**
     * Notifies the dispatcher service, if there is one, that the job graph was removed.
     *
     * @return the service's removal future, or an already completed future if no service exists
     */
    private CompletableFuture<Void> removeJobGraph(JobID jobID) {
        return getDispatcherService()
                .map(dispatcherService -> dispatcherService.onRemovedJobGraph(jobID))
                .orElseGet(FutureUtils::completedVoidFuture);
    }

    /**
     * Creates a new {@code SessionDispatcherLeaderProcess}.
     *
     * @param uuid leader session id handed to the base class
     * @param dispatcherGatewayServiceFactory factory for the dispatcher service
     * @param jobGraphStore store from which persisted job graphs are recovered
     * @param executor executor used for job graph store operations
     * @param fatalErrorHandler handler handed to the base class
     * @return the newly created process
     */
    public static SessionDispatcherLeaderProcess create(UUID uuid, AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory, JobGraphStore jobGraphStore, Executor executor, FatalErrorHandler fatalErrorHandler) {
        return new SessionDispatcherLeaderProcess(uuid, dispatcherGatewayServiceFactory, jobGraphStore, executor, fatalErrorHandler);
    }

    // Marked bridge/synthetic by the decompiler; it only delegates to the superclass. The raw
    // return type is kept because parameterizing it would need a type this file does not import.
    @Override // org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess, org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcess
    public /* bridge */ /* synthetic */ CompletableFuture getShutDownFuture() {
        return super.getShutDownFuture();
    }
}
