package org.apache.flink.runtime.leaderelection;

import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderElectionTest.class */
public class LeaderElectionTest extends TestLogger {
    private final ServiceClass serviceClass;

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderElectionTest$EmbeddedServiceClass.class */
    /**
     * {@link ServiceClass} fixture that hands out leader election services created by a
     * per-test {@link EmbeddedLeaderService}.
     */
    private static final class EmbeddedServiceClass implements ServiceClass {
        /** Created in {@link #setup()} and cleared again in {@link #teardown()}. */
        private EmbeddedLeaderService embeddedLeaderService;

        private EmbeddedServiceClass() {
        }

        /** Creates a fresh embedded leader service on the default testing execution context. */
        @Override
        public void setup() {
            final EmbeddedLeaderService freshService =
                    new EmbeddedLeaderService(TestingUtils.defaultExecutionContext());
            this.embeddedLeaderService = freshService;
        }

        /** Shuts the embedded leader service down, if one was created, and forgets it. */
        @Override
        public void teardown() {
            final EmbeddedLeaderService runningService = this.embeddedLeaderService;
            if (runningService == null) {
                // setup() never ran (or teardown already did): nothing to release.
                return;
            }
            runningService.shutdown();
            this.embeddedLeaderService = null;
        }

        /**
         * Delegates to the embedded leader service created in {@link #setup()}.
         *
         * @return a new leader election service backed by the embedded leader service
         * @throws Exception if the embedded leader service fails to create the election service
         */
        @Override
        public LeaderElectionService createLeaderElectionService() throws Exception {
            final EmbeddedLeaderService backingService = this.embeddedLeaderService;
            return backingService.createLeaderElectionService();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderElectionTest$LeaderElectionType.class */
    /**
     * Leader election flavours this test is parameterized over. The test constructor maps each
     * constant to the matching {@code ServiceClass} fixture.
     */
    enum LeaderElectionType {
        /** Selects the {@code ZooKeeperServiceClass} fixture. */
        ZooKeeper,
        /** Selects the {@code EmbeddedServiceClass} fixture. */
        Embedded,
        /** Selects the {@code StandaloneServiceClass} fixture. */
        Standalone
    }

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderElectionTest$ManualLeaderContender.class */
    /**
     * {@link LeaderContender} used by the test: every grant or revocation is recorded in a
     * bounded queue that the test thread can wait on, and the most recently reported error is
     * kept so the test can rethrow it.
     */
    private static final class ManualLeaderContender implements LeaderContender {
        /** Marker queued on revocation, so that grants and revocations share one queue. */
        private static final UUID NULL_LEADER_SESSION_ID = new UUID(0L, 0L);

        /** Maximum number of buffered leadership events; further events are silently dropped. */
        private static final int MAX_PENDING_EVENTS = 10;

        /** How long {@link #waitForLeaderSessionId()} waits for an event, in minutes. */
        private static final long WAIT_TIMEOUT_MINUTES = 1L;

        private final ArrayBlockingQueue<UUID> leaderSessionIds =
                new ArrayBlockingQueue<>(MAX_PENDING_EVENTS);

        /** Last error passed to {@link #handleError(Exception)}; volatile as it is read by the test thread. */
        private volatile Exception exception;

        /** Records the granted session id; {@code offer} never blocks, it drops the id when the queue is full. */
        public void grantLeadership(UUID uuid) {
            leaderSessionIds.offer(uuid);
        }

        /** Records a revocation by queueing {@link #NULL_LEADER_SESSION_ID}. */
        public void revokeLeadership() {
            leaderSessionIds.offer(NULL_LEADER_SESSION_ID);
        }

        /** Returns the fixed description {@code "foobar"}. */
        public String getDescription() {
            return "foobar";
        }

        /** Remembers the reported error, replacing any earlier one. */
        public void handleError(Exception exc) {
            exception = exc;
        }

        /**
         * Rethrows the remembered error, if any.
         *
         * @throws Exception the last error passed to {@link #handleError(Exception)}
         */
        void rethrowError() throws Exception {
            final Exception recorded = exception;
            if (recorded == null) {
                return;
            }
            throw recorded;
        }

        /**
         * Waits up to one minute for the next leadership event.
         *
         * @return the next queued session id ({@link #NULL_LEADER_SESSION_ID} for a revocation),
         *     or {@code null} if no event arrived within the timeout
         * @throws InterruptedException if the waiting thread is interrupted
         */
        UUID waitForLeaderSessionId() throws InterruptedException {
            final UUID nextEvent = leaderSessionIds.poll(WAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
            return nextEvent;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderElectionTest$ServiceClass.class */
    /**
     * Test fixture abstraction: one implementation per {@code LeaderElectionType}, owning whatever
     * backing services that flavour needs and producing the leader election service under test.
     */
    private interface ServiceClass {
        /** Prepares the fixture; called from the test's {@code @Before} method. */
        void setup() throws Exception;

        /** Releases whatever {@link #setup()} created; called from the test's {@code @After} method. */
        void teardown() throws Exception;

        /** Creates the leader election service that the test exercises. */
        LeaderElectionService createLeaderElectionService() throws Exception;
    }

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderElectionTest$StandaloneServiceClass.class */
    /**
     * {@link ServiceClass} fixture for the standalone flavour. It owns no backing services, so
     * both lifecycle methods are intentionally empty.
     */
    private static final class StandaloneServiceClass implements ServiceClass {
        private StandaloneServiceClass() {
        }

        @Override // org.apache.flink.runtime.leaderelection.LeaderElectionTest.ServiceClass
        public void setup() throws Exception {
            // Nothing to prepare: the election service is created on demand below.
        }

        @Override // org.apache.flink.runtime.leaderelection.LeaderElectionTest.ServiceClass
        public void teardown() throws Exception {
            // Nothing to release: setup() allocates no resources.
        }

        /** Returns a new {@link StandaloneLeaderElectionService} on every call. */
        @Override // org.apache.flink.runtime.leaderelection.LeaderElectionTest.ServiceClass
        public LeaderElectionService createLeaderElectionService() throws Exception {
            return new StandaloneLeaderElectionService();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/LeaderElectionTest$ZooKeeperServiceClass.class */
    /**
     * {@link ServiceClass} fixture for the ZooKeeper flavour: starts an in-process Curator
     * {@link TestingServer} plus a {@link CuratorFramework} client connected to it, and creates
     * leader election services on top of that client.
     */
    private static final class ZooKeeperServiceClass implements ServiceClass {
        /** In-process ZooKeeper server; non-null between setup and teardown. */
        private TestingServer testingServer;
        /** Curator client connected to {@link #testingServer}. */
        private CuratorFramework client;
        /** Configuration pointing at the testing server's quorum with HA mode "zookeeper". */
        private Configuration configuration;

        private ZooKeeperServiceClass() {
        }

        /**
         * Starts the testing server and a Curator client configured against it.
         *
         * @throws RuntimeException wrapping any failure while starting the server or the client
         */
        @Override
        public void setup() throws Exception {
            try {
                this.testingServer = new TestingServer();
                this.configuration = new Configuration();
                this.configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, this.testingServer.getConnectString());
                this.configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
                this.client = ZooKeeperUtils.startCuratorFramework(this.configuration);
            } catch (Exception e) {
                throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
            }
        }

        /**
         * Closes the client and then the testing server. The server is shut down even when
         * closing the client fails, so a broken client cannot leak a running ZooKeeper instance.
         *
         * @throws Exception if closing the client or the server fails
         */
        @Override
        public void teardown() throws Exception {
            try {
                if (this.client != null) {
                    this.client.close();
                }
            } finally {
                this.client = null;
                if (this.testingServer != null) {
                    try {
                        // close() rather than stop(): stop() leaves the server's temporary
                        // data directory on disk, close() stops the server and deletes it.
                        this.testingServer.close();
                    } finally {
                        this.testingServer = null;
                    }
                }
            }
        }

        /**
         * Creates a ZooKeeper-based leader election service using the client and configuration
         * prepared in {@link #setup()}.
         *
         * @return a new leader election service
         * @throws Exception if the service cannot be created
         */
        @Override
        public LeaderElectionService createLeaderElectionService() throws Exception {
            return ZooKeeperUtils.createLeaderElectionService(this.client, this.configuration);
        }
    }

    /**
     * Supplies the parameter values for the parameterized runner: every
     * {@link LeaderElectionType} constant, in declaration order.
     *
     * @return all leader election types
     */
    @Parameterized.Parameters(name = "Leader election: {0}")
    public static Collection<LeaderElectionType> parameters() {
        final LeaderElectionType[] allTypes = LeaderElectionType.values();
        return Arrays.asList(allTypes);
    }

    public LeaderElectionTest(LeaderElectionType leaderElectionType) {
        switch (leaderElectionType) {
            case ZooKeeper:
                this.serviceClass = new ZooKeeperServiceClass();
                return;
            case Embedded:
                this.serviceClass = new EmbeddedServiceClass();
                return;
            case Standalone:
                this.serviceClass = new StandaloneServiceClass();
                return;
            default:
                throw new IllegalArgumentException(String.format("Unknown leader election type: %s.", leaderElectionType));
        }
    }

    /** Prepares the fixture of the leader election flavour under test before each test method. */
    @Before
    public void setup() throws Exception {
        this.serviceClass.setup();
    }

    /** Tears the fixture of the leader election flavour under test down after each test method. */
    @After
    public void teardown() throws Exception {
        this.serviceClass.teardown();
    }

    /**
     * Checks {@code hasLeadership} across the life cycle of a leader election service:
     * it is false before the service is started, true only for the granted session id once
     * leadership has been granted (both before and after confirming it), and false again after
     * the service has been stopped.
     *
     * <p>Any error reported to the contender is rethrown at the end, whether or not the
     * assertions passed, so asynchronous failures are not silently lost.
     *
     * @throws Exception if the service fails, or the contender was handed an error
     */
    @Test
    public void testHasLeadership() throws Exception {
        final LeaderElectionService leaderElectionService = this.serviceClass.createLeaderElectionService();
        final ManualLeaderContender manualLeaderContender = new ManualLeaderContender();
        try {
            // Not started yet: no session id can hold leadership.
            Assert.assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()), Matchers.is(false));

            leaderElectionService.start(manualLeaderContender);

            final UUID leaderSessionId = manualLeaderContender.waitForLeaderSessionId();
            // The contender returns null when no grant arrives in time; fail with a clear
            // message instead of feeding null into hasLeadership below.
            Assert.assertNotNull("Leadership was not granted within the timeout.", leaderSessionId);

            Assert.assertThat(leaderElectionService.hasLeadership(leaderSessionId), Matchers.is(true));
            Assert.assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()), Matchers.is(false));

            leaderElectionService.confirmLeadership(leaderSessionId, "foobar");

            Assert.assertThat(leaderElectionService.hasLeadership(leaderSessionId), Matchers.is(true));

            leaderElectionService.stop();

            Assert.assertThat(leaderElectionService.hasLeadership(leaderSessionId), Matchers.is(false));
        } finally {
            manualLeaderContender.rethrowError();
        }
    }
}
