package org.apache.flink.table.runtime.operators.join.lookup.cache;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.connector.config.lookup.LookupCachePolicy;
import org.apache.flink.table.connector.config.lookup.LookupConfig;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/lookup/cache/LookupLRUCache.class */
/**
 * {@link LookupCache} implementation for the {@link LookupCachePolicy#LRU} policy, backed by a
 * Guava {@link Cache}.
 *
 * <p>The cache maps a lookup key to the collection of rows collected for that key. It is
 * optionally bounded by a total weight ({@code lookupConfig.getCacheMaxRows()}) and optionally
 * expires entries a fixed time after they were written
 * ({@code lookupConfig.getCacheExpireMs()}). A value of {@code 0} disables the respective limit.
 *
 * <p>The Guava cache and the load counter are {@code transient}; they are only created in
 * {@link #open(Configuration, RuntimeContext, CacheMetricGroup)}.
 */
public class LookupLRUCache extends LookupCache {
    private static final Logger LOG = LoggerFactory.getLogger(LookupLRUCache.class);
    private static final long serialVersionUID = 1L;

    /** Name under which the cache hit-rate gauge is registered on the runtime metric group. */
    public static final String HIT_RATE_METRIC = "LookupLRUCacheHitRate";

    /** Underlying Guava cache; {@code null} until {@code open(...)} has been called. */
    protected transient Cache<GenericRowData, Collection<RowData>> cache;

    /** Counts calls to {@link #putCollectedRows(GenericRowData, Collection)}. */
    private transient Counter loadCount;

    /**
     * Creates the cache description; the actual cache is built later in {@code open(...)}.
     *
     * @param lookupConfig lookup configuration; its cache policy must be
     *     {@link LookupCachePolicy#LRU} and its maximum row count must be non-negative
     * @throws IllegalArgumentException if one of the above conditions is violated
     */
    public LookupLRUCache(LookupConfig lookupConfig) {
        super(lookupConfig);
        Preconditions.checkArgument(
                lookupConfig.getCachePolicy() == LookupCachePolicy.LRU,
                "LookupLRUCache requires the LRU cache policy, but got: %s.",
                lookupConfig.getCachePolicy());
        Preconditions.checkArgument(lookupConfig.getCacheMaxRows() >= 0, "For LRU lookup cache policy size of cache must be >= 0.");
    }

    /**
     * Builds the Guava cache according to the lookup configuration and registers the cache
     * metrics (load counter, number of cached records, hit rate).
     *
     * @param configuration not used by this implementation
     * @param runtimeContext runtime context whose metric group receives the hit-rate gauge
     * @param cacheMetricGroup metric group that receives the load counter and the size gauge
     */
    @Override // org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache
    public void open(Configuration configuration, RuntimeContext runtimeContext, CacheMetricGroup cacheMetricGroup) {
        // recordStats() is required for the hit-rate gauge registered below.
        final CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder().recordStats();

        final long expireMs = this.lookupConfig.getCacheExpireMs();
        if (expireMs > 0) {
            builder.expireAfterWrite(expireMs, TimeUnit.MILLISECONDS);
        }

        final long maxRows = this.lookupConfig.getCacheMaxRows();
        if (maxRows > 0) {
            this.cache = builder.maximumWeight(maxRows)
                    .<GenericRowData, Collection<RowData>>weigher(LookupLRUCache::weightOf)
                    .build();
        } else {
            // No weight limit configured: the cache is only bounded by expiration (if any).
            this.cache = builder.build();
        }

        this.loadCount = new SimpleCounter();
        cacheMetricGroup.loadCounter(this.loadCount);
        cacheMetricGroup.numCachedRecordsGauge(this.cache::size);
        runtimeContext.getMetricGroup().gauge(HIT_RATE_METRIC, () -> this.cache.stats().hitRate());
    }

    /**
     * Weight of a single cache entry: the number of cached rows, but never less than one.
     *
     * <p>Guava does not consider entries of weight zero for size-based eviction, so keys whose
     * lookup produced no rows would otherwise pile up without bound. Counting them as one row
     * keeps them subject to the configured maximum.
     */
    private static int weightOf(GenericRowData key, Collection<RowData> rows) {
        return Math.max(1, rows.size());
    }

    /**
     * Returns the rows cached for the given key.
     *
     * @param genericRowData lookup key
     * @return the cached rows, or {@code null} if the cache holds no entry for the key
     */
    @Override // org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache
    public Collection<RowData> getCachedRows(GenericRowData genericRowData) {
        return this.cache.getIfPresent(genericRowData);
    }

    /**
     * Stores the rows collected for the given key and increments the load counter.
     *
     * @param genericRowData lookup key
     * @param collection rows collected for the key
     */
    @Override // org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache
    public void putCollectedRows(GenericRowData genericRowData, Collection<RowData> collection) {
        this.loadCount.inc();
        this.cache.put(genericRowData, collection);
    }

    /**
     * Drops all cached entries. If debug logging is enabled, the cache content is logged first.
     * Does nothing when the cache was never created, i.e. {@code open(...)} did not run.
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.cache == null) {
            // open() was never called (or failed before the cache was built): nothing to release.
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Cache content: \n{\n\t{}\n}", Joiner.on(",\n\t").withKeyValueSeparator(" = ").join(this.cache.asMap()));
        }
        this.cache.invalidateAll();
    }

    /**
     * Returns the number of entries (keys) in the cache, as reported by the underlying Guava
     * cache.
     */
    @Override // org.apache.flink.table.runtime.operators.join.lookup.cache.LookupCache
    public long size() {
        return this.cache.size();
    }
}
