package org.apache.flink.tests.util.kafka;

import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.tests.util.AutoClosableProcess;
import org.apache.flink.tests.util.CommandLineWrapper;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
import org.apache.flink.tests.util.cache.DownloadCache;
import org.apache.flink.tests.util.util.FileUtils;
import org.apache.flink.util.OperatingSystem;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.class */
public class LocalStandaloneKafkaResource implements KafkaResource {
    private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneKafkaResource.class);
    private static final Pattern ZK_DATA_DIR_PATTERN = Pattern.compile(".*(dataDir=).*");
    private static final Pattern KAFKA_LOG_DIR_PATTERN = Pattern.compile(".*(log\\.dirs=).*");
    private static final String ZOOKEEPER_HOST = "localhost";
    private static final int ZOOKEEPER_PORT = 2181;
    private static final String ZOOKEEPER_ADDRESS = "localhost:2181";
    private static final String KAFKA_HOST = "localhost";
    private static final int KAFKA_PORT = 9092;
    private static final String KAFKA_ADDRESS = "localhost:9092";
    private final TemporaryFolder tmp = new TemporaryFolder();
    private final DownloadCache downloadCache = DownloadCache.get();
    private final String kafkaVersion;
    private Path kafkaDir;

    @Nullable
    private Path logBackupDirectory;

    /* JADX INFO: Access modifiers changed from: package-private */
    public LocalStandaloneKafkaResource(String str, @Nullable Path path) {
        OperatingSystemRestriction.forbid(String.format("The %s relies on UNIX utils and shell scripts.", getClass().getSimpleName()), new OperatingSystem[]{OperatingSystem.WINDOWS});
        this.kafkaVersion = str;
        this.logBackupDirectory = path;
    }

    private static String getKafkaDownloadUrl(String str) {
        return String.format("https://archive.apache.org/dist/kafka/%s/kafka_2.11-%s.tgz", str, str);
    }

    public void before() throws Exception {
        this.tmp.create();
        this.downloadCache.before();
        this.kafkaDir = this.tmp.newFolder("kafka").toPath().toAbsolutePath();
        setupKafkaDist();
        setupKafkaCluster();
    }

    private void setupKafkaDist() throws IOException {
        Path orDownload = this.downloadCache.getOrDownload(getKafkaDownloadUrl(this.kafkaVersion), this.tmp.newFolder("getOrDownload").toPath());
        LOG.info("Kafka location: {}", this.kafkaDir.toAbsolutePath());
        AutoClosableProcess.runBlocking(CommandLineWrapper.tar(orDownload).extract().zipped().strip(1).targetDir(this.kafkaDir).build());
        LOG.info("Updating ZooKeeper properties");
        FileUtils.replace(this.kafkaDir.resolve(Paths.get("config", "zookeeper.properties")), ZK_DATA_DIR_PATTERN, matcher -> {
            return matcher.replaceAll("$1" + this.kafkaDir.resolve("zookeeper").toAbsolutePath());
        });
        LOG.info("Updating Kafka properties");
        FileUtils.replace(this.kafkaDir.resolve(Paths.get("config", "server.properties")), KAFKA_LOG_DIR_PATTERN, matcher2 -> {
            return matcher2.replaceAll("$1" + this.kafkaDir.resolve("kafka").toAbsolutePath());
        });
    }

    private void setupKafkaCluster() throws IOException {
        LOG.info("Starting zookeeper");
        AutoClosableProcess.runBlocking(new String[]{this.kafkaDir.resolve(Paths.get("bin", "zookeeper-server-start.sh")).toString(), "-daemon", this.kafkaDir.resolve(Paths.get("config", "zookeeper.properties")).toString()});
        LOG.info("Starting kafka");
        AutoClosableProcess.runBlocking(new String[]{this.kafkaDir.resolve(Paths.get("bin", "kafka-server-start.sh")).toString(), "-daemon", this.kafkaDir.resolve(Paths.get("config", "server.properties")).toString()});
        while (!isZookeeperRunning(this.kafkaDir)) {
            try {
                LOG.info("Waiting for ZooKeeper to start.");
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        while (!isKafkaRunning(this.kafkaDir)) {
            try {
                LOG.info("Waiting for Kafka to start.");
                Thread.sleep(500L);
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void afterTestSuccess() {
        shutdownResource();
        this.downloadCache.afterTestSuccess();
        this.tmp.delete();
    }

    public void afterTestFailure() {
        shutdownResource();
        backupLogs();
        this.downloadCache.afterTestFailure();
        this.tmp.delete();
    }

    private void shutdownResource() {
        try {
            AutoClosableProcess.runBlocking(new String[]{this.kafkaDir.resolve(Paths.get("bin", "kafka-server-stop.sh")).toString()});
            while (isKafkaRunning(this.kafkaDir)) {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        } catch (IOException e2) {
            LOG.warn("Error while shutting down kafka.", e2);
        }
        try {
            AutoClosableProcess.runBlocking(new String[]{this.kafkaDir.resolve(Paths.get("bin", "zookeeper-server-stop.sh")).toString()});
            while (isZookeeperRunning(this.kafkaDir)) {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e3) {
                    Thread.currentThread().interrupt();
                }
            }
        } catch (IOException e4) {
            LOG.warn("Error while shutting down zookeeper.", e4);
        }
    }

    private void backupLogs() {
        if (this.logBackupDirectory != null) {
            Path resolve = this.logBackupDirectory.resolve("kafka-" + UUID.randomUUID().toString());
            try {
                Files.createDirectories(resolve, new FileAttribute[0]);
                TestUtils.copyDirectory(this.kafkaDir.resolve("logs"), resolve);
                LOG.info("Backed up logs to {}.", resolve);
            } catch (IOException e) {
                LOG.warn("An error has occurred while backing up logs to {}.", resolve, e);
            }
        }
    }

    private static boolean isZookeeperRunning(Path path) {
        try {
            queryBrokerStatus(path, str -> {
            });
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    private static boolean isKafkaRunning(Path path) throws IOException {
        try {
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            queryBrokerStatus(path, str -> {
                atomicBoolean.compareAndSet(false, str.contains("\"port\":"));
            });
            return atomicBoolean.get();
        } catch (IOException e) {
            return false;
        }
    }

    private static void queryBrokerStatus(Path path, Consumer<String> consumer) throws IOException {
        AutoClosableProcess.create(new String[]{path.resolve(Paths.get("bin", "zookeeper-shell.sh")).toString(), ZOOKEEPER_ADDRESS, "get", "/brokers/ids/0"}).setStdoutProcessor(consumer).runBlocking();
    }

    @Override // org.apache.flink.tests.util.kafka.KafkaResource
    public void createTopic(int i, int i2, String str) throws IOException {
        AutoClosableProcess.runBlocking(new String[]{this.kafkaDir.resolve(Paths.get("bin", "kafka-topics.sh")).toString(), "--create", "--zookeeper", ZOOKEEPER_ADDRESS, "--replication-factor", String.valueOf(i), "--partitions", String.valueOf(i2), "--topic", str});
    }

    @Override // org.apache.flink.tests.util.kafka.KafkaResource
    public void sendMessages(String str, String... strArr) throws IOException {
        sendMessagesAndWait(createSendMessageArguments(str), strArr);
    }

    @Override // org.apache.flink.tests.util.kafka.KafkaResource
    public void sendKeyedMessages(String str, String str2, String... strArr) throws IOException {
        ArrayList arrayList = new ArrayList(createSendMessageArguments(str));
        arrayList.add("--property");
        arrayList.add("parse.key=true");
        arrayList.add("--property");
        arrayList.add("key.separator=" + str2);
        sendMessagesAndWait(arrayList, strArr);
    }

    private List<String> createSendMessageArguments(String str) {
        return Arrays.asList(this.kafkaDir.resolve(Paths.get("bin", "kafka-console-producer.sh")).toString(), "--broker-list", KAFKA_ADDRESS, "--topic", str);
    }

    private void sendMessagesAndWait(List<String> list, String... strArr) throws IOException {
        AutoClosableProcess runNonBlocking = AutoClosableProcess.runNonBlocking((String[]) list.toArray(new String[0]));
        Throwable th = null;
        try {
            PrintStream printStream = new PrintStream(runNonBlocking.getProcess().getOutputStream(), true, StandardCharsets.UTF_8.name());
            Throwable th2 = null;
            try {
                try {
                    for (String str : strArr) {
                        printStream.println(str);
                    }
                    printStream.flush();
                    if (printStream != null) {
                        if (0 != 0) {
                            try {
                                printStream.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            printStream.close();
                        }
                    }
                    try {
                        runNonBlocking.getProcess().waitFor();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    if (runNonBlocking != null) {
                        if (0 == 0) {
                            runNonBlocking.close();
                            return;
                        }
                        try {
                            runNonBlocking.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (printStream != null) {
                    if (th2 != null) {
                        try {
                            printStream.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        printStream.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (runNonBlocking != null) {
                if (0 != 0) {
                    try {
                        runNonBlocking.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    runNonBlocking.close();
                }
            }
            throw th8;
        }
    }

    @Override // org.apache.flink.tests.util.kafka.KafkaResource
    public List<String> readMessage(int i, String str, String str2) throws IOException {
        List<String> synchronizedList = Collections.synchronizedList(new ArrayList(i));
        AutoClosableProcess.AutoClosableProcessBuilder create = AutoClosableProcess.create(new String[]{this.kafkaDir.resolve(Paths.get("bin", "kafka-console-consumer.sh")).toString(), "--bootstrap-server", KAFKA_ADDRESS, "--from-beginning", "--max-messages", String.valueOf(i), "--topic", str2, "--consumer-property", "group.id=" + str});
        synchronizedList.getClass();
        AutoClosableProcess runNonBlocking = create.setStdoutProcessor((v1) -> {
            r1.add(v1);
        }).runNonBlocking();
        Throwable th = null;
        try {
            Deadline fromNow = Deadline.fromNow(Duration.ofSeconds(120L));
            while (fromNow.hasTimeLeft() && synchronizedList.size() < i) {
                try {
                    LOG.info("Waiting for messages. Received {}/{}.", Integer.valueOf(synchronizedList.size()), Integer.valueOf(i));
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            if (synchronizedList.size() != i) {
                throw new IOException("Could not read expected number of messages.");
            }
            return synchronizedList;
        } finally {
            if (runNonBlocking != null) {
                if (0 != 0) {
                    try {
                        runNonBlocking.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    runNonBlocking.close();
                }
            }
        }
    }

    @Override // org.apache.flink.tests.util.kafka.KafkaResource
    public void setNumPartitions(int i, String str) throws IOException {
        AutoClosableProcess.runBlocking(new String[]{this.kafkaDir.resolve(Paths.get("bin", "kafka-topics.sh")).toString(), "--alter", "--topic", str, "--partitions", String.valueOf(i), "--zookeeper", ZOOKEEPER_ADDRESS});
    }

    @Override // org.apache.flink.tests.util.kafka.KafkaResource
    public int getNumPartitions(String str) throws IOException {
        Pattern compile = Pattern.compile(".*PartitionCount:\\s*([0-9]+).*");
        AtomicReference atomicReference = new AtomicReference(-1);
        AutoClosableProcess.create(new String[]{this.kafkaDir.resolve(Paths.get("bin", "kafka-topics.sh")).toString(), "--describe", "--topic", str, "--zookeeper", ZOOKEEPER_ADDRESS}).setStdoutProcessor(str2 -> {
            Matcher matcher = compile.matcher(str2);
            if (matcher.matches()) {
                atomicReference.compareAndSet(-1, Integer.valueOf(Integer.parseInt(matcher.group(1))));
            }
        }).runBlocking();
        return ((Integer) atomicReference.get()).intValue();
    }

    @Override // org.apache.flink.tests.util.kafka.KafkaResource
    public long getPartitionOffset(String str, int i) throws IOException {
        Pattern compile = Pattern.compile(".*:.*:([0-9]+)");
        AtomicReference atomicReference = new AtomicReference(-1);
        AutoClosableProcess.create(new String[]{this.kafkaDir.resolve(Paths.get("bin", "kafka-run-class.sh")).toString(), "kafka.tools.GetOffsetShell", "--broker-list", KAFKA_ADDRESS, "--topic", str, "--partitions", String.valueOf(i), "--time", "-1"}).setStdoutProcessor(str2 -> {
            Matcher matcher = compile.matcher(str2);
            if (matcher.matches()) {
                atomicReference.compareAndSet(-1, Integer.valueOf(Integer.parseInt(matcher.group(1))));
            }
        }).runBlocking();
        int intValue = ((Integer) atomicReference.get()).intValue();
        if (intValue == -1) {
            throw new IOException("Could not determine partition offset.");
        }
        return intValue;
    }

    @Override // org.apache.flink.tests.util.kafka.KafkaResource
    public Collection<InetSocketAddress> getBootstrapServerAddresses() {
        return Collections.singletonList(InetSocketAddress.createUnresolved("localhost", KAFKA_PORT));
    }

    @Override // org.apache.flink.tests.util.kafka.KafkaResource
    public InetSocketAddress getZookeeperAddress() {
        return InetSocketAddress.createUnresolved("localhost", KAFKA_PORT);
    }
}
