package org.apache.flink.connector.jdbc.datasource.connections.xa;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.concurrent.NotThreadSafe;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.EmptyTransactionXaException;
import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.XaError;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
@NotThreadSafe
/* loaded from: input_file:org/apache/flink/connector/jdbc/datasource/connections/xa/SimpleXaConnectionProvider.class */
public class SimpleXaConnectionProvider implements XaConnectionProvider {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(SimpleXaConnectionProvider.class);
    private static final int MAX_RECOVER_CALLS = 100;
    private final Supplier<XADataSource> dataSourceSupplier;
    private final Integer timeoutSec;
    private transient XAResource xaResource;
    private transient Connection connection;
    private transient XAConnection xaConnection;

    @VisibleForTesting
    public static SimpleXaConnectionProvider from(XADataSource xADataSource) {
        return from(xADataSource, (Integer) null);
    }

    public static SimpleXaConnectionProvider from(XADataSource xADataSource, Integer num) {
        return from((Supplier<XADataSource>) () -> {
            return xADataSource;
        }, num);
    }

    public static SimpleXaConnectionProvider from(Supplier<XADataSource> supplier, Integer num) {
        return new SimpleXaConnectionProvider(supplier, num);
    }

    private SimpleXaConnectionProvider(Supplier<XADataSource> supplier, Integer num) {
        this.dataSourceSupplier = (Supplier) Preconditions.checkNotNull(supplier);
        this.timeoutSec = num;
    }

    @Override // org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider
    public void open() throws SQLException {
        Preconditions.checkState(!isOpen(), "already connected");
        this.xaConnection = this.dataSourceSupplier.get().getXAConnection();
        this.xaResource = this.xaConnection.getXAResource();
        if (this.timeoutSec != null) {
            try {
                this.xaResource.setTransactionTimeout(this.timeoutSec.intValue());
            } catch (XAException e) {
                throw new SQLException((Throwable) e);
            }
        }
        this.connection = this.xaConnection.getConnection();
        this.connection.setReadOnly(false);
        this.connection.setAutoCommit(false);
        Preconditions.checkState(!this.connection.getAutoCommit());
    }

    @Override // org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider, java.lang.AutoCloseable
    public void close() throws SQLException {
        if (this.connection != null) {
            this.connection.close();
            this.connection = null;
        }
        try {
            this.xaConnection.close();
        } catch (SQLException e) {
            LOG.warn("unable to close XA connection", e);
        }
        this.xaResource = null;
    }

    @Override // org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider
    public Connection getConnection() {
        Preconditions.checkNotNull(this.connection);
        return this.connection;
    }

    @Override // org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider
    public boolean isOpen() {
        return this.xaResource != null;
    }

    @Override // org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider
    public boolean isConnectionValid() throws SQLException {
        return isOpen() && this.connection.isValid(this.connection.getNetworkTimeout());
    }

    @Override // org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider
    public Connection getOrEstablishConnection() throws SQLException {
        if (!isOpen()) {
            open();
        }
        return this.connection;
    }

    @Override // org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider
    public void closeConnection() {
        try {
            close();
        } catch (SQLException e) {
            LOG.warn("Connection close failed.", e);
        }
    }

    @Override // org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider
    public Connection reestablishConnection() {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider
    public boolean isClickHouseConnection() {
        return false;
    }

    @Override // org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider
    public void start(Xid xid) {
        execute(XaCommand.fromRunnable("start", xid, () -> {
            this.xaResource.start(xid, 0);
        }));
    }

    @Override // org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider
    public void endAndPrepare(Xid xid) {
        execute(XaCommand.fromRunnable("end", xid, () -> {
            this.xaResource.end(xid, 67108864);
        }));
        int intValue = ((Integer) execute(new XaCommand("prepare", xid, () -> {
            return Integer.valueOf(this.xaResource.prepare(xid));
        }))).intValue();
        if (intValue == 3) {
            throw new EmptyTransactionXaException(xid);
        }
        if (intValue != 0) {
            throw new FlinkRuntimeException(formatErrorMessage("prepare", xid, "response: " + intValue));
        }
    }

    @Override // org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider
    public void failAndRollback(Xid xid) {
        execute(XaCommand.fromRunnable("end (fail)", xid, () -> {
            this.xaResource.end(xid, 536870912);
            this.xaResource.rollback(xid);
        }, xAException -> {
            if (xAException.errorCode >= MAX_RECOVER_CALLS) {
                rollback(xid);
            } else {
                LOG.warn(formatErrorMessage("end (fail)", xid, Integer.valueOf(xAException.errorCode), new String[0]));
            }
        }));
    }

    @Override // org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider
    public void commit(Xid xid, boolean z) {
        execute(XaCommand.fromRunnableRecoverByWarn("commit", xid, () -> {
            this.xaResource.commit(xid, false);
        }, xAException -> {
            return buildCommitErrorDesc(xAException, z);
        }));
    }

    @Override // org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider
    public void rollback(Xid xid) {
        execute(XaCommand.fromRunnableRecoverByWarn("rollback", xid, () -> {
            this.xaResource.rollback(xid);
        }, this::buildRollbackErrorDesc));
    }

    private void forget(Xid xid) {
        execute(XaCommand.fromRunnableRecoverByWarn("forget", xid, () -> {
            this.xaResource.forget(xid);
        }, xAException -> {
            return Optional.of("manual cleanup may be required");
        }));
    }

    @Override // org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider
    public Collection<Xid> recover() {
        return (Collection) execute(new XaCommand("recover", null, () -> {
            List<Xid> recover = recover(16777216);
            int i = 0;
            while (recover.addAll(recover(0))) {
                try {
                    Preconditions.checkState(i < MAX_RECOVER_CALLS, "too many xa_recover() calls");
                    i++;
                } finally {
                    recover(8388608);
                }
            }
            return recover;
        }));
    }

    private List<Xid> recover(int i) throws XAException {
        return Arrays.asList(this.xaResource.recover(i));
    }

    private <T> T execute(XaCommand<T> xaCommand) throws FlinkRuntimeException {
        Preconditions.checkState(isOpen(), "not connected");
        try {
            return xaCommand.execute();
        } catch (FlinkRuntimeException e) {
            throw e;
        } catch (Exception e2) {
            throw wrapException(xaCommand.getName(), xaCommand.getXid().orElse(null), e2);
        } catch (XAException e3) {
            if (XaError.isHeurErrorCode(e3.errorCode)) {
                xaCommand.getXid().ifPresent(this::forget);
            }
            return xaCommand.recover(e3);
        }
    }

    private static FlinkRuntimeException wrapException(String str, Xid xid, Exception exc) {
        return XaError.wrapException(str, xid, exc);
    }

    private Optional<String> buildCommitErrorDesc(XAException xAException, boolean z) {
        return XaError.buildCommitErrorDesc(xAException, z);
    }

    private Optional<String> buildRollbackErrorDesc(XAException xAException) {
        return XaError.buildRollbackErrorDesc(xAException);
    }

    private static String formatErrorMessage(String str, Xid xid, String... strArr) {
        return formatErrorMessage(str, xid, null, strArr);
    }

    private static String formatErrorMessage(String str, Xid xid, Integer num, String... strArr) {
        return XaError.errorMessage(str, xid, num, strArr);
    }
}
