package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigMergeable;
import java.io.IOException;
import java.net.BindException;
import java.util.Iterator;
import java.util.Optional;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.util.NetUtils;
import org.jboss.netty.channel.ChannelException;
import org.slf4j.Logger;

/* loaded from: input_file:flink-rpc-akka.jar:org/apache/flink/runtime/rpc/akka/AkkaBootstrapTools.class */
/**
 * Static helpers for starting local and remote Akka {@link ActorSystem}s from a Flink
 * {@link Configuration}.
 *
 * <p>This is a utility class and cannot be instantiated.
 */
public class AkkaBootstrapTools {

    /** Supplier of a {@link Config} describing an actor system executor. */
    public interface ActorSystemExecutorConfiguration {
        /** Returns the {@link Config} for this executor configuration. */
        Config getAkkaConfig();
    }

    private AkkaBootstrapTools() {
    }

    /**
     * Starts a remote actor system with defaults: the name from
     * {@link AkkaUtils#getFlinkActorSystemName()}, the wildcard IP address as bind address, no
     * explicit bind port, a fork-join executor config derived from {@code configuration}, and no
     * custom config.
     *
     * @param configuration Flink configuration used to build the Akka config
     * @param str external address (host) of the actor system
     * @param str2 port range definition for the external port
     * @param logger logger used to report startup progress
     * @return the started actor system
     * @throws Exception if the actor system could not be started
     */
    @VisibleForTesting
    public static ActorSystem startRemoteActorSystem(Configuration configuration, String str, String str2, Logger logger) throws Exception {
        return startRemoteActorSystem(
                configuration,
                AkkaUtils.getFlinkActorSystemName(),
                str,
                str2,
                NetUtils.getWildcardIPAddress(),
                Optional.empty(),
                logger,
                AkkaUtils.getForkJoinExecutorConfig(getForkJoinExecutorConfiguration(configuration)),
                null);
    }

    /**
     * Starts a remote actor system on the first port of the given range that can be bound.
     *
     * <p>Each port of the range is tried in iteration order. A failed attempt is retried on the
     * next port only if the cause of the failure is a {@link ChannelException} or a
     * {@link BindException}; any other failure is rethrown immediately.
     *
     * @param configuration Flink configuration used to build the Akka config
     * @param str name of the actor system
     * @param str2 external address (host) of the actor system
     * @param str3 port range definition for the external port
     * @param str4 bind address (host) of the actor system
     * @param optional bind port; if empty, the external port currently being tried is used
     * @param logger logger used to report startup progress
     * @param config executor config handed to {@code AkkaUtils.getAkkaConfig}
     * @param config2 optional custom config that takes precedence over the generated config;
     *     may be {@code null}
     * @return the started actor system
     * @throws IllegalArgumentException if {@code str3} cannot be parsed as a port range
     * @throws BindException if no port of the range could be used
     * @throws Exception if starting the actor system failed for a non-bind-related reason
     */
    public static ActorSystem startRemoteActorSystem(Configuration configuration, String str, String str2, String str3, String str4, Optional<Integer> optional, Logger logger, Config config, Config config2) throws Exception {
        // Only the parsing step may be reported as an invalid port range; startup failures
        // below must surface with their own exception type.
        Iterator<Integer> ports;
        try {
            ports = NetUtils.getPortRangeFromString(str3);
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid port range definition: " + str3, e);
        }

        while (ports.hasNext()) {
            int externalPort = ports.next();
            try {
                return startRemoteActorSystem(configuration, str, str2, externalPort, str4, optional.orElse(externalPort), logger, config, config2);
            } catch (Exception e) {
                // NOTE(review): this checks org.jboss.netty's ChannelException while the
                // single-port overload inspects the shaded netty4 ChannelException - confirm
                // that both are intended.
                Throwable cause = e.getCause();
                boolean portUnavailable = cause instanceof ChannelException || cause instanceof BindException;
                if (!portUnavailable) {
                    throw e;
                }
                // Otherwise fall through and try the next port of the range.
            }
        }

        // The port range is exhausted.
        throw new BindException("Could not start actor system on any port in port range " + str3);
    }

    /**
     * Starts a remote actor system for one concrete external/bind address pair.
     *
     * @param configuration Flink configuration used to build the Akka config
     * @param str name of the actor system
     * @param str2 external address (host)
     * @param i external port
     * @param str3 bind address (host)
     * @param i2 bind port
     * @param logger logger used to report startup progress
     * @param config executor config handed to {@code AkkaUtils.getAkkaConfig}
     * @param config2 optional custom config that takes precedence over the generated config;
     *     may be {@code null}
     * @return the started actor system
     * @throws IOException if startup failed with a shaded-netty {@code ChannelException} caused
     *     by a {@link BindException}
     * @throws Exception wrapping any other failure
     */
    private static ActorSystem startRemoteActorSystem(Configuration configuration, String str, String str2, int i, String str3, int i2, Logger logger, Config config, Config config2) throws Exception {
        String externalHostPort = NetUtils.unresolvedHostAndPortToNormalizedString(str2, i);
        String bindHostPort = NetUtils.unresolvedHostAndPortToNormalizedString(str3, i2);
        logger.info("Trying to start actor system, external address {}, bind address {}.", externalHostPort, bindHostPort);
        try {
            Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, new HostAndPort(str2, i), new HostAndPort(str3, i2), config);
            if (config2 != null) {
                // Custom settings win; the generated config only fills the gaps.
                akkaConfig = config2.withFallback((ConfigMergeable) akkaConfig);
            }
            return startActorSystem(akkaConfig, str, logger);
        } catch (Throwable th) {
            // instanceof is false for null, so no separate null check on the cause is needed.
            if (th instanceof org.apache.flink.shaded.netty4.io.netty.channel.ChannelException
                    && th.getCause() instanceof BindException) {
                throw new IOException("Unable to create ActorSystem at address " + bindHostPort + " : " + th.getCause().getMessage(), th);
            }
            throw new Exception("Could not create actor system", th);
        }
    }

    /**
     * Starts a local actor system; no external or bind address is passed to
     * {@code AkkaUtils.getAkkaConfig}.
     *
     * @param configuration Flink configuration used to build the Akka config
     * @param str name of the actor system
     * @param logger logger used to report startup progress
     * @param config executor config handed to {@code AkkaUtils.getAkkaConfig}
     * @param config2 optional custom config that takes precedence over the generated config;
     *     may be {@code null}
     * @return the started actor system
     * @throws Exception wrapping any failure during startup
     */
    public static ActorSystem startLocalActorSystem(Configuration configuration, String str, Logger logger, Config config, Config config2) throws Exception {
        logger.info("Trying to start local actor system");
        try {
            Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, null, null, config);
            if (config2 != null) {
                // Custom settings win; the generated config only fills the gaps.
                akkaConfig = config2.withFallback((ConfigMergeable) akkaConfig);
            }
            return startActorSystem(akkaConfig, str, logger);
        } catch (Throwable th) {
            throw new Exception("Could not create actor system", th);
        }
    }

    /**
     * Creates the actor system with the given config and name and logs its address.
     *
     * @param config complete Akka config for the actor system
     * @param str name of the actor system
     * @param logger logger used to report the address of the started system
     * @return the started actor system
     */
    private static ActorSystem startActorSystem(Config config, String str, Logger logger) {
        ActorSystem actorSystem = AkkaUtils.createActorSystem(str, config);
        logger.info("Actor system started at {}", AkkaUtils.getAddress(actorSystem));
        return actorSystem;
    }

    /**
     * Builds the fork-join executor configuration from the parallelism factor, minimum and
     * maximum options of {@link AkkaOptions}.
     *
     * @param configuration Flink configuration to read the options from
     * @return the fork-join executor configuration
     */
    public static RpcSystem.ForkJoinExecutorConfiguration getForkJoinExecutorConfiguration(Configuration configuration) {
        double parallelismFactor = configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR);
        int minParallelism = configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN);
        int maxParallelism = configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX);
        return new RpcSystem.ForkJoinExecutorConfiguration(parallelismFactor, minParallelism, maxParallelism);
    }
}
