package org.apache.flink.table.runtime.operators.join.lookup;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache;
import org.apache.flink.table.runtime.operators.join.lookup.collector.CachingResultFuture;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.types.RowKind;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/AsyncLookupJoinCachingRunner.class */
public class AsyncLookupJoinCachingRunner extends AsyncLookupJoinRunner {
    private static final long serialVersionUID = 1;
    protected final LookupCache lookupCache;
    private final RowDataSerializer cacheKeysRowSerializer;
    private final GeneratedProjection generatedProjection;
    private transient Projection<RowData, GenericRowData> projection;
    protected transient AtomicLong latestLoadTimeMs;
    private transient Counter hitCount;
    private transient Counter missCount;

    public AsyncLookupJoinCachingRunner(GeneratedFunction<AsyncFunction<RowData, Object>> generatedFunction, DataStructureConverter<RowData, Object> dataStructureConverter, GeneratedResultFuture<TableFunctionResultFuture<RowData>> generatedResultFuture, RowDataSerializer rowDataSerializer, RowDataSerializer rowDataSerializer2, GeneratedProjection generatedProjection, LookupCache lookupCache, boolean z, int i) {
        super(generatedFunction, dataStructureConverter, rowDataSerializer, generatedResultFuture, z, i, lookupCache.getLookupConfig().getMaxRetryTimes());
        this.generatedProjection = generatedProjection;
        this.cacheKeysRowSerializer = rowDataSerializer2;
        this.lookupCache = lookupCache;
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.AbstractAsyncLookupJoinRunner
    public void open(Configuration configuration) throws Exception {
        this.projection = this.generatedProjection.newInstance(Thread.currentThread().getContextClassLoader());
        this.lookupCache.setRuntimeContext(getRuntimeContext());
        this.lookupCache.open(configuration);
        this.latestLoadTimeMs = new AtomicLong();
        this.hitCount = getRuntimeContext().getMetricGroup().counter(LookupCache.HIT_COUNT_METRIC);
        this.missCount = getRuntimeContext().getMetricGroup().counter(LookupCache.MISS_COUNT_METRIC);
        getRuntimeContext().getMetricGroup().gauge(LookupCache.LATEST_LOAD_TIME_METRIC, () -> {
            return Long.valueOf(this.latestLoadTimeMs.get());
        });
        super.open(configuration);
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.AbstractAsyncLookupJoinRunner
    public TableFunctionResultFuture<RowData> createFetcherResultFuture(Configuration configuration) {
        return new CachingResultFuture(this.lookupCache, this.resultFutureProvider.provide(getRuntimeContext()), this.latestLoadTimeMs);
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner
    public void asyncInvoke(RowData rowData, ResultFuture<RowData> resultFuture) throws Exception {
        RowData rowData2 = (GenericRowData) this.projection.apply(rowData);
        rowData2.setRowKind(RowKind.INSERT);
        Collection<RowData> cachedRows = this.lookupCache.getCachedRows(rowData2);
        JoinedRowResultFuture<RowData> joinedRowResultFuture = (JoinedRowResultFuture) this.resultFutureBuffer.take();
        joinedRowResultFuture.reset(rowData, resultFuture);
        if (cachedRows != null) {
            this.hitCount.inc();
            getNonCachingFuture(joinedRowResultFuture).complete(cachedRows);
        } else {
            this.missCount.inc();
            getCachingFuture(joinedRowResultFuture).setInputKeys((GenericRowData) this.cacheKeysRowSerializer.copy(rowData2));
            this.fetcher.asyncInvoke(rowData, joinedRowResultFuture);
        }
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.AbstractAsyncLookupJoinRunner
    public void close() throws Exception {
        super.close();
        this.lookupCache.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CachingResultFuture getCachingFuture(JoinedRowResultFuture<RowData> joinedRowResultFuture) {
        return (CachingResultFuture) joinedRowResultFuture.getFetcherResultFuture();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JoinedRowResultFuture<RowData> getNonCachingFuture(JoinedRowResultFuture<RowData> joinedRowResultFuture) {
        getCachingFuture(joinedRowResultFuture).ignoreCaching();
        return joinedRowResultFuture;
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner
    public /* bridge */ /* synthetic */ void asyncInvoke(Object obj, ResultFuture resultFuture) throws Exception {
        asyncInvoke((RowData) obj, (ResultFuture<RowData>) resultFuture);
    }
}
