package org.apache.hadoop.hbase.regionserver;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ReplicationTests.class, MediumTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.class */
public class TestBulkLoadReplication extends TestReplicationBase {
    // Replication cluster ids, one per mini cluster. setUpBeforeClass() hands them to
    // setupBulkLoadConfigsForCluster(), which stores them under "hbase.replication.cluster.id"
    // and uses them as the name of the per-cluster config folder.
    private static final String PEER1_CLUSTER_ID = "peer1";
    private static final String PEER2_CLUSTER_ID = "peer2";
    private static final String PEER3_CLUSTER_ID = "peer3";
    // Replication peer ids registered on UTIL2 in setUpBase() and removed again in tearDownBase().
    // PEER_ID1 is also reused as the column qualifier of the cells written by the HFile helpers.
    private static final String PEER_ID1 = "1";
    private static final String PEER_ID3 = "3";
    // Number of postBulkLoadHFile callbacks counted by BulkReplicationTestObserver; re-created in setUpBase().
    // NOTE(review): assigned by the test thread and used from coprocessor callbacks without volatile —
    // confirm that visibility across threads is guaranteed.
    private static AtomicInteger BULK_LOADS_COUNT;
    // Latch re-created by the assertBulkLoadConditions* helpers before each bulk load and counted
    // down by BulkReplicationTestObserver on every postBulkLoadHFile callback.
    private static CountDownLatch BULK_LOAD_LATCH;

    // JUnit rule exposing the name of the currently running test method.
    @Rule
    public TestName name = new TestName();

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestBulkLoadReplication.class);
    protected static final Logger LOG = LoggerFactory.getLogger(TestBulkLoadReplication.class);
    // Testing utility and configuration of the third cluster (started in startThirdCluster()).
    // CONF3 is read from UTIL3, so UTIL3 must be declared first.
    protected static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
    protected static final Configuration CONF3 = UTIL3.getConfiguration();
    // Directory on the mini DFS into which HFiles are copied before being bulk loaded.
    private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");

    // Class-scoped temporary folder holding the generated hbase-site.xml files and local HFiles.
    @ClassRule
    public static TemporaryFolder testFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication$BulkReplicationTestObserver.class */
    public static class BulkReplicationTestObserver implements RegionCoprocessor {
        String clusterName;
        AtomicInteger bulkLoadCounts = new AtomicInteger();

        public Optional<RegionObserver> getRegionObserver() {
            return Optional.of(new RegionObserver() { // from class: org.apache.hadoop.hbase.regionserver.TestBulkLoadReplication.BulkReplicationTestObserver.1
                public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> observerContext, List<Pair<byte[], String>> list, Map<byte[], List<Path>> map) throws IOException {
                    TestBulkLoadReplication.BULK_LOAD_LATCH.countDown();
                    TestBulkLoadReplication.BULK_LOADS_COUNT.incrementAndGet();
                    TestBulkLoadReplication.LOG.debug("Another file bulk loaded. Total for {}: {}", BulkReplicationTestObserver.this.clusterName, Integer.valueOf(BulkReplicationTestObserver.this.bulkLoadCounts.addAndGet(1)));
                }
            });
        }
    }

    /**
     * One-time setup: applies the bulk-load replication settings to the configurations of all
     * three clusters, runs the base class setup and finally starts the third cluster.
     *
     * @throws Exception if any configuration step or cluster start-up fails
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        // The configurations are adjusted first, before the base setup and startThirdCluster() run.
        setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
        setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
        setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID);
        // setupConfig is inherited; NOTE(review): "/3" looks like the ZK parent znode of cluster 3 — confirm.
        setupConfig(UTIL3, "/3");
        // Presumably starts clusters 1 and 2 — confirm in TestReplicationBase.
        TestReplicationBase.setUpBeforeClass();
        startThirdCluster();
    }

    /**
     * Starts the third mini cluster on the ZooKeeper and DFS clusters of {@code UTIL1} and
     * creates the test table on it.
     *
     * <p>The table has the {@code famName} family (MOB enabled, threshold 4000, replication
     * scope 1) and the {@code noRepfamName} family with default settings.
     *
     * @throws Exception if the cluster cannot be started or the table cannot be created
     */
    private static void startThirdCluster() throws Exception {
        LOG.info("Setup Zk to same one from UTIL1 and UTIL2");
        UTIL3.setZkCluster(UTIL1.getZkCluster());
        UTIL3.setDFSCluster(UTIL1.getDFSCluster(), false);
        UTIL3.startMiniCluster(NUM_SLAVES1);
        TableDescriptor descriptor = TableDescriptorBuilder.newBuilder(tableName)
            .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
                .setMobEnabled(true)
                .setMobThreshold(4000L)
                // NOTE(review): 1 should be HConstants.REPLICATION_SCOPE_GLOBAL — confirm.
                .setScope(1)
                .build())
            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName))
            .build();
        // Both the connection and the admin are closed here; previously the connection leaked.
        try (Connection connection = ConnectionFactory.createConnection(CONF3);
             Admin admin = connection.getAdmin()) {
            admin.createTable(descriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
        }
        UTIL3.waitUntilAllRegionsAssigned(tableName);
    }

    /**
     * Per-test setup: after the base setup, registers the extra replication peers
     * (cluster 2 -> clusters 1 and 3, cluster 3 -> cluster 2), makes sure the bulk-load
     * observer is loaded on every cluster and resets the global bulk-load counter.
     *
     * @throws Exception if the base setup or a peer registration fails
     */
    @Override
    @Before
    public void setUpBase() throws Exception {
        super.setUpBase();

        ReplicationPeerConfig toCluster1 = getPeerConfigForCluster(UTIL1);
        ReplicationPeerConfig toCluster2 = getPeerConfigForCluster(UTIL2);
        ReplicationPeerConfig toCluster3 = getPeerConfigForCluster(UTIL3);

        Admin cluster2Admin = UTIL2.getAdmin();
        cluster2Admin.addReplicationPeer(PEER_ID1, toCluster1);
        cluster2Admin.addReplicationPeer(PEER_ID3, toCluster3);
        Admin cluster3Admin = UTIL3.getAdmin();
        cluster3Admin.addReplicationPeer("2", toCluster2);

        for (HBaseTestingUtility util : new HBaseTestingUtility[] {UTIL1, UTIL2, UTIL3}) {
            setupCoprocessor(util);
        }

        BULK_LOADS_COUNT = new AtomicInteger(0);
    }

    /**
     * Builds a replication peer config that points at the given cluster.
     *
     * @param hBaseTestingUtility utility of the cluster that should act as the peer
     * @return peer config carrying that cluster's key and the serial flag from {@code isSerialPeer()}
     */
    private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility hBaseTestingUtility) {
        return ReplicationPeerConfig.newBuilder()
            .setClusterKey(hBaseTestingUtility.getClusterKey())
            .setSerial(isSerialPeer())
            .build();
    }

    /**
     * Loads {@link BulkReplicationTestObserver} on every region of the test table that does not
     * have it yet and labels the new observer with the cluster key.
     *
     * <p>Failures are only logged, so a region that cannot load the observer does not abort the
     * setup.
     *
     * @param hBaseTestingUtility utility of the cluster whose regions are instrumented
     */
    private void setupCoprocessor(HBaseTestingUtility hBaseTestingUtility) {
        for (HRegion region : hBaseTestingUtility.getHBaseCluster().getRegions(tableName)) {
            try {
                RegionCoprocessorHost host = region.getCoprocessorHost();
                if (host.findCoprocessor(BulkReplicationTestObserver.class) != null) {
                    // Observer already installed on this region.
                    continue;
                }
                host.load(BulkReplicationTestObserver.class, 0, hBaseTestingUtility.getConfiguration());
                BulkReplicationTestObserver observer = host.findCoprocessor(BulkReplicationTestObserver.class);
                observer.clusterName = hBaseTestingUtility.getClusterKey();
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Per-test cleanup: after the base cleanup, removes the replication peers that
     * {@link #setUpBase()} added on clusters 2 and 3.
     *
     * @throws Exception if the base cleanup or a peer removal fails
     */
    @Override
    @After
    public void tearDownBase() throws Exception {
        super.tearDownBase();

        Admin cluster2Admin = UTIL2.getAdmin();
        cluster2Admin.removeReplicationPeer(PEER_ID1);
        cluster2Admin.removeReplicationPeer(PEER_ID3);

        Admin cluster3Admin = UTIL3.getAdmin();
        cluster3Admin.removeReplicationPeer("2");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Enables bulk-load replication in the given configuration and writes a snapshot of it to
     * {@code <testFolder>/<str>/hbase-site.xml}.
     *
     * <p>The snapshot is written before {@code hbase.replication.conf.dir} is set, so that key
     * is not part of the written file.
     *
     * @param configuration configuration of the cluster to prepare; modified in place
     * @param str replication cluster id, also used as the name of the config folder
     * @throws Exception if the folder or the file cannot be created or written
     */
    public static void setupBulkLoadConfigsForCluster(Configuration configuration, String str) throws Exception {
        configuration.setBoolean("hbase.replication.bulkload.enabled", true);
        configuration.set("hbase.replication.cluster.id", str);
        File siteFile = new File(testFolder.newFolder(str).getAbsolutePath() + "/hbase-site.xml");
        // Close the stream explicitly; it was previously left open after writeXml returned.
        try (FileOutputStream out = new FileOutputStream(siteFile)) {
            configuration.writeXml(out);
        }
        configuration.set("hbase.replication.conf.dir", testFolder.getRoot().getAbsolutePath());
    }

    /**
     * Bulk loads one row on each of the three clusters in turn and verifies that every load
     * becomes visible on all three clusters.
     *
     * <p>Three loads observed on three clusters each must produce exactly nine
     * {@code postBulkLoadHFile} callbacks.
     *
     * @throws Exception if a bulk load, a read or an assertion fails
     */
    @Test
    public void testBulkLoadReplicationActiveActive() throws Exception {
        // Table handles are closed when the test ends; they used to be leaked.
        try (Table peer1Table = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
             Table peer2Table = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
             Table peer3Table = UTIL3.getConnection().getTable(TestReplicationBase.tableName)) {
            assertBulkLoadConditions(tableName, Bytes.toBytes("001"), Bytes.toBytes("v1"), UTIL1, peer1Table, peer2Table, peer3Table);
            assertBulkLoadConditions(tableName, Bytes.toBytes("002"), Bytes.toBytes("v2"), UTIL2, peer1Table, peer2Table, peer3Table);
            assertBulkLoadConditions(tableName, Bytes.toBytes("003"), Bytes.toBytes("v3"), UTIL3, peer1Table, peer2Table, peer3Table);
            // Presumably a grace period so that any unexpected extra load would show up in the count.
            Thread.sleep(400L);
            Assert.assertEquals(9L, BULK_LOADS_COUNT.get());
        }
    }

    /**
     * Bulk loads one row into the {@code noRepfamName} family on cluster 1 and verifies that it
     * stays local: only one bulk load is observed, the other clusters do not see the value and
     * no hfile-refs children exist under {@code /1/replication/hfile-refs/2}.
     *
     * @throws Exception if the bulk load, a read or an assertion fails
     */
    @Test
    public void testBulkLoadReplicationActiveActiveForNoRepFamily() throws Exception {
        // Table handles are closed when the test ends; they used to be leaked.
        try (Table peer1Table = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
             Table peer2Table = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
             Table peer3Table = UTIL3.getConnection().getTable(TestReplicationBase.tableName)) {
            assertBulkLoadConditionsForNoRepFamily(Bytes.toBytes("004"), Bytes.toBytes("v4"), UTIL1, peer1Table, peer2Table, peer3Table);
            // Presumably a grace period so that an unexpected replicated load would show up.
            Thread.sleep(400L);
            Assert.assertEquals(1L, BULK_LOADS_COUNT.get());
            Assert.assertTrue(CollectionUtils.isEmpty(ZKUtil.listChildrenNoWatch(UTIL1.getZooKeeperWatcher(), "/1/replication/hfile-refs/2")));
        }
    }

    /**
     * Updates peer 3 on cluster 2 with the config returned by {@code switchTableRepBulk(.., false, ..)},
     * bulk loads a row on cluster 2 and verifies that it reaches clusters 1 and 2 but not
     * cluster 3, and that no hfile-refs children exist under {@code /2/replication/hfile-refs/3}.
     *
     * <p>NOTE(review): {@code switchTableRepBulk} is defined outside this file; it presumably
     * toggles bulk-load replication of the table for the peer — confirm.
     *
     * @throws Exception if a peer update, the bulk load, a read or an assertion fails
     */
    @Test
    public void testBulkLoadReplicationWithDisableBulkloadReplication() throws Exception {
        String table = String.valueOf(TestReplicationBase.tableName);
        // Table handles are closed when the test ends; they used to be leaked.
        try (Table peer1Table = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
             Table peer2Table = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
             Table peer3Table = UTIL3.getConnection().getTable(TestReplicationBase.tableName)) {
            UTIL2.getAdmin().updateReplicationPeerConfig(PEER_ID3, switchTableRepBulk(table, false, UTIL2.getAdmin().getReplicationPeerConfig(PEER_ID3)));
            try {
                assertBulkLoadConditionsForDisableBulkReplication(tableName, Bytes.toBytes("005"), Bytes.toBytes("v5"), UTIL2, peer1Table, peer2Table, peer3Table);
                ZKWatcher zooKeeperWatcher = UTIL2.getZooKeeperWatcher();
                Thread.sleep(400L);
                Assert.assertTrue(CollectionUtils.isEmpty(ZKUtil.listChildrenNoWatch(zooKeeperWatcher, "/2/replication/hfile-refs/3")));
            } finally {
                // Always switch the peer back, even when an assertion failed, so that the
                // modified peer config cannot leak into other tests of this class.
                UTIL2.getAdmin().updateReplicationPeerConfig(PEER_ID3, switchTableRepBulk(table, true, UTIL2.getAdmin().getReplicationPeerConfig(PEER_ID3)));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Bulk loads one row on the given cluster, waits up to one minute for three bulk-load
     * callbacks and then checks that the first three tables all return the value.
     *
     * @param tableName table to bulk load into
     * @param bArr row key
     * @param bArr2 expected cell value
     * @param hBaseTestingUtility utility of the cluster on which the bulk load is started
     * @param tableArr tables to verify; the first three entries are read
     * @throws Exception if the bulk load, the wait or an assertion fails
     */
    public void assertBulkLoadConditions(TableName tableName, byte[] bArr, byte[] bArr2, HBaseTestingUtility hBaseTestingUtility, Table... tableArr) throws Exception {
        BULK_LOAD_LATCH = new CountDownLatch(3);
        bulkLoadOnCluster(tableName, bArr, bArr2, hBaseTestingUtility);
        boolean allLoadsSeen = BULK_LOAD_LATCH.await(1L, TimeUnit.MINUTES);
        Assert.assertTrue(allLoadsSeen);
        for (int i = 0; i < 3; i++) {
            assertTableHasValue(tableArr[i], bArr, bArr2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a one-cell HFile for the replicated family, copies it to the cluster's DFS and
     * bulk loads everything under {@code BULK_LOAD_BASE_DIR} into the table.
     *
     * @param tableName table to bulk load into
     * @param bArr row key of the generated cell
     * @param bArr2 value of the generated cell
     * @param hBaseTestingUtility utility of the cluster that performs the bulk load
     * @throws Exception if the file cannot be created, copied or loaded
     */
    public void bulkLoadOnCluster(TableName tableName, byte[] bArr, byte[] bArr2, HBaseTestingUtility hBaseTestingUtility) throws Exception {
        String localHFile = createHFileForFamilies(bArr, bArr2, hBaseTestingUtility.getConfiguration());
        copyToHdfs(localHFile, hBaseTestingUtility.getDFSCluster());

        BulkLoadHFilesTool loader = new BulkLoadHFilesTool(hBaseTestingUtility.getConfiguration());
        loader.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
    }

    /**
     * Bulk loads one row into the {@code noRepfamName} family, waits up to one minute for a
     * single bulk-load callback and checks that only the first table returns the value.
     *
     * @param bArr row key
     * @param bArr2 cell value
     * @param hBaseTestingUtility utility of the cluster on which the bulk load is started
     * @param tableArr tables to verify: index 0 must have the value, indexes 1 and 2 must not
     * @throws Exception if the bulk load, the wait or an assertion fails
     */
    private void assertBulkLoadConditionsForNoRepFamily(byte[] bArr, byte[] bArr2, HBaseTestingUtility hBaseTestingUtility, Table... tableArr) throws Exception {
        BULK_LOAD_LATCH = new CountDownLatch(1);
        bulkLoadOnClusterForNoRepFamily(bArr, bArr2, hBaseTestingUtility);
        boolean loadSeen = BULK_LOAD_LATCH.await(1L, TimeUnit.MINUTES);
        Assert.assertTrue(loadSeen);

        Table source = tableArr[0];
        assertTableHasValue(source, bArr, bArr2);
        for (int i = 1; i <= 2; i++) {
            assertTableNotHasValue(tableArr[i], bArr, bArr2);
        }
    }

    /**
     * Bulk loads one row on the given cluster, waits up to one minute for two bulk-load
     * callbacks and checks that the first two tables return the value while the third does not.
     *
     * @param tableName table to bulk load into
     * @param bArr row key
     * @param bArr2 cell value
     * @param hBaseTestingUtility utility of the cluster on which the bulk load is started
     * @param tableArr tables to verify: indexes 0 and 1 must have the value, index 2 must not
     * @throws Exception if the bulk load, the wait or an assertion fails
     */
    protected void assertBulkLoadConditionsForDisableBulkReplication(TableName tableName, byte[] bArr, byte[] bArr2, HBaseTestingUtility hBaseTestingUtility, Table... tableArr) throws Exception {
        BULK_LOAD_LATCH = new CountDownLatch(2);
        bulkLoadOnCluster(tableName, bArr, bArr2, hBaseTestingUtility);
        boolean bothLoadsSeen = BULK_LOAD_LATCH.await(1L, TimeUnit.MINUTES);
        Assert.assertTrue(bothLoadsSeen);

        for (int i = 0; i < 2; i++) {
            assertTableHasValue(tableArr[i], bArr, bArr2);
        }
        Table excluded = tableArr[2];
        assertTableNotHasValue(excluded, bArr, bArr2);
    }

    /**
     * Copies a local file into the {@code noRepfamName} sub-directory of
     * {@code BULK_LOAD_BASE_DIR} on the given DFS cluster, creating the directory first.
     *
     * @param str path of the local file to copy
     * @param miniDFSCluster DFS cluster that receives the file
     * @throws Exception if the directory cannot be created or the copy fails
     */
    private void copyToHdfsForDisableBulkReplication(String str, MiniDFSCluster miniDFSCluster) throws Exception {
        String familyDirName = Bytes.toString(noRepfamName);
        Path familyDir = new Path(BULK_LOAD_BASE_DIR + "/" + familyDirName + "/");
        miniDFSCluster.getFileSystem().mkdirs(familyDir);
        Path localFile = new Path(str);
        miniDFSCluster.getFileSystem().copyFromLocalFile(localFile, familyDir);
    }

    /**
     * Creates a one-cell HFile for the {@code noRepfamName} family, copies it to the cluster's
     * DFS and bulk loads exactly that file through an explicit family-to-files map.
     *
     * @param bArr row key of the generated cell
     * @param bArr2 value of the generated cell
     * @param hBaseTestingUtility utility of the cluster that performs the bulk load
     * @throws Exception if the file cannot be created, copied or loaded
     */
    private void bulkLoadOnClusterForNoRepFamily(byte[] bArr, byte[] bArr2, HBaseTestingUtility hBaseTestingUtility) throws Exception {
        String localHFile = createHFileForNoRepFamilies(bArr, bArr2, hBaseTestingUtility.getConfiguration());
        copyToHdfsForNoRepFamily(localHFile, hBaseTestingUtility.getDFSCluster());

        // Location of the copied file on DFS: <base dir>/<family>/<local file name>.
        String hFileName = new Path(localHFile).getName();
        Path stagedHFile = new Path(BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/" + hFileName);

        // Typed collections instead of the former raw HashMap/ArrayList.
        List<Path> familyFiles = new ArrayList<>();
        familyFiles.add(stagedHFile);
        Map<byte[], List<Path>> filesByFamily = new HashMap<>();
        filesByFamily.put(noRepfamName, familyFiles);

        new BulkLoadHFilesTool(hBaseTestingUtility.getConfiguration()).bulkLoad(tableName, filesByFamily);
    }

    /**
     * Copies a local file into the {@code noRepfamName} sub-directory of
     * {@code BULK_LOAD_BASE_DIR} on the given DFS cluster, creating the directory first.
     *
     * @param str path of the local file to copy
     * @param miniDFSCluster DFS cluster that receives the file
     * @throws Exception if the directory cannot be created or the copy fails
     */
    private void copyToHdfsForNoRepFamily(String str, MiniDFSCluster miniDFSCluster) throws Exception {
        String targetLocation = BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/";
        Path targetDir = new Path(targetLocation);
        miniDFSCluster.getFileSystem().mkdirs(targetDir);
        miniDFSCluster.getFileSystem().copyFromLocalFile(new Path(str), targetDir);
    }

    /**
     * Copies a local file into the {@code f} sub-directory of {@code BULK_LOAD_BASE_DIR} on the
     * given DFS cluster, creating the directory first.
     *
     * @param str path of the local file to copy
     * @param miniDFSCluster DFS cluster that receives the file
     * @throws Exception if the directory cannot be created or the copy fails
     */
    private void copyToHdfs(String str, MiniDFSCluster miniDFSCluster) throws Exception {
        Path familyDir = new Path(BULK_LOAD_BASE_DIR, "f");
        miniDFSCluster.getFileSystem().mkdirs(familyDir);
        Path localFile = new Path(str);
        miniDFSCluster.getFileSystem().copyFromLocalFile(localFile, familyDir);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asserts that the row exists in the table and that its value, read as a string, equals
     * the expected value.
     *
     * @param table table to read from
     * @param bArr row key
     * @param bArr2 expected value
     * @throws Exception if the read fails
     */
    public void assertTableHasValue(Table table, byte[] bArr, byte[] bArr2) throws Exception {
        Get rowLookup = new Get(bArr);
        Result row = table.get(rowLookup);
        boolean hasCell = row.advance();
        Assert.assertTrue(hasCell);
        String expected = Bytes.toString(bArr2);
        String actual = Bytes.toString(row.value());
        Assert.assertEquals(expected, actual);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Asserts that the table returns an empty result for the row.
     *
     * @param table table to read from
     * @param bArr row key
     * @param bArr2 not used by this check
     * @throws Exception if the read fails
     */
    public void assertTableNoValue(Table table, byte[] bArr, byte[] bArr2) throws Exception {
        Get rowLookup = new Get(bArr);
        Result row = table.get(rowLookup);
        Assert.assertTrue(row.isEmpty());
    }

    /**
     * Asserts that the value stored for the row, read as a string, differs from the given value.
     *
     * @param table table to read from
     * @param bArr row key
     * @param bArr2 value that must not be present
     * @throws Exception if the read fails
     */
    private void assertTableNotHasValue(Table table, byte[] bArr, byte[] bArr2) throws Exception {
        String unexpected = Bytes.toString(bArr2);
        Result row = table.get(new Get(bArr));
        String actual = Bytes.toString(row.value());
        Assert.assertNotEquals(unexpected, actual);
    }

    /**
     * Writes a local HFile containing a single Put cell in the {@code famName} family.
     *
     * @param bArr row key of the cell
     * @param bArr2 value of the cell
     * @param configuration configuration used to obtain the HFile writer factory
     * @return absolute path of the created local file
     * @throws IOException if the file cannot be created or written
     */
    private String createHFileForFamilies(byte[] bArr, byte[] bArr2, Configuration configuration) throws IOException {
        return writeSingleCellHFile(TestReplicationBase.famName, bArr, bArr2, configuration);
    }

    /**
     * Writes a local HFile containing a single Put cell in the {@code noRepfamName} family.
     *
     * @param bArr row key of the cell
     * @param bArr2 value of the cell
     * @param configuration configuration used to obtain the HFile writer factory
     * @return absolute path of the created local file
     * @throws IOException if the file cannot be created or written
     */
    private String createHFileForNoRepFamilies(byte[] bArr, byte[] bArr2, Configuration configuration) throws IOException {
        return writeSingleCellHFile(TestReplicationBase.noRepfamName, bArr, bArr2, configuration);
    }

    /**
     * Shared implementation of the two HFile helpers: writes one Put cell (qualifier
     * {@code PEER_ID1}) for the given family into a new file in {@code testFolder}.
     *
     * @param family column family of the cell
     * @param row row key of the cell
     * @param value value of the cell
     * @param configuration configuration used to obtain the HFile writer factory
     * @return absolute path of the created local file
     * @throws IOException if the file cannot be created or written
     */
    private String writeSingleCellHFile(byte[] family, byte[] row, byte[] value, Configuration configuration) throws IOException {
        CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
        cellBuilder.setRow(row).setFamily(family).setQualifier(Bytes.toBytes(PEER_ID1)).setValue(value).setType(Cell.Type.Put);
        HFile.WriterFactory writerFactory = HFile.getWriterFactoryNoCache(configuration);
        File localHFile = testFolder.newFile();
        try (FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(localHFile), (FileSystem.Statistics) null)) {
            writerFactory.withOutputStream(out);
            writerFactory.withFileContext(new HFileContextBuilder().build());
            HFile.Writer writer = writerFactory.create();
            try {
                writer.append(new KeyValue(cellBuilder.build()));
            } finally {
                // Closed exactly once, whether or not append succeeded; the previous code
                // called close() a second time when the first close() threw.
                writer.close();
            }
        }
        return localHFile.getAbsoluteFile().getAbsolutePath();
    }
}
