package com.huawei.cdc.connect.mysql;

import com.huawei.cdc.common.Scheduler;
import com.huawei.cdc.common.SourceTaskInfoCache;
import com.huawei.cdc.common.conf.CommonConfiguration;
import com.huawei.cdc.common.kafka.TopicCreation;
import com.huawei.cdc.common.metadata.client.ConnectorClient;
import com.huawei.cdc.common.util.CrypterUtils;
import com.huawei.cdc.connect.mysql.config.ConfigUtil;
import com.huawei.cdc.connect.mysql.config.ConnectorConfig;
import com.huawei.cdc.connect.mysql.processor.MysqlRecordFormatProcessor;
import com.huawei.cdc.connect.mysql.processor.TaskProcessor;
import com.huawei.cdc.connect.mysql.processor.TaskProcessorFactory;
import com.huawei.cdc.connect.mysql.util.BinlogPosition;
import com.huawei.cdc.connect.mysql.util.CDCMysqlProperties;
import com.huawei.cdc.connect.mysql.util.CommonConstants;
import com.huawei.cdc.connect.mysql.util.DebeziumMysqlProperties;
import com.huawei.cdc.connect.mysql.util.DebeziumRecordConstants;
import com.huawei.cdc.connect.mysql.util.MySqlMetricsData;
import com.huawei.cdc.connect.mysql.util.TransactionInfo;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/connect/mysql/MysqlSourceTask.class */
public class MysqlSourceTask extends SourceTask {
    MysqlRecordFormatProcessor mysqlRecordFormatProcessor;
    TaskProcessor taskProcessor;
    ConnectorConfig taskConfig;
    Set<String> assignedTables;
    private static final int METRIC_UPDATE_INTERVAL = 20;
    private TransactionInfo processingTransaction;
    private TransactionInfo successfulTransaction;
    private String internalTopicName;
    private BinlogPosition startPosition;
    private Map<String, String> tablesInfo;
    private Scheduler scheduler;
    MySqlMetricsData mySqlMetricsData;
    public static final Logger log = LoggerFactory.getLogger(MysqlSourceTask.class);
    private static final ConcurrentHashMap<String, String> PROCESSED_TABLES = new ConcurrentHashMap<>();
    private long totalRecords = 0;
    MySQLDebeziumAdapter mySQLDebeziumAdapter = new MySQLDebeziumAdapter();
    private long lastMetricUpdate = 0;
    private final AtomicBoolean metricsChangeFlag = new AtomicBoolean(false);
    private final AtomicLong dataProcessedIncremental = new AtomicLong(0);
    private final AtomicLong recordsProcessedIncremental = new AtomicLong(0);
    private final AtomicInteger tablesProcessedIncremental = new AtomicInteger(0);
    private final Object metricsLock = new Object();

    public String version() {
        return "1.1.0";
    }

    public void start(Map<String, String> map) {
        log.info("Begin to start MySQl source task.");
        this.tablesInfo = map;
        this.taskConfig = new ConnectorConfig(this.tablesInfo);
        setStartPosition(this.tablesInfo);
        this.scheduler = new Scheduler(MysqlSourceConnector.class, Duration.ofMillis(Integer.parseInt(CommonConfiguration.CDL_SCHEDULER_TIMEOUT_MS)));
        try {
            this.scheduler.execute(this::startMysqlSourceTask, "Starting MySQL Source task");
        } catch (Exception e) {
            log.error("Error occur when start MySQL source task.");
            throw new RuntimeException(e.getCause());
        }
    }

    private void startMysqlSourceTask() {
        this.mySqlMetricsData = new MySqlMetricsData(PROCESSED_TABLES, this.metricsLock, this.tablesProcessedIncremental, this.metricsChangeFlag, this.dataProcessedIncremental, this.recordsProcessedIncremental);
        this.assignedTables = ConfigUtil.getTablesProcessing(this.tablesInfo.get(CDCMysqlProperties.TABLES));
        log.info("Init MySQl source task.");
        initProcessor();
        log.info("Task {} started", this.taskConfig.getIntTaskId());
        SourceTaskInfoCache.putTable(this.taskConfig.getConnectorName(), this.taskConfig.getIntTaskId(), this.tablesInfo.get(CDCMysqlProperties.TABLES));
        Map<String, String> debeziumTaskProps = ConfigUtil.getDebeziumTaskProps(new CrypterUtils().decryptMap(this.tablesInfo));
        this.internalTopicName = debeziumTaskProps.get(DebeziumMysqlProperties.DATABASE_HISTORY_KAFKA_TOPIC);
        log.info("DBZ properties derived for task");
        this.mySQLDebeziumAdapter.start(this.taskProcessor, debeziumTaskProps);
        log.info("DBZ MySQL source task started");
    }

    private void setStartPosition(Map<String, String> map) {
        this.startPosition = new BinlogPosition(map.get(CDCMysqlProperties.BINLOG_FILENAME), Long.parseLong(map.get(CDCMysqlProperties.BINLOG_POSITION)), map.get(CDCMysqlProperties.GTID_SET));
    }

    private void initProcessor() {
        try {
            this.processingTransaction = new TransactionInfo();
            this.successfulTransaction = new TransactionInfo();
            this.taskProcessor = TaskProcessorFactory.getInstance().getTaskProcessor(this.taskConfig);
            this.taskProcessor.init(this.taskConfig, CommonConstants.TASK_SOURCE);
            if (this.taskProcessor == null) {
                return;
            }
            this.mysqlRecordFormatProcessor = new MysqlRecordFormatProcessor(this.taskProcessor, this.mySqlMetricsData, this.taskConfig, this.startPosition, this.assignedTables);
        } catch (Exception e) {
            log.error("Error during init processor in mysql Source Connector");
            this.taskProcessor.postError("HIGH", "TASK FAILED", e, this.taskConfig.getJobExecutionId());
            stopMysqlSourceTask();
            throw new ConnectException(e);
        }
    }

    public List<SourceRecord> poll() throws InterruptedException {
        if (this.taskProcessor.getTables() == null) {
            log.info("Resume the task: Initializing task processor");
            initProcessor();
        }
        CompletableFuture.runAsync(() -> {
            manageStatus();
        });
        if (this.taskConfig.isSchemaAutoCreateEnabled() && !this.taskProcessor.ddlRecordQueue.isEmpty()) {
            ArrayList arrayList = null;
            while (true) {
                SourceRecord poll = this.taskProcessor.ddlRecordQueue.poll();
                if (poll == null) {
                    break;
                }
                if (arrayList == null) {
                    arrayList = new ArrayList();
                }
                arrayList.add(poll);
            }
            if (arrayList != null && arrayList.size() > 0) {
                return arrayList;
            }
        }
        List<SourceRecord> poll2 = this.mySQLDebeziumAdapter.poll();
        if (poll2 == null || poll2.size() == 0) {
            return null;
        }
        CompletableFuture.runAsync(() -> {
            recordProcessingTransaction(poll2);
            SourceTaskInfoCache.putIdentifier(this.taskConfig.getConnectorName(), this.taskConfig.getIntTaskId(), this.processingTransaction.getTransactionInfo(), this.processingTransaction.getTimestamp());
        });
        List<SourceRecord> processDebeziumRecords = this.mysqlRecordFormatProcessor.processDebeziumRecords(poll2);
        if (processDebeziumRecords.size() > 0) {
            recordSuccessfulTransaction(processDebeziumRecords);
            SourceTaskInfoCache.putSuccessfulIdentifier(this.taskConfig.getConnectorName(), this.taskConfig.getIntTaskId(), this.successfulTransaction.getTransactionInfo(), this.successfulTransaction.getTimestamp());
            synchronized (this.metricsLock) {
                this.totalRecords += processDebeziumRecords.size();
                this.metricsChangeFlag.set(true);
            }
            log.debug("Task: {} Completed Source records: {}", this.taskConfig.getIntTaskId(), Long.valueOf(this.totalRecords));
        }
        return processDebeziumRecords;
    }

    public void stop() {
        log.info("Begin to stop MySQL source task.");
        try {
            this.scheduler.execute(this::stopMysqlSourceTask, "Stopping MySQL Source task");
        } catch (Exception e) {
            log.error("Failed to stop MySQL source task.");
        }
        if (this.scheduler != null) {
            try {
                this.scheduler.close();
            } catch (Throwable th) {
                log.warn("Failed to close {}", this.scheduler.getClass().getSimpleName());
            }
        }
        log.info("Success in  stopping MySQL source task.");
    }

    private void stopMysqlSourceTask() {
        if (this.mySQLDebeziumAdapter != null) {
            this.mySQLDebeziumAdapter.stop();
        }
        try {
            TopicCreation topicCreation = new TopicCreation();
            Throwable th = null;
            try {
                topicCreation.deleteTopics(Collections.singletonList(this.internalTopicName));
                if (topicCreation != null) {
                    if (0 != 0) {
                        try {
                            topicCreation.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topicCreation.close();
                    }
                }
            } finally {
            }
        } catch (Exception e) {
            log.info("Could not delete internal topic {}", this.internalTopicName);
        }
        if (this.taskProcessor != null) {
            this.taskProcessor.detachTask(this.taskConfig.getIntTaskId());
        }
        this.taskProcessor = null;
        if (this.processingTransaction != null && this.successfulTransaction != null && this.taskConfig != null) {
            ConnectorClient connectorClient = new ConnectorClient();
            connectorClient.updateFailedIdentifier(this.processingTransaction.getTransactionInfo(), this.successfulTransaction.getTransactionInfo(), this.taskConfig.getConnectorName(), this.taskConfig.getIntTaskId(), "mysql".toUpperCase(Locale.ENGLISH));
            connectorClient.shutdown();
        }
        this.mySQLDebeziumAdapter = null;
        this.mysqlRecordFormatProcessor = null;
        this.processingTransaction = null;
        this.successfulTransaction = null;
    }

    private LocalDateTime getDateTime(long j) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(j), TimeZone.getDefault().toZoneId());
    }

    public void manageStatus() {
        if (this.lastMetricUpdate == 0 || ChronoUnit.SECONDS.between(getDateTime(this.lastMetricUpdate), getDateTime(System.currentTimeMillis())) >= 20) {
            this.lastMetricUpdate = System.currentTimeMillis();
            if (this.metricsChangeFlag.get()) {
                postCounts();
            }
        }
    }

    private void recordProcessingTransaction(List<SourceRecord> list) {
        Struct struct = (Struct) list.get(0).value();
        if (struct != null) {
            Struct struct2 = struct.getStruct(DebeziumRecordConstants.DBZ_SOURCE);
            String string = struct2.getString("gtid");
            String string2 = struct2.getString("file");
            Long int64 = struct2.getInt64("pos");
            this.processingTransaction.setTransactionInfo(string, string2, int64 == null ? CommonConstants.EMPTY : int64.toString());
            if (struct2.getInt64(DebeziumRecordConstants.DBZ_TIMESTAMP).longValue() != 0) {
                this.processingTransaction.setTimestamp(struct2.getInt64(DebeziumRecordConstants.DBZ_TIMESTAMP).longValue());
            }
        }
    }

    private void recordSuccessfulTransaction(List<SourceRecord> list) {
        if (list == null || list.size() <= 0) {
            return;
        }
        Struct struct = (Struct) list.get(list.size() - 1).value();
        List<Struct> array = ((Struct) struct.get("transaction")).getArray("properties");
        String str = CommonConstants.EMPTY;
        String str2 = CommonConstants.EMPTY;
        String str3 = CommonConstants.EMPTY;
        for (Struct struct2 : array) {
            String string = struct2.getString(CDCMysqlProperties.CONNECTOR_NAME);
            if ("file".equals(string)) {
                str = struct2.getString("value");
            } else if ("pos".equals(string)) {
                str2 = struct2.getString("value");
            } else if ("gtid".equals(string)) {
                str3 = struct2.getString("value");
            }
        }
        this.successfulTransaction.setTransactionInfo(str3, str, str2);
        if (struct.get("TIMESTAMP") != null) {
            this.successfulTransaction.setTimestamp((Timestamp) struct.get("TIMESTAMP"));
        }
    }

    private void postCounts() {
        synchronized (this.metricsLock) {
            this.taskProcessor.updateMetrics(this.taskConfig.getIntTaskId(), this.tablesProcessedIncremental.get(), this.totalRecords - this.recordsProcessedIncremental.get(), this.dataProcessedIncremental.get(), this.taskConfig.getJobExecutionId());
            this.metricsChangeFlag.set(false);
            this.tablesProcessedIncremental.set(0);
            this.recordsProcessedIncremental.set(this.totalRecords);
            this.dataProcessedIncremental.set(0L);
        }
    }
}
