package com.huawei.flink.connector.jdbc.gaussdb.executor;

import com.huawei.flink.connector.jdbc.gaussdb.dialect.GaussDBDialect;
import com.huawei.flink.connector.jdbc.gaussdb.utils.GaussDBUtil;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
import org.apache.flink.connector.jdbc.statement.StatementFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
import org.postgresql.core.BaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/flink/connector/jdbc/gaussdb/executor/GaussDBUpsertExistsCopyStatement.class */
public class GaussDBUpsertExistsCopyStatement implements JdbcBatchStatementExecutor<RowData> {
    private static final char DELIMITER_CHAR = 30;
    private static final char EOL_CHAR = '\n';
    private static final String MIN_COLUMN = "min";
    private static final String MAX_COLUMN = "max";
    private final String tableName;
    private final GaussDBDialect dialect;
    private final String[] keyFields;
    private final String[] fieldNames;
    private final ArrayList<Integer> jsonMapFieldsIndex;
    private final String mergeFilterKey;
    private final boolean isEscapeStrValue;
    private final RowData.FieldGetter[] fieldGetters;
    private transient long convertDuration;
    private transient long appendDuration;
    private transient long lastExecuteBatchTime;
    private transient Map<RowData, RowData> keyToRows;
    private transient BaseConnection baseConn;
    private transient String tempTableName;
    private transient ArrayList<String> tableColumnsList;
    private transient CaseInsensitiveMap rowMap;
    private transient String copySql;
    private transient String mergeSql;
    private transient String truncateSql;
    private transient PreparedStatement mergeStatement;
    private final Function<RowData, RowData> keyExtractor;
    private final Function<RowData, RowData> rowDataCopy;
    private FieldNamedPreparedStatement deleteStatement;
    private final StatementFactory deleteFactory;
    private final JdbcRowConverter deleteSetter;
    private static final Logger LOG = LoggerFactory.getLogger(GaussDBUpsertExistsCopyStatement.class);
    private static final String DELIMITER = String.valueOf((char) 30);
    private static final String EOL = String.valueOf('\n');
    private final long threadId = Thread.currentThread().getId();
    private final transient StringBuilder copyBuffer = new StringBuilder();
    private final transient ObjectReader jacksonReader = new ObjectMapper().readerFor(Map.class);

    public GaussDBUpsertExistsCopyStatement(GaussDBDialect gaussDBDialect, StatementFactory statementFactory, JdbcRowConverter jdbcRowConverter, String str, String[] strArr, String[] strArr2, String[] strArr3, String str2, boolean z, Function<RowData, RowData> function, Function<RowData, RowData> function2, LogicalType[] logicalTypeArr) {
        this.dialect = gaussDBDialect;
        this.tableName = str;
        this.keyFields = strArr;
        this.fieldNames = strArr2;
        this.jsonMapFieldsIndex = getJsonMapFieldsIndex(strArr2, strArr3);
        this.mergeFilterKey = str2;
        this.isEscapeStrValue = z;
        this.keyExtractor = function;
        this.rowDataCopy = function2;
        this.deleteSetter = jdbcRowConverter;
        this.deleteFactory = statementFactory;
        this.fieldGetters = new RowData.FieldGetter[logicalTypeArr.length];
        for (int i = 0; i < logicalTypeArr.length; i++) {
            this.fieldGetters[i] = RowData.createFieldGetter(logicalTypeArr[i], i);
        }
    }

    @Override // org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
    public void prepareStatements(Connection connection) throws SQLException {
        this.keyToRows = new HashMap();
        this.baseConn = (BaseConnection) connection;
        this.deleteStatement = this.deleteFactory.createStatement(this.baseConn);
        this.tempTableName = getTempTableName(this.tableName);
        GaussDBUtil.executeSql(this.baseConn, this.dialect.createTempTable(this.tableName, this.tempTableName), LOG);
        this.tableColumnsList = this.dialect.getTableColumnsList(this.baseConn, this.tempTableName);
        setGucParameter(this.baseConn);
        this.copySql = this.dialect.getCopyFromStdinStatement(this.tempTableName, DELIMITER, this.tableColumnsList);
        this.truncateSql = this.dialect.getTruncateTableStatement(this.tempTableName);
        this.mergeSql = mergeIntoDest(this.tableName, this.tempTableName, this.tableColumnsList);
        this.mergeStatement = connection.prepareStatement(this.mergeSql);
        this.rowMap = new CaseInsensitiveMap((int) ((this.tableColumnsList.size() / 0.75d) + 1.0d));
    }

    @Override // org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
    public void addToBatch(RowData rowData) {
        this.keyToRows.put(this.keyExtractor.apply(rowData), this.rowDataCopy.apply(rowData));
    }

    @Override // org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
    public void executeBatch() throws SQLException {
        LOG.debug("[between two execute batch] threadId:" + this.threadId + " wait data duration: " + (System.currentTimeMillis() - this.lastExecuteBatchTime) + "ms  accumulate size:" + this.keyToRows.size());
        if (this.keyToRows.size() > 0) {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                this.convertDuration = 0L;
                this.appendDuration = 0L;
                for (Map.Entry<RowData, RowData> entry : this.keyToRows.entrySet()) {
                    RowData key = entry.getKey();
                    RowData value = entry.getValue();
                    if (value.getRowKind() == RowKind.DELETE) {
                        this.deleteSetter.toExternal(key, this.deleteStatement);
                        this.deleteStatement.addBatch();
                    } else {
                        processOneRowInBatch(key, value);
                    }
                }
                LOG.debug("[Add rows in batch duration] threadId:" + this.threadId + " Generate batch: " + (System.currentTimeMillis() - currentTimeMillis) + "ms. convertDuration: " + this.convertDuration + " appendDuration: " + this.appendDuration);
                internalExecuteBatch();
                this.deleteStatement.executeBatch();
                this.keyToRows.clear();
            } catch (IOException e) {
                throw new SQLException(e.getMessage(), e);
            }
        }
        this.lastExecuteBatchTime = System.currentTimeMillis();
    }

    @Override // org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
    public void closeStatements() throws SQLException {
        if (this.deleteStatement != null) {
            this.deleteStatement.close();
            this.deleteStatement = null;
        }
        GaussDBUtil.executeSql(this.baseConn, this.dialect.getDropTableStatement(this.tempTableName), LOG);
        this.copyBuffer.setLength(0);
        if (this.mergeStatement != null) {
            this.mergeStatement.close();
            this.mergeStatement = null;
        }
    }

    void convertRowToHashMap(RowData rowData) throws JsonProcessingException {
        for (int i = 0; i < rowData.getArity(); i++) {
            Object fieldOrNull = this.fieldGetters[i].getFieldOrNull(rowData);
            if (this.jsonMapFieldsIndex.contains(Integer.valueOf(i))) {
                String valueOf = String.valueOf(fieldOrNull);
                if (valueOf.length() > 0 && !valueOf.equals("null")) {
                    this.rowMap.putAll((Map) this.jacksonReader.readValue(valueOf));
                }
            } else {
                this.rowMap.put(this.fieldNames[i], fieldOrNull);
            }
        }
    }

    void buildCopyBuffer() {
        Iterator<String> it = this.tableColumnsList.iterator();
        while (it.hasNext()) {
            String next = it.next();
            String valueOf = String.valueOf(this.rowMap.get(next));
            if (next.equals("exps")) {
                if (valueOf.length() > 0 && !valueOf.equals("null")) {
                    this.copyBuffer.append("{");
                    this.copyBuffer.append(valueOf.substring(1, valueOf.length() - 1).replaceAll("#", ","));
                    this.copyBuffer.append("}");
                }
            } else if (this.isEscapeStrValue) {
                this.copyBuffer.append(GaussDBUtil.escapeJava(valueOf));
            } else {
                this.copyBuffer.append(valueOf);
            }
            this.copyBuffer.append(DELIMITER);
        }
        this.copyBuffer.setLength(this.copyBuffer.length() - 1);
        this.copyBuffer.append(EOL);
    }

    private ArrayList<Integer> getJsonMapFieldsIndex(String[] strArr, String[] strArr2) {
        ArrayList<Integer> arrayList = new ArrayList<>();
        for (int i = 0; i < strArr.length; i++) {
            if (strArr2 != null && Arrays.asList(strArr2).contains(strArr[i])) {
                arrayList.add(Integer.valueOf(i));
            }
        }
        return arrayList;
    }

    void processOneRowInBatch(RowData rowData, RowData rowData2) throws JsonProcessingException {
        long currentTimeMillis = System.currentTimeMillis();
        convertRowToHashMap(rowData2);
        this.convertDuration += System.currentTimeMillis() - currentTimeMillis;
        long currentTimeMillis2 = System.currentTimeMillis();
        buildCopyBuffer();
        this.appendDuration += System.currentTimeMillis() - currentTimeMillis2;
        this.rowMap.clear();
    }

    void internalExecuteBatch() throws SQLException {
        long currentTimeMillis = System.currentTimeMillis();
        if (!this.baseConn.getAutoCommit()) {
            this.baseConn.setAutoCommit(true);
        }
        try {
            try {
                GaussDBUtil.executeCopyIn(this.baseConn, this.copySql, this.copyBuffer, LOG);
                this.copyBuffer.setLength(0);
                executeMerge(this.baseConn, this.mergeStatement);
                GaussDBUtil.executeSql(this.baseConn, this.truncateSql, LOG);
                LOG.debug("[Execute batch duration] threadId:" + Thread.currentThread().getId() + " internalExecuteBatch: " + (System.currentTimeMillis() - currentTimeMillis) + "ms.");
            } catch (IOException e) {
                throw new SQLException(e.getMessage(), e);
            }
        } catch (Throwable th) {
            this.copyBuffer.setLength(0);
            throw th;
        }
    }

    private void setGucParameter(BaseConnection baseConnection) throws SQLException {
        GaussDBUtil.executeSql(baseConnection, "set enable_mergejoin=off", LOG);
        GaussDBUtil.executeSql(baseConnection, "set enable_nestloop=off", LOG);
        GaussDBUtil.executeSql(baseConnection, "set enable_force_vector_engine=on", LOG);
        GaussDBUtil.executeSql(baseConnection, "set query_dop=1", LOG);
    }

    private String getTempTableName(String str) {
        return str.replace(".", "_") + "_" + UUID.randomUUID().toString().replaceAll("-", "").toLowerCase() + "_" + new Random().nextInt(Integer.MAX_VALUE);
    }

    private int executeMerge(BaseConnection baseConnection, PreparedStatement preparedStatement) throws SQLException {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            if (this.mergeFilterKey != null) {
                ResultSet execSQLQuery = baseConnection.execSQLQuery("SELECT min(" + this.mergeFilterKey + ") min, max(" + this.mergeFilterKey + ") max FROM " + this.tempTableName);
                while (execSQLQuery.next()) {
                    preparedStatement.setObject(1, execSQLQuery.getObject(MIN_COLUMN));
                    preparedStatement.setObject(2, execSQLQuery.getObject(MAX_COLUMN));
                }
                execSQLQuery.close();
            }
            int executeUpdate = preparedStatement.executeUpdate();
            LOG.debug("[Execute SQL duration] threadId:" + Thread.currentThread().getId() + " statement: " + this.mergeSql + " duration: " + (System.currentTimeMillis() - currentTimeMillis) + "ms.");
            return executeUpdate;
        } catch (SQLException e) {
            LOG.warn("[Execute SQL failed]: " + e.getMessage());
            throw e;
        }
    }

    private String mergeIntoDest(String str, String str2, ArrayList<String> arrayList) {
        String str3 = "src_temp_table_alias";
        String str4 = (String) Arrays.stream(this.keyFields).map(str5 -> {
            return str3 + "." + this.dialect.quoteIdentifier(str5.toLowerCase()) + "=" + str + "." + this.dialect.quoteIdentifier(str5.toLowerCase());
        }).collect(Collectors.joining(" and "));
        String str6 = (String) Arrays.stream(this.keyFields).map(str7 -> {
            return str2 + "." + this.dialect.quoteIdentifier(str7.toLowerCase());
        }).collect(Collectors.joining(", "));
        Stream stream = arrayList.stream();
        GaussDBDialect gaussDBDialect = this.dialect;
        gaussDBDialect.getClass();
        return "insert into " + str + "(" + ((String) stream.map(gaussDBDialect::quoteIdentifier).collect(Collectors.joining(", "))) + ")  select " + ((String) arrayList.stream().map(str8 -> {
            return str3 + "." + this.dialect.quoteIdentifier(str8);
        }).collect(Collectors.joining(", "))) + " from (select row_number() over(partition by " + str6 + " order by " + str6 + ") as rownum,* from " + str2 + ") src_temp_table_alias where src_temp_table_alias.rownum=1 and not exists (select 1 from " + str + " where " + str4 + (this.mergeFilterKey != null ? " and " + this.mergeFilterKey + " >= ? and " + this.mergeFilterKey + " <= ?" : "") + ")";
    }
}
