package org.apache.flink.table.runtime.operators.join.lookup;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.LookupMetricGroup;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.runtime.metrics.groups.InternalLookupMetricGroup;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.operators.join.lookup.fetch.async.ResultFutureProvider;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/AbstractAsyncLookupJoinRunner.class */
/**
 * Base class for asynchronous lookup-join runners.
 *
 * <p>On {@link #open(Configuration)} the runner instantiates the generated fetcher function,
 * registers lookup metrics, and pre-creates a fixed pool of {@code asyncBufferCapacity + 1}
 * {@link JoinedRowResultFuture} instances. The pool is held both in {@link #resultFutureBuffer}
 * (a bounded queue subclasses can borrow from) and in a private list that is used to close every
 * future in {@link #close()}, regardless of whether it is currently in the queue.
 *
 * @param <IN> type of the input records handed to the fetcher
 */
public abstract class AbstractAsyncLookupJoinRunner<IN> extends RichAsyncFunction<IN, RowData> {
    private static final long serialVersionUID = -8477782423846031597L;

    /** Name passed to {@link InternalLookupMetricGroup} when the metric group is created here. */
    public static final String LOOKUP_JOIN_METRIC_GROUP_NAME = "async";

    private final GeneratedFunction<AsyncFunction<IN, Object>> generatedFetcher;
    private final int asyncBufferCapacity;
    private final int lookupTimeHistorySize;
    protected final DataStructureConverter<RowData, Object> fetcherConverter;
    protected final boolean isLeftOuterJoin;
    protected final int maxRetries;
    protected final ResultFutureProvider resultFutureProvider;
    protected final RowDataSerializer rightRowSerializer;

    /** Every result future created in {@code open}; kept so that {@code close} can reach them all. */
    private transient List<JoinedRowResultFuture<IN>> allResultFutures;

    protected transient AsyncFunction<IN, Object> fetcher;

    /** Bounded queue of reusable result futures; filled to capacity in {@code open}. */
    protected transient BlockingQueue<JoinedRowResultFuture<IN>> resultFutureBuffer;

    protected transient LookupMetricGroup lookupMetricGroup;
    protected transient Counter failedLookupsCounter;
    protected transient Histogram lookupRequestTimeHist;

    /**
     * Creates the runner. Only stores its arguments; all runtime state is built in
     * {@link #open(Configuration)}.
     *
     * @param generatedFunction generated code for the async fetcher function
     * @param dataStructureConverter converter stored as {@link #fetcherConverter}
     * @param rowDataSerializer serializer of the right-side row; its arity is handed to
     *     {@link #createJoinedResultFuture(TableFunctionResultFuture, int)}
     * @param resultFutureProvider factory for the per-slot {@link TableFunctionResultFuture}
     * @param z stored as {@link #isLeftOuterJoin}
     * @param i async buffer capacity; the future pool holds {@code i + 1} entries
     * @param i2 stored as {@link #maxRetries}
     * @param i3 size argument passed to the {@link DescriptiveStatisticsHistogram} that records
     *     lookup request times
     */
    public AbstractAsyncLookupJoinRunner(GeneratedFunction<AsyncFunction<IN, Object>> generatedFunction, DataStructureConverter<RowData, Object> dataStructureConverter, RowDataSerializer rowDataSerializer, ResultFutureProvider resultFutureProvider, boolean z, int i, int i2, int i3) {
        this.generatedFetcher = generatedFunction;
        this.fetcherConverter = dataStructureConverter;
        this.isLeftOuterJoin = z;
        this.asyncBufferCapacity = i;
        this.rightRowSerializer = rowDataSerializer;
        this.maxRetries = i2;
        this.resultFutureProvider = resultFutureProvider;
        this.lookupTimeHistorySize = i3;
    }

    /**
     * Instantiates and opens the fetcher, registers metrics and builds the result-future pool.
     *
     * @param configuration configuration forwarded to the superclass, the result-future
     *     provider, the fetcher and every created result future
     * @throws Exception if any of the opened components fails to initialise
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.resultFutureProvider.setup(configuration, getRuntimeContext());

        this.fetcher = this.generatedFetcher.newInstance(getRuntimeContext().getUserCodeClassLoader());
        FunctionUtils.setFunctionRuntimeContext(this.fetcher, getRuntimeContext());
        FunctionUtils.openFunction(this.fetcher, configuration);
        this.fetcherConverter.open(getRuntimeContext().getUserCodeClassLoader());

        // One slot more than the configured capacity.
        // NOTE(review): presumably so a future is always available while the operator's own
        // queue is full - confirm against the async wait operator.
        final int poolSize = this.asyncBufferCapacity + 1;
        this.resultFutureBuffer = new ArrayBlockingQueue<>(poolSize);
        this.allResultFutures = new ArrayList<>(poolSize);

        // Must run after resultFutureBuffer is assigned: the in-flight gauge binds to it.
        registerMetrics();

        final int rightArity = this.rightRowSerializer.getArity();
        for (int slot = 0; slot < poolSize; slot++) {
            TableFunctionResultFuture<RowData> fetcherResultFuture = createFetcherResultFuture(configuration);
            fetcherResultFuture.setRuntimeContext(getRuntimeContext());
            fetcherResultFuture.open(configuration);
            JoinedRowResultFuture<IN> joinedResultFuture = createJoinedResultFuture(fetcherResultFuture, rightArity);
            this.resultFutureBuffer.add(joinedResultFuture);
            this.allResultFutures.add(joinedResultFuture);
        }
    }

    /**
     * Creates the failed-lookup counter and the request-time histogram and registers them,
     * together with an in-flight gauge, on the lookup metric group.
     *
     * <p>If no metric group was injected through {@link #setLookupMetricGroup(LookupMetricGroup)}
     * an {@link InternalLookupMetricGroup} named {@link #LOOKUP_JOIN_METRIC_GROUP_NAME} is created
     * from the runtime context's metric group.
     *
     * <p>The gauge reports the remaining capacity of {@link #resultFutureBuffer}. Because
     * {@code open} fills that queue to capacity, the value equals the number of futures currently
     * removed from it.
     *
     * @throws NullPointerException if {@link #resultFutureBuffer} has not been created yet
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void registerMetrics() {
        if (this.lookupMetricGroup == null) {
            this.lookupMetricGroup = new InternalLookupMetricGroup(getRuntimeContext().getMetricGroup(), LOOKUP_JOIN_METRIC_GROUP_NAME);
        }
        this.failedLookupsCounter = new SimpleCounter();
        this.lookupRequestTimeHist = new DescriptiveStatisticsHistogram(this.lookupTimeHistorySize);
        this.lookupMetricGroup.numFailedLookupsCounter(this.failedLookupsCounter);
        this.lookupMetricGroup.lookupRequestTimeHist(this.lookupRequestTimeHist);
        // A bound method reference evaluates (and null-checks) its receiver right here, so the
        // gauge stays attached to the queue instance that exists at registration time.
        this.lookupMetricGroup.numInFlightLookupRequestsGauge(this.resultFutureBuffer::remainingCapacity);
    }

    /**
     * Obtains a new fetcher result future from the {@link ResultFutureProvider}.
     *
     * @param configuration not used by this implementation
     * @return the result future supplied by the provider for the current runtime context
     */
    public TableFunctionResultFuture<RowData> createFetcherResultFuture(Configuration configuration) {
        return this.resultFutureProvider.provide(getRuntimeContext());
    }

    /**
     * Wraps a fetcher result future into the joined-row result future used by the concrete runner.
     *
     * @param tableFunctionResultFuture the already opened fetcher result future
     * @param i arity of the right-side row, taken from {@link #rightRowSerializer}
     * @return the joined-row result future to place into the pool
     */
    protected abstract JoinedRowResultFuture<IN> createJoinedResultFuture(TableFunctionResultFuture<RowData> tableFunctionResultFuture, int i);

    /**
     * Returns the internal list of all pooled result futures (not a copy).
     *
     * @return the list created in {@code open}, or {@code null} before {@code open} has run
     */
    @VisibleForTesting
    public List<JoinedRowResultFuture<IN>> getAllResultFutures() {
        return this.allResultFutures;
    }

    /**
     * Injects the metric group to use; when set before {@link #registerMetrics()} runs, no
     * {@link InternalLookupMetricGroup} is created.
     *
     * @param lookupMetricGroup the metric group that metrics are registered on
     */
    @VisibleForTesting
    public void setLookupMetricGroup(LookupMetricGroup lookupMetricGroup) {
        this.lookupMetricGroup = lookupMetricGroup;
    }

    /**
     * Closes the superclass, the fetcher and every pooled result future.
     *
     * <p>Every step is attempted even if an earlier one throws, so a failing fetcher cannot leak
     * the result futures. The first exception is rethrown once all steps have run; later
     * exceptions are attached to it as suppressed exceptions.
     *
     * @throws Exception the first failure encountered while closing
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;

        try {
            super.close();
        } catch (Exception e) {
            failure = e;
        }

        if (this.fetcher != null) {
            try {
                FunctionUtils.closeFunction(this.fetcher);
            } catch (Exception e) {
                failure = firstOrSuppressed(failure, e);
            }
        }

        if (this.allResultFutures != null) {
            for (JoinedRowResultFuture<IN> resultFuture : this.allResultFutures) {
                try {
                    resultFuture.close();
                } catch (Exception e) {
                    failure = firstOrSuppressed(failure, e);
                }
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /** Returns {@code first} with {@code next} added as suppressed, or {@code next} if there is no first. */
    private static Exception firstOrSuppressed(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        // Throwable.addSuppressed rejects self-suppression, so guard against the same instance.
        if (first != next) {
            first.addSuppressed(next);
        }
        return first;
    }
}
