package com.huawei.cdc.connect.pgsql;

import com.huawei.cdc.common.Scheduler;
import com.huawei.cdc.common.SourceTaskInfoCache;
import com.huawei.cdc.common.conf.CommonConfiguration;
import com.huawei.cdc.common.metadata.client.ConnectorClient;
import com.huawei.cdc.common.util.ConnectorStatusUtil;
import com.huawei.cdc.connect.pgsql.config.ConnectorConfig;
import com.huawei.cdc.connect.pgsql.processor.PgsqlSchemaConst;
import com.huawei.cdc.connect.pgsql.processor.PgsqlTransactionInfo;
import com.huawei.cdc.connect.pgsql.processor.RecordFormatProcessor;
import com.huawei.cdc.connect.pgsql.processor.TaskProcessor;
import com.huawei.cdc.connect.pgsql.processor.TaskProcessorFactory;
import com.huawei.cdc.connect.pgsql.wrapper.PgsqlChangeRecordEmitter;
import com.huawei.cdc.connect.pgsql.wrapper.PgsqlConnectorConfig;
import com.huawei.cdc.connect.pgsql.wrapper.PgsqlSchema;
import com.huawei.cdc.connect.pgsql.wrapper.PgsqlTaskContext;
import com.huawei.cdc.connect.pgsql.wrapper.core.pipeline.source.spi.PgsqlEventMetadataProvider;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.postgresql.PostgresChangeEventSourceCoordinator;
import io.debezium.connector.postgresql.PostgresChangeEventSourceFactory;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresConnectorTask;
import io.debezium.connector.postgresql.PostgresErrorHandler;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.PostgresTopicSelector;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/connect/pgsql/PgsqlSourceTask.class */
public class PgsqlSourceTask extends SourceTask {
    static final Logger log = LoggerFactory.getLogger(PgsqlSourceTask.class);
    private ConnectorConfig config;
    private String tables;
    private static final String CONTEXT_NAME = "postgres-connector-task";
    private volatile PostgresTaskContext taskContext;
    private volatile ChangeEventQueue<DataChangeEvent> queue;
    private volatile PostgresConnection jdbcConnection;
    private volatile PostgresConnection heartbeatConnection;
    private volatile PostgresChangeEventSourceCoordinator coordinator;
    private volatile PostgresSchema schema;
    private TaskProcessor taskProcessor;
    RecordFormatProcessor processor;
    private int totalRecords;
    private PgsqlTransactionInfo processingTransaction;
    private PgsqlTransactionInfo successfulTransaction;
    private Scheduler scheduler;

    /**
     * Reports the version string of this source task.
     *
     * @return the fixed version identifier {@code "1.1.0"}
     */
    @Override
    public String version() {
        return "1.1.0";
    }

    /**
     * Starts the task: stores the configuration and the table list, resets the record
     * counter, and runs {@link #startPgsqlSourceTask()} through a freshly created
     * {@link Scheduler}.
     *
     * @param map task properties; the table list is read from the
     *            {@code PgsqlSchemaConst.TABLES} key
     * @throws RuntimeException if the scheduled start-up fails; wraps the underlying
     *                          cause, or the caught exception itself when it has no cause
     */
    @Override
    public void start(Map<String, String> map) {
        this.config = new ConnectorConfig(map);
        this.tables = map.get(PgsqlSchemaConst.TABLES);
        this.totalRecords = 0;
        log.info("Begin to start Postgres source task.");
        // NOTE(review): the Duration is presumably the scheduler's execution timeout — confirm against Scheduler.
        this.scheduler = new Scheduler(PgsqlSourceTask.class, Duration.ofMillis(Integer.parseInt(CommonConfiguration.CDL_SCHEDULER_TIMEOUT_MS)));
        try {
            this.scheduler.execute(this::startPgsqlSourceTask, "Starting PG Source task");
        } catch (Exception e) {
            // Exceptions without a cause would otherwise be rethrown as an empty RuntimeException(null).
            Throwable failure = e.getCause() != null ? e.getCause() : e;
            log.error("Error occur when start Postgres source connector.", failure);
            throw new RuntimeException(failure);
        }
    }

    /**
     * Performs the actual start-up sequence of the task.
     *
     * <p>In order: initializes the processors, builds the connector configuration, opens the
     * JDBC and heartbeat connections, builds the schema and task context, looks up the
     * replication slot and the previous offset, initializes the snapshotter, optionally opens
     * a replication connection (creating the slot when none exists), and finally creates and
     * starts the change event source coordinator.
     *
     * @throws ConnectException if the snapshotter cannot be loaded, the replication slot
     *                          cannot be created, or the JDBC commit fails
     */
    private void startPgsqlSourceTask() {
        log.info("Init Postgres source task.");
        initProcessor();
        PgsqlConnectorConfig pgsqlConnectorConfig = new PgsqlConnectorConfig(getPostgresConnectorConfig(this.config, this.tables));
        PostgresConnectorTask postgresConnectorTask = new PostgresConnectorTask();
        TopicSelector<TableId> topicSelector = PostgresTopicSelector.create(pgsqlConnectorConfig);
        setTables(this.tables);
        Snapshotter snapshotter = pgsqlConnectorConfig.getSnapshotter();
        if (snapshotter == null) {
            final String message = "Unable to load snapshotter, if using custom snapshot mode, double check your settings";
            this.taskProcessor.postError("HIGH", "TASK FAILED", new Exception(message), this.config.getJobExecutionId());
            log.error(message);
            throw new ConnectException(message);
        }
        SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(log);
        createJdbcConnection(pgsqlConnectorConfig);
        this.heartbeatConnection = new PostgresConnection(pgsqlConnectorConfig.jdbcConfig());
        this.schema = new PgsqlSchema(pgsqlConnectorConfig, this.jdbcConnection.getTypeRegistry(), this.jdbcConnection.getDatabaseCharset(), topicSelector);
        this.taskContext = new PgsqlTaskContext(pgsqlConnectorConfig, this.schema, topicSelector);
        SlotState slotInfo = getSlotInfo(pgsqlConnectorConfig);
        PostgresOffsetContext postgresOffsetContext = (PostgresOffsetContext) getPreviousOffset(this.config, this.jdbcConnection, slotInfo, new PostgresOffsetContext.Loader(pgsqlConnectorConfig));
        Clock clock = Clock.system();
        initSnapshotter(snapshotter, postgresOffsetContext, pgsqlConnectorConfig, slotInfo);
        ReplicationConnection replicationConnection = null;
        SlotCreationResult slotCreationResult = null;
        if (snapshotter.shouldStream()) {
            // NOTE(review): 6 and 10s are presumably the retry count and retry delay — confirm against Debezium.
            replicationConnection = postgresConnectorTask.createReplicationConnection(this.taskContext, snapshotter.exportSnapshot(), snapshotter.shouldSnapshot(), 6, Duration.ofSeconds(10L));
            // An existing slot is reused; a new one is only created when none was found.
            if (slotInfo == null && replicationConnection != null) {
                slotCreationResult = createReplicationSlot(replicationConnection);
            }
        }
        try {
            this.jdbcConnection.commit();
            this.queue = getQueue(pgsqlConnectorConfig);
            PostgresErrorHandler postgresErrorHandler = new PostgresErrorHandler(pgsqlConnectorConfig.getLogicalName(), this.queue);
            PgsqlEventMetadataProvider pgsqlEventMetadataProvider = new PgsqlEventMetadataProvider();
            Heartbeat heartbeat = Heartbeat.create(Duration.ofMillis(0L), topicSelector.getHeartbeatTopic(), pgsqlConnectorConfig.getLogicalName());
            EventDispatcher<TableId> dispatcher = getDispatcher(pgsqlConnectorConfig, topicSelector, pgsqlEventMetadataProvider, heartbeat, schemaNameAdjuster);
            PostgresChangeEventSourceFactory sourceFactory = new PostgresChangeEventSourceFactory(
                    pgsqlConnectorConfig,
                    snapshotter,
                    this.jdbcConnection,
                    postgresErrorHandler,
                    dispatcher,
                    clock,
                    this.schema,
                    this.taskContext,
                    replicationConnection,
                    slotCreationResult,
                    slotInfo);
            this.coordinator = new PostgresChangeEventSourceCoordinator(
                    postgresOffsetContext,
                    postgresErrorHandler,
                    PostgresConnector.class,
                    pgsqlConnectorConfig,
                    sourceFactory,
                    new DefaultChangeEventSourceMetricsFactory(),
                    dispatcher,
                    this.schema,
                    snapshotter,
                    slotInfo);
            this.coordinator.start(this.taskContext, this.queue, pgsqlEventMetadataProvider);
            log.info("Success in starting Postgres Connector.");
        } catch (SQLException e2) {
            this.taskProcessor.postError("HIGH", "TASK FAILED", e2, this.config.getJobExecutionId());
            log.error("Error while commit in jdbcConnection {}", String.valueOf(e2));
            throw new ConnectException(e2);
        }
    }

    /**
     * Creates the replication slot on the given replication connection.
     *
     * @param replicationConnection connection used to create the slot; must not be null
     * @return the slot creation result, or {@code null} when the connection reports none
     * @throws ConnectException if slot creation fails; the failure is also posted to the
     *                          task processor and logged
     */
    private SlotCreationResult createReplicationSlot(ReplicationConnection replicationConnection) {
        try {
            return replicationConnection.createReplicationSlot().orElse(null);
        } catch (SQLException e) {
            String message = "Creation of replication slot failed";
            // getMessage() may be null for a bare SQLException; guard before inspecting it.
            String detail = e.getMessage();
            if (detail != null && detail.contains("already exists")) {
                message = message + "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
            }
            this.taskProcessor.postError("HIGH", "TASK FAILED", e, this.config.getJobExecutionId());
            log.error(message, e);
            throw new ConnectException(message, e);
        }
    }

    /**
     * (Re)creates the per-task processing state: obtains the task processor from the
     * factory and initializes it, then creates fresh transaction trackers and a new
     * record format processor.
     */
    private void initProcessor() {
        TaskProcessor resolvedProcessor = TaskProcessorFactory.getInstance().getTaskProcessor(config);
        taskProcessor = resolvedProcessor;
        // Initialization happens before the remaining state is replaced, so a failure
        // here leaves the transaction trackers and record processor untouched.
        initTaskProcessor();

        processingTransaction = new PgsqlTransactionInfo();
        successfulTransaction = new PgsqlTransactionInfo();
        processor = new RecordFormatProcessor(config);
    }

    /**
     * Registers the table list of this task in {@link SourceTaskInfoCache}, keyed by
     * connector name and task id.
     *
     * @param str the table list to store
     */
    private void setTables(String str) {
        SourceTaskInfoCache.putTable(
                config.getConnectorName(),
                config.getIntTaskId(),
                str);
    }

    /**
     * Initializes the task processor with the current configuration in {@code "TASK"} mode.
     *
     * @throws ConnectException if initialization fails; the failure is logged and posted
     *                          to the task processor before being rethrown
     */
    private void initTaskProcessor() {
        try {
            this.taskProcessor.init(this.config, "TASK");
        } catch (Exception e) {
            // The exception is passed as the trailing argument so the stack trace is logged too.
            log.error("Error initializing taskProcessor: {}", e.getMessage(), e);
            this.taskProcessor.postError("HIGH", "TASK FAILED", e, this.config.getJobExecutionId());
            throw new ConnectException(e);
        }
    }

    /**
     * Builds the event dispatcher that routes change events into this task's queue.
     *
     * @param pgsqlConnectorConfig       connector configuration; also supplies the table filter
     * @param topicSelector              topic selector for table ids
     * @param pgsqlEventMetadataProvider metadata provider passed to the dispatcher
     * @param heartbeat                  heartbeat instance passed to the dispatcher
     * @param schemaNameAdjuster         schema name adjuster passed to the dispatcher
     * @return a new dispatcher bound to the current schema and queue fields
     */
    private EventDispatcher<TableId> getDispatcher(PgsqlConnectorConfig pgsqlConnectorConfig, TopicSelector<TableId> topicSelector, PgsqlEventMetadataProvider pgsqlEventMetadataProvider, Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster) {
        return new EventDispatcher<>(
                pgsqlConnectorConfig,
                topicSelector,
                schema,
                queue,
                pgsqlConnectorConfig.getTableFilters().dataCollectionFilter(),
                DataChangeEvent::new,
                PgsqlChangeRecordEmitter::updateSchema,
                pgsqlEventMetadataProvider,
                heartbeat,
                schemaNameAdjuster);
    }

    /**
     * Builds the change event queue from the connector's poll interval, maximum batch size
     * and maximum queue size.
     *
     * @param pgsqlConnectorConfig connector configuration supplying the queue settings
     * @return a new queue whose logging context is configured from the task context
     */
    private ChangeEventQueue<DataChangeEvent> getQueue(PgsqlConnectorConfig pgsqlConnectorConfig) {
        // Parameterized builder instead of the raw type, so build() is type-checked.
        return new ChangeEventQueue.Builder<DataChangeEvent>()
                .pollInterval(pgsqlConnectorConfig.getPollInterval())
                .maxBatchSize(pgsqlConnectorConfig.getMaxBatchSize())
                .maxQueueSize(pgsqlConnectorConfig.getMaxQueueSize())
                // taskContext is read lazily, each time the supplier is invoked.
                .loggingContextSupplier(() -> this.taskContext.configureLoggingContext(CONTEXT_NAME))
                .build();
    }

    /**
     * Initializes the snapshotter with the previous offset state, if any.
     *
     * @param snapshotter           snapshotter to initialize
     * @param postgresOffsetContext previously stored offset, or {@code null} when none exists
     * @param pgsqlConnectorConfig  connector configuration
     * @param slotState             replication slot state, may be {@code null}
     */
    private void initSnapshotter(Snapshotter snapshotter, PostgresOffsetContext postgresOffsetContext, PgsqlConnectorConfig pgsqlConnectorConfig, SlotState slotState) {
        final boolean hasPreviousOffset = postgresOffsetContext != null;
        if (hasPreviousOffset) {
            log.info("Found previous offset {}", postgresOffsetContext);
        } else {
            log.info("No previous offset found");
        }
        OffsetState previousState = hasPreviousOffset ? postgresOffsetContext.asOffsetState() : null;
        snapshotter.init(pgsqlConnectorConfig, previousState, slotState);
    }

    /**
     * Looks up the state of the configured replication slot.
     *
     * <p>A failed lookup is logged and treated as "no slot". When slot dropping is enabled
     * in the task configuration and the slot is reported as active, the slot is dropped
     * and {@code null} is returned.
     *
     * @param pgsqlConnectorConfig connector configuration supplying the slot and plugin names
     * @return the slot state, or {@code null} if it could not be loaded or was dropped
     */
    private SlotState getSlotInfo(PgsqlConnectorConfig pgsqlConnectorConfig) {
        SlotState slotState = null;
        try {
            // Guarded so serverInfo() is only called when the result will be logged.
            if (log.isInfoEnabled()) {
                log.info(this.jdbcConnection.serverInfo().toString());
            }
            slotState = this.jdbcConnection.getReplicationSlotState(pgsqlConnectorConfig.getSlotName(), pgsqlConnectorConfig.plugin().getPostgresPluginName());
        } catch (SQLException e) {
            // Best effort: keep going without slot info, but record why the lookup failed.
            log.warn("unable to load info of replication slot, trying to create the slot", e);
        }
        if (this.config.isDropSlot() && slotState != null && slotState.slotIsActive()) {
            this.jdbcConnection.dropReplicationSlot(this.config.getSlotName());
            slotState = null;
        }
        return slotState;
    }

    /**
     * Opens the main JDBC connection and switches it to manual commit mode.
     *
     * @param pgsqlConnectorConfig connector configuration supplying the JDBC settings
     * @throws ConnectException if auto-commit cannot be disabled
     */
    private void createJdbcConnection(PgsqlConnectorConfig pgsqlConnectorConfig) {
        final PostgresConnection connection = new PostgresConnection(pgsqlConnectorConfig.jdbcConfig(), true);
        // The field is assigned before auto-commit is changed, as in the original flow.
        this.jdbcConnection = connection;
        try {
            connection.setAutoCommit(false);
        } catch (SQLException autoCommitFailure) {
            handleSensitiveInfo(autoCommitFailure);
        }
    }

    /**
     * Reports a JDBC connection failure: logs the exception's string form, posts it to the
     * task processor, and fails the task.
     *
     * @param sQLException the failure raised while setting up the connection
     * @throws ConnectException always; its message carries the SQL exception's message
     *                          but the exception itself is not attached as the cause
     */
    private void handleSensitiveInfo(SQLException sQLException) {
        final String rendered = String.valueOf(sQLException);
        log.error(rendered);
        this.taskProcessor.postError("HIGH", "TASK FAILED", sQLException, this.config.getJobExecutionId());
        final String failure = "Error creating debezium PostgresConnection " + sQLException.getMessage();
        throw new ConnectException(failure);
    }

    /**
     * Builds the Debezium configuration for this task from the connector configuration.
     *
     * <p>Each entry of the comma-separated table list is lower-cased and prefixed with the
     * configured schema to form the table include list.
     *
     * @param connectorConfig source of host, credentials, slot, publication and snapshot settings
     * @param str             comma-separated table names; must not be {@code null}
     * @return the assembled configuration
     */
    private Configuration getPostgresConnectorConfig(ConnectorConfig connectorConfig, String str) {
        // A set removes duplicate table names before they are joined into the include list.
        HashSet<String> qualifiedTables = new HashSet<>();
        for (String table : str.split(",")) {
            qualifiedTables.add(connectorConfig.getSchema() + PgsqlSchemaConst.DOT + table.toLowerCase(Locale.ENGLISH));
        }
        String tableIncludeList = StringUtils.join(qualifiedTables, ",");
        return Configuration.empty()
                .withSystemProperties(Function.identity())
                .edit()
                .with("connector.class", "com.huawei.cdc.connect.pgsql.PgsqlSourceConnector")
                .with(PostgresConnectorConfig.HOSTNAME, connectorConfig.getDbHostName())
                .with(PostgresConnectorConfig.PORT, connectorConfig.getDbPort())
                .with(PostgresConnectorConfig.USER, connectorConfig.getDbUser())
                .with(PostgresConnectorConfig.PASSWORD, connectorConfig.getDbUserPad())
                .with(PostgresConnectorConfig.SERVER_NAME, connectorConfig.getDbNameAlias())
                .with(RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList)
                .with(PostgresConnectorConfig.DATABASE_NAME, connectorConfig.getDbNameAlias())
                .with(PostgresConnectorConfig.PLUGIN_NAME, connectorConfig.getPluginName())
                .with(PostgresConnectorConfig.SLOT_NAME, connectorConfig.getSlotName())
                .with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, connectorConfig.getPublicationAutocreateMode())
                .with(ConnectorConfig.PUBLICATION_NAME, connectorConfig.getPublicationName())
                .with(PostgresConnectorConfig.SNAPSHOT_MODE, connectorConfig.getSnapshotMode())
                .with(RelationalDatabaseConnectorConfig.SCHEMA_INCLUDE_LIST, connectorConfig.getSchema())
                .with(CommonConnectorConfig.BINARY_HANDLING_MODE, PgsqlSchemaConst.DEFAULT_BINARY_HANDLING)
                .with(RelationalDatabaseConnectorConfig.DECIMAL_HANDLING_MODE, PgsqlSchemaConst.DEFAULT_DECIMAL_HANDLING)
                .with(RelationalDatabaseConnectorConfig.TIME_PRECISION_MODE, PgsqlSchemaConst.DEFAULT_TIME_HANDLING)
                .build();
    }

    /**
     * Returns the next batch of records.
     *
     * <p>Pending DDL records are drained from the task processor first. Change events are
     * then polled from the queue, tracked as the "processing" transaction, passed through
     * the record format processor, tracked as the "successful" transaction, and finally
     * filtered by the configured start time (when set).
     *
     * @return DDL records followed by processed change records; may be empty
     * @throws InterruptedException if polling the change event queue is interrupted
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        log.debug("polling records...");
        List<SourceRecord> result = new ArrayList<>();
        // A missing topic/table mapping on a non-stopped connector means the task was
        // resumed and its processors must be rebuilt.
        if (this.taskProcessor.getTopicTableMapping() == null && !ConnectorStatusUtil.isConnectorStop(this.config.getConnectorName())) {
            log.info("Resume the task: Initializing task processor");
            initProcessor();
        }
        while (!this.taskProcessor.getDDLRecordQueue().isEmpty()) {
            SourceRecord ddlRecord = this.taskProcessor.getDDLRecordQueue().poll();
            if (ddlRecord != null) {
                result.add(ddlRecord);
            }
        }
        List<DataChangeEvent> events = this.queue.poll();
        if (events.isEmpty()) {
            if (!result.isEmpty()) {
                this.totalRecords += result.size();
                log.info("Task: {} Completed DDL schema records: {}", this.config.getIntTaskId(), this.totalRecords);
            }
            return result;
        }
        List<SourceRecord> sourceRecords = events.stream()
                .map(DataChangeEvent::getRecord)
                .collect(Collectors.toList());
        recordProcessingTransaction(sourceRecords);
        SourceTaskInfoCache.putIdentifier(this.config.getConnectorName(), this.config.getIntTaskId(), this.processingTransaction.getTransactionInfo(), this.processingTransaction.getTimestamp());
        List<SourceRecord> processedRecords = this.processor.processSourceRecords(this.jdbcConnection, sourceRecords);
        if (!processedRecords.isEmpty()) {
            recordSuccessfulTransaction(processedRecords);
            SourceTaskInfoCache.putSuccessfulIdentifier(this.config.getConnectorName(), this.config.getIntTaskId(), this.successfulTransaction.getTransactionInfo(), this.successfulTransaction.getTimestamp());
            processRecordsBasedOnStartIdentifier(processedRecords, result);
            this.totalRecords += result.size();
            log.debug("Task: {} Completed Source records: {}", this.config.getIntTaskId(), this.totalRecords);
        }
        return result;
    }

    /**
     * Appends processed records to the output list, filtering by start time when one is
     * configured.
     *
     * @param list  processed records to consider
     * @param list2 output list that receives the accepted records
     */
    private void processRecordsBasedOnStartIdentifier(List<SourceRecord> list, List<SourceRecord> list2) {
        final boolean startTimeUnset = this.config.getStartTime().longValue() == 0;
        if (startTimeUnset) {
            // No start time configured: every record is accepted.
            list2.addAll(list);
            return;
        }
        processBasedOnStartTime(list, list2);
    }

    /**
     * Appends to the output list only those records whose {@code "TIMESTAMP"} field is at or
     * after the configured start time.
     *
     * @param list  processed records to filter
     * @param list2 output list that receives the accepted records
     */
    private void processBasedOnStartTime(List<SourceRecord> list, List<SourceRecord> list2) {
        final long startTime = this.config.getStartTime().longValue();
        for (SourceRecord candidate : list) {
            Struct payload = (Struct) candidate.value();
            Timestamp eventTime = (Timestamp) payload.get("TIMESTAMP");
            if (eventTime.getTime() < startTime) {
                continue;
            }
            list2.add(candidate);
        }
    }

    /**
     * Records the LSN, transaction id and (non-zero) timestamp of the first record in the
     * batch as the transaction currently being processed.
     *
     * <p>Does nothing when the batch is {@code null} or empty, or when the first record has
     * no value.
     *
     * @param list the batch of polled source records
     */
    private void recordProcessingTransaction(List<SourceRecord> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        Struct value = (Struct) list.get(0).value();
        if (value == null) {
            return;
        }
        Struct source = value.getStruct(PgsqlSchemaConst.SOURCE);
        this.processingTransaction.setTransactionInfo(String.valueOf(source.getInt64(PgsqlSchemaConst.LSN)), String.valueOf(source.getInt64(PgsqlSchemaConst.TXID)));
        // Read once; a missing timestamp is treated like a zero one and leaves the old value.
        Long timestampMs = source.getInt64(PgsqlSchemaConst.TS_MS);
        if (timestampMs != null && timestampMs.longValue() != 0) {
            this.processingTransaction.setTimestamp(timestampMs.longValue());
        }
    }

    /**
     * Records the LSN, transaction id and timestamp of the last record in the batch as the
     * most recent successfully processed transaction.
     *
     * <p>The LSN and transaction id are read from the {@code "properties"} array of the
     * record's {@code "transaction"} struct; either stays an empty string when the matching
     * property is absent. Does nothing when the batch is {@code null} or empty.
     *
     * @param list the batch of processed source records
     */
    private void recordSuccessfulTransaction(List<SourceRecord> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        Struct value = (Struct) list.get(list.size() - 1).value();
        String lsn = "";
        String txId = "";
        // Explicit element type: getArray is generic and cannot infer it inside a for-each header.
        List<Struct> properties = ((Struct) value.get("transaction")).getArray("properties");
        for (Struct property : properties) {
            String name = property.getString("name");
            if (PgsqlSchemaConst.LSN.equals(name)) {
                lsn = String.valueOf(property.getInt64("value"));
            } else if (PgsqlSchemaConst.TXID.equals(name)) {
                txId = String.valueOf(property.getInt64("value"));
            }
        }
        this.successfulTransaction.setTransactionInfo(lsn, txId);
        Object timestamp = value.get("TIMESTAMP");
        if (timestamp != null) {
            this.successfulTransaction.setTimestamp((Timestamp) timestamp);
        }
    }

    /**
     * Stops the task by running {@link #stopPgsqlSourceTask()} through the scheduler, then
     * closes the scheduler regardless of whether the shutdown succeeded.
     *
     * <p>If the scheduler was never created (the task was not started), there is nothing to
     * stop and the method returns after logging a warning.
     *
     * @throws RuntimeException if the scheduled shutdown fails; wraps the underlying cause,
     *                          or the caught exception itself when it has no cause
     */
    @Override
    public void stop() {
        log.info("Begin to stop PG source task.");
        final Scheduler activeScheduler = this.scheduler;
        if (activeScheduler == null) {
            log.warn("Scheduler was never created; nothing to stop.");
            return;
        }
        try {
            activeScheduler.execute(this::stopPgsqlSourceTask, "Stopping PG Source task ");
        } catch (Exception e) {
            // Exceptions without a cause would otherwise be rethrown as an empty RuntimeException(null).
            Throwable failure = e.getCause() != null ? e.getCause() : e;
            throw new RuntimeException(failure);
        } finally {
            // Close in finally so the scheduler is released even when the shutdown fails.
            try {
                activeScheduler.close();
            } catch (Throwable th) {
                log.warn("Failed to close {}", activeScheduler.getClass().getSimpleName(), th);
            }
        }
    }

    /**
     * Performs the actual shutdown sequence of the task.
     *
     * <p>In order: stops the coordinator, reports the processing/successful transaction
     * identifiers through a {@link ConnectorClient}, detaches and clears the task processor,
     * closes the heartbeat connection and the schema, and finally restores auto-commit on
     * the JDBC connection, optionally drops the replication slot and publication, and closes
     * the connection.
     *
     * @throws ConnectException if the shutdown is interrupted; the thread's interrupt flag
     *                          is restored before the exception is thrown
     */
    private void stopPgsqlSourceTask() {
        try {
            if (this.coordinator != null) {
                this.coordinator.stop();
            }
            if (this.processingTransaction != null && this.successfulTransaction != null && this.config != null) {
                // Created only when it is used, and always shut down afterwards.
                ConnectorClient connectorClient = new ConnectorClient();
                try {
                    connectorClient.updateFailedIdentifier(this.processingTransaction.getTransactionInfo(), this.successfulTransaction.getTransactionInfo(), this.config.getConnectorName(), this.config.getIntTaskId(), PgsqlSchemaConst.POSTGRESQL);
                } finally {
                    connectorClient.shutdown();
                }
            }
            if (this.taskProcessor != null) {
                this.taskProcessor.detach(this.config.getIntTaskId());
            }
            this.taskProcessor = null;
            if (this.heartbeatConnection != null) {
                this.heartbeatConnection.close();
            }
            if (this.schema != null) {
                this.schema.close();
            }
            if (this.jdbcConnection != null) {
                try {
                    this.jdbcConnection.setAutoCommit(true);
                } catch (SQLException e) {
                    log.warn("Jdbc Connection: Error while setting auto commit to true.", e);
                }
                if (this.config.isDropSlot()) {
                    this.jdbcConnection.dropReplicationSlot(this.config.getSlotName());
                    // A pre-existing publication is left in place; only one created for this task is dropped.
                    if (!this.config.isUseExistPublication()) {
                        this.jdbcConnection.dropPublication(this.config.getPublicationName());
                    }
                }
                this.jdbcConnection.close();
            }
            log.info("Success in stopping PG source task.");
        } catch (InterruptedException e2) {
            // Restore the interrupt flag; Thread.interrupted() would clear it instead.
            Thread.currentThread().interrupt();
            log.error("Interrupted while stopping coordinator", e2);
            if (this.taskProcessor != null) {
                this.taskProcessor.postError("HIGH", "TASK FAILED", e2, this.config.getJobExecutionId());
            }
            throw new ConnectException("Interrupted while stopping the coordinator , failing the task", e2);
        }
    }

    /**
     * Determines the offset the task should resume from.
     *
     * <p>When a start time is configured, no offset is produced. Otherwise the offset comes
     * from, in order of precedence: the LSN/TXID given in the task configuration (both
     * non-zero); the stored Kafka Connect offset (when no slot exists or "start from
     * beginning" is set); or the database's current WAL location and transaction id.
     *
     * @param connectorConfig    unused; the task's own configuration field is read instead
     * @param postgresConnection connection used to query the current WAL location
     * @param slotState          replication slot state, or {@code null} if no slot exists
     * @param loader             loader supplying the partition and building the offset context
     * @return the loaded offset context, or {@code null} when there is no offset to resume
     *         from or the current WAL location could not be read
     */
    protected OffsetContext getPreviousOffset(ConnectorConfig connectorConfig, PostgresConnection postgresConnection, SlotState slotState, OffsetContext.Loader loader) {
        Map<String, ?> partition = loader.getPartition();
        Map<String, Object> offset = null;
        if (this.config.getStartTime().longValue() == 0) {
            if (this.config.getLSN() != 0 && this.config.getTXID() != 0) {
                offset = new HashMap<>();
                offset.put("transaction_id", null);
                offset.put("lsn_proc", Long.valueOf(this.config.getLSN()));
                offset.put("lsn_commit", Long.valueOf(this.config.getLSN()));
                offset.put(PgsqlSchemaConst.LSN, Long.valueOf(this.config.getLSN()));
                offset.put(PgsqlSchemaConst.TXID, Long.valueOf(this.config.getTXID()));
                // NOTE(review): the key says microseconds but a millisecond value is stored — confirm intent.
                offset.put("ts_usec", Long.valueOf(Clock.system().currentTimeInMillis()));
            } else if (slotState == null || this.config.isStartFromBeginning()) {
                offset = this.context.offsetStorageReader().offsets(Collections.singleton(partition)).get(partition);
            } else {
                offset = new HashMap<>();
                offset.put("transaction_id", null);
                try {
                    long currentXLogLocation = postgresConnection.currentXLogLocation();
                    offset.put("lsn_proc", Long.valueOf(currentXLogLocation));
                    offset.put("lsn_commit", Long.valueOf(currentXLogLocation));
                    offset.put(PgsqlSchemaConst.LSN, Long.valueOf(currentXLogLocation));
                    offset.put(PgsqlSchemaConst.TXID, postgresConnection.currentTransactionId());
                    // NOTE(review): the key says microseconds but a millisecond value is stored — confirm intent.
                    offset.put("ts_usec", Long.valueOf(Clock.system().currentTimeInMillis()));
                } catch (SQLException e) {
                    // Pass the exception itself: a bare extra argument with no placeholder is dropped.
                    log.error("Failed to get current lsn. Database processing error.", e);
                    return null;
                }
            }
        }
        if (offset == null) {
            return null;
        }
        OffsetContext load = loader.load(offset);
        log.info("Found previous offset {}", load);
        return load;
    }
}
