package org.apache.flink.table.runtime.operators.rank;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava30.com.google.common.cache.RemovalCause;
import org.apache.flink.shaded.guava30.com.google.common.cache.RemovalListener;
import org.apache.flink.shaded.guava30.com.google.common.cache.RemovalNotification;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/FastTop1Function.class */
public class FastTop1Function extends AbstractTopNFunction implements CheckpointedFunction {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(FastTop1Function.class);
    private final TypeSerializer<RowData> inputRowSer;
    private final long cacheSize;
    private transient ValueState<RowData> dataState;
    private transient Cache<RowData, RowData> kvCache;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/FastTop1Function$CacheRemovalListener.class */
    private class CacheRemovalListener implements RemovalListener<RowData, RowData> {
        private CacheRemovalListener() {
        }

        public void onRemoval(RemovalNotification<RowData, RowData> removalNotification) {
            if (removalNotification.getCause() != RemovalCause.SIZE || removalNotification.getValue() == null) {
                return;
            }
            RowData rowData = (RowData) FastTop1Function.this.keyContext.getCurrentKey();
            FastTop1Function.this.keyContext.setCurrentKey((RowData) removalNotification.getKey());
            try {
                try {
                    FastTop1Function.this.dataState.update(removalNotification.getValue());
                    FastTop1Function.this.keyContext.setCurrentKey(rowData);
                } catch (Throwable th) {
                    FastTop1Function.LOG.error("Fail to synchronize state!", th);
                    throw new RuntimeException(th);
                }
            } catch (Throwable th2) {
                FastTop1Function.this.keyContext.setCurrentKey(rowData);
                throw th2;
            }
        }
    }

    public FastTop1Function(StateTtlConfig stateTtlConfig, InternalTypeInfo<RowData> internalTypeInfo, GeneratedRecordComparator generatedRecordComparator, RowDataKeySelector rowDataKeySelector, RankType rankType, RankRange rankRange, boolean z, boolean z2, long j) {
        super(stateTtlConfig, internalTypeInfo, generatedRecordComparator, rowDataKeySelector, rankType, rankRange, z, z2, 0, 0L);
        this.inputRowSer = internalTypeInfo.createSerializer(new ExecutionConfig());
        this.cacheSize = j;
    }

    @Override // org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        int max = Math.max(1, (int) (this.cacheSize / getDefaultTopNSize()));
        CacheBuilder newBuilder = CacheBuilder.newBuilder();
        if (this.ttlConfig.isEnabled()) {
            newBuilder.expireAfterWrite(this.ttlConfig.getTtl().toMilliseconds(), TimeUnit.MILLISECONDS);
        }
        this.kvCache = newBuilder.maximumSize(max).removalListener(new CacheRemovalListener()).build();
        LOG.info("Top-1 operator is using LRU caches key-size: {}", Integer.valueOf(max));
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("Top1-Rank-State", this.inputRowType);
        if (this.ttlConfig.isEnabled()) {
            valueStateDescriptor.enableTimeToLive(this.ttlConfig);
        }
        this.dataState = getRuntimeContext().getState(valueStateDescriptor);
        registerMetric(this.kvCache.size() * getDefaultTopNSize());
    }

    public void processElement(RowData rowData, KeyedProcessFunction<RowData, RowData, RowData>.Context context, Collector<RowData> collector) throws Exception {
        this.requestCount += serialVersionUID;
        RowData rowData2 = (RowData) this.keyContext.getCurrentKey();
        RowData rowData3 = (RowData) this.kvCache.getIfPresent(rowData2);
        if (rowData3 == null) {
            rowData3 = (RowData) this.dataState.value();
        } else {
            this.hitCount += serialVersionUID;
        }
        if (rowData3 == null) {
            this.kvCache.put(rowData2, this.inputRowSer.copy(rowData));
            if (this.outputRankNumber) {
                collectInsertWithoutBuffer(collector, rowData, serialVersionUID);
                return;
            } else {
                collectInsertWithoutBuffer(collector, rowData);
                return;
            }
        }
        if (this.sortKeyComparator.compare((RowData) this.sortKeySelector.getKey(rowData), (RowData) this.sortKeySelector.getKey(rowData3)) < 0) {
            this.kvCache.put(rowData2, this.inputRowSer.copy(rowData));
            if (this.outputRankNumber) {
                collectUpdateBeforeWithoutBuffer(collector, rowData3, serialVersionUID);
                collectUpdateAfterWithoutBuffer(collector, rowData, serialVersionUID);
            } else {
                collectUpdateBeforeWithoutBuffer(collector, rowData3);
                collectUpdateAfterWithoutBuffer(collector, rowData);
            }
        }
    }

    @Override // org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        for (Map.Entry entry : this.kvCache.asMap().entrySet()) {
            this.keyContext.setCurrentKey(entry.getKey());
            this.dataState.update(entry.getValue());
        }
    }

    @Override // org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
        processElement((RowData) obj, (KeyedProcessFunction<RowData, RowData, RowData>.Context) context, (Collector<RowData>) collector);
    }
}
