package org.apache.flink.runtime.registration;

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RegistrationResponse.Success;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/registration/RetryingRegistration.class */
public abstract class RetryingRegistration<F extends Serializable, G extends RpcGateway, S extends RegistrationResponse.Success> {
    private final Logger log;
    private final RpcService rpcService;
    private final String targetName;
    private final Class<G> targetType;
    private final String targetAddress;
    private final F fencingToken;
    private final CompletableFuture<Tuple2<G, S>> completionFuture = new CompletableFuture<>();
    private final RetryingRegistrationConfiguration retryingRegistrationConfiguration;
    private volatile boolean canceled;

    public RetryingRegistration(Logger logger, RpcService rpcService, String str, Class<G> cls, String str2, F f, RetryingRegistrationConfiguration retryingRegistrationConfiguration) {
        this.log = (Logger) Preconditions.checkNotNull(logger);
        this.rpcService = (RpcService) Preconditions.checkNotNull(rpcService);
        this.targetName = (String) Preconditions.checkNotNull(str);
        this.targetType = (Class) Preconditions.checkNotNull(cls);
        this.targetAddress = (String) Preconditions.checkNotNull(str2);
        this.fencingToken = (F) Preconditions.checkNotNull(f);
        this.retryingRegistrationConfiguration = (RetryingRegistrationConfiguration) Preconditions.checkNotNull(retryingRegistrationConfiguration);
    }

    public CompletableFuture<Tuple2<G, S>> getFuture() {
        return this.completionFuture;
    }

    public void cancel() {
        this.canceled = true;
        this.completionFuture.cancel(false);
    }

    public boolean isCanceled() {
        return this.canceled;
    }

    protected abstract CompletableFuture<RegistrationResponse> invokeRegistration(G g, F f, long j) throws Exception;

    public void startRegistration() {
        if (this.canceled) {
            return;
        }
        try {
            (FencedRpcGateway.class.isAssignableFrom(this.targetType) ? this.rpcService.connect(this.targetAddress, this.fencingToken, this.targetType.asSubclass(FencedRpcGateway.class)) : this.rpcService.connect(this.targetAddress, this.targetType)).thenAcceptAsync(rpcGateway -> {
                this.log.info("Resolved {} address, beginning registration", this.targetName);
                register(rpcGateway, 1, this.retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis());
            }, this.rpcService.getExecutor()).whenCompleteAsync((r9, th) -> {
                if (th == null || this.canceled) {
                    return;
                }
                Throwable stripCompletionException = ExceptionUtils.stripCompletionException(th);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Could not resolve {} address {}, retrying in {} ms.", this.targetName, this.targetAddress, Long.valueOf(this.retryingRegistrationConfiguration.getErrorDelayMillis()), stripCompletionException);
                } else {
                    this.log.info("Could not resolve {} address {}, retrying in {} ms: {}", this.targetName, this.targetAddress, Long.valueOf(this.retryingRegistrationConfiguration.getErrorDelayMillis()), stripCompletionException.getMessage());
                }
                startRegistrationLater(this.retryingRegistrationConfiguration.getErrorDelayMillis());
            }, this.rpcService.getExecutor());
        } catch (Throwable th2) {
            this.completionFuture.completeExceptionally(th2);
            cancel();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void register(G g, int i, long j) {
        if (this.canceled) {
            return;
        }
        try {
            this.log.debug("Registration at {} attempt {} (timeout={}ms)", this.targetName, Integer.valueOf(i), Long.valueOf(j));
            invokeRegistration(g, this.fencingToken, j).thenAcceptAsync(registrationResponse -> {
                if (isCanceled()) {
                    return;
                }
                if (registrationResponse instanceof RegistrationResponse.Success) {
                    this.completionFuture.complete(Tuple2.of(g, (RegistrationResponse.Success) registrationResponse));
                    return;
                }
                if (registrationResponse instanceof RegistrationResponse.Decline) {
                    this.log.info("Registration at {} was declined: {}", this.targetName, ((RegistrationResponse.Decline) registrationResponse).getReason());
                } else {
                    this.log.error("Received unknown response to registration attempt: {}", registrationResponse);
                }
                this.log.info("Pausing and re-attempting registration in {} ms", Long.valueOf(this.retryingRegistrationConfiguration.getRefusedDelayMillis()));
                registerLater(g, 1, this.retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis(), this.retryingRegistrationConfiguration.getRefusedDelayMillis());
            }, this.rpcService.getExecutor()).whenCompleteAsync((r13, th) -> {
                if (th == null || isCanceled()) {
                    return;
                }
                if (ExceptionUtils.stripCompletionException(th) instanceof TimeoutException) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Registration at {} ({}) attempt {} timed out after {} ms", this.targetName, this.targetAddress, Integer.valueOf(i), Long.valueOf(j));
                    }
                    register(g, i + 1, Math.min(2 * j, this.retryingRegistrationConfiguration.getMaxRegistrationTimeoutMillis()));
                } else {
                    this.log.error("Registration at {} failed due to an error", this.targetName, th);
                    this.log.info("Pausing and re-attempting registration in {} ms", Long.valueOf(this.retryingRegistrationConfiguration.getErrorDelayMillis()));
                    registerLater(g, 1, this.retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis(), this.retryingRegistrationConfiguration.getErrorDelayMillis());
                }
            }, this.rpcService.getExecutor());
        } catch (Throwable th2) {
            this.completionFuture.completeExceptionally(th2);
            cancel();
        }
    }

    private void registerLater(final G g, final int i, final long j, long j2) {
        this.rpcService.scheduleRunnable(new Runnable() { // from class: org.apache.flink.runtime.registration.RetryingRegistration.1
            @Override // java.lang.Runnable
            public void run() {
                RetryingRegistration.this.register(g, i, j);
            }
        }, j2, TimeUnit.MILLISECONDS);
    }

    private void startRegistrationLater(long j) {
        this.rpcService.scheduleRunnable(this::startRegistration, j, TimeUnit.MILLISECONDS);
    }
}
