package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorIdentity;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.pattern.Patterns;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter;
import org.apache.flink.runtime.rpc.FencedMainThreadExecutable;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage;
import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.reflect.ClassTag$;

/**
 * Akka based {@link RpcService} implementation.
 *
 * <p>The service starts one Akka actor per {@link RpcEndpoint} and hands out
 * {@link Proxy}-based {@link RpcServer} / {@link RpcGateway} objects whose calls are
 * forwarded by an {@link AkkaInvocationHandler} (or its fenced variant).
 *
 * <p>The map of started actors is guarded by {@link #lock}; the {@code stopped} flag is
 * written under the lock and read without it by {@code connectInternal}, hence volatile.
 */
@ThreadSafe
public class AkkaRpcService implements RpcService {
    private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcService.class);

    /** Version that is passed to the RPC actors and sent with the remote handshake. */
    static final int VERSION = 1;

    /** Config path from which the maximum frame size is read, if present. */
    static final String MAXIMUM_FRAME_SIZE_PATH = "akka.remote.netty.tcp.maximum-frame-size";

    private final ActorSystem actorSystem;

    /** Timeout used for the ask calls during connect and handed to the invocation handlers. */
    private final Time timeout;

    /** Value of {@link #MAXIMUM_FRAME_SIZE_PATH}, or {@code Long.MAX_VALUE} if not configured. */
    private final long maximumFramesize;

    /** Host of the actor system's address, or the empty string if the address has no host. */
    private final String address;

    /** Port of the actor system's address, or {@code -1} if the address has no port. */
    private final int port;

    private final ScheduledExecutor internalScheduledExecutor;

    /** Completed once the actor system termination triggered by {@link #stopService()} finishes. */
    private final CompletableFuture<Void> terminationFuture;

    private volatile boolean stopped;

    private final Object lock = new Object();

    /** Actors started by this service, keyed by actor ref. */
    @GuardedBy("lock")
    private final Map<ActorRef, RpcEndpoint> actors = new HashMap<>(4);

    /**
     * Creates an RPC service on top of the given actor system.
     *
     * @param actorSystem actor system used to run the RPC actors; must not be null
     * @param time timeout for ask operations; must not be null
     */
    public AkkaRpcService(ActorSystem actorSystem, Time time) {
        this.actorSystem = Preconditions.checkNotNull(actorSystem, "actor system");
        this.timeout = Preconditions.checkNotNull(time, "timeout");

        if (actorSystem.settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
            this.maximumFramesize = actorSystem.settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH);
        } else {
            // no frame size configured: do not restrict the message size
            this.maximumFramesize = Long.MAX_VALUE;
        }

        final Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);

        this.address = actorSystemAddress.host().isDefined() ? actorSystemAddress.host().get() : "";

        // Scala's Option[Int] is seen as Option<Object> from Java, hence the cast
        this.port = actorSystemAddress.port().isDefined() ? (Integer) actorSystemAddress.port().get() : -1;

        this.internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);
        this.terminationFuture = new CompletableFuture<>();
        this.stopped = false;
    }

    /** Returns the actor system this service runs on. */
    public ActorSystem getActorSystem() {
        return this.actorSystem;
    }

    /** Returns the version handed to the RPC actors and used in the handshake; overridable. */
    protected int getVersion() {
        return VERSION;
    }

    /** Returns the host of the actor system's address, or the empty string if it has none. */
    @Override
    public String getAddress() {
        return this.address;
    }

    /** Returns the port of the actor system's address, or {@code -1} if it has none. */
    @Override
    public int getPort() {
        return this.port;
    }

    /**
     * Connects to the RPC endpoint under the given address.
     *
     * @param str address of the endpoint (used as Akka actor selection path)
     * @param cls gateway interface the returned proxy implements
     * @return future with the gateway proxy; fails if the endpoint cannot be resolved or the
     *     handshake fails
     * @throws IllegalStateException if this service has already been stopped
     */
    @Override
    public <C extends RpcGateway> CompletableFuture<C> connect(String str, Class<C> cls) {
        return connectInternal(
            str,
            cls,
            (ActorRef actorRef) -> {
                final Tuple2<String, String> addressAndHostname = extractAddressHostname(actorRef);

                return new AkkaInvocationHandler(
                    addressAndHostname.f0,
                    addressAndHostname.f1,
                    actorRef,
                    this.timeout,
                    this.maximumFramesize,
                    null);
            });
    }

    /**
     * Connects to the fenced RPC endpoint under the given address.
     *
     * @param str address of the endpoint (used as Akka actor selection path)
     * @param f fencing token supplied to the fenced invocation handler
     * @param cls gateway interface the returned proxy implements
     * @return future with the gateway proxy; fails if the endpoint cannot be resolved or the
     *     handshake fails
     * @throws IllegalStateException if this service has already been stopped
     */
    @Override
    public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String str, F f, Class<C> cls) {
        return connectInternal(
            str,
            cls,
            (ActorRef actorRef) -> {
                final Tuple2<String, String> addressAndHostname = extractAddressHostname(actorRef);

                return new FencedAkkaInvocationHandler<>(
                    addressAndHostname.f0,
                    addressAndHostname.f1,
                    actorRef,
                    this.timeout,
                    this.maximumFramesize,
                    null,
                    () -> f);
            });
    }

    /**
     * Starts an RPC actor for the given endpoint and returns a proxy to talk to it.
     *
     * <p>The proxy implements every RPC gateway interface of the endpoint class plus
     * {@link RpcServer} and {@link AkkaBasedEndpoint}; for a {@link FencedRpcEndpoint} it
     * additionally implements {@link FencedMainThreadExecutable}.
     *
     * @param c endpoint to start; must not be null
     * @return self gateway / server proxy for the started endpoint
     * @throws IllegalStateException if this service has already been stopped
     */
    @Override
    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C c) {
        Preconditions.checkNotNull(c, "rpc endpoint");

        final CompletableFuture<Void> actorTerminationFuture = new CompletableFuture<>();
        final boolean fenced = c instanceof FencedRpcEndpoint;
        final Class<?> actorClass = fenced ? FencedAkkaRpcActor.class : AkkaRpcActor.class;
        final Props actorProps = Props.create(actorClass, c, actorTerminationFuture, getVersion());

        final ActorRef actorRef;
        synchronized (this.lock) {
            // check and registration happen atomically with respect to stopService()
            Preconditions.checkState(!this.stopped, "RpcService is stopped");
            actorRef = this.actorSystem.actorOf(actorProps, c.getEndpointId());
            this.actors.put(actorRef, c);
        }

        LOG.info("Starting RPC endpoint for {} at {} .", c.getClass().getName(), actorRef.path());

        final String akkaAddress = AkkaUtils.getAkkaURL(this.actorSystem, actorRef);
        final String hostname = localHostname();

        final HashSet<Class<?>> implementedInterfaces =
            new HashSet<>(RpcUtils.extractImplementedRpcGateways(c.getClass()));
        implementedInterfaces.add(RpcServer.class);
        implementedInterfaces.add(AkkaBasedEndpoint.class);

        final InvocationHandler invocationHandler;
        if (fenced) {
            // a fenced endpoint needs a handler that attaches the current fencing token
            invocationHandler = new FencedAkkaInvocationHandler<>(
                akkaAddress,
                hostname,
                actorRef,
                this.timeout,
                this.maximumFramesize,
                actorTerminationFuture,
                ((FencedRpcEndpoint<?>) c)::getFencingToken);
            implementedInterfaces.add(FencedMainThreadExecutable.class);
        } else {
            invocationHandler = new AkkaInvocationHandler(
                akkaAddress,
                hostname,
                actorRef,
                this.timeout,
                this.maximumFramesize,
                actorTerminationFuture);
        }

        // the class loader is derived from this class rather than taken from the system
        final ClassLoader classLoader = getClass().getClassLoader();

        return (RpcServer) Proxy.newProxyInstance(
            classLoader,
            implementedInterfaces.toArray(new Class<?>[0]),
            invocationHandler);
    }

    /**
     * Creates a new proxy for the actor behind the given server that uses a fixed fencing token.
     *
     * @param rpcServer server to fence; must be an {@link AkkaBasedEndpoint}
     * @param f fencing token supplied to the fenced invocation handler
     * @return proxy implementing {@link RpcServer} and {@link AkkaBasedEndpoint}
     * @throws RuntimeException if {@code rpcServer} is not an {@link AkkaBasedEndpoint}
     */
    @Override
    public <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F f) {
        if (!(rpcServer instanceof AkkaBasedEndpoint)) {
            throw new RuntimeException("The given RpcServer must implement the AkkaBasedEndpoint in order to fence it.");
        }

        final InvocationHandler fencedInvocationHandler = new FencedAkkaInvocationHandler<>(
            rpcServer.getAddress(),
            rpcServer.getHostname(),
            ((AkkaBasedEndpoint) rpcServer).getActorRef(),
            this.timeout,
            this.maximumFramesize,
            null,
            () -> f);

        return (RpcServer) Proxy.newProxyInstance(
            getClass().getClassLoader(),
            new Class<?>[]{RpcServer.class, AkkaBasedEndpoint.class},
            fencedInvocationHandler);
    }

    /**
     * Stops the actor behind the given server by sending it a {@link PoisonPill}.
     *
     * <p>Nothing happens if the server is not an {@link AkkaBasedEndpoint}, if this service is
     * already stopped, or if the actor was not started by this service (or was stopped before).
     *
     * @param rpcServer server whose actor should be stopped
     */
    @Override
    public void stopServer(RpcServer rpcServer) {
        if (!(rpcServer instanceof AkkaBasedEndpoint)) {
            return;
        }

        final ActorRef actorRef = ((AkkaBasedEndpoint) rpcServer).getActorRef();

        synchronized (this.lock) {
            if (this.stopped) {
                return;
            }

            // only actors registered by startServer of this service are stopped
            if (this.actors.remove(actorRef) != null) {
                actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
            } else {
                LOG.debug("RPC endpoint {} already stopped or from different RPC service", rpcServer.getAddress());
            }
        }
    }

    /**
     * Stops this service by terminating the actor system. Repeated calls return the same future.
     *
     * @return future that completes (normally or exceptionally) once the actor system
     *     termination has finished
     */
    @Override
    public CompletableFuture<Void> stopService() {
        synchronized (this.lock) {
            if (this.stopped) {
                return this.terminationFuture;
            }

            this.stopped = true;
            LOG.info("Stopping Akka RPC service.");

            FutureUtils.toJava(this.actorSystem.terminate()).whenComplete((ignored, throwable) -> {
                synchronized (this.lock) {
                    this.actors.clear();
                }

                if (throwable != null) {
                    this.terminationFuture.completeExceptionally(throwable);
                } else {
                    this.terminationFuture.complete(null);
                }

                LOG.info("Stopped Akka RPC service.");
            });

            return this.terminationFuture;
        }
    }

    /** Returns the future that is completed when the service started by this object has stopped. */
    @Override
    public CompletableFuture<Void> getTerminationFuture() {
        return this.terminationFuture;
    }

    /** Returns the actor system's dispatcher as executor. */
    @Override
    public Executor getExecutor() {
        return this.actorSystem.dispatcher();
    }

    /** Returns the scheduled executor backed by the actor system. */
    @Override
    public ScheduledExecutor getScheduledExecutor() {
        return this.internalScheduledExecutor;
    }

    /**
     * Schedules the runnable on the internal scheduled executor.
     *
     * @param runnable task to run; must not be null
     * @param j delay, must be zero or larger
     * @param timeUnit unit of the delay; must not be null
     * @return handle of the scheduled task
     */
    @Override
    public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long j, TimeUnit timeUnit) {
        Preconditions.checkNotNull(runnable, "runnable");
        Preconditions.checkNotNull(timeUnit, "unit");
        Preconditions.checkArgument(j >= 0, "delay must be zero or larger");

        return this.internalScheduledExecutor.schedule(runnable, j, timeUnit);
    }

    /** Runs the given runnable on the actor system's dispatcher. */
    @Override
    public void execute(Runnable runnable) {
        this.actorSystem.dispatcher().execute(runnable);
    }

    /** Runs the given callable on the actor system's dispatcher and returns its result as future. */
    @Override
    public <T> CompletableFuture<T> execute(Callable<T> callable) {
        return FutureUtils.toJava(Futures.future(callable, this.actorSystem.dispatcher()));
    }

    /**
     * Returns the host of the local actor system's address, or {@code "localhost"} if the
     * address carries no host.
     */
    private String localHostname() {
        final Option<String> host = AkkaUtils.getAddress(this.actorSystem).host();
        return host.isEmpty() ? "localhost" : host.get();
    }

    /**
     * Derives the Akka URL and the hostname for the given actor.
     *
     * <p>The hostname is taken from the actor ref's own path address when that address carries
     * a host, so that a gateway reports the host matching its address. Otherwise the local
     * actor system's host (or {@code "localhost"}) is used.
     * NOTE(review): assumes refs to remote actors carry the remote host in their path address
     * and local refs carry none — confirm against the Akka version in use.
     *
     * @param actorRef actor to describe
     * @return tuple of (Akka URL, hostname)
     */
    private Tuple2<String, String> extractAddressHostname(ActorRef actorRef) {
        final String actorAddress = AkkaUtils.getAkkaURL(this.actorSystem, actorRef);
        final Option<String> actorHost = actorRef.path().address().host();
        final String hostname = actorHost.isEmpty() ? localHostname() : actorHost.get();

        return Tuple2.of(actorAddress, hostname);
    }

    /**
     * Resolves the actor under the given address, performs the version handshake with it and
     * wraps it into a proxy implementing the requested gateway interface.
     *
     * @param endpointAddress address of the endpoint (used as Akka actor selection path)
     * @param gatewayClass gateway interface the returned proxy implements
     * @param invocationHandlerFactory creates the invocation handler for the resolved actor
     * @return future with the gateway proxy
     * @throws IllegalStateException if this service has already been stopped
     */
    private <C extends RpcGateway> CompletableFuture<C> connectInternal(
            String endpointAddress,
            Class<C> gatewayClass,
            Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
        Preconditions.checkState(!this.stopped, "RpcService is stopped");

        LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.", endpointAddress, gatewayClass.getName());

        // step 1: ask the selection to identify itself; the message id (42) is not inspected here
        final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(
            Patterns
                .ask(this.actorSystem.actorSelection(endpointAddress), new Identify(42), this.timeout.toMilliseconds())
                .<ActorIdentity>mapTo(ClassTag$.MODULE$.<ActorIdentity>apply(ActorIdentity.class)));

        final CompletableFuture<ActorRef> actorRefFuture = identifyFuture.thenApply(
            (ActorIdentity actorIdentity) -> {
                final ActorRef resolved = actorIdentity.getRef();
                if (resolved == null) {
                    throw new CompletionException(new RpcConnectionException("Could not connect to rpc endpoint under address " + endpointAddress + '.'));
                }
                return resolved;
            });

        // step 2: handshake with the resolved actor (gateway type and version)
        final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
            (ActorRef actorRef) -> FutureUtils.toJava(
                Patterns
                    .ask(actorRef, new RemoteHandshakeMessage(gatewayClass, getVersion()), this.timeout.toMilliseconds())
                    .<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class))));

        // step 3: once both succeeded, build the proxy on the actor system's dispatcher
        return actorRefFuture.thenCombineAsync(
            handshakeFuture,
            (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
                final InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);

                @SuppressWarnings("unchecked") // the proxy implements gatewayClass, i.e. C
                final C proxy = (C) Proxy.newProxyInstance(
                    getClass().getClassLoader(),
                    new Class<?>[]{gatewayClass},
                    invocationHandler);

                return proxy;
            },
            this.actorSystem.dispatcher());
    }
}
