package org.apache.zookeeper.server.quorum;

import java.io.DataOutputStream;
import java.io.File;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.server.quorum.QuorumCnxManager;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.test.ClientBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zookeeper/server/quorum/CnxManagerTest.class */
public class CnxManagerTest extends ZKTestCase {
    /** Logger shared by the test methods and the nested helper classes. */
    protected static final Logger LOG = LoggerFactory.getLogger(CnxManagerTest.class);
    // NOTE(review): the retry loops in this class compare against the literal 4;
    // presumably that literal is this constant inlined - confirm.
    protected static final int THRESHOLD = 4;
    // Number of simulated peers; assigned in setUp().
    int count;
    // Server id -> QuorumServer entry for every simulated peer; populated in setUp().
    HashMap<Long, QuorumPeer.QuorumServer> peers;
    // One temporary directory per peer, created in setUp().
    File[] peerTmpdir;
    // Per-peer port obtained from PortAssignment.unique() and used for the first
    // address handed to each QuorumServer in setUp().
    int[] peerQuorumPort;
    // Per-peer port obtained from PortAssignment.unique(); passed to the QuorumPeer
    // constructors in the tests.
    int[] peerClientPort;

    /* loaded from: input_file:org/apache/zookeeper/server/quorum/CnxManagerTest$CnxManagerThread.class */
    /**
     * Background counterpart for {@link #testCnxManager()}.
     *
     * <p>Builds a {@link QuorumCnxManager} around a peer created from
     * {@code peerTmpdir[0]} / {@code peerClientPort[0]}, sends one message to
     * server 1 and waits for traffic in the opposite direction. Because JUnit
     * cannot observe assertion errors thrown on a non-test thread, every
     * failure is reported through {@link #failed}, which the test inspects
     * after joining this thread.
     */
    class CnxManagerThread extends Thread {
        /** {@code true} once this thread gave up waiting or hit an exception. */
        boolean failed = false;

        CnxManagerThread() {
        }

        @Override
        public void run() {
            try {
                QuorumPeer peer = new QuorumPeer(CnxManagerTest.this.peers, CnxManagerTest.this.peerTmpdir[0],
                        CnxManagerTest.this.peerTmpdir[0], CnxManagerTest.this.peerClientPort[0], 3, 0L, 1000, 2, 2);
                QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
                QuorumCnxManager.Listener listener = cnxManager.listener;
                if (listener != null) {
                    listener.start();
                } else {
                    CnxManagerTest.LOG.error("Null listener when initializing cnx manager");
                }

                cnxManager.toSend(1L,
                        CnxManagerTest.this.createMsg(QuorumPeer.ServerState.LOOKING.ordinal(), 0L, -1L, 1L));

                // Poll up to THRESHOLD times, re-triggering connection attempts
                // between polls while nothing has arrived yet.
                QuorumCnxManager.Message received = null;
                for (int attempt = 1; received == null && attempt <= THRESHOLD; attempt++) {
                    received = cnxManager.pollRecvQueue(3000L, TimeUnit.MILLISECONDS);
                    if (received == null) {
                        cnxManager.connectAll();
                    }
                }
                // Decide on the message itself rather than on the attempt counter:
                // a message that arrives on the last permitted attempt is a success.
                if (received == null) {
                    this.failed = true;
                    return;
                }

                cnxManager.testInitiateConnection(1L);
                if (cnxManager.pollRecvQueue(3000L, TimeUnit.MILLISECONDS) == null) {
                    this.failed = true;
                }
            } catch (Exception e) {
                CnxManagerTest.LOG.error("Exception while running mock thread", (Throwable) e);
                // Throwing an AssertionError here would only terminate this thread;
                // record the failure so the joining test can report it.
                this.failed = true;
            }
        }
    }

    /* loaded from: input_file:org/apache/zookeeper/server/quorum/CnxManagerTest$TestCnxManager.class */
    /**
     * {@link QuorumCnxManager} subclass that lets the tests read state they
     * could not otherwise reach: the sender-worker map and the fields of a
     * received {@code Message}.
     */
    class TestCnxManager extends QuorumCnxManager {
        TestCnxManager(QuorumPeer quorumPeer) {
            super(quorumPeer);
        }

        /** Reports whether {@code senderWorkerMap} currently has an entry keyed by {@code l}. */
        boolean senderWorkerMapContains(Long l) {
            final boolean present = this.senderWorkerMap.containsKey(l);
            return present;
        }

        /** Returns the {@code sid} field of the given message. */
        long getSid(QuorumCnxManager.Message message) {
            final long senderId = message.sid;
            return senderId;
        }

        /** Decodes the message's backing byte array into a String using the platform default charset. */
        String getMsgString(QuorumCnxManager.Message message) {
            final byte[] payload = message.buffer.array();
            return new String(payload);
        }
    }

    /**
     * Prepares a three-peer configuration before each test: allocates ports,
     * registers a {@code QuorumServer} per peer in {@link #peers} and creates
     * one temporary directory per peer.
     */
    @Before
    public void setUp() throws Exception {
        final int peerCount = 3;
        this.count = peerCount;
        this.peers = new HashMap<>(peerCount);
        this.peerTmpdir = new File[peerCount];
        this.peerQuorumPort = new int[peerCount];
        this.peerClientPort = new int[peerCount];

        int sid = 0;
        while (sid < this.count) {
            // Ports are drawn in a fixed order: quorum, client, then the port for
            // the second QuorumServer address.
            int quorumPort = PortAssignment.unique();
            int clientPort = PortAssignment.unique();
            this.peerQuorumPort[sid] = quorumPort;
            this.peerClientPort[sid] = clientPort;

            InetSocketAddress firstAddr = new InetSocketAddress(quorumPort);
            // NOTE(review): presumably the election address (tests read `electionAddr`) - confirm.
            InetSocketAddress secondAddr = new InetSocketAddress(PortAssignment.unique());
            QuorumPeer.QuorumServer server = new QuorumPeer.QuorumServer(sid, firstAddr, secondAddr);
            this.peers.put(Long.valueOf(sid), server);

            this.peerTmpdir[sid] = ClientBase.createTmpDir();
            sid++;
        }
    }

    /**
     * Builds a message buffer by delegating to {@code FastLeaderElection.buildMsg},
     * forwarding the four arguments in order with a constant {@code 0L} inserted
     * as the fourth value.
     *
     * @param i  first value forwarded; callers in this class pass a {@code ServerState} ordinal
     * @param j  second value forwarded
     * @param j2 third value forwarded
     * @param j3 forwarded as the fifth value, after the constant {@code 0L}
     * @return the buffer produced by {@code FastLeaderElection.buildMsg}
     */
    ByteBuffer createMsg(int i, long j, long j2, long j3) {
        final long fixedFourthValue = 0L;
        ByteBuffer msg = FastLeaderElection.buildMsg(i, j, j2, fixedFourthValue, j3);
        return msg;
    }

    /**
     * Runs two connection managers against each other: a {@link CnxManagerThread}
     * on a background thread and a second manager here. This side sends one
     * message to server 0 and then waits for a message to arrive, retrying
     * connections between polls; finally it checks that the background thread
     * finished and did not flag a failure.
     */
    @Test
    public void testCnxManager() throws Exception {
        CnxManagerThread remote = new CnxManagerThread();
        remote.start();

        QuorumPeer peer = new QuorumPeer(this.peers, this.peerTmpdir[1], this.peerTmpdir[1],
                this.peerClientPort[1], 3, 1L, 1000, 2, 2);
        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
        QuorumCnxManager.Listener listener = cnxManager.listener;
        if (listener != null) {
            listener.start();
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }

        cnxManager.toSend(Long.valueOf(0L), createMsg(QuorumPeer.ServerState.LOOKING.ordinal(), 1L, -1L, 1L));

        // Poll up to THRESHOLD times, re-triggering connection attempts between
        // polls while nothing has arrived yet.
        QuorumCnxManager.Message received = null;
        for (int attempt = 1; received == null && attempt <= THRESHOLD; attempt++) {
            received = cnxManager.pollRecvQueue(3000L, TimeUnit.MILLISECONDS);
            if (received == null) {
                cnxManager.connectAll();
            }
        }
        // Assert on the message, not the attempt counter, so that a message
        // received on the final permitted attempt is not misreported as a failure.
        Assert.assertNotNull("Exceeded number of retries", received);

        remote.join(5000L);
        if (remote.isAlive()) {
            Assert.fail("Thread didn't join");
        } else if (remote.failed) {
            Assert.fail("Did not receive expected message");
        }
    }

    /**
     * Replaces server 2 with an address in {@code 10.1.1.0/24} that is expected
     * to be unreachable and checks that sending to it returns within six
     * seconds instead of blocking.
     */
    @Test
    public void testCnxManagerTimeout() throws Exception {
        // Pick the last octet from 1..254. Casting a random int to byte could yield
        // a negative value and therefore a malformed address such as "10.1.1.-37".
        int lastOctet = 1 + new Random().nextInt(254);
        int deadPort = PortAssignment.unique();
        String deadAddress = "10.1.1." + lastOctet;
        LOG.info("This is the dead address I'm trying: " + deadAddress);

        this.peers.put(Long.valueOf(2L), new QuorumPeer.QuorumServer(2L,
                new InetSocketAddress(deadAddress, deadPort),
                new InetSocketAddress(deadAddress, PortAssignment.unique())));
        this.peerTmpdir[2] = ClientBase.createTmpDir();

        QuorumPeer peer = new QuorumPeer(this.peers, this.peerTmpdir[1], this.peerTmpdir[1],
                this.peerClientPort[1], 3, 1L, 1000, 2, 2);
        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
        QuorumCnxManager.Listener listener = cnxManager.listener;
        if (listener != null) {
            listener.start();
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }

        long begin = System.currentTimeMillis();
        cnxManager.toSend(Long.valueOf(2L), createMsg(QuorumPeer.ServerState.LOOKING.ordinal(), 1L, -1L, 1L));
        long elapsed = System.currentTimeMillis() - begin;

        if (elapsed > 6000) {
            Assert.fail("Waited more than necessary");
        }
    }

    /**
     * Connects a raw socket to server 1's election address, writes an 8-byte
     * long (2) followed by a 4-byte int (-20), and then verifies that the
     * server closes the connection: repeated writes must eventually fail.
     *
     * <p>NOTE(review): the two values are presumably a sender id and an invalid
     * (negative) length - confirm against the connection protocol.
     */
    @Test
    public void testCnxManagerSpinLock() throws Exception {
        QuorumPeer peer = new QuorumPeer(this.peers, this.peerTmpdir[1], this.peerTmpdir[1],
                this.peerClientPort[1], 3, 1L, 1000, 2, 2);
        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
        QuorumCnxManager.Listener listener = cnxManager.listener;
        if (listener != null) {
            listener.start();
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }

        int port = this.peers.get(Long.valueOf(peer.getId())).electionAddr.getPort();
        LOG.info("Election port: " + port);
        Thread.sleep(1000L);

        try (SocketChannel channel = SocketChannel.open()) {
            channel.socket().connect(this.peers.get(Long.valueOf(1L)).electionAddr, 5000);

            ByteBuffer firstValue = ByteBuffer.wrap(new byte[8]);
            firstValue.putLong(2L);
            firstValue.position(0);
            channel.write(firstValue);

            ByteBuffer secondValue = ByteBuffer.wrap(new byte[4]);
            secondValue.putInt(-20);
            secondValue.position(0);
            channel.write(secondValue);

            Thread.sleep(1000L);

            // Keep writing until a write fails, which shows the peer has closed the
            // socket. The whole loop is guarded by a single try so that the first
            // failure ends it; if all 100 writes succeed the socket is still open.
            boolean socketClosed = false;
            try {
                for (int i = 0; i < 100; i++) {
                    secondValue.position(0);
                    channel.write(secondValue);
                }
            } catch (Exception e) {
                socketClosed = true;
                LOG.info("Socket has been closed as expected");
            }
            if (!socketClosed) {
                Assert.fail("Socket has not been closed");
            }
        } finally {
            // Always release the peer and the connection manager, even on failure.
            peer.shutdown();
            cnxManager.halt();
        }
    }

    /**
     * Opens a raw connection to server 1 and sends a handshake consisting of
     * the long {@code -65536}, the sender id 2, and a length-prefixed
     * {@code host:port} string for server 2's election address. It then checks
     * that server 1 registered a sender worker for server 2 and that a
     * subsequent length-prefixed payload shows up in the receive queue with
     * sender id 2 and unchanged content.
     *
     * <p>NOTE(review): {@code -65536} is presumably a protocol-version marker
     * newer than the server knows (per the test name) - confirm.
     */
    @Test
    public void testCnxFromFutureVersion() throws Exception {
        QuorumPeer peer = new QuorumPeer(this.peers, this.peerTmpdir[1], this.peerTmpdir[1],
                this.peerClientPort[1], 3, 1L, 1000, 2, 2);
        TestCnxManager cnxManager = new TestCnxManager(peer);
        QuorumCnxManager.Listener listener = cnxManager.listener;
        if (listener != null) {
            listener.start();
        } else {
            Assert.fail("Null listener when initializing cnx manager");
        }

        LOG.info("Election port: " + this.peers.get(Long.valueOf(peer.getId())).electionAddr.getPort());
        Thread.sleep(1000L);

        try (SocketChannel channel = SocketChannel.open()) {
            channel.socket().connect(this.peers.get(Long.valueOf(1L)).electionAddr, 5000);
            InetSocketAddress senderAddr = this.peers.get(Long.valueOf(2L)).electionAddr;
            DataOutputStream out = new DataOutputStream(channel.socket().getOutputStream());

            // Handshake: marker value, sender id, then length-prefixed "host:port".
            out.writeLong(-65536L);
            out.writeLong(2L);
            // Plain ":" separator; no need to borrow an unrelated library constant for it.
            byte[] addrBytes = (senderAddr.getHostName() + ":" + senderAddr.getPort()).getBytes();
            out.writeInt(addrBytes.length);
            out.write(addrBytes);
            out.flush();

            Thread.sleep(1000L);
            Assert.assertTrue("Server 1 got connection request from server 2",
                    cnxManager.senderWorkerMapContains(Long.valueOf(2L)));

            String testMessage = "this is a test message string";
            byte[] payload = testMessage.getBytes();
            out.writeInt(payload.length);
            out.write(payload);
            out.flush();

            // Poll up to THRESHOLD times, re-triggering connection attempts between
            // polls while nothing has arrived yet.
            QuorumCnxManager.Message received = null;
            for (int attempt = 1; received == null && attempt <= THRESHOLD; attempt++) {
                received = cnxManager.pollRecvQueue(3000L, TimeUnit.MILLISECONDS);
                if (received == null) {
                    cnxManager.connectAll();
                }
            }
            // Decide on the message itself: a message found on the last permitted
            // attempt must not be reported as missing.
            if (received == null) {
                Assert.fail("Test message hasn't been found in recvQueue");
            }

            Assert.assertEquals("Message sender should be 2", 2L, cnxManager.getSid(received));
            Assert.assertEquals("Message from 2 doesn't match test sring", testMessage,
                    cnxManager.getMsgString(received));
        } finally {
            // Always release the peer and the connection manager, even on failure.
            peer.shutdown();
            cnxManager.halt();
        }
    }

    /**
     * Connects a socket to server 1's election address without sending any
     * data and hands it to {@code receiveConnection}. The call must return
     * within {@code syncLimit * tickTime + 500} milliseconds rather than
     * block indefinitely on the silent socket.
     */
    @Test
    public void testSocketTimeout() throws Exception {
        QuorumPeer peer = new QuorumPeer(this.peers, this.peerTmpdir[1], this.peerTmpdir[1],
                this.peerClientPort[1], 3, 1L, 2000, 2, 2);
        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
        QuorumCnxManager.Listener listener = cnxManager.listener;
        if (listener != null) {
            listener.start();
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }

        int port = this.peers.get(Long.valueOf(peer.getId())).electionAddr.getPort();
        LOG.info("Election port: " + port);
        Thread.sleep(1000L);

        Socket socket = new Socket();
        try {
            socket.connect(this.peers.get(Long.valueOf(1L)).electionAddr, 5000);

            long begin = System.currentTimeMillis();
            // Nothing is written to the socket, so this call can only end by timing out.
            cnxManager.receiveConnection(socket);
            long elapsed = System.currentTimeMillis() - begin;

            if (elapsed > (peer.getSyncLimit() * peer.getTickTime()) + 500) {
                Assert.fail("Waited more than necessary");
            }
        } finally {
            // Release everything the test created, mirroring the sibling tests.
            try {
                socket.close();
            } finally {
                peer.shutdown();
                cnxManager.halt();
            }
        }
    }

    /**
     * Starts three peers and checks that each connection manager reports four
     * worker threads. Then, for every peer and for five rounds, shuts that
     * peer down (the remaining peers must drop to two threads), restarts it,
     * and checks that all peers are back at four threads. Every peer still in
     * the list is shut down at the end, whether or not the test passed.
     */
    @Test
    public void testWorkerThreads() throws Exception {
        ArrayList<QuorumPeer> peerList = new ArrayList<>();
        try {
            for (int sid = 0; sid < 3; sid++) {
                QuorumPeer peer = new QuorumPeer(this.peers, this.peerTmpdir[sid], this.peerTmpdir[sid],
                        this.peerClientPort[sid], 3, sid, 1000, 2, 2);
                LOG.info("Starting peer {}", Long.valueOf(peer.getId()));
                peer.start();
                peerList.add(sid, peer);
            }

            String failure = verifyThreadCount(peerList, 4L);
            if (failure != null) {
                Assert.fail(failure);
            }

            for (int myid = 0; myid < 3; myid++) {
                for (int round = 0; round < 5; round++) {
                    // Halt one peer and verify the thread count of the others.
                    QuorumPeer halted = peerList.get(myid);
                    LOG.info("Round {}, halting peer {}", Integer.valueOf(round), Long.valueOf(halted.getId()));
                    halted.shutdown();
                    peerList.remove(myid);
                    failure = verifyThreadCount(peerList, 2L);
                    Assert.assertNull(failure, failure);

                    // Restart the halted peer and verify the full thread count again.
                    QuorumPeer restarted = new QuorumPeer(this.peers, this.peerTmpdir[myid], this.peerTmpdir[myid],
                            this.peerClientPort[myid], 3, myid, 1000, 2, 2);
                    // Pass the values as logging arguments; concatenating an Object[]
                    // onto the format string would log the array's identity instead.
                    LOG.info("Round {}, restarting peer {}", Integer.valueOf(round), Long.valueOf(restarted.getId()));
                    restarted.start();
                    peerList.add(myid, restarted);
                    failure = verifyThreadCount(peerList, 4L);
                    Assert.assertNull(failure, failure);
                }
            }
        } finally {
            // Cleanup wraps the whole test so peers stay alive while they are being
            // verified and are still released if an assertion fails midway.
            for (QuorumPeer peer : peerList) {
                peer.shutdown();
            }
        }
    }

    /**
     * Repeatedly checks the worker-thread count of every peer until it matches
     * or the retry budget (480 checks, 500 ms apart) is used up.
     *
     * @param arrayList peers whose connection managers are inspected
     * @param j         expected thread count for each peer
     * @return {@code null} as soon as one check succeeds, otherwise the failure
     *         description produced by the last check
     * @throws InterruptedException if interrupted while sleeping between checks
     */
    public String verifyThreadCount(ArrayList<QuorumPeer> arrayList, long j) throws InterruptedException {
        String lastFailure = null;
        int checksLeft = 480;
        while (checksLeft-- > 0) {
            // Sleep first so the peers get time to settle before each check.
            Thread.sleep(500L);
            lastFailure = _verifyThreadCount(arrayList, j);
            if (lastFailure == null) {
                break;
            }
        }
        return lastFailure;
    }

    /**
     * Performs a single check of every peer's connection-manager thread count.
     *
     * @param arrayList peers whose connection managers are inspected
     * @param j         expected thread count for each peer
     * @return {@code null} when every peer reports {@code j} threads, otherwise a
     *         timestamped description of the first mismatch; the reported
     *         {@code sid} is the peer's index in the list
     */
    public String _verifyThreadCount(ArrayList<QuorumPeer> arrayList, long j) {
        int index = 0;
        for (QuorumPeer peer : arrayList) {
            long found = peer.getQuorumCnxManager().getThreadCount();
            if (found != j) {
                return new Date() + " Incorrect number of Worker threads for sid=" + index + " expected " + j + " found " + found;
            }
            index++;
        }
        return null;
    }
}
