package org.apache.flink.connector.jdbc.xa;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import javax.transaction.xa.Xid;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.jdbc.xa.XaFacade;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
@Deprecated
/* loaded from: input_file:org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.class */
public class JdbcXaSinkFunction<T> extends AbstractRichFunction implements CheckpointedFunction, CheckpointListener, SinkFunction<T>, AutoCloseable, InputTypeConfigurable {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcXaSinkFunction.class);
    private final XaFacade xaFacade;
    private final XaGroupOps xaGroupOps;
    private final XidGenerator xidGenerator;
    private final JdbcOutputFormat<T, T, JdbcBatchStatementExecutor<T>> outputFormat;
    private final XaSinkStateHandler stateHandler;
    private final JdbcExactlyOnceOptions options;
    private JdbcOutputSerializer<T> serializer;
    private transient List<CheckpointAndXid> preparedXids;
    private transient Deque<Xid> hangingXids;
    private transient Xid currentXid;

    public JdbcXaSinkFunction(String str, JdbcStatementBuilder<T> jdbcStatementBuilder, XaFacade xaFacade, JdbcExecutionOptions jdbcExecutionOptions, JdbcExactlyOnceOptions jdbcExactlyOnceOptions) {
        this(new JdbcOutputFormat(xaFacade, jdbcExecutionOptions, () -> {
            return JdbcBatchStatementExecutor.simple(str, jdbcStatementBuilder);
        }), xaFacade, XidGenerator.semanticXidGenerator(), new XaSinkStateHandlerImpl(), jdbcExactlyOnceOptions, new XaGroupOpsImpl(xaFacade));
    }

    public JdbcXaSinkFunction(JdbcOutputFormat<T, T, JdbcBatchStatementExecutor<T>> jdbcOutputFormat, XaFacade xaFacade, XidGenerator xidGenerator, XaSinkStateHandler xaSinkStateHandler, JdbcExactlyOnceOptions jdbcExactlyOnceOptions, XaGroupOps xaGroupOps) {
        this.preparedXids = new ArrayList();
        this.hangingXids = new LinkedList();
        Preconditions.checkArgument(jdbcOutputFormat.getExecutionOptions().getMaxRetries() == 0, "JDBC XA sink requires maxRetries equal to 0, otherwise it could cause duplicates. See issue FLINK-22311 for details.");
        this.xaFacade = (XaFacade) Preconditions.checkNotNull(xaFacade);
        this.xidGenerator = (XidGenerator) Preconditions.checkNotNull(xidGenerator);
        this.outputFormat = (JdbcOutputFormat) Preconditions.checkNotNull(jdbcOutputFormat);
        this.stateHandler = (XaSinkStateHandler) Preconditions.checkNotNull(xaSinkStateHandler);
        this.options = (JdbcExactlyOnceOptions) Preconditions.checkNotNull(jdbcExactlyOnceOptions);
        this.xaGroupOps = xaGroupOps;
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        JdbcXaSinkFunctionState load = this.stateHandler.load(functionInitializationContext);
        this.hangingXids = new LinkedList(load.getHanging());
        this.preparedXids = new ArrayList(load.getPrepared());
        LOG.info("initialized state: prepared xids: {}, hanging xids: {}", Integer.valueOf(this.preparedXids.size()), Integer.valueOf(this.hangingXids.size()));
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.serializer.withObjectReuseEnabled(getRuntimeContext().getExecutionConfig().isObjectReuseEnabled());
        this.xidGenerator.open();
        this.xaFacade.open();
        this.hangingXids = new LinkedList(this.xaGroupOps.failOrRollback(this.hangingXids).getForRetry());
        commitUpToCheckpoint(Optional.empty());
        if (this.options.isDiscoverAndRollbackOnRecovery()) {
            this.xaGroupOps.recoverAndRollback(JobSubtask.of(getRuntimeContext()), this.xidGenerator);
        }
        beginTx(0L);
        this.outputFormat.open(this.serializer);
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        LOG.debug("snapshot state, checkpointId={}", Long.valueOf(functionSnapshotContext.getCheckpointId()));
        prepareCurrentTx(functionSnapshotContext.getCheckpointId());
        beginTx(functionSnapshotContext.getCheckpointId() + 1);
        this.stateHandler.store(JdbcXaSinkFunctionState.of(this.preparedXids, this.hangingXids));
    }

    public void notifyCheckpointComplete(long j) {
        commitUpToCheckpoint(Optional.of(Long.valueOf(j)));
    }

    public void invoke(T t, SinkFunction.Context context) throws IOException {
        Preconditions.checkState(this.currentXid != null, "current xid must not be null");
        if (LOG.isTraceEnabled()) {
            LOG.trace("invoke, xid: {}, value: {}", this.currentXid, t);
        }
        this.outputFormat.writeRecord(t);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        super.close();
        if (this.currentXid != null && this.xaFacade.isOpen()) {
            try {
                LOG.debug("remove current transaction before closing, xid={}", this.currentXid);
                this.xaFacade.failAndRollback(this.currentXid);
            } catch (Exception e) {
                LOG.warn("unable to fail/rollback current transaction, xid={}", this.currentXid, e);
            }
        }
        this.xaFacade.close();
        this.xidGenerator.close();
        this.currentXid = null;
        this.hangingXids = null;
        this.preparedXids = null;
    }

    private void prepareCurrentTx(long j) throws IOException {
        Preconditions.checkState(this.currentXid != null, "no current xid");
        Preconditions.checkState(!this.hangingXids.isEmpty() && this.hangingXids.peekLast().equals(this.currentXid), "inconsistent internal state");
        this.hangingXids.pollLast();
        this.outputFormat.flush();
        try {
            this.xaFacade.endAndPrepare(this.currentXid);
            this.preparedXids.add(CheckpointAndXid.createNew(j, this.currentXid));
        } catch (Exception e) {
            ExceptionUtils.rethrowIOException(e);
        } catch (XaFacade.EmptyXaTransactionException e2) {
            LOG.info("empty XA transaction (skip), xid: {}, checkpoint {}", this.currentXid, Long.valueOf(j));
        }
        this.currentXid = null;
    }

    private void beginTx(long j) throws Exception {
        Preconditions.checkState(this.currentXid == null, "currentXid not null");
        this.currentXid = this.xidGenerator.generateXid(JobSubtask.of(getRuntimeContext()), j);
        this.hangingXids.offerLast(this.currentXid);
        this.xaFacade.start(this.currentXid);
        if (j > 0) {
            this.outputFormat.updateExecutor(false);
        }
    }

    private void commitUpToCheckpoint(Optional<Long> optional) {
        Tuple2<List<CheckpointAndXid>, List<CheckpointAndXid>> split = split(this.preparedXids, optional, true);
        if (((List) split.f0).isEmpty()) {
            optional.ifPresent(l -> {
                LOG.warn("nothing to commit up to checkpoint: {}", l);
            });
        } else {
            this.preparedXids = (List) split.f1;
            this.preparedXids.addAll(this.xaGroupOps.commit((List) split.f0, this.options.isAllowOutOfOrderCommits(), this.options.getMaxCommitAttempts()).getForRetry());
        }
    }

    private Tuple2<List<CheckpointAndXid>, List<CheckpointAndXid>> split(List<CheckpointAndXid> list, Optional<Long> optional, boolean z) {
        return (Tuple2) optional.map(l -> {
            return split(this.preparedXids, l.longValue(), z);
        }).orElse(new Tuple2(list, new ArrayList()));
    }

    private Tuple2<List<CheckpointAndXid>, List<CheckpointAndXid>> split(List<CheckpointAndXid> list, long j, boolean z) {
        ArrayList arrayList = new ArrayList(list.size() / 2);
        ArrayList arrayList2 = new ArrayList(list.size() / 2);
        list.forEach(checkpointAndXid -> {
            if (checkpointAndXid.checkpointId < j || (checkpointAndXid.checkpointId == j && z)) {
                arrayList.add(checkpointAndXid);
            } else {
                arrayList2.add(checkpointAndXid);
            }
        });
        return new Tuple2<>(arrayList, arrayList2);
    }

    public void setInputType(TypeInformation<?> typeInformation, ExecutionConfig executionConfig) {
        this.serializer = JdbcOutputSerializer.of(typeInformation.createSerializer(executionConfig));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 777675114:
                if (implMethodName.equals("lambda$new$df5f1bb$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/jdbc/internal/JdbcOutputFormat$StatementExecutorFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/connector/jdbc/JdbcStatementBuilder;)Lorg/apache/flink/connector/jdbc/internal/executor/JdbcBatchStatementExecutor;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    JdbcStatementBuilder jdbcStatementBuilder = (JdbcStatementBuilder) serializedLambda.getCapturedArg(1);
                    return () -> {
                        return JdbcBatchStatementExecutor.simple(str, jdbcStatementBuilder);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
