package com.huawei.cdc.connect.mysql.processor;

import com.google.common.primitives.Bytes;
import com.huawei.cdc.common.conf.CommonConfiguration;
import com.huawei.cdc.common.lob.ProcessLobData;
import com.huawei.cdc.common.metadata.models.HeartbeatData;
import com.huawei.cdc.common.metadata.util.UniqueIdHelper;
import com.huawei.cdc.common.util.CommonUtil;
import com.huawei.cdc.common.util.StructUtils;
import com.huawei.cdc.connect.mysql.config.ConnectorConfig;
import com.huawei.cdc.connect.mysql.exception.TableDetailsNotFoundException;
import com.huawei.cdc.connect.mysql.util.BinlogPosition;
import com.huawei.cdc.connect.mysql.util.CDCMysqlProperties;
import com.huawei.cdc.connect.mysql.util.CommonConstants;
import com.huawei.cdc.connect.mysql.util.DebeziumRecordConstants;
import com.huawei.cdc.connect.mysql.util.MySQLQueries;
import com.huawei.cdc.connect.mysql.util.MySqlMetricsData;
import com.huawei.cdc.connect.mysql.util.TopicPartitionData;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.io.ParseException;
import com.vividsolutions.jts.io.WKBReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.geotools.gml3.GML;
import org.geotools.gml3.GMLConfiguration;
import org.geotools.xml.Encoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/connect/mysql/processor/MysqlRecordFormatProcessor.class */
public class MysqlRecordFormatProcessor {
    ConnectorConfig config;
    public static final Logger log = LoggerFactory.getLogger(MysqlRecordFormatProcessor.class);
    private final TaskProcessor taskProcessor;
    private boolean heartbeatEnabled;
    private int heartbeatFrequency;
    private final ConcurrentHashMap<String, String> processedTables;
    private final AtomicBoolean metricsChangeFlag;
    private final AtomicLong dataProcessedIncremental;
    private final AtomicInteger tablesProcessedIncremental;
    private final Object metricsLock;
    private final MysqlDDLFormatProcessor ddlProcessor;
    public static final String WKB_TYPE = "wkb";
    public static final String SPATIAL_SCHEMA = "io.debezium.data.geometry";
    public static final String REPLACE_GML_METADATA = "(<\\?xml version=\"1.0\" encoding=\"UTF-8\"\\?>)";
    private final String binlogFileName;
    private final long binlogPosition;
    private boolean insertSupported;
    private boolean updateSupported;
    private boolean deleteSupported;
    private final Set<String> assignedTables;
    private long lastHeartbeat = 0;
    private final Schema txnPropertySchema = SchemaBuilder.struct().field(CDCMysqlProperties.CONNECTOR_NAME, Schema.STRING_SCHEMA).field("value", Schema.OPTIONAL_STRING_SCHEMA).build();
    private final Schema transactionSchema = SchemaBuilder.struct().name("transaction").field("properties", SchemaBuilder.array(this.txnPropertySchema).build()).build();
    private final boolean isLobUpload = isLobUpload();

    public MysqlRecordFormatProcessor(TaskProcessor taskProcessor, MySqlMetricsData mySqlMetricsData, ConnectorConfig connectorConfig, BinlogPosition binlogPosition, Set<String> set) {
        this.taskProcessor = taskProcessor;
        this.config = connectorConfig;
        this.processedTables = mySqlMetricsData.getProcessedTables();
        this.metricsChangeFlag = mySqlMetricsData.getMetricsChangeFlag();
        this.tablesProcessedIncremental = mySqlMetricsData.getTablesProcessedIncremental();
        this.dataProcessedIncremental = mySqlMetricsData.getDataProcessedIncremental();
        this.metricsLock = mySqlMetricsData.getMetricsLock();
        this.binlogFileName = binlogPosition.getFileName();
        this.binlogPosition = binlogPosition.getPosition();
        if ("true".equalsIgnoreCase(CommonConfiguration.ENABLE_HEARTBEAT)) {
            this.heartbeatEnabled = true;
            this.heartbeatFrequency = Integer.parseInt(CommonConfiguration.HEARTBEAT_FREQUENCY);
        }
        initializeMode();
        this.assignedTables = set;
        this.ddlProcessor = new MysqlDDLFormatProcessor(taskProcessor, set, connectorConfig);
    }

    private void initializeMode() {
        for (String str : this.config.getMode()) {
            if (str.trim().equalsIgnoreCase("insert")) {
                this.insertSupported = true;
            } else if (str.trim().equalsIgnoreCase("update")) {
                this.updateSupported = true;
            } else if (str.trim().equalsIgnoreCase("delete")) {
                this.deleteSupported = true;
            }
        }
    }

    private long getPosition(Struct struct) {
        return struct.getStruct(DebeziumRecordConstants.DBZ_SOURCE).getInt64("pos").longValue();
    }

    private String getBinlogFileName(Struct struct) {
        return struct.getStruct(DebeziumRecordConstants.DBZ_SOURCE).getString("file");
    }

    private String getHeartbeat() {
        if (this.lastHeartbeat == 0 || (System.currentTimeMillis() - this.lastHeartbeat) / 1000 >= this.heartbeatFrequency) {
            return UniqueIdHelper.getId();
        }
        return null;
    }

    private void processDDLRecord(List<SourceRecord> list, SourceRecord sourceRecord) {
        List<Struct> dDLKafkaRecord;
        Struct struct = (Struct) sourceRecord.value();
        if (this.taskProcessor.startDatabaseTimestamp > getTimeStamp(struct).getTime()) {
            return;
        }
        if ((this.config.isStartFromBeginning() || !this.binlogFileName.equalsIgnoreCase(getBinlogFileName(struct)) || this.binlogPosition < getPosition(struct)) && (dDLKafkaRecord = this.ddlProcessor.getDDLKafkaRecord(sourceRecord)) != null) {
            for (Struct struct2 : dDLKafkaRecord) {
                TopicPartitionData dDLTopicPartitionForTable = this.taskProcessor.getDDLTopicPartitionForTable(struct2.getString("OBJECT_NAME"));
                list.add(dDLTopicPartitionForTable.isMultiPartitioned() ? new SourceRecord(sourceRecord.sourcePartition(), sourceRecord.sourceOffset(), dDLTopicPartitionForTable.getTopicName(), struct2.schema(), struct2) : new SourceRecord(sourceRecord.sourcePartition(), sourceRecord.sourceOffset(), dDLTopicPartitionForTable.getTopicName(), Integer.valueOf(dDLTopicPartitionForTable.getPartitionNumber()), struct2.schema(), struct2));
            }
        }
    }

    private void processDMLRecord(List<SourceRecord> list, SourceRecord sourceRecord, SourceRecord sourceRecord2, long j) {
        Struct struct = (Struct) sourceRecord.value();
        if (this.config.isStartFromBeginning() || !this.binlogFileName.equalsIgnoreCase(getBinlogFileName(struct)) || this.binlogPosition < getPosition(struct)) {
            try {
                try {
                    boolean z = false;
                    Struct struct2 = (Struct) sourceRecord.key();
                    Struct transactionStruct = getTransactionStruct(struct);
                    Struct uniqueStruct = getUniqueStruct(struct2, struct);
                    Struct beforeStruct = getBeforeStruct(struct);
                    Struct dataStruct = getDataStruct(struct, uniqueStruct);
                    if (sourceRecord2 != null && checkForPrimaryKeyChangeSecond(sourceRecord)) {
                        Struct struct3 = (Struct) sourceRecord2.key();
                        Struct struct4 = (Struct) sourceRecord2.value();
                        uniqueStruct = getUniqueStruct(struct3, struct4);
                        beforeStruct = getBeforeStruct(struct4);
                        z = true;
                    }
                    if (checkSupportedDMLOperations(struct.getString(DebeziumRecordConstants.DBZ_OPERATION), z)) {
                        String str = null;
                        if (this.heartbeatEnabled) {
                            str = getHeartbeat();
                        }
                        Struct cDCMessage = getCDCMessage(struct, transactionStruct, uniqueStruct, beforeStruct, dataStruct, str, z);
                        TopicPartitionData topicPartitionForTable = this.taskProcessor.getTopicPartitionForTable(getTable(struct));
                        SourceRecord sourceRecord3 = topicPartitionForTable.isMultiPartitioned() ? new SourceRecord(sourceRecord.sourcePartition(), sourceRecord.sourceOffset(), topicPartitionForTable.getTopicName(), cDCMessage.schema(), cDCMessage) : new SourceRecord(sourceRecord.sourcePartition(), sourceRecord.sourceOffset(), topicPartitionForTable.getTopicName(), Integer.valueOf(topicPartitionForTable.getPartitionNumber()), cDCMessage.schema(), cDCMessage);
                        updateMetrics(getTable(struct), beforeStruct, dataStruct);
                        processHeartbeatData(struct, str, j);
                        list.add(sourceRecord3);
                    }
                } catch (TableDetailsNotFoundException e) {
                    log.error("processing record " + sourceRecord.toString() + " failed with following error :" + e.getMessage());
                    this.taskProcessor.postError("HIGH", "TASK FAILED", e, this.config.getJobExecutionId());
                    throw new IllegalArgumentException("Error while processing record", e);
                }
            } catch (IOException | ParseException e2) {
                log.error("Processing Spatial data for record " + sourceRecord.toString() + "failed with following error " + e2.getMessage());
                this.taskProcessor.postError("HIGH", "TASK FAILED", e2, this.config.getJobExecutionId());
                throw new IllegalArgumentException("Error while Processing Spatial data", e2);
            }
        }
    }

    private boolean checkSupportedDMLOperations(String str, boolean z) {
        if (z) {
            str = DebeziumRecordConstants.DBZ_UPDATE;
        }
        String str2 = str;
        boolean z2 = -1;
        switch (str2.hashCode()) {
            case 99:
                if (str2.equals(DebeziumRecordConstants.DBZ_INSERT)) {
                    z2 = false;
                    break;
                }
                break;
            case 100:
                if (str2.equals(DebeziumRecordConstants.DBZ_DELETE)) {
                    z2 = 2;
                    break;
                }
                break;
            case 117:
                if (str2.equals(DebeziumRecordConstants.DBZ_UPDATE)) {
                    z2 = true;
                    break;
                }
                break;
        }
        switch (z2) {
            case false:
                return this.insertSupported;
            case true:
                return this.updateSupported;
            case true:
                return this.deleteSupported;
            default:
                return false;
        }
    }

    public List<SourceRecord> processDebeziumRecords(List<SourceRecord> list) {
        ArrayList arrayList = new ArrayList();
        SourceRecord sourceRecord = null;
        for (SourceRecord sourceRecord2 : list) {
            Struct struct = (Struct) sourceRecord2.value();
            if (struct != null) {
                if (isDDLRecord(sourceRecord2)) {
                    if (this.config.isDDLCaptured()) {
                        processDDLRecord(arrayList, sourceRecord2);
                    }
                } else if (this.assignedTables.contains(getTable(struct))) {
                    if (checkForPrimaryKeyChangeFirst(sourceRecord2)) {
                        sourceRecord = sourceRecord2;
                    } else {
                        processDMLRecord(arrayList, sourceRecord2, sourceRecord, struct.getInt64(DebeziumRecordConstants.DBZ_TIMESTAMP).longValue());
                        sourceRecord = null;
                    }
                }
            }
        }
        return arrayList;
    }

    private boolean checkForPrimaryKeyChangeFirst(SourceRecord sourceRecord) {
        Headers headers = sourceRecord.headers();
        if (headers == null) {
            return false;
        }
        Iterator it = headers.iterator();
        while (it.hasNext()) {
            if (DebeziumRecordConstants.DBZ_NEW_KEY_ID.equals(((Header) it.next()).key())) {
                return true;
            }
        }
        return false;
    }

    private boolean checkForPrimaryKeyChangeSecond(SourceRecord sourceRecord) {
        Headers headers = sourceRecord.headers();
        if (headers == null) {
            return false;
        }
        Iterator it = headers.iterator();
        while (it.hasNext()) {
            if (DebeziumRecordConstants.DBZ_OLD_KEY_ID.equals(((Header) it.next()).key())) {
                return true;
            }
        }
        return false;
    }

    private boolean isDDLRecord(SourceRecord sourceRecord) {
        Iterator it = ((Struct) sourceRecord.value()).schema().fields().iterator();
        while (it.hasNext()) {
            if (DebeziumRecordConstants.DBZ_DDL.equals(((Field) it.next()).name())) {
                return true;
            }
        }
        return false;
    }

    private long updateStructDataMetrics(Struct struct, Struct struct2) {
        long j = 0;
        if (struct != null) {
            j = 0 + StructUtils.getSize(struct);
        }
        if (struct2 != null) {
            j += StructUtils.getSize(struct2);
        }
        return j;
    }

    private void updateMetricsForLOB(long j) {
        synchronized (this.metricsLock) {
            this.dataProcessedIncremental.addAndGet(j);
            this.metricsChangeFlag.set(true);
        }
    }

    private void updateMetrics(String str, Struct struct, Struct struct2) {
        boolean z = false;
        if (StringUtils.isNotBlank(str) && !this.processedTables.containsKey(str + CommonConstants.HYPHEN + this.config.getJobExecutionId())) {
            this.processedTables.put(str + CommonConstants.HYPHEN + this.config.getJobExecutionId(), CommonConstants.EMPTY);
            z = true;
        }
        updateMetrics(updateStructDataMetrics(struct, struct2), z);
    }

    private void updateMetrics(long j, boolean z) {
        synchronized (this.metricsLock) {
            if (z) {
                this.tablesProcessedIncremental.incrementAndGet();
            }
            this.dataProcessedIncremental.addAndGet(j);
            this.metricsChangeFlag.set(true);
        }
    }

    private void processHeartbeatData(Struct struct, String str, long j) {
        if (str != null) {
            this.lastHeartbeat = System.currentTimeMillis();
            this.taskProcessor.postHeartbeat(getHeartBeatData(struct, str, j));
        }
    }

    private HeartbeatData getHeartBeatData(Struct struct, String str, long j) {
        HeartbeatData heartbeatData = new HeartbeatData();
        heartbeatData.setLazyUid(str);
        heartbeatData.setSubmissionId(this.config.getJobExecutionId());
        heartbeatData.setSourceSchema(getDatabaseName(struct));
        heartbeatData.setSourceEntity(getTable(struct));
        heartbeatData.setSourceTaskId(this.config.getConnectorName() + "_" + this.config.getIntTaskId());
        heartbeatData.setSourceCommitTime(CommonUtil.convertTimestampToSystemDate(j));
        heartbeatData.setKafkaCommitTime(CommonUtil.convertTimestampToSystemDate(this.lastHeartbeat));
        heartbeatData.setSourceConnectionId(this.config.getConnectionId());
        heartbeatData.setCreatedUser((String) null);
        heartbeatData.setUpdatedUser((String) null);
        return heartbeatData;
    }

    public Schema getCDCMessageSchema(Struct struct, Struct struct2, Struct struct3, Struct struct4, Struct struct5, String str) {
        if (str != null) {
            return SchemaBuilder.struct().name(getSchemaName(struct)).field("DATA_STORE", Schema.STRING_SCHEMA).field("SEG_OWNER", Schema.STRING_SCHEMA).field(MySQLQueries.TABLE_COLUMN, Schema.STRING_SCHEMA).field("TIMESTAMP", Timestamp.SCHEMA).field("OPERATION", Schema.STRING_SCHEMA).field("LOB_COLUMNS", Schema.OPTIONAL_STRING_SCHEMA).field("transaction", struct2.schema()).field("unique", struct4.schema()).field("data", struct3 == null ? CommonConstants.EMPTY_SCHEMA : struct3.schema()).field(DebeziumRecordConstants.DBZ_BEFORE, struct5 == null ? CommonConstants.EMPTY_SCHEMA : struct5.schema()).field("HEARTBEAT_IDENTIFIER", Schema.OPTIONAL_STRING_SCHEMA).build();
        }
        return SchemaBuilder.struct().name(getSchemaName(struct)).field("DATA_STORE", Schema.STRING_SCHEMA).field("SEG_OWNER", Schema.STRING_SCHEMA).field(MySQLQueries.TABLE_COLUMN, Schema.STRING_SCHEMA).field("TIMESTAMP", Timestamp.SCHEMA).field("OPERATION", Schema.STRING_SCHEMA).field("LOB_COLUMNS", Schema.OPTIONAL_STRING_SCHEMA).field("transaction", struct2.schema()).field("unique", struct4.schema()).field("data", struct3 == null ? CommonConstants.EMPTY_SCHEMA : struct3.schema()).field(DebeziumRecordConstants.DBZ_BEFORE, struct5 == null ? CommonConstants.EMPTY_SCHEMA : struct5.schema()).build();
    }

    private Struct getCDCMessage(Struct struct, Struct struct2, Struct struct3, Struct struct4, Struct struct5, String str, boolean z) throws TableDetailsNotFoundException {
        Schema cDCMessageSchema = getCDCMessageSchema(struct, struct2, struct5, struct3, struct4, str);
        return str != null ? new Struct(cDCMessageSchema).put("DATA_STORE", CommonConstants.MYSQL_DATA_STORE).put("SEG_OWNER", getDatabaseName(struct)).put(MySQLQueries.TABLE_COLUMN, getTable(struct)).put("TIMESTAMP", getTimeStamp(struct)).put("OPERATION", getOperation(struct, z)).put("LOB_COLUMNS", getLOBColumns(struct)).put("transaction", struct2).put("unique", struct3).put("data", struct5).put(DebeziumRecordConstants.DBZ_BEFORE, struct4).put("HEARTBEAT_IDENTIFIER", str) : new Struct(cDCMessageSchema).put("DATA_STORE", CommonConstants.MYSQL_DATA_STORE).put("SEG_OWNER", getDatabaseName(struct)).put(MySQLQueries.TABLE_COLUMN, getTable(struct)).put("TIMESTAMP", getTimeStamp(struct)).put("OPERATION", getOperation(struct, z)).put("LOB_COLUMNS", getLOBColumns(struct)).put("transaction", struct2).put("unique", struct3).put("data", struct5).put(DebeziumRecordConstants.DBZ_BEFORE, struct4);
    }

    private String getTable(Struct struct) {
        return struct.getStruct(DebeziumRecordConstants.DBZ_SOURCE).getString(DebeziumRecordConstants.DBZ_TABLE);
    }

    private java.sql.Timestamp getTimeStamp(Struct struct) {
        return new java.sql.Timestamp(struct.getStruct(DebeziumRecordConstants.DBZ_SOURCE).getInt64(DebeziumRecordConstants.DBZ_TIMESTAMP).longValue());
    }

    private String getDatabaseName(Struct struct) {
        return struct.getStruct(DebeziumRecordConstants.DBZ_SOURCE).getString(DebeziumRecordConstants.DBZ_DATABASE);
    }

    protected String getLOBColumns(Struct struct) throws TableDetailsNotFoundException {
        Struct struct2 = struct.getStruct(DebeziumRecordConstants.DBZ_AFTER) == null ? struct.getStruct(DebeziumRecordConstants.DBZ_BEFORE) : struct.getStruct(DebeziumRecordConstants.DBZ_AFTER);
        StringJoiner stringJoiner = new StringJoiner(",");
        for (Field field : struct2.schema().fields()) {
            if (isBinaryUploadType(field, struct) || isTextUploadType(field, struct)) {
                stringJoiner.add(field.name());
            }
        }
        return stringJoiner.toString();
    }

    private String getOperation(Struct struct, boolean z) {
        return z ? "UPDATE" : getOperation(struct);
    }

    private String getOperation(Struct struct) {
        String str;
        String string = struct.getString(DebeziumRecordConstants.DBZ_OPERATION);
        boolean z = -1;
        switch (string.hashCode()) {
            case 99:
                if (string.equals(DebeziumRecordConstants.DBZ_INSERT)) {
                    z = false;
                    break;
                }
                break;
            case 100:
                if (string.equals(DebeziumRecordConstants.DBZ_DELETE)) {
                    z = true;
                    break;
                }
                break;
            case 117:
                if (string.equals(DebeziumRecordConstants.DBZ_UPDATE)) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                str = "INSERT";
                break;
            case true:
                str = "DELETE";
                break;
            case true:
                str = "UPDATE";
                break;
            default:
                str = CommonConstants.EMPTY;
                break;
        }
        return str;
    }

    public String getTransaction(Struct struct) {
        StringBuilder sb = new StringBuilder();
        Struct struct2 = struct.getStruct(DebeziumRecordConstants.DBZ_SOURCE);
        String string = struct2.getString("gtid");
        String string2 = struct2.getString("file");
        Long int64 = struct2.getInt64("pos");
        if (!StringUtils.isBlank(string)) {
            sb.append(DebeziumRecordConstants.DBZ_SOURCE).append(string.replace(CommonConstants.HYPHEN, CommonConstants.DOT).replace(CommonConstants.COLON, CommonConstants.DOT)).append(CommonConstants.DOT);
        }
        sb.append("file").append(string2).append(CommonConstants.DOT);
        sb.append("pos").append(int64);
        return sb.toString();
    }

    private Struct getTransactionStruct(Struct struct) {
        Struct struct2 = struct.getStruct(DebeziumRecordConstants.DBZ_SOURCE);
        return new Struct(this.transactionSchema).put("properties", Arrays.asList(new Struct(this.txnPropertySchema).put(CDCMysqlProperties.CONNECTOR_NAME, "file").put("value", getValue(struct2.getString("file"))), new Struct(this.txnPropertySchema).put(CDCMysqlProperties.CONNECTOR_NAME, "pos").put("value", getValue(struct2.getInt64("pos"))), new Struct(this.txnPropertySchema).put(CDCMysqlProperties.CONNECTOR_NAME, "gtid").put("value", getValue(struct2.getString("gtid")))));
    }

    private String getValue(Object obj) {
        return obj == null ? CommonConstants.EMPTY : obj instanceof String ? (String) obj : String.valueOf(obj);
    }

    private String getSchemaName(Struct struct) {
        Struct struct2 = struct.getStruct(DebeziumRecordConstants.DBZ_SOURCE);
        return struct2.getString(DebeziumRecordConstants.DBZ_DATABASE) + CommonConstants.DOT + struct2.getString(DebeziumRecordConstants.DBZ_TABLE);
    }

    private Struct getBeforeStruct(Struct struct) throws TableDetailsNotFoundException, IOException, ParseException {
        if ("INSERT".equals(getOperation(struct))) {
            return null;
        }
        SchemaBuilder name = SchemaBuilder.struct().name(DebeziumRecordConstants.DBZ_BEFORE);
        for (Field field : struct.schema().field(DebeziumRecordConstants.DBZ_BEFORE).schema().fields()) {
            if ("INSERT".equals(getOperation(struct))) {
                name.field(field.name(), new SchemaBuilder(decodeSchema(field.schema()).type()).optional().build());
            } else {
                name.field(field.name(), decodeSchema(field.schema()));
            }
        }
        Struct struct2 = new Struct(name.optional().build());
        Struct struct3 = struct.getStruct(DebeziumRecordConstants.DBZ_BEFORE);
        if (struct3 != null) {
            for (Field field2 : struct3.schema().fields()) {
                if (isBinaryUploadType(field2, struct) || isTextUploadType(field2, struct)) {
                    struct2.put(field2, (Object) null);
                } else if (isSpatialType(field2.schema())) {
                    struct2.put(field2.name(), getGMLFromWKB(struct3.getStruct(field2.name()).getBytes(WKB_TYPE)));
                } else {
                    struct2.put(field2.name(), decodeStruct(field2.schema(), struct3.get(field2)));
                }
            }
        }
        return struct2;
    }

    private boolean isSpatialType(Schema schema) {
        return schema.toString().contains(SPATIAL_SCHEMA);
    }

    protected boolean isLobUpload() {
        String lobDirectUpload = this.config.getLobDirectUpload();
        return StringUtils.isNotBlank(lobDirectUpload) && ("OBS".equalsIgnoreCase(lobDirectUpload) || "HDFS".equalsIgnoreCase(lobDirectUpload));
    }

    private void processBinaryData(Struct struct, Struct struct2, Field field, Struct struct3, Struct struct4) throws TableDetailsNotFoundException {
        Object obj = struct.get(field);
        ByteBuffer wrap = obj instanceof ByteBuffer ? (ByteBuffer) obj : ByteBuffer.wrap((byte[]) obj);
        byte[] bArr = new byte[wrap.remaining()];
        wrap.get(bArr);
        if (!this.isLobUpload || !isBinaryUploadType(field, struct3)) {
            struct2.put(field, bArr);
            return;
        }
        String objectKey = getObjectKey(field.name(), struct3, struct4);
        ProcessLobData processLobData = new ProcessLobData(this.config);
        List asList = Bytes.asList(bArr);
        updateMetricsForLOB(asList.size());
        processLobData.addBlobData(field.name(), asList, true, objectKey, struct2);
    }

    private void processTextData(Struct struct, Field field, String str, Struct struct2, Struct struct3) throws TableDetailsNotFoundException {
        if (!this.isLobUpload || !isTextUploadType(field, struct2)) {
            struct.put(field, str);
            return;
        }
        String objectKey = getObjectKey(field.name(), struct2, struct3);
        byte[] bytes = str.getBytes();
        ProcessLobData processLobData = new ProcessLobData(this.config);
        List asList = Bytes.asList(bytes);
        updateMetricsForLOB(asList.size());
        processLobData.addClobData(field.name(), asList, true, objectKey, struct);
    }

    private boolean isBinaryUploadType(Field field, Struct struct) throws TableDetailsNotFoundException {
        Map<String, String> tableColumnTypes = this.taskProcessor.getTableColumnTypes(getTable(struct));
        return tableColumnTypes.get(field.name()).equals(CommonConstants.MEDIUM_BLOB) || tableColumnTypes.get(field.name()).equals(CommonConstants.LONG_BLOB);
    }

    private boolean isTextUploadType(Field field, Struct struct) throws TableDetailsNotFoundException {
        Map<String, String> tableColumnTypes = this.taskProcessor.getTableColumnTypes(getTable(struct));
        return tableColumnTypes.get(field.name()).equals(CommonConstants.MEDIUM_TEXT) || tableColumnTypes.get(field.name()).equals(CommonConstants.LONG_TEXT);
    }

    private void prepareSchemaBuilder(SchemaBuilder schemaBuilder, Field field, Struct struct) throws TableDetailsNotFoundException {
        if (this.isLobUpload && field.schema().type() == Schema.Type.BYTES && isBinaryUploadType(field, struct)) {
            schemaBuilder.field(field.name(), Schema.OPTIONAL_STRING_SCHEMA);
        } else if (!"DELETE".equals(getOperation(struct))) {
            schemaBuilder.field(field.name(), decodeSchema(field.schema()));
        } else {
            schemaBuilder.field(field.name(), new SchemaBuilder(decodeSchema(field.schema()).type()).optional().build());
        }
    }

    private Struct getDataStruct(Struct struct, Struct struct2) throws TableDetailsNotFoundException, IOException, ParseException {
        if ("DELETE".equals(getOperation(struct))) {
            return null;
        }
        SchemaBuilder name = SchemaBuilder.struct().name("data");
        Iterator it = struct.schema().field(DebeziumRecordConstants.DBZ_AFTER).schema().fields().iterator();
        while (it.hasNext()) {
            prepareSchemaBuilder(name, (Field) it.next(), struct);
        }
        Struct struct3 = new Struct(name.optional().build());
        Struct struct4 = struct.getStruct(DebeziumRecordConstants.DBZ_AFTER);
        if (struct4 != null) {
            for (Field field : struct4.schema().fields()) {
                Schema schema = field.schema();
                if (schema.type() != Schema.Type.BYTES || "org.apache.kafka.connect.data.Decimal".equals(schema.name())) {
                    if (isSpatialType(field.schema()) && struct4.getStruct(field.name()) != null) {
                        struct3.put(field.name(), getGMLFromWKB(struct4.getStruct(field.name()).getBytes(WKB_TYPE)));
                    } else if (field.schema().type() != Schema.Type.STRING || "io.debezium.time.ZonedTimestamp".equals(field.schema().name())) {
                        struct3.put(field.name(), decodeStruct(field.schema(), struct4.get(field)));
                    } else if (struct4.get(field) == null) {
                        struct3.put(field, (Object) null);
                    } else {
                        processTextData(struct3, field, String.valueOf(struct4.get(field)), struct, struct2);
                    }
                } else if (struct4.get(field) == null) {
                    struct3.put(field, (Object) null);
                } else {
                    processBinaryData(struct4, struct3, field, struct, struct2);
                }
            }
        }
        return struct3;
    }

    private String getGMLFromWKB(byte[] bArr) throws ParseException, IOException {
        try {
            Geometry read = new WKBReader().read(bArr);
            Encoder encoder = new Encoder(new GMLConfiguration());
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try {
                encoder.encode(read, GML._GML, byteArrayOutputStream);
                return byteArrayOutputStream.toString().replaceFirst(REPLACE_GML_METADATA, CommonConstants.EMPTY);
            } catch (IOException e) {
                log.error("Error while conveting WKB (Well Known Binary) for Spatial Data {}", e.getMessage());
                this.taskProcessor.postError("LOW", "RUNNING", e, this.config.getJobExecutionId());
                throw new IOException("Error while conveting WKB (Well Known Binary) for Spatial Data {}", e);
            }
        } catch (ParseException e2) {
            log.error("Error while parsing WKB (Well Known Binary) for Spatial Data {}", e2.getMessage());
            this.taskProcessor.postError("LOW", "RUNNING", e2, this.config.getJobExecutionId());
            throw new ParseException("Error while parsing WKB (Well Known Binary) for Spatial Data {}", e2);
        }
    }

    private Schema decodeSchema(Schema schema) {
        return (this.config.isTimeZoneRequired() && isTimeZoneType(schema.name())) ? schema.isOptional() ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA : isSpatialType(schema) ? Schema.OPTIONAL_STRING_SCHEMA : schema;
    }

    private boolean isTimeZoneType(String str) {
        return "org.apache.kafka.connect.data.Date".equals(str) || "org.apache.kafka.connect.data.Time".equals(str) || "org.apache.kafka.connect.data.Timestamp".equals(str) || "io.debezium.time.ZonedTimestamp".equals(str);
    }

    private Object decodeStruct(Schema schema, Object obj) {
        if (obj == null || !this.config.isTimeZoneRequired() || !isTimeZoneType(schema.name())) {
            return obj;
        }
        String databaseTimezone = this.taskProcessor.getDatabaseTimezone();
        return "io.debezium.time.ZonedTimestamp".equals(schema.name()) ? obj + CommonConstants.OPEN_BRACKET + databaseTimezone + CommonConstants.CLOSE_BRACKET : new java.sql.Timestamp(((Date) obj).getTime()).toString() + CommonConstants.OPEN_BRACKET + databaseTimezone + CommonConstants.CLOSE_BRACKET;
    }

    protected boolean isObsUpload() {
        String lobDirectUpload = this.config.getLobDirectUpload();
        return StringUtils.isNotBlank(lobDirectUpload) && "OBS".equalsIgnoreCase(lobDirectUpload);
    }

    private String getKeyValues(Struct struct) {
        StringBuilder sb = new StringBuilder();
        for (Field field : struct.schema().fields()) {
            sb.append(field.name());
            if (struct.get(field) == null || !(field.schema().equals(Timestamp.SCHEMA) || field.schema().equals(Timestamp.builder().optional().schema()))) {
                sb.append(struct.get(field));
            } else {
                sb.append(new SimpleDateFormat("yyyyMMddHHmmss").format((Date) new java.sql.Timestamp(((Date) struct.get(field)).getTime())));
            }
        }
        return sb.toString().replace(CommonConstants.HYPHEN, CommonConstants.EMPTY).replace(CommonConstants.COLON, CommonConstants.EMPTY);
    }

    private String getObjectKey(String str, Struct struct, Struct struct2) {
        String str2 = CommonConstants.EMPTY;
        if (isLobUpload()) {
            str2 = (isObsUpload() ? this.config.getObsLobPattern() : this.config.getHdfsLobPattern()).replace("$SCHEMANAME", getDatabaseName(struct)).replace("$TABLENAME", getTable(struct)).replace("$COLUMN", str).replace("$CHANGETIME", new SimpleDateFormat("yyyyMMddHHmmss").format((Date) getTimeStamp(struct))).replace("$UNIQUE", getKeyValues(struct2)).replace("$TRANSACTION", getTransaction(struct));
        }
        return str2;
    }

    private Schema buildPrimaryKeySchema(Struct struct, Struct struct2) throws TableDetailsNotFoundException {
        SchemaBuilder name = SchemaBuilder.struct().name("unique");
        Set<String> alternatePrimaryKeyColumns = this.taskProcessor.getAlternatePrimaryKeyColumns(getTable(struct2));
        Map<String, String> tableColumnTypes = this.taskProcessor.getTableColumnTypes(getTable(struct2));
        Struct struct3 = struct2.getString(DebeziumRecordConstants.DBZ_OPERATION).equals(DebeziumRecordConstants.DBZ_INSERT) ? struct2.getStruct(DebeziumRecordConstants.DBZ_AFTER) : struct2.getStruct(DebeziumRecordConstants.DBZ_BEFORE);
        boolean z = true;
        if (alternatePrimaryKeyColumns != null) {
            for (Field field : struct3.schema().fields()) {
                if (alternatePrimaryKeyColumns.contains(field.name()) && includeColumnForUnique(field, tableColumnTypes)) {
                    name.field(field.name(), decodeSchema(field.schema()));
                    z = false;
                }
            }
        }
        if (z) {
            if (struct != null) {
                for (Field field2 : struct.schema().fields()) {
                    if (includeColumnForUnique(field2, tableColumnTypes)) {
                        name.field(field2.name(), decodeSchema(field2.schema()));
                    }
                }
            } else {
                for (Field field3 : struct3.schema().fields()) {
                    if (includeColumnForUnique(field3, tableColumnTypes)) {
                        name.field(field3.name(), decodeSchema(field3.schema()));
                    }
                }
            }
        }
        return name.optional().build();
    }

    private boolean includeColumnForUnique(Field field, Map<String, String> map) {
        String str = map.get(field.name());
        return str == null || !(isSpatialType(field.schema()) || str.equals(CommonConstants.MEDIUM_BLOB) || str.equals(CommonConstants.LONG_BLOB) || str.equals(CommonConstants.MEDIUM_TEXT) || str.equals(CommonConstants.LONG_TEXT));
    }

    private Struct getUniqueStruct(Struct struct, Struct struct2) throws TableDetailsNotFoundException {
        Schema buildPrimaryKeySchema = buildPrimaryKeySchema(struct, struct2);
        Struct struct3 = struct2.getString(DebeziumRecordConstants.DBZ_OPERATION).equals(DebeziumRecordConstants.DBZ_INSERT) ? struct2.getStruct(DebeziumRecordConstants.DBZ_AFTER) : struct2.getStruct(DebeziumRecordConstants.DBZ_BEFORE);
        Struct struct4 = new Struct(buildPrimaryKeySchema);
        for (Field field : buildPrimaryKeySchema.fields()) {
            struct4.put(field.name(), decodeStruct(struct3.schema().field(field.name()).schema(), struct3.get(field.name())));
        }
        return struct4;
    }
}
