package com.huawei.cdc.connect.obs.writer;

import com.google.common.base.Joiner;
import com.google.gson.JsonObject;
import com.huawei.cdc.common.VariablePreprocess;
import com.huawei.cdc.common.auth.hdfs.AuthUtil;
import com.huawei.cdc.common.auth.hdfs.HAClusterUtil;
import com.huawei.cdc.common.conf.CommonConfiguration;
import com.huawei.cdc.common.conf.ConfigUtil;
import com.huawei.cdc.common.metadata.client.ConnectorClient;
import com.huawei.cdc.common.sink.parser.SinkRecordParser;
import com.huawei.cdc.common.storageutil.storageutilimpl.OBSStroageImpl;
import com.huawei.cdc.common.util.CommonUtil;
import com.huawei.cdc.common.util.SinkWriterUtils;
import com.huawei.cdc.connect.obs.config.ObsConnectorConfig;
import com.huawei.cdc.notification.INotification;
import com.obs.services.ObsClient;
import com.obs.services.ObsConfiguration;
import com.obs.services.exception.ObsException;
import com.obs.services.model.AppendObjectRequest;
import com.obs.services.model.InitiateMultipartUploadRequest;
import com.obs.services.model.InitiateMultipartUploadResult;
import com.obs.services.model.UploadPartResult;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import javafx.util.Pair;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/connect/obs/writer/ObsWriter.class */
public class ObsWriter {
    public static final String CSV_FORMAT = ".csv";
    private static String endPoint;
    private static ObsClient obsClient;
    private static String lobDownloadType;
    private static ObsClient sourceObsClient;
    private static final int MAX_RETRIES = 3;
    private FileSystem fs;
    private String messageFormat;
    private String tableNameString;
    private String lobColumnString;
    private Boolean isOggType;
    private String operationTimestampString;
    private String lobPath;
    private boolean insertSupported;
    private boolean updateSupported;
    private boolean deleteSupported;
    private String afterDataString;
    private String uniqueDataString;
    private String targetConnectionId;
    private static SinkRecordParser sinkRecordParser;
    private String baseFilePath;
    private int processedRecords;
    private String jobExecutionId;
    private String targetTaskId;
    private static final Logger log = LoggerFactory.getLogger(ObsWriter.class);
    private static final String OBS_SINK_BASE_DIR = "CDL" + File.separator;
    private static boolean lobDownload = false;
    private static String bucketName = "cdc-test";
    private int cacheRefreshRate = 500;
    private String path = "${datastore}/schemas/${schemaName}/tables/${tableName}/data/${changeTime(yyyyMMddHHmmss)}/${tableName}.csv";
    private String tableSchemaPath = "${datastore}/schemas/${schemaName}/tables/${tableName}/metadata/${changeTime(yyyyMMddHHmmss)}/${tableName}.csv";
    private String indexSchemaPath = "${datastore}/schemas/${schemaName}/tables/${tableName}/metadata/${changeTime(yyyyMMddHHmmss)}/${tableName}_index.csv";
    private String schemaReplicationPath = "${datastore}/schemas/${schemaName}/metadata/${schemaName}.csv";
    private boolean isLobPathEnabled = false;
    private SimpleDateFormat dataFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    private long currentOffset = -1;
    private Map<String, String> unCommitFiles = new HashMap();
    private Map<String, String> objectPathMap = new HashMap();
    private Map<String, List<String>> tableCache = new HashMap();

    public void initialize(Map<String, String> map) {
        ObsConnectorConfig obsConnectorConfig = new ObsConnectorConfig(map);
        sinkRecordParser = new SinkRecordParser();
        this.baseFilePath = null;
        if (StringUtils.isNotBlank(obsConnectorConfig.getObsPath())) {
            this.baseFilePath = obsConnectorConfig.getObsPath();
            constructObsSinkBasePath();
        } else if (StringUtils.isNotBlank(CommonConfiguration.SINK_FILE_PATH)) {
            this.baseFilePath = CommonConfiguration.SINK_FILE_PATH;
            constructObsSinkBasePath();
        } else {
            this.baseFilePath = OBS_SINK_BASE_DIR;
        }
        if (this.baseFilePath != null) {
            this.path = this.baseFilePath + this.path;
            this.tableSchemaPath = this.baseFilePath + this.tableSchemaPath;
            this.indexSchemaPath = this.baseFilePath + this.indexSchemaPath;
            this.schemaReplicationPath = this.baseFilePath + this.schemaReplicationPath;
        }
        if (StringUtils.isNotBlank(obsConnectorConfig.getTargetConnectionId())) {
            this.targetConnectionId = obsConnectorConfig.getTargetConnectionId();
        }
        if (StringUtils.isNotBlank(obsConnectorConfig.getJobExecutionId())) {
            this.jobExecutionId = obsConnectorConfig.getJobExecutionId();
        }
        initObsCredentials(obsConnectorConfig);
        if (StringUtils.isNotBlank(obsConnectorConfig.getLobDirectDownload())) {
            lobDownloadType = obsConnectorConfig.getLobDirectDownload().toLowerCase(Locale.ROOT);
            lobDownload = lobDownloadType.equals("obs") || lobDownloadType.equals("hdfs");
        }
        initHdfsCredentials(obsConnectorConfig);
        if (StringUtils.isNotBlank(obsConnectorConfig.getLobPath())) {
            this.isLobPathEnabled = true;
            this.lobPath = obsConnectorConfig.getLobPath();
        }
        if (StringUtils.isNotBlank(obsConnectorConfig.getCacheSize())) {
            try {
                this.cacheRefreshRate = Integer.parseInt(obsConnectorConfig.getCacheSize());
            } catch (NumberFormatException e) {
                log.error("Error while parsing \"cache.size\" value.  Please provide with proper value");
                throw new ConnectException("Error while parsing \"cache.size\" value.  Please provide with proper value", e);
            }
        }
        initObsProperties(obsConnectorConfig);
        initializeMode(obsConnectorConfig);
        this.targetTaskId = obsConnectorConfig.getConnectorName() + "_" + map.get("task.id");
    }

    private void constructObsSinkBasePath() {
        if (!this.baseFilePath.endsWith(File.separator)) {
            this.baseFilePath += File.separator;
        }
        this.baseFilePath += OBS_SINK_BASE_DIR;
    }

    private void initObsProperties(ObsConnectorConfig obsConnectorConfig) {
        try {
            ObsConfiguration obsConfiguration = new ObsConfiguration();
            obsConfiguration.setSocketTimeout(30000);
            obsConfiguration.setConnectionTimeout(10000);
            obsConfiguration.setEndPoint(endPoint);
            ConfigUtil.setLog4j2XMLPath(this);
            obsClient = new ObsClient(obsConnectorConfig.getObsAuthenticationKey(), obsConnectorConfig.getObsSecretKey(), obsConfiguration);
        } catch (ObsException e) {
            log.error("HTTP Code: " + e.getResponseCode());
            log.error("Error Code:" + e.getErrorCode());
            log.error("Error Message: " + e.getErrorMessage());
            log.error("Request ID:" + e.getErrorRequestId());
            log.error("Host ID:" + e.getErrorHostId());
        }
        if (lobDownload && lobDownloadType.equals("obs")) {
            String sourceObsEndpoint = obsConnectorConfig.getSourceObsEndpoint();
            if (obsConnectorConfig.getSourceObsAuthenticationKey() == null || obsConnectorConfig.getSourceObsSecretKey() == null || sourceObsEndpoint == null) {
                return;
            }
            try {
                ObsConfiguration obsConfiguration2 = new ObsConfiguration();
                obsConfiguration2.setSocketTimeout(30000);
                obsConfiguration2.setConnectionTimeout(10000);
                obsConfiguration2.setEndPoint(sourceObsEndpoint);
                ConfigUtil.setLog4j2XMLPath(this);
                sourceObsClient = new ObsClient(obsConnectorConfig.getSourceObsAuthenticationKey(), obsConnectorConfig.getSourceObsSecretKey(), obsConfiguration2);
            } catch (ObsException e2) {
                log.error("HTTP Code: " + e2.getResponseCode());
                log.error("Error Code:" + e2.getErrorCode());
                log.error("Error Message: " + e2.getErrorMessage());
                log.error("Request ID:" + e2.getErrorRequestId());
                log.error("Host ID:" + e2.getErrorHostId());
            }
        }
    }

    private void initializeMode(ObsConnectorConfig obsConnectorConfig) {
        for (String str : obsConnectorConfig.getMODE().split(",")) {
            if (str.trim().equalsIgnoreCase("insert")) {
                this.insertSupported = true;
            } else if (str.trim().equalsIgnoreCase("update")) {
                this.updateSupported = true;
            } else if (str.trim().equalsIgnoreCase("delete")) {
                this.deleteSupported = true;
            }
        }
    }

    public void initHdfsCredentials(ObsConnectorConfig obsConnectorConfig) {
        if (lobDownload && lobDownloadType.equals("hdfs")) {
            boolean equalsIgnoreCase = "kerberos".equalsIgnoreCase(obsConnectorConfig.getHdfsAuthType());
            try {
                AuthUtil.getAuthenticator(obsConnectorConfig.getHdfsAuthPrincipal(), obsConnectorConfig.getHdfsAuthKeytabfile(), equalsIgnoreCase);
                try {
                    this.fs = FileSystem.get(HAClusterUtil.getConf(equalsIgnoreCase));
                } catch (IOException e) {
                    log.error("error while connecting to hdfs filesystem with following message " + e.getMessage());
                }
            } catch (IOException e2) {
                if (e2.getMessage().contains("Auth Error")) {
                    log.error("Error while Authenticating Kerberos Credentials.  Failed with error: " + e2.toString());
                }
                throw new ConnectException("Kerberos Authentication Failed " + e2);
            }
        }
    }

    private void initObsCredentials(ObsConnectorConfig obsConnectorConfig) {
        if (obsConnectorConfig.getObsAuthenticationKey().equals("") || obsConnectorConfig.getObsAuthenticationKey().equals("null")) {
            log.error("Error in Consumer Config: access key should not be empty or null");
            throw new ConnectException("Error in Consumer Config: access key should not be empty or null");
        }
        if (obsConnectorConfig.getObsSecretKey().equals("") || obsConnectorConfig.getObsSecretKey().equals("null")) {
            log.error("Error in Consumer Config: source key should not be empty or null");
            throw new ConnectException("Error in Consumer Config: source key should not be empty or null");
        }
        endPoint = obsConnectorConfig.getObsEndpoint();
        bucketName = obsConnectorConfig.getObsBucket();
    }

    public void write(Collection<SinkRecord> collection, ConnectorClient connectorClient) {
        this.processedRecords = 0;
        for (SinkRecord sinkRecord : collection) {
            if (this.currentOffset == -1) {
                log.info("The start offset: {}", Long.valueOf(sinkRecord.kafkaOffset()));
            }
            this.currentOffset = sinkRecord.kafkaOffset();
            if (sinkRecord.valueSchema().type() == Schema.Type.STRUCT) {
                Struct struct = (Struct) sinkRecord.value();
                this.messageFormat = sinkRecordParser.getMessageFormat();
                this.isOggType = sinkRecordParser.getOggFormat(this.messageFormat);
                if (this.isOggType.booleanValue() || struct.toString().contains("TIMESTAMP") || !struct.getString("DATA_STORE").equalsIgnoreCase("ORACLE")) {
                    this.tableNameString = sinkRecordParser.getTableField(this.isOggType);
                    this.operationTimestampString = sinkRecordParser.getOperationTimestampField(this.isOggType);
                    this.afterDataString = sinkRecordParser.getAfterDataField(this.isOggType);
                    this.uniqueDataString = sinkRecordParser.getUniqueDataField(this.isOggType);
                    this.lobColumnString = sinkRecordParser.getLobColumnField(this.isOggType);
                    sinkRecordParser.setFieldValues(this.tableNameString, this.isOggType);
                    processOperation(struct, sinkRecord);
                    sinkRecordParser.createSinkHeartbeat(connectorClient, struct, this.targetConnectionId, sinkRecordParser.getSourceValue(struct), this.jobExecutionId, this.targetTaskId);
                } else {
                    schemaReplication(sinkRecord);
                }
            }
        }
    }

    public void processOperation(Struct struct, SinkRecord sinkRecord) {
        if (this.messageFormat.equalsIgnoreCase("ogg")) {
            processOggOperation(struct, sinkRecord);
        } else {
            processDefaultOperation(struct, sinkRecord);
        }
    }

    public void processOggOperation(Struct struct, SinkRecord sinkRecord) {
        String string = struct.getString("op_type");
        boolean z = -1;
        switch (string.hashCode()) {
            case -2130463047:
                if (string.equals("INSERT")) {
                    z = false;
                    break;
                }
                break;
            case -1785516855:
                if (string.equals("UPDATE")) {
                    z = 2;
                    break;
                }
                break;
            case 2012838315:
                if (string.equals("DELETE")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (this.insertSupported) {
                    this.processedRecords++;
                    insert(sinkRecord);
                    return;
                }
                return;
            case true:
                if (this.deleteSupported) {
                    this.processedRecords++;
                    delete(sinkRecord);
                    return;
                }
                return;
            case true:
                if (this.updateSupported) {
                    this.processedRecords++;
                    update(sinkRecord);
                    return;
                }
                return;
            default:
                log.info("Operation {} not supported", string);
                return;
        }
    }

    public void processDefaultOperation(Struct struct, SinkRecord sinkRecord) {
        String string = struct.getString("OPERATION");
        boolean z = -1;
        switch (string.hashCode()) {
            case -2130463047:
                if (string.equals("INSERT")) {
                    z = false;
                    break;
                }
                break;
            case -1844901079:
                if (string.equals("ALTER_INDEX")) {
                    z = 7;
                    break;
                }
                break;
            case -1835131355:
                if (string.equals("ALTER_TABLE")) {
                    z = 4;
                    break;
                }
                break;
            case -1785516855:
                if (string.equals("UPDATE")) {
                    z = 2;
                    break;
                }
                break;
            case -1641903262:
                if (string.equals("DROP_INDEX")) {
                    z = 8;
                    break;
                }
                break;
            case -1632133538:
                if (string.equals("DROP_TABLE")) {
                    z = 5;
                    break;
                }
                break;
            case 807500719:
                if (string.equals("CREATE_INDEX")) {
                    z = 6;
                    break;
                }
                break;
            case 817270443:
                if (string.equals("CREATE_TABLE")) {
                    z = MAX_RETRIES;
                    break;
                }
                break;
            case 2012838315:
                if (string.equals("DELETE")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (this.insertSupported) {
                    this.processedRecords++;
                    insert(sinkRecord);
                    return;
                }
                return;
            case true:
                if (this.deleteSupported) {
                    this.processedRecords++;
                    delete(sinkRecord);
                    return;
                }
                return;
            case true:
                if (this.updateSupported) {
                    this.processedRecords++;
                    update(sinkRecord);
                    return;
                }
                return;
            case MAX_RETRIES /* 3 */:
            case true:
            case true:
                this.processedRecords++;
                ddlData(sinkRecord, this.tableSchemaPath);
                return;
            case true:
            case true:
            case true:
                this.processedRecords++;
                ddlData(sinkRecord, this.indexSchemaPath);
                return;
            default:
                log.info("Operation {} not supported", string);
                return;
        }
    }

    public long getCurrentOffset() {
        return this.currentOffset;
    }

    private String getPath(String str, Struct struct, boolean z) {
        String str2;
        if (z && this.isLobPathEnabled) {
            return this.lobPath + str;
        }
        if (this.objectPathMap.containsKey(str)) {
            str2 = this.objectPathMap.get(str);
        } else {
            String preprocess = new VariablePreprocess().preprocess(this.path, struct, false);
            log.info("New file: " + preprocess);
            this.objectPathMap.put(str, preprocess);
            str2 = preprocess;
        }
        return str2;
    }

    private String getDDLPath(String str, Struct struct, boolean z, String str2) {
        String preprocess = new VariablePreprocess().preprocess(str2, struct, false);
        log.info("New file: " + preprocess);
        return preprocess;
    }

    private void uploadData(String str, AppendObjectRequest appendObjectRequest, String str2) {
        synchronized (ObsWriter.class) {
            if (obsClient.doesObjectExist(bucketName, str)) {
                appendObjectRequest.setPosition(obsClient.getObjectMetadata(bucketName, str).getNextPosition());
            } else {
                appendObjectRequest.setPosition(0L);
            }
            appendObjectRequest.setInput(new ByteArrayInputStream(str2.getBytes()));
            obsClient.appendObject(appendObjectRequest);
        }
        log.info("New file: " + str);
    }

    private void uploadDDLQueries(String str, String str2, String str3, Struct struct, boolean z) {
        int i = 0;
        String dDLPath = getDDLPath(str3, struct, false, str);
        AppendObjectRequest appendObjectRequest = new AppendObjectRequest();
        appendObjectRequest.setBucketName(bucketName);
        appendObjectRequest.setObjectKey(dDLPath);
        do {
            try {
                synchronized (ObsWriter.class) {
                    if (!obsClient.doesObjectExist(bucketName, dDLPath)) {
                        appendObjectRequest.setPosition(0L);
                        appendObjectRequest.setInput(new ByteArrayInputStream((CSVFormat.DEFAULT.format(SinkWriterUtils.getDDLColumnDetails(z)) + "\n").getBytes()));
                        obsClient.appendObject(appendObjectRequest);
                    }
                    appendObjectRequest.setPosition(obsClient.getObjectMetadata(bucketName, dDLPath).getNextPosition());
                    appendObjectRequest.setInput(new ByteArrayInputStream((str2 + "\n").getBytes()));
                    obsClient.appendObject(appendObjectRequest);
                }
                log.info("New file: " + dDLPath);
                return;
            } catch (ObsException e) {
                i++;
            }
        } while (!checkAndLog(e, i));
        throw new ConnectException("Error while uploading data to OBS", e);
    }

    private boolean checkAndLog(ObsException obsException, int i) {
        if (obsException.getResponseCode() == -1 && i <= MAX_RETRIES) {
            log.error("Error while upload Data to OBS with error: " + obsException.toString());
            return false;
        }
        log.error("HTTP Code: " + obsException.getResponseCode());
        log.error("Error Code:" + obsException.getErrorCode());
        log.error("Error Message: " + obsException.getErrorMessage());
        log.error("Request ID:" + obsException.getErrorRequestId());
        log.error("Host ID:" + obsException.getErrorHostId());
        return true;
    }

    private void writeData(String str, Struct struct, String str2, String str3) {
        boolean doesObjectExist;
        int i = 0;
        if (this.tableCache.get(str) == null) {
            this.tableCache.put(str, new ArrayList());
            String path = getPath(str, struct, false);
            if (this.unCommitFiles.get(str) == null) {
                synchronized (ObsWriter.class) {
                    doesObjectExist = obsClient.doesObjectExist(bucketName, path);
                }
                if (!doesObjectExist) {
                    String format = CSVFormat.DEFAULT.format(SinkWriterUtils.getColumnDetails(struct, str3, this.isOggType.booleanValue()));
                    List<String> list = this.tableCache.get(str);
                    list.add(format + "\n");
                    this.tableCache.put(str, list);
                }
                this.unCommitFiles.put(str, struct.getString(this.tableNameString));
            }
        }
        if (this.tableCache.get(str).size() < this.cacheRefreshRate) {
            List<String> list2 = this.tableCache.get(str);
            list2.add(str2 + "\n");
            this.tableCache.put(str, list2);
            return;
        }
        String str4 = Joiner.on("").join(this.tableCache.get(str)) + str2 + "\n";
        this.tableCache.remove(str);
        String path2 = getPath(str, struct, false);
        log.info("Cache Limit Reached. Writing Data from Table Cache to OBS. TablePath: " + path2);
        AppendObjectRequest appendObjectRequest = new AppendObjectRequest();
        appendObjectRequest.setBucketName(bucketName);
        appendObjectRequest.setObjectKey(path2);
        do {
            try {
                uploadData(path2, appendObjectRequest, str4);
                return;
            } catch (ObsException e) {
                i++;
            }
        } while (!checkAndLog(e, i));
        throw new ConnectException("Error while uploading data to OBS", e);
    }

    private void writeLOBData(Field field, String str, InputStream inputStream) {
        int i = 0;
        OBSStroageImpl oBSStroageImpl = new OBSStroageImpl();
        do {
            try {
                System.out.println("Step 1: initiate multipart upload \n");
                InitiateMultipartUploadRequest initiateMultipartUploadRequest = new InitiateMultipartUploadRequest();
                initiateMultipartUploadRequest.setBucketName(bucketName);
                initiateMultipartUploadRequest.setObjectKey(str);
                InitiateMultipartUploadResult initiateMultipartUpload = obsClient.initiateMultipartUpload(initiateMultipartUploadRequest);
                System.out.println("Step 2: upload part \n");
                UploadPartResult uploadPart = obsClient.uploadPart(bucketName, str, initiateMultipartUpload.getUploadId(), 1, inputStream);
                System.out.println("Step 3: complete multipart upload \n");
                obsClient.completeMultipartUpload(oBSStroageImpl.prepareMultiPartUpload(bucketName, str, initiateMultipartUpload, uploadPart));
                return;
            } catch (ObsException e) {
                i++;
            }
        } while (!checkAndLog(e, i));
        throw new ConnectException("Error while uploading data to OBS", e);
    }

    private void insert(SinkRecord sinkRecord) {
        Struct struct = (Struct) sinkRecord.value();
        String transactionIdentifier = sinkRecordParser.getTransactionIdentifier(struct);
        Struct struct2 = struct.getStruct(sinkRecordParser.getAfterDataField(this.isOggType));
        Struct struct3 = struct.getStruct(sinkRecordParser.getUniqueDataField(this.isOggType));
        writeData(sinkRecordParser.getTableName(struct), struct, CSVFormat.DEFAULT.format(getDataValueRow(struct2, struct, SinkWriterUtils.getUniqueValues(struct3, struct3.schema()), transactionIdentifier, sinkRecordParser.getOperation(struct), "after")), this.afterDataString);
    }

    private void delete(SinkRecord sinkRecord) {
        Struct struct = (Struct) sinkRecord.value();
        String transactionIdentifier = sinkRecordParser.getTransactionIdentifier(struct);
        Struct struct2 = struct.getStruct("before");
        Struct struct3 = struct.getStruct(this.uniqueDataString);
        writeData(sinkRecordParser.getTableName(struct), struct, CSVFormat.DEFAULT.format(getDataValueRow(struct2, struct, SinkWriterUtils.getUniqueValues(struct3, struct3.schema()), transactionIdentifier, sinkRecordParser.getOperation(struct), "before")), "before");
    }

    protected void update(SinkRecord sinkRecord) {
        Struct struct = (Struct) sinkRecord.value();
        String transactionIdentifier = sinkRecordParser.getTransactionIdentifier(struct);
        Struct struct2 = struct.getStruct("before");
        Struct struct3 = struct.getStruct(this.uniqueDataString);
        String uniqueValues = SinkWriterUtils.getUniqueValues(struct3, struct3.schema());
        String format = CSVFormat.DEFAULT.format(getDataValueRow(struct2, struct, uniqueValues, transactionIdentifier, "UPDATE", "before"));
        String format2 = CSVFormat.DEFAULT.format(getDataValueRow(struct.getStruct(sinkRecordParser.getAfterDataField(this.isOggType)), struct, uniqueValues, transactionIdentifier, "INSERT", "after"));
        writeData(sinkRecordParser.getTableName(struct), struct, format, "before");
        writeData(sinkRecordParser.getTableName(struct), struct, format2, this.afterDataString);
    }

    public Pair<Integer, Boolean> handleLobDataWrite(Struct struct, Field field, Struct struct2, String str, String str2, Object[] objArr, int i) {
        String replace = struct2.getString(this.tableNameString).replace(",", "_");
        if (lobDownload && lobDownloadType.equals("obs")) {
            String string = struct.get(field.name()) != null ? struct.getString(field.name()) : null;
            if (StringUtils.isNotBlank(string)) {
                String[] split = string.split("#", 2);
                if (split.length > 1) {
                    try {
                        InputStream objectContent = sourceObsClient.getObject(split[0], split[1]).getObjectContent();
                        Throwable th = null;
                        try {
                            String lobFileName = getLobFileName(getPath(replace, struct2, true), field, str, str2);
                            writeLOBData(field, lobFileName, objectContent);
                            objArr[i] = lobFileName;
                            if (objectContent != null) {
                                if (0 != 0) {
                                    try {
                                        objectContent.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    objectContent.close();
                                }
                            }
                        } finally {
                        }
                    } catch (IOException e) {
                        log.error("Error initializing or closing resources", e);
                    }
                }
            }
        } else if (lobDownload && lobDownloadType.equals("hdfs")) {
            String string2 = struct.get(field.name()) != null ? struct.getString(field.name()) : null;
            if (StringUtils.isNotBlank(string2)) {
                try {
                    FSDataInputStream open = this.fs.open(new Path(string2 + CSV_FORMAT));
                    String lobFileName2 = getLobFileName(getPath(replace, struct2, true), field, str, str2);
                    writeLOBData(field, lobFileName2, open);
                    objArr[i] = lobFileName2;
                } catch (IOException e2) {
                    log.error("error while opening hdfs file " + e2.getMessage());
                }
            }
        } else {
            i = ((Integer) lobDataWriteForBytesAndStringType(field, struct, objArr, replace, struct2, str, new Pair<>(Integer.valueOf(i), str2)).getKey()).intValue();
        }
        return new Pair<>(Integer.valueOf(i), false);
    }

    private Pair<Integer, Boolean> lobDataWriteForBytesAndStringType(Field field, Struct struct, Object[] objArr, String str, Struct struct2, String str2, Pair<Integer, String> pair) {
        int intValue = ((Integer) pair.getKey()).intValue();
        String str3 = (String) pair.getValue();
        if (field.schema().type() == Schema.Type.STRING) {
            String string = struct.getString(field.name());
            if (string == null || string.isEmpty()) {
                objArr[intValue] = null;
                return new Pair<>(Integer.valueOf(intValue), true);
            }
            byte[] bytes = string.getBytes();
            String lobFileName = getLobFileName(getPath(str, struct2, true), field, str2, str3);
            writeLOBData(field, lobFileName, new ByteArrayInputStream(bytes));
            objArr[intValue] = lobFileName;
        } else if (field.schema().type() == Schema.Type.BYTES) {
            byte[] bArr = (byte[]) struct.get(field);
            if (bArr == null || bArr.length == 0) {
                objArr[intValue] = null;
                return new Pair<>(Integer.valueOf(intValue), true);
            }
            String lobFileName2 = getLobFileName(getPath(str, struct2, true), field, str2, str3);
            writeLOBData(field, lobFileName2, new ByteArrayInputStream(bArr));
            objArr[intValue] = lobFileName2;
        }
        return new Pair<>(Integer.valueOf(intValue), false);
    }

    private Object[] getDataValueRow(Struct struct, Struct struct2, String str, String str2, String str3, String str4) {
        String sourceTableField = sinkRecordParser.getSourceTableField(struct2, sinkRecordParser.getTableName(struct2));
        String format = this.dataFormat.format((Date) struct2.get(this.operationTimestampString));
        String format2 = this.dataFormat.format(new Date());
        String string = struct2.getString(this.lobColumnString);
        HashSet hashSet = string != null ? new HashSet(Arrays.asList(string.split(","))) : null;
        Object[] objectRow = sinkRecordParser.getObjectRow(struct.schema());
        int i = 0;
        for (Field field : struct.schema().fields()) {
            if (hashSet != null && hashSet.contains(field.name())) {
                Pair<Integer, Boolean> handleLobDataWrite = handleLobDataWrite(struct, field, struct2, str, str2, objectRow, i);
                i = ((Integer) handleLobDataWrite.getKey()).intValue();
                if (((Boolean) handleLobDataWrite.getValue()).booleanValue()) {
                }
            } else if ("io.debezium.data.VariableScaleDecimal".equals(field.schema().name())) {
                objectRow[i] = CommonUtil.decodeEncodedNumericOrDecimalValues(struct, field);
            } else {
                objectRow[i] = struct.get(field);
            }
            i++;
        }
        int i2 = i;
        int i3 = i + 1;
        objectRow[i2] = sourceTableField;
        int i4 = i3 + 1;
        objectRow[i3] = str3;
        int i5 = i4 + 1;
        objectRow[i4] = format;
        int i6 = i5 + 1;
        objectRow[i5] = format2;
        int i7 = i6 + 1;
        objectRow[i6] = str4;
        if (!this.isOggType.booleanValue()) {
            int i8 = i7 + 1;
            objectRow[i7] = str2;
        }
        return objectRow;
    }

    public void schemaReplication(SinkRecord sinkRecord) {
        Struct struct = (Struct) sinkRecord.value();
        String replace = struct.getString("TABLE_NAME").replace(",", "_");
        String string = struct.getString("SEG_OWNER");
        String string2 = struct.getString("OPERATION");
        String replaceFirst = struct.getString("SQL_REDO").replaceFirst("\n", "");
        Struct struct2 = struct.getStruct("data");
        Object[] objArr = new Object[5];
        int i = 0 + 1;
        objArr[0] = replaceFirst;
        int i2 = i + 1;
        objArr[i] = string;
        int i3 = i2 + 1;
        objArr[i2] = replace;
        objArr[i3] = string2;
        objArr[i3 + 1] = struct2;
        uploadDDLQueries(this.schemaReplicationPath, CSVFormat.DEFAULT.format(objArr), replace, struct, false);
    }

    protected void ddlData(SinkRecord sinkRecord, String str) {
        Struct struct = (Struct) sinkRecord.value();
        String replace = struct.getString("OBJECT_NAME").replace(",", "_");
        String string = struct.getString("SEG_OWNER");
        String string2 = struct.getString("OPERATION");
        String transactionIdentifier = sinkRecordParser.getTransactionIdentifier(struct);
        String string3 = struct.getString("SQL_REDO");
        Struct struct2 = struct.getStruct("data");
        Object[] objArr = new Object[6];
        int i = 0 + 1;
        objArr[0] = string3;
        int i2 = i + 1;
        objArr[i] = string;
        int i3 = i2 + 1;
        objArr[i2] = replace;
        int i4 = i3 + 1;
        objArr[i3] = string2;
        objArr[i4] = transactionIdentifier;
        objArr[i4 + 1] = struct2;
        uploadDDLQueries(str, CSVFormat.DEFAULT.format(objArr), replace, struct, true);
    }

    private String buildMessage(String str, String str2) {
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty("table", str);
        jsonObject.addProperty(ObsConnectorConfig.OBS_PATH, "s3a://" + bucketName + "/" + str2);
        return jsonObject.toString();
    }

    public void close() {
        log.info("closing");
    }

    public void flush(INotification iNotification, String str) {
        int i = 0;
        for (Map.Entry<String, String> entry : this.objectPathMap.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            if (this.tableCache.get(key) != null && this.tableCache.get(key).size() > 0) {
                log.info("Writing Data from Table Cache to OBS during flush. TablePath: " + value);
                AppendObjectRequest appendObjectRequest = new AppendObjectRequest();
                appendObjectRequest.setBucketName(bucketName);
                appendObjectRequest.setObjectKey(value);
                do {
                    try {
                        uploadData(value, appendObjectRequest, Joiner.on("").join(this.tableCache.get(key)));
                    } catch (ObsException e) {
                        i++;
                    }
                } while (!checkAndLog(e, i));
                throw new ConnectException("Error while uploading data to OBS", e);
            }
            if (iNotification != null) {
                String buildMessage = buildMessage(key, value);
                log.info("Send data {} to channel {}", buildMessage, str);
                iNotification.sendData(str, buildMessage);
            }
            this.unCommitFiles.remove(entry.getKey());
            this.tableCache.remove(entry.getKey());
        }
        this.objectPathMap.clear();
    }

    private String getLobFileName(String str, Field field, String str2, String str3) {
        return str.concat("_" + field.name() + str2 + "_" + str3);
    }

    public int getProcessedRecords() {
        return this.processedRecords;
    }
}
