package org.apache.flink.connector.jdbc.datasource.transactions.xa;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import javax.transaction.xa.Xid;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterState;
import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId;
import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.EmptyTransactionXaException;
import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.TransientXaException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks and drives the XA transactions of a single writer on top of an
 * {@link XaConnectionProvider}.
 *
 * <p>Two collections of transaction ids are maintained:
 *
 * <ul>
 *   <li><b>hanging</b> xids: transactions that were started but not yet prepared; the last
 *       element is the currently open transaction (see {@link #checkState()});
 *   <li><b>prepared</b> xids: transactions that were prepared and still have to be committed.
 * </ul>
 *
 * <p>The mutable state is {@code transient}. Field initializers do not run during Java
 * deserialization, so on a deserialized instance these fields are {@code null} until
 * {@link #open(JdbcWriterState)} or {@link #recoverState(JdbcWriterState)} repopulates them.
 */
@Internal
public class XaTransaction implements Serializable, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(XaTransaction.class);

    private final XaConnectionProvider xaConnectionProvider;
    private final JdbcExactlyOnceOptions exactlyOnceOptions;

    /** Transactions that were prepared and are waiting to be committed. */
    private transient List<TransactionId> preparedXids = new ArrayList<>();

    /** Transactions that were started but not prepared; the last element is the current one. */
    private transient Deque<TransactionId> hangingXids = new LinkedList<>();

    /** The transaction currently open, or {@code null} when none is open. */
    private transient TransactionId currentTid;

    /** Template id from which the id of every new transaction is derived. */
    private final TransactionId baseTransaction;

    /**
     * @param jdbcExactlyOnceOptions options controlling commit retries, commit ordering and
     *     rollback-on-recovery
     * @param transactionId base transaction id used to derive per-transaction ids
     * @param xaConnectionProvider provider that executes the XA operations
     */
    public XaTransaction(
            JdbcExactlyOnceOptions jdbcExactlyOnceOptions,
            TransactionId transactionId,
            XaConnectionProvider xaConnectionProvider) {
        this.xaConnectionProvider = xaConnectionProvider;
        this.exactlyOnceOptions = jdbcExactlyOnceOptions;
        this.baseTransaction = transactionId;
    }

    /** Returns the id of the currently open transaction, or {@code null} if there is none. */
    public Xid getCurrentXid() {
        return this.currentTid;
    }

    /** Returns the connection provider this instance operates on. */
    public XaConnectionProvider getConnectionProvider() {
        return this.xaConnectionProvider;
    }

    /**
     * Builds a writer state from the current prepared and hanging transaction ids. The
     * collections are handed to {@link JdbcWriterState#of} as they are (no copy is made here).
     */
    public JdbcWriterState getState() {
        return JdbcWriterState.of(this.preparedXids, this.hangingXids);
    }

    /**
     * Opens the connection provider and brings the transactions of a previous run to a
     * consistent state.
     *
     * <p>In order: restores the xids from {@code jdbcWriterState}, tries to fail/roll back all
     * hanging transactions (keeping only those reported for retry), tries to commit all prepared
     * transactions and, if enabled in the options, rolls back transactions discovered through
     * {@link XaConnectionProvider#recover()}.
     *
     * @param jdbcWriterState state to recover the prepared and hanging xids from
     * @throws IOException if any of the steps fails; the failure is rethrown through
     *     {@link ExceptionUtils#rethrowIOException}
     */
    public void open(JdbcWriterState jdbcWriterState) throws IOException {
        try {
            this.xaConnectionProvider.open();
            recoverState(jdbcWriterState);
            // Whatever could not be rolled back now stays in the hanging list.
            this.hangingXids = new LinkedList<>(failOrRollback(this.hangingXids).getForRetry());
            commitTx();
            if (this.exactlyOnceOptions.isDiscoverAndRollbackOnRecovery()) {
                recoverAndRollback();
            }
        } catch (Exception e) {
            ExceptionUtils.rethrowIOException(e);
        }
    }

    /**
     * Rolls back the currently open transaction (best effort: a failure is only logged), closes
     * the connection provider and drops the in-memory transaction state.
     *
     * <p>After this call the prepared/hanging collections are {@code null}; they are set again
     * by {@link #recoverState(JdbcWriterState)}.
     *
     * @throws Exception if closing the connection provider fails
     */
    @Override
    public void close() throws Exception {
        if (this.currentTid != null && this.xaConnectionProvider.isOpen()) {
            try {
                LOG.debug(
                        "remove current transaction before closing, xid={}",
                        this.currentTid.getXidValue());
                this.xaConnectionProvider.failAndRollback(this.currentTid);
            } catch (Exception e) {
                LOG.warn(
                        "unable to fail/rollback current transaction, xid={}",
                        this.currentTid.getXidValue(),
                        e);
            }
        }
        this.xaConnectionProvider.close();
        this.currentTid = null;
        this.hangingXids = null;
        this.preparedXids = null;
    }

    /**
     * Replaces the in-memory prepared and hanging xids with copies of those in the given state.
     *
     * @param jdbcWriterState state to copy the xids from
     */
    public void recoverState(JdbcWriterState jdbcWriterState) {
        this.hangingXids = new LinkedList<>(jdbcWriterState.getHanging());
        this.preparedXids = new ArrayList<>(jdbcWriterState.getPrepared());
        LOG.info(
                "initialized state: prepared xids: {}, hanging xids: {}",
                this.preparedXids.size(),
                this.hangingXids.size());
    }

    /**
     * Verifies that a transaction is open and that it is the last hanging one.
     *
     * @throws IllegalStateException if there is no current transaction or it is not the last
     *     element of the hanging xids
     */
    public void checkState() {
        Preconditions.checkState(this.currentTid != null, "current xid must not be null");
        Preconditions.checkState(
                !this.hangingXids.isEmpty() && this.hangingXids.peekLast().equals(this.currentTid),
                "inconsistent internal state");
    }

    /**
     * Starts a new transaction whose id is {@code baseTransaction.withBranch(checkpointId)}.
     *
     * <p>The new id is appended to the hanging xids before the transaction is started, so it
     * stays registered as hanging even if starting it fails.
     *
     * @param checkpointId value passed to {@link TransactionId#withBranch}; presumably the id
     *     of the checkpoint this transaction belongs to
     * @throws IOException if a transaction is already open or starting the new one fails
     */
    public void createTx(long checkpointId) throws IOException {
        try {
            Preconditions.checkState(this.currentTid == null, "currentXid not null");
            this.currentTid = this.baseTransaction.withBranch(checkpointId);
            this.hangingXids.offerLast(this.currentTid);
            this.xaConnectionProvider.start(this.currentTid);
        } catch (Exception e) {
            ExceptionUtils.rethrowIOException(e);
        }
    }

    /**
     * Ends and prepares the current transaction and moves it from the hanging to the prepared
     * xids. A transaction reported as empty is skipped: it is logged and not added to the
     * prepared xids. On success or skip the current transaction is cleared.
     *
     * @throws IOException if ending/preparing fails for any other reason; in that case the
     *     current transaction id is left set
     * @throws IllegalStateException if {@link #checkState()} fails
     */
    public void prepareTx() throws IOException {
        checkState();
        this.hangingXids.pollLast();
        try {
            this.xaConnectionProvider.endAndPrepare(this.currentTid);
            this.preparedXids.add(this.currentTid);
        } catch (EmptyTransactionXaException e) {
            // Must be handled before the generic handler, otherwise it would never be reached.
            LOG.info(
                    "empty XA transaction (skip), xid: {}, checkpoint {}",
                    this.currentTid.getXidValue(),
                    this.currentTid.getCheckpointId());
        } catch (Exception e) {
            ExceptionUtils.rethrowIOException(e);
        }
        this.currentTid = null;
    }

    /**
     * Attempts to commit all prepared transactions. Afterwards the prepared xids contain only
     * the transactions that {@link #commitXids(List)} returned for a later retry.
     */
    public void commitTx() {
        List<TransactionId> toCommit = this.preparedXids;
        this.preparedXids = new ArrayList<>();
        this.preparedXids.addAll(commitXids(toCommit));
    }

    /**
     * Attempts to commit the prepared transactions whose checkpoint id is less than or equal to
     * {@code checkpointId}. Transactions of later checkpoints stay prepared, and transactions
     * returned for retry are added back. If nothing matches, only a warning is logged.
     *
     * @param checkpointId inclusive upper bound of the checkpoint ids to commit
     */
    public void commitTxUntil(long checkpointId) {
        Tuple2<List<TransactionId>, List<TransactionId>> parts =
                split(this.preparedXids, checkpointId);
        List<TransactionId> toCommit = parts.f0;
        if (toCommit.isEmpty()) {
            LOG.warn("nothing to commit up to checkpoint: {}", checkpointId);
        } else {
            this.preparedXids = parts.f1;
            this.preparedXids.addAll(commitXids(toCommit));
        }
    }

    /**
     * Attempts to commit the given transactions using the ordering and max-attempts settings of
     * the exactly-once options.
     *
     * @param xids transactions to commit; this list is modified: every transaction for which a
     *     commit is attempted is removed from it
     * @return the transactions that should be retried later
     */
    public List<TransactionId> commitXids(List<TransactionId> xids) {
        return commit(
                        xids,
                        this.exactlyOnceOptions.isAllowOutOfOrderCommits(),
                        this.exactlyOnceOptions.getMaxCommitAttempts())
                .getForRetry();
    }

    /** Same as {@link #split(List, long, boolean)} with the bound being inclusive. */
    private Tuple2<List<TransactionId>, List<TransactionId>> split(
            List<TransactionId> xids, long checkpointId) {
        return split(xids, checkpointId, true);
    }

    /**
     * Partitions {@code xids} by checkpoint id, preserving the relative order in both parts.
     *
     * @param xids transactions to partition (not modified)
     * @param checkpointId the checkpoint id to split at
     * @param checkpointIntoLo whether transactions of exactly {@code checkpointId} go into the
     *     first part
     * @return a tuple of (transactions up to the checkpoint, remaining transactions)
     */
    private Tuple2<List<TransactionId>, List<TransactionId>> split(
            List<TransactionId> xids, long checkpointId, boolean checkpointIntoLo) {
        List<TransactionId> lo = new ArrayList<>(xids.size() / 2);
        List<TransactionId> hi = new ArrayList<>(xids.size() / 2);
        for (TransactionId xid : xids) {
            long xidCheckpoint = xid.getCheckpointId();
            if (xidCheckpoint < checkpointId
                    || (xidCheckpoint == checkpointId && checkpointIntoLo)) {
                lo.add(xid);
            } else {
                hi.add(xid);
            }
        }
        return new Tuple2<>(lo, hi);
    }

    /**
     * Commits the given transactions one by one, in list order.
     *
     * <p>The loop continues while the result reports no failures or out-of-order commits are
     * allowed. A transient failure is recorded with the attempt counter of the transaction
     * incremented; any other failure is recorded as a regular failure. Transactions that were
     * not attempted are added to the retry list of the result.
     *
     * @param xids transactions to commit; attempted transactions are removed from this list
     * @param allowOutOfOrderCommits whether to keep committing after a failure was recorded
     * @param maxCommitAttempts number of attempts at which a transaction pending retry makes
     *     this method throw
     * @return the result holding, among others, the transactions to retry
     * @throws RuntimeException if a transaction pending retry reached {@code maxCommitAttempts};
     *     {@code XaTransactionResult#throwIfAnyFailed} may throw as well
     */
    private XaTransactionResult<TransactionId> commit(
            List<TransactionId> xids, boolean allowOutOfOrderCommits, int maxCommitAttempts) {
        XaTransactionResult<TransactionId> result = new XaTransactionResult<>();
        int originalSize = xids.size();
        LOG.debug("commit {} transactions", originalSize);
        Iterator<TransactionId> remaining = xids.iterator();
        while (remaining.hasNext() && (result.hasNoFailures() || allowOutOfOrderCommits)) {
            TransactionId xid = remaining.next();
            remaining.remove();
            try {
                this.xaConnectionProvider.commit(xid, xid.getRestored());
                result.succeeded(xid);
            } catch (TransientXaException e) {
                // Specific handler first: transient failures are retried, not treated as fatal.
                result.failedTransiently(xid.withAttemptsIncremented(), e);
            } catch (Exception e) {
                result.failed(xid, e);
            }
        }
        // Whatever was not attempted has to be retried as well.
        result.getForRetry().addAll(xids);
        result.throwIfAnyFailed("commit");
        throwIfAnyReachedMaxAttempts(result, maxCommitAttempts);
        result.getTransientFailure()
                .ifPresent(
                        failure ->
                                LOG.warn(
                                        "failed to commit {} transactions out of {} (keep them to retry later)",
                                        result.getForRetry().size(),
                                        originalSize,
                                        failure));
        return result;
    }

    /**
     * Tries to fail/roll back every given transaction. Failures never abort the loop; they are
     * logged and recorded in the returned result (transient ones separately).
     *
     * @param xids transactions to roll back (not modified)
     * @return the result of the rollback attempts; empty if {@code xids} is empty
     */
    private XaTransactionResult<TransactionId> failOrRollback(Collection<TransactionId> xids) {
        XaTransactionResult<TransactionId> result = new XaTransactionResult<>();
        if (xids.isEmpty()) {
            return result;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("rolling back {} transactions: {}", xids.size(), xids);
        }
        for (TransactionId xid : xids) {
            try {
                this.xaConnectionProvider.failAndRollback(xid);
                result.succeeded(xid);
            } catch (TransientXaException e) {
                // Specific handler first so that transient failures are recorded as such.
                LOG.info("unable to fail/rollback transaction, xid={}: {}", xid, e.getMessage());
                result.failedTransiently(xid, e);
            } catch (Exception e) {
                LOG.warn("unable to fail/rollback transaction, xid={}: {}", xid, e.getMessage());
                result.failed(xid, e);
            }
        }
        if (!result.getForRetry().isEmpty()) {
            LOG.info("failed to roll back {} transactions", result.getForRetry().size());
        }
        return result;
    }

    /**
     * Rolls back the transactions returned by {@link XaConnectionProvider#recover()} that belong
     * to the base transaction. A failed rollback is logged and otherwise ignored.
     */
    private void recoverAndRollback() {
        Collection<Xid> recovered = this.xaConnectionProvider.recover();
        if (recovered.isEmpty()) {
            return;
        }
        LOG.warn("rollback {} recovered transactions", recovered.size());
        for (Xid xid : recovered) {
            if (!this.baseTransaction.belongsTo(xid)) {
                continue;
            }
            try {
                this.xaConnectionProvider.rollback(xid);
            } catch (Exception e) {
                LOG.info("unable to rollback recovered transaction, xid={}", xid, e);
            }
        }
    }

    /**
     * Throws if any transaction pending retry has used up its commit attempts.
     *
     * @param result result whose retry list is inspected
     * @param maxCommitAttempts number of attempts considered exhausted
     * @throws RuntimeException listing all transactions with at least {@code maxCommitAttempts}
     *     attempts
     */
    private void throwIfAnyReachedMaxAttempts(
            XaTransactionResult<TransactionId> result, int maxCommitAttempts) {
        List<TransactionId> exhausted = null;
        for (TransactionId xid : result.getForRetry()) {
            if (xid.getAttempts() >= maxCommitAttempts) {
                if (exhausted == null) {
                    exhausted = new ArrayList<>();
                }
                exhausted.add(xid);
            }
        }
        if (exhausted != null) {
            throw new RuntimeException(
                    String.format(
                            "reached max number of commit attempts (%d) for transactions: %s",
                            maxCommitAttempts, exhausted));
        }
    }
}
