package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.lang.Thread;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
import org.apache.flink.util.NetUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.class */
public class PartitionRequestClientFactoryTest {
    // Port obtained once, when this class is initialized, from NetUtils.getAvailablePort().
    // NOTE(review): no code visible in this file reads SERVER_PORT — confirm whether it is still needed.
    private static final int SERVER_PORT = NetUtils.getAvailablePort();

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest$AwaitingNettyClient.class */
    /**
     * {@link NettyClient} wrapper whose {@code connect} can be switched between delegating to a
     * real client and returning a {@link NeverCompletingChannelFuture}.
     *
     * <p>The superclass is constructed with a {@code null} config; only {@code connect} is
     * redefined here.
     */
    private static class AwaitingNettyClient extends NettyClient {
        // Toggled directly by the test; volatile so a connect running on another thread sees the
        // latest value.
        private volatile boolean awaitForInterrupts;
        private final NettyClient client;

        AwaitingNettyClient(NettyClient nettyClient) {
            super((NettyConfig) null);
            this.client = nettyClient;
        }

        /**
         * Delegates to the wrapped client unless {@code awaitForInterrupts} is set, in which case a
         * fresh {@link NeverCompletingChannelFuture} is returned instead.
         *
         * @param inetSocketAddress address handed to the wrapped client
         * @return the wrapped client's future, or a new {@link NeverCompletingChannelFuture}
         * @throws RuntimeException wrapping any exception thrown by the wrapped client
         */
        ChannelFuture connect(InetSocketAddress inetSocketAddress) {
            if (!this.awaitForInterrupts) {
                try {
                    return this.client.connect(inetSocketAddress);
                } catch (Exception cause) {
                    throw new RuntimeException(cause);
                }
            }
            return new NeverCompletingChannelFuture();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest$CountDownLatchOnConnectHandler.class */
    /**
     * Outbound handler that signals a {@link CountDownLatch} whenever a connect operation reaches
     * it.
     */
    private static class CountDownLatchOnConnectHandler extends ChannelOutboundHandlerAdapter {
        private final CountDownLatch syncOnConnect;

        /**
         * @param countDownLatch latch counted down once per observed connect call
         */
        public CountDownLatchOnConnectHandler(CountDownLatch countDownLatch) {
            syncOnConnect = countDownLatch;
        }

        /**
         * Counts the latch down and does nothing else: the context and the promise are not
         * touched, so this method neither forwards the connect nor completes the promise.
         */
        public void connect(ChannelHandlerContext channelHandlerContext, SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) {
            final CountDownLatch latch = syncOnConnect;
            latch.countDown();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest$FailingNettyClient.class */
    /**
     * {@link NettyClient} whose {@code connect} always fails with a {@link ChannelException}.
     *
     * <p>The superclass is constructed with a {@code null} config.
     */
    private static class FailingNettyClient extends NettyClient {
        FailingNettyClient() {
            super((NettyConfig) null);
        }

        /**
         * Never returns normally.
         *
         * @param inetSocketAddress ignored
         * @throws ChannelException on every call
         */
        ChannelFuture connect(InetSocketAddress inetSocketAddress) {
            final ChannelException simulatedFailure = new ChannelException("Simulate connect failure");
            throw simulatedFailure;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest$UncaughtTestExceptionHandler.class */
    /**
     * {@link Thread.UncaughtExceptionHandler} that records every throwable passed to it, in
     * arrival order, so a test can inspect them afterwards.
     *
     * <p>The backing list is a plain {@link ArrayList}; no synchronization is performed here.
     */
    private static class UncaughtTestExceptionHandler implements Thread.UncaughtExceptionHandler {
        // Diamond instead of a raw ArrayList so the assignment is type-checked.
        private final List<Throwable> errors = new ArrayList<>(1);

        private UncaughtTestExceptionHandler() {
        }

        /**
         * Appends the throwable to the recorded errors.
         *
         * @param thread the thread that terminated (not used)
         * @param th the uncaught throwable to record
         */
        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            this.errors.add(th);
        }

        /**
         * @return the live list of recorded throwables (not a copy)
         */
        private List<Throwable> getErrors() {
            return this.errors;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest$UnstableNettyClient.class */
    /**
     * {@link NettyClient} wrapper that fails a configurable number of {@code connect} calls with a
     * {@link ChannelException} before delegating to the wrapped client.
     *
     * <p>The remaining-failure counter is a plain {@code int}; no synchronization is performed
     * here.
     */
    private static class UnstableNettyClient extends NettyClient {
        private final NettyClient nettyClient;
        // Number of connect calls that will still fail.
        private int retry;

        /**
         * @param nettyClient client used once the simulated failures are exhausted
         * @param i number of initial connect calls that should fail
         */
        UnstableNettyClient(NettyClient nettyClient, int i) {
            super((NettyConfig) null);
            this.retry = i;
            this.nettyClient = nettyClient;
        }

        /**
         * Throws while simulated failures remain, decrementing the counter each time; afterwards
         * forwards to the wrapped client.
         *
         * @param inetSocketAddress address handed to the wrapped client
         * @return the wrapped client's future once no simulated failures remain
         * @throws ChannelException while the failure counter is still positive
         */
        ChannelFuture connect(InetSocketAddress inetSocketAddress) {
            if (this.retry > 0) {
                this.retry -= 1;
                throw new ChannelException("Simulate connect failure");
            }
            return this.nettyClient.connect(inetSocketAddress);
        }
    }

    /**
     * Interrupts a connection attempt that would otherwise never complete, then requests a client
     * for the same connection again with the real client in place. The test passes only if that
     * second request returns without throwing.
     *
     * @throws Exception if any step fails; the Netty server and client are shut down regardless
     */
    @Test
    public void testInterruptsNotCached() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
        try {
            AwaitingNettyClient awaitingClient = new AwaitingNettyClient(serverAndClient.client());
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory(awaitingClient, 0);

            // First attempt: connect() hands out a never-completing future, so the request can
            // only end by being interrupted.
            awaitingClient.awaitForInterrupts = true;
            connectAndInterrupt(factory, serverAndClient.getConnectionID(0));

            // Second attempt: connect() now delegates to the real client.
            awaitingClient.awaitForInterrupts = false;
            factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));
        } finally {
            // Single cleanup path; runs exactly once on success and on failure.
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    /**
     * Starts a client request on a separate thread, interrupts that thread once it has started
     * running, and waits until the request has ended.
     *
     * @param partitionRequestClientFactory factory on which the request is issued
     * @param connectionID connection to request a client for
     * @throws Exception if the request ends with anything other than an
     *     {@link InterruptedException} (surfaced through {@code Future.get()}), or if the calling
     *     thread is itself interrupted while waiting
     */
    private void connectAndInterrupt(PartitionRequestClientFactory partitionRequestClientFactory, ConnectionID connectionID) throws Exception {
        CompletableFuture<Void> started = new CompletableFuture<>();
        CompletableFuture<Void> interrupted = new CompletableFuture<>();
        Thread thread = new Thread(() -> {
            try {
                started.complete(null);
                partitionRequestClientFactory.createPartitionRequestClient(connectionID);
                // A normal return means the interrupt was not observed. Report it instead of
                // leaving the caller blocked forever on a future nobody completes.
                interrupted.completeExceptionally(new AssertionError("Expected the connection attempt to be interrupted."));
            } catch (InterruptedException e) {
                interrupted.complete(null);
            } catch (Exception e) {
                interrupted.completeExceptionally(e);
            }
        });
        thread.start();
        // Only interrupt once the worker thread is actually running.
        started.get();
        thread.interrupt();
        interrupted.get();
    }

    /**
     * Uses a client whose first connect call fails and a factory constructed with {@code 0} as its
     * second argument. The first request must fail with {@link RemoteTransportException}; a second
     * request for the same connection must then return without throwing.
     *
     * @throws Exception if any step fails; the Netty server and client are shut down regardless
     */
    @Test
    public void testExceptionsAreNotCached() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
        try {
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory(new UnstableNettyClient(serverAndClient.client(), 1), 0);
            ConnectionID connectionID = serverAndClient.getConnectionID(0);
            try {
                factory.createPartitionRequestClient(connectionID);
                Assert.fail("Expected the first request to fail.");
            } catch (RemoteTransportException expected) {
                // Expected: the one simulated connect failure surfaces on the first request.
            }
            factory.createPartitionRequestClient(connectionID);
        } finally {
            // Single cleanup path; runs exactly once on success and on failure.
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    /**
     * Uses a client whose first two connect calls fail and a factory constructed with {@code 2} as
     * its second argument (presumably the retry limit — confirm against
     * {@code PartitionRequestClientFactory}). The request must return without throwing.
     *
     * @throws Exception if the request fails; the Netty server and client are shut down regardless
     */
    @Test
    public void testNettyClientConnectRetry() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
        try {
            UnstableNettyClient unstableClient = new UnstableNettyClient(serverAndClient.client(), 2);
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory(unstableClient, 2);
            factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));
        } finally {
            // Release the endpoints even when the request throws, as the sibling tests do.
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    /**
     * Issues two requests against a factory backed by an always-failing client. Whatever the first
     * request throws is discarded; the second request must make the test method end with an
     * {@link IOException}.
     *
     * @throws Exception expected to be an {@link IOException} raised by the second request
     */
    @Test(expected = IOException.class)
    public void testFailureReportedToSubsequentRequests() throws Exception {
        PartitionRequestClientFactory factory = new PartitionRequestClientFactory(new FailingNettyClient(), 2);
        try {
            InetSocketAddress firstAddress = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
            ConnectionID firstConnection = new ConnectionID(firstAddress, 0);
            factory.createPartitionRequestClient(firstConnection);
        } catch (Exception ignored) {
            // Outcome of the first request is deliberately not checked; only the second matters.
        }
        InetSocketAddress secondAddress = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
        ConnectionID secondConnection = new ConnectionID(secondAddress, 0);
        factory.createPartitionRequestClient(secondConnection);
    }

    /**
     * Uses a client whose first three connect calls fail and a factory constructed with {@code 2}
     * as its second argument (presumably the retry limit — confirm against
     * {@code PartitionRequestClientFactory}). The request must end with an {@link IOException}.
     *
     * @throws Exception expected to be an {@link IOException}; the Netty server and client are
     *     shut down regardless
     */
    @Test(expected = IOException.class)
    public void testNettyClientConnectRetryFailure() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
        try {
            UnstableNettyClient unstableClient = new UnstableNettyClient(serverAndClient.client(), 3);
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory(unstableClient, 2);
            factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));
        } finally {
            // Single cleanup path; runs exactly once on success and on failure.
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    /**
     * Submits ten concurrent requests for the same connection to a factory backed by a client
     * whose first two connect calls fail. Every request must yield a non-null client.
     *
     * <p>A failure inside a worker reaches the test as the {@code ExecutionException} thrown by
     * {@code Future.get()}, which keeps the original cause and stack trace.
     *
     * @throws Exception if any request fails; the thread pool and the Netty server and client are
     *     shut down regardless
     */
    @Test
    public void testNettyClientConnectRetryMultipleThread() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
        try {
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory(new UnstableNettyClient(serverAndClient.client(), 2), 2);
            ExecutorService executor = Executors.newFixedThreadPool(10);
            try {
                List<Future<NettyPartitionRequestClient>> pending = new ArrayList<>(10);
                for (int i = 0; i < 10; i++) {
                    Future<NettyPartitionRequestClient> request = executor.submit(() -> factory.createPartitionRequestClient(serverAndClient.getConnectionID(0)));
                    pending.add(request);
                }
                // Plain loop (not forEach) so checked exceptions from get() propagate unchanged.
                for (Future<NettyPartitionRequestClient> request : pending) {
                    Assert.assertNotNull(request.get());
                }
            } finally {
                executor.shutdown();
            }
        } finally {
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    /**
     * Builds a Netty server/client pair through {@code NettyTestUtil.initServerAndClient} using a
     * minimal anonymous {@link NettyProtocol}.
     *
     * @return the initialized server and client pair
     * @throws Exception if {@code NettyTestUtil.initServerAndClient} fails
     */
    private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() throws Exception {
        NettyProtocol protocol = new NettyProtocol(null, null) { // from class: org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactoryTest.1
            public ChannelHandler[] getServerChannelHandlers() {
                // Ten slots, all null.
                // NOTE(review): confirm the consumer tolerates null entries, or whether an empty
                // array was intended.
                return new ChannelHandler[10];
            }

            public ChannelHandler[] getClientChannelHandlers() {
                ChannelHandler mockedClientHandler = (ChannelHandler) Mockito.mock(NetworkClientHandler.class);
                return new ChannelHandler[]{mockedClientHandler};
            }
        };
        return NettyTestUtil.initServerAndClient(protocol);
    }

    /**
     * Creates a {@link NettyServer} and {@link NettyClient} sharing one {@link NettyConfig} and one
     * {@link NettyBufferPool}, and initializes both with the given protocol.
     *
     * <p>If initialization fails part-way, both endpoints are shut down before the failure
     * propagates.
     *
     * @param nettyProtocol protocol passed to both {@code init} calls
     * @return the initialized server (f0) and client (f1)
     * @throws IOException if resolving the local host or initializing an endpoint fails
     */
    private static Tuple2<NettyServer, NettyClient> createNettyServerAndClient(NettyProtocol nettyProtocol) throws IOException {
        NettyConfig nettyConfig = new NettyConfig(InetAddress.getLocalHost(), NettyTestUtil.createPortIterator(), 32768, 1, new Configuration());
        NettyServer nettyServer = new NettyServer(nettyConfig);
        NettyClient nettyClient = new NettyClient(nettyConfig);
        boolean initialized = false;
        try {
            NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
            nettyServer.init(nettyProtocol, nettyBufferPool);
            nettyClient.init(nettyProtocol, nettyBufferPool);
            initialized = true;
        } finally {
            // Only tear down on failure; on success the caller owns both endpoints.
            if (!initialized) {
                nettyServer.shutdown();
                nettyClient.shutdown();
            }
        }
        return new Tuple2<>(nettyServer, nettyClient);
    }
}
