package org.apache.flink.tests.util.flink;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.tests.util.parameters.ParameterProperty;
import org.apache.flink.tests.util.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.testcontainers.lifecycle.TestDescription;
import org.testcontainers.lifecycle.TestLifecycleAware;
import org.testcontainers.utility.MountableFile;

/* loaded from: input_file:org/apache/flink/tests/util/flink/FlinkContainer.class */
public class FlinkContainer extends GenericContainer<FlinkContainer> implements TestLifecycleAware {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final String FLINK_BIN = "flink/bin";
    private final TemporaryFolder temporaryFolder;
    private final Path logBackupDir;

    /* loaded from: input_file:org/apache/flink/tests/util/flink/FlinkContainer$FlinkContainerBuilder.class */
    public static class FlinkContainerBuilder {
        private static final ParameterProperty<Path> DISTRIBUTION_LOG_BACKUP_DIRECTORY = new ParameterProperty<>("logBackupDir", str -> {
            return Paths.get(str, new String[0]);
        });
        private String javaVersion;
        private int numTaskManagers = 1;
        private final TemporaryFolder temporaryFolder = new TemporaryFolder();

        public FlinkContainerBuilder numTaskManagers(int i) {
            this.numTaskManagers = i;
            return this;
        }

        public FlinkContainerBuilder javaVersion(String str) {
            this.javaVersion = str;
            return this;
        }

        public FlinkContainer build() throws IOException {
            try {
                Path findFlinkDist = FileUtils.findFlinkDist();
                String path = findFlinkDist.getFileName().toString();
                this.temporaryFolder.create();
                Path resolve = this.temporaryFolder.newFolder().toPath().resolve("workers");
                Files.write(resolve, (Iterable<? extends CharSequence>) IntStream.range(0, this.numTaskManagers).mapToObj(i -> {
                    return "localhost";
                }).collect(Collectors.toList()), new OpenOption[0]);
                ImageFromDockerfile withFileFromPath = new ImageFromDockerfile("flink-dist", true).withDockerfileFromBuilder(dockerfileBuilder -> {
                    dockerfileBuilder.from("openjdk:" + getJavaVersionSuffix()).copy(path, "flink").copy(path + "/conf/workers", "workers").cmd("flink/bin/start-cluster.sh && tail -f /dev/null").build();
                }).withFileFromPath("workers", resolve).withFileFromPath(path, findFlinkDist);
                Optional<Path> optional = DISTRIBUTION_LOG_BACKUP_DIRECTORY.get();
                if (!optional.isPresent()) {
                    FlinkContainer.LOG.warn("Property {} not set, logs will not be backed up in case of test failures.", DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName());
                }
                FlinkContainer flinkContainer = new FlinkContainer(withFileFromPath, this.numTaskManagers, optional.orElse(null));
                this.temporaryFolder.delete();
                return flinkContainer;
            } catch (Throwable th) {
                this.temporaryFolder.delete();
                throw th;
            }
        }

        private String getJavaVersionSuffix() {
            if (this.javaVersion != null) {
                return this.javaVersion;
            }
            String property = System.getProperty("java.vm.specification.version");
            boolean z = -1;
            switch (property.hashCode()) {
                case 1568:
                    if (property.equals("11")) {
                        z = true;
                        break;
                    }
                    break;
                case 48571:
                    if (property.equals("1.8")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return "8";
                case true:
                    return "11";
                default:
                    throw new IllegalStateException("Unexpected value: " + property);
            }
        }
    }

    private FlinkContainer(ImageFromDockerfile imageFromDockerfile, int i, @Nullable Path path) {
        super(imageFromDockerfile);
        this.temporaryFolder = new TemporaryFolder();
        this.logBackupDir = path;
        withExposedPorts(new Integer[]{8081});
        waitingFor(new HttpWaitStrategy().forPort(8081).forPath("/taskmanagers").forResponsePredicate(str -> {
            try {
                return i == ((TaskManagersInfo) objectMapper.readValue(str, TaskManagersInfo.class)).getTaskManagerInfos().size();
            } catch (JsonProcessingException e) {
                return false;
            }
        }));
    }

    public void beforeTest(TestDescription testDescription) {
        try {
            this.temporaryFolder.create();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void afterTest(TestDescription testDescription, Optional<Throwable> optional) {
        if (optional.isPresent() && this.logBackupDir != null) {
            try {
                Path resolve = this.logBackupDir.resolve("flink-" + UUID.randomUUID().toString());
                copyFileOrDirectoryFromContainer("flink/log/", resolve);
                LOG.info("Backed up logs to {}.", resolve);
            } catch (IOException e) {
                LOG.error("Could not backup the flink logs.", e);
            }
        }
        this.temporaryFolder.delete();
    }

    public void copyFileOrDirectoryFromContainer(String str, Path path) throws IOException {
        InputStream exec = DockerClientFactory.instance().client().copyArchiveFromContainerCmd(getContainerId(), str).exec();
        Throwable th = null;
        try {
            TarArchiveInputStream tarArchiveInputStream = new TarArchiveInputStream(exec);
            Throwable th2 = null;
            try {
                try {
                    unTar(tarArchiveInputStream, path.toFile());
                    if (tarArchiveInputStream != null) {
                        if (0 != 0) {
                            try {
                                tarArchiveInputStream.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            tarArchiveInputStream.close();
                        }
                    }
                    if (exec != null) {
                        if (0 == 0) {
                            exec.close();
                            return;
                        }
                        try {
                            exec.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (tarArchiveInputStream != null) {
                    if (th2 != null) {
                        try {
                            tarArchiveInputStream.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        tarArchiveInputStream.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (exec != null) {
                if (0 != 0) {
                    try {
                        exec.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    exec.close();
                }
            }
            throw th8;
        }
    }

    private static void unTar(TarArchiveInputStream tarArchiveInputStream, File file) throws IOException {
        while (true) {
            TarArchiveEntry nextTarEntry = tarArchiveInputStream.getNextTarEntry();
            if (nextTarEntry == null) {
                return;
            }
            FileOutputStream fileOutputStream = null;
            try {
                try {
                    if (!nextTarEntry.isDirectory()) {
                        File file2 = new File(file, nextTarEntry.getName());
                        File parentFile = file2.getParentFile();
                        if (!parentFile.exists()) {
                            parentFile.mkdirs();
                        }
                        FileOutputStream fileOutputStream2 = new FileOutputStream(file2);
                        IOUtils.copyBytes(tarArchiveInputStream, fileOutputStream2, false);
                        if (fileOutputStream2 != null) {
                            try {
                                fileOutputStream2.flush();
                                fileOutputStream2.getFD().sync();
                                fileOutputStream2.close();
                            } catch (IOException e) {
                                LOG.warn("Exception closing {}", fileOutputStream2, e);
                            }
                        }
                    } else if (0 != 0) {
                        try {
                            fileOutputStream.flush();
                            fileOutputStream.getFD().sync();
                            fileOutputStream.close();
                        } catch (IOException e2) {
                            LOG.warn("Exception closing {}", (Object) null, e2);
                        }
                    }
                } catch (Exception e3) {
                    LOG.warn("Exception extracting {} to {}", new Object[]{tarArchiveInputStream, file, e3});
                    if (0 != 0) {
                        try {
                            fileOutputStream.flush();
                            fileOutputStream.getFD().sync();
                            fileOutputStream.close();
                        } catch (IOException e4) {
                            LOG.warn("Exception closing {}", (Object) null, e4);
                        }
                    }
                }
            } catch (Throwable th) {
                if (0 != 0) {
                    try {
                        fileOutputStream.flush();
                        fileOutputStream.getFD().sync();
                        fileOutputStream.close();
                    } catch (IOException e5) {
                        LOG.warn("Exception closing {}", (Object) null, e5);
                        throw th;
                    }
                }
                throw th;
            }
        }
    }

    public void submitSQLJob(SQLJobSubmission sQLJobSubmission) throws IOException, InterruptedException {
        ArrayList arrayList = new ArrayList();
        Path path = this.temporaryFolder.newFile().toPath();
        Files.write(path, sQLJobSubmission.getSqlLines(), new OpenOption[0]);
        copyFileToContainer(MountableFile.forHostPath(path), "/tmp/script.sql");
        arrayList.add("cat /tmp/script.sql | ");
        arrayList.add("flink/bin/sql-client.sh");
        arrayList.add("embedded");
        sQLJobSubmission.getDefaultEnvFile().ifPresent(str -> {
            arrayList.add("--defaults");
            arrayList.add(copyAndGetContainerPath(str));
        });
        sQLJobSubmission.getSessionEnvFile().ifPresent(str2 -> {
            arrayList.add("--environment");
            arrayList.add(copyAndGetContainerPath(str2));
        });
        for (String str3 : sQLJobSubmission.getJars()) {
            arrayList.add("--jar");
            arrayList.add(copyAndGetContainerPath(str3));
        }
        Container.ExecResult execInContainer = execInContainer(new String[]{"bash", "-c", String.join(" ", arrayList)});
        LOG.info(execInContainer.getStdout());
        LOG.error(execInContainer.getStderr());
        if (execInContainer.getExitCode() != 0) {
            throw new AssertionError("Failed when submitting the SQL job.");
        }
    }

    @Nonnull
    private String copyAndGetContainerPath(String str) {
        Path path = Paths.get(str, new String[0]);
        String str2 = "/tmp/" + path.getFileName();
        copyFileToContainer(MountableFile.forHostPath(path), str2);
        return str2;
    }

    public static FlinkContainerBuilder builder() {
        return new FlinkContainerBuilder();
    }
}
