package org.apache.hadoop.hdfs.server.namenode.ha;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.Thread;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcScheduler;
import org.apache.hadoop.ipc.Schedulable;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

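/**
 * Tests reads issued through {@link ObserverReadProxyProvider} against a shared
 * {@link MiniQJMHACluster}, checking which NameNode serves each request and when a read
 * is allowed to complete.
 *
 * <p>Loaded from: org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.class
 */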
public class TestConsistentReadsObserver {
    /** Logger shared by the tests in this class. */
    public static final Logger LOG = LoggerFactory.getLogger(TestConsistentReadsObserver.class.getName());
    /** Base configuration, created in {@link #startUpCluster()}; tests copy it when they need per-client overrides. */
    private static Configuration conf;
    /** HA mini cluster started once for the whole class and shut down in {@link #shutDownCluster()}. */
    private static MiniQJMHACluster qjmhaCluster;
    /** DFS cluster obtained from {@link #qjmhaCluster}; tests address its NameNodes by index. */
    private static MiniDFSCluster dfsCluster;
    /** Client file system, re-created before every test by {@link #setUp()}. */
    private static DistributedFileSystem dfs;
    /** Path the tests create; it is deleted recursively after every test by {@link #cleanUp()}. */
    private final Path testPath = new Path("/TestConsistentReadsObserver");

    /* loaded from: input_file:org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver$TestRpcScheduler.class */
    public static class TestRpcScheduler implements RpcScheduler {
        private int allowed = 10;

        public int getPriorityLevel(Schedulable schedulable) {
            return 0;
        }

        public boolean shouldBackOff(Schedulable schedulable) {
            int i = this.allowed - 1;
            this.allowed = i;
            return i < 0;
        }

        public void stop() {
        }
    }

    /**
     * Builds the base configuration and starts the cluster shared by all tests in this class.
     *
     * <p>NOTE(review): the meaning of the {@code 1, 0, false} arguments is defined by
     * {@code HATestUtil.setUpObserverCluster}, which is not visible here.
     *
     * @throws Exception if the cluster cannot be started
     */
    @BeforeClass
    public static void startUpCluster() throws Exception {
        Configuration baseConf = new Configuration();
        baseConf.setBoolean("dfs.namenode.state.context.enabled", true);
        conf = baseConf;

        MiniQJMHACluster startedCluster = HATestUtil.setUpObserverCluster(baseConf, 1, 0, false);
        qjmhaCluster = startedCluster;
        dfsCluster = startedCluster.getDfsCluster();
    }

    /**
     * Gives every test a client file system obtained from {@link #setObserverRead(boolean)}
     * with the flag set to {@code true}.
     *
     * @throws Exception if the file system cannot be configured
     */
    @Before
    public void setUp() throws Exception {
        DistributedFileSystem observerReadFs = setObserverRead(true);
        dfs = observerReadFs;
    }

    /**
     * Removes the test path, including anything created beneath it, after each test.
     *
     * @throws IOException if the delete call fails
     */
    @After
    public void cleanUp() throws IOException {
        final boolean recursive = true;
        dfs.delete(testPath, recursive);
    }

    /**
     * Shuts the shared cluster down once all tests have run; does nothing when the cluster
     * was never assigned.
     *
     * @throws IOException if the shutdown fails
     */
    @AfterClass
    public static void shutDownCluster() throws IOException {
        if (qjmhaCluster == null) {
            return;
        }
        qjmhaCluster.shutdown();
    }

    /**
     * Installs {@link TestRpcScheduler} with IPC back-off enabled on NameNode 2, then checks
     * that a create followed by a getFileStatus are both reported as sent to NameNode 0.
     *
     * <p>The scheduler override is applied to the call queue of a NameNode that every test in
     * this class shares, so the original configuration is restored in a {@code finally} block:
     * a failing assertion must not leave the back-off scheduler installed for later tests.
     *
     * @throws Exception if any cluster or file system call fails
     */
    @Test
    public void testRequeueCall() throws Exception {
        setObserverRead(true);

        final int observerIdx = 2;
        NameNode observer = dfsCluster.getNameNode(observerIdx);
        int rpcPort = observer.getNameNodeAddress().getPort();

        // Derive a copy of the NameNode's configuration carrying the per-port IPC overrides.
        Configuration originalConf = dfsCluster.getConfiguration(observerIdx);
        Configuration backoffConf = new Configuration(originalConf);
        String ipcPrefix = "ipc." + rpcPort + ".";
        backoffConf.set(ipcPrefix + "scheduler.impl", TestRpcScheduler.class.getName());
        backoffConf.setBoolean(ipcPrefix + "backoff.enable", true);

        NameNodeAdapter.getRpcServer(observer).refreshCallQueue(backoffConf);
        try {
            dfs.create(testPath, (short) 1).close();
            assertSentTo(0);

            dfs.getFileStatus(testPath);
            assertSentTo(0);
        } finally {
            // Always put the original call queue back, even when an assertion above failed.
            NameNodeAdapter.getRpcServer(observer).refreshCallQueue(originalConf);
        }
    }

    /**
     * Starts a background read of the test path and checks that it has not completed right
     * after being started, then rolls and tails the edit log for NameNode 0 and waits up to
     * ten seconds for the read to finish successfully.
     *
     * <p>The reader thread is interrupted in a {@code finally} block so that a timeout or a
     * failed assertion does not leave it blocked behind the test.
     *
     * @throws Exception if a cluster call fails or the read does not finish in time
     */
    @Test
    public void testMsyncSimple() throws Exception {
        // 0 == not completed, 1 == succeeded, -1 == failed
        AtomicInteger readStatus = new AtomicInteger(0);

        dfs.getClient().getHAServiceState();
        dfs.mkdir(testPath, FsPermission.getDefault());
        assertSentTo(0);

        Thread reader = new Thread(() -> {
            try {
                dfs.getFileStatus(testPath);
                readStatus.set(1);
            } catch (IOException e) {
                LOG.error("Background getFileStatus failed", e);
                readStatus.set(-1);
            }
        });
        reader.start();
        try {
            Assert.assertEquals(0L, readStatus.get());
            dfsCluster.rollEditLogAndTail(0);
            GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
            Assert.assertEquals(1L, readStatus.get());
        } finally {
            // No effect when the reader already finished; otherwise unblocks it.
            reader.interrupt();
        }
    }

    /**
     * Shared body of the msync tests. A second client is created from a copy of the base
     * configuration; a background thread then either calls {@code msync()} explicitly or
     * relies on the configured auto-msync period before reading the test path, and the read
     * must be reported as sent to NameNode 2.
     *
     * <p>Fixes relative to the previous version: the result of {@code FileSystem.get} is cast
     * to {@link DistributedFileSystem} (the uncast assignment does not compile); the reader
     * thread, which may sleep for the full period, is interrupted when the test ends; and the
     * second client is closed instead of being leaked.
     *
     * @param z {@code true} to configure the auto-msync period and skip the explicit
     *          {@code msync()} call; {@code false} to call {@code msync()} explicitly
     * @param j auto-msync period in milliseconds; when {@code z} is set and this is positive
     *          the reader also sleeps this long before reading
     * @throws Exception if a cluster call fails or the read does not finish within 3 seconds
     */
    private void testMsync(boolean z, long j) throws Exception {
        final boolean autoMsync = z;
        final long autoMsyncPeriodMs = j;

        // 0 == not completed, 1 == succeeded, -1 == failed
        AtomicInteger readStatus = new AtomicInteger(0);

        Configuration conf2 = new Configuration(conf);
        // With the cache disabled FileSystem.get presumably returns a client distinct from dfs.
        conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
        if (autoMsync) {
            conf2.setTimeDuration("dfs.client.failover.observer.auto-msync-period." + dfs.getUri().getHost(), autoMsyncPeriodMs, TimeUnit.MILLISECONDS);
        }
        DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem.get(conf2);

        Thread reader = null;
        try {
            dfs.getClient().getHAServiceState();
            dfs2.getClient().getHAServiceState();

            dfs.mkdir(new Path("/test"), FsPermission.getDefault());
            dfsCluster.rollEditLogAndTail(0);

            dfs.mkdir(testPath, FsPermission.getDefault());
            assertSentTo(0);

            reader = new Thread(() -> {
                try {
                    if (!autoMsync) {
                        dfs2.msync();
                    } else if (autoMsyncPeriodMs > 0) {
                        Thread.sleep(autoMsyncPeriodMs);
                    }
                    dfs2.getFileStatus(testPath);
                    boolean servedByNn2 = HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2);
                    readStatus.set(servedByNn2 ? 1 : -1);
                } catch (InterruptedException e) {
                    // Raised by the clean-up interrupt below while the reader is still sleeping.
                    LOG.info("Reader interrupted before issuing its read");
                    Thread.currentThread().interrupt();
                    readStatus.set(-1);
                } catch (Exception e) {
                    LOG.error("Background read through the second client failed", e);
                    readStatus.set(-1);
                }
            });
            reader.start();

            Thread.sleep(100L);
            Assert.assertEquals(0L, readStatus.get());

            dfsCluster.rollEditLogAndTail(0);

            GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 3000);
            Assert.assertEquals(1L, readStatus.get());
        } finally {
            if (reader != null) {
                // Stop a reader that is still sleeping or blocked, then give it a moment to exit
                // before the client it uses is closed.
                reader.interrupt();
                reader.join(1000L);
            }
            try {
                dfs2.close();
            } catch (IOException e) {
                // Best effort: a close failure must not replace the test's own outcome.
                LOG.warn("Failed to close the second client", e);
            }
        }
    }

    /**
     * Runs {@link #testMsync(boolean, long)} with auto-msync off, so the reader calls
     * {@code msync()} itself; the period argument is not used on that path.
     *
     * @throws Exception propagated from the shared test body
     */
    @Test
    public void testExplicitMsync() throws Exception {
        final boolean autoMsync = false;
        final long unusedPeriodMs = -1L;
        testMsync(autoMsync, unusedPeriodMs);
    }

    /**
     * Runs {@link #testMsync(boolean, long)} with auto-msync on and a period of 0 ms.
     *
     * @throws Exception propagated from the shared test body
     */
    @Test
    public void testAutoMsyncPeriod0() throws Exception {
        final boolean autoMsync = true;
        final long periodMs = 0L;
        testMsync(autoMsync, periodMs);
    }

    /**
     * Runs {@link #testMsync(boolean, long)} with auto-msync on and a period of 5 ms.
     *
     * @throws Exception propagated from the shared test body
     */
    @Test
    public void testAutoMsyncPeriod5() throws Exception {
        final boolean autoMsync = true;
        final long periodMs = 5L;
        testMsync(autoMsync, periodMs);
    }

    /**
     * Runs {@link #testMsync(boolean, long)} with auto-msync on and the largest possible
     * period, and expects a {@link TimeoutException} (presumably raised by the bounded wait
     * inside the shared body, since the reader sleeps for the whole period first).
     *
     * @throws Exception the expected {@link TimeoutException}, or any other failure
     */
    @Test(expected = TimeoutException.class)
    public void testAutoMsyncLongPeriod() throws Exception {
        final boolean autoMsync = true;
        final long periodMs = Long.MAX_VALUE;
        testMsync(autoMsync, periodMs);
    }

    /**
     * Swaps the roles of NameNode 0 (made observer) and NameNode 2 (made active), then checks
     * that a read from a newly created client does not complete until the edit log has been
     * rolled on NameNode 2 and tailed by NameNode 0.
     *
     * <p>Fixes relative to the previous version: the result of {@code FileSystem.get} is cast
     * to {@link DistributedFileSystem} (the uncast assignment does not compile), and the role
     * restoration lives in a single {@code finally} block instead of being duplicated in the
     * try body and a {@code catch (Throwable)} handler, where a failure of the first copy made
     * the sequence run a second time.
     *
     * @throws Exception if a cluster call fails or the read does not finish within 10 seconds
     */
    @Test
    public void testCallFromNewClient() throws Exception {
        dfsCluster.transitionToStandby(0);
        dfsCluster.transitionToObserver(0);
        dfsCluster.transitionToStandby(2);
        dfsCluster.transitionToActive(2);
        try {
            // 0 == not completed, 1 == succeeded, -1 == failed
            AtomicInteger readStatus = new AtomicInteger(0);

            dfs.getClient().getHAServiceState();

            dfs.mkdir(new Path("/test"), FsPermission.getDefault());
            dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
            dfsCluster.getNameNode(0).getNamesystem().getEditLogTailer().doTailEdits();

            dfs.mkdir(testPath, FsPermission.getDefault());
            assertSentTo(2);

            Configuration conf2 = new Configuration(conf);
            // With the cache disabled FileSystem.get presumably returns a client distinct from dfs.
            conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
            DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem.get(conf2);
            dfs2.getClient().getHAServiceState();

            Thread reader = new Thread(() -> {
                try {
                    dfs2.getFileStatus(testPath);
                    readStatus.set(1);
                } catch (Exception e) {
                    LOG.error("Background read through the new client failed", e);
                    readStatus.set(-1);
                }
            });
            reader.start();

            Thread.sleep(100L);
            Assert.assertEquals(0L, readStatus.get());

            dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
            dfsCluster.getNameNode(0).getNamesystem().getEditLogTailer().doTailEdits();

            GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
            Assert.assertEquals(1L, readStatus.get());
        } finally {
            // Put the NameNode roles back the way they were when the test started.
            dfsCluster.transitionToStandby(2);
            dfsCluster.transitionToObserver(2);
            dfsCluster.transitionToStandby(0);
            dfsCluster.transitionToActive(0);
        }
    }

    /**
     * Checks that {@code datanodeReport} returns in under 200 ms while a concurrent
     * {@code getFileInfo("/")} issued after a write stays blocked: the reader's status must
     * still be 0 and its thread in {@link Thread.State#WAITING} five seconds later.
     *
     * <p>Fixes relative to the previous version: the reader thread is interrupted in a
     * {@code finally} block, so a failed assertion no longer leaves it blocked; the elapsed
     * time is measured with the monotonic clock rather than the wall clock; and the timing
     * assertion reports the measured duration.
     *
     * <p>Note that the {@code Assert.fail} inside the reader runs on the reader thread, so it
     * cannot fail this test by itself; the assertions on the status flag do that.
     *
     * @throws Exception if a client call fails
     */
    @Test
    public void testUncoordinatedCall() throws Exception {
        dfs.mkdir(testPath, FsPermission.getDefault());

        // 0 == not completed, 1 == succeeded, -1 == failed
        AtomicInteger readStatus = new AtomicInteger(0);

        Thread reader = new Thread(() -> {
            try {
                dfs.getClient().getFileInfo("/");
                readStatus.set(1);
                Assert.fail("Should have been interrupted before getting here.");
            } catch (IOException e) {
                // Expected path once the test interrupts this thread.
                LOG.info("Blocked read ended with an exception", e);
                readStatus.set(-1);
            }
        });
        reader.start();
        try {
            long startMs = Time.monotonicNow();
            dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL);
            long elapsedMs = Time.monotonicNow() - startMs;

            Assert.assertTrue("datanodeReport took " + elapsedMs + " ms, expected less than 200 ms", elapsedMs < 200);
            Assert.assertEquals(0L, readStatus.get());

            Thread.sleep(5000L);
            Assert.assertEquals(0L, readStatus.get());
            Assert.assertEquals(Thread.State.WAITING, reader.getState());
        } finally {
            reader.interrupt();
        }
    }

    /**
     * Points a client configured with {@code ConfiguredFailoverProxyProvider} at NameNode 2
     * only, with a single retry and failover attempt, and expects {@code listStatus} to fail
     * with a {@link RemoteException} that unwraps to {@link StandbyException}.
     *
     * <p>The client is created with the file system cache disabled and is therefore opened in
     * a try-with-resources block, so it is closed instead of leaked.
     *
     * @throws Exception if a cluster or file system call fails unexpectedly
     */
    @Test
    public void testRequestFromNonObserverProxyProvider() throws Exception {
        Configuration confWithoutObserver = new Configuration(conf);
        InetSocketAddress nn2Address = dfsCluster.getNameNode(2).getNameNodeAddress();
        List<InetSocketAddress> onlyNn2 = Collections.singletonList(nn2Address);
        HATestUtil.setFailoverConfigurations(confWithoutObserver, HATestUtil.getLogicalHostname(dfsCluster), onlyNn2, ConfiguredFailoverProxyProvider.class);
        confWithoutObserver.setBoolean("fs.hdfs.impl.disable.cache", true);
        confWithoutObserver.setInt("dfs.client.retry.max.attempts", 1);
        confWithoutObserver.setInt("dfs.client.failover.max.attempts", 1);

        try (FileSystem nonObserverFs = FileSystem.get(confWithoutObserver)) {
            dfs.mkdir(testPath, FsPermission.getDefault());
            dfsCluster.rollEditLogAndTail(0);
            try {
                nonObserverFs.listStatus(testPath);
                Assert.fail("listStatus should have thrown exception");
            } catch (RemoteException e) {
                IOException unwrapped = e.unwrapRemoteException();
                Assert.assertTrue("should have thrown StandbyException but got " + unwrapped.getClass().getSimpleName(), unwrapped instanceof StandbyException);
            }
        }
    }

    /**
     * Verifies through the {@link FileContext} API that a directory created and followed by
     * {@code msync()} can be stat-ed once the edit log has been rolled and tailed.
     *
     * <p>The test first asserts that NameNode 0 is active and NameNode 2 is an observer, and
     * it logs the last applied-or-written transaction id of both before the final read.
     *
     * @throws Exception if a cluster or file context call fails
     */
    @Test(timeout = 10000)
    public void testMsyncFileContext() throws Exception {
        NameNode active = dfsCluster.getNameNode(0);
        NameNode observer = dfsCluster.getNameNode(2);

        HAServiceProtocol.HAServiceState activeState = active.getRpcServer().getServiceStatus().getState();
        Assert.assertEquals("nn0 is not active", HAServiceProtocol.HAServiceState.ACTIVE, activeState);
        HAServiceProtocol.HAServiceState observerState = observer.getRpcServer().getServiceStatus().getState();
        Assert.assertEquals("nn2 is not observer", HAServiceProtocol.HAServiceState.OBSERVER, observerState);

        FileContext fc = FileContext.getFileContext(conf);
        fc.getFsStatus(testPath);

        Path dir = new Path(testPath, "testMsyncFileContext");
        fc.mkdir(dir, FsPermission.getDefault(), true);
        fc.msync();
        dfsCluster.rollEditLogAndTail(0);

        long activeTxId = active.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId();
        long observerTxId = observer.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId();
        LOG.info("State id active = {}, Stat id observer = {}", activeTxId, observerTxId);

        try {
            fc.getFileStatus(dir);
        } catch (FileNotFoundException notFound) {
            Assert.fail("File should exist on Observer after msync");
        }
    }

    /**
     * Asserts that {@code HATestUtil.isSentToAnyOfNameNodes} reports the given NameNode for
     * the shared client {@link #dfs}.
     *
     * @param i index of the NameNode expected to have received the request
     * @throws IOException if the check itself fails
     */
    private void assertSentTo(int i) throws IOException {
        boolean sentToExpected = HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, i);
        String failureMessage = "Request was not sent to the expected namenode " + i;
        Assert.assertTrue(failureMessage, sentToExpected);
    }

    /**
     * Obtains a client file system from {@code HATestUtil.configureObserverReadFs} for the
     * shared cluster and base configuration, using {@link ObserverReadProxyProvider}.
     *
     * @param z flag forwarded unchanged as the last argument of the helper
     * @return the file system produced by the helper
     * @throws Exception if the helper fails
     */
    private DistributedFileSystem setObserverRead(boolean z) throws Exception {
        DistributedFileSystem configuredFs =
                HATestUtil.configureObserverReadFs(dfsCluster, conf, ObserverReadProxyProvider.class, z);
        return configuredFs;
    }
}
