package org.apache.flink.connector.pulsar.sink.committer;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.class */
public class PulsarCommitter implements Committer<PulsarCommittable>, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarCommitter.class);
    private final SinkConfiguration sinkConfiguration;
    private PulsarClient pulsarClient;
    private TransactionCoordinatorClient coordinatorClient;

    public PulsarCommitter(SinkConfiguration sinkConfiguration) {
        this.sinkConfiguration = (SinkConfiguration) Preconditions.checkNotNull(sinkConfiguration);
    }

    public void commit(Collection<Committer.CommitRequest<PulsarCommittable>> collection) throws PulsarClientException {
        TransactionCoordinatorClient transactionCoordinatorClient = transactionCoordinatorClient();
        for (Committer.CommitRequest<PulsarCommittable> commitRequest : collection) {
            PulsarCommittable pulsarCommittable = (PulsarCommittable) commitRequest.getCommittable();
            TxnID txnID = pulsarCommittable.getTxnID();
            String topic = pulsarCommittable.getTopic();
            LOG.debug("Start committing the Pulsar transaction {} for topic {}", txnID, topic);
            try {
                transactionCoordinatorClient.commit(txnID);
            } catch (Exception e) {
                LOG.error("Transaction ({}) encountered unknown error and data could be potentially lost.", pulsarCommittable, e);
                commitRequest.signalFailedWithUnknownReason(e);
            } catch (TransactionCoordinatorClientException.CoordinatorNotFoundException e2) {
                LOG.error("We couldn't find the Transaction Coordinator from Pulsar broker {}. Check your broker configuration.", pulsarCommittable, e2);
                commitRequest.signalFailedWithKnownReason(e2);
            } catch (TransactionCoordinatorClientException e3) {
                LOG.error("Encountered retriable exception while committing transaction {} for topic {}.", new Object[]{pulsarCommittable, topic, e3});
                int maxRecommitTimes = this.sinkConfiguration.getMaxRecommitTimes();
                if (commitRequest.getNumberOfRetries() < maxRecommitTimes) {
                    commitRequest.retryLater();
                } else {
                    commitRequest.signalFailedWithKnownReason(new FlinkRuntimeException(String.format("Failed to commit transaction %s after retrying %d times", txnID, Integer.valueOf(maxRecommitTimes)), e3));
                }
            } catch (TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException e4) {
                LOG.error("We can't find the meta store handler by the mostSigBits from TxnID {}. Did you change the metadata for topic {}?", new Object[]{pulsarCommittable, SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, e4});
                commitRequest.signalFailedWithKnownReason(e4);
            } catch (TransactionCoordinatorClientException.InvalidTxnStatusException e5) {
                LOG.error("Unable to commit transaction ({}) because it's in an invalid state. Most likely the transaction has been aborted for some reason. Please check the Pulsar broker logs for more details.", pulsarCommittable, e5);
                commitRequest.signalAlreadyCommitted();
            } catch (TransactionCoordinatorClientException.TransactionNotFoundException e6) {
                if (commitRequest.getNumberOfRetries() == 0) {
                    LOG.error("Unable to commit transaction ({}) because it's not found on Pulsar broker. Most likely the checkpoint interval exceed the transaction timeout.", pulsarCommittable, e6);
                    commitRequest.signalFailedWithKnownReason(e6);
                } else {
                    LOG.warn("We can't find the transaction {} after {} retry committing. This may mean that the transaction have been committed in previous but failed with timeout. So we just mark it as committed.", txnID, Integer.valueOf(commitRequest.getNumberOfRetries()));
                    commitRequest.signalAlreadyCommitted();
                }
            }
        }
    }

    private TransactionCoordinatorClient transactionCoordinatorClient() throws PulsarClientException {
        if (this.coordinatorClient == null) {
            this.pulsarClient = PulsarClientFactory.createClient(this.sinkConfiguration);
            this.coordinatorClient = PulsarTransactionUtils.getTcClient(this.pulsarClient);
        }
        return this.coordinatorClient;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.pulsarClient != null) {
            this.pulsarClient.close();
        }
    }
}
