package com.huawei.cdc.connect.drs;

import com.huawei.cdc.common.conf.CommonConfiguration;
import com.huawei.cdc.connect.drs.config.ConnectorConfig;
import com.huawei.cdc.connect.drs.consumer.KafkaConsumerUtil;
import com.huawei.cdc.connect.drs.processor.TaskProcessor;
import com.huawei.cdc.connect.drs.processor.TaskProcessorFactory;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javafx.util.Pair;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/connect/drs/DrsSourceTask.class */
/**
 * Kafka Connect source task for the DRS connector.
 *
 * <p>On {@link #start(Map)} the task builds a {@link ConnectorConfig} from the supplied
 * properties, obtains and initializes a {@link TaskProcessor}, and creates a
 * {@link KafkaConsumerUtil} from which {@link #poll()} fetches records. {@link #stop()} stops
 * the consumer helper and detaches the processor.
 */
public class DrsSourceTask extends SourceTask {
    static final Logger log = LoggerFactory.getLogger(DrsSourceTask.class);

    /** Raw task properties exactly as passed to {@link #start(Map)}. */
    private Map<String, String> propsMap;

    /** Pair returned by {@link TaskProcessor#getTopicTableMappingPair()}; handed to the consumer helper. */
    Pair<Set<String>, Map<String, String>> topicTableMappingPair;

    private KafkaConsumerUtil kafkaConsumerUtil;
    private TaskProcessor taskProcessor;
    private ConnectorConfig config;

    /** True when {@code CommonConfiguration.ENABLE_HEARTBEAT} equals "TRUE" (case-insensitive). */
    private boolean heartbeatEnabled;

    /** Parsed value of {@code CommonConfiguration.HEARTBEAT_FREQUENCY}; only set when heartbeat is enabled. */
    private int heartbeatFrequency;

    /**
     * Returns the version string of this task.
     *
     * @return the hard-coded version {@code "1.1.0"}
     */
    public String version() {
        return "1.1.0";
    }

    /**
     * Starts the task: builds the configuration, initializes the task processor, reads the
     * topic/table mapping pair from it and creates the consumer helper used by {@link #poll()}.
     *
     * @param map task properties used to build the {@link ConnectorConfig}
     * @throws ConnectException if the task processor fails to initialize or the heartbeat
     *         frequency setting is not a valid integer
     */
    public void start(Map<String, String> map) {
        this.propsMap = map;
        this.config = new ConnectorConfig(this.propsMap);
        initProcessor();
        this.topicTableMappingPair = this.taskProcessor.getTopicTableMappingPair();
        log.debug("started drs source task...");
        this.kafkaConsumerUtil = new KafkaConsumerUtil(this.topicTableMappingPair, this.heartbeatEnabled, this.heartbeatFrequency, this.taskProcessor, this.config);
    }

    /**
     * Obtains the task processor from the factory, initializes it, waits for its pre-processing
     * step and then reads the heartbeat settings from {@link CommonConfiguration}.
     *
     * @throws ConnectException if processor initialization fails or the heartbeat frequency is
     *         not a valid integer
     */
    private void initProcessor() {
        this.taskProcessor = TaskProcessorFactory.getInstance().getTaskProcessor(this.config);
        initTaskProcessor();
        this.taskProcessor.waitForPreProcessing();
        if ("TRUE".equalsIgnoreCase(CommonConfiguration.ENABLE_HEARTBEAT)) {
            this.heartbeatEnabled = true;
            try {
                this.heartbeatFrequency = Integer.parseInt(CommonConfiguration.HEARTBEAT_FREQUENCY);
            } catch (NumberFormatException e) {
                // Surface a descriptive failure instead of a bare NumberFormatException.
                throw new ConnectException(
                        "Invalid heartbeat frequency: " + CommonConfiguration.HEARTBEAT_FREQUENCY, e);
            }
        }
    }

    /**
     * Initializes the task processor in "TASK" mode. On failure the error is logged with its
     * stack trace, reported through {@link TaskProcessor#postError}, and rethrown.
     *
     * @throws ConnectException wrapping the original initialization failure
     */
    private void initTaskProcessor() {
        try {
            this.taskProcessor.init(this.config, "TASK");
        } catch (Exception e) {
            // Trailing throwable argument makes SLF4J log the full stack trace as well.
            log.error("Error initializing taskProcessor: {}", e.getMessage(), e);
            try {
                this.taskProcessor.postError("HIGH", "TASK FAILED", e, this.config.getJobExecutionId());
            } catch (RuntimeException reportingFailure) {
                // A failure while reporting must not hide the root cause of the init failure.
                e.addSuppressed(reportingFailure);
            }
            throw new ConnectException(e);
        }
    }

    /**
     * Returns the next batch of records from the consumer helper.
     *
     * <p>NOTE(review): assumes {@link #start(Map)} completed successfully; otherwise the consumer
     * helper is {@code null} and this call fails with a {@link NullPointerException}.
     *
     * @return whatever {@link KafkaConsumerUtil#getParsedCDLRecords()} returns
     */
    public List<SourceRecord> poll() {
        log.debug("polling parsed cdl records.");
        return this.kafkaConsumerUtil.getParsedCDLRecords();
    }

    /**
     * Stops the consumer helper (if created) and detaches the task processor (if present).
     * The processor is detached and its reference cleared even when stopping the consumer
     * helper throws.
     */
    public void stop() {
        try {
            if (this.kafkaConsumerUtil != null) {
                this.kafkaConsumerUtil.stop();
            }
        } finally {
            try {
                if (this.taskProcessor != null) {
                    this.taskProcessor.detach(this.config.getIntTaskId());
                }
            } finally {
                this.taskProcessor = null;
            }
        }
        log.debug("stopped drs source task...");
    }
}
