package com.huawei.cdc.connect.file.writer;

import com.google.common.base.Joiner;
import com.google.gson.JsonObject;
import com.huawei.cdc.common.VariablePreprocess;
import com.huawei.cdc.common.auth.hdfs.AuthUtil;
import com.huawei.cdc.common.auth.hdfs.HAClusterUtil;
import com.huawei.cdc.common.conf.CommonConfiguration;
import com.huawei.cdc.common.conf.ConfigUtil;
import com.huawei.cdc.common.metadata.client.ConnectorClient;
import com.huawei.cdc.common.sink.parser.SinkRecordParser;
import com.huawei.cdc.common.util.CommonUtil;
import com.huawei.cdc.common.util.SinkWriterUtils;
import com.huawei.cdc.connect.file.config.FileConnectorConfig;
import com.huawei.cdc.notification.INotification;
import com.obs.services.ObsClient;
import com.obs.services.ObsConfiguration;
import com.obs.services.exception.ObsException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import javafx.util.Pair;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/connect/file/writer/FileWriter.class */
public class FileWriter {
    private static ObsClient sourceObsClient;
    private static final int MAX_RETRIES = 3;
    public static final String CSV_FORMAT = ".csv";
    private String lobPath;
    private boolean insertSupported;
    private boolean updateSupported;
    private boolean deleteSupported;
    private String messageFormat;
    private boolean isOggType;
    private String tableNameString;
    private String operationTimestampString;
    private String afterDataString;
    private String uniqueDataString;
    private String lobColumnString;
    private String targetConnectionId;
    private static SinkRecordParser sinkRecordParser;
    private String baseFilePath;
    private FileAttribute<Set<PosixFilePermission>> fileAttributes;
    private String lobDownloadType;
    private FileSystem fs;
    private int processedRecords;
    private String jobExecutionId;
    private String targetTaskId;
    private static final Logger log = LoggerFactory.getLogger(FileWriter.class);
    private static boolean lobDownload = false;
    private static final String FILE_SINK_BASE_DIR = "CDL" + File.separator;
    private int cacheRefreshRate = 500;
    private String path = "${datastore}/schemas/${schemaName}/tables/${tableName}/data/${changeTime(yyyyMMddHHmmss)}/${tableName}.csv";
    private String tableSchemaPath = "${datastore}/schemas/${schemaName}/tables/${tableName}/metadata/${changeTime(yyyyMMddHHmmss)}/${tableName}.csv";
    private String indexSchemaPath = "${datastore}/schemas/${schemaName}/tables/${tableName}/metadata/${changeTime(yyyyMMddHHmmss)}/${tableName}_index.csv";
    private String schemaReplicationPath = "${datastore}/schemas/${schemaName}/metadata/${schemaName}.csv";
    private boolean isLobPathEnabled = false;
    private Map<String, PrintStream> streamMap = new HashMap();
    private SimpleDateFormat dataFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    private long currentOffset = -1;
    private Map<String, String> unCommitFiles = new HashMap();
    private Map<String, List<String>> tableCache = new HashMap();

    public void initialize(Map<String, String> map) throws IOException {
        FileConnectorConfig fileConnectorConfig = new FileConnectorConfig(map);
        sinkRecordParser = new SinkRecordParser();
        this.baseFilePath = null;
        if (StringUtils.isNotBlank(fileConnectorConfig.getPATH())) {
            this.baseFilePath = fileConnectorConfig.getPATH();
            constructFileSinkBasePath();
        } else if (StringUtils.isNotBlank(CommonConfiguration.SINK_FILE_PATH)) {
            this.baseFilePath = CommonConfiguration.SINK_FILE_PATH;
            constructFileSinkBasePath();
        } else {
            this.baseFilePath = FILE_SINK_BASE_DIR;
        }
        if (this.baseFilePath != null) {
            this.fileAttributes = PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r-x---"));
            changeBaseDirectoryPermission(this.fileAttributes);
            this.path = this.baseFilePath + this.path;
            this.tableSchemaPath = this.baseFilePath + this.tableSchemaPath;
            this.indexSchemaPath = this.baseFilePath + this.indexSchemaPath;
            this.schemaReplicationPath = this.baseFilePath + this.schemaReplicationPath;
        }
        if (StringUtils.isNotBlank(fileConnectorConfig.getTargetConnectionId())) {
            this.targetConnectionId = fileConnectorConfig.getTargetConnectionId();
        }
        if (StringUtils.isNotBlank(fileConnectorConfig.getJobExecutionId())) {
            this.jobExecutionId = fileConnectorConfig.getJobExecutionId();
        }
        if (StringUtils.isNotBlank(fileConnectorConfig.getCacheSize())) {
            this.cacheRefreshRate = Integer.parseInt(fileConnectorConfig.getCacheSize());
        }
        if (StringUtils.isNotBlank(fileConnectorConfig.getLobDirectDownload())) {
            this.lobDownloadType = fileConnectorConfig.getLobDirectDownload().toLowerCase(Locale.ROOT);
            lobDownload = this.lobDownloadType.equals("obs") || this.lobDownloadType.equals("hdfs");
        }
        initializeObsProperties(fileConnectorConfig);
        initHdfsCredentials(fileConnectorConfig);
        if (StringUtils.isNotBlank(fileConnectorConfig.getLobPath())) {
            this.isLobPathEnabled = true;
            this.lobPath = fileConnectorConfig.getLobPath();
        }
        initializeMode(fileConnectorConfig);
        this.targetTaskId = fileConnectorConfig.getConnectorName() + "_" + map.get("task.id");
    }

    private void initializeObsProperties(FileConnectorConfig fileConnectorConfig) {
        if (lobDownload && this.lobDownloadType.equals("obs")) {
            String sourceObsEndpoint = fileConnectorConfig.getSourceObsEndpoint();
            if (fileConnectorConfig.getSourceObsAuthenticationKey() == null || fileConnectorConfig.getSourceObsSecretKey() == null || sourceObsEndpoint == null) {
                return;
            }
            try {
                ObsConfiguration obsConfiguration = new ObsConfiguration();
                obsConfiguration.setSocketTimeout(30000);
                obsConfiguration.setConnectionTimeout(10000);
                obsConfiguration.setEndPoint(sourceObsEndpoint);
                ConfigUtil.setLog4j2XMLPath(this);
                sourceObsClient = new ObsClient(fileConnectorConfig.getSourceObsAuthenticationKey(), fileConnectorConfig.getSourceObsSecretKey(), obsConfiguration);
            } catch (ObsException e) {
                log.error("\nHTTP Code: " + e.getResponseCode() + "\nError Code:" + e.getErrorCode() + "\nError Message: " + e.getErrorMessage() + "\nRequest ID:" + e.getErrorRequestId() + "\nHost ID:" + e.getErrorHostId());
            }
        }
    }

    private void initHdfsCredentials(FileConnectorConfig fileConnectorConfig) {
        if (lobDownload && this.lobDownloadType.equals("hdfs")) {
            boolean equalsIgnoreCase = "kerberos".equalsIgnoreCase(fileConnectorConfig.getHdfsAuthType());
            try {
                AuthUtil.getAuthenticator(fileConnectorConfig.getHdfsAuthPrincipal(), fileConnectorConfig.getHdfsAuthKeytabfile(), equalsIgnoreCase);
                try {
                    this.fs = FileSystem.get(HAClusterUtil.getConf(equalsIgnoreCase));
                } catch (IOException e) {
                    log.error("error while connecting to hdfs filesystem with following message " + e.getMessage());
                }
            } catch (IOException e2) {
                if (e2.getMessage().contains("Auth Error")) {
                    log.error("Error while Authenticating Kerberos Credentials.  Failed with error: " + e2.toString());
                }
                throw new ConnectException("Kerberos Authentication Failed " + e2);
            }
        }
    }

    private void constructFileSinkBasePath() {
        if (!this.baseFilePath.endsWith(File.separator)) {
            this.baseFilePath += File.separator;
        }
        this.baseFilePath += FILE_SINK_BASE_DIR;
    }

    private void initializeMode(FileConnectorConfig fileConnectorConfig) {
        for (String str : fileConnectorConfig.getMODE().split(",")) {
            if (str.trim().equalsIgnoreCase("insert")) {
                this.insertSupported = true;
            } else if (str.trim().equalsIgnoreCase("update")) {
                this.updateSupported = true;
            } else if (str.trim().equalsIgnoreCase("delete")) {
                this.deleteSupported = true;
            }
        }
    }

    public void write(Collection<SinkRecord> collection, ConnectorClient connectorClient) {
        this.processedRecords = 0;
        for (SinkRecord sinkRecord : collection) {
            if (this.currentOffset == -1) {
                log.info("The start offset: {}", Long.valueOf(sinkRecord.kafkaOffset()));
            }
            this.currentOffset = sinkRecord.kafkaOffset();
            if (sinkRecord.valueSchema().type() == Schema.Type.STRUCT) {
                Struct struct = (Struct) sinkRecord.value();
                this.messageFormat = sinkRecordParser.getMessageFormat();
                this.isOggType = sinkRecordParser.getOggFormat(this.messageFormat).booleanValue();
                if (this.isOggType || struct.toString().contains("TIMESTAMP") || !struct.getString("DATA_STORE").equalsIgnoreCase("ORACLE")) {
                    this.tableNameString = sinkRecordParser.getTableField(Boolean.valueOf(this.isOggType));
                    this.operationTimestampString = sinkRecordParser.getOperationTimestampField(Boolean.valueOf(this.isOggType));
                    this.afterDataString = sinkRecordParser.getAfterDataField(Boolean.valueOf(this.isOggType));
                    this.uniqueDataString = sinkRecordParser.getUniqueDataField(Boolean.valueOf(this.isOggType));
                    this.lobColumnString = sinkRecordParser.getLobColumnField(Boolean.valueOf(this.isOggType));
                    sinkRecordParser.setFieldValues(this.tableNameString, Boolean.valueOf(this.isOggType));
                    processOperation(struct, sinkRecord);
                    sinkRecordParser.createSinkHeartbeat(connectorClient, struct, this.targetConnectionId, sinkRecordParser.getSourceValue(struct), this.jobExecutionId, this.targetTaskId);
                } else {
                    schemaReplication(sinkRecord);
                }
            }
        }
    }

    public void processOperation(Struct struct, SinkRecord sinkRecord) {
        if (this.messageFormat.equalsIgnoreCase("ogg")) {
            processOggOperation(struct, sinkRecord);
        } else {
            processDefaultOperation(struct, sinkRecord);
        }
    }

    public void processOggOperation(Struct struct, SinkRecord sinkRecord) {
        String string = struct.getString("op_type");
        boolean z = -1;
        switch (string.hashCode()) {
            case -2130463047:
                if (string.equals("INSERT")) {
                    z = false;
                    break;
                }
                break;
            case -1785516855:
                if (string.equals("UPDATE")) {
                    z = 2;
                    break;
                }
                break;
            case 2012838315:
                if (string.equals("DELETE")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (this.insertSupported) {
                    this.processedRecords++;
                    insert(sinkRecord);
                    return;
                }
                return;
            case true:
                if (this.deleteSupported) {
                    this.processedRecords++;
                    delete(sinkRecord);
                    return;
                }
                return;
            case true:
                if (this.updateSupported) {
                    this.processedRecords++;
                    update(sinkRecord);
                    return;
                }
                return;
            default:
                log.info("Operation {} not supported", string);
                return;
        }
    }

    public void processDefaultOperation(Struct struct, SinkRecord sinkRecord) {
        String string = struct.getString("OPERATION");
        boolean z = -1;
        switch (string.hashCode()) {
            case -2130463047:
                if (string.equals("INSERT")) {
                    z = false;
                    break;
                }
                break;
            case -1844901079:
                if (string.equals("ALTER_INDEX")) {
                    z = 7;
                    break;
                }
                break;
            case -1835131355:
                if (string.equals("ALTER_TABLE")) {
                    z = 4;
                    break;
                }
                break;
            case -1785516855:
                if (string.equals("UPDATE")) {
                    z = 2;
                    break;
                }
                break;
            case -1641903262:
                if (string.equals("DROP_INDEX")) {
                    z = 8;
                    break;
                }
                break;
            case -1632133538:
                if (string.equals("DROP_TABLE")) {
                    z = 5;
                    break;
                }
                break;
            case 807500719:
                if (string.equals("CREATE_INDEX")) {
                    z = 6;
                    break;
                }
                break;
            case 817270443:
                if (string.equals("CREATE_TABLE")) {
                    z = MAX_RETRIES;
                    break;
                }
                break;
            case 2012838315:
                if (string.equals("DELETE")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (this.insertSupported) {
                    this.processedRecords++;
                    insert(sinkRecord);
                    return;
                }
                return;
            case true:
                if (this.deleteSupported) {
                    this.processedRecords++;
                    delete(sinkRecord);
                    return;
                }
                return;
            case true:
                if (this.updateSupported) {
                    this.processedRecords++;
                    update(sinkRecord);
                    return;
                }
                return;
            case MAX_RETRIES /* 3 */:
            case true:
            case true:
                this.processedRecords++;
                ddlData(sinkRecord, this.tableSchemaPath);
                return;
            case true:
            case true:
            case true:
                this.processedRecords++;
                ddlData(sinkRecord, this.indexSchemaPath);
                return;
            default:
                log.info("Operation {} not supported", string);
                return;
        }
    }

    public long getCurrentOffset() {
        return this.currentOffset;
    }

    private PrintStream getBlobWriter(String str) {
        try {
            Path path = Paths.get(str, new String[0]);
            if (Files.notExists(path, new LinkOption[0])) {
                Files.createDirectories(path.getParent(), this.fileAttributes);
            }
            return new PrintStream(Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND), false, StandardCharsets.UTF_8.name());
        } catch (IOException e) {
            throw new ConnectException("Error while opening File stream", e);
        }
    }

    private String writeLobData(String str, Field field, Struct struct, InputStream inputStream, String str2, String str3) {
        Throwable th;
        int i = 0;
        String concat = this.isLobPathEnabled ? this.lobPath.concat(str + "_" + field.name() + str2 + "_" + str3) : SinkWriterUtils.getLobFileName(str, struct, field, str2, str3, this.unCommitFiles, this.path);
        while (true) {
            try {
                PrintStream blobWriter = getBlobWriter(concat);
                Throwable th2 = null;
                try {
                    try {
                        blobWriter.println(IOUtils.toString(inputStream));
                        if (blobWriter != null) {
                            if (0 == 0) {
                                blobWriter.close();
                                break;
                            }
                            try {
                                blobWriter.close();
                                break;
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            break;
                        }
                    } catch (Throwable th4) {
                        th2 = th4;
                        throw th4;
                        break;
                    }
                } finally {
                    if (blobWriter == null) {
                        break;
                    }
                    if (th == null) {
                        break;
                    }
                    try {
                        break;
                    } catch (Throwable th5) {
                    }
                }
            } catch (Exception e) {
                i++;
                SinkWriterUtils.logConnectionError(e, log, i, MAX_RETRIES);
            }
        }
        return null;
    }

    private String writeClobData(String str, Field field, Struct struct, String str2, String str3, String str4) {
        Throwable th;
        int i = 0;
        String concat = this.isLobPathEnabled ? this.lobPath.concat(str + "_" + field.name() + str3 + "_" + str4) : SinkWriterUtils.getLobFileName(str, struct, field, str3, str4, this.unCommitFiles, this.path);
        while (true) {
            try {
                PrintStream blobWriter = getBlobWriter(concat);
                Throwable th2 = null;
                try {
                    try {
                        blobWriter.println(str2);
                        if (blobWriter != null) {
                            if (0 == 0) {
                                blobWriter.close();
                                break;
                            }
                            try {
                                blobWriter.close();
                                break;
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            break;
                        }
                    } catch (Throwable th4) {
                        th2 = th4;
                        throw th4;
                        break;
                    }
                } finally {
                    if (blobWriter == null) {
                        break;
                    }
                    if (th == null) {
                        break;
                    }
                    try {
                        break;
                    } catch (Throwable th5) {
                    }
                }
            } catch (Exception e) {
                i++;
                SinkWriterUtils.logConnectionError(e, log, i, MAX_RETRIES);
            }
        }
        return null;
    }

    private String writeBlobData(String str, Field field, Struct struct, byte[] bArr, String str2, String str3) {
        Throwable th;
        int i = 0;
        String concat = this.isLobPathEnabled ? this.lobPath.concat(str + "_" + field.name() + str2 + "_" + str3) : SinkWriterUtils.getLobFileName(str, struct, field, str2, str3, this.unCommitFiles, this.path);
        while (true) {
            try {
                PrintStream blobWriter = getBlobWriter(concat);
                Throwable th2 = null;
                try {
                    try {
                        blobWriter.println(Arrays.toString(bArr));
                        if (blobWriter != null) {
                            if (0 == 0) {
                                blobWriter.close();
                                break;
                            }
                            try {
                                blobWriter.close();
                                break;
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            break;
                        }
                    } catch (Throwable th4) {
                        th2 = th4;
                        throw th4;
                        break;
                    }
                } finally {
                    if (blobWriter == null) {
                        break;
                    }
                    if (th == null) {
                        break;
                    }
                    try {
                        break;
                    } catch (Throwable th5) {
                    }
                }
            } catch (Exception e) {
                i++;
                SinkWriterUtils.logConnectionError(e, log, i, MAX_RETRIES);
            }
        }
        return null;
    }

    private boolean checkIfFileExists(String str, Struct struct) {
        try {
            return Files.exists(Paths.get(new VariablePreprocess().preprocess(str, struct, true), new String[0]), new LinkOption[0]);
        } catch (Exception e) {
            throw new ConnectException("Error while checking File stream", e);
        }
    }

    private PrintStream getWriter(String str, Struct struct, String str2) {
        PrintStream printStream;
        if (this.streamMap.containsKey(str)) {
            printStream = this.streamMap.get(str);
        } else {
            String preprocess = new VariablePreprocess().preprocess(this.path, struct, true);
            try {
                Path path = Paths.get(preprocess, new String[0]);
                if (Files.notExists(path, new LinkOption[0])) {
                    Files.createDirectories(path.getParent(), this.fileAttributes);
                    String format = CSVFormat.DEFAULT.format(SinkWriterUtils.getColumnDetails(struct, str2, this.isOggType));
                    List<String> list = this.tableCache.get(str);
                    list.add(format + "\n");
                    this.tableCache.put(str, list);
                }
                printStream = new PrintStream(Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND), false, StandardCharsets.UTF_8.name());
                log.info("New file: " + preprocess);
                this.streamMap.put(str, printStream);
                this.unCommitFiles.put(str, preprocess);
            } catch (Exception e) {
                throw new ConnectException("Error while opening File stream", e);
            }
        }
        return printStream;
    }

    private void writeData(String str, String str2, Struct struct, String str3) {
        Throwable th;
        int i = 0;
        if (this.tableCache.get(str) == null) {
            this.tableCache.put(str, new ArrayList());
            if (this.unCommitFiles.get(str) == null) {
                getWriter(str, struct, str3);
            }
        }
        if (this.tableCache.get(str).size() < this.cacheRefreshRate) {
            SinkWriterUtils.getFromCacheAndAppendData(this.tableCache, str, str2);
            return;
        }
        String join = Joiner.on("").join(this.tableCache.get(str));
        this.tableCache.remove(str);
        while (true) {
            try {
                PrintStream writer = getWriter(str, struct, str3);
                Throwable th2 = null;
                try {
                    try {
                        writer.print(join + "\n" + str2);
                        if (writer != null) {
                            if (0 == 0) {
                                writer.close();
                                break;
                            }
                            try {
                                writer.close();
                                break;
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            break;
                        }
                    } catch (Throwable th4) {
                        th2 = th4;
                        throw th4;
                        break;
                    }
                } finally {
                    if (writer == null) {
                        break;
                    } else if (th == null) {
                        break;
                    } else {
                        try {
                            break;
                        } catch (Throwable th5) {
                        }
                    }
                }
            } catch (Exception e) {
                i++;
                SinkWriterUtils.logConnectionError(e, log, i, MAX_RETRIES);
            }
        }
    }

    private PrintStream getDDLWriter(String str, Struct struct) {
        String preprocess = new VariablePreprocess().preprocess(str, struct, true);
        try {
            Path path = Paths.get(preprocess, new String[0]);
            if (Files.notExists(path, new LinkOption[0])) {
                Files.createDirectories(path.getParent(), this.fileAttributes);
            }
            PrintStream printStream = new PrintStream(Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND), false, StandardCharsets.UTF_8.name());
            log.info("New file: " + preprocess);
            return printStream;
        } catch (Exception e) {
            throw new ConnectException("Error while opening File stream", e);
        }
    }

    private void writeDDLData(PrintStream printStream, String str) {
        int i = 0;
        while (true) {
            try {
                printStream.println(str);
                printStream.close();
                return;
            } catch (Exception e) {
                i++;
                SinkWriterUtils.logConnectionError(e, log, i, MAX_RETRIES);
            }
        }
    }

    private void insert(SinkRecord sinkRecord) {
        Struct struct = (Struct) sinkRecord.value();
        String transactionIdentifier = sinkRecordParser.getTransactionIdentifier(struct);
        Struct struct2 = struct.getStruct(sinkRecordParser.getAfterDataField(Boolean.valueOf(this.isOggType)));
        Struct struct3 = struct.getStruct(sinkRecordParser.getUniqueDataField(Boolean.valueOf(this.isOggType)));
        writeData(sinkRecordParser.getTableName(struct), CSVFormat.DEFAULT.format(getDataValueRow(struct2, struct, SinkWriterUtils.getUniqueValues(struct3, struct3.schema()), transactionIdentifier, sinkRecordParser.getOperation(struct), "after")), struct, this.afterDataString);
    }

    private void delete(SinkRecord sinkRecord) {
        Struct struct = (Struct) sinkRecord.value();
        String transactionIdentifier = sinkRecordParser.getTransactionIdentifier(struct);
        Struct struct2 = struct.getStruct("before");
        Struct struct3 = struct.getStruct(this.uniqueDataString);
        writeData(sinkRecordParser.getTableName(struct), CSVFormat.DEFAULT.format(getDataValueRow(struct2, struct, SinkWriterUtils.getUniqueValues(struct3, struct3.schema()), transactionIdentifier, sinkRecordParser.getOperation(struct), "before")), struct, "before");
    }

    protected void update(SinkRecord sinkRecord) {
        Struct struct = (Struct) sinkRecord.value();
        String transactionIdentifier = sinkRecordParser.getTransactionIdentifier(struct);
        Struct struct2 = struct.getStruct("before");
        Struct struct3 = struct.getStruct(this.uniqueDataString);
        String uniqueValues = SinkWriterUtils.getUniqueValues(struct3, struct3.schema());
        String format = CSVFormat.DEFAULT.format(getDataValueRow(struct2, struct, uniqueValues, transactionIdentifier, "UPDATE", "before"));
        String format2 = CSVFormat.DEFAULT.format(getDataValueRow(struct.getStruct(sinkRecordParser.getAfterDataField(Boolean.valueOf(this.isOggType))), struct, uniqueValues, transactionIdentifier, "INSERT", "after"));
        writeData(sinkRecordParser.getTableName(struct), format, struct, "before");
        writeData(sinkRecordParser.getTableName(struct), format2, struct, this.afterDataString);
    }

    private Object[] getDataValueRow(Struct struct, Struct struct2, String str, String str2, String str3, String str4) {
        String sourceTableField = sinkRecordParser.getSourceTableField(struct2, sinkRecordParser.getTableName(struct2));
        String format = this.dataFormat.format((Date) struct2.get(this.operationTimestampString));
        String format2 = this.dataFormat.format(new Date());
        String string = struct2.getString(this.lobColumnString);
        HashSet hashSet = string != null ? new HashSet(Arrays.asList(string.split(","))) : null;
        Object[] objectRow = sinkRecordParser.getObjectRow(struct.schema());
        int i = 0;
        for (Field field : struct.schema().fields()) {
            if (hashSet != null && hashSet.contains(field.name())) {
                Pair<Integer, Boolean> handleLobDataWrite = handleLobDataWrite(struct, field, struct2, str, str2, objectRow, i);
                i = ((Integer) handleLobDataWrite.getKey()).intValue();
                if (((Boolean) handleLobDataWrite.getValue()).booleanValue()) {
                }
            } else if ("io.debezium.data.VariableScaleDecimal".equals(field.schema().name())) {
                objectRow[i] = CommonUtil.decodeEncodedNumericOrDecimalValues(struct, field);
            } else {
                objectRow[i] = struct.get(field);
            }
            i++;
        }
        int i2 = i;
        int i3 = i + 1;
        objectRow[i2] = sourceTableField;
        int i4 = i3 + 1;
        objectRow[i3] = str3;
        int i5 = i4 + 1;
        objectRow[i4] = format;
        int i6 = i5 + 1;
        objectRow[i5] = format2;
        int i7 = i6 + 1;
        objectRow[i6] = str4;
        if (!this.isOggType) {
            objectRow[i7] = str2;
        }
        return objectRow;
    }

    public void schemaReplication(SinkRecord sinkRecord) {
        Struct struct = (Struct) sinkRecord.value();
        String replace = struct.getString("TABLE_NAME").replace(",", "_");
        String string = struct.getString("SEG_OWNER");
        String string2 = struct.getString("OPERATION");
        String replaceFirst = struct.getString("SQL_REDO").replaceFirst("\n", "");
        Struct struct2 = struct.getStruct("data");
        Object[] objArr = new Object[5];
        int i = 0 + 1;
        objArr[0] = replaceFirst;
        int i2 = i + 1;
        objArr[i] = string;
        int i3 = i2 + 1;
        objArr[i2] = replace;
        objArr[i3] = string2;
        objArr[i3 + 1] = struct2;
        String format = CSVFormat.DEFAULT.format(objArr);
        if (!checkIfFileExists(this.schemaReplicationPath, struct)) {
            format = CSVFormat.DEFAULT.format(SinkWriterUtils.getDDLColumnDetails(false)) + "\n" + format;
        }
        PrintStream dDLWriter = getDDLWriter(this.schemaReplicationPath, struct);
        Throwable th = null;
        try {
            try {
                writeDDLData(dDLWriter, format);
                if (dDLWriter != null) {
                    if (0 == 0) {
                        dDLWriter.close();
                        return;
                    }
                    try {
                        dDLWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (dDLWriter != null) {
                if (th != null) {
                    try {
                        dDLWriter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    dDLWriter.close();
                }
            }
            throw th4;
        }
    }

    public void ddlData(SinkRecord sinkRecord, String str) {
        Struct struct = (Struct) sinkRecord.value();
        String replace = struct.getString("OBJECT_NAME").replace(",", "_");
        String string = struct.getString("SEG_OWNER");
        String string2 = struct.getString("OPERATION");
        String transactionIdentifier = sinkRecordParser.getTransactionIdentifier(struct);
        String string3 = struct.getString("SQL_REDO");
        Struct struct2 = struct.getStruct("data");
        Object[] objArr = new Object[6];
        int i = 0 + 1;
        objArr[0] = string3;
        int i2 = i + 1;
        objArr[i] = string;
        int i3 = i2 + 1;
        objArr[i2] = replace;
        int i4 = i3 + 1;
        objArr[i3] = string2;
        objArr[i4] = transactionIdentifier;
        objArr[i4 + 1] = struct2;
        String format = CSVFormat.DEFAULT.format(objArr);
        if (!checkIfFileExists(str, struct)) {
            format = CSVFormat.DEFAULT.format(SinkWriterUtils.getDDLColumnDetails(true)) + "\n" + format;
        }
        PrintStream dDLWriter = getDDLWriter(str, struct);
        Throwable th = null;
        try {
            try {
                writeDDLData(dDLWriter, format);
                if (dDLWriter != null) {
                    if (0 == 0) {
                        dDLWriter.close();
                        return;
                    }
                    try {
                        dDLWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (dDLWriter != null) {
                if (th != null) {
                    try {
                        dDLWriter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    dDLWriter.close();
                }
            }
            throw th4;
        }
    }

    public void flush(INotification iNotification, String str) {
        for (Map.Entry<String, PrintStream> entry : this.streamMap.entrySet()) {
            try {
                String key = entry.getKey();
                PrintStream value = entry.getValue();
                Throwable th = null;
                try {
                    try {
                        if (this.tableCache.get(key) != null && this.tableCache.get(key).size() > 0) {
                            value.print(Joiner.on("").join(this.tableCache.get(key)));
                        }
                        if (value != null) {
                            if (0 != 0) {
                                try {
                                    value.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                value.close();
                            }
                        }
                        String str2 = this.unCommitFiles.get(entry.getKey());
                        log.info("Writing data to file {} ", str2);
                        if (iNotification != null) {
                            String buildMessage = buildMessage(key, str2);
                            log.info("Send data {} to channel {}", buildMessage, str);
                            iNotification.sendData(str, buildMessage);
                        }
                        this.unCommitFiles.remove(entry.getKey());
                        this.tableCache.remove(entry.getKey());
                    } finally {
                    }
                } finally {
                }
            } catch (Exception e) {
                throw new ConnectException("close stream", e);
            }
        }
        this.streamMap.clear();
    }

    private String buildMessage(String str, String str2) {
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty("table", str);
        jsonObject.addProperty(FileConnectorConfig.PATH, str2);
        return jsonObject.toString();
    }

    public void close() {
        log.info("closing");
        for (Map.Entry<String, PrintStream> entry : this.streamMap.entrySet()) {
            try {
                try {
                    Files.delete(Paths.get(this.unCommitFiles.get(entry.getKey()), new String[0]));
                    entry.getValue().close();
                } catch (IOException e) {
                    log.error(e.getMessage());
                    throw new ConnectException("close stream", e);
                }
            } catch (Throwable th) {
                entry.getValue().close();
                throw th;
            }
        }
    }

    private Pair<Integer, Boolean> handleLobDataWrite(Struct struct, Field field, Struct struct2, String str, String str2, Object[] objArr, int i) {
        String tableName = sinkRecordParser.getTableName(struct2);
        String string = struct.get(field.name()) != null ? struct.getString(field.name()) : null;
        if (lobDownload && this.lobDownloadType.equals("obs")) {
            downloadLobFromObs(string, tableName, field, struct2, str, str2, objArr, i);
        } else if (lobDownload && this.lobDownloadType.equals("hdfs")) {
            downloadLobFromHdfs(string, tableName, field, struct2, str, str2, objArr, i);
        } else if (field.schema().type() == Schema.Type.STRING) {
            String string2 = struct.getString(field.name());
            if (string2 == null || string2.isEmpty()) {
                objArr[i] = null;
                return new Pair<>(Integer.valueOf(i), true);
            }
            objArr[i] = writeClobData(tableName, field, struct2, string2, str, str2);
        } else if (field.schema().type() == Schema.Type.BYTES) {
            byte[] bArr = (byte[]) struct.get(field);
            if (bArr == null || bArr.length == 0) {
                objArr[i] = null;
                return new Pair<>(Integer.valueOf(i), true);
            }
            objArr[i] = writeBlobData(tableName, field, struct2, bArr, str, str2);
        }
        return new Pair<>(Integer.valueOf(i), false);
    }

    private void downloadLobFromObs(String str, String str2, Field field, Struct struct, String str3, String str4, Object[] objArr, int i) {
        if (StringUtils.isNotBlank(str)) {
            String[] split = str.split("#", 2);
            if (split.length > 1) {
                try {
                    InputStream objectContent = sourceObsClient.getObject(split[0], split[1]).getObjectContent();
                    Throwable th = null;
                    try {
                        try {
                            objArr[i] = writeLobData(str2, field, struct, objectContent, str3, str4);
                            if (objectContent != null) {
                                if (0 != 0) {
                                    try {
                                        objectContent.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    objectContent.close();
                                }
                            }
                        } catch (Throwable th3) {
                            th = th3;
                            throw th3;
                        }
                    } finally {
                    }
                } catch (IOException e) {
                    log.error("Error initializing or closing resources", e);
                }
            }
        }
    }

    private void downloadLobFromHdfs(String str, String str2, Field field, Struct struct, String str3, String str4, Object[] objArr, int i) {
        if (StringUtils.isNotBlank(str)) {
            try {
                objArr[i] = writeLobData(str2, field, struct, this.fs.open(new org.apache.hadoop.fs.Path(str + CSV_FORMAT)), str3, str4);
            } catch (IOException e) {
                log.error("error while opening hdfs file " + e.getMessage());
            }
        }
    }

    private void changeBaseDirectoryPermission(FileAttribute<Set<PosixFilePermission>> fileAttribute) {
        Path path = Paths.get(this.baseFilePath, new String[0]);
        try {
            if (Files.notExists(path, new LinkOption[0])) {
                Files.createDirectories(path, fileAttribute);
            }
        } catch (IOException e) {
            throw new ConnectException("Error while opening File stream", e);
        }
    }

    public int getProcessedRecords() {
        return this.processedRecords;
    }
}
