package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory that creates and caches {@link NettyPartitionRequestClient} instances, one per
 * {@link ConnectionID}.
 *
 * <p>The cache stores a {@link CompletableFuture} per connection. The thread that inserts the
 * future is the one that establishes the connection; every other thread asking for the same
 * {@link ConnectionID} blocks on that future until it is completed.
 */
class PartitionRequestClientFactory {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClientFactory.class);

    /** Client used to open the underlying Netty connections. */
    private final NettyClient nettyClient;

    /** Number of additional connection attempts made after the first failed one. */
    private final int retryNumber;

    /** Cache of (possibly still pending) clients, keyed by connection. */
    private final ConcurrentMap<ConnectionID, CompletableFuture<NettyPartitionRequestClient>> clients;

    /**
     * Creates a factory that does not retry failed connection attempts.
     *
     * @param nettyClient client used to open connections
     */
    PartitionRequestClientFactory(NettyClient nettyClient) {
        this(nettyClient, 0);
    }

    /**
     * Creates a factory.
     *
     * @param nettyClient client used to open connections
     * @param retryNumber number of retries after the first failed connection attempt
     */
    public PartitionRequestClientFactory(NettyClient nettyClient, int retryNumber) {
        this.clients = new ConcurrentHashMap<>();
        this.nettyClient = nettyClient;
        this.retryNumber = retryNumber;
    }

    /**
     * Returns a client for the given connection, connecting first if no usable client is cached.
     *
     * <p>The returned client has had its reference counter incremented via
     * {@code incrementReferenceCounter()}.
     *
     * @param connectionID connection to obtain a client for
     * @return a client whose reference counter was successfully incremented
     * @throws IOException if establishing the connection failed
     * @throws InterruptedException if the calling thread was interrupted while connecting or
     *     while waiting for another thread to connect
     */
    public NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connectionID) throws IOException, InterruptedException {
        while (true) {
            // Set to true only by the thread whose mapping function actually ran, i.e. the
            // thread that is responsible for establishing the connection.
            AtomicBoolean isCreator = new AtomicBoolean(false);
            CompletableFuture<NettyPartitionRequestClient> clientFuture =
                    this.clients.computeIfAbsent(connectionID, unused -> {
                        isCreator.set(true);
                        return new CompletableFuture<>();
                    });

            if (isCreator.get()) {
                try {
                    clientFuture.complete(connectWithRetries(connectionID));
                } catch (InterruptedException e) {
                    // A null result tells waiting threads to start over.
                    clientFuture.complete(null);
                    // Drop our own entry so it does not linger if nobody else is waiting.
                    this.clients.remove(connectionID, clientFuture);
                    throw e;
                } catch (Exception e) {
                    clientFuture.completeExceptionally(e);
                    // Do not cache the failure: current waiters still observe it through the
                    // future, but later calls get a fresh connection attempt.
                    this.clients.remove(connectionID, clientFuture);
                }
            }

            NettyPartitionRequestClient client;
            try {
                client = clientFuture.get();
            } catch (ExecutionException e) {
                throw new IOException(e);
            }

            if (client == null) {
                // The creating thread was interrupted; make sure the entry is gone and retry.
                this.clients.remove(connectionID, clientFuture);
                continue;
            }
            if (client.incrementReferenceCounter()) {
                return client;
            }
            // The cached client could not be referenced any more: evict it and try again.
            destroyPartitionRequestClient(connectionID, client);
        }
    }

    /**
     * Connects to the given address, retrying up to {@code retryNumber} times on failure.
     *
     * @param connectionID connection to establish
     * @return the connected client
     * @throws CompletionException wrapping the last {@link RemoteTransportException} once all
     *     attempts have failed
     * @throws InterruptedException if interrupted while connecting
     */
    private NettyPartitionRequestClient connectWithRetries(ConnectionID connectionID) throws InterruptedException {
        int failedAttempts = 0;
        while (true) {
            try {
                return connect(connectionID);
            } catch (RemoteTransportException e) {
                failedAttempts++;
                LOG.error("Failed {} times to connect to {}", failedAttempts, connectionID.getAddress(), e);
                if (failedAttempts > this.retryNumber) {
                    throw new CompletionException(e);
                }
            }
        }
    }

    /**
     * Performs a single connection attempt and wraps the resulting channel in a client.
     *
     * @param connectionID connection to establish
     * @return a client bound to the newly opened channel
     * @throws RemoteTransportException if the attempt failed for any reason other than
     *     interruption
     * @throws InterruptedException if interrupted while waiting for the connection
     */
    private NettyPartitionRequestClient connect(ConnectionID connectionID) throws RemoteTransportException, InterruptedException {
        try {
            // sync() waits for completion AND rethrows the cause of a failed connect, whereas
            // await() only waits; without the rethrow a failed connect would go unnoticed here
            // and the retry logic would never trigger.
            Channel channel = this.nettyClient.connect(connectionID.getAddress()).sync().channel();
            NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
            return new NettyPartitionRequestClient(channel, clientHandler, connectionID, this);
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e) {
            throw new RemoteTransportException("Connecting to remote task manager '" + connectionID.getAddress() + "' has failed. This might indicate that the remote task manager has been lost.", connectionID.getAddress(), e);
        }
    }

    /**
     * For a connection that is still being established, registers a callback that disposes the
     * client once available (if it is unused) and then removes it from the cache. Does nothing
     * if there is no cache entry or the entry is already completed.
     *
     * @param connectionID connection whose pending client should be closed
     */
    public void closeOpenChannelConnections(ConnectionID connectionID) {
        CompletableFuture<NettyPartitionRequestClient> pending = this.clients.get(connectionID);
        if (pending == null || pending.isDone()) {
            return;
        }
        pending.thenAccept(client -> {
            // client is null when the connecting thread was interrupted.
            if (client != null && client.disposeIfNotUsed()) {
                this.clients.remove(connectionID, pending);
            }
        });
    }

    /**
     * Returns the number of entries currently held in the client cache.
     *
     * @return size of the client cache
     */
    public int getNumberOfActiveClients() {
        return this.clients.size();
    }

    /**
     * Removes the cache entry for the given connection if it is completed and holds exactly the
     * given client. Does nothing if there is no entry or the entry is still pending.
     *
     * @param connectionID connection the client belongs to
     * @param partitionRequestClient client to evict from the cache
     */
    public void destroyPartitionRequestClient(ConnectionID connectionID, PartitionRequestClient partitionRequestClient) {
        CompletableFuture<NettyPartitionRequestClient> cached = this.clients.get(connectionID);
        if (cached == null || !cached.isDone()) {
            return;
        }
        cached.thenAccept(cachedClient -> {
            // Only evict if the cache still points at the client being destroyed.
            if (partitionRequestClient.equals(cachedClient)) {
                this.clients.remove(connectionID, cached);
            }
        });
    }
}
