package com.huawei.cdc.connect.drs.consumer;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.json.JsonSanitizer;
import com.huawei.cdc.common.metadata.models.HeartbeatData;
import com.huawei.cdc.common.metadata.util.UniqueIdHelper;
import com.huawei.cdc.common.util.CrypterUtils;
import com.huawei.cdc.common.util.StructUtils;
import com.huawei.cdc.connect.drs.config.ConnectorConfig;
import com.huawei.cdc.connect.drs.consumer.parser.DRSToCDLSchemaParser;
import com.huawei.cdc.connect.drs.consumer.util.DrsMessageConst;
import com.huawei.cdc.connect.drs.consumer.util.KafkaConsumerConst;
import com.huawei.cdc.connect.drs.consumer.util.SchemaConst;
import com.huawei.cdc.connect.drs.processor.TaskProcessor;
import com.huawei.cdc.parser.java.mysql.MySQLDDLParser;
import com.huawei.cdc.parser.operations.ddl.DDLOperation;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javafx.util.Pair;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/connect/drs/consumer/KafkaConsumerUtil.class */
public class KafkaConsumerUtil {
    private final KafkaConsumer kafkaConsumer;
    private Map<String, String> props;
    private Set<String> topics;
    private Map<String, String> tableTopicMap;
    private Pair<Set<String>, Map<String, String>> topicsAndTableTopicMapPair;
    private boolean kafkaMessageReaderFlag;
    private Thread kafkaMessageReaderThread;
    private boolean heartbeatEnabled;
    private int heartbeatFrequency;
    private long lastHeartbeat;
    private String heartbeatIdentifier;
    private long lastMetricUpdate;
    private int tablesProcessedIncremental;
    private long dataProcessedIncremental;
    private long recordsProcessedIncremental;
    private boolean metricsChangeFlag;
    private static final int METRIC_UPDATE_INTERVAL = 20;
    private final Object metricsLock;
    private HeartbeatData heartbeatData;
    private TaskProcessor taskProcessor;
    private ConnectorConfig config;
    private Struct dataStruct;
    private Struct beforeDataStruct;
    private String sslJaasConfigPattern;
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerUtil.class);
    private static final DRSToCDLSchemaParser DRS_TO_CDL_SCHEMA_PARSER = new DRSToCDLSchemaParser();
    private static LinkedBlockingQueue<SourceRecord> cdlRecordQueue = new LinkedBlockingQueue<>(Integer.MAX_VALUE);
    private static final ConcurrentHashMap<String, String> PROCESSED_TABLES = new ConcurrentHashMap<>();

    public KafkaConsumerUtil(Map<String, String> map, Set<String> set) {
        this.lastHeartbeat = 0L;
        this.lastMetricUpdate = 0L;
        this.tablesProcessedIncremental = 0;
        this.dataProcessedIncremental = 0L;
        this.recordsProcessedIncremental = 0L;
        this.metricsLock = new Object();
        this.sslJaasConfigPattern = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$USERNAME\" password=\"$PASSWORD\";";
        this.props = map;
        this.topics = set;
        this.kafkaConsumer = new KafkaConsumer(initProperties());
        validateSourceTopicAvailability();
    }

    public KafkaConsumerUtil(Pair<Set<String>, Map<String, String>> pair, boolean z, int i, TaskProcessor taskProcessor, ConnectorConfig connectorConfig) {
        this.lastHeartbeat = 0L;
        this.lastMetricUpdate = 0L;
        this.tablesProcessedIncremental = 0;
        this.dataProcessedIncremental = 0L;
        this.recordsProcessedIncremental = 0L;
        this.metricsLock = new Object();
        this.sslJaasConfigPattern = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$USERNAME\" password=\"$PASSWORD\";";
        this.topicsAndTableTopicMapPair = pair;
        this.heartbeatEnabled = z;
        this.heartbeatFrequency = i;
        this.taskProcessor = taskProcessor;
        this.config = connectorConfig;
        this.props = this.config.originalsStrings();
        this.heartbeatIdentifier = null;
        this.kafkaConsumer = new KafkaConsumer(initProperties());
        this.topics = (Set) this.topicsAndTableTopicMapPair.getKey();
        this.tableTopicMap = (Map) this.topicsAndTableTopicMapPair.getValue();
        this.kafkaConsumer.subscribe(getTopics());
        startPollingKafkaMessage();
    }

    private Set<String> getTopics() {
        return (Set) Stream.of((Object[]) this.props.get("topics").split(",")).collect(Collectors.toSet());
    }

    private void validateSourceTopicAvailability() {
        if (this.kafkaConsumer.listTopics().keySet().containsAll(this.topics)) {
            return;
        }
        log.error("One or more configured source topics are not available in the source kafka broker.");
        throw new ConfigException("One or more configured source topics are not available in the source kafka broker.");
    }

    public ConcurrentHashMap<String, Integer> getTopicPartitionMap() {
        ConcurrentHashMap<String, Integer> concurrentHashMap = new ConcurrentHashMap<>();
        for (String str : this.topics) {
            concurrentHashMap.put(str, Integer.valueOf(this.kafkaConsumer.partitionsFor(str).size()));
        }
        return concurrentHashMap;
    }

    private Properties initProperties() {
        Properties properties = new Properties();
        properties.put(KafkaConsumerConst.GROUP_ID, "CdlConsumer");
        properties.put(KafkaConsumerConst.MAX_POLL_INTERVAL_MS, "10000");
        properties.put(KafkaConsumerConst.ISOLATION_LEVEL, "read_committed");
        properties.put(KafkaConsumerConst.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(KafkaConsumerConst.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(KafkaConsumerConst.BOOTSTRAP_SERVERS, this.props.get(KafkaConsumerConst.BOOTSTRAP_SERVERS));
        if (this.props.containsKey("security.protocol")) {
            properties.putAll(initSecureDmsProperties());
        }
        return properties;
    }

    private Properties initSecureDmsProperties() {
        Properties properties = new Properties();
        String str = this.props.get("username");
        String replace = this.sslJaasConfigPattern.replace("$USERNAME", str).replace("$PASSWORD", new CrypterUtils().decryptString(this.props.get("password")));
        properties.put("security.protocol", this.props.get("security.protocol"));
        properties.put("ssl.truststore.location", this.props.get("ssl.truststore.location"));
        properties.put("sasl.jaas.config", replace);
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("ssl.truststore.password", "dms@kafka");
        properties.put("ssl.endpoint.identification.algorithm", SchemaConst.EMPTY);
        return properties;
    }

    private void manageStatus() {
        if (this.lastMetricUpdate == 0 || ChronoUnit.SECONDS.between(getDateTime(this.lastMetricUpdate), getDateTime(System.currentTimeMillis())) >= 20) {
            this.lastMetricUpdate = System.currentTimeMillis();
            if (this.metricsChangeFlag) {
                postCounts();
            }
        }
    }

    private void postCounts() {
        synchronized (this.metricsLock) {
            this.taskProcessor.updateMetrics(this.config.getIntTaskId(), this.tablesProcessedIncremental, this.recordsProcessedIncremental, this.dataProcessedIncremental, this.config.getJobExecutionId());
            this.metricsChangeFlag = false;
            this.tablesProcessedIncremental = 0;
            this.dataProcessedIncremental = 0L;
            this.recordsProcessedIncremental = 0L;
        }
    }

    private void updateMetrics(Struct struct, Struct struct2, Struct struct3) {
        String tableAndDDLOperation = getTableAndDDLOperation(struct);
        boolean z = false;
        if (StringUtils.isNotBlank(tableAndDDLOperation) && !PROCESSED_TABLES.containsKey(tableAndDDLOperation + "-" + this.config.getJobExecutionId())) {
            PROCESSED_TABLES.put(tableAndDDLOperation + "-" + this.config.getJobExecutionId(), SchemaConst.EMPTY);
            z = true;
        }
        updateMetrics(z, updateStructDataMetrics(struct2, struct3));
    }

    private void updateMetrics(boolean z, long j) {
        synchronized (this.metricsLock) {
            if (z) {
                this.tablesProcessedIncremental++;
            }
            this.recordsProcessedIncremental++;
            this.dataProcessedIncremental += j;
            this.metricsChangeFlag = true;
        }
    }

    private long updateStructDataMetrics(Struct struct, Struct struct2) {
        long j = 0;
        if (struct != null) {
            j = 0 + StructUtils.getSize(struct);
        }
        if (struct2 != null) {
            j += StructUtils.getSize(struct2);
        }
        return j;
    }

    private void startPollingKafkaMessage() {
        this.kafkaMessageReaderFlag = true;
        this.kafkaMessageReaderThread = new Thread(new Runnable() { // from class: com.huawei.cdc.connect.drs.consumer.KafkaConsumerUtil.1
            @Override // java.lang.Runnable
            public void run() {
                while (KafkaConsumerUtil.this.kafkaMessageReaderFlag) {
                    KafkaConsumerUtil.this.validateAndParseDrsMessage(KafkaConsumerUtil.this.kafkaConsumer.poll(1000L));
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        KafkaConsumerUtil.log.error("Error on sleeping kafka message reader. {}", e.getMessage());
                    }
                }
                KafkaConsumerUtil.log.info("stopped polling kafka messages");
            }
        });
        this.kafkaMessageReaderThread.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void validateAndParseDrsMessage(ConsumerRecords<String, String> consumerRecords) {
        Iterator it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            if (((String) consumerRecord.value()).startsWith("value:")) {
                Map<String, Object> parseStringToMap = parseStringToMap(((String) consumerRecord.value()).replace("value:", SchemaConst.EMPTY));
                boolean booleanValue = ((Boolean) parseStringToMap.get(DrsMessageConst.DRS_DDL_FLAG_FIELD)).booleanValue();
                Pair<String, DDLOperation> tableAndDDLOperation = getTableAndDDLOperation(parseStringToMap, booleanValue);
                String upperCase = ((String) tableAndDDLOperation.getKey()).toUpperCase(Locale.ENGLISH);
                DDLOperation dDLOperation = (DDLOperation) tableAndDDLOperation.getValue();
                if (this.tableTopicMap.containsKey(upperCase)) {
                    parseDRSToCDLMessage(consumerRecord.partition(), upperCase, booleanValue, dDLOperation, parseStringToMap);
                }
            }
            log.debug("Received message: (" + ((String) consumerRecord.key()) + ", " + ((String) consumerRecord.value()) + ") at offset " + consumerRecord.offset());
        }
    }

    private Pair<String, DDLOperation> getTableAndDDLOperation(Map<String, Object> map, boolean z) {
        String str;
        DDLOperation dDLOperation = null;
        if (z) {
            dDLOperation = (DDLOperation) new MySQLDDLParser().parseStatement(processSql((String) map.get(DrsMessageConst.DRS_SQL_REDO_FIELD)), (String) map.get(DrsMessageConst.DRS_DATABASE_NAME_FIELD));
            str = dDLOperation.getName();
        } else {
            str = (String) map.get(DrsMessageConst.DRS_TABLE_NAME_FIELD);
        }
        return new Pair<>(str, dDLOperation);
    }

    private String processSql(String str) {
        return str.replace(SchemaConst.BACKTICK, SchemaConst.EMPTY).replaceAll("//.*|(\"(?:\\\\[^\"]|\\\\\"|.)*?\")|(?s)/\\*.*?\\*/", SchemaConst.SPACE).trim();
    }

    private Map<String, Object> parseStringToMap(String str) {
        Map<String, Object> map = null;
        try {
            map = (Map) new ObjectMapper().readValue(JsonSanitizer.sanitize(str).getBytes(), new TypeReference<Map<String, Object>>() { // from class: com.huawei.cdc.connect.drs.consumer.KafkaConsumerUtil.2
            });
        } catch (Exception e) {
            log.error("Error while parsing drs message. {}", e.getMessage());
        }
        return map;
    }

    private void parseDRSToCDLMessage(int i, String str, boolean z, DDLOperation dDLOperation, Map<String, Object> map) {
        Struct struct;
        if (map != null) {
            String str2 = this.props.get(KafkaConsumerConst.DATASTORE_TYPE);
            if (this.heartbeatEnabled) {
                this.heartbeatIdentifier = getHeartbeat();
            }
            if (z) {
                struct = DRS_TO_CDL_SCHEMA_PARSER.getCDLDDLMessageStruct(map, str2, str, dDLOperation);
            } else {
                Map<String, Struct> cDLDMLMessageStruct = DRS_TO_CDL_SCHEMA_PARSER.getCDLDMLMessageStruct(map, str2, str, this.heartbeatIdentifier);
                this.dataStruct = cDLDMLMessageStruct.get("dataStruct");
                this.beforeDataStruct = cDLDMLMessageStruct.get("beforeDataStruct");
                struct = cDLDMLMessageStruct.get("cdlDMLMessageStruct");
            }
            updateMetrics(struct, this.beforeDataStruct, this.dataStruct);
            if (this.heartbeatIdentifier != null) {
                this.lastHeartbeat = System.currentTimeMillis();
                prepareHeartBeatData(struct, map);
                this.taskProcessor.postHeartbeat(this.heartbeatData);
            }
            cdlRecordQueue.add(new SourceRecord(Collections.singletonMap(KafkaConsumerConst.PARTITION_FIELD, Integer.valueOf(i)), new HashMap(), this.tableTopicMap.get(str), struct.schema(), struct));
        }
    }

    private String getHeartbeat() {
        if (this.lastHeartbeat == 0 || ChronoUnit.SECONDS.between(getDateTime(this.lastHeartbeat), getDateTime(System.currentTimeMillis())) >= this.heartbeatFrequency) {
            return UniqueIdHelper.getId();
        }
        return null;
    }

    private LocalDateTime getDateTime(long j) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(j), TimeZone.getDefault().toZoneId());
    }

    private void prepareHeartBeatData(Struct struct, Map<String, Object> map) {
        this.heartbeatData = new HeartbeatData();
        this.heartbeatData.setLazyUid(this.heartbeatIdentifier);
        this.heartbeatData.setSubmissionId(this.config.getJobExecutionId());
        this.heartbeatData.setSourceSchema(getSegOwner(struct));
        this.heartbeatData.setSourceEntity(getTableAndDDLOperation(struct));
        this.heartbeatData.setSourceTaskId(this.config.getConnectorName() + "_" + this.config.getIntTaskId());
        this.heartbeatData.setSourceCommitTime(String.valueOf(map.get(DrsMessageConst.DRS_KAFKA_DELIVERY_TIMESTAMP_FIELD)));
        this.heartbeatData.setKafkaCommitTime(String.valueOf(this.lastHeartbeat));
        this.heartbeatData.setSourceConnectionId(this.config.getConnectionId());
        this.heartbeatData.setCreatedUser((String) null);
        this.heartbeatData.setUpdatedUser((String) null);
    }

    private String getSegOwner(Struct struct) {
        return struct.getString("SEG_OWNER");
    }

    private String getTableAndDDLOperation(Struct struct) {
        return struct.schema().field("OBJECT_NAME") != null ? struct.getString("OBJECT_NAME") : struct.getString("TABLE_NAME");
    }

    public List<SourceRecord> getParsedCDLRecords() {
        manageStatus();
        ArrayList arrayList = new ArrayList();
        while (!cdlRecordQueue.isEmpty()) {
            try {
                arrayList.add(cdlRecordQueue.take());
            } catch (InterruptedException e) {
                log.error("Error on taking records from cdl queue. {}", e.getCause(), e);
            }
        }
        return arrayList;
    }

    public void stop() {
        try {
            this.kafkaMessageReaderFlag = false;
            this.kafkaMessageReaderThread.join();
            this.kafkaMessageReaderThread = null;
            this.kafkaConsumer.unsubscribe();
        } catch (InterruptedException e) {
            log.error("Error on stopping drs message poll {}", e.getCause(), e);
        }
    }
}
