package org.apache.flink.connector.hbase.sink;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.hbase.security.HbaseSecurityModule;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.TableName;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.Connection;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.Mutation;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/hbase/sink/HBaseSinkFunction.class */
public class HBaseSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction, BufferedMutator.ExceptionListener {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(HBaseSinkFunction.class);
    private final String hTableName;
    private final byte[] serializedConfig;
    private final long bufferFlushMaxSizeInBytes;
    private final long bufferFlushMaxMutations;
    private final long bufferFlushIntervalMillis;
    private final long writeTtl;
    private final HBaseMutationConverter<T> mutationConverter;
    private transient Connection connection;
    private transient DeduplicatedMutator mutator;
    private transient ScheduledExecutorService executor;
    private transient ScheduledFuture scheduledFuture;
    private transient AtomicLong numPendingRequests;
    private volatile transient boolean closed = false;
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/hbase/sink/HBaseSinkFunction$DeduplicatedMutator.class */
    public static class DeduplicatedMutator {
        private final BufferedMutator mutator;
        private final Map<ByteBuffer, Mutation> mutations;

        DeduplicatedMutator(int i, BufferedMutator bufferedMutator) {
            this.mutator = bufferedMutator;
            this.mutations = new HashMap(i);
        }

        synchronized void mutate(Mutation mutation) {
            ByteBuffer wrap = ByteBuffer.wrap(mutation.getRow());
            Mutation mutation2 = this.mutations.get(wrap);
            if (mutation2 == null || mutation.getTimeStamp() >= mutation2.getTimeStamp()) {
                this.mutations.put(wrap, mutation);
            }
        }

        synchronized void flush() throws IOException {
            this.mutator.mutate(new ArrayList(this.mutations.values()));
            this.mutator.flush();
            this.mutations.clear();
        }

        synchronized void close() throws IOException {
            this.mutator.mutate(new ArrayList(this.mutations.values()));
            this.mutator.close();
            this.mutations.clear();
        }
    }

    public HBaseSinkFunction(String str, Configuration configuration, HBaseMutationConverter<T> hBaseMutationConverter, long j, long j2, long j3, long j4) {
        this.hTableName = str;
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
        this.mutationConverter = hBaseMutationConverter;
        this.bufferFlushMaxSizeInBytes = j;
        this.bufferFlushMaxMutations = j2;
        this.bufferFlushIntervalMillis = j3;
        this.writeTtl = j4;
    }

    public void open(org.apache.flink.configuration.Configuration configuration) throws Exception {
        LOG.info("start open ...");
        Configuration prepareRuntimeConfiguration = HBaseConfigurationUtil.prepareRuntimeConfiguration(this.serializedConfig);
        try {
            this.mutationConverter.open();
            this.numPendingRequests = new AtomicLong(0L);
            if (null == this.connection) {
                HbaseSecurityModule.createConnection(prepareRuntimeConfiguration, () -> {
                    Connection createConnection = ConnectionFactory.createConnection(prepareRuntimeConfiguration);
                    this.connection = createConnection;
                    return createConnection;
                });
            }
            BufferedMutatorParams listener = new BufferedMutatorParams(TableName.valueOf(this.hTableName)).listener(this);
            if (this.bufferFlushMaxSizeInBytes > 0) {
                listener.writeBufferSize(this.bufferFlushMaxSizeInBytes);
            }
            this.mutator = new DeduplicatedMutator((int) this.bufferFlushMaxMutations, this.connection.getBufferedMutator(listener));
            if (this.bufferFlushIntervalMillis > 0 && this.bufferFlushMaxMutations != 1) {
                this.executor = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
                this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
                    if (this.closed) {
                        return;
                    }
                    try {
                        flush();
                    } catch (Exception e) {
                        this.failureThrowable.compareAndSet(null, e);
                    }
                }, this.bufferFlushIntervalMillis, this.bufferFlushIntervalMillis, TimeUnit.MILLISECONDS);
            }
            LOG.info("end open.");
        } catch (TableNotFoundException e) {
            LOG.error("The table " + this.hTableName + " not found ", e);
            throw new RuntimeException("HBase table '" + this.hTableName + "' not found.", e);
        } catch (IOException e2) {
            LOG.error("Exception while creating connection to HBase.", e2);
            throw new RuntimeException("Cannot create connection to HBase.", e2);
        }
    }

    private void checkErrorAndRethrow() {
        Throwable th = this.failureThrowable.get();
        if (th != null) {
            throw new RuntimeException("An error occurred in HBaseSink.", th);
        }
    }

    public void invoke(T t, SinkFunction.Context context) throws Exception {
        checkErrorAndRethrow();
        Mutation convertToMutation = this.mutationConverter.convertToMutation(t);
        if (this.writeTtl > 0) {
            convertToMutation.setTTL(this.writeTtl);
        }
        if (convertToMutation != null) {
            this.mutator.mutate(convertToMutation);
            if (this.bufferFlushMaxMutations <= 0 || this.numPendingRequests.incrementAndGet() < this.bufferFlushMaxMutations) {
                return;
            }
            flush();
        }
    }

    private void flush() throws IOException {
        this.mutator.flush();
        this.numPendingRequests.set(0L);
        checkErrorAndRethrow();
    }

    public void close() throws Exception {
        this.closed = true;
        if (this.mutator != null) {
            try {
                this.mutator.close();
            } catch (IOException e) {
                LOG.warn("Exception occurs while closing HBase BufferedMutator.", e);
            }
            this.mutator = null;
        }
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (IOException e2) {
                LOG.warn("Exception occurs while closing HBase Connection.", e2);
            }
            this.connection = null;
        }
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
        }
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        while (this.numPendingRequests.get() != 0) {
            flush();
        }
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    @Override // org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.BufferedMutator.ExceptionListener
    public void onException(RetriesExhaustedWithDetailsException retriesExhaustedWithDetailsException, BufferedMutator bufferedMutator) throws RetriesExhaustedWithDetailsException {
        this.failureThrowable.compareAndSet(null, retriesExhaustedWithDetailsException);
    }
}
