package org.apache.hadoop.hbase.hindex.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.hindex.IndexTestingUtil;
import org.apache.hadoop.hbase.hindex.client.HIndexAdmin;
import org.apache.hadoop.hbase.hindex.client.impl.HIndexClient;
import org.apache.hadoop.hbase.hindex.common.HIndexSpecification;
import org.apache.hadoop.hbase.hindex.common.TableIndices;
import org.apache.hadoop.hbase.hindex.mapreduce.HIndexImportTsv;
import org.apache.hadoop.hbase.hindex.protobuf.generated.HIndexProtos;
import org.apache.hadoop.hbase.hindex.server.builder.HIndexUtils;
import org.apache.hadoop.hbase.hindex.server.manager.HIndexManager;
import org.apache.hadoop.hbase.hindex.server.master.HIndexMasterCoprocessor;
import org.apache.hadoop.hbase.hindex.server.master.procV2.AddTableIndexProcedure;
import org.apache.hadoop.hbase.hindex.server.regionserver.HIndexRegionCoprocessor;
import org.apache.hadoop.hbase.hindex.server.regionserver.HIndexRegionServerCoprocessor;
import org.apache.hadoop.hbase.hindex.server.regionserver.HIndexWALCoprocessor;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({MediumTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/hindex/replication/TestHIndexReplication.class */
public class TestHIndexReplication {
    // Configuration of the peer cluster; created from activeConf in setUpBeforeClass().
    private static Configuration peerConf;
    // Testing utility that owns the active (source) mini cluster.
    private static HBaseTestingUtility activeUtil;
    // Testing utility that owns the peer mini cluster.
    private static HBaseTestingUtility peerUtil;
    // Replication admin bound to the active cluster; closed in tearDownAfterClass().
    private static ReplicationAdmin activeRepAdmin;
    // Id under which the peer cluster is registered on the active cluster.
    private static final String peerId = "peer1";
    // Index admin for the active cluster. NOTE: the name is misspelled ("acive") but it is
    // protected and referenced by the tests, so it is left as-is.
    protected static HIndexAdmin aciveIndexAdmin;
    // Index admin for the peer cluster.
    protected static HIndexAdmin peerIndexAdmin;
    // Field separator used in the generated import file and passed as "importtsv.separator".
    private static final String DELIMITER = ",";
    // Column mapping passed as "importtsv.columns".
    private static final String COLUMN_INPUT = "HBASE_ROW_KEY,f1:c1,f1:c2,f2:c3";
    private static final Log LOG = LogFactory.getLog(TestHIndexReplication.class);
    // Configuration of the active cluster; populated in setUpBeforeClass().
    private static Configuration activeConf = HBaseConfiguration.create();
    // Column family / qualifier used for the test rows and for the index definition.
    private static final String FAMILY = "f1";
    private static final byte[] famName = Bytes.toBytes(FAMILY);
    private static final String COLUMN = "c1";
    private static final byte[] columnName = Bytes.toBytes(COLUMN);
    // Suffix appended to table names and replication cluster ids.
    private static String uniqueID = Long.toString(System.currentTimeMillis());
    // Incremented by writeDataToFile() to give each input/output directory its own name.
    private static int fileCounter = 1;
    // Number of rows generated for the bulk-load test.
    private static int userRecordCount = 3;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hbase/hindex/replication/TestHIndexReplication$FilePaths.class */
    /**
     * Holder for the two locations produced by {@code writeDataToFile}: the generated input file
     * and the directory used as bulk output.
     */
    public static class FilePaths {
        // Path of the generated input file. NOTE: the name is misspelled ("Pah") but the field is
        // public and read by importData(), so it is left as-is.
        public String inputPah;
        // Path passed as "importtsv.bulk.output" and later handed to LoadIncrementalHFiles.
        public String outputPath;

        FilePaths() {
        }
    }

    /**
     * Starts the active and the peer mini clusters, registers the peer on the active cluster
     * and creates one index admin per cluster.
     *
     * @throws Exception if a cluster cannot be started or the peer cannot be added
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        final String activeClusterId = "indexReplicationActive" + uniqueID;
        final String peerClusterId = "indexReplicationPeer" + uniqueID;

        // Active cluster: replication (including bulk load) plus the HIndex coprocessors.
        activeConf.setBoolean("hbase.replication", true);
        activeConf.setBoolean("hbase.replication.bulkload.enabled", true);
        activeConf.setStrings("hbase.coprocessor.regionserver.classes", HIndexRegionServerCoprocessor.class.getName());
        activeConf.setStrings("hbase.coprocessor.master.classes", HIndexMasterCoprocessor.class.getName());
        activeConf.setStrings("hbase.coprocessor.region.classes", HIndexRegionCoprocessor.class.getName());
        activeConf.setStrings("hbase.replication.cluster.id", activeClusterId);
        activeConf.setStrings("hbase.coprocessor.wal.classes", HIndexWALCoprocessor.class.getName());
        activeConf.set("hbase.replication.source.fs.conf.provider", TestSourceFSConfigurationProvider.class.getCanonicalName());
        activeUtil = new HBaseTestingUtility(activeConf);
        activeUtil.startMiniCluster();
        activeRepAdmin = new ReplicationAdmin(activeConf);

        // Peer cluster: created from the active configuration with its own znode parent,
        // ZooKeeper client port and replication cluster id.
        peerConf = HBaseConfiguration.create(activeConf);
        peerConf.setBoolean("hbase.replication", true);
        peerConf.set("zookeeper.znode.parent", "/2");
        peerConf.setInt("hbase.zookeeper.property.clientPort", 2182);
        peerConf.setStrings("hbase.replication.cluster.id", peerClusterId);
        peerUtil = new HBaseTestingUtility(peerConf);
        peerUtil.startMiniCluster();

        // Register the peer on the active cluster; no table-CF map is supplied.
        ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
        peerConfig.setClusterKey(peerUtil.getClusterKey());
        activeRepAdmin.addPeer(peerId, peerConfig, (Map) null);

        aciveIndexAdmin = HIndexClient.newHIndexAdmin(activeUtil.getHBaseAdmin());
        peerIndexAdmin = HIndexClient.newHIndexAdmin(peerUtil.getHBaseAdmin());
        IndexTestingUtil.checkIndexCacheInitialized();
    }

    /**
     * Resets the skip-validation flag, closes the replication admin and shuts down both mini
     * clusters.
     *
     * <p>Every step is attempted even when an earlier one throws, and each resource is
     * null-checked because JUnit runs this method even if {@code setUpBeforeClass} failed
     * part-way through.
     *
     * @throws Exception the failure raised by closing the admin or shutting down a cluster
     */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        AddTableIndexProcedure.setSkipValidation(false);
        try {
            if (activeRepAdmin != null) {
                activeRepAdmin.close();
            }
        } finally {
            try {
                if (activeUtil != null) {
                    activeUtil.shutdownMiniCluster();
                }
            } finally {
                if (peerUtil != null) {
                    peerUtil.shutdownMiniCluster();
                }
            }
        }
    }

    /**
     * Creates a table without indices on the active cluster, enables table replication and
     * checks that a single put becomes visible on the peer cluster.
     */
    @Test
    public void testNormalTableReplication() throws Exception {
        TableName tableName = TableName.valueOf("normalTableReplication" + uniqueID);
        HTableDescriptor descriptor = new HTableDescriptor(tableName);
        descriptor.addFamily(new HColumnDescriptor(famName));
        activeUtil.getHBaseAdmin().createTable(descriptor);
        activeUtil.waitUntilAllRegionsAssigned(tableName);
        activeRepAdmin.enableTableRep(tableName);
        Assert.assertTrue(peerUtil.getHBaseAdmin().tableExists(tableName));

        // Both table handles are closed when the block exits, also on assertion failure.
        try (Table activeTable = activeUtil.getConnection().getTable(tableName);
                Table peerTable = peerUtil.getConnection().getTable(tableName)) {
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(famName, columnName, Bytes.toBytes("v1"));
            activeTable.put(put);
            Assert.assertEquals("Table row is not added yet", 1L, getTableRows(activeTable).size());

            waitForReplication(peerTable, 1);
            Assert.assertEquals("Tables rows are not replicated", 1L, getTableRows(peerTable).size());
        }
    }

    /**
     * Polls {@code table} until a scan returns exactly {@code i} results or the attempts run out.
     *
     * <p>Up to 30 scans are made, 500 ms apart. The scan sets the {@code FETCH_INDEX_DATA}
     * attribute, as {@code getTableRows} does. The method never fails on timeout; it only logs,
     * and callers assert on the row count afterwards.
     *
     * @param table table to poll
     * @param i     number of results to wait for
     * @throws IOException          if a scan fails
     * @throws InterruptedException if the thread is interrupted while sleeping between scans
     */
    private void waitForReplication(Table table, int i) throws IOException, InterruptedException {
        final int maxAttempts = 30;
        final long sleepMillis = 500L;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Scan scan = new Scan();
            scan.setAttribute("FETCH_INDEX_DATA", Bytes.toBytes("true"));
            int found;
            // The scanner is closed even when next() throws.
            try (ResultScanner scanner = table.getScanner(scan)) {
                found = scanner.next(i).length;
            }
            if (found == i) {
                LOG.info("Got " + found + " rows");
                return;
            }
            LOG.info("Only got " + found + " rows");
            // Sleep only when another scan follows, so every sleep is followed by a check.
            if (attempt < maxAttempts) {
                Thread.sleep(sleepMillis);
            }
        }
        LOG.info("Waited too much time for replication");
    }

    /**
     * Creates an indexed table on the active cluster, enables table replication and checks
     * that the index metadata and one put (user row plus index row) reach the peer cluster.
     */
    @Test
    public void testIndexedTableReplication() throws Exception {
        TableName tableName = TableName.valueOf("indexTableReplication" + uniqueID);
        HTableDescriptor descriptor = new HTableDescriptor(tableName);
        descriptor.addFamily(new HColumnDescriptor(famName));
        activeUtil.getHBaseAdmin().createTable(descriptor);
        activeUtil.waitUntilAllRegionsAssigned(tableName);
        TableIndices tableIndices = new TableIndices();
        tableIndices.addIndex(getIndex("idxRep"));
        aciveIndexAdmin.addIndicesWithData(tableName, tableIndices);

        // The flag is static; reset it even if enabling replication throws so that it cannot
        // leak into the other tests.
        AddTableIndexProcedure.setSkipValidation(true);
        try {
            activeRepAdmin.enableTableRep(tableName);
        } finally {
            AddTableIndexProcedure.setSkipValidation(false);
        }
        Assert.assertTrue(peerUtil.getHBaseAdmin().tableExists(tableName));

        List<?> peerIndices = peerIndexAdmin.listIndices(tableName);
        Assert.assertEquals("Table is replicated but index meta not created.", 1L, peerIndices.size());
        Assert.assertEquals("index should be in active state", HIndexManager.IndexState.ACTIVE,
                ((Pair<?, ?>) peerIndices.get(0)).getSecond());

        // Both table handles are closed when the block exits, also on assertion failure.
        try (Table activeTable = activeUtil.getConnection().getTable(tableName);
                Table peerTable = peerUtil.getConnection().getTable(tableName)) {
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(famName, columnName, Bytes.toBytes("v1"));
            activeTable.put(put);
            List<Result> activeRows = getTableRows(activeTable);
            Assert.assertEquals("Table row is not added yet", 2L, activeRows.size());

            waitForReplication(peerTable, 2);
            List<Result> peerRows = getTableRows(peerTable);
            Assert.assertEquals("Tables rows are not replicated", 2L, peerRows.size());

            HTableDescriptor activeDescriptor = activeTable.getTableDescriptor();
            Assert.assertNotNull(HIndexUtils.getIndexColumnFamily(activeDescriptor));
            byte[] indexFamily = HIndexUtils.getIndexColumnFamily(activeDescriptor).getBytes();
            byte[] emptyQualifier = new byte[0];

            // Compare the row key and the index-family value of the first row on both clusters.
            // Cell#getValueArray() returns the cell's backing array, not just the value, so the
            // values are fetched through Result#getValue instead.
            Result activeFirst = activeRows.get(0);
            Result peerFirst = peerRows.get(0);
            Assert.assertArrayEquals("index data is not same in active and peer clusters",
                    activeFirst.getRow(), peerFirst.getRow());
            byte[] activeIndexValue = activeFirst.getValue(indexFamily, emptyQualifier);
            Assert.assertNotNull("index cell is missing in active cluster", activeIndexValue);
            Assert.assertArrayEquals("index data is not same in active and peer clusters",
                    activeIndexValue, peerFirst.getValue(indexFamily, emptyQualifier));
        }
    }

    /**
     * Same scenario as {@code testIndexedTableReplication} but with 2000 puts, each of which is
     * expected to yield a user row and an index row on both clusters.
     */
    @Test
    public void testIndexedTableReplicationWithBigData() throws Exception {
        final int rowCount = 2000;
        final int expectedRows = rowCount * 2;

        TableName tableName = TableName.valueOf("indexTableReplicationBigData" + uniqueID);
        HTableDescriptor descriptor = new HTableDescriptor(tableName);
        descriptor.addFamily(new HColumnDescriptor(famName));
        activeUtil.getHBaseAdmin().createTable(descriptor);
        activeUtil.waitUntilAllRegionsAssigned(tableName);
        TableIndices tableIndices = new TableIndices();
        tableIndices.addIndex(getIndex("idxRep"));
        aciveIndexAdmin.addIndicesWithData(tableName, tableIndices);

        // The active table handle is closed when the block exits, also on assertion failure.
        try (Table activeTable = activeUtil.getConnection().getTable(tableName)) {
            HTableDescriptor activeDescriptor = activeTable.getTableDescriptor();
            Assert.assertNotNull("Index family can not be null", HIndexUtils.getIndexColumnFamily(activeDescriptor));
            byte[] indexFamily = HIndexUtils.getIndexColumnFamily(activeDescriptor).getBytes();
            byte[] emptyQualifier = new byte[0];

            // The flag is static; reset it even if enabling replication throws so that it
            // cannot leak into the other tests.
            AddTableIndexProcedure.setSkipValidation(true);
            try {
                activeRepAdmin.enableTableRep(tableName);
            } finally {
                AddTableIndexProcedure.setSkipValidation(false);
            }
            Assert.assertTrue(peerUtil.getHBaseAdmin().tableExists(tableName));
            Assert.assertEquals("Table is replicated but index meta not created.", 1L,
                    peerIndexAdmin.listIndices(tableName).size());

            for (int row = 0; row < rowCount; row++) {
                Put put = new Put(Bytes.toBytes("row" + row));
                put.addColumn(famName, columnName, Bytes.toBytes("v" + row));
                activeTable.put(put);
            }
            List<Result> activeRows = getTableRows(activeTable);
            Assert.assertEquals("Table row is not added yet", expectedRows, activeRows.size());

            try (Table peerTable = peerUtil.getConnection().getTable(tableName)) {
                waitForReplication(peerTable, expectedRows);
                List<Result> peerRows = getTableRows(peerTable);
                Assert.assertEquals("Tables rows are not replicated", expectedRows, peerRows.size());

                // Compare the row key and the index-family value of the first row on both
                // clusters. Cell#getValueArray() returns the cell's backing array, not just the
                // value, so the values are fetched through Result#getValue instead.
                Result activeFirst = activeRows.get(0);
                Result peerFirst = peerRows.get(0);
                Assert.assertArrayEquals("index data is not same in active and peer clusters",
                        activeFirst.getRow(), peerFirst.getRow());
                byte[] activeIndexValue = activeFirst.getValue(indexFamily, emptyQualifier);
                Assert.assertNotNull("index cell is missing in active cluster", activeIndexValue);
                Assert.assertArrayEquals("index data is not same in active and peer clusters",
                        activeIndexValue, peerFirst.getValue(indexFamily, emptyQualifier));
            }
        }
    }

    /**
     * Bulk loads generated rows into an indexed, replicated table through
     * {@code importData} and checks that user rows and index rows reach the peer cluster.
     */
    @Test
    public void testIndexedBulkLoadReplication() throws Exception {
        TableName tableName = TableName.valueOf("indexTableBulkLoadReplication" + uniqueID);
        HTableDescriptor descriptor = new HTableDescriptor(tableName);
        descriptor.addFamily(new HColumnDescriptor(famName));
        descriptor.addFamily(new HColumnDescriptor("f2"));
        activeUtil.getHBaseAdmin().createTable(descriptor);
        activeUtil.waitUntilAllRegionsAssigned(tableName);
        TableIndices tableIndices = new TableIndices();
        tableIndices.addIndex(getIndex("idxRep"));
        aciveIndexAdmin.addIndicesWithData(tableName, tableIndices);

        // The flag is static; reset it even if enabling replication throws so that it cannot
        // leak into the other tests.
        AddTableIndexProcedure.setSkipValidation(true);
        try {
            activeRepAdmin.enableTableRep(tableName);
        } finally {
            AddTableIndexProcedure.setSkipValidation(false);
        }
        Assert.assertTrue(peerUtil.getHBaseAdmin().tableExists(tableName));

        importData(tableName.getNameAsString(), getSystemInput("unusedIndexSpec"));

        final int expectedRows = userRecordCount * 2;
        // Both table handles are closed when the block exits, also on assertion failure.
        try (Table activeTable = activeUtil.getConnection().getTable(tableName);
                Table peerTable = peerUtil.getConnection().getTable(tableName)) {
            List<Result> activeRows = getTableRows(activeTable);
            Assert.assertEquals("Table row is not added yet", expectedRows, activeRows.size());

            waitForReplication(peerTable, expectedRows);
            List<Result> peerRows = getTableRows(peerTable);
            Assert.assertEquals("Tables rows are not replicated", expectedRows, peerRows.size());

            HTableDescriptor activeDescriptor = activeTable.getTableDescriptor();
            Assert.assertNotNull(HIndexUtils.getIndexColumnFamily(activeDescriptor));
            byte[] indexFamily = HIndexUtils.getIndexColumnFamily(activeDescriptor).getBytes();
            byte[] emptyQualifier = new byte[0];

            // Compare the row key and the index-family value of the first row on both clusters.
            // Cell#getValueArray() returns the cell's backing array, not just the value, so the
            // values are fetched through Result#getValue instead.
            Result activeFirst = activeRows.get(0);
            Result peerFirst = peerRows.get(0);
            Assert.assertArrayEquals("index data is not same in active and peer clusters",
                    activeFirst.getRow(), peerFirst.getRow());
            byte[] activeIndexValue = activeFirst.getValue(indexFamily, emptyQualifier);
            Assert.assertNotNull("index cell is missing in active cluster", activeIndexValue);
            Assert.assertArrayEquals("index data is not same in active and peer clusters",
                    activeIndexValue, peerFirst.getValue(indexFamily, emptyQualifier));
        }
    }

    /**
     * Builds the option map handed to {@code importData}.
     *
     * @param str value stored under {@code table.columns.index}
     * @return a new mutable map holding the separator, the index option and the column mapping
     */
    private Map<String, String> getSystemInput(String str) {
        Map<String, String> options = new HashMap<>();
        options.put("importtsv.separator", DELIMITER);
        options.put("table.columns.index", str);
        options.put("importtsv.columns", COLUMN_INPUT);
        return options;
    }

    /**
     * Generates {@code userRecordCount} rows, writes them to a file and runs
     * {@code HIndexImportTsv} on it; unless a dry run was requested, the produced HFiles are then
     * loaded with {@code LoadIncrementalHFiles}.
     *
     * <p>The passed map is modified: {@code importtsv.bulk.output} is always set, and
     * {@code importtsv.dry.run} is set to {@code "false"} when it was not {@code "true"}.
     *
     * @param str name of the target table
     * @param map tool options, each turned into a {@code -Dkey=value} argument
     * @throws Exception if writing the input or running either tool fails
     */
    private void importData(String str, Map<String, String> map) throws Exception {
        FilePaths paths = writeDataToFile(getData(userRecordCount));
        boolean dryRun = Boolean.parseBoolean(map.get("importtsv.dry.run"));
        map.put("importtsv.bulk.output", paths.outputPath);
        if (!dryRun) {
            map.put("importtsv.dry.run", "false");
        }

        String[] args = getArgs(str, paths.inputPah, map);
        // Arrays.toString prints the arguments; concatenating the array would print its identity.
        LOG.info("Running HIndexImportTsv with arguments: " + java.util.Arrays.toString(args));
        Assert.assertEquals("HIndexImportTsv run failed.", 0L,
                ToolRunner.run(activeConf, new HIndexImportTsv(), args));
        if (dryRun) {
            return;
        }
        Assert.assertEquals("LoadIncrementalHFiles run failed.", 0L,
                ToolRunner.run(activeConf, new LoadIncrementalHFiles(activeConf), new String[]{paths.outputPath, str}));
    }

    /**
     * Writes the given rows to a new input file, without extra lines.
     *
     * @param strArr rows to write; each row must have at least four columns
     * @return the input file path and the output directory path
     * @throws IOException if the file cannot be created, written or closed
     */
    private FilePaths writeDataToFile(String[][] strArr) throws IOException {
        return writeDataToFile(strArr, new ArrayList<String>());
    }

    /**
     * Writes the given rows, followed by the given raw lines, to a new {@code import.txt} on the
     * test file system and picks an output directory for the bulk output.
     *
     * <p>The first four columns of each row are joined with {@code DELIMITER}. Each call
     * consumes two values of {@code fileCounter}: one for the input directory name and one for
     * the output directory name.
     *
     * @param strArr rows to write; each row must have at least four columns
     * @param list   extra lines appended verbatim after the rows
     * @return the input file path and the output directory path
     * @throws IOException if the file cannot be created, written or closed
     */
    private FilePaths writeDataToFile(String[][] strArr, List<String> list) throws IOException {
        FileSystem fileSystem = FileSystem.get(activeConf);
        Path inputDir = activeUtil.getDataTestDirOnTestFS("input" + fileCounter++);
        Path inputFile = fileSystem.makeQualified(new Path(inputDir, "import.txt"));
        Path outputDir = fileSystem.makeQualified(
                new Path(activeUtil.getDataTestDirOnTestFS(), "output" + fileCounter++));

        String lineSeparator = System.getProperty("line.separator");
        StringBuilder content = new StringBuilder();
        for (String[] row : strArr) {
            content.append(row[0]).append(DELIMITER)
                    .append(row[1]).append(DELIMITER)
                    .append(row[2]).append(DELIMITER)
                    .append(row[3]).append(lineSeparator);
        }
        for (String extraLine : list) {
            content.append(extraLine).append(lineSeparator);
        }
        String data = content.toString();
        LOG.info("data=" + data);

        // Write the text as UTF-8 bytes: DataOutputStream#writeChars would emit two bytes per
        // character. try-with-resources closes the stream on failure and surfaces close errors
        // instead of leaving a possibly truncated input file unnoticed.
        try (FSDataOutputStream out = fileSystem.create(inputFile, true)) {
            out.write(Bytes.toBytes(data));
        }
        LOG.info(String.format("Wrote test data to file: %s", inputFile));

        FilePaths filePaths = new FilePaths();
        filePaths.inputPah = inputFile.toString();
        filePaths.outputPath = outputDir.toString();
        return filePaths;
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.String[], java.lang.String[][]] */
    private String[][] getData(int i) {
        ?? r0 = new String[i];
        for (int i2 = 0; i2 < i; i2++) {
            String[] strArr = new String[4];
            strArr[0] = "row" + i2;
            strArr[1] = "name" + i2;
            strArr[2] = String.valueOf(Math.floor(Math.random() * 50.0d));
            strArr[3] = String.valueOf(Math.floor(Math.random() * 50000.0d));
            r0[i2] = strArr;
        }
        return r0;
    }

    /**
     * Creates an index specification with a single STRING index column on
     * {@code famName}:{@code columnName}.
     *
     * @param str name of the index
     * @return the new index specification
     * @throws IOException declared for the index-column registration
     */
    private HIndexSpecification getIndex(String str) throws IOException {
        HIndexSpecification spec = new HIndexSpecification(str);
        HColumnDescriptor indexedFamily = new HColumnDescriptor(famName);
        String indexedQualifier = Bytes.toString(columnName);
        spec.addIndexColumn(indexedFamily, indexedQualifier, HIndexProtos.ColumnQualifier.ValueType.STRING);
        return spec;
    }

    /**
     * Scans the whole table with the {@code FETCH_INDEX_DATA} attribute set and collects every
     * returned result.
     *
     * @param table table to scan
     * @return all results of the scan, in scan order
     * @throws IOException if the scanner cannot be opened
     */
    private List<Result> getTableRows(Table table) throws IOException {
        Scan scan = new Scan();
        scan.setAttribute("FETCH_INDEX_DATA", Bytes.toBytes("true"));
        List<Result> rows = new ArrayList<>();
        // The scanner is closed even when iteration fails part-way through.
        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result row : scanner) {
                rows.add(row);
            }
        }
        return rows;
    }

    /**
     * Closes the stream on a best-effort basis: an I/O failure is logged with its stack trace
     * and not rethrown.
     *
     * @param fileSystem         unused; kept so existing callers keep compiling
     * @param fSDataOutputStream stream to close; {@code null} is ignored
     */
    private void close(FileSystem fileSystem, FSDataOutputStream fSDataOutputStream) {
        if (fSDataOutputStream == null) {
            return;
        }
        try {
            fSDataOutputStream.close();
        } catch (IOException e) {
            // Pass the exception as the throwable argument so the stack trace is logged.
            LOG.error("Failed to close output stream", e);
        }
    }

    /**
     * Builds a tool command line: one {@code -Dkey=value} argument per map entry, in the map's
     * iteration order, followed by the two positional arguments.
     *
     * @param str  first positional argument
     * @param str2 second positional argument
     * @param map  options rendered as {@code -D} definitions
     * @return an array of {@code map.size() + 2} arguments
     */
    private String[] getArgs(String str, String str2, Map<String, String> map) {
        List<String> args = new ArrayList<>(map.size() + 2);
        for (Map.Entry<String, String> option : map.entrySet()) {
            args.add("-D" + option.getKey() + "=" + option.getValue());
        }
        args.add(str);
        args.add(str2);
        return args.toArray(new String[args.size()]);
    }
}
