package org.apache.flink.connector.hbase2.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseLookupFunctionUtils;
import org.apache.flink.connector.hbase.util.HBaseSerde;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.connector.hbase.util.RowKeyInfo;
import org.apache.flink.connector.hbase.util.RowKeyPart;
import org.apache.flink.connector.hbase.util.analysis.filter.FilterResult;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.TableName;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.Get;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.Result;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.Scan;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.util.Threads;
import org.apache.flink.table.api.LookupKeys;
import org.apache.flink.table.connector.exception.RetryLookupException;
import org.apache.flink.table.connector.source.AbstractAsyncBatchLookupFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.class */
public class HBaseRowDataAsyncLookupFunction extends AbstractAsyncBatchLookupFunction<Result> {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
    private static final long serialVersionUID = 2;
    private final String hTableName;
    private final byte[] serializedConfig;
    private final HBaseTableSchema hbaseTableSchema;
    private final String nullStringLiteral;
    private final List<RowKeyPart> lookupKeyFields;
    private final FilterResult filterResult;
    private final RowKeyInfo rowKeyInfo;
    private transient AsyncConnection asyncConnection;
    private transient AsyncTable<ScanResultConsumer> table;
    private transient HBaseSerde serde;
    private boolean fullKeyJoin;
    private static final int THREAD_POOL_SIZE = 16;

    /* loaded from: input_file:org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction$LookupScanResultConsumer.class */
    public class LookupScanResultConsumer implements ScanResultConsumer {
        private final List<RowData> resultRows = new ArrayList();
        private final CompletableFuture<Collection<RowData>> resultFuture;
        private final Object[] lookupKeysValues;

        public LookupScanResultConsumer(CompletableFuture<Collection<RowData>> completableFuture, Object[] objArr) {
            this.resultFuture = completableFuture;
            this.lookupKeysValues = objArr;
        }

        @Override // org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ScanResultConsumer
        public boolean onNext(Result result) {
            try {
                if (result.isEmpty()) {
                    return true;
                }
                Object[] deserializeRowKey = HBaseRowDataAsyncLookupFunction.this.serde.deserializeRowKey(result.getRow());
                if (!HBaseLookupFunctionUtils.keyPartsMatches(deserializeRowKey, HBaseRowDataAsyncLookupFunction.this.lookupKeyFields, this.lookupKeysValues)) {
                    return true;
                }
                this.resultRows.add(HBaseRowDataAsyncLookupFunction.this.serde.convertToNewRow(result, HBaseRowDataAsyncLookupFunction.this.rowKeyInfo.getKeys(), deserializeRowKey));
                return true;
            } catch (Throwable th) {
                this.resultFuture.completeExceptionally(th);
                return false;
            }
        }

        @Override // org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ScanResultConsumerBase
        public void onError(Throwable th) {
            HBaseRowDataAsyncLookupFunction.this.handleError(th, this.resultFuture);
        }

        @Override // org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ScanResultConsumerBase
        public void onComplete() {
            this.resultFuture.complete(this.resultRows);
        }
    }

    public HBaseRowDataAsyncLookupFunction(Configuration configuration, String str, HBaseTableSchema hBaseTableSchema, List<RowKeyPart> list, FilterResult filterResult, String str2) {
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
        this.hTableName = str;
        this.hbaseTableSchema = hBaseTableSchema;
        this.rowKeyInfo = hBaseTableSchema.getRowKeyInfo();
        this.lookupKeyFields = list;
        this.filterResult = filterResult;
        this.nullStringLiteral = str2;
    }

    public void open(FunctionContext functionContext) {
        LOG.info("start open ...");
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(16, new ExecutorThreadFactory("hbase-async-lookup-worker", Threads.LOGGING_EXCEPTION_HANDLER));
        try {
            this.asyncConnection = ConnectionFactory.createAsyncConnection(HBaseLookupFunctionUtils.prepareRuntimeConfiguration(this.serializedConfig)).get();
            this.table = this.asyncConnection.getTable(TableName.valueOf(this.hTableName), newFixedThreadPool);
            this.serde = new HBaseSerde(this.hbaseTableSchema, this.nullStringLiteral);
            this.fullKeyJoin = this.rowKeyInfo.getAllKeys().size() == this.lookupKeyFields.size();
            LOG.info("end open.");
        } catch (InterruptedException | ExecutionException e) {
            LOG.error("Exception while creating connection to HBase.", e);
            throw new RuntimeException("Cannot create connection to HBase.", e);
        }
    }

    public void eval(CompletableFuture<Collection<RowData>> completableFuture, Object... objArr) {
        if (this.fullKeyJoin) {
            this.table.get(this.serde.createGet(this.lookupKeyFields, objArr)).whenCompleteAsync((result, th) -> {
                try {
                    if (th != null) {
                        handleError(th, completableFuture);
                    } else {
                        completableFuture.complete(result.isEmpty() ? Collections.emptyList() : Collections.singletonList(this.serde.convertToNewRow(result, this.lookupKeyFields, objArr)));
                    }
                } catch (Throwable th) {
                    completableFuture.completeExceptionally(th);
                }
            });
            return;
        }
        Scan createScan = this.serde.createScan();
        if (this.filterResult != null && this.filterResult.getFilter() != null) {
            createScan.setFilter(this.filterResult.getFilter());
        }
        RowKeyInfo rowKeyInfo = this.rowKeyInfo;
        List<RowKeyPart> list = this.lookupKeyFields;
        HBaseSerde hBaseSerde = this.serde;
        hBaseSerde.getClass();
        HBaseLookupFunctionUtils.setScanRange(rowKeyInfo, createScan, list, objArr, hBaseSerde::serializeRowKeyPart);
        this.table.scan(createScan, new LookupScanResultConsumer(completableFuture, objArr));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleError(Throwable th, CompletableFuture<Collection<RowData>> completableFuture) {
        if (th instanceof TableNotFoundException) {
            LOG.error("Table '{}' not found ", this.hTableName, th);
            completableFuture.completeExceptionally(new RuntimeException("HBase table '" + this.hTableName + "' not found.", th));
        } else {
            if (!(th instanceof IOException)) {
                completableFuture.completeExceptionally(th);
                return;
            }
            Object[] objArr = new Object[1];
            objArr[0] = this.fullKeyJoin ? "Get" : "Scan";
            completableFuture.completeExceptionally(new RetryLookupException(String.format("HBase '%s' operation failed.", objArr), th));
        }
    }

    public void close() {
        LOG.info("start close ...");
        if (null != this.table) {
            this.table = null;
        }
        if (null != this.asyncConnection) {
            try {
                this.asyncConnection.close();
                this.asyncConnection = null;
            } catch (IOException e) {
                LOG.warn("exception when close connection", e);
            }
        }
        LOG.info("end close.");
    }

    @VisibleForTesting
    public String getHTableName() {
        return this.hTableName;
    }

    public void eval(CompletableFuture<Collection<RowData>> completableFuture, List<LookupKeys> list) {
        List<Get> list2 = (List) list.stream().map((v0) -> {
            return v0.getKeys();
        }).map(objArr -> {
            return this.serde.createGet(this.lookupKeyFields, objArr);
        }).collect(Collectors.toList());
        if (list2.isEmpty()) {
            completableFuture.complete(Collections.emptyList());
        } else {
            this.table.getAll(list2).whenCompleteAsync((list3, th) -> {
                try {
                    if (th != null) {
                        handleError(th, completableFuture);
                    } else {
                        completableFuture.complete(list3.isEmpty() ? Collections.emptyList() : asBatchResultRows(list, list3));
                    }
                } catch (Throwable th) {
                    completableFuture.completeExceptionally(th);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<RowData> convertResultToRows(Object[] objArr, Result result) {
        if (result.isEmpty()) {
            return null;
        }
        return Collections.singletonList(this.serde.convertToNewRow(result, this.lookupKeyFields, objArr));
    }
}
