package org.apache.hadoop.hbase.master.procedure;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({LargeTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.class */
public class TestWALProcedureStoreOnHDFS {
    private static final Log LOG = LogFactory.getLog(TestWALProcedureStoreOnHDFS.class);
    protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
    private WALProcedureStore store;

    private static void setupConf(Configuration configuration) {
        configuration.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3);
        configuration.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 3);
        configuration.setInt("hbase.procedure.store.wal.wait.before.roll", 1000);
        configuration.setInt("hbase.procedure.store.wal.max.roll.retries", 5);
        configuration.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 5);
    }

    @Before
    public void setup() throws Exception {
        setupConf(UTIL.getConfiguration());
        MiniDFSCluster startMiniDFSCluster = UTIL.startMiniDFSCluster(3);
        this.store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), startMiniDFSCluster.getFileSystem(), new Path(new Path(startMiniDFSCluster.getFileSystem().getUri()), "/test-logs"));
        this.store.registerListener(new ProcedureStore.ProcedureStoreListener() { // from class: org.apache.hadoop.hbase.master.procedure.TestWALProcedureStoreOnHDFS.1
            @Override // org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener
            public void postSync() {
            }

            @Override // org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener
            public void abortProcess() {
                TestWALProcedureStoreOnHDFS.LOG.fatal("Abort the Procedure Store");
                TestWALProcedureStoreOnHDFS.this.store.stop(true);
            }
        });
        this.store.start(8);
        this.store.recoverLease();
    }

    @After
    public void tearDown() throws Exception {
        this.store.stop(false);
        UTIL.getDFSCluster().getFileSystem().delete(this.store.getWALDir(), true);
        try {
            UTIL.shutdownMiniCluster();
        } catch (Exception e) {
            LOG.warn("failure shutting down cluster", e);
        }
    }

    @Test(timeout = 60000, expected = RuntimeException.class)
    public void testWalAbortOnLowReplication() throws Exception {
        Assert.assertEquals(3L, UTIL.getDFSCluster().getDataNodes().size());
        LOG.info("Stop DataNode");
        UTIL.getDFSCluster().stopDataNode(0);
        Assert.assertEquals(2L, UTIL.getDFSCluster().getDataNodes().size());
        this.store.insert(new ProcedureTestingUtility.TestProcedure(1L, -1L), null);
        long j = 2;
        while (true) {
            long j2 = j;
            if (!this.store.isRunning()) {
                Assert.assertFalse(this.store.isRunning());
                Assert.fail("The store.insert() should throw an exeption");
                return;
            } else {
                Assert.assertEquals(2L, UTIL.getDFSCluster().getDataNodes().size());
                this.store.insert(new ProcedureTestingUtility.TestProcedure(j2, -1L), null);
                Thread.sleep(100L);
                j = j2 + 1;
            }
        }
    }

    @Test(timeout = 60000)
    public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
        Assert.assertEquals(3L, UTIL.getDFSCluster().getDataNodes().size());
        this.store.registerListener(new ProcedureStore.ProcedureStoreListener() { // from class: org.apache.hadoop.hbase.master.procedure.TestWALProcedureStoreOnHDFS.2
            @Override // org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener
            public void postSync() {
                Threads.sleepWithoutInterrupt(2000L);
            }

            @Override // org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener
            public void abortProcess() {
            }
        });
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        Thread[] threadArr = new Thread[(this.store.getNumThreads() * 2) + 1];
        for (int i = 0; i < threadArr.length; i++) {
            final long j = i + 1;
            threadArr[i] = new Thread() { // from class: org.apache.hadoop.hbase.master.procedure.TestWALProcedureStoreOnHDFS.3
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        TestWALProcedureStoreOnHDFS.LOG.debug("[S] INSERT " + j);
                        TestWALProcedureStoreOnHDFS.this.store.insert(new ProcedureTestingUtility.TestProcedure(j, -1L), null);
                        TestWALProcedureStoreOnHDFS.LOG.debug("[E] INSERT " + j);
                    } catch (RuntimeException e) {
                        atomicInteger.incrementAndGet();
                        TestWALProcedureStoreOnHDFS.LOG.debug("[F] INSERT " + j + ": " + e.getMessage());
                    }
                }
            };
            threadArr[i].start();
        }
        Thread.sleep(1000L);
        LOG.info("Stop DataNode");
        UTIL.getDFSCluster().stopDataNode(0);
        Assert.assertEquals(2L, UTIL.getDFSCluster().getDataNodes().size());
        for (Thread thread : threadArr) {
            thread.join();
        }
        Assert.assertFalse(this.store.isRunning());
        Assert.assertTrue(atomicInteger.toString(), atomicInteger.get() >= this.store.getNumThreads() && atomicInteger.get() < threadArr.length);
    }

    @Test(timeout = 60000)
    @Ignore
    public void testWalRollOnLowReplication() throws Exception {
        int i = 0;
        this.store.insert(new ProcedureTestingUtility.TestProcedure(1L, -1L), null);
        UTIL.getDFSCluster().restartDataNode(0);
        long j = 2;
        while (true) {
            long j2 = j;
            if (j2 >= 100) {
                Assert.assertTrue(this.store.isRunning());
                return;
            }
            this.store.insert(new ProcedureTestingUtility.TestProcedure(j2, -1L), null);
            waitForNumReplicas(3);
            Thread.sleep(100L);
            if (j2 % 30 == 0) {
                LOG.info("Restart Data Node");
                i++;
                UTIL.getDFSCluster().restartDataNode(i % 3);
            }
            j = j2 + 1;
        }
    }

    public void waitForNumReplicas(int i) throws Exception {
        while (UTIL.getDFSCluster().getDataNodes().size() < i) {
            Thread.sleep(100L);
        }
        for (int i2 = 0; i2 < i; i2++) {
            Iterator<DataNode> it = UTIL.getDFSCluster().getDataNodes().iterator();
            while (it.hasNext()) {
                DataNode next = it.next();
                while (!next.isDatanodeFullyStarted()) {
                    Thread.sleep(100L);
                }
            }
        }
    }
}
