package com.huawei.cdc.connect.pgsql.processor;

import com.huawei.cdc.common.conf.CommonConfiguration;
import com.huawei.cdc.common.lob.LobData;
import com.huawei.cdc.common.metadata.models.HeartbeatData;
import com.huawei.cdc.common.metadata.util.UniqueIdHelper;
import com.huawei.cdc.common.util.CommonUtil;
import com.huawei.cdc.common.util.StructUtils;
import com.huawei.cdc.connect.pgsql.config.ConnectorConfig;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.io.ParseException;
import com.vividsolutions.jts.io.WKBReader;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.geotools.gml3.GML;
import org.geotools.gml3.GMLConfiguration;
import org.geotools.xml.Encoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/connect/pgsql/processor/RecordFormatProcessor.class */
public class RecordFormatProcessor {
    /** Substring searched for in a field schema's string form to recognise spatial (geometry) columns. */
    public static final String SPATIAL_SCHEMA = "io.debezium.data.geometry";
    /** Regular expression matching the XML declaration that is removed from encoded GML output. */
    public static final String REPLACE_GML_METADATA = "(<\\?xml version=\"1.0\" encoding=\"UTF-8\"\\?>)";
    /** Operation label ("INSERT", "UPDATE" or "DELETE") set while the current record is processed. */
    String operation;
    /** Struct schema with a string "name" and an int64 "value" field; built by init(). */
    Schema arrayschema;
    /** Array schema whose elements use {@link #arrayschema}; built by init(). */
    Schema propertiesSchema;
    /** Struct schema named "transaction" holding a "properties" array; built by init(). */
    Schema transactionSchema;
    /** Task processor obtained from TaskProcessorFactory in the constructor. */
    TaskProcessor processor;
    /** Connector configuration supplied to the constructor. */
    ConnectorConfig config;
    /** Per-record map keyed by LOB column name; replaced for every processed record. */
    Map<String, String> lobColumnInfo;
    /** "data" struct of the record being formatted; set to null for deletes. */
    Struct dataStruct;
    /** Table name written to TABLE_NAME; assigned by resolveTableAndPartition. */
    String tableName;
    /** "before" struct of the record being formatted; set to null for inserts. */
    Struct beforeDataStruct;
    /** LOB helper, re-created for every processed record. */
    LobUpload lobUpload;
    /** True when CommonConfiguration.ENABLE_HEARTBEAT equals "TRUE" (case-insensitive). */
    private boolean heartbeatEnabled;
    /** Heartbeat interval parsed from CommonConfiguration.HEARTBEAT_FREQUENCY; compared in seconds. */
    private int heartbeatFrequency;
    /** Identifier attached to the current record when a heartbeat is due, otherwise null. */
    String heartbeatIdentifier;
    /** Set under metricsLock whenever counters change; cleared by postCounts. */
    private boolean metricsChangeFlag;
    /** Interval, in seconds, between metric posts (manageStatus applies a 20 second threshold). */
    private static final int METRIC_UPDATE_INTERVAL = 20;
    /** Set by initializeMode when the configured modes contain "insert". */
    private boolean insertSupported;
    /** Set by initializeMode when the configured modes contain "update". */
    private boolean updateSupported;
    /** Set by initializeMode when the configured modes contain "delete". */
    private boolean deleteSupported;
    /** Partition name written to PARTITION_NAME; null when the record's table is the mapped table itself. */
    private String partition;
    static final Logger log = LoggerFactory.getLogger(RecordFormatProcessor.class);
    /** Static, shared registry of "table-jobExecutionId" keys already counted in the tables metric. */
    private static final ConcurrentHashMap<String, String> PROCESSED_TABLES = new ConcurrentHashMap<>();
    /** Record carrying the DBZ_NEW_KEY_ID header, held until the next emitted record clears it. */
    SourceRecord previousRecord = null;
    /** True while the current record is being treated as the second half of a primary-key update. */
    boolean isPrimaryKeyUpdate = false;
    /** Schema of the "unique" section for the current record. */
    Schema primaryKeySchema = null;
    /** Values of the "unique" section for the current record. */
    Struct primaryKeyStruct = null;
    /** Epoch milliseconds of the last emitted heartbeat; 0 means none has been emitted yet. */
    private long lastHeartbeat = 0;
    /** Epoch milliseconds of the last manageStatus pass that was due; 0 means never. */
    private long lastMetricUpdate = 0;
    /** Tables first seen since the last metric post. */
    private int tablesProcessedIncremental = 0;
    /** Size of data processed since the last metric post (struct sizes plus LOB sizes). */
    private long dataProcessedIncremental = 0;
    /** Records processed since the last metric post. */
    private long recordsProcessedIncremental = 0;
    /** Guards the incremental counters and metricsChangeFlag. */
    private final Object metricsLock = new Object();

    public RecordFormatProcessor(ConnectorConfig connectorConfig) {
        this.config = connectorConfig;
        initializeMode();
        this.processor = TaskProcessorFactory.getInstance().getTaskProcessor(connectorConfig);
        if ("TRUE".equalsIgnoreCase(CommonConfiguration.ENABLE_HEARTBEAT)) {
            this.heartbeatEnabled = true;
            this.heartbeatFrequency = Integer.parseInt(CommonConfiguration.HEARTBEAT_FREQUENCY);
            this.heartbeatIdentifier = null;
        }
    }

    /**
     * Derives the insert/update/delete support flags from the configured modes.
     *
     * <p>Each configured entry is trimmed and compared case-insensitively; entries that match none
     * of the three known modes are ignored.
     */
    private void initializeMode() {
        for (String configuredMode : this.config.getMode()) {
            String mode = configuredMode.trim();
            if ("insert".equalsIgnoreCase(mode)) {
                this.insertSupported = true;
                continue;
            }
            if ("update".equalsIgnoreCase(mode)) {
                this.updateSupported = true;
                continue;
            }
            if ("delete".equalsIgnoreCase(mode)) {
                this.deleteSupported = true;
            }
        }
    }

    /**
     * Builds the schemas used for the "transaction" section of every emitted row: a name/value
     * struct, an array of those structs, and the enclosing "transaction" struct.
     */
    private void init() {
        SchemaBuilder propertyBuilder = SchemaBuilder.struct();
        propertyBuilder.field("name", Schema.STRING_SCHEMA);
        propertyBuilder.field("value", Schema.INT64_SCHEMA);
        this.arrayschema = propertyBuilder.build();

        this.propertiesSchema = SchemaBuilder.array(this.arrayschema).build();

        SchemaBuilder transactionBuilder = SchemaBuilder.struct().name("transaction");
        transactionBuilder.field("properties", this.propertiesSchema);
        this.transactionSchema = transactionBuilder.build();
    }

    /**
     * Tells whether the record carries a header with the given key.
     *
     * @param sourceRecord record whose headers are inspected
     * @param str header key to look for
     * @return {@code true} if any header key equals {@code str}; {@code false} if none does or the
     *     record has no headers
     */
    private boolean checkForPrimaryKeyChange(SourceRecord sourceRecord, String str) {
        Headers headers = sourceRecord.headers();
        if (headers == null) {
            return false;
        }
        // Headers is Iterable<Header>, so no raw Iterator or cast is needed.
        for (Header header : headers) {
            if (str.equals(header.key())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Converts change records into the connector's output row format.
     *
     * <p>For every record with a value this method trims string columns, pairs the two halves of a
     * primary-key change (a record with the DBZ_NEW_KEY_ID header is held back and merged into the
     * following record that carries DBZ_OLD_KEY_ID), skips operations not enabled by the mode
     * configuration, builds the "unique", "data", "before" and "transaction" sections, attaches a
     * heartbeat identifier when one is due, updates metrics and routes the row to its topic.
     *
     * <p>Records whose value is {@code null} are skipped; they carry no change data.
     *
     * @param postgresConnection connection passed to the LOB helper for column discovery
     * @param list records to convert
     * @return the converted records, in input order, excluding skipped and held-back records
     * @throws IllegalArgumentException if a record cannot be converted because of an
     *     {@link IOException} or {@code ParseException}; the error is also posted to the processor
     */
    public List<SourceRecord> processSourceRecords(PostgresConnection postgresConnection, List<SourceRecord> list) {
        init();
        TopicTableMapping topicTableMapping = this.processor.getTopicTableMapping();
        List<SourceRecord> formattedRecords = new ArrayList<>();
        for (SourceRecord sourceRecord : list) {
            if (sourceRecord.value() == null) {
                // Nothing to format; dereferencing the value below would throw a NullPointerException.
                continue;
            }
            this.isPrimaryKeyUpdate = false;
            removeTrailingSpacesFromCHARCol(sourceRecord);
            if (checkForPrimaryKeyChange(sourceRecord, PgsqlSchemaConst.DBZ_NEW_KEY_ID)) {
                // First half of a primary-key change: keep it until the matching record arrives.
                this.previousRecord = sourceRecord;
                continue;
            }
            if (this.previousRecord != null && checkForPrimaryKeyChange(sourceRecord, PgsqlSchemaConst.DBZ_OLD_KEY_ID)) {
                this.isPrimaryKeyUpdate = true;
            }
            Struct value = (Struct) sourceRecord.value();
            if (!checkSupportedDMLOperations(value.getString(PgsqlSchemaConst.OP))) {
                continue;
            }

            SchemaBuilder uniqueSchemaBuilder = SchemaBuilder.struct().name("unique");
            this.primaryKeySchema = null;
            this.primaryKeyStruct = null;
            LobData lobData = new LobData();
            this.lobColumnInfo = new HashMap<>();
            this.lobUpload = new LobUpload();
            Map<String, String> lobColumns = this.lobUpload.getLobColumns(postgresConnection, this.config, value, (Struct) sourceRecord.key(), this.processor.getAltPrimaryKeyCols(getTableName(value)));
            try {
                createUniqueField(sourceRecord, uniqueSchemaBuilder, lobColumns);
                if (!this.config.getLobDirectUpload().isEmpty()) {
                    this.lobColumnInfo = this.lobUpload.upload(lobColumns, value, this.config, this.primaryKeyStruct, lobData);
                }
                Struct transactionDataStruct = getTransactionDataStruct(sourceRecord);
                Schema dataSchema = getDataSchema(sourceRecord);
                this.dataStruct = new Struct(dataSchema);
                Schema beforeSchema = getBeforeSchema(sourceRecord);
                this.beforeDataStruct = new Struct(beforeSchema);
                long sourceTimestamp = value.getInt64(PgsqlSchemaConst.TS_MS).longValue();

                processOperations(value);
                String qualifiedName = value.getStruct(PgsqlSchemaConst.SOURCE).get(PgsqlSchemaConst.SCHEMA) + PgsqlSchemaConst.DOT + value.getStruct(PgsqlSchemaConst.SOURCE).get("table");
                if (this.heartbeatEnabled) {
                    this.heartbeatIdentifier = getHeartbeat();
                }
                Struct kafkaRow = createKafkaRow(qualifiedName, getSchemaType(qualifiedName, dataSchema, beforeSchema, sourceRecord, this.heartbeatIdentifier), sourceRecord, transactionDataStruct, this.heartbeatIdentifier);
                if (this.heartbeatIdentifier != null) {
                    // Stamp the heartbeat time only after the row carrying it has been built.
                    this.lastHeartbeat = System.currentTimeMillis();
                    this.processor.postHeartbeat(getHeartBeatData(value, sourceTimestamp));
                }
                updateMetrics(kafkaRow, this.beforeDataStruct, this.dataStruct);
                updateMetrics(lobData);
                formattedRecords.add(addKafkaRecord(topicTableMapping, sourceRecord, kafkaRow));
                this.previousRecord = null;
            } catch (IOException | ParseException e) {
                log.error("Error while Processing Spatial data", e);
                this.processor.postError("HIGH", "TASK FAILED", e, this.config.getJobExecutionId());
                throw new IllegalArgumentException("Error while Processing Spatial data", e);
            }
        }
        return formattedRecords;
    }

    /**
     * Decides whether a record with the given operation code should be emitted, based on the
     * configured DML modes.
     *
     * <p>When the current record is the second half of a primary-key change it is treated as an
     * update regardless of {@code str}. A skipped update also discards any held-back record.
     *
     * @param str operation code of the record: "c" (insert), "u" (update) or "d" (delete)
     * @return {@code true} if the operation is enabled; {@code false} if it is disabled or the
     *     code is not one of the three known values
     */
    private boolean checkSupportedDMLOperations(String str) {
        String effectiveOperation = this.isPrimaryKeyUpdate ? "u" : str;
        switch (effectiveOperation) {
            case "c":
                if (this.insertSupported) {
                    return true;
                }
                log.info("Skip INSERT operation record based on mode config(allowed DML operations) = {}", this.config.getMode());
                return false;
            case "u":
                if (this.updateSupported) {
                    return true;
                }
                // The held-back half of a primary-key change must not leak into later records.
                this.previousRecord = null;
                log.info("Skip UPDATE operation record based on mode config(allowed DML operations) = {}", this.config.getMode());
                return false;
            case "d":
                if (this.deleteSupported) {
                    return true;
                }
                log.info("Skip DELETE operation record based on mode config(allowed DML operations) = {}", this.config.getMode());
                return false;
            default:
                return false;
        }
    }

    /**
     * Records metrics for one emitted row: counts the record, adds the size of its before/after
     * structs, and counts the table the first time it is seen for this job execution.
     *
     * @param struct emitted row; its "TABLE_NAME" field identifies the table
     * @param struct2 "before" struct of the row, may be {@code null}
     * @param struct3 "data" struct of the row, may be {@code null}
     */
    private void updateMetrics(Struct struct, Struct struct2, Struct struct3) {
        String table = struct.getString("TABLE_NAME");
        boolean firstSighting = false;
        if (StringUtils.isNotBlank(table)) {
            String registryKey = table + "-" + this.config.getJobExecutionId();
            // putIfAbsent makes "check and register" one atomic step, so concurrent callers
            // cannot both count the same table.
            firstSighting = PROCESSED_TABLES.putIfAbsent(registryKey, "") == null;
        }
        updateMetrics(firstSighting, updateStructDataMetrics(struct2, struct3));
    }

    /**
     * Adds one processed record to the incremental counters while holding the metrics lock.
     *
     * @param z whether the record's table is counted as newly processed
     * @param j data size to add to the processed-data counter
     */
    private void updateMetrics(boolean z, long j) {
        synchronized (this.metricsLock) {
            this.tablesProcessedIncremental += z ? 1 : 0;
            this.recordsProcessedIncremental += 1;
            this.dataProcessedIncremental = this.dataProcessedIncremental + j;
            this.metricsChangeFlag = true;
        }
    }

    /**
     * Computes the combined size of two structs as reported by {@code StructUtils.getSize}.
     *
     * @param struct first struct, may be {@code null}
     * @param struct2 second struct, may be {@code null}
     * @return sum of the sizes of the non-null arguments; 0 when both are {@code null}
     */
    private long updateStructDataMetrics(Struct struct, Struct struct2) {
        long firstSize = struct == null ? 0L : StructUtils.getSize(struct);
        long secondSize = struct2 == null ? 0L : StructUtils.getSize(struct2);
        return firstSize + secondSize;
    }

    /**
     * Adds the size of uploaded LOB data to the processed-data counter.
     *
     * @param lobData LOB bookkeeping for the current record; ignored when {@code null}
     */
    private void updateMetrics(LobData lobData) {
        if (lobData == null) {
            return;
        }
        // Read the size before taking the lock so the lock is held only for the update itself.
        long lobBytes = lobData.getLobDataSize();
        synchronized (this.metricsLock) {
            this.dataProcessedIncremental = this.dataProcessedIncremental + lobBytes;
            this.metricsChangeFlag = true;
        }
    }

    /**
     * Builds the heartbeat payload for the record that carries the current heartbeat identifier.
     *
     * @param struct value of the record carrying the heartbeat
     * @param j source timestamp of that record, in epoch milliseconds
     * @return populated heartbeat data; created/updated user are left {@code null}
     */
    private HeartbeatData getHeartBeatData(Struct struct, long j) {
        HeartbeatData heartbeatData = new HeartbeatData();
        heartbeatData.setLazyUid(this.heartbeatIdentifier);
        heartbeatData.setSubmissionId(this.config.getJobExecutionId());
        heartbeatData.setSourceSchema(getSegOwner(struct));
        heartbeatData.setSourceEntity(getTableName(struct));
        heartbeatData.setSourceTaskId(this.config.getConnectorName() + "_" + this.config.getIntTaskId());
        heartbeatData.setSourceCommitTime(CommonUtil.convertTimestampToSystemDate(j));
        heartbeatData.setKafkaCommitTime(CommonUtil.convertTimestampToSystemDate(this.lastHeartbeat));
        // NOTE(review): this setter used to be invoked twice in a row with the same argument; the
        // redundant call was dropped. Confirm the second call was not meant for another property.
        heartbeatData.setSourceConnectionId(this.config.getConnectionId());
        heartbeatData.setCreatedUser((String) null);
        heartbeatData.setUpdatedUser((String) null);
        return heartbeatData;
    }

    /**
     * Returns a new heartbeat identifier when a heartbeat is due.
     *
     * <p>A heartbeat is due when none has been sent yet or when at least
     * {@code heartbeatFrequency} whole seconds have elapsed since the last one. Elapsed time is
     * measured between instants rather than local date-times, so daylight-saving changes in the
     * default time zone cannot stretch or shrink the interval.
     *
     * @return a fresh identifier, or {@code null} when no heartbeat is due
     */
    private String getHeartbeat() {
        if (this.lastHeartbeat == 0) {
            return UniqueIdHelper.getId();
        }
        long elapsedSeconds = ChronoUnit.SECONDS.between(
                Instant.ofEpochMilli(this.lastHeartbeat), Instant.ofEpochMilli(System.currentTimeMillis()));
        if (elapsedSeconds >= this.heartbeatFrequency) {
            return UniqueIdHelper.getId();
        }
        return null;
    }

    /**
     * Converts an epoch-millisecond timestamp to a local date-time in the JVM default time zone.
     *
     * @param j timestamp in milliseconds since the epoch
     * @return the corresponding local date-time
     */
    private LocalDateTime getDateTime(long j) {
        Instant instant = Instant.ofEpochMilli(j);
        return instant.atZone(TimeZone.getDefault().toZoneId()).toLocalDateTime();
    }

    /**
     * Trims the string columns of the record's "before" and "after" images in place.
     *
     * @param sourceRecord record whose value struct holds the optional "before"/"after" structs
     */
    private void removeTrailingSpacesFromCHARCol(SourceRecord sourceRecord) {
        Struct envelope = (Struct) sourceRecord.value();
        for (String imageName : new String[] {"before", "after"}) {
            Struct image = envelope.getStruct(imageName);
            if (image != null) {
                remove(image);
            }
        }
    }

    /**
     * Replaces every non-null STRING field of the struct with its trimmed value.
     *
     * <p>Note that {@code String.trim()} strips leading as well as trailing whitespace, and that
     * this applies to every STRING-typed field, not only fixed-width character columns.
     *
     * @param struct struct modified in place
     */
    private void remove(Struct struct) {
        for (Field column : struct.schema().fields()) {
            if (!column.schema().type().equals(Schema.Type.STRING)) {
                continue;
            }
            Object current = struct.get(column);
            if (current == null) {
                continue;
            }
            struct.put(column, current.toString().trim());
        }
    }

    /**
     * Fills the "data" and "before" structs for the record according to its operation code.
     *
     * <p>When the record is the second half of a primary-key change it is handled as an update
     * whose "before" image comes from the held-back previous record.
     *
     * @param struct value of the record being processed
     * @throws IOException if a spatial value cannot be encoded
     * @throws ParseException if a spatial value cannot be parsed
     * @throws IllegalStateException if the operation code is not "c", "u" or "d"
     */
    private void processOperations(Struct struct) throws IOException, ParseException {
        String operationCode = struct.getString(PgsqlSchemaConst.OP);
        Struct before = struct.getStruct("before");
        Struct after = struct.getStruct("after");
        if (this.isPrimaryKeyUpdate) {
            operationCode = "u";
            before = ((Struct) this.previousRecord.value()).getStruct("before");
        }
        switch (operationCode) {
            case "c":
                processInsertOperation(after);
                return;
            case "u":
                processUpdateOperation(before, after);
                return;
            case "d":
                processDeleteOperation(before);
                return;
            default:
                log.error("Unexpected value: {}", operationCode);
                throw new IllegalStateException("Unexpected value: " + operationCode);
        }
    }

    /**
     * Prepares the output for a delete: no "data" struct, and a "before" struct copied from the
     * deleted row.
     *
     * <p>LOB columns are written as {@code null}; spatial columns are converted from WKB to GML,
     * or written as {@code null} when the column holds no geometry; all other columns go through
     * {@code decodeStruct}.
     *
     * @param struct "before" image of the deleted row
     * @throws IOException if a spatial value cannot be encoded
     * @throws ParseException if a spatial value cannot be parsed
     */
    private void processDeleteOperation(Struct struct) throws IOException, ParseException {
        this.operation = "DELETE";
        this.dataStruct = null;
        for (Field field : struct.schema().fields()) {
            String column = field.name();
            if (this.lobColumnInfo.containsKey(column)) {
                this.beforeDataStruct.put(column, null);
            } else if (field.schema().toString().contains(SPATIAL_SCHEMA)) {
                // A NULL geometry column has no nested struct (or no WKB) to convert.
                Struct geometry = struct.getStruct(column);
                byte[] wkb = geometry == null ? null : geometry.getBytes("wkb");
                this.beforeDataStruct.put(column, wkb == null ? null : getGMLFromWKB(wkb));
            } else {
                this.beforeDataStruct.put(column, decodeStruct(field.schema(), struct.get(column)));
            }
        }
    }

    /**
     * Prepares the output for an update: a "before" struct copied from the old row image (or
     * {@code null} when there is none) and a "data" struct copied from the new row image.
     *
     * <p>LOB columns are written as {@code null} in "before" and as their {@code lobColumnInfo}
     * entry in "data"; spatial columns are converted from WKB to GML, or written as {@code null}
     * when the column holds no geometry; all other columns go through {@code decodeStruct}.
     *
     * @param struct old row image, may be {@code null}
     * @param struct2 new row image
     * @throws IOException if a spatial value cannot be encoded
     * @throws ParseException if a spatial value cannot be parsed
     */
    private void processUpdateOperation(Struct struct, Struct struct2) throws IOException, ParseException {
        this.operation = "UPDATE";
        if (struct != null) {
            for (Field field : struct.schema().fields()) {
                String column = field.name();
                if (this.lobColumnInfo.containsKey(column)) {
                    this.beforeDataStruct.put(column, null);
                } else if (field.schema().toString().contains(SPATIAL_SCHEMA)) {
                    // A NULL geometry column has no nested struct (or no WKB) to convert.
                    Struct geometry = struct.getStruct(column);
                    byte[] wkb = geometry == null ? null : geometry.getBytes("wkb");
                    this.beforeDataStruct.put(column, wkb == null ? null : getGMLFromWKB(wkb));
                } else {
                    this.beforeDataStruct.put(column, decodeStruct(field.schema(), struct.get(column)));
                }
            }
        } else {
            this.beforeDataStruct = null;
        }
        for (Field field2 : struct2.schema().fields()) {
            String column = field2.name();
            if (this.lobColumnInfo.containsKey(column)) {
                this.dataStruct.put(column, this.lobColumnInfo.get(column));
            } else if (field2.schema().toString().contains(SPATIAL_SCHEMA)) {
                Struct geometry = struct2.getStruct(column);
                byte[] wkb = geometry == null ? null : geometry.getBytes("wkb");
                this.dataStruct.put(column, wkb == null ? null : getGMLFromWKB(wkb));
            } else {
                this.dataStruct.put(column, decodeStruct(field2.schema(), struct2.get(column)));
            }
        }
    }

    /**
     * Prepares the output for an insert: no "before" struct, and a "data" struct copied from the
     * new row image.
     *
     * <p>LOB columns are written as their {@code lobColumnInfo} entry; spatial columns are
     * converted from WKB to GML, or written as {@code null} when the column holds no geometry;
     * all other columns go through {@code decodeStruct}.
     *
     * @param struct "after" image of the inserted row
     * @throws ParseException if a spatial value cannot be parsed
     * @throws IOException if a spatial value cannot be encoded
     */
    private void processInsertOperation(Struct struct) throws ParseException, IOException {
        this.operation = "INSERT";
        this.beforeDataStruct = null;
        for (Field field : struct.schema().fields()) {
            String column = field.name();
            if (this.lobColumnInfo.containsKey(column)) {
                this.dataStruct.put(column, this.lobColumnInfo.get(column));
            } else if (field.schema().toString().contains(SPATIAL_SCHEMA)) {
                // A NULL geometry column has no nested struct (or no WKB) to convert.
                Struct geometry = struct.getStruct(column);
                byte[] wkb = geometry == null ? null : geometry.getBytes("wkb");
                this.dataStruct.put(column, wkb == null ? null : getGMLFromWKB(wkb));
            } else {
                this.dataStruct.put(column, decodeStruct(field.schema(), struct.get(column)));
            }
        }
    }

    /**
     * Converts a geometry from WKB (Well Known Binary) to a GML string without the leading XML
     * declaration.
     *
     * <p>Failures are logged with their stack trace, posted to the processor as a task failure,
     * and rethrown with the original exception as cause.
     *
     * @param bArr geometry encoded as WKB
     * @return GML representation of the geometry
     * @throws ParseException if the WKB cannot be parsed
     * @throws IOException if the geometry cannot be encoded as GML
     */
    private String getGMLFromWKB(byte[] bArr) throws ParseException, IOException {
        try {
            Geometry geometry = new WKBReader().read(bArr);
            Encoder encoder = new Encoder(new GMLConfiguration());
            ByteArrayOutputStream gmlBytes = new ByteArrayOutputStream();
            encoder.encode(geometry, GML._GML, gmlBytes);
            // Decode explicitly as UTF-8 (the encoding named in the XML declaration stripped
            // below) instead of relying on the platform default charset.
            return gmlBytes.toString("UTF-8").replaceFirst(REPLACE_GML_METADATA, "");
        } catch (ParseException e) {
            log.error("Error while parsing WKB (Well Known Binary) for Spatial Data", e);
            this.processor.postError("HIGH", "TASK FAILED", e, this.config.getJobExecutionId());
            throw new ParseException("Error while parsing WKB (Well Known Binary) for Spatial Data", e);
        } catch (IOException e) {
            log.error("Error while converting WKB (Well Known Binary) for Spatial Data", e);
            this.processor.postError("HIGH", "TASK FAILED", e, this.config.getJobExecutionId());
            throw new IOException("Error while converting WKB (Well Known Binary) for Spatial Data", e);
        }
    }

    /**
     * Wraps the formatted row in a new source record addressed to the topic mapped to the
     * record's table.
     *
     * <p>For topics reported as multi-partition no partition is set on the record; otherwise the
     * mapped partition is set explicitly.
     *
     * @param topicTableMapping table-to-topic mapping
     * @param sourceRecord original record; supplies source partition, offset and table name
     * @param struct formatted row used as the new record's value
     * @return the record to emit
     */
    private SourceRecord addKafkaRecord(TopicTableMapping topicTableMapping, SourceRecord sourceRecord, Struct struct) {
        Struct envelope = (Struct) sourceRecord.value();
        String table = (String) envelope.getStruct(PgsqlSchemaConst.SOURCE).get("table");
        TopicPartition target = topicTableMapping.getTopicPartition(table);
        if (topicTableMapping.isMultiplePartitionsTopic(target.topic())) {
            return new SourceRecord(sourceRecord.sourcePartition(), sourceRecord.sourceOffset(), target.topic(), struct.schema(), struct);
        }
        return new SourceRecord(sourceRecord.sourcePartition(), sourceRecord.sourceOffset(), target.topic(), Integer.valueOf(target.partition()), struct.schema(), struct);
    }

    /**
     * Builds the "unique" (key) section for the record.
     *
     * <p>Sources are tried in this order: the key of the held-back previous record, the record's
     * own key, and finally the row image ("after" for inserts, otherwise "before" when present)
     * restricted to the alternative primary-key columns configured for the table.
     *
     * @param sourceRecord record being processed
     * @param schemaBuilder builder that receives the key fields
     * @param map LOB columns of the table, keyed by column name
     * @throws IOException declared by the key population helpers
     * @throws ParseException declared by the key population helpers
     */
    private void createUniqueField(SourceRecord sourceRecord, SchemaBuilder schemaBuilder, Map<String, String> map) throws IOException, ParseException {
        Struct envelope = (Struct) sourceRecord.value();
        if (this.previousRecord != null && this.previousRecord.key() != null) {
            populateUniqueFields(this.previousRecord.keySchema().fields(), (Struct) this.previousRecord.key(), schemaBuilder, null, map);
        } else if (sourceRecord.key() != null) {
            populateUniqueFields(sourceRecord.keySchema().fields(), (Struct) sourceRecord.key(), schemaBuilder, null, map);
        } else if ("c".equals(((Struct) sourceRecord.value()).getString(PgsqlSchemaConst.OP))) {
            populateUniqueFields(sourceRecord.valueSchema().field("after").schema().fields(), envelope.getStruct("after"), schemaBuilder, this.processor.getAltPrimaryKeyCols(getTableName(envelope)), map);
        } else if (((Struct) sourceRecord.value()).getStruct("before") != null) {
            populateUniqueFields(sourceRecord.valueSchema().field("before").schema().fields(), envelope.getStruct("before"), schemaBuilder, this.processor.getAltPrimaryKeyCols(getTableName(envelope)), map);
        }
    }

    /**
     * Populates the "unique" schema and struct from the given fields.
     *
     * <p>When alternative key columns are supplied, only those columns are used, excluding LOB
     * and spatial ones. If no column could be added (no alternative columns given, or all of them
     * are complex) the whole row is used instead via {@code populateRowAsUniqueFields}.
     *
     * @param list fields that may contribute to the key schema
     * @param struct values for the key columns
     * @param schemaBuilder builder that receives the key fields
     * @param set alternative primary-key column names, may be {@code null} or empty
     * @param map LOB columns of the table, keyed by column name
     * @throws IOException declared by the row fallback
     * @throws ParseException declared by the row fallback
     */
    private void populateUniqueFields(List<Field> list, Struct struct, SchemaBuilder schemaBuilder, Set<String> set, Map<String, String> map) throws IOException, ParseException {
        boolean noKeyColumnAdded = true;
        boolean alternativeKeysGiven = set != null && !set.isEmpty();
        if (alternativeKeysGiven) {
            for (Field candidate : list) {
                if (!set.contains(candidate.name())) {
                    continue;
                }
                if (isComplexData(map, candidate)) {
                    log.warn("Column {} is of complex data type. Ignoring as primary key column", candidate.name());
                    continue;
                }
                schemaBuilder.field(candidate.name(), decodeSchema(candidate.schema()));
                noKeyColumnAdded = false;
            }
            this.primaryKeySchema = schemaBuilder.optional().build();
            this.primaryKeyStruct = new Struct(this.primaryKeySchema);
            for (Field column : struct.schema().fields()) {
                if (!set.contains(column.name()) || isComplexData(map, column)) {
                    continue;
                }
                Object rawValue = struct.get(column.name());
                if (column.schema().type().equals(Schema.Type.STRING)) {
                    this.primaryKeyStruct.put(column.name(), rawValue == null ? null : rawValue.toString().trim());
                } else {
                    this.primaryKeyStruct.put(column.name(), decodeStruct(column.schema(), rawValue));
                }
            }
        }
        if (noKeyColumnAdded) {
            populateRowAsUniqueFields(list, schemaBuilder, struct, map);
        }
    }

    /**
     * Tells whether a schema name is one of the temporal logical types that get a time-zone
     * annotation.
     *
     * @param str schema name, may be {@code null}
     * @return {@code true} if the name is one of the six recognised temporal type names
     */
    private boolean isTimeZoneType(String str) {
        return Arrays.asList(
                "org.apache.kafka.connect.data.Date",
                "org.apache.kafka.connect.data.Time",
                "org.apache.kafka.connect.data.Timestamp",
                "io.debezium.time.ZonedTimestamp",
                "io.debezium.time.ZonedTime",
                "io.debezium.time.MicroDuration").contains(str);
    }

    /**
     * Maps a column schema to the schema used in the output row.
     *
     * @param schema original column schema
     * @return a string schema (optional if the original is optional) when time-zone decoration is
     *     enabled and the schema is a recognised temporal type; otherwise the original schema
     */
    private Schema decodeSchema(Schema schema) {
        boolean rendersAsText = this.config.isTimeZoneRequired() && isTimeZoneType(schema.name());
        if (!rendersAsText) {
            return schema;
        }
        if (schema.isOptional()) {
            return Schema.OPTIONAL_STRING_SCHEMA;
        }
        return Schema.STRING_SCHEMA;
    }

    /**
     * Maps a column value to the value used in the output row.
     *
     * <p>When time-zone decoration is enabled and the schema is a recognised temporal type, the
     * value is rendered as text followed by the database time zone wrapped in the bracket
     * constants: zoned types use the value's own string form, {@code MicroDuration} values are
     * rendered through {@code java.sql.Timestamp} from their long value, and the remaining types
     * through {@code java.sql.Timestamp} from their {@code java.util.Date} value.
     *
     * @param schema schema of the column
     * @param obj column value, may be {@code null}
     * @return the decorated text, or {@code obj} unchanged when no decoration applies
     */
    private Object decodeStruct(Schema schema, Object obj) {
        boolean needsDecoration = obj != null && this.config.isTimeZoneRequired() && isTimeZoneType(schema.name());
        if (!needsDecoration) {
            return obj;
        }
        String databaseTimezone = this.processor.getDatabaseTimezone();
        String zoneSuffix = PgsqlSchemaConst.OPEN_BRACKET + databaseTimezone + PgsqlSchemaConst.CLOSE_BRACKET;
        String schemaName = schema.name();
        if ("io.debezium.time.ZonedTimestamp".equals(schemaName) || "io.debezium.time.ZonedTime".equals(schemaName)) {
            return obj + zoneSuffix;
        }
        if ("io.debezium.time.MicroDuration".equals(schemaName)) {
            return new Timestamp(((Long) obj).longValue()).toString() + zoneSuffix;
        }
        return new Timestamp(((Date) obj).getTime()).toString() + zoneSuffix;
    }

    /**
     * Tells whether a column is a LOB or spatial column and therefore unusable as a key column.
     *
     * @param map LOB columns of the table, keyed by column name
     * @param field column to classify
     * @return {@code true} if the column is listed in {@code map} or its schema is spatial
     */
    private boolean isComplexData(Map<String, String> map, Field field) {
        if (map.containsKey(field.name())) {
            return true;
        }
        return field.schema().toString().contains(SPATIAL_SCHEMA);
    }

    /**
     * Uses every non-complex column of the row as the "unique" section.
     *
     * <p>LOB and spatial columns are skipped with a warning. String values are trimmed; other
     * values go through {@code decodeStruct}.
     *
     * @param list fields that define the key schema
     * @param schemaBuilder builder that receives the key fields
     * @param struct row values
     * @param map LOB columns of the table, keyed by column name
     * @throws IOException declared for callers; not raised by the visible code
     * @throws ParseException declared for callers; not raised by the visible code
     */
    private void populateRowAsUniqueFields(List<Field> list, SchemaBuilder schemaBuilder, Struct struct, Map<String, String> map) throws IOException, ParseException {
        for (Field candidate : list) {
            if (!isComplexData(map, candidate)) {
                schemaBuilder.field(candidate.name(), decodeSchema(candidate.schema()));
                continue;
            }
            log.warn("Column {} is of dataType {}, Ignoring complex data type as primary key column", candidate.name(), map.get(candidate.name()));
        }
        this.primaryKeySchema = schemaBuilder.optional().build();
        this.primaryKeyStruct = new Struct(this.primaryKeySchema);
        for (Field column : struct.schema().fields()) {
            if (isComplexData(map, column)) {
                continue;
            }
            Object rawValue = struct.get(column.name());
            if (column.schema().type().equals(Schema.Type.STRING)) {
                this.primaryKeyStruct.put(column.name(), rawValue == null ? null : rawValue.toString().trim());
            } else {
                this.primaryKeyStruct.put(column.name(), decodeStruct(column.schema(), rawValue));
            }
        }
    }

    /**
     * Builds the optional "before" schema for the output row from the record's "before" schema.
     *
     * <p>LOB and spatial columns become optional strings; other columns use {@code decodeSchema}.
     *
     * @param sourceRecord record whose value schema contains the "before" field
     * @return the optional "before" struct schema
     */
    private Schema getBeforeSchema(SourceRecord sourceRecord) {
        SchemaBuilder beforeBuilder = SchemaBuilder.struct().name("before");
        List<Field> sourceFields = sourceRecord.valueSchema().field("before").schema().fields();
        for (Field column : sourceFields) {
            boolean rendersAsText = this.lobColumnInfo.containsKey(column.name()) || column.schema().toString().contains(SPATIAL_SCHEMA);
            Schema columnSchema = rendersAsText ? Schema.OPTIONAL_STRING_SCHEMA : decodeSchema(column.schema());
            beforeBuilder.field(column.name(), columnSchema);
        }
        return beforeBuilder.optional().build();
    }

    /**
     * Builds the optional "data" schema for the output row from the record's "after" schema.
     *
     * <p>LOB and spatial columns become optional strings; other columns use {@code decodeSchema}.
     *
     * @param sourceRecord record whose value schema contains the "after" field
     * @return the optional "data" struct schema
     */
    private Schema getDataSchema(SourceRecord sourceRecord) {
        SchemaBuilder dataBuilder = SchemaBuilder.struct().name("data");
        List<Field> sourceFields = sourceRecord.valueSchema().field("after").schema().fields();
        for (Field column : sourceFields) {
            boolean rendersAsText = this.lobColumnInfo.containsKey(column.name()) || column.schema().toString().contains(SPATIAL_SCHEMA);
            Schema columnSchema = rendersAsText ? Schema.OPTIONAL_STRING_SCHEMA : decodeSchema(column.schema());
            dataBuilder.field(column.name(), columnSchema);
        }
        return dataBuilder.optional().build();
    }

    /**
     * Builds the "transaction" struct holding the LSN and transaction id of the record as
     * name/value properties.
     *
     * @param sourceRecord record whose source block supplies the values
     * @return the populated "transaction" struct
     */
    private Struct getTransactionDataStruct(SourceRecord sourceRecord) {
        Struct transaction = new Struct(this.transactionSchema);

        Struct lsnProperty = new Struct(this.arrayschema);
        lsnProperty.put("name", PgsqlSchemaConst.LSN);
        lsnProperty.put("value", ((Struct) sourceRecord.value()).getStruct(PgsqlSchemaConst.SOURCE).get(PgsqlSchemaConst.LSN));

        Struct txIdProperty = new Struct(this.arrayschema);
        txIdProperty.put("name", PgsqlSchemaConst.TXID);
        txIdProperty.put("value", ((Struct) sourceRecord.value()).getStruct(PgsqlSchemaConst.SOURCE).get(PgsqlSchemaConst.TXID));

        transaction.put("properties", Arrays.asList(lsnProperty, txIdProperty));
        return transaction;
    }

    /**
     * Builds the schema of the output row.
     *
     * <p>As a side effect this resolves {@code tableName} and {@code partition} for the record's
     * table. "PARTITION_NAME" is included only when a partition was resolved, and
     * "HEARTBEAT_IDENTIFIER" only when {@code str2} is not {@code null}.
     *
     * @param str schema name (qualified table name)
     * @param schema schema of the "data" section
     * @param schema2 schema of the "before" section
     * @param sourceRecord record being processed
     * @param str2 heartbeat identifier, may be {@code null}
     * @return the row schema
     */
    private Schema getSchemaType(String str, Schema schema, Schema schema2, SourceRecord sourceRecord, String str2) {
        resolveTableAndPartition(getTableName((Struct) sourceRecord.value()));

        SchemaBuilder rowSchema = SchemaBuilder.struct().name(str);
        rowSchema.field("DATA_STORE", Schema.STRING_SCHEMA);
        rowSchema.field("SEG_OWNER", Schema.STRING_SCHEMA);
        rowSchema.field("TABLE_NAME", Schema.STRING_SCHEMA);
        if (this.partition != null) {
            rowSchema.field("PARTITION_NAME", Schema.STRING_SCHEMA);
        }
        rowSchema.field("TIMESTAMP", org.apache.kafka.connect.data.Timestamp.SCHEMA);
        rowSchema.field("OPERATION", Schema.STRING_SCHEMA);
        rowSchema.field("LOB_COLUMNS", Schema.OPTIONAL_STRING_SCHEMA);
        rowSchema.field("transaction", this.transactionSchema);
        rowSchema.field("unique", this.primaryKeySchema);
        rowSchema.field("data", schema);
        rowSchema.field("before", schema2);
        if (str2 != null) {
            rowSchema.field("HEARTBEAT_IDENTIFIER", Schema.OPTIONAL_STRING_SCHEMA);
        }
        return rowSchema.build();
    }

    /**
     * Creates the output row for the current record.
     *
     * <p>The row schema is rebuilt under the name {@code str} from the fields of {@code schema},
     * then filled from the per-record state of this processor.
     *
     * @param str schema name (qualified table name)
     * @param schema row schema whose fields are copied
     * @param sourceRecord record being processed
     * @param struct "transaction" section
     * @param str2 heartbeat identifier, may be {@code null}
     * @return the populated row
     */
    private Struct createKafkaRow(String str, Schema schema, SourceRecord sourceRecord, Struct struct, String str2) {
        SchemaBuilder rowSchema = SchemaBuilder.struct().name(str);
        for (Field column : schema.fields()) {
            rowSchema.field(column.name(), column.schema());
        }
        Struct envelope = (Struct) sourceRecord.value();

        Struct row = new Struct(rowSchema.build());
        row.put("DATA_STORE", PgsqlSchemaConst.POSTGRESQL);
        row.put("SEG_OWNER", getSegOwner(envelope));
        row.put("TABLE_NAME", this.tableName);
        if (this.partition != null) {
            row.put("PARTITION_NAME", this.partition);
        }
        row.put("TIMESTAMP", getTimeStamp(envelope));
        row.put("OPERATION", this.operation);
        row.put("LOB_COLUMNS", getLobColumns());
        row.put("transaction", struct);
        row.put("unique", this.primaryKeyStruct);
        row.put("data", this.dataStruct);
        row.put("before", this.beforeDataStruct);
        if (str2 != null) {
            row.put("HEARTBEAT_IDENTIFIER", str2);
        }
        return row;
    }

    /**
     * Resolves the table and partition names written to the output row.
     *
     * <p>The processor's table-partition map is searched for an entry whose value set contains
     * {@code str}. The entry key becomes the table name; the partition is {@code str}, or
     * {@code null} when {@code str} equals the key ignoring case. When no entry matches, the name
     * itself is used as the table with no partition, so that values resolved for an earlier
     * record are never reused for this one.
     *
     * @param str table name reported in the record's source block
     */
    private void resolveTableAndPartition(String str) {
        for (Map.Entry<String, Set<String>> entry : this.processor.getTablePartitions().entrySet()) {
            if (entry.getValue().contains(str)) {
                String mappedTable = entry.getKey();
                this.tableName = mappedTable;
                this.partition = str.equalsIgnoreCase(mappedTable) ? null : str;
                return;
            }
        }
        // No mapping found: do not keep the previous record's table/partition.
        this.tableName = str;
        this.partition = null;
    }

    /**
     * Reads the timestamp from the record's source block.
     *
     * @param struct value of the record
     * @return the source timestamp as a {@code java.sql.Timestamp}
     */
    private Timestamp getTimeStamp(Struct struct) {
        Struct source = struct.getStruct(PgsqlSchemaConst.SOURCE);
        long epochMillis = source.getInt64(PgsqlSchemaConst.TS_MS).longValue();
        return new Timestamp(epochMillis);
    }

    /**
     * Reads the table name from the record's source block.
     *
     * @param struct value of the record
     * @return the "table" entry of the source block
     */
    private String getTableName(Struct struct) {
        Struct source = struct.getStruct(PgsqlSchemaConst.SOURCE);
        return source.getString("table");
    }

    /**
     * Reads the schema name from the record's source block.
     *
     * @param struct value of the record
     * @return the schema entry of the source block
     */
    private String getSegOwner(Struct struct) {
        Struct source = struct.getStruct(PgsqlSchemaConst.SOURCE);
        return source.getString(PgsqlSchemaConst.SCHEMA);
    }

    /**
     * Lists the LOB column names of the current record.
     *
     * @return the names joined with commas, or {@code null} when there are none
     */
    private String getLobColumns() {
        if (this.lobColumnInfo == null || this.lobColumnInfo.isEmpty()) {
            return null;
        }
        return String.join(",", this.lobColumnInfo.keySet());
    }

    /**
     * Posts accumulated metrics when the update interval has elapsed and something changed.
     *
     * <p>A pass is due on the first call and whenever at least {@code METRIC_UPDATE_INTERVAL}
     * whole seconds have passed since the previous due pass. Elapsed time is measured between
     * instants rather than local date-times, so daylight-saving changes in the default time zone
     * cannot stretch or shrink the interval.
     */
    public void manageStatus() {
        long now = System.currentTimeMillis();
        boolean due = this.lastMetricUpdate == 0
                || ChronoUnit.SECONDS.between(Instant.ofEpochMilli(this.lastMetricUpdate), Instant.ofEpochMilli(now)) >= METRIC_UPDATE_INTERVAL;
        if (!due) {
            return;
        }
        this.lastMetricUpdate = now;
        synchronized (this.metricsLock) {
            if (this.metricsChangeFlag) {
                postCounts();
            }
        }
    }

    /**
     * Sends the incremental counters to the processor and resets them, all while holding the
     * metrics lock.
     */
    private void postCounts() {
        synchronized (this.metricsLock) {
            int tables = this.tablesProcessedIncremental;
            long records = this.recordsProcessedIncremental;
            long dataSize = this.dataProcessedIncremental;
            this.processor.updateMetrics(this.config.getIntTaskId(), tables, records, dataSize, this.config.getJobExecutionId());
            // Start a fresh accumulation window.
            this.tablesProcessedIncremental = 0;
            this.recordsProcessedIncremental = 0L;
            this.dataProcessedIncremental = 0L;
            this.metricsChangeFlag = false;
        }
    }
}
