package com.huawei.flink.connector.jdbc.gaussdb.executor;

import com.huawei.flink.connector.jdbc.gaussdb.dialect.GaussDBDialect;
import com.huawei.flink.connector.jdbc.gaussdb.utils.GaussDBUtil;
import java.io.IOException;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/flink/connector/jdbc/gaussdb/executor/GaussDBRowDataAppendOnlyByCopyStatementExecutor.class */
public class GaussDBRowDataAppendOnlyByCopyStatementExecutor implements JdbcBatchStatementExecutor<RowData> {
    private static final char DELIMITER_CHAR = 30;
    private static final char EOL_CHAR = '\n';
    private static final char QUOTE_CHAR = 29;
    private final String tableName;
    private final String[] fieldNames;
    private final boolean isEscapeStrValue;
    private final RowData.FieldGetter[] fieldGetters;
    private transient BaseConnection baseConn;
    private transient String copySql;
    private transient long appendCount;
    private transient long lastExecuteBatchTime;
    private static final Logger LOG = LoggerFactory.getLogger(GaussDBRowDataAppendOnlyByCopyStatementExecutor.class);
    private static final String DELIMITER = String.valueOf((char) 30);
    private static final String EOL = String.valueOf('\n');
    private static final String QUOTE = String.valueOf((char) 29);
    private final GaussDBDialect dialect = new GaussDBDialect();
    private final transient StringBuilder copyBuffer = new StringBuilder();

    public GaussDBRowDataAppendOnlyByCopyStatementExecutor(String str, String[] strArr, LogicalType[] logicalTypeArr, boolean z) {
        this.tableName = str;
        this.fieldNames = strArr;
        this.isEscapeStrValue = z;
        this.fieldGetters = new RowData.FieldGetter[logicalTypeArr.length];
        for (int i = 0; i < logicalTypeArr.length; i++) {
            this.fieldGetters[i] = RowData.createFieldGetter(logicalTypeArr[i], i);
        }
    }

    @Override // org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
    public void prepareStatements(Connection connection) throws SQLException {
        this.baseConn = (BaseConnection) connection;
        ArrayList<String> arrayList = new ArrayList<>(Arrays.asList(this.fieldNames));
        setGucParameter(this.baseConn);
        this.copySql = this.dialect.getCopyFromStdinStatement(this.tableName, DELIMITER, QUOTE, arrayList);
        this.lastExecuteBatchTime = System.currentTimeMillis();
    }

    @Override // org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
    public void addToBatch(RowData rowData) throws SQLException {
        for (int i = 0; i < rowData.getArity(); i++) {
            String valueOf = String.valueOf(this.fieldGetters[i].getFieldOrNull(rowData));
            if (!valueOf.isEmpty() && !valueOf.equalsIgnoreCase("null")) {
                if (valueOf.contains(QUOTE)) {
                    valueOf = GaussDBUtil.escapeQuote(valueOf, (char) 29);
                }
                if (this.isEscapeStrValue) {
                    this.copyBuffer.append(QUOTE);
                    this.copyBuffer.append(GaussDBUtil.escapeJava(valueOf));
                    this.copyBuffer.append(QUOTE);
                } else {
                    this.copyBuffer.append(QUOTE);
                    this.copyBuffer.append(valueOf);
                    this.copyBuffer.append(QUOTE);
                }
            }
            this.copyBuffer.append(DELIMITER);
        }
        this.copyBuffer.setLength(this.copyBuffer.length() - 1);
        this.copyBuffer.append(EOL);
        this.appendCount++;
    }

    @Override // org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
    public void executeBatch() throws SQLException {
        LOG.debug("[between two execute batch] threadId:" + Thread.currentThread().getId() + " wait data duration: " + (System.currentTimeMillis() - this.lastExecuteBatchTime) + "ms  accumulate size:" + this.appendCount);
        if (this.appendCount > 0) {
            internalExecuteBatch();
            this.appendCount = 0L;
        }
        this.lastExecuteBatchTime = System.currentTimeMillis();
    }

    void internalExecuteBatch() throws SQLException {
        long currentTimeMillis = System.currentTimeMillis();
        if (!this.baseConn.getAutoCommit()) {
            this.baseConn.setAutoCommit(true);
        }
        try {
            CopyManager copyManager = new CopyManager(this.baseConn);
            StringReader stringReader = new StringReader(this.copyBuffer.toString());
            copyManager.copyIn(this.copySql, stringReader);
            stringReader.close();
            LOG.debug("[Execute SQL duration] threadId:" + Thread.currentThread().getId() + " statement: " + this.copySql + " duration: " + (System.currentTimeMillis() - currentTimeMillis) + "ms.");
            this.copyBuffer.setLength(0);
            LOG.debug("[Execute batch duration] threadId:" + Thread.currentThread().getId() + " internalExecuteBatch: " + (System.currentTimeMillis() - currentTimeMillis) + "ms.");
        } catch (IOException e) {
            throw new SQLException(e.getMessage(), e);
        } catch (SQLException e2) {
            LOG.warn("[Execute SQL failed]: " + e2.getMessage());
            throw e2;
        }
    }

    private void setGucParameter(BaseConnection baseConnection) throws SQLException {
        GaussDBUtil.executeSql(baseConnection, "set query_dop=1", LOG);
    }

    @Override // org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor
    public void closeStatements() throws SQLException {
        if (this.baseConn != null) {
            this.baseConn.close();
            this.baseConn = null;
        }
    }
}
