package org.apache.flink.connector.hbase2.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.RetryableAsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.RetryableRequest;
import org.apache.flink.connector.hbase.security.HbaseSecurityModule;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.TableName;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/hbase2/sink/HBaseAsyncSinkWriter.class */
/**
 * Async sink writer that sends {@link Mutation}s to a single HBase table through an
 * {@link AsyncBufferedMutator}.
 *
 * <p>The writer owns both the {@link AsyncConnection} and the mutator it builds from it, and
 * releases them in {@link #close()}.
 *
 * @param <T> type of the elements handed to the element converter
 */
public class HBaseAsyncSinkWriter<T> extends RetryableAsyncSinkWriter<T, Mutation> {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseAsyncSinkWriter.class);
    private transient AsyncConnection asyncConnection;
    private final AsyncBufferedMutator asyncMutator;
    private final boolean snapshotsEnabled;
    private final long writeTtlMs;

    /**
     * Opens the HBase connection and builds the buffered mutator for the target table.
     *
     * <p>The parameters {@code i}, {@code i2}, {@code i3}, {@code j2}, {@code j3}, {@code j4},
     * {@code i4} and {@code collection} are forwarded unchanged to the superclass constructor;
     * NOTE(review): their exact meaning is defined by {@code RetryableAsyncSinkWriter} and is not
     * visible here.
     *
     * @param elementConverter converter forwarded to the superclass
     * @param initContext sink init context forwarded to the superclass
     * @param tableName name of the HBase table to write to
     * @param configuration Hadoop configuration used to create the async connection
     * @param snapshotsEnabled whether {@link #snapshotState(long)} delegates to the superclass
     * @param writeTtlMs TTL applied to every mutation before submission when greater than zero
     * @param j2 forwarded to the superclass and also used as the mutator's write buffer size
     * @throws IllegalStateException if no connection was established by the connection callback
     */
    public HBaseAsyncSinkWriter(ElementConverter<T, RetryableRequest<Mutation>> elementConverter, Sink.InitContext initContext, String tableName, Configuration configuration, boolean snapshotsEnabled, long writeTtlMs, int i, int i2, int i3, long j2, long j3, long j4, int i4, Collection<BufferedRequestState<RetryableRequest<Mutation>>> collection) {
        super(elementConverter, initContext, i, i2, i3, j2, j3, j4, i4, collection);
        this.snapshotsEnabled = snapshotsEnabled;
        this.writeTtlMs = writeTtlMs;
        HbaseSecurityModule.createConnection(configuration, () -> {
            AsyncConnection connection = ConnectionFactory.createAsyncConnection(configuration).get();
            this.asyncConnection = connection;
            return connection;
        });
        // The field is populated by the callback above; fail with a clear message instead of a
        // bare NullPointerException if that did not happen.
        if (this.asyncConnection == null) {
            throw new IllegalStateException("HBase async connection was not established for table " + tableName);
        }
        AsyncBufferedMutator mutator;
        try {
            mutator = this.asyncConnection
                    .getBufferedMutatorBuilder(TableName.valueOf(tableName))
                    .disableWriteBufferPeriodicFlush()
                    .setWriteBufferSize(j2)
                    .build();
        } catch (RuntimeException | Error e) {
            // Do not leak the connection that was just opened.
            closeConnection();
            throw e;
        }
        this.asyncMutator = mutator;
    }

    /**
     * Returns the superclass snapshot when snapshots are enabled, otherwise an empty list.
     *
     * @param j checkpoint identifier forwarded to the superclass
     * @return the buffered request state to persist, or an empty list when snapshots are disabled
     */
    @Override
    public List<BufferedRequestState<RetryableRequest<Mutation>>> snapshotState(long j) {
        if (!this.snapshotsEnabled) {
            return Collections.emptyList();
        }
        return super.snapshotState(j);
    }

    /**
     * Reports the size of a request as the mutation's {@code heapSize()}.
     *
     * @param mutation the mutation to measure
     * @return the value of {@code mutation.heapSize()}
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public long getRequestSizeInBytes(Mutation mutation) {
        return mutation.heapSize();
    }

    /**
     * Hands the batch to the buffered mutator and reports the entries whose mutation failed.
     *
     * <p>When {@code writeTtlMs} is positive, the TTL is set on every mutation before it is
     * submitted. Once all per-mutation futures have completed, {@code consumer} receives the
     * entries that failed, or an empty list when every mutation succeeded.
     *
     * @param list entries to submit
     * @param consumer callback receiving the entries that failed
     */
    protected void submitRequestEntries(List<RetryableRequest<Mutation>> list, Consumer<List<RetryableRequest<Mutation>>> consumer) {
        List<Mutation> mutations = new ArrayList<>(list.size());
        for (RetryableRequest<Mutation> entry : list) {
            Mutation mutation = entry.getRequest();
            if (this.writeTtlMs > 0) {
                mutation = mutation.setTTL(this.writeTtlMs);
            }
            mutations.add(mutation);
        }

        // Completion callbacks may run on different client threads, so the list collecting
        // failures must tolerate concurrent appends.
        List<RetryableRequest<Mutation>> failedEntries = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> tracked = new ArrayList<>(mutations.size());
        Iterator<RetryableRequest<Mutation>> entries = list.iterator();
        for (CompletableFuture<Void> mutationFuture : this.asyncMutator.mutate(mutations)) {
            // Futures are paired with the submitted entries by position.
            RetryableRequest<Mutation> entry = entries.next();
            tracked.add(mutationFuture.whenComplete((ignored, error) -> {
                if (error != null) {
                    LOG.error("Error during HBase batch put", error);
                    failedEntries.add(entry);
                }
            }));
        }

        // A zero-length array lets toArray size the result itself, so allOf never sees null slots.
        CompletableFuture.allOf(tracked.toArray(new CompletableFuture[0])).whenComplete((ignored, error) -> {
            if (error != null) {
                List<RetryableRequest<Mutation>> toRetry = new ArrayList<>(failedEntries);
                consumer.accept(toRetry);
            } else {
                consumer.accept(Collections.emptyList());
            }
        });
    }

    /**
     * Flushes the superclass first and then the buffered mutator.
     *
     * @throws InterruptedException propagated from the superclass flush
     */
    @Override
    protected void flush() throws InterruptedException {
        super.flush();
        this.asyncMutator.flush();
    }

    /**
     * Closes the superclass, the buffered mutator and the connection, in that order.
     *
     * <p>Each step runs even if an earlier one throws, so a failure while closing one resource
     * does not leak the others. An {@link IOException} from closing the connection is logged and
     * not rethrown.
     */
    @Override
    public void close() {
        try {
            super.close();
        } finally {
            try {
                if (this.asyncMutator != null) {
                    this.asyncMutator.close();
                }
            } finally {
                closeConnection();
            }
        }
    }

    /** Closes the connection if one was opened, logging rather than propagating I/O failures. */
    private void closeConnection() {
        if (this.asyncConnection == null) {
            return;
        }
        try {
            this.asyncConnection.close();
        } catch (IOException e) {
            LOG.warn("Exception occurs while closing HBase Connection.", e);
        }
    }
}
