package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.mapreduce.ImportTsv;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSink;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({LargeTests.class, ReplicationTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/replication/TestBulkloadReplicationMetrics.class */
public class TestBulkloadReplicationMetrics {
    // Column family of the plain table created in testSourceAndSinkHFilesMetricsConsistency() and written by writeData().
    final byte[] FAMILY = "f1".getBytes();
    // Configuration of the second cluster; derived from conf1 in setUpBeforeClass().
    private static Configuration conf2;
    // Test utility for the cluster on which replication peer "2" is registered.
    private static HBaseTestingUtility utility1;
    // Test utility for the cluster whose cluster key that peer points at.
    private static HBaseTestingUtility utility2;
    // Field separator used when writing the TSV input file and passed as importtsv.separator.
    private static final String DELIMITER = ",";
    private static final Logger LOG = LoggerFactory.getLogger(TestBulkloadReplicationMetrics.class);

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestBulkloadReplicationMetrics.class);
    // Configuration of the first cluster; populated in setUpBeforeClass().
    private static Configuration conf1 = HBaseConfiguration.create();
    // Counter that gives every input/output directory made by writeDataToFile() a distinct suffix.
    private static int fileCounter = 1;
    // Shared flags through which tests pause CustomReplicationEndpoint and observe that it is paused.
    private static TestParams testParams = new TestParams();

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/TestBulkloadReplicationMetrics$CustomReplicationEndpoint.class */
    public static class CustomReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
        private TestParams localTestParams;

        public void init(ReplicationEndpoint.Context context) throws IOException {
            super.init(context);
            this.localTestParams = TestBulkloadReplicationMetrics.testParams;
        }

        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            while (TestBulkloadReplicationMetrics.testParams.pauseReplication) {
                this.localTestParams.paused = true;
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                }
            }
            boolean replicate = super.replicate(replicateContext);
            this.localTestParams.paused = false;
            return replicate;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hbase/replication/TestBulkloadReplicationMetrics$FilePaths.class */
    /**
     * Pair of file-system locations produced by writeDataToFile(): the generated TSV input file
     * and the directory handed to ImportTsv as its bulk output.
     */
    public static class FilePaths {
        // Qualified path of the generated TSV input file. The misspelled name is referenced by other methods in this class.
        public String inputPah;
        // Qualified path of the directory passed as importtsv.bulk.output and later to LoadIncrementalHFiles.
        public String outputPath;

        FilePaths() {
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/TestBulkloadReplicationMetrics$SrcBlkLoadMetrics.class */
    /**
     * Point-in-time copy of the replication source counters this test compares before and after
     * a bulk load. Values are read once in the constructor and never refreshed.
     */
    static class SrcBlkLoadMetrics {
        /** Value of {@code getSizeOfLogToReplicate()} at capture time. */
        public long sizeOfLogToReplicate;
        /** Value of {@code getTimeForLogToReplicate()} at capture time. */
        public long timeForLogToReplicate;
        /** Value of {@code getShippedOps()} at capture time. */
        public long shippedOps;
        /** Value of {@code getShippedHFiles()} at capture time. */
        public long shippedHFiles;
        /** Value of {@code getSizeOfHFileRefsQueue()} at capture time. */
        public long sizeOfHFileRefsQueue;
        /** Value of {@code getLogEditsFiltered()} at capture time. */
        public long logEditsFiltered;
        /** Value of {@code getFailedReplicationAttempts()} at capture time. */
        public long failedReplicationAttempts;

        /**
         * Captures the current counter values.
         *
         * @param metricsSource live source metrics to copy from
         */
        public SrcBlkLoadMetrics(MetricsSource metricsSource) {
            sizeOfLogToReplicate = metricsSource.getSizeOfLogToReplicate();
            timeForLogToReplicate = metricsSource.getTimeForLogToReplicate();
            shippedOps = metricsSource.getShippedOps();
            shippedHFiles = metricsSource.getShippedHFiles();
            sizeOfHFileRefsQueue = metricsSource.getSizeOfHFileRefsQueue();
            logEditsFiltered = metricsSource.getLogEditsFiltered();
            failedReplicationAttempts = metricsSource.getFailedReplicationAttempts();
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/TestBulkloadReplicationMetrics$TestParams.class */
    /**
     * Flags shared between the test thread and {@code CustomReplicationEndpoint}.
     *
     * <p>Both fields are {@code volatile} because they are written on one thread and polled on
     * another: the test toggles {@code pauseReplication} while the endpoint spins on it, and the
     * endpoint sets {@code paused} while the test polls it. Without {@code volatile} neither
     * side is guaranteed to observe the other's write.
     */
    static class TestParams {
        /** Set by a test to make the endpoint hold back the next batch. */
        public volatile boolean pauseReplication;
        /** Set by the endpoint while it is holding a batch back; cleared after shipping. */
        public volatile boolean paused;

        TestParams() {
        }
    }

    /**
     * Starts two single-node mini clusters that share one ZooKeeper ensemble and one DFS
     * cluster, then registers the second cluster as replication peer "2" of the first, using
     * {@link CustomReplicationEndpoint} as the endpoint implementation.
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        // Settings for the first cluster; conf2 is copied from these below.
        conf1.set("zookeeper.znode.parent", "/1");
        conf1.setInt("hbase.hfile.compaction.discharger.thread.count", 1);
        conf1.setInt("hbase.master.procedure.threads", 2);
        conf1.setBoolean("hbase.replication.bulkload.enabled", true);
        conf1.setStrings("hbase.replication.cluster.id", "activeReplicationPeer");
        conf1.set("hbase.replication.source.fs.conf.provider",
                TestSourceFSConfigurationProvider.class.getCanonicalName());

        utility1 = new HBaseTestingUtility(conf1);
        utility1.startMiniZKCluster();
        MiniZooKeeperCluster sharedZk = utility1.getZkCluster();

        // The second cluster differs only in its znode parent and reuses the same ZooKeeper.
        conf2 = HBaseConfiguration.create(conf1);
        conf2.set("zookeeper.znode.parent", "/2");
        utility2 = new HBaseTestingUtility(conf2);
        utility2.setZkCluster(sharedZk);

        // The first cluster brings up DFS; the second one borrows it before starting.
        utility1.startMiniCluster(1);
        utility2.setDFSCluster(utility1.getDFSCluster(), false);
        utility2.startMiniCluster(1);

        ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
                .setClusterKey(utility2.getClusterKey())
                .setReplicationEndpointImpl(CustomReplicationEndpoint.class.getName())
                .build();
        utility1.getAdmin().addReplicationPeer("2", peerConfig);
    }

    /**
     * Shuts both mini clusters down. The second cluster goes first; the first cluster's DFS
     * reference is then cleared before its own shutdown.
     */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        utility2.shutdownMiniCluster();
        // NOTE(review): presumably avoids a second shutdown of the DFS cluster that utility2 borrowed in setUpBeforeClass() — confirm.
        utility1.setDFSCluster((MiniDFSCluster) null, false);
        utility1.shutdownMiniCluster();
    }

    /**
     * Bulk loads one table and checks the source-side metrics afterwards: two more HFiles
     * shipped than before, and nothing left queued, pending or failed.
     */
    @Test(timeout = 300000)
    public void testReplicationSourceMetrics() throws Throwable {
        testParams.pauseReplication = false;
        MetricsSource metrics = utility1.getHBaseCluster()
                .getRegionServer(0)
                .getReplicationSourceService()
                .getReplicationManager()
                .getSource("2")
                .getSourceMetrics();

        SrcBlkLoadMetrics before = new SrcBlkLoadMetrics(metrics);
        doBulkLoad("bulkLoadTableForSourceMetrics");
        // The shipped counter can lag behind the data showing up on the peer, so poll for it.
        waitForShippedHFilesIncrease(before.shippedHFiles, 120000L, metrics);
        SrcBlkLoadMetrics after = new SrcBlkLoadMetrics(metrics);

        long expectedShippedHFiles = before.shippedHFiles + 2;
        Assert.assertEquals("Expected number of hfiles were not shipped",
                expectedShippedHFiles, after.shippedHFiles);
        Assert.assertEquals(
                "After all the replication completion, size of the logs to replicate should be zero",
                0L, after.sizeOfLogToReplicate);
        Assert.assertEquals(
                "After all the replication completion, hfiles must be removed from the queues",
                0L, after.sizeOfHFileRefsQueue);
        Assert.assertEquals(
                "Nothing left for replicaiton, so there should not be any time left for replicaiton",
                0L, after.timeForLogToReplicate);
        Assert.assertEquals("No replicaiton should fail", 0L, after.failedReplicationAttempts);
    }

    /**
     * Checks the HFile-refs queue size metric while replication is held back.
     *
     * <p>Replication is paused through {@code testParams}, a bulk load is performed, and once
     * the endpoint reports it is paused the queue size must have grown by two. Replication is
     * then resumed and the queue must drain to zero.
     */
    @Test(timeout = 300000)
    public void testReplicationSourceMetricSizeOfHFileRefsQueue() throws Throwable {
        final String tableName = "bulkLoadTableForSourceMetrics";
        testParams.pauseReplication = true;
        try {
            MetricsSource sourceMetrics = utility1.getHBaseCluster().getRegionServer(0)
                    .getReplicationSourceService().getReplicationManager().getSource("2")
                    .getSourceMetrics();
            SrcBlkLoadMetrics baseline = new SrcBlkLoadMetrics(sourceMetrics);
            long queueSizeBefore = sourceMetrics.getSizeOfHFileRefsQueue();

            Map<String, String> importOptions = new HashMap<>();
            importOptions.put("importtsv.separator", DELIMITER);
            importOptions.put("importtsv.columns", "HBASE_ROW_KEY,f1:c1,f1:c2,f2:c3");
            importData(tableName, importOptions, 50);

            // Wait until the endpoint is actually sitting in its pause loop.
            utility1.waitFor(60000L, new Waiter.Predicate<IOException>() {
                @Override
                public boolean evaluate() throws IOException {
                    return TestBulkloadReplicationMetrics.testParams.paused;
                }
            });
            Assert.assertEquals("sizeOfHFileRefsQueue metrics value is not correct",
                    2 + queueSizeBefore, sourceMetrics.getSizeOfHFileRefsQueue());

            testParams.pauseReplication = false;
            try (Table peerTable =
                    utility2.getConnection().getTable(TableName.valueOf(tableName))) {
                ReplicationTestUtil.waitForReplication(peerTable, 50, 10000L);
                Assert.assertEquals(50, count(peerTable));
            }
            waitForShippedHFilesIncrease(baseline.shippedHFiles, 120000L, sourceMetrics);
            Assert.assertEquals(
                    "After all the replication completion, hfiles must be removed from the queues",
                    0L, sourceMetrics.getSizeOfHFileRefsQueue());
        } finally {
            // Never leave the shared endpoint paused, even when an assertion above fails;
            // otherwise later tests would stall behind it.
            testParams.pauseReplication = false;
        }
    }

    /**
     * Bulk loads one table and checks that the sink's applied-HFiles counter grew by two.
     */
    @Test(timeout = 300000)
    public void testReplicationSinkMetricsAppliedHFiles() throws Throwable {
        // Reset the pause flag like the sibling tests do, so this test does not depend on
        // which test ran before it.
        testParams.pauseReplication = false;
        MetricsSink sinkMetrics = new MetricsSink();
        long appliedBefore = sinkMetrics.getAppliedHFiles();
        doBulkLoad("bulkLoadTable");
        waitForAppliedHFilesCountIncrease(appliedBefore, 120000L, sinkMetrics);
        Assert.assertEquals(appliedBefore + 2, sinkMetrics.getAppliedHFiles());
    }

    /**
     * Interleaves two bulk loads with ordinary puts and checks that source and sink agree:
     * shipped HFiles equal applied HFiles, shipped ops equal applied ops, and both HFile
     * counters grew by four over the test.
     */
    @Test(timeout = 300000)
    public void testSourceAndSinkHFilesMetricsConsistency() throws Throwable {
        testParams.pauseReplication = false;
        MetricsSource sourceMetrics = utility1.getHBaseCluster().getRegionServer(0)
                .getReplicationSourceService().getReplicationManager().getSource("2")
                .getSourceMetrics();
        SrcBlkLoadMetrics sourceAtStart = new SrcBlkLoadMetrics(sourceMetrics);
        MetricsSink sinkMetrics = new MetricsSink();

        TableName plainTable = TableName.valueOf("t1");
        utility1.getAdmin().createTable(TableDescriptorBuilder.newBuilder(plainTable)
                .setColumnFamily(ColumnFamilyDescriptorBuilder.of(this.FAMILY)).build());
        utility1.getAdmin().enableTableReplication(plainTable);

        long appliedAtStart = sinkMetrics.getAppliedHFiles();
        doBulkLoad("bulkLoadTableForSourceAndSinkMetrics");
        waitForShippedHFilesIncrease(sourceAtStart.shippedHFiles, 120000L, sourceMetrics);
        waitForAppliedHFilesCountIncrease(appliedAtStart, 120000L, sinkMetrics);

        // Close the table handle once the puts are written; the original never released it.
        try (Table table = utility1.getConnection().getTable(plainTable)) {
            writeData(table, 100, utility1.getAdmin());
        }
        // NOTE(review): these two waits still use the start-of-test baselines, so they return
        // immediately once the first bulk load has been counted — confirm that is intended.
        waitForShippedHFilesIncrease(sourceAtStart.shippedHFiles, 120000L, sourceMetrics);
        waitForAppliedHFilesCountIncrease(appliedAtStart, 120000L, sinkMetrics);

        SrcBlkLoadMetrics sourceBeforeSecondLoad = new SrcBlkLoadMetrics(sourceMetrics);
        long appliedBeforeSecondLoad = sinkMetrics.getAppliedHFiles();
        doBulkLoad("bulkLoadTableForSourceAndSinkMetrics2");
        waitForShippedHFilesIncrease(sourceBeforeSecondLoad.shippedHFiles, 120000L, sourceMetrics);
        waitForAppliedHFilesCountIncrease(appliedBeforeSecondLoad, 120000L, sinkMetrics);

        SrcBlkLoadMetrics sourceAtEnd = new SrcBlkLoadMetrics(sourceMetrics);
        Assert.assertEquals(sourceAtEnd.shippedHFiles, sinkMetrics.getAppliedHFiles());
        Assert.assertEquals(sourceAtEnd.shippedOps, sinkMetrics.getAppliedOps());
        Assert.assertEquals(appliedAtStart + 4, sinkMetrics.getAppliedHFiles());
        Assert.assertEquals(sourceAtStart.shippedHFiles + 4, sourceAtEnd.shippedHFiles);
    }

    /**
     * Imports 50 generated rows into {@code str} on the first cluster via bulk load, then waits
     * for them to appear in the same-named table on the second cluster and verifies the count.
     *
     * @param str name of the table to load and to check on the peer
     * @throws Exception if the import, the wait or the row-count check fails
     */
    private void doBulkLoad(String str) throws Exception {
        Map<String, String> importOptions = new HashMap<>();
        importOptions.put("importtsv.separator", DELIMITER);
        importOptions.put("importtsv.columns", "HBASE_ROW_KEY,f1:c1,f1:c2,f2:c3");
        importData(str, importOptions, 50);
        // Keep one handle for both the wait and the count, and release it afterwards.
        try (Table peerTable = utility2.getConnection().getTable(TableName.valueOf(str))) {
            ReplicationTestUtil.waitForReplication(peerTable, 50, 10000L);
            Assert.assertEquals(50, count(peerTable));
        }
    }

    /**
     * Counts the rows returned by an unrestricted scan of {@code table}.
     *
     * @param table table to scan
     * @return number of results the scanner produced
     * @throws IOException if opening or advancing the scanner fails
     */
    private int count(Table table) throws IOException {
        int rows = 0;
        // try-with-resources releases the scanner even if next() throws.
        try (ResultScanner scanner = table.getScanner(new Scan())) {
            while (scanner.next() != null) {
                rows++;
            }
        }
        return rows;
    }

    /**
     * Polls every 200 ms until the sink's applied-HFiles counter exceeds {@code j} or
     * {@code j2} milliseconds have passed. Returns silently on timeout; callers detect a stale
     * counter through their own assertions.
     *
     * @param j counter value that must be exceeded
     * @param j2 maximum time to wait, in milliseconds
     * @param metricsSink sink metrics to poll
     * @throws InterruptedException if the thread is interrupted while waiting; the interrupt is
     *         propagated instead of being swallowed, so a cancelled test stops promptly
     */
    private void waitForAppliedHFilesCountIncrease(long j, long j2, MetricsSink metricsSink) throws IOException, InterruptedException {
        long deadline = System.currentTimeMillis() + j2;
        while (metricsSink.getAppliedHFiles() <= j && System.currentTimeMillis() <= deadline) {
            Thread.sleep(200L);
        }
    }

    /**
     * Polls every 200 ms until the source's shipped-HFiles counter exceeds {@code j} or
     * {@code j2} milliseconds have passed. Returns silently on timeout; callers detect a stale
     * counter through their own assertions.
     *
     * @param j counter value that must be exceeded
     * @param j2 maximum time to wait, in milliseconds
     * @param metricsSource source metrics to poll
     * @throws InterruptedException if the thread is interrupted while waiting; the interrupt is
     *         propagated instead of being swallowed, so a cancelled test stops promptly
     */
    private void waitForShippedHFilesIncrease(long j, long j2, MetricsSource metricsSource) throws IOException, InterruptedException {
        long deadline = System.currentTimeMillis() + j2;
        while (metricsSource.getShippedHFiles() <= j && System.currentTimeMillis() <= deadline) {
            Thread.sleep(200L);
        }
    }

    /**
     * Generates {@code i} rows, runs ImportTsv to turn them into HFiles, enables replication
     * for the table and bulk loads the HFiles with LoadIncrementalHFiles.
     *
     * @param str name of the target table
     * @param map ImportTsv options; {@code importtsv.bulk.output} is added to this map
     * @param i number of rows to generate
     * @throws Exception if file generation or either tool run fails
     */
    private void importData(String str, Map<String, String> map, int i) throws Exception {
        FilePaths paths = writeDataToFile(getData(i));
        map.put("importtsv.bulk.output", paths.outputPath);

        // Build the argument array once: it is both logged and passed to the tool.
        String[] importArgs = getArgs(str, paths.inputPah, map);
        // Join the elements so the log shows the arguments, not the array's identity hash.
        LOG.info("Running ImportTsv with arguments: {}", String.join(" ", importArgs));
        int importExitCode = ToolRunner.run(utility1.getConfiguration(), new ImportTsv(), importArgs);
        Assert.assertEquals("ImportTsv run failed.", 0L, importExitCode);

        utility1.getAdmin().enableTableReplication(TableName.valueOf(str));

        String[] loadArgs = {paths.outputPath, str};
        int loadExitCode = ToolRunner.run(utility1.getConfiguration(),
                new LoadIncrementalHFiles(utility1.getConfiguration()), loadArgs);
        Assert.assertEquals("LoadIncrementalHFiles run failed.", 0L, loadExitCode);
    }

    /**
     * Builds a command line of the form {@code -Dkey=value ... <str> <str2>}.
     *
     * @param str first positional argument, placed after all {@code -D} options
     * @param str2 second positional argument, placed last
     * @param map options, each rendered as {@code -Dkey=value} in the map's iteration order
     * @return array holding one entry per option followed by the two positional arguments
     */
    private String[] getArgs(String str, String str2, Map<String, String> map) {
        List<String> arguments = new ArrayList<>(map.size() + 2);
        for (Map.Entry<String, String> option : map.entrySet()) {
            arguments.add("-D" + option.getKey() + "=" + option.getValue());
        }
        arguments.add(str);
        arguments.add(str2);
        return arguments.toArray(new String[0]);
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.String[], java.lang.String[][]] */
    private String[][] getData(int i) {
        ?? r0 = new String[i];
        for (int i2 = 0; i2 < i; i2++) {
            String[] strArr = new String[4];
            strArr[0] = "row" + i2;
            strArr[1] = "name" + i2;
            strArr[2] = String.valueOf(Math.floor(Math.random() * 50.0d));
            strArr[3] = String.valueOf(Math.floor(Math.random() * 50000.0d));
            r0[i2] = strArr;
        }
        return r0;
    }

    /**
     * Writes the given rows to a new TSV input file with no extra trailing lines.
     *
     * @param strArr rows to write, four columns each
     * @return the input file path and the bulk-output directory path
     * @throws IOException if the file cannot be written
     */
    private FilePaths writeDataToFile(String[][] strArr) throws IOException {
        List<String> noExtraLines = new ArrayList<String>();
        return writeDataToFile(strArr, noExtraLines);
    }

    /**
     * Writes the rows, followed by any extra raw lines, to a fresh {@code import.txt} on the
     * test file system and reserves a fresh output directory name for the bulk load.
     *
     * <p>Each call consumes two values of {@code fileCounter}: one for the input directory and
     * one for the output directory.
     *
     * @param strArr rows to write; the first four columns of each row are joined with the delimiter
     * @param list extra lines appended verbatim after the rows
     * @return the qualified input file path and output directory path
     * @throws IOException if the file system cannot be reached or the file cannot be written
     */
    private FilePaths writeDataToFile(String[][] strArr, List<String> list) throws IOException {
        FileSystem fs = FileSystem.get(utility1.getConfiguration());
        Path inputDir = utility1.getDataTestDirOnTestFS("input" + fileCounter++);
        Path inputFile = fs.makeQualified(new Path(inputDir, "import.txt"));
        Path outputDir = fs.makeQualified(
                new Path(utility1.getDataTestDirOnTestFS(), "output" + fileCounter++));

        String lineSeparator = System.lineSeparator();
        StringBuilder content = new StringBuilder();
        for (String[] row : strArr) {
            content.append(row[0]).append(DELIMITER)
                    .append(row[1]).append(DELIMITER)
                    .append(row[2]).append(DELIMITER)
                    .append(row[3]).append(lineSeparator);
        }
        for (String extraLine : list) {
            content.append(extraLine).append(lineSeparator);
        }
        String data = content.toString();
        LOG.info("data=" + data);

        // try-with-resources closes the stream even when the write throws.
        try (FSDataOutputStream out = fs.create(inputFile, true)) {
            // NOTE(review): writeChars emits two bytes per character; kept as in the original —
            // confirm this is the encoding ImportTsv is meant to be fed here.
            out.writeChars(data);
        }
        LOG.info(String.format("Wrote test data to file: %s", inputFile));

        FilePaths paths = new FilePaths();
        paths.inputPah = inputFile.toString();
        paths.outputPath = outputDir.toString();
        return paths;
    }

    /**
     * Puts {@code i} rows ({@code row0} .. {@code row<i-1>}) into {@code table}, each with a
     * single cell in column {@code c1} of {@code FAMILY}, then flushes the table.
     *
     * @param table table to write to
     * @param i number of rows to write
     * @param admin admin used to flush the table afterwards
     * @throws IOException if a put or the flush fails
     */
    private void writeData(Table table, int i, Admin admin) throws IOException {
        final byte[] qualifier = "c1".getBytes();
        for (int rowIndex = 0; rowIndex < i; rowIndex++) {
            byte[] rowKey = ("row" + rowIndex).getBytes();
            byte[] cellValue = ("value" + rowIndex).getBytes();
            Put rowPut = new Put(rowKey);
            rowPut.addColumn(this.FAMILY, qualifier, cellValue);
            table.put(rowPut);
        }
        admin.flush(table.getName());
    }
}
