package org.apache.flink.streaming.api.functions.sink;

import java.io.IOException;
import java.time.Clock;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.class */
public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN> implements CheckpointedFunction, CheckpointListener {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TwoPhaseCommitSinkFunction.class);
    protected final LinkedHashMap<Long, TransactionHolder<TXN>> pendingCommitTransactions;
    protected transient Optional<CONTEXT> userContext;
    protected transient ListState<State<TXN, CONTEXT>> state;
    private final Clock clock;
    private final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
    private TransactionHolder<TXN> currentTransactionHolder;
    private long transactionTimeout;
    private boolean ignoreFailuresAfterTransactionTimeout;
    private double transactionTimeoutWarningRatio;

    @VisibleForTesting
    @Internal
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction$State.class */
    public static final class State<TXN, CONTEXT> {
        protected TransactionHolder<TXN> pendingTransaction;
        protected List<TransactionHolder<TXN>> pendingCommitTransactions;
        protected Optional<CONTEXT> context;

        public State() {
            this.pendingCommitTransactions = new ArrayList();
        }

        public State(TransactionHolder<TXN> transactionHolder, List<TransactionHolder<TXN>> list, Optional<CONTEXT> optional) {
            this.pendingCommitTransactions = new ArrayList();
            this.context = (Optional) Objects.requireNonNull(optional, "context is null");
            this.pendingTransaction = (TransactionHolder) Objects.requireNonNull(transactionHolder, "pendingTransaction is null");
            this.pendingCommitTransactions = (List) Objects.requireNonNull(list, "pendingCommitTransactions is null");
        }

        public TransactionHolder<TXN> getPendingTransaction() {
            return this.pendingTransaction;
        }

        public void setPendingTransaction(TransactionHolder<TXN> transactionHolder) {
            this.pendingTransaction = transactionHolder;
        }

        public List<TransactionHolder<TXN>> getPendingCommitTransactions() {
            return this.pendingCommitTransactions;
        }

        public void setPendingCommitTransactions(List<TransactionHolder<TXN>> list) {
            this.pendingCommitTransactions = list;
        }

        public Optional<CONTEXT> getContext() {
            return this.context;
        }

        public void setContext(Optional<CONTEXT> optional) {
            this.context = optional;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            State state = (State) obj;
            if (this.pendingTransaction != null) {
                if (!this.pendingTransaction.equals(state.pendingTransaction)) {
                    return false;
                }
            } else if (state.pendingTransaction != null) {
                return false;
            }
            if (this.pendingCommitTransactions != null) {
                if (!this.pendingCommitTransactions.equals(state.pendingCommitTransactions)) {
                    return false;
                }
            } else if (state.pendingCommitTransactions != null) {
                return false;
            }
            return this.context != null ? this.context.equals(state.context) : state.context == null;
        }

        public int hashCode() {
            return (31 * ((31 * (this.pendingTransaction != null ? this.pendingTransaction.hashCode() : 0)) + (this.pendingCommitTransactions != null ? this.pendingCommitTransactions.hashCode() : 0))) + (this.context != null ? this.context.hashCode() : 0);
        }
    }

    @VisibleForTesting
    @Internal
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction$StateSerializer.class */
    public static final class StateSerializer<TXN, CONTEXT> extends TypeSerializer<State<TXN, CONTEXT>> {
        private static final long serialVersionUID = 1;
        private final TypeSerializer<TXN> transactionSerializer;
        private final TypeSerializer<CONTEXT> contextSerializer;

        public StateSerializer(TypeSerializer<TXN> typeSerializer, TypeSerializer<CONTEXT> typeSerializer2) {
            this.transactionSerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer);
            this.contextSerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer2);
        }

        @Override // org.apache.flink.api.common.typeutils.TypeSerializer
        public boolean isImmutableType() {
            return this.transactionSerializer.isImmutableType() && this.contextSerializer.isImmutableType();
        }

        @Override // org.apache.flink.api.common.typeutils.TypeSerializer
        /* renamed from: duplicate */
        public TypeSerializer<State<TXN, CONTEXT>> duplicate2() {
            return new StateSerializer(this.transactionSerializer.duplicate2(), this.contextSerializer.duplicate2());
        }

        @Override // org.apache.flink.api.common.typeutils.TypeSerializer
        /* renamed from: createInstance */
        public State<TXN, CONTEXT> mo4656createInstance() {
            return null;
        }

        @Override // org.apache.flink.api.common.typeutils.TypeSerializer
        public State<TXN, CONTEXT> copy(State<TXN, CONTEXT> state) {
            TransactionHolder<TXN> pendingTransaction = state.getPendingTransaction();
            TransactionHolder transactionHolder = new TransactionHolder(this.transactionSerializer.copy(((TransactionHolder) pendingTransaction).handle), ((TransactionHolder) pendingTransaction).transactionStartTime);
            ArrayList arrayList = new ArrayList();
            for (TransactionHolder<TXN> transactionHolder2 : state.getPendingCommitTransactions()) {
                arrayList.add(new TransactionHolder(this.transactionSerializer.copy(((TransactionHolder) transactionHolder2).handle), ((TransactionHolder) transactionHolder2).transactionStartTime));
            }
            Optional<CONTEXT> context = state.getContext();
            TypeSerializer<CONTEXT> typeSerializer = this.contextSerializer;
            typeSerializer.getClass();
            return new State<>(transactionHolder, arrayList, context.map(typeSerializer::copy));
        }

        @Override // org.apache.flink.api.common.typeutils.TypeSerializer
        public State<TXN, CONTEXT> copy(State<TXN, CONTEXT> state, State<TXN, CONTEXT> state2) {
            return copy((State) state);
        }

        @Override // org.apache.flink.api.common.typeutils.TypeSerializer
        public int getLength() {
            return -1;
        }

        @Override // org.apache.flink.api.common.typeutils.TypeSerializer
        public void serialize(State<TXN, CONTEXT> state, DataOutputView dataOutputView) throws IOException {
            TransactionHolder<TXN> pendingTransaction = state.getPendingTransaction();
            this.transactionSerializer.serialize(((TransactionHolder) pendingTransaction).handle, dataOutputView);
            dataOutputView.writeLong(((TransactionHolder) pendingTransaction).transactionStartTime);
            List<TransactionHolder<TXN>> pendingCommitTransactions = state.getPendingCommitTransactions();
            dataOutputView.writeInt(pendingCommitTransactions.size());
            for (TransactionHolder<TXN> transactionHolder : pendingCommitTransactions) {
                this.transactionSerializer.serialize(((TransactionHolder) transactionHolder).handle, dataOutputView);
                dataOutputView.writeLong(((TransactionHolder) transactionHolder).transactionStartTime);
            }
            Optional<CONTEXT> context = state.getContext();
            if (!context.isPresent()) {
                dataOutputView.writeBoolean(false);
            } else {
                dataOutputView.writeBoolean(true);
                this.contextSerializer.serialize(context.get(), dataOutputView);
            }
        }

        @Override // org.apache.flink.api.common.typeutils.TypeSerializer
        /* renamed from: deserialize */
        public State<TXN, CONTEXT> mo4655deserialize(DataInputView dataInputView) throws IOException {
            TransactionHolder transactionHolder = new TransactionHolder(this.transactionSerializer.mo4655deserialize(dataInputView), dataInputView.readLong());
            int readInt = dataInputView.readInt();
            ArrayList arrayList = new ArrayList(readInt);
            for (int i = 0; i < readInt; i++) {
                arrayList.add(new TransactionHolder(this.transactionSerializer.mo4655deserialize(dataInputView), dataInputView.readLong()));
            }
            Optional empty = Optional.empty();
            if (dataInputView.readBoolean()) {
                empty = Optional.of(this.contextSerializer.mo4655deserialize(dataInputView));
            }
            return new State<>(transactionHolder, arrayList, empty);
        }

        @Override // org.apache.flink.api.common.typeutils.TypeSerializer
        public State<TXN, CONTEXT> deserialize(State<TXN, CONTEXT> state, DataInputView dataInputView) throws IOException {
            return mo4655deserialize(dataInputView);
        }

        @Override // org.apache.flink.api.common.typeutils.TypeSerializer
        public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
            this.transactionSerializer.serialize(this.transactionSerializer.mo4655deserialize(dataInputView), dataOutputView);
            dataOutputView.writeLong(dataInputView.readLong());
            int readInt = dataInputView.readInt();
            dataOutputView.writeInt(readInt);
            for (int i = 0; i < readInt; i++) {
                this.transactionSerializer.serialize(this.transactionSerializer.mo4655deserialize(dataInputView), dataOutputView);
                dataOutputView.writeLong(dataInputView.readLong());
            }
            boolean readBoolean = dataInputView.readBoolean();
            dataOutputView.writeBoolean(readBoolean);
            if (readBoolean) {
                this.contextSerializer.serialize(this.contextSerializer.mo4655deserialize(dataInputView), dataOutputView);
            }
        }

        @Override // org.apache.flink.api.common.typeutils.TypeSerializer
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            StateSerializer stateSerializer = (StateSerializer) obj;
            if (this.transactionSerializer.equals(stateSerializer.transactionSerializer)) {
                return this.contextSerializer.equals(stateSerializer.contextSerializer);
            }
            return false;
        }

        @Override // org.apache.flink.api.common.typeutils.TypeSerializer
        public int hashCode() {
            return (31 * this.transactionSerializer.hashCode()) + this.contextSerializer.hashCode();
        }

        @Override // org.apache.flink.api.common.typeutils.TypeSerializer
        /* renamed from: snapshotConfiguration */
        public StateSerializerSnapshot<TXN, CONTEXT> snapshotConfiguration2() {
            return new StateSerializerSnapshot<>(this);
        }
    }

    @Internal
    @Deprecated
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction$StateSerializerConfigSnapshot.class */
    public static final class StateSerializerConfigSnapshot<TXN, CONTEXT> extends CompositeTypeSerializerConfigSnapshot<State<TXN, CONTEXT>> {
        private static final int VERSION = 1;

        public StateSerializerConfigSnapshot() {
        }

        public StateSerializerConfigSnapshot(TypeSerializer<TXN> typeSerializer, TypeSerializer<CONTEXT> typeSerializer2) {
            super(typeSerializer, typeSerializer2);
        }

        @Override // org.apache.flink.core.io.Versioned
        public int getVersion() {
            return 1;
        }

        @Override // org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot, org.apache.flink.api.common.typeutils.TypeSerializerSnapshot
        public TypeSerializerSchemaCompatibility<State<TXN, CONTEXT>> resolveSchemaCompatibility(TypeSerializer<State<TXN, CONTEXT>> typeSerializer) {
            return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(typeSerializer, new StateSerializerSnapshot(), (TypeSerializerSnapshot[]) getNestedSerializersAndConfigs().stream().map(tuple2 -> {
                return (TypeSerializerSnapshot) tuple2.f1;
            }).toArray(i -> {
                return new TypeSerializerSnapshot[i];
            }));
        }
    }

    @Internal
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction$StateSerializerSnapshot.class */
    public static final class StateSerializerSnapshot<TXN, CONTEXT> extends CompositeTypeSerializerSnapshot<State<TXN, CONTEXT>, StateSerializer<TXN, CONTEXT>> {
        private static final int VERSION = 2;

        public StateSerializerSnapshot() {
            super((Class<? extends TypeSerializer>) StateSerializer.class);
        }

        StateSerializerSnapshot(StateSerializer<TXN, CONTEXT> stateSerializer) {
            super(stateSerializer);
        }

        @Override // org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot
        protected int getCurrentOuterSnapshotVersion() {
            return 2;
        }

        @Override // org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot
        protected StateSerializer<TXN, CONTEXT> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] typeSerializerArr) {
            return new StateSerializer<>(typeSerializerArr[0], typeSerializerArr[1]);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot
        public TypeSerializer<?>[] getNestedSerializers(StateSerializer<TXN, CONTEXT> stateSerializer) {
            return new TypeSerializer[]{((StateSerializer) stateSerializer).transactionSerializer, ((StateSerializer) stateSerializer).contextSerializer};
        }

        @Override // org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot
        protected /* bridge */ /* synthetic */ TypeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] typeSerializerArr) {
            return createOuterSerializerWithNestedSerializers((TypeSerializer<?>[]) typeSerializerArr);
        }
    }

    @VisibleForTesting
    @Internal
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction$TransactionHolder.class */
    public static final class TransactionHolder<TXN> {
        private final TXN handle;
        private final long transactionStartTime;

        @VisibleForTesting
        public TransactionHolder(TXN txn, long j) {
            this.handle = txn;
            this.transactionStartTime = j;
        }

        long elapsedTime(Clock clock) {
            return clock.millis() - this.transactionStartTime;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            TransactionHolder transactionHolder = (TransactionHolder) obj;
            if (this.transactionStartTime != transactionHolder.transactionStartTime) {
                return false;
            }
            return this.handle != null ? this.handle.equals(transactionHolder.handle) : transactionHolder.handle == null;
        }

        public int hashCode() {
            return (31 * (this.handle != null ? this.handle.hashCode() : 0)) + ((int) (this.transactionStartTime ^ (this.transactionStartTime >>> 32)));
        }

        public String toString() {
            return "TransactionHolder{handle=" + this.handle + ", transactionStartTime=" + this.transactionStartTime + '}';
        }
    }

    public TwoPhaseCommitSinkFunction(TypeSerializer<TXN> typeSerializer, TypeSerializer<CONTEXT> typeSerializer2) {
        this(typeSerializer, typeSerializer2, Clock.systemUTC());
    }

    @VisibleForTesting
    TwoPhaseCommitSinkFunction(TypeSerializer<TXN> typeSerializer, TypeSerializer<CONTEXT> typeSerializer2, Clock clock) {
        this.pendingCommitTransactions = new LinkedHashMap<>();
        this.transactionTimeout = CheckpointOptions.NO_ALIGNMENT_TIME_OUT;
        this.transactionTimeoutWarningRatio = -1.0d;
        this.stateDescriptor = new ListStateDescriptor<>(JobDetailsInfo.FIELD_NAME_JOB_STATUS, new StateSerializer(typeSerializer, typeSerializer2));
        this.clock = clock;
    }

    protected Optional<CONTEXT> initializeUserContext() {
        return Optional.empty();
    }

    protected Optional<CONTEXT> getUserContext() {
        return this.userContext;
    }

    @Nullable
    protected TXN currentTransaction() {
        if (this.currentTransactionHolder == null) {
            return null;
        }
        return (TXN) ((TransactionHolder) this.currentTransactionHolder).handle;
    }

    @Nonnull
    protected Stream<Map.Entry<Long, TXN>> pendingTransactions() {
        return (Stream<Map.Entry<Long, TXN>>) this.pendingCommitTransactions.entrySet().stream().map(entry -> {
            return new AbstractMap.SimpleEntry(entry.getKey(), ((TransactionHolder) entry.getValue()).handle);
        });
    }

    protected abstract void invoke(TXN txn, IN in, SinkFunction.Context context) throws Exception;

    protected abstract TXN beginTransaction() throws Exception;

    protected abstract void preCommit(TXN txn) throws Exception;

    protected abstract void commit(TXN txn);

    protected void recoverAndCommit(TXN txn) {
        commit(txn);
    }

    protected abstract void abort(TXN txn);

    protected void recoverAndAbort(TXN txn) {
        abort(txn);
    }

    protected void finishRecoveringContext(Collection<TXN> collection) {
    }

    @Override // org.apache.flink.streaming.api.functions.sink.SinkFunction
    public final void invoke(IN in) throws Exception {
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.streaming.api.functions.sink.SinkFunction
    public final void invoke(IN in, SinkFunction.Context context) throws Exception {
        invoke(((TransactionHolder) this.currentTransactionHolder).handle, in, context);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.api.common.state.CheckpointListener
    public final void notifyCheckpointComplete(long j) throws Exception {
        Iterator<Map.Entry<Long, TransactionHolder<TXN>>> it = this.pendingCommitTransactions.entrySet().iterator();
        Throwable th = null;
        while (it.hasNext()) {
            Map.Entry<Long, TransactionHolder<TXN>> next = it.next();
            Long key = next.getKey();
            TransactionHolder<TXN> value = next.getValue();
            if (key.longValue() <= j) {
                LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}", name(), Long.valueOf(j), value, key);
                logWarningIfTimeoutAlmostReached(value);
                try {
                    commit(((TransactionHolder) value).handle);
                } catch (Throwable th2) {
                    if (th == null) {
                        th = th2;
                    }
                }
                LOG.debug("{} - committed checkpoint transaction {}", name(), value);
                it.remove();
            }
        }
        if (th != null) {
            throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure", th);
        }
    }

    @Override // org.apache.flink.api.common.state.CheckpointListener
    public void notifyCheckpointAborted(long j) {
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        Preconditions.checkState(this.currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");
        long checkpointId = functionSnapshotContext.getCheckpointId();
        LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), Long.valueOf(functionSnapshotContext.getCheckpointId()), this.currentTransactionHolder);
        preCommit(((TransactionHolder) this.currentTransactionHolder).handle);
        this.pendingCommitTransactions.put(Long.valueOf(checkpointId), this.currentTransactionHolder);
        LOG.debug("{} - stored pending transactions {}", name(), this.pendingCommitTransactions);
        this.currentTransactionHolder = beginTransactionInternal();
        LOG.debug("{} - started new transaction '{}'", name(), this.currentTransactionHolder);
        this.state.clear();
        this.state.add(new State(this.currentTransactionHolder, new ArrayList(this.pendingCommitTransactions.values()), this.userContext));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.state = functionInitializationContext.getOperatorStateStore().getListState(this.stateDescriptor);
        boolean z = false;
        if (functionInitializationContext.isRestored()) {
            LOG.info("{} - restoring state", name());
            for (State state : this.state.get()) {
                this.userContext = state.getContext();
                List<TransactionHolder<TXN>> pendingCommitTransactions = state.getPendingCommitTransactions();
                ArrayList arrayList = new ArrayList(pendingCommitTransactions.size() + 1);
                for (TransactionHolder<TXN> transactionHolder : pendingCommitTransactions) {
                    recoverAndCommitInternal(transactionHolder);
                    arrayList.add(((TransactionHolder) transactionHolder).handle);
                    LOG.info("{} committed recovered transaction {}", name(), transactionHolder);
                }
                Object obj = ((TransactionHolder) state.getPendingTransaction()).handle;
                recoverAndAbort(obj);
                arrayList.add(obj);
                LOG.info("{} aborted recovered transaction {}", name(), state.getPendingTransaction());
                if (this.userContext.isPresent()) {
                    finishRecoveringContext(arrayList);
                    z = true;
                }
            }
        }
        if (!z) {
            LOG.info("{} - no state to restore", name());
            this.userContext = initializeUserContext();
        }
        this.pendingCommitTransactions.clear();
        this.currentTransactionHolder = beginTransactionInternal();
        LOG.debug("{} - started new transaction '{}'", name(), this.currentTransactionHolder);
    }

    private TransactionHolder<TXN> beginTransactionInternal() throws Exception {
        return new TransactionHolder<>(beginTransaction(), this.clock.millis());
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void recoverAndCommitInternal(TransactionHolder<TXN> transactionHolder) {
        try {
            logWarningIfTimeoutAlmostReached(transactionHolder);
            recoverAndCommit(((TransactionHolder) transactionHolder).handle);
        } catch (Exception e) {
            long millis = this.clock.millis() - ((TransactionHolder) transactionHolder).transactionStartTime;
            if (!this.ignoreFailuresAfterTransactionTimeout || millis <= this.transactionTimeout) {
                throw e;
            }
            LOG.error("Error while committing transaction {}. Transaction has been open for longer than the transaction timeout ({}).Commit will not be attempted again. Data loss might have occurred.", ((TransactionHolder) transactionHolder).handle, Long.valueOf(this.transactionTimeout), e);
        }
    }

    private void logWarningIfTimeoutAlmostReached(TransactionHolder<TXN> transactionHolder) {
        long elapsedTime = transactionHolder.elapsedTime(this.clock);
        if (this.transactionTimeoutWarningRatio < CMAESOptimizer.DEFAULT_STOPFITNESS || elapsedTime <= this.transactionTimeout * this.transactionTimeoutWarningRatio) {
            return;
        }
        LOG.warn("Transaction {} has been open for {} ms. This is close to or even exceeding the transaction timeout of {} ms.", ((TransactionHolder) transactionHolder).handle, Long.valueOf(elapsedTime), Long.valueOf(this.transactionTimeout));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void close() throws Exception {
        super.close();
        if (this.currentTransactionHolder != null) {
            abort(((TransactionHolder) this.currentTransactionHolder).handle);
            this.currentTransactionHolder = null;
        }
    }

    protected TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> setTransactionTimeout(long j) {
        Preconditions.checkArgument(j >= 0, "transactionTimeout must not be negative");
        this.transactionTimeout = j;
        return this;
    }

    protected TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> ignoreFailuresAfterTransactionTimeout() {
        this.ignoreFailuresAfterTransactionTimeout = true;
        return this;
    }

    protected TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> enableTransactionTimeoutWarnings(double d) {
        Preconditions.checkArgument(d >= CMAESOptimizer.DEFAULT_STOPFITNESS && d <= 1.0d, "warningRatio must be in range [0,1]");
        this.transactionTimeoutWarningRatio = d;
        return this;
    }

    private String name() {
        return String.format("%s %s/%s", getClass().getSimpleName(), Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask() + 1), Integer.valueOf(getRuntimeContext().getNumberOfParallelSubtasks()));
    }
}
