package org.apache.hadoop.hbase.coprocessor;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.TableInfo;
import org.apache.carbondata.sdk.file.CarbonSchemaWriter;
import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
import org.apache.carbondata.sdk.file.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/coprocessor/CarbonReplicationEndpoint.class */
public class CarbonReplicationEndpoint extends BaseReplicationEndpoint {
    private static final String CARBON_APPEND_BATCH = "hbase.carbon.append.batch";
    private static final String CARBON_WRITE_BATCH_SIZE_BYTES = "hbase.carbon.write.batch.size.bytes";
    private static final int CARBON_WRITE_BATCH_SIZE_BYTES_DEFAULT = 33554432;
    private static final String CARBON_WRITER_KEEP_ALIVE_TIME_MS = "hbase.carbon.writer.keepalive.time.ms";
    private static final long CARBON_WRITER_KEEP_ALIVE_TIME_MS_DEFAULT = 300000;
    private static final String CARBON_CONFIG_FOLDER = "hbase.carbon.config.folder";
    private Configuration conf;
    private int replicationRpcLimit;
    private int writeBatchSizeBytes;
    private volatile long lastReplicateTime;
    private volatile boolean isWriting;
    private ThreadPoolExecutor exec;
    private int maxThreads;
    private TableDescriptors tableDescriptors;
    private boolean appendBatch;
    private static final Logger LOG = LoggerFactory.getLogger(CarbonReplicationEndpoint.class);
    private static String DELETE = String.valueOf(1);
    private static String DELETEFAMILY = String.valueOf(2);
    private static String INSERT = String.valueOf(0);
    private ScheduledExecutorService writeTimeChecker = Executors.newSingleThreadScheduledExecutor();
    private Map<TableName, CarbonHbaseMeta> tableSchemaMap = Maps.newConcurrentMap();
    private Map<String, CarbonWriter> regionsWriterMap = Maps.newConcurrentMap();
    private Map<String, CarbonTable> pathCarbonTableMap = Maps.newConcurrentMap();
    private CarbonWriterBuilder builder = null;
    private String taskNo = null;
    private long factTimeStamp = 0;

    public UUID getPeerUUID() {
        return this.ctx.getClusterId();
    }

    public boolean canReplicateToSameCluster() {
        return true;
    }

    public void start() {
        startAsync();
    }

    public void stop() {
        closeCarbonWriters();
        stopAsync();
    }

    protected void doStart() {
        notifyStarted();
    }

    protected void doStop() {
        notifyStopped();
    }

    public void init(ReplicationEndpoint.Context context) throws IOException {
        super.init(context);
        this.conf = this.ctx.getConfiguration();
        this.tableDescriptors = this.ctx.getTableDescriptors();
        this.replicationRpcLimit = (int) (0.95d * this.conf.getLong("hbase.ipc.max.request.size", 268435456L));
        this.writeBatchSizeBytes = Math.min(this.conf.getInt(CARBON_WRITE_BATCH_SIZE_BYTES, CARBON_WRITE_BATCH_SIZE_BYTES_DEFAULT), this.replicationRpcLimit);
        this.lastReplicateTime = System.currentTimeMillis();
        this.maxThreads = this.conf.getInt("hbase.replication.source.maxthreads", 10);
        this.appendBatch = this.conf.getBoolean(CARBON_APPEND_BATCH, true);
        this.exec = new ThreadPoolExecutor(this.maxThreads, this.maxThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue());
        this.exec.allowCoreThreadTimeOut(true);
        final long j = this.conf.getLong(CARBON_WRITER_KEEP_ALIVE_TIME_MS, CARBON_WRITER_KEEP_ALIVE_TIME_MS_DEFAULT);
        this.writeTimeChecker.scheduleAtFixedRate(new Runnable() { // from class: org.apache.hadoop.hbase.coprocessor.CarbonReplicationEndpoint.1
            @Override // java.lang.Runnable
            public void run() {
                if (System.currentTimeMillis() - CarbonReplicationEndpoint.this.lastReplicateTime <= j || CarbonReplicationEndpoint.this.isWriting) {
                    return;
                }
                CarbonReplicationEndpoint.LOG.info("No data comes, will close carbon writers.");
                CarbonReplicationEndpoint.this.closeCarbonWriters();
            }
        }, 0L, 1L, TimeUnit.MINUTES);
        try {
            loadCarbonConfigFromFile(this.conf);
        } catch (Exception e) {
            LOG.warn("Error while loading carbon config, just ignore it. ", e);
        }
    }

    @VisibleForTesting
    public void init(Configuration configuration) throws IOException {
        this.conf = configuration;
        this.replicationRpcLimit = (int) (0.95d * configuration.getLong("hbase.ipc.max.request.size", 268435456L));
        this.writeBatchSizeBytes = Math.min(configuration.getInt(CARBON_WRITE_BATCH_SIZE_BYTES, CARBON_WRITE_BATCH_SIZE_BYTES_DEFAULT), this.replicationRpcLimit);
        this.lastReplicateTime = System.currentTimeMillis();
        this.maxThreads = configuration.getInt("hbase.replication.source.maxthreads", 10);
        this.appendBatch = configuration.getBoolean(CARBON_APPEND_BATCH, true);
        this.exec = new ThreadPoolExecutor(this.maxThreads, this.maxThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue());
        this.exec.allowCoreThreadTimeOut(true);
        final long j = configuration.getLong(CARBON_WRITER_KEEP_ALIVE_TIME_MS, CARBON_WRITER_KEEP_ALIVE_TIME_MS_DEFAULT);
        this.writeTimeChecker.scheduleAtFixedRate(new Runnable() { // from class: org.apache.hadoop.hbase.coprocessor.CarbonReplicationEndpoint.2
            @Override // java.lang.Runnable
            public void run() {
                if (System.currentTimeMillis() - CarbonReplicationEndpoint.this.lastReplicateTime <= j || CarbonReplicationEndpoint.this.isWriting) {
                    return;
                }
                CarbonReplicationEndpoint.LOG.info("No data comes, will close carbon writers.");
                CarbonReplicationEndpoint.this.closeCarbonWriters();
            }
        }, 0L, 1L, TimeUnit.MINUTES);
        try {
            loadCarbonConfigFromFile(configuration);
        } catch (Exception e) {
            LOG.warn("Error while loading carbon config, just ignore it. ", e);
        }
    }

    public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
        LOG.info("I will replicate to carbon...");
        this.isWriting = true;
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(this.exec);
        try {
            List<List<WAL.Entry>> createBatches = createBatches(replicateContext.getEntries());
            while (isRunning() && !this.exec.isShutdown() && !isPeerEnabled()) {
                Threads.sleep(1000L);
            }
            initCarbonWriters(createBatches);
            parallelReplicate(executorCompletionService, replicateContext, createBatches);
            this.isWriting = false;
            return true;
        } catch (Exception e) {
            LOG.error("Exception occured while writing the data", e);
            return false;
        }
    }

    private List<List<WAL.Entry>> createBatches(List<WAL.Entry> list) {
        TreeMap treeMap = new TreeMap(Bytes.BYTES_COMPARATOR);
        for (WAL.Entry entry : list) {
            TableName tableName = entry.getKey().getTableName();
            try {
                if (getCarbonSchemaStr(tableName) != null) {
                    ((List) treeMap.computeIfAbsent(entry.getKey().getEncodedRegionName(), bArr -> {
                        return new ArrayList();
                    })).add(entry);
                }
            } catch (Exception e) {
                LOG.error("Exception occured while retrieving carbon schema details of table=" + tableName.getNameAsString(), e);
            }
        }
        return new ArrayList(treeMap.values());
    }

    public String getCarbonSchemaStr(TableName tableName) throws IOException {
        if (this.tableDescriptors.get(tableName) != null) {
            return this.tableDescriptors.get(tableName).getValue(CarbonMasterObserver.CARBON_SCHEMA_DESC);
        }
        LOG.warn("table desc is null: " + tableName.getNameAsString());
        return null;
    }

    private long parallelReplicate(CompletionService<Integer> completionService, ReplicationEndpoint.ReplicateContext replicateContext, List<List<WAL.Entry>> list) throws IOException {
        int i = 0;
        for (int i2 = 0; i2 < list.size(); i2++) {
            List<WAL.Entry> list2 = list.get(i2);
            if (!list2.isEmpty()) {
                LOG.info("Submitting {} entries of total size {}, total batches {}", new Object[]{Integer.valueOf(list2.size()), Integer.valueOf(replicateContext.getSize()), Integer.valueOf(list.size())});
                completionService.submit(createReplicator(list2, i2));
                i++;
            }
        }
        IOException iOException = null;
        long j = 0;
        for (int i3 = 0; i3 < i; i3++) {
            try {
                int intValue = completionService.take().get().intValue();
                List<WAL.Entry> list3 = list.get(intValue);
                list.set(intValue, Collections.emptyList());
                long writeTime = list3.get(list3.size() - 1).getKey().getWriteTime();
                if (writeTime > j) {
                    j = writeTime;
                }
            } catch (InterruptedException e) {
                iOException = new IOException(e);
            } catch (ExecutionException e2) {
                iOException = (IOException) e2.getCause();
            }
        }
        if (iOException != null) {
            throw iOException;
        }
        if (!this.appendBatch) {
            LOG.info("Will close carbon writers.");
            closeCarbonWriters();
        }
        this.lastReplicateTime = System.currentTimeMillis();
        return j;
    }

    protected Callable<Integer> createReplicator(List<WAL.Entry> list, int i) {
        return () -> {
            return Integer.valueOf(serialReplicateRegionEntries(list, i));
        };
    }

    private int serialReplicateRegionEntries(List<WAL.Entry> list, int i) throws IOException {
        int i2 = 0;
        int i3 = 0;
        ArrayList arrayList = new ArrayList();
        for (WAL.Entry entry : list) {
            int estimatedEntrySize = getEstimatedEntrySize(entry);
            if (i2 > 0 && i2 + estimatedEntrySize > this.writeBatchSizeBytes) {
                int i4 = i3;
                i3++;
                writeToCarbonFile(arrayList, i4);
                arrayList.clear();
                i2 = 0;
            }
            arrayList.add(entry);
            i2 += estimatedEntrySize;
        }
        if (i2 > 0) {
            writeToCarbonFile(arrayList, i3);
        }
        return i;
    }

    private int getEstimatedEntrySize(WAL.Entry entry) {
        return (int) (entry.getKey().estimatedSerializedSizeOf() + entry.getEdit().estimatedSerializedSizeOf());
    }

    protected boolean isPeerEnabled() {
        return this.ctx.getReplicationPeer().isPeerEnabled();
    }

    private void initCarbonWriters(List<List<WAL.Entry>> list) {
        for (List<WAL.Entry> list2 : list) {
            if (!list2.isEmpty()) {
                for (WAL.Entry entry : list2) {
                    String bytes = Bytes.toString(entry.getKey().getEncodedRegionName());
                    if (this.regionsWriterMap.get(bytes) == null) {
                        try {
                            createCarbonWriter(entry.getKey().getTableName(), bytes);
                        } catch (Exception e) {
                            LOG.error("Exception occured while initializing carbon writer for region " + bytes, e);
                        }
                    }
                }
            }
        }
    }

    private void createCarbonWriter(TableName tableName, String str) throws IOException, InvalidLoadOptionException {
        Schema schema;
        Map<String, String> tblProperties;
        CarbonHbaseMeta carbonHbaseMeta = this.tableSchemaMap.get(tableName);
        if (carbonHbaseMeta == null) {
            String carbonSchemaStr = getCarbonSchemaStr(tableName);
            LOG.info("Region name is: " + str + " Current thread is: " + Thread.currentThread().getName());
            LOG.info("Carbon schema is: " + carbonSchemaStr);
            tblProperties = new HashMap();
            schema = CarbonSchemaWriter.convertToSchemaFromJSON(carbonSchemaStr, tblProperties);
            carbonHbaseMeta = new CarbonHbaseMeta(schema, tblProperties);
            this.tableSchemaMap.put(tableName, carbonHbaseMeta);
        } else {
            schema = carbonHbaseMeta.getSchema();
            tblProperties = carbonHbaseMeta.getTblProperties();
        }
        tblProperties.put("primary_key_columns", carbonHbaseMeta.getPrimaryKeyColumns());
        String str2 = tblProperties.get(CarbonMasterObserver.PATH);
        String str3 = tblProperties.get(SparkS3Constants.FS_OBS_AK);
        String str4 = tblProperties.get(SparkS3Constants.FS_OBS_SK);
        String str5 = tblProperties.get(SparkS3Constants.FS_OBS_END_POINT);
        HashMap hashMap = new HashMap(tblProperties);
        hashMap.remove(CarbonMasterObserver.HBASE_MAPPING_DETAILS);
        hashMap.remove(CarbonMasterObserver.PATH);
        hashMap.remove(SparkS3Constants.FS_OBS_AK, str3);
        hashMap.remove(SparkS3Constants.FS_OBS_SK, str4);
        hashMap.remove(SparkS3Constants.FS_OBS_END_POINT, str5);
        Configuration configuration = new Configuration(this.conf);
        configuration.set(SparkS3Constants.FS_S3_AK, str3);
        configuration.set(SparkS3Constants.FS_S3_SK, str4);
        configuration.set(SparkS3Constants.FS_S3_END_POINT, str5);
        configuration.set(SparkS3Constants.FS_OBS_AK, str3);
        configuration.set(SparkS3Constants.FS_OBS_SK, str4);
        configuration.set(SparkS3Constants.FS_OBS_END_POINT, str5);
        configuration.set("spark.hadoop.fs.obs.impl", "org.apache.hadoop.fs.obs.OBSFileSystem");
        configuration.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.obs.OBSFileSystem");
        configuration.set("carbon.streamsegment.maxsize", "1073741824");
        ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
        CarbonTable carbonTable = this.pathCarbonTableMap.get(str2);
        String namespaceAsString = tableName.getNamespaceAsString();
        String qualifierAsString = tableName.getQualifierAsString();
        if (carbonTable == null) {
            try {
                String schemaFilePath = CarbonTablePath.getSchemaFilePath(str2);
                if (FileFactory.isFileExist(schemaFilePath, FileFactory.getFileType(schemaFilePath))) {
                    TableInfo readSchemaFile = CarbonUtil.readSchemaFile(schemaFilePath);
                    ThriftWrapperSchemaConverterImpl thriftWrapperSchemaConverterImpl = new ThriftWrapperSchemaConverterImpl();
                    carbonTable = CarbonTable.buildFromTableInfo((namespaceAsString == null || qualifierAsString == null) ? thriftWrapperSchemaConverterImpl.fromExternalToWrapperTableInfo(readSchemaFile, "", "_tempTable_" + String.valueOf(System.nanoTime()), str2) : thriftWrapperSchemaConverterImpl.fromExternalToWrapperTableInfo(readSchemaFile, namespaceAsString, qualifierAsString, str2));
                    this.pathCarbonTableMap.putIfAbsent(str2, carbonTable);
                }
            } catch (Exception e) {
                LOG.error("Error occured while reading thrift schema file for pk table, ", e);
            }
        }
        String str6 = "yyyy-MM-dd HH:mm:ss";
        if (carbonTable.getTableInfo() != null && carbonTable.getTableInfo().getFactTable().getTableProperties().get("timestampformat") != null) {
            str6 = (String) carbonTable.getTableInfo().getFactTable().getTableProperties().get("timestampformat");
        }
        this.builder = CarbonWriter.builder().outputPath(str2).withTable(carbonTable).withTableProperties(hashMap).withLoadOption("bad_records_action", "force").withLoadOption("timestampformat", str6).withRowFormat(schema).writtenBy(CarbonReplicationEndpoint.class.getSimpleName()).withHadoopConf(configuration);
        this.regionsWriterMap.put(str, this.builder.build());
        LOG.info("Path is: " + str2 + " Region name is: " + str + " Thread: " + Thread.currentThread().getName());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void closeCarbonWriters() {
        try {
            Iterator<CarbonWriter> it = this.regionsWriterMap.values().iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            this.regionsWriterMap.clear();
        } catch (Exception e) {
            LOG.error("Exception occured while closing the carbon writer", e);
        }
    }

    private void writeToCarbonFile(List<WAL.Entry> list, int i) {
        String bytes = Bytes.toString(list.get(0).getKey().getEncodedRegionName());
        try {
            TableName tableName = list.get(i).getKey().getTableName();
            CarbonWriter carbonWriter = this.regionsWriterMap.get(bytes);
            if (carbonWriter == null) {
                createCarbonWriter(tableName, bytes);
                carbonWriter = this.regionsWriterMap.get(bytes);
            }
            CarbonHbaseMeta carbonHbaseMeta = this.tableSchemaMap.get(tableName);
            DataTypeConverter dataTypeConverter = carbonHbaseMeta.getDataTypeConverter();
            ByteArrayWrapper byteArrayWrapper = null;
            String[] strArr = null;
            Iterator<WAL.Entry> it = list.iterator();
            while (it.hasNext()) {
                Iterator it2 = it.next().getEdit().getCells().iterator();
                while (it2.hasNext()) {
                    Cell cell = (Cell) it2.next();
                    ByteArrayWrapper byteArrayWrapper2 = new ByteArrayWrapper(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getType());
                    boolean z = cell.getType() == Cell.Type.DeleteFamily;
                    boolean z2 = cell.getType() == Cell.Type.Delete;
                    boolean z3 = false;
                    if (byteArrayWrapper != null && byteArrayWrapper.equals(byteArrayWrapper2)) {
                        z3 = true;
                    }
                    if (!z3) {
                        if (strArr != null) {
                            carbonWriter.write(strArr);
                        }
                        strArr = new String[carbonHbaseMeta.getSchema().getFieldsLength()];
                        int[] keyColumnIndex = carbonHbaseMeta.getKeyColumnIndex();
                        dataTypeConverter.convertRowKeyWithDecode(HBaseTableReferenceBuilder.buildTableReference("replicate", carbonHbaseMeta.getPrimaryKeyColumns(), carbonHbaseMeta.getSchema().getFields()), keyColumnIndex.length, cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), keyColumnIndex, carbonHbaseMeta.getSchema().getFields(), strArr);
                        strArr[carbonHbaseMeta.getTimestampMapIndex()] = String.valueOf(cell.getTimestamp());
                        if (z) {
                            fillCellDeleteFamily(strArr, carbonHbaseMeta);
                        } else {
                            fillCell(strArr, cell, carbonHbaseMeta, z2);
                        }
                        if (z2) {
                            strArr[carbonHbaseMeta.getDeleteStatusMap()] = DELETE;
                        } else if (z) {
                            strArr[carbonHbaseMeta.getDeleteStatusMap()] = DELETEFAMILY;
                        } else {
                            strArr[carbonHbaseMeta.getDeleteStatusMap()] = INSERT;
                        }
                    } else if (z) {
                        fillCellDeleteFamily(strArr, carbonHbaseMeta);
                    } else {
                        fillCell(strArr, cell, carbonHbaseMeta, z2);
                    }
                    byteArrayWrapper = byteArrayWrapper2;
                }
            }
            if (strArr != null) {
                carbonWriter.write(strArr);
            }
            carbonWriter.flushBatch();
        } catch (Exception e) {
            LOG.error("Exception occured while performing writer flush", e);
        }
    }

    private static void fillCellDeleteFamily(String[] strArr, CarbonHbaseMeta carbonHbaseMeta) {
        Iterator<Integer> it = carbonHbaseMeta.getSchemaMapping().values().iterator();
        while (it.hasNext()) {
            strArr[it.next().intValue()] = DELETE;
        }
    }

    private static void fillCell(String[] strArr, Cell cell, CarbonHbaseMeta carbonHbaseMeta, boolean z) {
        int schemaIndexOfColumn = carbonHbaseMeta.getSchemaIndexOfColumn(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
        if (schemaIndexOfColumn == -1) {
            return;
        }
        if (z) {
            strArr[schemaIndexOfColumn] = DELETE;
        } else {
            strArr[schemaIndexOfColumn] = carbonHbaseMeta.getDataTypeConverter().convertWithDecode(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), carbonHbaseMeta.getSchema().getFields()[schemaIndexOfColumn].getDataType());
        }
    }

    private void loadCarbonConfigFromFile(Configuration configuration) {
        String str = configuration.get(SparkS3Constants.FS_OBS_END_POINT);
        String str2 = configuration.get(SparkS3Constants.FS_OBS_AK);
        String str3 = configuration.get(SparkS3Constants.FS_OBS_SK);
        String str4 = configuration.get(CARBON_CONFIG_FOLDER);
        if (str == null || str2 == null || str3 == null || str4 == null) {
            LOG.warn("Carbon config file info is not found, will not load it.");
        } else {
            CarbonPropertiesLoader.loadConfig(str, str2, str3, str4);
        }
    }
}
