package org.apache.flink.table.runtime.operators.join.lookup;

import java.util.Collection;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.ProxyCounter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup;
import org.apache.flink.table.connector.config.lookup.LookupCachePolicy;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.collector.TableFunctionCollector;
import org.apache.flink.table.runtime.generated.GeneratedCollector;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache;
import org.apache.flink.table.runtime.operators.join.lookup.collector.CachingCollector;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/LookupJoinCachingRunner.class */
public class LookupJoinCachingRunner extends LookupJoinRunner {
    private static final long serialVersionUID = 1;
    public static final String LOOKUP_CACHE_METRIC_GROUP_NAME = "";
    private final GeneratedProjection generatedProjection;
    private final RowDataSerializer cacheKeysRowSerializer;
    private final boolean isObjectReuseEnabled;
    protected final LookupCache lookupCache;
    private transient Projection<RowData, GenericRowData> projection;
    protected transient CachingCollector cachingCollector;
    private transient CacheMetricGroup cacheMetricGroup;
    private transient Counter hitCounter;
    private transient Counter missCounter;

    public LookupJoinCachingRunner(GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFunction, GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector, RowDataSerializer rowDataSerializer, GeneratedProjection generatedProjection, LookupCache lookupCache, boolean z, boolean z2, int i, int i2) {
        super(generatedFunction, generatedCollector, z, i, lookupCache.getLookupConfig().getMaxRetryTimes(), lookupCache.getLookupConfig().getCachePolicy() != LookupCachePolicy.ALL, i2);
        this.lookupCache = lookupCache;
        this.generatedProjection = generatedProjection;
        this.cacheKeysRowSerializer = rowDataSerializer;
        this.isObjectReuseEnabled = z2;
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.AbstractLookupJoinRunner
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.lookupCache.open(configuration, getRuntimeContext(), this.cacheMetricGroup);
        this.projection = this.generatedProjection.newInstance(Thread.currentThread().getContextClassLoader());
        this.cachingCollector = new CachingCollector(this.collector, this.lookupCache);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.table.runtime.operators.join.lookup.AbstractLookupJoinRunner
    public void registerMetrics() {
        super.registerMetrics();
        if (this.cacheMetricGroup == null) {
            this.cacheMetricGroup = new InternalCacheMetricGroup(getRuntimeContext().getMetricGroup(), "");
        }
        this.hitCounter = new SimpleCounter();
        this.missCounter = new SimpleCounter();
        this.cacheMetricGroup.hitCounter(this.hitCounter);
        this.cacheMetricGroup.missCounter(this.missCounter);
        if (this.useFetcher) {
            this.cacheMetricGroup.latestLoadTimeGauge(() -> {
                long[] values = this.lookupRequestTimeHist.getStatistics().getValues();
                return Long.valueOf(values.length > 0 ? values[values.length - 1] : -1L);
            });
            this.cacheMetricGroup.numLoadFailuresCounter(new ProxyCounter(this.failedLookupsCounter));
        }
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner
    protected void fetchElements(RowData rowData) throws Exception {
        GenericRowData apply = this.projection.apply(rowData);
        apply.setRowKind(RowKind.INSERT);
        Collection<RowData> cachedRows = this.lookupCache.getCachedRows(apply);
        if (cachedRows != null) {
            this.hitCounter.inc();
            Collector<RowData> fetcherCollector = getFetcherCollector();
            fetcherCollector.getClass();
            cachedRows.forEach((v1) -> {
                r1.collect(v1);
            });
            return;
        }
        this.missCounter.inc();
        if (!this.useFetcher) {
            throw new IllegalStateException("There is no required data in cache, but we can't fetch it. This should never happen. If you see this, report a bug, please.");
        }
        long currentTimeMillis = System.currentTimeMillis();
        this.fetcher.flatMap(rowData, getCachingFetcherCollector());
        this.lookupRequestTimeHist.update(System.currentTimeMillis() - currentTimeMillis);
        if (this.isObjectReuseEnabled) {
            apply = this.cacheKeysRowSerializer.copy((RowData) apply);
        }
        this.cachingCollector.storeInCache(apply);
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.AbstractLookupJoinRunner
    public void close() throws Exception {
        super.close();
        this.lookupCache.close();
    }

    public Collector<RowData> getCachingFetcherCollector() {
        return this.cachingCollector;
    }

    @VisibleForTesting
    public void setCacheMetricGroup(CacheMetricGroup cacheMetricGroup) {
        this.cacheMetricGroup = cacheMetricGroup;
    }
}
