package org.apache.flink.table.runtime.operators.join.lookup;

import java.util.Collection;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.connector.config.lookup.LookupCachePolicy;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.collector.TableFunctionCollector;
import org.apache.flink.table.runtime.generated.GeneratedCollector;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache;
import org.apache.flink.table.runtime.operators.join.lookup.collector.CachingCollector;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/LookupJoinCachingRunner.class */
public class LookupJoinCachingRunner extends LookupJoinRunner {
    private static final long serialVersionUID = 1;
    private final GeneratedProjection generatedProjection;
    private final TypeInformation<RowData> keysTypeInformation;
    private final boolean isObjectReuseEnabled;
    protected final LookupCache lookupCache;
    private transient Projection<RowData, GenericRowData> projection;
    private transient TypeSerializer<RowData> keysSerializer;
    protected transient CachingCollector cachingCollector;

    public LookupJoinCachingRunner(GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFunction, GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector, GeneratedProjection generatedProjection, TypeInformation<RowData> typeInformation, LookupCache lookupCache, boolean z, boolean z2, int i) {
        super(generatedFunction, generatedCollector, z, i, lookupCache.getLookupConfig().getMaxRetryTimes(), lookupCache.getLookupConfig().getCachePolicy() != LookupCachePolicy.ALL);
        this.lookupCache = lookupCache;
        this.generatedProjection = generatedProjection;
        this.keysTypeInformation = typeInformation;
        this.isObjectReuseEnabled = z2;
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.lookupCache.open(configuration, getRuntimeContext());
        this.projection = this.generatedProjection.newInstance(Thread.currentThread().getContextClassLoader());
        this.keysSerializer = this.keysTypeInformation.createSerializer(getRuntimeContext().getExecutionConfig());
        this.cachingCollector = new CachingCollector(this.collector, this.lookupCache);
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner
    protected void fetchElements(RowData rowData) throws Exception {
        GenericRowData apply = this.projection.apply(rowData);
        apply.setRowKind(RowKind.INSERT);
        Collection<RowData> cachedRows = this.lookupCache.getCachedRows(apply);
        if (cachedRows != null) {
            Collector<RowData> fetcherCollector = getFetcherCollector();
            fetcherCollector.getClass();
            cachedRows.forEach((v1) -> {
                r1.collect(v1);
            });
        } else {
            if (!this.useFetcher) {
                throw new IllegalStateException("There is no required data in cache, but we can't fetch it. This should never happen. If you see this, report a bug, please.");
            }
            this.fetcher.flatMap(rowData, getCachingFetcherCollector());
            if (this.isObjectReuseEnabled) {
                apply = (GenericRowData) this.keysSerializer.copy(apply);
            }
            this.cachingCollector.storeInCache(apply);
        }
    }

    @Override // org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner
    public void close() throws Exception {
        super.close();
        this.lookupCache.close();
    }

    public Collector<RowData> getCachingFetcherCollector() {
        return this.cachingCollector;
    }
}
