package org.apache.flink.client.deployment.executors;

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionUtils;

@Internal
/* loaded from: input_file:org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.class */
public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements PipelineExecutor {
    private final ClientFactory clusterClientFactory;

    public AbstractSessionClusterExecutor(@Nonnull ClientFactory clientfactory) {
        this.clusterClientFactory = (ClientFactory) Preconditions.checkNotNull(clientfactory);
    }

    @Override // org.apache.flink.core.execution.PipelineExecutor
    public CompletableFuture<JobClient> execute(@Nonnull Pipeline pipeline, @Nonnull Configuration configuration, @Nonnull ClassLoader classLoader) throws Exception {
        JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
        ClusterDescriptor createClusterDescriptor2 = this.clusterClientFactory.createClusterDescriptor2(configuration);
        Throwable th = null;
        try {
            try {
                Object clusterId = this.clusterClientFactory.getClusterId(configuration);
                Preconditions.checkState(clusterId != null);
                ClusterClientProvider retrieve = createClusterDescriptor2.retrieve(clusterId);
                ClusterClient clusterClient = retrieve.getClusterClient();
                CompletableFuture<JobClient> whenCompleteAsync = clusterClient.submitJob(jobGraph).thenApplyAsync(FunctionUtils.uncheckedFunction(jobID -> {
                    ClientUtils.waitUntilJobInitializationFinished(() -> {
                        return clusterClient.getJobStatus(jobID).get();
                    }, () -> {
                        return clusterClient.requestJobResult(jobID).get();
                    }, classLoader);
                    return jobID;
                })).thenApplyAsync((Function<? super U, ? extends U>) jobID2 -> {
                    return new ClusterClientJobClientAdapter(retrieve, jobID2, classLoader);
                }).whenCompleteAsync((jobClient, th2) -> {
                    clusterClient.close();
                });
                if (createClusterDescriptor2 != null) {
                    if (0 != 0) {
                        try {
                            createClusterDescriptor2.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        createClusterDescriptor2.close();
                    }
                }
                return whenCompleteAsync;
            } finally {
            }
        } catch (Throwable th4) {
            if (createClusterDescriptor2 != null) {
                if (th != null) {
                    try {
                        createClusterDescriptor2.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createClusterDescriptor2.close();
                }
            }
            throw th4;
        }
    }
}
