package org.apache.hadoop.hdfs.server.namenode.ha;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.net.BindException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.TestDataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.JournalSet;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.util.ThreadUtil;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.class */
public class TestStandbyCheckpoints {
    /** Count handed to {@code FSImageTestUtil.createAbortedLogWithMkdirs} when testCheckpointCancellation fabricates an edit log. */
    private static final int NUM_DIRS_IN_LOG = 200000;
    /** Number of NameNodes this test tracks; assigned in the static initializer and used to size {@link #nns}. */
    protected static int NUM_NNS;
    /** Mini cluster under test; created in setupCluster and shut down in shutdownCluster. */
    protected MiniDFSCluster cluster;
    /** Client file system obtained from {@code HATestUtil.configureFailoverFs} in setupCluster. */
    protected FileSystem fs;
    /** Directory configured as "dfs.namenode.legacy-oiv-image.dir"; created in setupCommonConfig, deleted in shutdownCluster. */
    protected File tmpOivImgDir;
    /** Class logger; assigned in the static initializer. */
    public static final Logger LOG;
    // Compiler-generated flag surfaced by decompilation; the static initializer sets it to the
    // negation of this class's desired assertion status.
    static final /* synthetic */ boolean $assertionsDisabled;
    /** NameNode handles indexed by NameNode number; filled by setNNs. */
    protected NameNode[] nns = new NameNode[NUM_NNS];
    /** Source of randomness for choosing the base HTTP port in setupCluster. */
    private final Random random = new Random();

    /* loaded from: input_file:org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints$SlowCodec.class */
    public static class SlowCodec extends GzipCodec {
        public CompressionOutputStream createOutputStream(OutputStream outputStream) throws IOException {
            CompressionOutputStream compressionOutputStream = (CompressionOutputStream) Mockito.spy(super.createOutputStream(outputStream));
            ((CompressionOutputStream) Mockito.doAnswer(new GenericTestUtils.SleepAnswer(5)).when(compressionOutputStream)).write((byte[]) Mockito.any(), Mockito.anyInt(), Mockito.anyInt());
            return compressionOutputStream;
        }
    }

    /**
     * Starts a three-NameNode HA mini cluster with one DataNode and makes NameNode 0 active.
     *
     * <p>The NameNode HTTP ports are {@code base}, {@code base + 1} and {@code base + 2},
     * where {@code base} is a random even offset from 10060 (10060..10258). A
     * {@link BindException} during start-up is treated as a port conflict and the whole
     * start-up is retried with a new base port.
     *
     * @throws Exception if cluster start-up fails for any reason other than a port conflict
     */
    @Before
    public void setupCluster() throws Exception {
        Configuration configuration = setupCommonConfig();
        configuration.setInt("dfs.namenode.num.checkpoints.retained", 1);
        configuration.setInt("dfs.namenode.num.extra.edits.retained", 0);
        int retryCount = 0;
        while (true) {
            try {
                int basePort = 10060 + (this.random.nextInt(100) * 2);
                MiniDFSNNTopology topology = new MiniDFSNNTopology().addNameservice(
                        new MiniDFSNNTopology.NSConf(MiniQJMHACluster.NAMESERVICE)
                                .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(basePort))
                                .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(basePort + 1))
                                .addNN(new MiniDFSNNTopology.NNConf("nn3").setHttpPort(basePort + 2)));
                this.cluster = new MiniDFSCluster.Builder(configuration).nnTopology(topology).numDataNodes(1).build();
                this.cluster.waitActive();
                setNNs();
                this.fs = HATestUtil.configureFailoverFs(this.cluster, configuration);
                this.cluster.transitionToActive(0);
                return;
            } catch (BindException e) {
                // Count the failed attempt here; counting on the success path (as before)
                // meant the message always reported zero retries.
                retryCount++;
                LOG.info("Set up MiniDFSCluster failed due to port conflicts, retry {} times", retryCount);
                // Release anything a partially started cluster still holds before retrying.
                if (this.cluster != null) {
                    this.cluster.shutdown();
                    this.cluster = null;
                }
            }
        }
    }

    /**
     * Refreshes {@link #nns} so that slot {@code k} holds the cluster's current NameNode
     * {@code k}, for every {@code k} below {@code NUM_NNS}.
     */
    protected void setNNs() {
        int index = 0;
        while (index < NUM_NNS) {
            this.nns[index] = this.cluster.getNameNode(index);
            index++;
        }
    }

    /**
     * Creates the legacy OIV image directory and builds the base configuration shared by the
     * tests: short checkpoint check period and transaction threshold, a 1000 tail-edits
     * period, and image compression through {@link SlowCodec}.
     *
     * @return a fresh configuration carrying the settings above
     */
    protected Configuration setupCommonConfig() {
        File oivDir = GenericTestUtils.getTestDir("TestStandbyCheckpoints");
        this.tmpOivImgDir = oivDir;
        oivDir.mkdirs();

        Configuration conf = new Configuration();
        conf.setInt("dfs.namenode.checkpoint.check.period", 1);
        conf.setInt("dfs.namenode.checkpoint.txns", 5);
        conf.setInt("dfs.ha.tail-edits.period", 1000);
        conf.set("dfs.namenode.legacy-oiv-image.dir", oivDir.getAbsolutePath());

        // Compress images with the deliberately slow codec.
        String slowCodecName = SlowCodec.class.getCanonicalName();
        conf.setBoolean("dfs.image.compress", true);
        conf.set("dfs.image.compression.codec", slowCodecName);
        CompressionCodecFactory.setCodecClasses(conf, ImmutableList.of(SlowCodec.class));
        return conf;
    }

    /**
     * Shuts the mini cluster down (if one is running) and deletes the legacy OIV image
     * directory. The directory is removed and the cluster reference cleared even when the
     * shutdown itself throws, so a failed shutdown does not leak files into later tests.
     *
     * @throws IOException declared for the cleanup calls
     */
    @After
    public void shutdownCluster() throws IOException {
        try {
            if (this.cluster != null) {
                this.cluster.shutdown();
            }
        } finally {
            this.cluster = null;
            if (this.tmpOivImgDir != null) {
                FileUtil.fullyDelete(this.tmpOivImgDir);
            }
        }
    }

    /**
     * Makes ten edits on NameNode 0, waits for NameNode 1 to catch up and checkpoint at
     * txid 12, waits for exactly one file to appear in the legacy OIV image directory, waits
     * for the same checkpoint on NameNode 0, and finally verifies that NameNode 1's journal
     * set was never asked to purge logs.
     */
    @Test(timeout = 300000)
    public void testSBNCheckpoints() throws Exception {
        final JournalSet standbyJournals = NameNodeAdapter.spyOnJournalSet(this.nns[1]);

        doEdits(0, 10);
        HATestUtil.waitForStandbyToCatchUp(this.nns[0], this.nns[1]);
        HATestUtil.waitForCheckpoint(this.cluster, 1, ImmutableList.of(12));

        // Poll once per second, for up to a minute, until the OIV directory is non-empty.
        Supplier<Boolean> oivImageWritten = () -> this.tmpOivImgDir.list().length > 0;
        GenericTestUtils.waitFor(oivImageWritten, 1000L, 60000L);
        Assert.assertEquals("One file is expected", 1L, this.tmpOivImgDir.list().length);

        HATestUtil.waitForCheckpoint(this.cluster, 0, ImmutableList.of(12));
        Mockito.verify(standbyJournals, Mockito.never()).purgeLogsOlderThan(Mockito.anyLong());
    }

    /**
     * Adds a brand-new name directory to NameNode 0, restarts it, and checks that the
     * directory's {@code current/VERSION} file is absent right after the restart but appears
     * (within roughly ten seconds) once edits have been made and the standby has caught up.
     *
     * <p>All checks use JUnit assertions rather than the Java {@code assert} statement, so
     * the directory is always created and the checks always run, whether or not the JVM was
     * started with assertions enabled.
     */
    @Test
    public void testNewDirInitAfterCheckpointing() throws Exception {
        File testRoot = new File(PathUtils.getTestDir(TestStandbyCheckpoints.class), "testNewDirInitAfterCheckpointing");
        File nameDir = new File(testRoot, "name1");
        // Creating the directory is required set-up, so it must not live inside an assert.
        Assert.assertTrue("Could not create name dir " + nameDir, nameDir.mkdirs());

        // Restart NameNode 0 with the extra name directory appended to its existing list.
        String existingNameDirs = this.cluster.getConfiguration(0).get("dfs.namenode.name.dir");
        this.cluster.getConfiguration(0).set("dfs.namenode.name.dir", existingNameDirs + "," + Util.fileAsURI(nameDir).toString());
        this.cluster.restartNameNode(0);
        this.nns[0] = this.cluster.getNameNode(0);
        this.cluster.transitionToActive(0);

        // After the restart "current" must exist while "current/VERSION" must not.
        File currentDir = new File(nameDir, "current");
        File versionFile = new File(currentDir, "VERSION");
        Assert.assertTrue(currentDir + " should exist after restart", currentDir.exists());
        Assert.assertFalse(versionFile + " should not exist yet", versionFile.exists());

        doEdits(0, 10);
        HATestUtil.waitForStandbyToCatchUp(this.nns[0], this.nns[1]);

        // Poll for the VERSION file: up to 20 checks, 500 ms apart.
        for (int attempt = 0; attempt < 20 && !versionFile.exists(); attempt++) {
            Thread.sleep(500L);
        }
        Assert.assertTrue(versionFile + " should have been created", versionFile.exists());
    }

    /**
     * Makes ten edits, puts NameNode 0 into standby as well, and waits for both NameNode 1
     * and NameNode 0 to reach a checkpoint at txid 12. It then asserts that both report 12 as
     * their most recent checkpoint txid and that the files in their current directories are
     * identical.
     */
    @Test(timeout = 300000)
    public void testBothNodesInStandbyState() throws Exception {
        doEdits(0, 10);
        this.cluster.transitionToStandby(0);
        HATestUtil.waitForCheckpoint(this.cluster, 1, ImmutableList.of(12));
        HATestUtil.waitForCheckpoint(this.cluster, 0, ImmutableList.of(12));
        Assert.assertEquals(12L, this.nns[0].getNamesystem().getFSImage().getMostRecentCheckpointTxId());
        Assert.assertEquals(12L, this.nns[1].getNamesystem().getFSImage().getMostRecentCheckpointTxId());

        // Typed list (was a raw ArrayList) of the current directories of both NameNodes.
        ArrayList<File> currentDirs = Lists.newArrayList();
        currentDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(this.cluster, 0));
        currentDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(this.cluster, 1));
        FSImageTestUtil.assertParallelFilesAreIdentical(currentDirs, ImmutableSet.of());
    }

    /**
     * Switches NameNode 2 to observer, makes ten edits, rolls the edit log on NameNode 0, and
     * waits for all three NameNodes to reach a checkpoint at txid 12. It then asserts that
     * NameNodes 2 and 1 report 12 as their most recent checkpoint txid and that the files in
     * their current directories are identical, before returning NameNode 2 to standby.
     */
    @Test(timeout = 300000)
    public void testStandbyAndObserverState() throws Exception {
        this.cluster.transitionToObserver(2);
        doEdits(0, 10);
        this.nns[0].getRpcServer().rollEditLog();
        HATestUtil.waitForCheckpoint(this.cluster, 1, ImmutableList.of(12));
        HATestUtil.waitForCheckpoint(this.cluster, 0, ImmutableList.of(12));
        HATestUtil.waitForCheckpoint(this.cluster, 2, ImmutableList.of(12));
        Assert.assertEquals(12L, this.nns[2].getNamesystem().getFSImage().getMostRecentCheckpointTxId());
        Assert.assertEquals(12L, this.nns[1].getNamesystem().getFSImage().getMostRecentCheckpointTxId());

        // Typed list (was a raw ArrayList) of the current directories of NameNodes 2 and 1.
        ArrayList<File> currentDirs = Lists.newArrayList();
        currentDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(this.cluster, 2));
        currentDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(this.cluster, 1));
        FSImageTestUtil.assertParallelFilesAreIdentical(currentDirs, ImmutableSet.of());
        this.cluster.transitionToStandby(2);
    }

    /**
     * Clears the FSImage held by NameNode 2's HTTP server while the other NameNodes
     * checkpoint at txid 12, then expects a log line containing both "PutImage failed" and
     * "FSImage has not been set in the NameNode.".
     *
     * <p>The log-capturing appender is detached and the original FSImage is put back in
     * {@code finally} blocks, so a failure part-way through does not leave either change in
     * place for later tests.
     */
    @Test(timeout = 30000)
    public void testCheckpointBeforeNameNodeInitializationIsComplete() throws Exception {
        LogVerificationAppender appender = new LogVerificationAppender();
        org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
        rootLogger.addAppender(appender);
        try {
            this.cluster.transitionToObserver(2);
            doEdits(0, 10);
            this.nns[0].getRpcServer().rollEditLog();

            NameNode observer = this.nns[2];
            // Swap the HTTP server's FSImage for null, remembering the real one.
            FSImage savedImage = NameNodeAdapter.getAndSetFSImageInHttpServer(observer, null);
            try {
                HATestUtil.waitForCheckpoint(this.cluster, 1, ImmutableList.of(12));
                HATestUtil.waitForCheckpoint(this.cluster, 0, ImmutableList.of(12));
            } finally {
                NameNodeAdapter.getAndSetFSImageInHttpServer(observer, savedImage);
            }
            this.cluster.transitionToStandby(2);
        } finally {
            rootLogger.removeAppender(appender);
        }

        for (LoggingEvent event : appender.getLog()) {
            String message = event.getRenderedMessage();
            if (message.contains("PutImage failed") && message.contains("FSImage has not been set in the NameNode.")) {
                return;
            }
        }
        Assert.fail("Expected exception not present in logs.");
    }

    /**
     * Restarts NameNode 1 with "dfs.namenode.checkpoint.period" set to 0 and spies on its
     * FSImage. After one second the single-argument {@code saveNamespace} must not have been
     * called. Once the standby has caught up and a further delay has passed, the
     * three-argument {@code saveNamespace} for {@code NameNodeFile.IMAGE} must have been
     * called exactly once.
     */
    @Test(timeout = 300000)
    public void testCheckpointWhenNoNewTransactionsHappened() throws Exception {
        this.cluster.getConfiguration(1).setInt("dfs.namenode.checkpoint.period", 0);
        this.cluster.restartNameNode(1);
        this.nns[1] = this.cluster.getNameNode(1);

        final FSImage standbyImage = NameNodeAdapter.spyOnFsImage(this.nns[1]);

        Thread.sleep(1000L);
        Mockito.verify(standbyImage, Mockito.never())
                .saveNamespace((FSNamesystem) ArgumentMatchers.any());

        HATestUtil.waitForStandbyToCatchUp(this.nns[0], this.nns[1]);
        // NOTE(review): this delay comes from a constant owned by an unrelated DataNode test;
        // presumably a decompiler substitution for a numeric literal - confirm.
        Thread.sleep(TestDataNodeFaultInjector.MetricsDataNodeFaultInjector.DELAY);
        Mockito.verify(standbyImage, Mockito.times(1))
                .saveNamespace(
                        (FSNamesystem) ArgumentMatchers.any(),
                        (NNStorage.NameNodeFile) Mockito.eq(NNStorage.NameNodeFile.IMAGE),
                        (Canceler) ArgumentMatchers.any());
    }

    /**
     * Places a fabricated in-progress edit log (built by
     * {@code FSImageTestUtil.createAbortedLogWithMkdirs} with {@code NUM_DIRS_IN_LOG}
     * entries starting at txid 3) into the shared edits directory, restarts NameNode 1 with a
     * checkpoint period of 0, and then repeatedly makes edits and fails over back and forth.
     * The test passes as soon as {@code StandbyCheckpointer.getCanceledCount()} is positive,
     * giving up after ten rounds.
     */
    @Test(timeout = 120000)
    public void testCheckpointCancellation() throws Exception {
        this.cluster.transitionToStandby(0);

        File sharedCurrentDir = new File(this.cluster.getSharedEditsDir(0, 1).getPath(), "current");
        File scratchDir = new File(MiniDFSCluster.getBaseDirectory(), "testCheckpointCancellation-tmp");
        long firstInodeId = this.cluster.getNamesystem(0).getFSDirectory().getLastInodeId() + 1;
        FSImageTestUtil.createAbortedLogWithMkdirs(scratchDir, NUM_DIRS_IN_LOG, 3L, firstInodeId);

        String editsName = NNStorage.getInProgressEditsFileName(3L);
        File generatedEdits = new File(scratchDir, editsName);
        File sharedEdits = new File(sharedCurrentDir, editsName);
        // NOTE(review): the result of renameTo is not checked, as in the original.
        generatedEdits.renameTo(sharedEdits);

        this.cluster.getConfiguration(1).setInt("dfs.namenode.checkpoint.period", 0);
        this.cluster.restartNameNode(1);
        this.nns[1] = this.cluster.getNameNode(1);
        this.cluster.transitionToActive(0);

        boolean sawCancellation = false;
        for (int round = 0; round < 10; round++) {
            int firstDir = round * 10;
            doEdits(firstDir, firstDir + 10);
            this.cluster.transitionToStandby(0);
            this.cluster.transitionToActive(1);
            this.cluster.transitionToStandby(1);
            this.cluster.transitionToActive(0);
            sawCancellation = StandbyCheckpointer.getCanceledCount() > 0;
            if (sawCancellation) {
                break;
            }
        }
        Assert.assertTrue(sawCancellation);
    }

    /**
     * Reconfigures the NameNodes (checkpoint txns 1000 on NameNode 0, image compression off
     * everywhere, image transfer bandwidth 100 on every NameNode except 0), restarts them,
     * makes 100 edits and waits for the standbys to checkpoint at txid 104. It then fails
     * over to NameNode 1, shuts the cluster down, waits until no live thread has a name
     * starting with "TransferFsImageUpload", and asserts that NameNode 0's most recent
     * checkpoint txid is still 0.
     */
    @Test(timeout = 60000)
    public void testCheckpointCancellationDuringUpload() throws Exception {
        this.cluster.getConfiguration(0).setInt("dfs.namenode.checkpoint.txns", 1000);
        for (int nn = 0; nn < NUM_NNS; nn++) {
            this.cluster.getConfiguration(nn).setBoolean("dfs.image.compress", false);
        }
        for (int nn = 1; nn < NUM_NNS; nn++) {
            this.cluster.getConfiguration(nn).setLong("dfs.image.transfer.bandwidthPerSec", 100L);
        }
        for (int nn = 0; nn < NUM_NNS; nn++) {
            this.cluster.restartNameNode(nn);
        }
        setNNs();
        this.cluster.transitionToActive(0);
        doEdits(0, 100);
        for (int nn = 1; nn < NUM_NNS; nn++) {
            HATestUtil.waitForStandbyToCatchUp(this.nns[0], this.nns[nn]);
            HATestUtil.waitForCheckpoint(this.cluster, nn, ImmutableList.of(104));
        }
        this.cluster.transitionToStandby(0);
        this.cluster.transitionToActive(1);
        this.cluster.shutdown();
        this.cluster = null;

        Supplier<Boolean> uploadThreadsGone = () -> {
            ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
            ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 1);
            for (ThreadInfo info : infos) {
                // getThreadInfo yields a null entry for any thread that died after its id was
                // listed, which is likely while the cluster is shutting down; skip those.
                if (info != null && info.getThreadName().startsWith("TransferFsImageUpload")) {
                    return false;
                }
            }
            return true;
        };
        GenericTestUtils.waitFor(uploadThreadsGone, 1000L, 30000L);
        Assert.assertEquals(0L, this.nns[0].getFSImage().getMostRecentCheckpointTxId());
    }

    /**
     * Holds NameNode 1's image save (for {@code NameNodeFile.IMAGE}) inside a
     * {@code DelayAnswer} and, while it is held, checks three things: an RPC
     * {@code getFileInfo("/")} fails with a {@link StandbyException} mentioning
     * "is not supported"; the pending DataNode message count is 0 before a file is created
     * and positive one second after; and the save has fired once without returning. The save
     * is then released and must complete exactly once.
     */
    @Test(timeout = 300000)
    public void testStandbyExceptionThrownDuringCheckpoint() throws Exception {
        final FSImage standbyImage = NameNodeAdapter.spyOnFsImage(this.nns[1]);
        final GenericTestUtils.DelayAnswer saveGate = new GenericTestUtils.DelayAnswer(LOG);
        Mockito.doAnswer(saveGate)
                .when(standbyImage)
                .saveNamespace(
                        ArgumentMatchers.any(FSNamesystem.class),
                        Mockito.eq(NNStorage.NameNodeFile.IMAGE),
                        ArgumentMatchers.any(Canceler.class));

        // Generate enough edits to trigger a checkpoint, then wait for the save to start.
        doEdits(0, 1000);
        this.nns[0].getRpcServer().rollEditLog();
        saveGate.waitForCall();
        Assert.assertTrue(
                "SBN is not performing checkpoint but it should be.",
                saveGate.getFireCount() == 1 && saveGate.getResultCount() == 0);

        ThreadUtil.sleepAtLeastIgnoreInterrupts(1000L);
        try {
            this.nns[1].getRpcServer().getFileInfo("/");
            Assert.fail("Should have thrown StandbyException, but instead succeeded.");
        } catch (StandbyException expected) {
            GenericTestUtils.assertExceptionContains("is not supported", expected);
        }

        Assert.assertEquals(0L, this.cluster.getNamesystem(1).getPendingDataNodeMessageCount());
        doCreate();
        Thread.sleep(1000L);
        Assert.assertTrue(this.cluster.getNamesystem(1).getPendingDataNodeMessageCount() > 0);
        Assert.assertTrue(
                "SBN should have still been checkpointing.",
                saveGate.getFireCount() == 1 && saveGate.getResultCount() == 0);

        // Release the held save and wait for it to return.
        saveGate.proceed();
        saveGate.waitForResult();
        Assert.assertTrue(
                "SBN should have finished checkpointing.",
                saveGate.getFireCount() == 1 && saveGate.getResultCount() == 1);
    }

    /**
     * Holds NameNode 1's image save inside a {@code DelayAnswer} and, while it is held,
     * starts a background {@code restoreFailedStorage("false")} RPC. It then asserts that the
     * namesystem lock has no queued threads and is not write-locked, that the checkpoint lock
     * does have queued threads, and that the NameNode's {@code /jmx} page can be fetched and
     * contains "NumLiveDataNodes". The save is then released and must complete exactly once.
     *
     * <p>An {@link IOException} from the background RPC is reported through the class logger
     * rather than printed straight to standard error.
     */
    @Test(timeout = 300000)
    public void testReadsAllowedDuringCheckpoint() throws Exception {
        FSImage standbyImage = NameNodeAdapter.spyOnFsImage(this.nns[1]);
        GenericTestUtils.DelayAnswer saveGate = new GenericTestUtils.DelayAnswer(LOG);
        Mockito.doAnswer(saveGate).when(standbyImage).saveNamespace(ArgumentMatchers.any(FSNamesystem.class), ArgumentMatchers.any(NNStorage.NameNodeFile.class), ArgumentMatchers.any(Canceler.class));

        doEdits(0, 1000);
        this.nns[0].getRpcServer().rollEditLog();
        saveGate.waitForCall();
        Assert.assertTrue("SBN is not performing checkpoint but it should be.", saveGate.getFireCount() == 1 && saveGate.getResultCount() == 0);
        ThreadUtil.sleepAtLeastIgnoreInterrupts(1000L);

        // Issue the RPC from a separate thread; the assertions below expect it to queue on
        // the checkpoint lock while the save is held.
        Thread restoreCaller = new Thread(() -> {
            try {
                this.nns[1].getRpcServer().restoreFailedStorage("false");
            } catch (IOException e) {
                LOG.error("restoreFailedStorage failed in background thread", e);
            }
        });
        restoreCaller.start();
        ThreadUtil.sleepAtLeastIgnoreInterrupts(1000L);

        Assert.assertEquals(0L, this.nns[1].getNamesystem().getFsLockForTests().getQueueLength());
        Assert.assertFalse(this.nns[1].getNamesystem().getFsLockForTests().isWriteLocked());
        Assert.assertTrue(this.nns[1].getNamesystem().getCpLockForTests().hasQueuedThreads());

        URL jmxUrl = new URL("http://" + this.nns[1].getHttpAddress().getHostName() + ":" + this.nns[1].getHttpAddress().getPort() + "/jmx");
        Assert.assertTrue(DFSTestUtil.urlGet(jmxUrl).contains("NumLiveDataNodes"));
        Assert.assertTrue("SBN should have still been checkpointing.", saveGate.getFireCount() == 1 && saveGate.getResultCount() == 0);

        saveGate.proceed();
        saveGate.waitForResult();
        Assert.assertTrue("SBN should have finished checkpointing.", saveGate.getFireCount() == 1 && saveGate.getResultCount() == 1);
        restoreCaller.join();
    }

    /**
     * Shuts down every NameNode except 0, sets their checkpoint period to 1, makes ten edits,
     * puts NameNode 0 into standby and restarts the others. After every NameNode has
     * checkpointed at txid 12, NameNode 0 becomes active again, more edits are made and the
     * log is rolled, and NameNode 0 is expected to receive a checkpoint at txid 23.
     */
    @Test(timeout = 300000)
    public void testNonPrimarySBNUploadFSImage() throws Exception {
        for (int standby = 1; standby < NUM_NNS; standby++) {
            this.cluster.shutdownNameNode(standby);
            this.cluster.getConfiguration(standby).setInt("dfs.namenode.checkpoint.period", 1);
        }

        doEdits(0, 10);
        this.cluster.transitionToStandby(0);

        for (int standby = 1; standby < NUM_NNS; standby++) {
            this.cluster.restartNameNode(standby, false, new String[0]);
        }
        this.cluster.waitClusterUp();

        for (int nn = 0; nn < NUM_NNS; nn++) {
            HATestUtil.waitForCheckpoint(this.cluster, nn, ImmutableList.of(12));
        }

        this.cluster.transitionToActive(0);
        // NOTE(review): this delay comes from a constant owned by an unrelated DataNode test;
        // presumably a decompiler substitution for a numeric literal - confirm.
        Thread.sleep(TestDataNodeFaultInjector.MetricsDataNodeFaultInjector.DELAY);

        doEdits(11, 20);
        this.nns[0].getRpcServer().rollEditLog();
        HATestUtil.waitForCheckpoint(this.cluster, 0, ImmutableList.of(23));
    }

    /**
     * Deletes the legacy OIV image directory before making edits, then expects both
     * NameNode 1 and NameNode 0 to still reach a checkpoint at txid 12.
     */
    @Test(timeout = 300000)
    public void testCheckpointSucceedsWithLegacyOIVException() throws Exception {
        // Remove the configured OIV output directory up front.
        FileUtil.fullyDelete(this.tmpOivImgDir);

        doEdits(0, 10);
        HATestUtil.waitForStandbyToCatchUp(this.nns[0], this.nns[1]);

        final int[] checkpointOrder = {1, 0};
        for (int nnIndex : checkpointOrder) {
            HATestUtil.waitForCheckpoint(this.cluster, nnIndex, ImmutableList.of(12));
        }
    }

    /**
     * Creates the directories {@code /test<k>} for every {@code k} from {@code i}
     * (inclusive) up to {@code i2} (exclusive) through the test file system.
     *
     * @param i index of the first directory to create
     * @param i2 index one past the last directory to create
     * @throws IOException if a mkdirs call fails
     */
    private void doEdits(int i, int i2) throws IOException {
        int dirIndex = i;
        while (dirIndex < i2) {
            Path dir = new Path("/test" + dirIndex);
            this.fs.mkdirs(dir);
            dirIndex++;
        }
    }

    /**
     * Replaces {@code /testFile} with a new file (replication 1) holding the single byte 42.
     * The stream is opened in try-with-resources so it is closed even if the write fails.
     *
     * @throws IOException if deleting, creating, writing or closing fails
     */
    private void doCreate() throws IOException {
        Path path = new Path("/testFile");
        this.fs.delete(path, false);
        try (FSDataOutputStream out = this.fs.create(path, (short) 1)) {
            out.write(42);
        }
    }

    // Initializes the class-level state declared above.
    static {
        // Synthetic flag: true when assertions are NOT enabled for this class.
        $assertionsDisabled = !TestStandbyCheckpoints.class.desiredAssertionStatus();
        // This test tracks three NameNodes; the nns array is sized from this value.
        NUM_NNS = 3;
        LOG = LoggerFactory.getLogger(TestStandbyCheckpoints.class);
    }
}
