package com.huawei.cdc.connect.mysql;

import com.huawei.cdc.connect.mysql.processor.TaskProcessor;
import com.huawei.cdc.connect.mysql.util.BinlogPosition;
import com.huawei.cdc.connect.mysql.util.CommonConstants;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.BlockingReader;
import io.debezium.connector.mysql.CDLMysqlSnapshotReader;
import io.debezium.connector.mysql.CdlMysqlBinlogReader;
import io.debezium.connector.mysql.ChainedReader;
import io.debezium.connector.mysql.Filters;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlJdbcContext;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.connector.mysql.SourceInfo;
import io.debezium.connector.mysql.TimedBlockingReader;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/connect/mysql/MySQLDebeziumAdapter.class */
/**
 * Thin adapter that starts a Debezium MySQL task context, assembles a
 * {@link ChainedReader} from it and exposes {@code start}/{@code poll}/{@code stop}.
 *
 * <p>The reader chain built by {@link #start} is, in order: an optional
 * {@link TimedBlockingReader} (only when a non-zero snapshot delay is configured),
 * a {@link CDLMysqlSnapshotReader}, and then either a {@link BlockingReader}
 * (snapshot-only mode) or a {@link CdlMysqlBinlogReader}.
 *
 * <p>The mutable fields are {@code volatile}; methods that use a field more than
 * once work on a local snapshot of it.
 */
public class MySQLDebeziumAdapter {
    public static final Logger log = LoggerFactory.getLogger(MySQLDebeziumAdapter.class);
    private volatile MySqlTaskContext taskContext;
    private volatile MySqlJdbcContext connectionContext;
    private volatile ChainedReader readers;

    /**
     * Creates and starts the task context, positions the source at the start
     * position supplied by {@code taskProcessor}, then builds, initializes and
     * starts the reader chain.
     *
     * @param taskProcessor supplies the binlog start position (file name, offset and
     *                      optional GTID set)
     * @param map           connector configuration properties
     * @throws ConnectException if anything fails during start-up. A
     *                          {@code ConnectException} raised inside this method is
     *                          rethrown unchanged; any other exception is wrapped.
     */
    public void start(TaskProcessor taskProcessor, Map<String, String> map) {
        Configuration config = Configuration.from(map);
        // Tracks what this particular call managed to set up, so the failure path
        // knows what it has to release itself.
        MySqlTaskContext startedContext = null;
        boolean completionHookInstalled = false;
        try {
            boolean snapshotEventsAsInserts = config.getBoolean(MySqlConnectorConfig.SNAPSHOT_EVENTS_AS_INSERTS);

            startedContext = createAndStartTaskContext(config, getAllFilters(config));
            this.taskContext = startedContext;
            startedContext.initializeHistoryStorage();
            MySqlJdbcContext jdbcContext = startedContext.getConnectionContext();
            this.connectionContext = jdbcContext;

            SourceInfo source = startedContext.source();
            BinlogPosition startPosition = taskProcessor.getStartPosition();
            source.setBinlogStartPoint(startPosition.getFileName(), startPosition.getPosition());
            // Only seed the GTID set when the caller provided one, the source has none
            // yet, and the server actually runs in GTID mode.
            if (startPosition.getGtidSet() != null && source.gtidSet() == null && jdbcContext.isGtidModeEnabled()) {
                source.setCompletedGtidSet(startPosition.getGtidSet());
            }

            // Both server settings are queried up front; the row image is only
            // looked up when the format is already ROW.
            boolean binlogFormatRow = isBinlogFormatRow();
            boolean binlogRowImageFull = binlogFormatRow && isBinlogRowImageFull();

            ChainedReader.Builder builder = new ChainedReader.Builder();
            CDLMysqlSnapshotReader snapshotReader = new CDLMysqlSnapshotReader("snapshot", startedContext);
            snapshotReader.setBinlogPosition(startPosition);
            if (snapshotEventsAsInserts) {
                snapshotReader.generateInsertEvents();
            }
            if (!startedContext.getConnectorConfig().getSnapshotDelay().isZero()) {
                builder.addReader(new TimedBlockingReader("timed-blocker", startedContext.getConnectorConfig().getSnapshotDelay()));
            }
            builder.addReader(snapshotReader);

            if (startedContext.isInitialSnapshotOnly()) {
                log.warn("This connector will only perform a snapshot, and will stop after that completes.");
                builder.addReader(new BlockingReader("blocker", "Connector has completed all of its work but will continue in the running state. It can be shut down at any time."));
                builder.completionMessage("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
            } else {
                // A binlog reader is needed, so the server must use ROW format with a FULL row image.
                if (!binlogFormatRow) {
                    throw new ConnectException("The MySQL server is not configured to use a ROW binlog_format, which is required for this connector to work properly. Change the MySQL configuration to use a binlog_format=ROW and restart the connector.");
                }
                if (!binlogRowImageFull) {
                    throw new ConnectException("The MySQL server is not configured to use a FULL binlog_row_image, which is required for this connector to work properly. Change the MySQL configuration to use a binlog_row_image=FULL and restart the connector.");
                }
                builder.addReader(new CdlMysqlBinlogReader("binlog", startedContext, null));
            }

            ChainedReader chain = builder.build();
            this.readers = chain;
            chain.uponCompletion(this::completeReaders);
            completionHookInstalled = true;
            chain.initialize();
            chain.start();
        } catch (Exception e) {
            try {
                stop();
            } catch (Exception cleanupFailure) {
                log.error("Failed to start the connector, but got this error while cleaning up", cleanupFailure);
            }
            // The task context is otherwise only shut down through the completion
            // callback registered on the reader chain. If this call never got that far,
            // nothing else will release the context it started, so do it here.
            if (!completionHookInstalled) {
                shutdownTaskContext(startedContext);
            }
            // Do not re-wrap our own ConnectExceptions: wrapping would bury the
            // actionable message behind a class-name prefix.
            if (e instanceof ConnectException) {
                throw (ConnectException) e;
            }
            throw new ConnectException(e);
        }
    }

    /**
     * Polls the reader chain for the next batch of records.
     *
     * @return whatever the reader chain's {@code poll()} returns, or {@code null}
     *         when no reader chain has been built yet
     * @throws InterruptedException propagated from the reader chain
     */
    public List<SourceRecord> poll() throws InterruptedException {
        ChainedReader current = this.readers;
        return current == null ? null : current.poll();
    }

    /**
     * Stops and destroys the reader chain, if one has been built. Does not shut
     * down the task context directly; that happens in the completion callback.
     */
    public void stop() {
        log.info("Stopping MySQL connector task");
        // Snapshot the volatile field so stop() and destroy() hit the same instance.
        ChainedReader current = this.readers;
        if (current != null) {
            current.stop();
            current.destroy();
        }
    }

    /**
     * Completion callback registered on the reader chain: shuts down the current
     * task context, logging (never propagating) any failure.
     */
    private void completeReaders() {
        shutdownTaskContext(this.taskContext);
    }

    /**
     * Shuts down the given task context if it is non-null. Any {@link Throwable}
     * is logged and swallowed so that cleanup never masks the primary error.
     */
    private static void shutdownTaskContext(MySqlTaskContext context) {
        try {
            if (context != null) {
                context.shutdown();
            }
        } catch (Throwable th) {
            log.error("Unexpected error shutting down the database history and/or closing JDBC connections", th);
        }
    }

    /**
     * Checks whether the server's global {@code binlog_row_image} is {@code FULL}
     * (case-insensitive). When the server returns no row for that variable,
     * {@code FULL} is assumed.
     *
     * @throws ConnectException if the query fails
     */
    private boolean isBinlogRowImageFull() {
        try {
            String rowImage = readGlobalVariable("SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'", "FULL");
            log.debug("binlog_row_image={}", rowImage);
            return "FULL".equalsIgnoreCase(rowImage);
        } catch (SQLException e) {
            throw new ConnectException("Unexpected error while connecting to MySQL and looking at BINLOG_ROW_IMAGE mode: ", e);
        }
    }

    /**
     * Checks whether the server's global {@code binlog_format} is {@code ROW}
     * (case-insensitive). When the server returns no row for that variable, the
     * value falls back to {@link CommonConstants#EMPTY}.
     *
     * @throws ConnectException if the query fails
     */
    private boolean isBinlogFormatRow() {
        try {
            String format = readGlobalVariable("SHOW GLOBAL VARIABLES LIKE 'binlog_format'", CommonConstants.EMPTY);
            log.debug("binlog_format={}", format);
            return "ROW".equalsIgnoreCase(format);
        } catch (SQLException e) {
            throw new ConnectException("Unexpected error while connecting to MySQL and looking at BINLOG_FORMAT mode: ", e);
        }
    }

    /**
     * Runs a {@code SHOW GLOBAL VARIABLES LIKE ...} statement and returns the second
     * column of the first row.
     *
     * @param showVariableSql the complete statement to execute
     * @param valueIfAbsent   value to return when the result set is empty
     * @return the variable's value, or {@code valueIfAbsent} if no row came back
     * @throws SQLException if the query fails
     */
    private String readGlobalVariable(String showVariableSql, String valueIfAbsent) throws SQLException {
        // The result is handed out of the lambda through a holder because lambdas
        // can only capture effectively-final locals.
        AtomicReference<String> value = new AtomicReference<>(valueIfAbsent);
        this.connectionContext.jdbc().query(showVariableSql, resultSet -> {
            if (resultSet.next()) {
                value.set(resultSet.getString(2));
            }
        });
        return value.get();
    }

    /** Constructs a {@link MySqlTaskContext} for the configuration and filters, and starts it. */
    private static MySqlTaskContext createAndStartTaskContext(Configuration configuration, Filters filters) {
        MySqlTaskContext mySqlTaskContext = new MySqlTaskContext(configuration, filters);
        mySqlTaskContext.start();
        return mySqlTaskContext;
    }

    /** Builds the {@link Filters} for the given configuration using {@link Filters.Builder}. */
    private static Filters getAllFilters(Configuration configuration) {
        return new Filters.Builder(configuration).build();
    }
}
