package org.apache.tephra.shaded.org.apache.twill.internal.kafka;

import java.net.BindException;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.tephra.shaded.com.google.common.base.Preconditions;
import org.apache.tephra.shaded.com.google.common.base.Throwables;
import org.apache.tephra.shaded.com.google.common.util.concurrent.AbstractIdleService;
import org.apache.tephra.shaded.org.apache.twill.internal.Configs;
import org.apache.tephra.shaded.org.apache.twill.internal.utils.Networks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tephra/shaded/org/apache/twill/internal/kafka/EmbeddedKafkaServer.class */
/**
 * An {@link AbstractIdleService} that starts and stops a {@link KafkaServer} inside the current JVM.
 *
 * <p>Start-up is retried when the root cause of the failure is a {@link ZkTimeoutException} or a
 * {@link BindException}; any other failure is rethrown immediately. The maximum number of attempts
 * is read from the {@link #START_RETRIES} property.
 */
public final class EmbeddedKafkaServer extends AbstractIdleService {
    /** Property key holding the maximum number of start-up attempts. */
    public static final String START_RETRIES = "twill.kafka.start.timeout.retries";
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
    /** Value used for {@link #START_RETRIES} when the property is not set. */
    private static final String DEFAULT_START_RETRIES = "5";
    private final int startTimeoutRetries;
    /** Private copy of the caller's properties, so later changes by the caller are not observed. */
    private final Properties properties = new Properties();
    /** The running server; {@code null} until {@link #startUp()} succeeds. */
    private KafkaServer server;

    /**
     * Creates a server that will be configured from the given properties.
     *
     * @param properties Kafka configuration; may also carry {@link #START_RETRIES}
     * @throws NumberFormatException if the {@link #START_RETRIES} value is not a valid integer
     */
    public EmbeddedKafkaServer(Properties properties) {
        this.startTimeoutRetries = Integer.parseInt(properties.getProperty(START_RETRIES, DEFAULT_START_RETRIES));
        this.properties.putAll(properties);
    }

    /**
     * Starts the Kafka server, retrying on ZooKeeper connection timeouts and port bind failures.
     *
     * <p>At least one attempt is always made. A fresh {@link KafkaConfig} is built for every
     * attempt, so a randomly chosen port is re-drawn on each retry.
     *
     * @throws IllegalStateException if every attempt failed with a retryable error; the last such
     *     error is attached as the cause
     * @throws Exception if start-up failed with a non-retryable error, or the thread was interrupted
     *     while waiting between attempts
     */
    @Override
    protected void startUp() throws Exception {
        Random backOff = new Random();
        Exception lastFailure = null;
        int attempts = 0;
        do {
            KafkaConfig config = createKafkaConfig(this.properties);
            KafkaServer candidate = createKafkaServer(config);
            try {
                candidate.startup();
                this.server = candidate;
            } catch (Exception e) {
                // Release whatever the half-started server acquired before deciding what to do next.
                candidate.shutdown();
                candidate.awaitShutdown();

                Throwable rootCause = Throwables.getRootCause(e);
                if (rootCause instanceof ZkTimeoutException) {
                    LOG.warn("Timeout when connecting to ZooKeeper from KafkaServer. Attempt number {}.", Integer.valueOf(attempts), rootCause);
                } else if (rootCause instanceof BindException) {
                    LOG.warn("Kafka failed to bind to port {}. Attempt number {}.", new Object[]{Integer.valueOf(config.port()), Integer.valueOf(attempts), rootCause});
                } else {
                    throw e;
                }

                lastFailure = e;
                attempts++;
                // Only back off when another attempt will actually follow.
                if (attempts < this.startTimeoutRetries) {
                    // NOTE(review): the upper bound reuses Configs.Defaults.JAVA_RESERVED_MEMORY_MB, which
                    // looks like a constant-folding artifact rather than an intentional link to memory
                    // settings -- confirm the intended maximum back-off in milliseconds.
                    TimeUnit.MILLISECONDS.sleep(backOff.nextInt(Configs.Defaults.JAVA_RESERVED_MEMORY_MB) + 1);
                }
            }
        } while (this.server == null && attempts < this.startTimeoutRetries);

        if (this.server == null) {
            // Keep the last retryable failure as the cause so callers can see why start-up never succeeded.
            throw new IllegalStateException("Failed to start Kafka server after " + attempts + " attempts.", lastFailure);
        }
    }

    /**
     * Shuts the Kafka server down and waits for the shutdown to complete. Does nothing if the server
     * was never started successfully.
     */
    @Override
    protected void shutDown() throws Exception {
        if (this.server != null) {
            this.server.shutdown();
            this.server.awaitShutdown();
        }
    }

    /**
     * Builds a {@link KafkaServer} for the given configuration, backed by the system clock.
     *
     * @param kafkaConfig configuration for the new server
     * @return a server that has not been started yet
     */
    private KafkaServer createKafkaServer(KafkaConfig kafkaConfig) {
        return new KafkaServer(kafkaConfig, new Time() {
            public long milliseconds() {
                return System.currentTimeMillis();
            }

            public long nanoseconds() {
                return System.nanoTime();
            }

            public void sleep(long j) {
                try {
                    Thread.sleep(j);
                } catch (InterruptedException e) {
                    // Restore the interrupt status so code further up the stack can still observe it.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    /**
     * Builds a {@link KafkaConfig} from a copy of the given properties, substituting a random port
     * when the {@code "port"} property is missing or equal to {@code "0"}.
     *
     * @param properties base Kafka properties; not modified
     * @return the configuration to start a server with
     * @throws IllegalStateException if no random port could be obtained
     */
    private KafkaConfig createKafkaConfig(Properties properties) {
        Properties effective = new Properties();
        effective.putAll(properties);
        String configuredPort = effective.getProperty("port");
        if (configuredPort == null || "0".equals(configuredPort)) {
            int randomPort = Networks.getRandomPort();
            Preconditions.checkState(randomPort > 0, "Failed to get random port.");
            effective.setProperty("port", Integer.toString(randomPort));
        }
        return new KafkaConfig(effective);
    }
}
