package com.huawei.cdc.connect.pgsql.wrapper;

import io.debezium.connector.postgresql.PostgresChangeRecordEmitter;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.util.Optional;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/connect/pgsql/wrapper/PgsqlChangeRecordEmitter.class */
public class PgsqlChangeRecordEmitter extends PostgresChangeRecordEmitter {
    static final Logger logger = LoggerFactory.getLogger(PgsqlChangeRecordEmitter.class);
    private static PgsqlSchema schema;
    private static PgsqlConnectorConfig connectorConfig;
    private static PostgresConnection connection;

    public PgsqlChangeRecordEmitter(OffsetContext offsetContext, Clock clock, PgsqlConnectorConfig pgsqlConnectorConfig, PgsqlSchema pgsqlSchema, PostgresConnection postgresConnection, ReplicationMessage replicationMessage) {
        super(offsetContext, clock, pgsqlConnectorConfig, pgsqlSchema, postgresConnection, replicationMessage);
        schema = pgsqlSchema;
        connectorConfig = pgsqlConnectorConfig;
        connection = postgresConnection;
    }

    public static Optional<DataCollectionSchema> updateSchema(TableId tableId, ChangeRecordEmitter changeRecordEmitter) {
        return ((PgsqlChangeRecordEmitter) changeRecordEmitter).newTable(tableId);
    }

    private Optional<DataCollectionSchema> newTable(TableId tableId) {
        logger.debug("Schema for table '{}' is missing", tableId);
        refreshTableFromDatabase(tableId);
        TableSchema schemaFor = schema.schemaFor(tableId);
        if (schemaFor == null) {
            logger.warn("cannot load schema for table '{}'", tableId);
            return Optional.empty();
        }
        logger.debug("refreshed DB schema to include table '{}'", tableId);
        return Optional.of(schemaFor);
    }

    private void refreshTableFromDatabase(TableId tableId) {
        try {
            schema.refresh(connection, tableId, connectorConfig.skipRefreshSchemaOnMissingToastableData());
        } catch (SQLException e) {
            throw new ConnectException("Database error while refresing table schema");
        }
    }
}
