package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.class */
public class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AkkaInvocationHandler.class);
    private final String address;
    private final String hostname;
    private final ActorRef rpcEndpoint;
    protected final boolean isLocal;
    private final Time timeout;
    private final long maximumFramesize;

    @Nullable
    private final CompletableFuture<Void> terminationFuture;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AkkaInvocationHandler(String str, String str2, ActorRef actorRef, Time time, long j, @Nullable CompletableFuture<Void> completableFuture) {
        this.address = (String) Preconditions.checkNotNull(str);
        this.hostname = (String) Preconditions.checkNotNull(str2);
        this.rpcEndpoint = (ActorRef) Preconditions.checkNotNull(actorRef);
        this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
        this.timeout = (Time) Preconditions.checkNotNull(time);
        this.maximumFramesize = j;
        this.terminationFuture = completableFuture;
    }

    @Override // java.lang.reflect.InvocationHandler
    public Object invoke(Object obj, Method method, Object[] objArr) throws Throwable {
        Object invoke;
        Class<?> declaringClass = method.getDeclaringClass();
        if (declaringClass.equals(AkkaBasedEndpoint.class) || declaringClass.equals(Object.class) || declaringClass.equals(RpcGateway.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(RpcServer.class)) {
            invoke = method.invoke(this, objArr);
        } else {
            if (declaringClass.equals(FencedRpcGateway.class)) {
                throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" + method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to retrieve a properly FencedRpcGateway.");
            }
            invoke = invokeRpc(method, objArr);
        }
        return invoke;
    }

    @Override // org.apache.flink.runtime.rpc.akka.AkkaBasedEndpoint
    public ActorRef getActorRef() {
        return this.rpcEndpoint;
    }

    @Override // org.apache.flink.runtime.rpc.MainThreadExecutable
    public void runAsync(Runnable runnable) {
        scheduleRunAsync(runnable, 0L);
    }

    @Override // org.apache.flink.runtime.rpc.MainThreadExecutable
    public void scheduleRunAsync(Runnable runnable, long j) {
        Preconditions.checkNotNull(runnable, "runnable");
        Preconditions.checkArgument(j >= 0, "delay must be zero or greater");
        if (!this.isLocal) {
            throw new RuntimeException("Trying to send a Runnable to a remote actor at " + this.rpcEndpoint.path() + ". This is not supported.");
        }
        tell(new RunAsync(runnable, j == 0 ? 0L : System.nanoTime() + (j * 1000000)));
    }

    @Override // org.apache.flink.runtime.rpc.MainThreadExecutable
    public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time time) {
        if (this.isLocal) {
            return (CompletableFuture<V>) ask(new CallAsync(callable), time);
        }
        throw new RuntimeException("Trying to send a Callable to a remote actor at " + this.rpcEndpoint.path() + ". This is not supported.");
    }

    @Override // org.apache.flink.runtime.rpc.StartStoppable
    public void start() {
        this.rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
    }

    @Override // org.apache.flink.runtime.rpc.StartStoppable
    public void stop() {
        this.rpcEndpoint.tell(ControlMessages.STOP, ActorRef.noSender());
    }

    private Object invokeRpc(Method method, Object[] objArr) throws Exception {
        Object obj;
        String name = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        Time extractRpcTimeout = extractRpcTimeout(method.getParameterAnnotations(), objArr, this.timeout);
        RpcInvocation createRpcInvocationMessage = createRpcInvocationMessage(name, parameterTypes, objArr);
        Class<?> returnType = method.getReturnType();
        if (Objects.equals(returnType, Void.TYPE)) {
            tell(createRpcInvocationMessage);
            obj = null;
        } else {
            CompletableFuture<U> thenApply = ask(createRpcInvocationMessage, extractRpcTimeout).thenApply(obj2 -> {
                if (!(obj2 instanceof SerializedValue)) {
                    return obj2;
                }
                try {
                    return ((SerializedValue) obj2).deserializeValue(getClass().getClassLoader());
                } catch (IOException | ClassNotFoundException e) {
                    throw new CompletionException(new RpcException("Could not deserialize the serialized payload of RPC method : " + name, e));
                }
            });
            if (Objects.equals(returnType, CompletableFuture.class)) {
                obj = thenApply;
            } else {
                try {
                    obj = thenApply.get(extractRpcTimeout.getSize(), extractRpcTimeout.getUnit());
                } catch (ExecutionException e) {
                    throw new RpcException("Failure while obtaining synchronous RPC result.", ExceptionUtils.stripExecutionException(e));
                }
            }
        }
        return obj;
    }

    protected RpcInvocation createRpcInvocationMessage(String str, Class<?>[] clsArr, Object[] objArr) throws IOException {
        RpcInvocation rpcInvocation;
        if (this.isLocal) {
            rpcInvocation = new LocalRpcInvocation(str, clsArr, objArr);
        } else {
            try {
                RemoteRpcInvocation remoteRpcInvocation = new RemoteRpcInvocation(str, clsArr, objArr);
                if (remoteRpcInvocation.getSize() > this.maximumFramesize) {
                    throw new IOException(String.format("The rpc invocation size %d exceeds the maximum akka framesize.", Long.valueOf(remoteRpcInvocation.getSize())));
                }
                rpcInvocation = remoteRpcInvocation;
            } catch (IOException e) {
                LOG.warn("Could not create remote rpc invocation message. Failing rpc invocation because...", (Throwable) e);
                throw e;
            }
        }
        return rpcInvocation;
    }

    private static Time extractRpcTimeout(Annotation[][] annotationArr, Object[] objArr, Time time) {
        if (objArr != null) {
            Preconditions.checkArgument(annotationArr.length == objArr.length);
            for (int i = 0; i < annotationArr.length; i++) {
                if (isRpcTimeout(annotationArr[i])) {
                    if (objArr[i] instanceof Time) {
                        return (Time) objArr[i];
                    }
                    throw new RuntimeException("The rpc timeout parameter must be of type " + Time.class.getName() + ". The type " + objArr[i].getClass().getName() + " is not supported.");
                }
            }
        }
        return time;
    }

    private static boolean isRpcTimeout(Annotation[] annotationArr) {
        for (Annotation annotation : annotationArr) {
            if (annotation.annotationType().equals(RpcTimeout.class)) {
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void tell(Object obj) {
        this.rpcEndpoint.tell(obj, ActorRef.noSender());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<?> ask(Object obj, Time time) {
        return FutureUtils.toJava(Patterns.ask(this.rpcEndpoint, obj, time.toMilliseconds()));
    }

    @Override // org.apache.flink.runtime.rpc.RpcGateway
    public String getAddress() {
        return this.address;
    }

    @Override // org.apache.flink.runtime.rpc.RpcGateway
    public String getHostname() {
        return this.hostname;
    }

    @Override // org.apache.flink.runtime.rpc.RpcServer
    public CompletableFuture<Void> getTerminationFuture() {
        return this.terminationFuture;
    }
}
