package com.huawei.cdc.connect.mysql.processor;

import com.huawei.cdc.common.conf.CommonConfiguration;
import com.huawei.cdc.connect.mysql.config.ConnectorConfig;
import com.huawei.cdc.connect.mysql.util.CDCMysqlProperties;
import com.huawei.cdc.connect.mysql.util.CommonConstants;
import com.huawei.cdc.connect.mysql.util.DebeziumRecordConstants;
import com.huawei.cdc.parser.java.mysql.MySQLDDLParser;
import com.huawei.cdc.parser.operations.ddl.DDLOperation;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/connect/mysql/processor/SchemaProcessor.class */
public class SchemaProcessor extends Thread {
    public static final Logger log = LoggerFactory.getLogger(SchemaProcessor.class);
    private final LinkedBlockingQueue<String> queryQueue;
    private final LinkedBlockingQueue<SourceRecord> ddlMessageQueue;
    private final MySQLDDLParser ddlParser;
    private final Struct transaction = prepareTransactionStruct();
    private final String schema;
    private final AtomicInteger stop;
    private final CountDownLatch completed;
    private final String connectorName;

    public SchemaProcessor(MySQLDDLParser mySQLDDLParser, LinkedBlockingQueue<String> linkedBlockingQueue, LinkedBlockingQueue<SourceRecord> linkedBlockingQueue2, ConnectorConfig connectorConfig, AtomicInteger atomicInteger, CountDownLatch countDownLatch, String str) {
        this.ddlParser = mySQLDDLParser;
        this.queryQueue = linkedBlockingQueue;
        this.ddlMessageQueue = linkedBlockingQueue2;
        this.stop = atomicInteger;
        this.schema = connectorConfig.getSchemaName();
        this.completed = countDownLatch;
        this.connectorName = str;
    }

    private Struct prepareTransactionStruct() {
        Schema build = SchemaBuilder.struct().field(CDCMysqlProperties.CONNECTOR_NAME, Schema.OPTIONAL_STRING_SCHEMA).field("value", Schema.OPTIONAL_STRING_SCHEMA).build();
        return new Struct(SchemaBuilder.struct().optional().name("transaction").field("properties", SchemaBuilder.array(build).optional().build()).build()).put("properties", Arrays.asList(new Struct(build).put(CDCMysqlProperties.CONNECTOR_NAME, "file").put("value", CommonConstants.EMPTY), new Struct(build).put(CDCMysqlProperties.CONNECTOR_NAME, "pos").put("value", CommonConstants.EMPTY), new Struct(build).put(CDCMysqlProperties.CONNECTOR_NAME, "gtid").put("value", CommonConstants.EMPTY)));
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (true) {
            try {
                String poll = this.queryQueue.poll();
                if (poll == null && this.stop.get() == 0) {
                    break;
                } else if (StringUtils.isNotBlank(poll)) {
                    this.ddlMessageQueue.put(processOperation((DDLOperation) this.ddlParser.parseStatement(poll.replace(CommonConstants.BACKTICK, CommonConstants.EMPTY), this.schema), poll));
                }
            } catch (Exception e) {
                log.error("Error caught during the processing of auto schema creation", e);
            }
        }
        this.completed.countDown();
    }

    private SourceRecord processOperation(DDLOperation dDLOperation, String str) {
        Struct struct = dDLOperation.toStruct();
        String name = dDLOperation.getName();
        Struct put = new Struct(SchemaBuilder.struct().name(this.schema + CommonConstants.DOT + name).field("DATA_STORE", Schema.STRING_SCHEMA).field("SEG_OWNER", Schema.OPTIONAL_STRING_SCHEMA).field("OBJECT_NAME", Schema.OPTIONAL_STRING_SCHEMA).field("SQL_REDO", Schema.OPTIONAL_STRING_SCHEMA).field("TIMESTAMP", Timestamp.SCHEMA).field("OPERATION", Schema.OPTIONAL_STRING_SCHEMA).field("transaction", this.transaction.schema()).field("data", struct.schema()).build()).put("DATA_STORE", CommonConstants.MYSQL_DATA_STORE).put("SEG_OWNER", this.schema).put("OBJECT_NAME", name).put("SQL_REDO", processSql(str)).put("TIMESTAMP", new java.sql.Timestamp(System.currentTimeMillis())).put("OPERATION", dDLOperation.getOperation()).put("transaction", this.transaction).put("data", struct);
        return new SourceRecord(Collections.singletonMap(CommonConstants.MYSQL_DATA_STORE, DebeziumRecordConstants.DBZ_DDL), Collections.singletonMap("source-offset", CommonConstants.ZERO), getDDLTopicName(), put.schema(), put);
    }

    private String getDDLTopicName() {
        return CommonConfiguration.CDL_SCHEMA_DDL + "_" + this.connectorName;
    }

    private String processSql(String str) {
        return str.replace(CommonConstants.BACKTICK, CommonConstants.EMPTY).replaceAll("//.*|(\"(?:\\\\[^\"]|\\\\\"|.)*?\")|(?s)/\\*.*?\\*/", CommonConstants.SPACE).trim();
    }
}
