package org.apache.hive.org.apache.zookeeper.test;

import java.io.IOException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hive.org.apache.zookeeper.AsyncCallback;
import org.apache.hive.org.apache.zookeeper.CreateMode;
import org.apache.hive.org.apache.zookeeper.KeeperException;
import org.apache.hive.org.apache.zookeeper.TestableZooKeeper;
import org.apache.hive.org.apache.zookeeper.WatchedEvent;
import org.apache.hive.org.apache.zookeeper.Watcher;
import org.apache.hive.org.apache.zookeeper.ZKTestCase;
import org.apache.hive.org.apache.zookeeper.ZooDefs;
import org.apache.hive.org.apache.zookeeper.data.Stat;
import org.apache.hive.org.apache.zookeeper.server.ZKDatabase;
import org.apache.hive.org.apache.zookeeper.server.quorum.Leader;
import org.apache.hive.org.apache.zookeeper.test.ClientBase;
import org.apache.hive.org.slf4j.Logger;
import org.apache.hive.org.slf4j.LoggerFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hive/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.class */
public class FollowerResyncConcurrencyTest extends ZKTestCase {
    /** Logger shared by every test in this class. */
    private static final Logger LOG = LoggerFactory.getLogger(FollowerResyncConcurrencyTest.class);

    /** Connection wait, in the unit expected by {@code CountdownWatcher.waitForConnected}; mirrors {@code ClientTest.CONNECTION_TIMEOUT}. */
    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;

    /** Number of asynchronous create callbacks that have been delivered. */
    private final AtomicInteger counter = new AtomicInteger(0);

    /** Number of delivered callbacks whose result code was non-zero. */
    private final AtomicInteger errors = new AtomicInteger(0);

    /** Asynchronous creates that have been issued but not yet called back. */
    private final AtomicInteger pending = new AtomicInteger(0);

    /* loaded from: input_file:org/apache/hive/org/apache/zookeeper/test/FollowerResyncConcurrencyTest$MyWatcher.class */
    /**
     * Countdown watcher that additionally records every event whose type is
     * not {@code None}, so a test can check whether any such event was delivered.
     */
    private class MyWatcher extends ClientBase.CountdownWatcher {
        /** Events with a type other than {@code None}, in arrival order. */
        LinkedBlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();

        private MyWatcher() {
        }

        /**
         * Forwards the event to the superclass, then queues it unless its type is {@code None}.
         *
         * @param watchedEvent the event delivered by the client
         */
        @Override
        public void process(WatchedEvent watchedEvent) {
            super.process(watchedEvent);
            if (watchedEvent.getType() == Watcher.Event.EventType.None) {
                return;
            }
            try {
                events.put(watchedEvent);
            } catch (InterruptedException e) {
                // Deliberately best-effort: the event is dropped and only a warning is logged.
                FollowerResyncConcurrencyTest.LOG.warn("ignoring interrupt during event.put");
            }
        }
    }

    /** Zeroes the pending, error and callback tallies before each test. */
    @Before
    public void setUp() throws Exception {
        for (AtomicInteger tally : new AtomicInteger[] {pending, errors, counter}) {
            tally.set(0);
        }
    }

    /** Logs how many callbacks reported a non-zero result code during the test. */
    @After
    public void tearDown() throws Exception {
        final int errorCount = errors.get();
        LOG.info("Error count {}", errorCount);
    }

    /**
     * Starts peers 1 and 2, creates a znode, restarts both peers, then starts
     * peer 3 for the first time and checks that the znode is visible through
     * clients connected to each of the three peers.
     *
     * <p>All clients and the quorum are released in a {@code finally} block so
     * that a failed assertion does not leave servers or client threads running
     * for the tests that follow.
     */
    @Test
    public void testLaggingFollowerResyncsUnderNewEpoch() throws Exception {
        final String path = "/resyncundernewepoch";
        QuorumUtil qu = new QuorumUtil(1);
        DisconnectableZooKeeper zk1 = null;
        DisconnectableZooKeeper zk2 = null;
        DisconnectableZooKeeper zk3 = null;
        try {
            qu.shutdownAll();
            qu.start(1);
            qu.start(2);
            for (int id = 1; id <= 2; id++) {
                Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(id).clientPort, ClientBase.CONNECTION_TIMEOUT));
            }

            zk1 = createClient(qu.getPeer(1).peer.getClientPort(), new ClientBase.CountdownWatcher());
            LOG.info("zk1 has session id 0x" + Long.toHexString(zk1.getSessionId()));
            zk1.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zk1.close();
            zk1 = null; // already closed; nothing left for the finally block to release

            qu.shutdown(1);
            qu.shutdown(2);
            for (int id = 1; id <= 2; id++) {
                Assert.assertTrue("Waiting for server down", ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(id).clientPort, ClientBase.CONNECTION_TIMEOUT));
            }

            qu.start(1);
            qu.start(2);
            for (int id = 1; id <= 2; id++) {
                Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(id).clientPort, ClientBase.CONNECTION_TIMEOUT));
            }
            qu.start(3);
            Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(3).clientPort, ClientBase.CONNECTION_TIMEOUT));

            // A fresh watcher is used for the second zk1 session so the connection
            // wait cannot be satisfied by state left over from the first session.
            zk1 = createClient(qu.getPeer(1).peer.getClientPort(), new ClientBase.CountdownWatcher());
            LOG.info("zk1 has session id 0x" + Long.toHexString(zk1.getSessionId()));
            Assert.assertNotNull("zk1 has data", zk1.exists(path, false));

            zk2 = createClient(qu.getPeer(2).peer.getClientPort(), new ClientBase.CountdownWatcher());
            LOG.info("zk2 has session id 0x" + Long.toHexString(zk2.getSessionId()));
            Assert.assertNotNull("zk2 has data", zk2.exists(path, false));

            zk3 = createClient(qu.getPeer(3).peer.getClientPort(), new ClientBase.CountdownWatcher());
            LOG.info("zk3 has session id 0x" + Long.toHexString(zk3.getSessionId()));
            Assert.assertNotNull("zk3 has data", zk3.exists(path, false));
        } finally {
            try {
                if (zk1 != null) {
                    zk1.close();
                }
                if (zk2 != null) {
                    zk2.close();
                }
                if (zk3 != null) {
                    zk3.close();
                }
            } finally {
                qu.shutdownAll();
            }
        }
    }

    /**
     * Runs {@link #followerResyncCrashTest(boolean)} with {@code false}, which
     * makes that scenario set the snapshot size factor to {@code -1.0}.
     */
    @Test
    public void testResyncBySnapThenDiffAfterFollowerCrashes() throws IOException, InterruptedException, KeeperException, Throwable {
        final boolean largeSnapshotSizeFactor = false;
        followerResyncCrashTest(largeSnapshotSizeFactor);
    }

    /**
     * Runs {@link #followerResyncCrashTest(boolean)} with {@code true}, which
     * makes that scenario set the snapshot size factor to {@code 1000.0}.
     */
    @Test
    public void testResyncByTxnlogThenDiffAfterFollowerCrashes() throws IOException, InterruptedException, KeeperException, Throwable {
        final boolean largeSnapshotSizeFactor = true;
        followerResyncCrashTest(largeSnapshotSizeFactor);
    }

    /**
     * Floods the quorum with asynchronous ephemeral-sequential creates while one
     * non-leader peer is repeatedly shut down and restarted, then verifies that
     * the restarted peer ends up with the same zxid, sessions and ephemerals as
     * the other peers.
     *
     * <p>Sequence: the chosen peer is shut down, a client on peer 3 creates a
     * node, the peer is restarted and two more clients connect to it. 13000
     * creates are then issued through the peer-3 client; the peer is shut down at
     * iteration 5000 and bounced twice at iteration 12000, where a background
     * thread issuing 3000 further creates is also started. Every 50th iteration
     * issues one create through a client attached to the bounced peer.
     *
     * @param z when {@code true} the snapshot size factor of the leader's database
     *          is set to {@code 1000.0}; otherwise it is set to {@code -1.0}
     */
    public void followerResyncCrashTest(boolean z) throws IOException, InterruptedException, KeeperException, Throwable {
        final Semaphore resultsSeen = new Semaphore(0);
        final AsyncCallback.StringCallback tally = crashTestCallback(resultsSeen, 16200);
        QuorumUtil qu = new QuorumUtil(1);
        DisconnectableZooKeeper zk1 = null;
        DisconnectableZooKeeper zk2 = null;
        DisconnectableZooKeeper zk3 = null;
        Thread mytestfooThread = null;
        try {
            qu.startAll();

            // Locate the peer that currently holds a Leader instance.
            int leaderId = 1;
            while (qu.getPeer(leaderId).peer.leader == null) {
                leaderId++;
            }
            Leader leader = qu.getPeer(leaderId).peer.leader;
            Assert.assertNotNull(leader);
            qu.getPeer(leaderId).peer.getActiveServer().getZKDatabase().setSnapshotSizeFactor(z ? 1000.0d : -1.0d);

            // The peer to bounce is 1 or 2, whichever does not hold the Leader instance
            // (1 when peer 3 holds it).
            int followerId = leaderId == 1 ? 2 : 1;
            LOG.info("Connecting to follower:" + followerId);
            qu.shutdown(followerId);

            zk3 = createClient(qu.getPeer(3).peer.getClientPort(), new ClientBase.CountdownWatcher());
            LOG.info("zk3 has session id 0x" + Long.toHexString(zk3.getSessionId()));
            zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            qu.restart(followerId);
            zk1 = createClient(qu.getPeer(followerId).peer.getClientPort(), new ClientBase.CountdownWatcher());
            LOG.info("zk1 has session id 0x" + Long.toHexString(zk1.getSessionId()));
            zk2 = createClient(qu.getPeer(followerId).peer.getClientPort(), new ClientBase.CountdownWatcher());
            LOG.info("zk2 has session id 0x" + Long.toHexString(zk2.getSessionId()));
            zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            mytestfooThread = crashTestCreateThread(zk3, tally);

            for (int i = 0; i < 13000; i++) {
                // Count the request before issuing it so 'pending' can never under-count
                // when the callback runs before this thread resumes.
                pending.incrementAndGet();
                zk3.create("/mybar", (byte[]) null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, tally, (Object) null);

                if (i == 5000) {
                    qu.shutdown(followerId);
                    LOG.info("Shutting down s1");
                }
                if (i == 12000) {
                    mytestfooThread.start();
                    LOG.info("Restarting follower " + followerId);
                    qu.restart(followerId);
                    Thread.sleep(300L);
                    LOG.info("Shutdown follower " + followerId);
                    qu.shutdown(followerId);
                    Thread.sleep(300L);
                    LOG.info("Restarting follower " + followerId);
                    qu.restart(followerId);
                    LOG.info("Setting up server: " + followerId);
                }
                if (i % 1000 == 0) {
                    Thread.sleep(1000L);
                }
                if (i % 50 == 0) {
                    pending.incrementAndGet();
                    zk2.create("/newbaz", (byte[]) null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, tally, (Object) null);
                }
            }

            if (!resultsSeen.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
                LOG.warn("Did not aquire semaphore fast enough");
            }
            mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
            if (mytestfooThread.isAlive()) {
                LOG.error("mytestfooThread is still alive");
            }
            Assert.assertTrue(waitForPendingRequests(60));
            Assert.assertTrue(waitForSync(qu, followerId, 10));
            verifyState(qu, followerId, leader);
        } finally {
            // Release everything even when an assertion failed part-way through.
            if (mytestfooThread != null) {
                mytestfooThread.interrupt();
            }
            try {
                if (zk1 != null) {
                    zk1.close();
                }
                if (zk2 != null) {
                    zk2.close();
                }
                if (zk3 != null) {
                    zk3.close();
                }
            } finally {
                qu.shutdownAll();
            }
        }
    }

    /**
     * Builds the callback shared by every asynchronous create in
     * {@link #followerResyncCrashTest(boolean)}. It updates the pending, counter
     * and error tallies and releases {@code done} exactly once, on the callback
     * that brings the counter to {@code expected}. The value returned by the
     * atomic increment is tested, so concurrent callbacks on different client
     * threads can neither skip nor duplicate the release.
     */
    private AsyncCallback.StringCallback crashTestCallback(final Semaphore done, final int expected) {
        return new AsyncCallback.StringCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                pending.decrementAndGet();
                int seen = counter.incrementAndGet();
                if (rc != 0) {
                    errors.incrementAndGet();
                }
                if (seen == expected) {
                    done.release();
                }
            }
        };
    }

    /**
     * Builds (but does not start) the thread that issues 3000 asynchronous
     * "/mytestfoo" creates through {@code zk}, pausing 100 ms after every tenth
     * request. The thread stops early if it is interrupted.
     */
    private Thread crashTestCreateThread(final DisconnectableZooKeeper zk, final AsyncCallback.StringCallback cb) {
        return new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int n = 0; n < 3000 && !Thread.currentThread().isInterrupted(); n++) {
                        pending.incrementAndGet();
                        zk.create("/mytestfoo", (byte[]) null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, cb, (Object) null);
                        if (n % 10 == 0) {
                            Thread.sleep(100L);
                        }
                    }
                } catch (InterruptedException e) {
                    // Preserve the interrupt status and stop issuing requests.
                    Thread.currentThread().interrupt();
                }
            }
        }, "mytestfooThread");
    }

    /**
     * Issues 5000 asynchronous ephemeral-sequential creates through a client
     * attached to a non-leader peer while that peer is shut down (iteration 1000)
     * and bounced (iteration 1200), then verifies that the restarted peer ends up
     * with the same zxid, sessions and ephemerals as the other peers.
     *
     * <p>From iteration 1000 on, every even iteration also issues a create
     * through the peer-3 client. A background thread issues 400 more creates
     * through the peer-3 client, but only once the gate is opened at iteration
     * 1200.
     *
     * <p>The background thread blocks on a semaphore instead of spinning, and is
     * interrupted during cleanup, so a test that fails before the gate is opened
     * does not leave a thread running.
     */
    @Test
    public void testResyncByDiffAfterFollowerCrashes() throws IOException, InterruptedException, KeeperException, Throwable {
        final Semaphore resultsSeen = new Semaphore(0);
        final Semaphore startGate = new Semaphore(0);
        final AsyncCallback.StringCallback tally = diffTestCallback(resultsSeen, 7300);
        QuorumUtil qu = new QuorumUtil(1);
        DisconnectableZooKeeper zk1 = null;
        DisconnectableZooKeeper zk2 = null;
        DisconnectableZooKeeper zk3 = null;
        Thread mytestfooThread = null;
        try {
            qu.startAll();

            // Locate the peer that currently holds a Leader instance.
            int leaderId = 1;
            while (qu.getPeer(leaderId).peer.leader == null) {
                leaderId++;
            }
            Leader leader = qu.getPeer(leaderId).peer.leader;
            Assert.assertNotNull(leader);

            // The peer to bounce is 1 or 2, whichever does not hold the Leader instance
            // (1 when peer 3 holds it).
            int followerId = leaderId == 1 ? 2 : 1;
            LOG.info("Connecting to follower:" + followerId);
            zk1 = createClient(qu.getPeer(followerId).peer.getClientPort(), new ClientBase.CountdownWatcher());
            LOG.info("zk1 has session id 0x" + Long.toHexString(zk1.getSessionId()));
            zk2 = createClient(qu.getPeer(followerId).peer.getClientPort(), new ClientBase.CountdownWatcher());
            LOG.info("zk2 has session id 0x" + Long.toHexString(zk2.getSessionId()));
            zk3 = createClient(qu.getPeer(3).peer.getClientPort(), new ClientBase.CountdownWatcher());
            LOG.info("zk3 has session id 0x" + Long.toHexString(zk3.getSessionId()));

            zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            mytestfooThread = diffTestCreateThread(zk3, tally, startGate);
            mytestfooThread.start();

            for (int i = 0; i < 5000; i++) {
                // Count the request before issuing it so 'pending' can never under-count
                // when the callback runs before this thread resumes.
                pending.incrementAndGet();
                zk2.create("/mybar", (byte[]) null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, tally, (Object) null);

                if (i == 1000) {
                    qu.shutdown(followerId);
                    Thread.sleep(1100L);
                    LOG.info("Shutting down s1");
                }
                if (i == 1100 || i == 1150 || i == 1200) {
                    Thread.sleep(1000L);
                }
                if (i == 1200) {
                    qu.startThenShutdown(followerId);
                    startGate.release(); // let the background thread begin issuing creates
                    qu.restart(followerId);
                    LOG.info("Setting up server: " + followerId);
                }
                if (i >= 1000 && i % 2 == 0) {
                    pending.incrementAndGet();
                    zk3.create("/newbaz", (byte[]) null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, tally, (Object) null);
                }
                if (i == 1050 || i == 1100 || i == 1150) {
                    Thread.sleep(1000L);
                }
            }

            if (!resultsSeen.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
                LOG.warn("Did not aquire semaphore fast enough");
            }
            mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
            if (mytestfooThread.isAlive()) {
                LOG.error("mytestfooThread is still alive");
            }
            Assert.assertTrue(waitForPendingRequests(60));
            Assert.assertTrue(waitForSync(qu, followerId, 10));
            verifyState(qu, followerId, leader);
        } finally {
            // Release everything even when an assertion failed part-way through;
            // the interrupt also frees a thread still waiting on the gate.
            if (mytestfooThread != null) {
                mytestfooThread.interrupt();
            }
            try {
                if (zk1 != null) {
                    zk1.close();
                }
                if (zk2 != null) {
                    zk2.close();
                }
                if (zk3 != null) {
                    zk3.close();
                }
            } finally {
                qu.shutdownAll();
            }
        }
    }

    /**
     * Builds the callback shared by every asynchronous create in
     * {@link #testResyncByDiffAfterFollowerCrashes()}. It updates the pending,
     * counter and error tallies and releases {@code done} for every callback
     * that pushes the counter above {@code threshold}. The value returned by the
     * atomic increment is tested rather than a separate read of the counter.
     */
    private AsyncCallback.StringCallback diffTestCallback(final Semaphore done, final int threshold) {
        return new AsyncCallback.StringCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                pending.decrementAndGet();
                int seen = counter.incrementAndGet();
                if (rc != 0) {
                    errors.incrementAndGet();
                }
                if (seen > threshold) {
                    done.release();
                }
            }
        };
    }

    /**
     * Builds (but does not start) the thread that waits for one permit on
     * {@code gate} and then issues 400 asynchronous "/mytestfoo" creates through
     * {@code zk}, pausing 10 ms after each. The thread stops if interrupted,
     * whether while waiting on the gate or while issuing requests.
     */
    private Thread diffTestCreateThread(final DisconnectableZooKeeper zk, final AsyncCallback.StringCallback cb, final Semaphore gate) {
        return new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    gate.acquire();
                    for (int n = 0; n < 400; n++) {
                        pending.incrementAndGet();
                        zk.create("/mytestfoo", (byte[]) null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, cb, (Object) null);
                        Thread.sleep(10L);
                    }
                } catch (InterruptedException e) {
                    // Preserve the interrupt status and stop issuing requests.
                    Thread.currentThread().interrupt();
                }
            }
        }, "mytestfooThread");
    }

    /**
     * Opens a client to {@code 127.0.0.1:i} and waits for the watcher to report
     * a connection.
     *
     * <p>If the wait fails for any reason the freshly created client is closed
     * before the exception propagates, so a failed connection attempt does not
     * leave an open client behind. Should that close itself throw, its exception
     * replaces the original one.
     *
     * @param i                client port of the server to connect to
     * @param countdownWatcher watcher registered with the client and used to wait for the connection
     * @return the client, after the connection wait has returned normally
     * @throws IOException          if the client cannot be constructed
     * @throws TimeoutException     if the connection wait times out
     * @throws InterruptedException if the calling thread is interrupted while waiting or closing
     */
    private static DisconnectableZooKeeper createClient(int i, ClientBase.CountdownWatcher countdownWatcher) throws IOException, TimeoutException, InterruptedException {
        DisconnectableZooKeeper client = new DisconnectableZooKeeper("127.0.0.1:" + i, ClientBase.CONNECTION_TIMEOUT, countdownWatcher);
        boolean connected = false;
        try {
            countdownWatcher.waitForConnected(CONNECTION_TIMEOUT);
            connected = true;
        } finally {
            if (!connected) {
                client.close();
            }
        }
        return client;
    }

    /**
     * Polls the pending-request tally once per second until it reaches zero.
     * The first check happens only after the first one-second sleep.
     *
     * @param i maximum number of one-second polls
     * @return {@code true} if the tally was observed at zero within {@code i} polls, otherwise {@code false}
     * @throws InterruptedException if interrupted while sleeping
     */
    private boolean waitForPendingRequests(int i) throws InterruptedException {
        LOG.info("Wait for pending requests: " + pending.get());
        for (int pollsLeft = i; pollsLeft > 0; pollsLeft--) {
            Thread.sleep(1000L);
            boolean drained = pending.get() == 0;
            if (drained) {
                return true;
            }
        }
        LOG.info("Timeout waiting for pending requests: " + pending.get());
        return false;
    }

    /**
     * Polls three peer databases until they report the same last-processed zxid.
     *
     * <p>The databases compared are those of peer {@code i} (logged as
     * "restarted"), peer 3 (logged as "clean") and whichever of peers 1 and 2 is
     * not {@code i} (logged as "leader").
     *
     * @param quorumUtil quorum whose peers are inspected
     * @param i          index of the restarted peer
     * @param i2         maximum number of comparisons, one second apart
     * @return {@code true} as soon as all three zxids match, {@code false} after {@code i2} failed comparisons
     * @throws InterruptedException if interrupted while sleeping between comparisons
     */
    private boolean waitForSync(QuorumUtil quorumUtil, int i, int i2) throws InterruptedException {
        LOG.info("Wait for server to sync");
        final int otherIndex = (i == 1) ? 2 : 1;
        final ZKDatabase restartedDb = quorumUtil.getPeer(i).peer.getActiveServer().getZKDatabase();
        final ZKDatabase cleanDb = quorumUtil.getPeer(3).peer.getActiveServer().getZKDatabase();
        final ZKDatabase leadDb = quorumUtil.getPeer(otherIndex).peer.getActiveServer().getZKDatabase();

        long leadZxid = 0;
        long cleanZxid = 0;
        long restartedZxid = 0;
        int attempt = 0;
        while (attempt < i2) {
            leadZxid = leadDb.getDataTreeLastProcessedZxid();
            cleanZxid = cleanDb.getDataTreeLastProcessedZxid();
            restartedZxid = restartedDb.getDataTreeLastProcessedZxid();
            boolean allEqual = leadZxid == cleanZxid && leadZxid == restartedZxid;
            if (allEqual) {
                return true;
            }
            Thread.sleep(1000L);
            attempt++;
        }
        LOG.info("Timeout waiting for zxid to sync: leader 0x" + Long.toHexString(leadZxid) + "clean 0x" + Long.toHexString(cleanZxid) + "restarted 0x" + Long.toHexString(restartedZxid));
        return false;
    }

    /**
     * Opens a testable client to {@code str} using a newly created countdown watcher.
     *
     * @param str host:port connect string
     * @return the client returned by the two-argument overload
     */
    private static TestableZooKeeper createTestableClient(String str) throws IOException, TimeoutException, InterruptedException {
        ClientBase.CountdownWatcher freshWatcher = new ClientBase.CountdownWatcher();
        return createTestableClient(freshWatcher, str);
    }

    /**
     * Opens a testable client to {@code str} and waits for the watcher to report
     * a connection.
     *
     * <p>If the wait fails for any reason the freshly created client is closed
     * before the exception propagates, so a failed connection attempt does not
     * leave an open client behind. Should that close itself throw, its exception
     * replaces the original one.
     *
     * @param countdownWatcher watcher registered with the client and used to wait for the connection
     * @param str              host:port connect string
     * @return the client, after the connection wait has returned normally
     * @throws IOException          if the client cannot be constructed
     * @throws TimeoutException     if the connection wait times out
     * @throws InterruptedException if the calling thread is interrupted while waiting or closing
     */
    private static TestableZooKeeper createTestableClient(ClientBase.CountdownWatcher countdownWatcher, String str) throws IOException, TimeoutException, InterruptedException {
        TestableZooKeeper client = new TestableZooKeeper(str, ClientBase.CONNECTION_TIMEOUT, countdownWatcher);
        boolean connected = false;
        try {
            countdownWatcher.waitForConnected(CONNECTION_TIMEOUT);
            connected = true;
        } finally {
            if (!connected) {
                client.close();
            }
        }
        return client;
    }

    /**
     * Asserts that the restarted peer is consistent with the rest of the quorum.
     *
     * <p>Checks, in order: the peer has a follower instance; the upper 32 bits of
     * its server zxid equal {@code leader.getEpoch() >> 32}; its session set
     * matches that of the other peer among 1 and 2; and, per session, its
     * ephemeral count matches peer 3 and peer 3 matches that other peer.
     * Individual ephemerals that differ are logged before the counts are asserted.
     *
     * @param quorumUtil quorum whose peers are inspected
     * @param i          index of the restarted peer
     * @param leader     leader whose epoch the restarted peer is compared against
     */
    private void verifyState(QuorumUtil quorumUtil, int i, Leader leader) {
        LOG.info("Verifying state");
        Assert.assertNotNull("Not following", quorumUtil.getPeer(i).peer.follower);

        final long followerEpoch = quorumUtil.getPeer(i).peer.getActiveServer().getZxid() >> 32;
        final String epochMessage = "Zxid: " + quorumUtil.getPeer(i).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid() + "Current epoch: " + followerEpoch;
        final long leaderEpoch = leader.getEpoch() >> 32;
        Assert.assertTrue(epochMessage, followerEpoch == leaderEpoch);

        final int otherIndex = (i == 1) ? 2 : 1;
        final Collection<Long> restartedSessions = quorumUtil.getPeer(i).peer.getActiveServer().getZKDatabase().getSessions();
        final Collection<Long> otherSessions = quorumUtil.getPeer(otherIndex).peer.getActiveServer().getZKDatabase().getSessions();
        for (Long sessionId : restartedSessions) {
            Assert.assertTrue("Should have same set of sessions in both servers, did not expect: " + sessionId, otherSessions.contains(sessionId));
        }
        Assert.assertEquals("Should have same number of sessions", otherSessions.size(), restartedSessions.size());

        final ZKDatabase restartedDb = quorumUtil.getPeer(i).peer.getActiveServer().getZKDatabase();
        final ZKDatabase cleanDb = quorumUtil.getPeer(3).peer.getActiveServer().getZKDatabase();
        final ZKDatabase leadDb = quorumUtil.getPeer(otherIndex).peer.getActiveServer().getZKDatabase();
        for (Long sessionId : restartedSessions) {
            final long id = sessionId;
            LOG.info("Validating ephemeral for session id 0x" + Long.toHexString(id));
            Assert.assertTrue("Should have same set of sessions in both servers, did not expect: " + sessionId, otherSessions.contains(sessionId));

            Set<String> restartedEphemerals = restartedDb.getEphemerals(id);
            Set<String> cleanEphemerals = cleanDb.getEphemerals(id);
            logEphemeralsMissingFrom(cleanEphemerals, cleanDb, restartedEphemerals, "Restarted follower doesn't contain ephemeral {} zxid 0x{}");
            logEphemeralsMissingFrom(restartedEphemerals, restartedDb, cleanEphemerals, "Restarted follower has extra ephemeral {} zxid 0x{}");

            Set<String> leadEphemerals = leadDb.getEphemerals(id);
            logEphemeralsMissingFrom(leadEphemerals, leadDb, cleanEphemerals, "Follower doesn't contain ephemeral from leader {} zxid 0x{}");
            logEphemeralsMissingFrom(cleanEphemerals, cleanDb, leadEphemerals, "Leader doesn't contain ephemeral from follower {} zxid 0x{}");

            Assert.assertEquals("Should have same number of ephemerals in both followers", restartedEphemerals.size(), cleanEphemerals.size());
            Assert.assertEquals("Leader should equal follower", leadDb.getEphemerals(id).size(), cleanEphemerals.size());
        }
    }

    /**
     * Logs, using {@code format}, every path in {@code present} that is absent
     * from {@code other}, together with the hexadecimal mzxid of that path's node
     * as stored in {@code presentDb}.
     */
    private static void logEphemeralsMissingFrom(Set<String> present, ZKDatabase presentDb, Set<String> other, String format) {
        for (String path : present) {
            if (other.contains(path)) {
                continue;
            }
            LOG.info(format, path, Long.toHexString(presentDb.getDataTree().getNode(path).stat.getMzxid()));
        }
    }

    /**
     * Connects to a peer that has a follower instance and checks that the
     * client's last-seen zxid is 0 right after connecting and greater than 0
     * after one {@code exists} call.
     *
     * <p>The client and the quorum are released in a {@code finally} block so a
     * failed assertion does not leave them running.
     */
    @Test
    public void testFollowerSendsLastZxid() throws Exception {
        QuorumUtil qu = new QuorumUtil(1);
        TestableZooKeeper zk = null;
        try {
            qu.startAll();

            // Locate the first peer that has a follower instance.
            int followerId = 1;
            while (qu.getPeer(followerId).peer.follower == null) {
                followerId++;
            }
            LOG.info("Connecting to follower:" + followerId);

            zk = createTestableClient("localhost:" + qu.getPeer(followerId).peer.getClientPort());
            Assert.assertEquals(0L, zk.testableLastZxid());
            zk.exists("/", false);
            long lzxid = zk.testableLastZxid();
            Assert.assertTrue("lzxid:" + lzxid + " > 0", lzxid > 0);
        } finally {
            try {
                if (zk != null) {
                    zk.close();
                }
            } finally {
                qu.shutdownAll();
            }
        }
    }

    /**
     * Sets an {@code exists} watch on "/foo" through a peer that has a follower
     * instance, forces a connection loss on that client, waits for it to
     * reconnect, and checks that the data is still readable and that no
     * non-{@code None} event reaches the watcher within five seconds.
     *
     * <p>Both clients and the quorum are released in a {@code finally} block so a
     * failed assertion does not leave them running.
     */
    @Test
    public void testFollowerWatcherResync() throws Exception {
        QuorumUtil qu = new QuorumUtil(1);
        TestableZooKeeper zk1 = null;
        TestableZooKeeper zk2 = null;
        try {
            qu.startAll();

            // Locate the first peer that has a follower instance.
            int followerId = 1;
            while (qu.getPeer(followerId).peer.follower == null) {
                followerId++;
            }
            LOG.info("Connecting to follower:" + followerId);

            zk1 = createTestableClient("localhost:" + qu.getPeer(followerId).peer.getClientPort());
            zk1.create("/foo", "foo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            MyWatcher watcher = new MyWatcher();
            zk2 = createTestableClient(watcher, "localhost:" + qu.getPeer(followerId).peer.getClientPort());
            zk2.exists("/foo", true);

            watcher.reset();
            zk2.testableConnloss();
            Assert.assertTrue("Unable to connect to server", watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));

            Assert.assertArrayEquals("foo".getBytes(), zk2.getData("/foo", false, (Stat) null));
            Assert.assertNull(watcher.events.poll(5L, TimeUnit.SECONDS));
        } finally {
            try {
                if (zk1 != null) {
                    zk1.close();
                }
                if (zk2 != null) {
                    zk2.close();
                }
            } finally {
                qu.shutdownAll();
            }
        }
    }
}
