package com.huawei.cdc.connect.pgsql;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.huawei.cdc.common.Scheduler;
import com.huawei.cdc.common.conf.CommonConfiguration;
import com.huawei.cdc.common.util.ConnectorStatusUtil;
import com.huawei.cdc.connect.pgsql.config.ConnectorConfig;
import com.huawei.cdc.connect.pgsql.config.PublicationMode;
import com.huawei.cdc.connect.pgsql.processor.PgsqlSchemaConst;
import com.huawei.cdc.connect.pgsql.processor.TaskProcessor;
import com.huawei.cdc.connect.pgsql.processor.TaskProcessorFactory;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.ConnectorUtils;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.util.HostSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/connect/pgsql/PgsqlSourceConnector.class */
public class PgsqlSourceConnector extends SourceConnector {
    private static final Logger log = LoggerFactory.getLogger(PgsqlSourceConnector.class);
    private ConnectorConfig config;
    private TaskProcessor taskProcessor;
    private String ddlTopic;
    private Scheduler scheduler;

    public String version() {
        return "1.1.0";
    }

    public void start(Map<String, String> map) {
        this.config = new ConnectorConfig(map);
        this.ddlTopic = CommonConfiguration.CDL_SCHEMA_DDL + "_" + this.config.getConnectorName();
        this.scheduler = new Scheduler(PgsqlSourceConnector.class, Duration.ofMillis(Integer.parseInt(CommonConfiguration.CDL_SCHEDULER_TIMEOUT_MS)));
        log.info("Begin to start PG source Connector.");
        try {
            this.scheduler.execute(this::startPgSourceConnector, "Starting PG Source Connector");
        } catch (Exception e) {
            log.error("Error occur when start pgsql source connector.");
            throw new RuntimeException(e.getCause());
        }
    }

    private void startPgSourceConnector() {
        try {
            this.taskProcessor = TaskProcessorFactory.getInstance().getTaskProcessor(this.config);
            log.info("Initial Postgres source Connector.");
            this.taskProcessor.init(this.config, "CONNECTOR");
            updateConnectorStatus("cdlstart");
            log.info("Success in starting Postgres source Connector.");
        } catch (Exception e) {
            log.error("Error during init processor in Postgres Source Connector.");
            if (this.taskProcessor != null) {
                this.taskProcessor.postError("HIGH", "CONNECTOR FAILED", e, this.config.getJobExecutionId());
                this.taskProcessor.deleteCdlCache();
                try {
                    this.taskProcessor.deleteDdlTopic(this.ddlTopic);
                } catch (Exception e2) {
                    log.error("Unable to delete Kafka topic in {}", this.ddlTopic, e2);
                }
                this.taskProcessor.reset();
            }
            this.taskProcessor = null;
            updateConnectorStatus("cdlstop");
            throw new ConnectException(e);
        }
    }

    private void updateConnectorStatus(String str) {
        String str2 = "/api/v1/cdl/connectors/" + this.config.getConnectorName() + "/" + str;
        String[] split = CommonConfiguration.CDL_CONNECT_SERVERS.split(",");
        CompletableFuture.runAsync(() -> {
            for (String str3 : split) {
                ClientBuilder.newClient().target("http://" + str3).path(str2).request().post(Entity.json("")).close();
            }
        });
        log.debug("Connector Status Map = {}", ConnectorStatusUtil.getConnectorStopStatusMap());
    }

    public Config validate(Map<String, String> map) {
        ConnectorConfig connectorConfig = new ConnectorConfig(map);
        String topicTableMapping = connectorConfig.getTopicTableMapping();
        if (!StringUtils.isBlank(topicTableMapping)) {
            try {
                JsonElement parseString = JsonParser.parseString(topicTableMapping);
                if (parseString != null) {
                    JsonArray asJsonArray = parseString.getAsJsonArray();
                    if (asJsonArray.size() != 0) {
                        for (int i = 0; i < asJsonArray.size(); i++) {
                            if (asJsonArray.get(i).getAsJsonObject().has("noOfPartitions") && !connectorConfig.isMultiplePartitionEnabled()) {
                                log.error("Topic table Mapping has \"noOfPartitions\" which requires parameter \"multiple.topic.partitions.enable\" to be true");
                                throw new ConfigException("Topic table Mapping has \"noOfPartitions\" which requires parameter \"multiple.topic.partitions.enable\" to be true");
                            }
                        }
                    }
                }
            } catch (JsonSyntaxException e) {
                log.error("Error in parsing Topic Configuration: {}. Discarding topic table mapping", topicTableMapping, e);
                throw new ConfigException("Error in parsing Topic Configuration. Discarding topic table mapping");
            }
        }
        String tableAltPk = connectorConfig.getTableAltPk();
        if (!StringUtils.isBlank(tableAltPk)) {
            try {
                JsonParser.parseString(tableAltPk);
            } catch (JsonSyntaxException e2) {
                log.error("Error in parsing Alternate Primary Key Annotation: {}. " + tableAltPk);
                throw new ConfigException("Error in parsing Alternate Primary Key Annotation: {}. " + tableAltPk);
            }
        }
        if (StringUtils.isNotBlank(connectorConfig.getLobDirectUpload())) {
            if (connectorConfig.getLobDirectUpload().equalsIgnoreCase(PgsqlSchemaConst.LOB_UPLOAD_OBS)) {
                validateOBSParameters(connectorConfig);
            } else if (connectorConfig.getLobDirectUpload().equalsIgnoreCase(PgsqlSchemaConst.LOB_UPLOAD_HDFS)) {
                validateHDFSParameters(connectorConfig);
            }
        }
        HostSpec hostSpec = new HostSpec(connectorConfig.getDbHostName(), connectorConfig.getDbPort());
        Properties properties = new Properties();
        properties.put("password", connectorConfig.getDbUserPad());
        try {
            PgConnection pgConnection = new PgConnection(new HostSpec[]{hostSpec}, connectorConfig.getDbUser(), connectorConfig.getDbNameAlias(), properties, "");
            log.info("Successfully tested Connection to postgresQL DB with user {}", pgConnection.getUserName());
            pgConnection.close();
        } catch (SQLException e3) {
            handleValidationExcep(e3);
        }
        return super.validate(map);
    }

    private void validateOBSParameters(ConnectorConfig connectorConfig) {
        if (StringUtils.isBlank(connectorConfig.getObsBucket())) {
            log.error("Missing mandatory parameter \"obs.bucket\" ");
            throw new ConfigException("Missing mandatory parameter \"obs.bucket\" ");
        }
        if (StringUtils.isBlank(connectorConfig.getObsEndpoint())) {
            log.error("Missing mandatory parameter \"obs.endpoint\" ");
            throw new ConfigException("Missing mandatory parameter \"obs.endpoint\" ");
        }
        if (StringUtils.isBlank(connectorConfig.getObsSecretKey())) {
            log.error("Missing mandatory parameter \"obs.secret.key\" ");
            throw new ConfigException("Missing mandatory parameter \"obs.secret.key\" ");
        }
        if (StringUtils.isBlank(connectorConfig.getObsAuthenticationKey())) {
            log.error("Missing mandatory parameter \"obs.authentication.key\" ");
            throw new ConfigException("Missing mandatory parameter \"obs.authentication.key\" ");
        }
    }

    private void validateHDFSParameters(ConnectorConfig connectorConfig) {
        if (StringUtils.isBlank(connectorConfig.getHdfsAuthType())) {
            log.error("Missing mandatory parameter \"auth.type\" ");
            throw new ConfigException("Missing mandatory parameter \"auth.type\" ");
        }
        if (connectorConfig.getHdfsAuthType().equalsIgnoreCase("kerberos")) {
            if (StringUtils.isBlank(connectorConfig.getHdfsAuthPrincipal())) {
                log.error("Missing mandatory parameter \"auth.principal\" ");
                throw new ConfigException("Missing mandatory parameter \"auth.principal\" ");
            }
            if (StringUtils.isBlank(connectorConfig.getHdfsAuthKeytabfile())) {
                log.error("Missing mandatory parameter \"auth.keytabFile\" ");
                throw new ConfigException("Missing mandatory parameter \"auth.keytabFile\" ");
            }
        }
    }

    private void handleValidationExcep(SQLException sQLException) {
        log.error("Error while testing connection to PostgreSQL DB", sQLException);
        throw new ConnectException("Error while validating Postgre connection " + sQLException.getMessage());
    }

    public Class<? extends Task> taskClass() {
        return PgsqlSourceTask.class;
    }

    public List<Map<String, String>> taskConfigs(int i) {
        if (this.taskProcessor == null || this.config == null) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList(this.taskProcessor.getQualifiedTables());
        List groupPartitions = ConnectorUtils.groupPartitions(arrayList, Math.min(i, arrayList.size()));
        ArrayList arrayList2 = new ArrayList();
        for (int i2 = 0; i2 < groupPartitions.size(); i2++) {
            HashMap hashMap = new HashMap(this.config.originalsStrings());
            if (this.config.isUseExistPublication()) {
                hashMap.put(ConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PublicationMode.DISABLED);
            } else {
                hashMap.put(ConnectorConfig.PUBLICATION_NAME, this.config.getPublicationName() + "_" + this.config.getSlotName() + "_task" + i2);
            }
            hashMap.put(ConnectorConfig.SLOT_NAME, this.config.getSlotName() + "_task" + i2);
            hashMap.put(PgsqlSchemaConst.TABLES, String.join(",", (Iterable<? extends CharSequence>) groupPartitions.get(i2)));
            hashMap.put(ConnectorConfig.INT_TASK_ID, String.valueOf(i2));
            log.info("Tables {} assigned to task {}", groupPartitions.get(i2), Integer.valueOf(i2));
            arrayList2.add(hashMap);
        }
        return arrayList2;
    }

    public ConfigDef config() {
        return ConnectorConfig.conf();
    }

    public void stop() {
        log.info("Begin to stop PG connector.");
        if (this.taskProcessor != null) {
            try {
                this.taskProcessor.deleteDdlTopic(this.ddlTopic);
            } catch (Exception e) {
                log.error("Unable to delete Kafka topic in {}", this.ddlTopic, e);
            }
        }
        updateConnectorStatus("cdlstop");
        this.taskProcessor = null;
        this.config = null;
        if (this.scheduler != null) {
            try {
                this.scheduler.close();
            } catch (Throwable th) {
                log.warn("Failed to close {}", this.scheduler.getClass().getSimpleName());
            }
        }
        log.info("Success in stopping Postgres connector.");
    }
}
