package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.federation.FederationTestUtils;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.class */
public class TestObserverWithRouter {
    private MiniRouterDFSCluster cluster;

    public void startUpCluster(int i) throws Exception {
        int i2 = 2 + i;
        Configuration configuration = new Configuration();
        configuration.setBoolean("dfs.federation.router.observer.read.enable", true);
        configuration.setInt("dfs.federation.router.observer.auto-msync-period", 0);
        configuration.setBoolean("dfs.ha.tail-edits.in-progress", true);
        configuration.set("dfs.ha.tail-edits.period", "0ms");
        this.cluster = new MiniRouterDFSCluster(true, 1, i2);
        this.cluster.addNamenodeOverrides(configuration);
        this.cluster.startCluster();
        this.cluster.addRouterOverrides(configuration);
        if (this.cluster.isHighAvailability()) {
            for (String str : this.cluster.getNameservices()) {
                this.cluster.switchToActive(str, FederationTestUtils.NAMENODES[0]);
                this.cluster.switchToStandby(str, FederationTestUtils.NAMENODES[1]);
                for (int i3 = 2; i3 < i2; i3++) {
                    this.cluster.switchToObserver(str, FederationTestUtils.NAMENODES[i3]);
                }
            }
        }
        this.cluster.startRouters();
        this.cluster.registerNamenodes();
        this.cluster.waitNamenodeRegistration();
        this.cluster.installMockLocations();
        this.cluster.waitActiveNamespaces();
    }

    @After
    public void teardown() throws IOException {
        if (this.cluster != null) {
            this.cluster.shutdown();
            this.cluster = null;
        }
    }

    @Test
    public void testObserverRead() throws Exception {
        startUpCluster(1);
        MiniRouterDFSCluster.RouterContext randomRouter = this.cluster.getRandomRouter();
        randomRouter.getRouter().initRpcMetricsCount();
        Assert.assertTrue("First namenode should be observer", ((FederationNamenodeContext) randomRouter.getRouter().getNamenodeResolver().getNamenodesForNameserviceId(this.cluster.getNameservices().get(0), true, false).get(0)).getState().equals(FederationNamenodeServiceState.OBSERVER));
        FileSystem fileSystem = randomRouter.getFileSystem();
        Path path = new Path("/testFile");
        fileSystem.create(path).close();
        fileSystem.open(path).close();
        Assert.assertEquals("Three call should send to active", 3L, ((AtomicLong) randomRouter.getRouter().rpcCountMap().get(FederationNamenodeServiceState.ACTIVE)).get());
        Assert.assertEquals("One call should send to observer", 1L, ((AtomicLong) randomRouter.getRouter().rpcCountMap().get(FederationNamenodeServiceState.OBSERVER)).get());
        fileSystem.close();
    }

    @Test
    public void testReadWhenObserverIsDown() throws Exception {
        startUpCluster(1);
        MiniRouterDFSCluster.RouterContext randomRouter = this.cluster.getRandomRouter();
        FileSystem fileSystem = randomRouter.getFileSystem();
        Path path = new Path("/testFile1");
        fileSystem.create(path).close();
        Assert.assertNotEquals("No observer found", 3L, stopObserver(1));
        fileSystem.open(path).close();
        Assert.assertEquals("Four call should send to active", 4L, ((AtomicLong) randomRouter.getRouter().rpcCountMap().get(FederationNamenodeServiceState.ACTIVE)).get());
        Assert.assertEquals("No call should send to observer", 0L, ((AtomicLong) randomRouter.getRouter().rpcCountMap().get(FederationNamenodeServiceState.OBSERVER)).get());
        fileSystem.close();
    }

    @Test
    public void disableObserverReadFromClient() throws Exception {
        startUpCluster(1);
        MiniRouterDFSCluster.RouterContext randomRouter = this.cluster.getRandomRouter();
        randomRouter.getConf().setBoolean("dfs.observer.read.enable", false);
        FileSystem fileSystem = randomRouter.getFileSystem();
        Path path = new Path("/testFile2");
        fileSystem.create(path).close();
        fileSystem.open(path).close();
        Assert.assertEquals("Four call should send to active", 3L, ((AtomicLong) randomRouter.getRouter().rpcCountMap().get(FederationNamenodeServiceState.ACTIVE)).get());
        Assert.assertEquals("No call should send to observer", 0L, ((AtomicLong) randomRouter.getRouter().rpcCountMap().get(FederationNamenodeServiceState.OBSERVER)).get());
        fileSystem.close();
    }

    @Test
    public void testMultipleObserver() throws Exception {
        startUpCluster(2);
        MiniRouterDFSCluster.RouterContext randomRouter = this.cluster.getRandomRouter();
        FileSystem fileSystem = randomRouter.getFileSystem();
        Path path = new Path("/testFile1");
        fileSystem.create(path).close();
        stopObserver(1);
        fileSystem.open(path).close();
        Assert.assertEquals("Four call should send to active", 3L, ((AtomicLong) randomRouter.getRouter().rpcCountMap().get(FederationNamenodeServiceState.ACTIVE)).get());
        Assert.assertEquals("Read should be success with another observer", 1L, ((AtomicLong) randomRouter.getRouter().rpcCountMap().get(FederationNamenodeServiceState.OBSERVER)).get());
        randomRouter.getRouter().initRpcMetricsCount();
        stopObserver(1);
        fileSystem.open(path).close();
        Assert.assertEquals("Four call should send to active", 2L, ((AtomicLong) randomRouter.getRouter().rpcCountMap().get(FederationNamenodeServiceState.ACTIVE)).get());
        Assert.assertEquals("No call should send to observer", 0L, ((AtomicLong) randomRouter.getRouter().rpcCountMap().get(FederationNamenodeServiceState.OBSERVER)).get());
        fileSystem.close();
    }

    private int stopObserver(int i) {
        int i2 = 0;
        while (i2 < this.cluster.getNamenodes().size()) {
            NameNode nameNode = this.cluster.getCluster().getNameNode(i2);
            if (nameNode != null && nameNode.isObserverState()) {
                this.cluster.getCluster().shutdownNameNode(i2);
                i--;
                if (i == 0) {
                    break;
                }
            }
            i2++;
        }
        return i2;
    }

    @Test
    public void testMultipleObserverRouter() throws Exception {
        StateStoreDFSCluster stateStoreDFSCluster = new StateStoreDFSCluster(true, 4, 4, TimeUnit.SECONDS.toMillis(5L), TimeUnit.SECONDS.toMillis(5L));
        Configuration build = new RouterConfigBuilder().stateStore().admin().rpc().enableLocalHeartbeat(true).heartbeat().build();
        new StringBuilder();
        String str = stateStoreDFSCluster.getNameservices().get(0);
        MiniRouterDFSCluster.NamenodeContext namenodeContext = stateStoreDFSCluster.getNamenodes(str).get(1);
        build.set("dfs.nameservice.id", str);
        build.set("dfs.ha.namenode.id", namenodeContext.getNamenodeId());
        StringBuilder sb = new StringBuilder();
        Iterator<MiniRouterDFSCluster.NamenodeContext> it = stateStoreDFSCluster.getNamenodes(stateStoreDFSCluster.getNameservices().get(1)).iterator();
        while (it.hasNext()) {
            String confSuffix = it.next().getConfSuffix();
            if (sb.length() != 0) {
                sb.append(",");
            }
            sb.append(confSuffix);
        }
        build.set("dfs.federation.router.monitor.namenode", sb.toString());
        build.setBoolean("dfs.federation.router.observer.read.enable", true);
        build.setInt("dfs.federation.router.observer.auto-msync-period", 0);
        build.setBoolean("dfs.ha.tail-edits.in-progress", true);
        build.set("dfs.ha.tail-edits.period", "0ms");
        stateStoreDFSCluster.addNamenodeOverrides(build);
        stateStoreDFSCluster.addRouterOverrides(build);
        stateStoreDFSCluster.startCluster();
        if (stateStoreDFSCluster.isHighAvailability()) {
            for (String str2 : stateStoreDFSCluster.getNameservices()) {
                stateStoreDFSCluster.switchToActive(str2, FederationTestUtils.NAMENODES[0]);
                stateStoreDFSCluster.switchToStandby(str2, FederationTestUtils.NAMENODES[1]);
                for (int i = 2; i < 4; i++) {
                    stateStoreDFSCluster.switchToObserver(str2, FederationTestUtils.NAMENODES[i]);
                }
            }
        }
        stateStoreDFSCluster.startRouters();
        stateStoreDFSCluster.waitClusterUp();
        MembershipNamenodeResolver namenodeResolver = stateStoreDFSCluster.getRandomRouter().getRouter().getNamenodeResolver();
        namenodeResolver.loadCache(true);
        List namenodesForNameserviceId = namenodeResolver.getNamenodesForNameserviceId(str, true, false);
        List namenodesForNameserviceId2 = namenodeResolver.getNamenodesForNameserviceId(str, true, false);
        Assert.assertEquals(((FederationNamenodeContext) namenodesForNameserviceId.get(0)).getState(), FederationNamenodeServiceState.OBSERVER);
        Assert.assertEquals(((FederationNamenodeContext) namenodesForNameserviceId2.get(0)).getState(), FederationNamenodeServiceState.OBSERVER);
        Assert.assertNotEquals(((FederationNamenodeContext) namenodesForNameserviceId.get(0)).getNamenodeId(), ((FederationNamenodeContext) namenodesForNameserviceId2.get(0)).getNamenodeId());
    }
}
