package org.apache.flink.tests.util.flink;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.queryablestate.FutureUtils;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.util.ConfigurationException;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/* loaded from: input_file:org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.class */
public class LocalStandaloneFlinkResource implements FlinkResource {
    private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneFlinkResource.class);
    private final TemporaryFolder temporaryFolder = new TemporaryFolder();
    private final Path distributionDirectory;

    @Nullable
    private final Path logBackupDirectory;
    private final FlinkResourceSetup setup;
    private FlinkDistribution distribution;

    /* loaded from: input_file:org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource$StandaloneClusterController.class */
    private static class StandaloneClusterController implements ClusterController {
        private final FlinkDistribution distribution;

        StandaloneClusterController(FlinkDistribution flinkDistribution) {
            this.distribution = flinkDistribution;
        }

        @Override // org.apache.flink.tests.util.flink.ClusterController
        public JobController submitJob(JobSubmission jobSubmission) throws IOException {
            return new StandaloneJobController(this.distribution.submitJob(jobSubmission));
        }

        @Override // org.apache.flink.tests.util.flink.ClusterController
        public void submitSQLJob(SQLJobSubmission sQLJobSubmission) throws IOException {
            this.distribution.submitSQLJob(sQLJobSubmission);
        }

        public CompletableFuture<Void> closeAsync() {
            try {
                this.distribution.stopFlinkCluster();
                return CompletableFuture.completedFuture(null);
            } catch (IOException e) {
                return FutureUtils.getFailedFuture(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource$StandaloneJobController.class */
    private static class StandaloneJobController implements JobController {
        private final JobID jobId;

        StandaloneJobController(JobID jobID) {
            this.jobId = jobID;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LocalStandaloneFlinkResource(Path path, @Nullable Path path2, FlinkResourceSetup flinkResourceSetup) {
        LOG.info("Using distribution {}.", path);
        this.distributionDirectory = path;
        this.logBackupDirectory = path2;
        this.setup = flinkResourceSetup;
    }

    public void before() throws Exception {
        this.temporaryFolder.create();
        Path path = this.temporaryFolder.newFolder().toPath();
        LOG.info("Copying distribution to {}.", path);
        TestUtils.copyDirectory(this.distributionDirectory, path);
        this.distribution = new FlinkDistribution(path);
        this.distribution.setRootLogLevel(Level.DEBUG);
        Iterator<JarOperation> it = this.setup.getJarOperations().iterator();
        while (it.hasNext()) {
            this.distribution.performJarOperation(it.next());
        }
        if (this.setup.getConfig().isPresent()) {
            this.distribution.appendConfiguration(this.setup.getConfig().get());
        }
    }

    public void afterTestSuccess() {
        shutdownCluster();
        this.temporaryFolder.delete();
    }

    public void afterTestFailure() {
        if (this.distribution != null) {
            shutdownCluster();
            backupLogs();
        }
        this.temporaryFolder.delete();
    }

    private void shutdownCluster() {
        try {
            this.distribution.stopFlinkCluster();
        } catch (IOException e) {
            LOG.warn("Error while shutting down Flink cluster.", e);
        }
    }

    private void backupLogs() {
        if (this.logBackupDirectory != null) {
            Path resolve = this.logBackupDirectory.resolve("flink-" + UUID.randomUUID().toString());
            try {
                this.distribution.copyLogsTo(resolve);
                LOG.info("Backed up logs to {}.", resolve);
            } catch (IOException e) {
                LOG.warn("An error has occurred while backing up logs to {}.", resolve, e);
            }
        }
    }

    @Override // org.apache.flink.tests.util.flink.FlinkResource
    public ClusterController startCluster(int i) throws IOException {
        int size;
        this.distribution.setTaskExecutorHosts(Collections.nCopies(i, "localhost"));
        this.distribution.startFlinkCluster();
        try {
            RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), Executors.directExecutor());
            Throwable th = null;
            for (int i2 = 0; i2 < 30; i2++) {
                try {
                    try {
                        try {
                            try {
                                size = ((TaskManagersInfo) restClient.sendRequest("localhost", 8081, TaskManagersHeaders.getInstance(), EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance()).get(1L, TimeUnit.SECONDS)).getTaskManagerInfos().size();
                            } catch (InterruptedException e) {
                                LOG.info("Waiting for dispatcher REST endpoint to come up...");
                                Thread.currentThread().interrupt();
                            }
                        } catch (ExecutionException | TimeoutException e2) {
                            LOG.info("Waiting for dispatcher REST endpoint to come up...");
                        }
                        if (size == i) {
                            StandaloneClusterController standaloneClusterController = new StandaloneClusterController(this.distribution);
                            if (restClient != null) {
                                if (0 != 0) {
                                    try {
                                        restClient.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    restClient.close();
                                }
                            }
                            return standaloneClusterController;
                        }
                        LOG.info("Waiting for task managers to come up. {}/{} are currently running.", Integer.valueOf(size), Integer.valueOf(i));
                        try {
                            Thread.sleep(1000L);
                        } catch (InterruptedException e3) {
                            Thread.currentThread().interrupt();
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (restClient != null) {
                        if (th != null) {
                            try {
                                restClient.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            restClient.close();
                        }
                    }
                    throw th3;
                }
            }
            if (restClient != null) {
                if (0 != 0) {
                    try {
                        restClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    restClient.close();
                }
            }
            throw new RuntimeException("Cluster did not start in expected time-frame.");
        } catch (Exception e4) {
            throw new RuntimeException(e4);
        } catch (ConfigurationException e5) {
            throw new RuntimeException("Could not create RestClient.", e5);
        }
    }

    @Override // org.apache.flink.tests.util.flink.FlinkResource
    public Stream<String> searchAllLogs(Pattern pattern, Function<Matcher, String> function) throws IOException {
        return this.distribution.searchAllLogs(pattern, function);
    }
}
