package org.apache.flink.connector.hbase.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.hbase.security.HbaseSecurityModule;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseLookupFunctionUtils;
import org.apache.flink.connector.hbase.util.HBaseSerde;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.connector.hbase.util.RowKeyInfo;
import org.apache.flink.connector.hbase.util.RowKeyPart;
import org.apache.flink.connector.hbase.util.analysis.filter.FilterResult;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.TableName;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.Connection;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.Get;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.HTable;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.Result;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.Scan;
import org.apache.flink.table.api.LookupKeys;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AbstractBatchLookupFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.class */
public class HBaseRowDataLookupFunction extends AbstractBatchLookupFunction {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseRowDataLookupFunction.class);
    private static final long serialVersionUID = 1;
    private final String hTableName;
    private final byte[] serializedConfig;
    private final HBaseTableSchema hbaseTableSchema;
    private final String nullStringLiteral;
    private final List<RowKeyPart> lookupKeyFields;
    private final FilterResult filterResult;
    private final RowKeyInfo rowKeyInfo;
    private transient Connection hConnection;
    private transient HTable table;
    private transient HBaseSerde serde;
    private transient Scan scan;
    private final int maxRetryTimes;
    private final boolean reuseRow;
    private boolean fullKeyJoin;

    public HBaseRowDataLookupFunction(Configuration configuration, String str, HBaseTableSchema hBaseTableSchema, List<RowKeyPart> list, FilterResult filterResult, String str2, int i, boolean z) {
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
        this.hTableName = str;
        this.hbaseTableSchema = hBaseTableSchema;
        this.rowKeyInfo = hBaseTableSchema.getRowKeyInfo();
        this.lookupKeyFields = list;
        this.filterResult = filterResult;
        this.nullStringLiteral = str2;
        this.maxRetryTimes = i;
        this.reuseRow = z;
    }

    private <T> T withRetry(SupplierWithException<T, IOException> supplierWithException) {
        for (int i = 0; i <= this.maxRetryTimes; i++) {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                T t = (T) supplierWithException.get();
                this.lookupRequestTimeHist.update(System.currentTimeMillis() - currentTimeMillis);
                return t;
            } catch (IOException e) {
                this.failsCounter.inc();
                LOG.error(String.format("HBase lookup error, retry times = %d", Integer.valueOf(i)), e);
                if (i >= this.maxRetryTimes) {
                    throw new RuntimeException("Execution of HBase lookup failed.", e);
                }
                try {
                    Thread.sleep(1000 * i);
                } catch (InterruptedException e2) {
                    throw new RuntimeException(e2);
                }
            }
        }
        return null;
    }

    public Collection<RowData> lookup(RowData rowData) throws IOException {
        Object[] lookupKeyValuesFromRow = HBaseLookupFunctionUtils.getLookupKeyValuesFromRow((GenericRowData) rowData);
        return (Collection) withRetry(() -> {
            return lookupInternal(lookupKeyValuesFromRow);
        });
    }

    public Collection<RowData> lookupInternal(Object[] objArr) throws IOException {
        if (this.fullKeyJoin) {
            Get createGet = this.serde.createGet(this.lookupKeyFields, objArr);
            if (createGet != null) {
                Result result = this.table.get(createGet);
                if (!result.isEmpty()) {
                    return Collections.singletonList(parseToRow(result, this.lookupKeyFields, objArr));
                }
            }
        } else {
            RowKeyInfo rowKeyInfo = this.rowKeyInfo;
            Scan scan = this.scan;
            List<RowKeyPart> list = this.lookupKeyFields;
            HBaseSerde hBaseSerde = this.serde;
            Objects.requireNonNull(hBaseSerde);
            HBaseLookupFunctionUtils.setScanRange(rowKeyInfo, scan, list, objArr, hBaseSerde::serializeRowKeyPart);
            ResultScanner scanner = this.table.getScanner(this.scan);
            ArrayList arrayList = new ArrayList();
            if (scanner != null) {
                for (Result result2 : scanner) {
                    if (!result2.isEmpty()) {
                        Object[] deserializeRowKey = this.serde.deserializeRowKey(result2.getRow());
                        if (HBaseLookupFunctionUtils.keyPartsMatches(deserializeRowKey, this.lookupKeyFields, objArr)) {
                            arrayList.add(this.serde.convertToNewRow(result2, this.rowKeyInfo.getKeys(), deserializeRowKey));
                        }
                    }
                }
                scanner.close();
                return arrayList;
            }
        }
        return Collections.emptyList();
    }

    public Collection<RowData> lookupBatch(List<LookupKeys> list) {
        return (Collection) withRetry(() -> {
            return batchLookupInternal(list);
        });
    }

    public Collection<RowData> batchLookupInternal(List<LookupKeys> list) throws IOException {
        Result[] resultArr = this.table.get((List<Get>) list.stream().map((v0) -> {
            return v0.getKeys();
        }).map(objArr -> {
            return this.serde.createGet(this.lookupKeyFields, objArr);
        }).collect(Collectors.toList()));
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < resultArr.length; i++) {
            if (!resultArr[i].isEmpty()) {
                LookupKeys lookupKeys = list.get(i);
                arrayList.add(buildResultRow(lookupKeys, parseToRow(resultArr[i], this.lookupKeyFields, lookupKeys.getKeys())));
            }
        }
        return arrayList;
    }

    public void open(FunctionContext functionContext) throws Exception {
        LOG.info("start open ...");
        super.open(functionContext);
        Configuration prepareRuntimeConfiguration = HBaseLookupFunctionUtils.prepareRuntimeConfiguration(this.serializedConfig);
        try {
            if (this.hConnection == null) {
                HbaseSecurityModule.createConnection(prepareRuntimeConfiguration, () -> {
                    Connection createConnection = ConnectionFactory.createConnection(prepareRuntimeConfiguration);
                    this.hConnection = createConnection;
                    return createConnection;
                });
            }
            this.table = (HTable) this.hConnection.getTable(TableName.valueOf(this.hTableName));
            this.serde = new HBaseSerde(this.hbaseTableSchema, this.nullStringLiteral);
            this.fullKeyJoin = this.rowKeyInfo.getAllKeys().size() == this.lookupKeyFields.size();
            if (!this.fullKeyJoin) {
                this.scan = this.serde.createScan();
                if (this.filterResult != null && this.filterResult.getFilter() != null) {
                    this.scan.setFilter(this.filterResult.getFilter());
                }
            }
            LOG.info("end open.");
        } catch (TableNotFoundException e) {
            LOG.error("Table '{}' not found ", this.hTableName, e);
            throw new RuntimeException("HBase table '" + this.hTableName + "' not found.", e);
        } catch (IOException e2) {
            LOG.error("Exception while creating connection to HBase.", e2);
            throw new RuntimeException("Cannot create connection to HBase.", e2);
        }
    }

    public void close() {
        LOG.info("start close ...");
        if (null != this.table) {
            try {
                this.table.close();
                this.table = null;
            } catch (IOException e) {
                LOG.warn("exception when close table", e);
            }
        }
        if (null != this.hConnection) {
            try {
                this.hConnection.close();
                this.hConnection = null;
            } catch (IOException e2) {
                LOG.warn("exception when close connection", e2);
            }
        }
        LOG.info("end close.");
    }

    private RowData parseToRow(Result result, List<RowKeyPart> list, Object[] objArr) {
        return this.reuseRow ? this.serde.convertToReusedRow(result, list, objArr) : this.serde.convertToNewRow(result, list, objArr);
    }

    @VisibleForTesting
    public String getHTableName() {
        return this.hTableName;
    }
}
