package org.apache.flink.table.runtime.operators.rank;

import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.outputcache.CacheBufferUtils;
import org.apache.flink.table.runtime.operators.outputcache.OutputCacheBuffer;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction.class */
public abstract class AbstractTopNFunction extends KeyedProcessFunctionWithCleanupState<RowData, RowData, RowData> implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractTopNFunction.class);
    // Messages logged and thrown by the constructor for unsupported rank configurations.
    private static final String RANK_UNSUPPORTED_MSG = "RANK() on streaming table is not supported currently";
    private static final String DENSE_RANK_UNSUPPORTED_MSG = "DENSE_RANK() on streaming table is not supported currently";
    private static final String WITHOUT_RANK_END_UNSUPPORTED_MSG = "Rank end is not specified. Currently rank only support TopN, which means the rank end must be specified.";
    // Operator list state holding the buffered output as (row, rank, ordinal) tuples; only set when buffering is enabled.
    private transient ListState<Tuple3<RowData, Long, Integer>> outputBufferState;
    // In-memory output buffer keyed by sort key; created in initializeState when buffering is enabled.
    private transient OutputCacheBuffer<RowData, RankOrderedRow> outputBuffer;
    // Value returned by getDefaultTopNSize() when the rank end is not a constant.
    private static final long DEFAULT_TOPN_SIZE = 100;
    // True when the configured output buffer size is greater than zero.
    protected final boolean outputBufferEnabled;
    // Running ordinal handed to buffered rows; reset to 0 after a flush.
    private int outputBufferCounter;
    private final int outputBufferSize;
    // True when both the buffer size and the flush interval are greater than zero.
    private final boolean outputBufferFlushingEnabled;
    private final long outputBufferFlushInterval;
    // Timestamp of the most recently registered flush timer; compared against the firing time in onTimer.
    private long outputBufferFlushingTime;
    // Generated comparator code; instantiated in open() and then set to null.
    private GeneratedRecordComparator generatedSortKeyComparator;
    protected Comparator<RowData> sortKeyComparator;
    // When false, UPDATE_BEFORE rows are not emitted.
    private final boolean generateUpdateBefore;
    // When true, emitted rows are joined with a one-field row carrying the rank number.
    protected final boolean outputRankNumber;
    protected final InternalTypeInfo<RowData> inputRowType;
    protected final TypeSerializer<RowData> inputRowSer;
    protected final KeySelector<RowData, RowData> sortKeySelector;
    private final TypeSerializer<RowData> sortKeySer;
    protected final InternalTypeInfo<RowData> sortKeyType;
    protected KeyContext keyContext;
    // True for a ConstantRankRange; false for a VariableRankRange whose end is read from each row.
    private final boolean isConstantRankEnd;
    // Lower rank bound; -1 when the rank range is variable.
    private final long rankStart;
    // Index of the input column holding the rank end; -1 when the rank range is constant.
    private final int rankEndIndex;
    // Upper rank bound; -1 until initRankEnd() resolves it when the rank range is variable.
    protected long rankEnd;
    // Reads the rank end from an input row; only assigned in open() for variable rank ranges.
    private transient Function<RowData, Long> rankEndFetcher;
    // Keyed state remembering the first rank end seen; only created for variable rank ranges.
    private ValueState<Long> rankEndState;
    // Incremented when a row's rank end differs from the one stored in state.
    private Counter invalidCounter;
    // Reusable row returned by createOutputRow when the rank number is output.
    private JoinedRowData outputRow;
    // Counters read by the "topn.cache.hitRate" gauge; this class only initialises them to zero.
    protected long hitCount;
    protected long requestCount;

    /* renamed from: org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction$2.class */
    /*
     * Compiler-generated holder for enum switch lookup tables, as reconstructed by the decompiler.
     * Each table maps an enum constant's ordinal() to the case number used by a switch statement.
     * Every assignment is wrapped in try/catch for NoSuchFieldError, so a constant that is missing
     * at runtime simply leaves its slot at 0.
     *
     * NOTE(review): the RankType table assigned in the static initializer has no visible field
     * declaration in this class; this looks like a decompiler artifact - confirm before compiling.
     */
    static /* synthetic */ class AnonymousClass2 {
        // Lookup table used by open(): BIGINT -> 1, INTEGER -> 2, SMALLINT -> 3.
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot = new int[LogicalTypeRoot.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot[LogicalTypeRoot.BIGINT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot[LogicalTypeRoot.INTEGER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$table$types$logical$LogicalTypeRoot[LogicalTypeRoot.SMALLINT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            // RankType table: ROW_NUMBER -> 1, RANK -> 2, DENSE_RANK -> 3.
            $SwitchMap$org$apache$flink$table$runtime$operators$rank$RankType = new int[RankType.values().length];
            try {
                $SwitchMap$org$apache$flink$table$runtime$operators$rank$RankType[RankType.ROW_NUMBER.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$table$runtime$operators$rank$RankType[RankType.RANK.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$flink$table$runtime$operators$rank$RankType[RankType.DENSE_RANK.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction$RankOrderedRow.class */
    /**
     * A buffered output row together with its rank and the ordinal under which it entered the
     * output buffer.
     *
     * <p>Equality deliberately ignores the ordinal: two entries are equal when they have the same
     * rank and the same row. Rows of the same {@link RowData} class are compared with that class's
     * own {@code equals}; rows of different classes are compared by row kind and field by field
     * using the enclosing function's input row type.
     */
    public class RankOrderedRow {
        private final RowData row;
        private final long rank;
        private final int ord;

        private RankOrderedRow(RowData rowData, long j, int i) {
            this.row = rowData;
            this.rank = j;
            this.ord = i;
        }

        /** Rebuilds an entry from its (row, rank, ordinal) tuple form. */
        private RankOrderedRow(Tuple3<RowData, Long, Integer> tuple3) {
            this.row = tuple3.f0;
            this.rank = tuple3.f1.longValue();
            this.ord = tuple3.f2.intValue();
        }

        /** Returns this entry as a (row, rank, ordinal) tuple. */
        /* JADX INFO: Access modifiers changed from: private */
        public Tuple3<RowData, Long, Integer> getAsTuple() {
            return new Tuple3<>(this.row, Long.valueOf(this.rank), Integer.valueOf(this.ord));
        }

        /**
         * Sets the row kind on the wrapped row (mutating it in place) and returns this entry.
         */
        public RankOrderedRow setRowKind(RowKind rowKind) {
            this.row.setRowKind(rowKind);
            return this;
        }

        public RowData getRow() {
            return this.row;
        }

        public long getRank() {
            return this.rank;
        }

        public int getOrd() {
            return this.ord;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            RankOrderedRow other = (RankOrderedRow) obj;
            if (this.rank != other.rank || this.row.getArity() != other.row.getArity()) {
                return false;
            }
            if (this.row.getClass() == other.row.getClass()) {
                return Objects.equals(this.row, other.row);
            }
            // Different physical representations: compare the logical content instead. The row
            // kind is checked explicitly so that this branch does not match rows of a different
            // kind, which matters because callers flip the kind before looking for a match.
            return this.row.getRowKind() == other.row.getRowKind() && fieldsEquals(this.row, other.row);
        }

        /** Compares two rows field by field according to the enclosing function's input row type. */
        private boolean fieldsEquals(RowData rowData, RowData rowData2) {
            LogicalType[] rowFieldTypes = AbstractTopNFunction.this.inputRowType.toRowFieldTypes();
            for (int i = 0; i < rowFieldTypes.length; i++) {
                RowData.FieldGetter createFieldGetter = RowData.createFieldGetter(rowFieldTypes[i], i);
                if (!Objects.equals(createFieldGetter.getFieldOrNull(rowData), createFieldGetter.getFieldOrNull(rowData2))) {
                    return false;
                }
            }
            return true;
        }

        /**
         * Hashes the rank and the row's field values rather than the row object itself, so that
         * entries which {@link #equals(Object)} treats as equal across different {@link RowData}
         * classes also share a hash code. The row kind is not part of the hash, which is permitted
         * because unequal objects may collide.
         */
        @Override
        public int hashCode() {
            LogicalType[] rowFieldTypes = AbstractTopNFunction.this.inputRowType.toRowFieldTypes();
            Object[] fieldValues = new Object[rowFieldTypes.length];
            for (int i = 0; i < rowFieldTypes.length; i++) {
                fieldValues[i] = RowData.createFieldGetter(rowFieldTypes[i], i).getFieldOrNull(this.row);
            }
            // deepHashCode keeps array-valued fields (e.g. byte[]) content-based.
            return 31 * java.util.Arrays.deepHashCode(fieldValues) + Long.hashCode(this.rank);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractTopNFunction(long j, long j2, InternalTypeInfo<RowData> internalTypeInfo, GeneratedRecordComparator generatedRecordComparator, RowDataKeySelector rowDataKeySelector, RankType rankType, RankRange rankRange, boolean z, boolean z2, int i, long j3) {
        super(j, j2);
        this.outputBufferCounter = 0;
        this.hitCount = 0L;
        this.requestCount = 0L;
        switch (rankType) {
            case ROW_NUMBER:
                if (rankRange instanceof ConstantRankRange) {
                    ConstantRankRange constantRankRange = (ConstantRankRange) rankRange;
                    this.isConstantRankEnd = true;
                    this.rankStart = constantRankRange.getRankStart();
                    this.rankEnd = constantRankRange.getRankEnd();
                    this.rankEndIndex = -1;
                } else {
                    if (!(rankRange instanceof VariableRankRange)) {
                        LOG.error(WITHOUT_RANK_END_UNSUPPORTED_MSG);
                        throw new UnsupportedOperationException(WITHOUT_RANK_END_UNSUPPORTED_MSG);
                    }
                    this.rankEndIndex = ((VariableRankRange) rankRange).getRankEndIndex();
                    this.isConstantRankEnd = false;
                    this.rankStart = -1L;
                    this.rankEnd = -1L;
                }
                this.generatedSortKeyComparator = generatedRecordComparator;
                this.generateUpdateBefore = z;
                this.inputRowType = internalTypeInfo;
                this.inputRowSer = internalTypeInfo.createSerializer(new ExecutionConfig());
                this.outputRankNumber = z2;
                this.sortKeySelector = rowDataKeySelector;
                this.sortKeyType = rowDataKeySelector.mo72getProducedType();
                this.sortKeySer = this.sortKeyType.createSerializer(new ExecutionConfig());
                this.outputBufferSize = i;
                this.outputBufferFlushInterval = j3;
                this.outputBufferEnabled = i > 0;
                this.outputBufferFlushingEnabled = i > 0 && j3 > 0;
                return;
            case RANK:
                LOG.error(RANK_UNSUPPORTED_MSG);
                throw new UnsupportedOperationException(RANK_UNSUPPORTED_MSG);
            case DENSE_RANK:
                LOG.error(DENSE_RANK_UNSUPPORTED_MSG);
                throw new UnsupportedOperationException(DENSE_RANK_UNSUPPORTED_MSG);
            default:
                LOG.error("Streaming tables do not support {}", rankType.name());
                throw new UnsupportedOperationException("Streaming tables do not support " + rankType.toString());
        }
    }

    /**
     * Initialises runtime resources: the cleanup-time state, the reusable output row, the sort key
     * comparator, the invalid-rank-end counter and, for a variable rank range, the rank end state
     * and the accessor that reads the rank end from an input row.
     *
     * @param configuration configuration forwarded to the superclass
     * @throws UnsupportedOperationException if the rank end column is not BIGINT, INTEGER or SMALLINT
     * @throws Exception if the superclass fails to open
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        initCleanupTimeState("RankFunctionCleanupTime");
        this.outputRow = new JoinedRowData();
        if (!this.isConstantRankEnd) {
            this.rankEndState = getRuntimeContext().getState(new ValueStateDescriptor<>("rankEnd", Types.LONG));
        }
        this.sortKeyComparator = this.generatedSortKeyComparator.newInstance(getRuntimeContext().getUserCodeClassLoader());
        // The generated code is no longer needed once the comparator instance exists.
        this.generatedSortKeyComparator = null;
        this.invalidCounter = getRuntimeContext().getMetricGroup().counter("topn.invalidTopSize");
        if (this.isConstantRankEnd) {
            return;
        }
        LogicalType logicalType = this.inputRowType.toRowFieldTypes()[this.rankEndIndex];
        // Switch on the enum itself rather than through a synthetic ordinal lookup table.
        switch (logicalType.getTypeRoot()) {
            case BIGINT:
                this.rankEndFetcher = rowData -> Long.valueOf(rowData.getLong(this.rankEndIndex));
                return;
            case INTEGER:
                this.rankEndFetcher = rowData -> Long.valueOf(rowData.getInt(this.rankEndIndex));
                return;
            case SMALLINT:
                this.rankEndFetcher = rowData -> Long.valueOf(rowData.getShort(this.rankEndIndex));
                return;
            default:
                String typeName = logicalType.getClass().getName();
                LOG.error("variable rank index column must be long, short or int type, while input type is {}", typeName);
                // The exception now names the same accepted types as the log line above.
                throw new UnsupportedOperationException("variable rank index column must be long, short or int type, while input type is " + typeName);
        }
    }

    /**
     * Sets up the output buffer and its operator list state when buffering is enabled, and refills
     * the buffer from state after a restore.
     *
     * <p>Restored rows keep the ordinals they were snapshotted with. The ordinal counter is
     * advanced to the highest restored ordinal so that rows buffered afterwards always receive a
     * larger ordinal and therefore sort after the restored ones.
     *
     * @param functionInitializationContext gives access to the operator state store and restore flag
     * @throws Exception if state access fails
     */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        if (!this.outputBufferEnabled) {
            return;
        }
        this.outputBufferState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor<>("output-buffered-data-state", TypeInformation.of(new TypeHint<Tuple3<RowData, Long, Integer>>() { // from class: org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction.1
        })));
        this.outputBuffer = new OutputCacheBuffer<>(Comparator.comparing(RankOrderedRow::getOrd), this.outputBufferSize);
        if (!functionInitializationContext.isRestored()) {
            return;
        }
        for (Tuple3<RowData, Long, Integer> restoredTuple : this.outputBufferState.get()) {
            RankOrderedRow restoredRow = new RankOrderedRow(restoredTuple);
            this.outputBuffer.collect(this.sortKeySelector.getKey(restoredTuple.f0), restoredRow);
            // Counting restored rows is not enough: their ordinals may be higher than their count.
            this.outputBufferCounter = Math.max(this.outputBufferCounter, restoredRow.getOrd());
        }
    }

    /**
     * Replaces the contents of the output buffer state with the rows currently held in the output
     * buffer, converted to (row, rank, ordinal) tuples. Does nothing when buffering is disabled.
     *
     * @param functionSnapshotContext snapshot context (unused)
     * @throws Exception if state access fails
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (!this.outputBufferEnabled) {
            return;
        }
        this.outputBufferState.clear();
        // One addAll per sort key, in the buffer map's iteration order.
        for (List<RankOrderedRow> rowsForKey : this.outputBuffer.getInternalMap().values()) {
            List<Tuple3<RowData, Long, Integer>> tuples = new LinkedList<>();
            for (RankOrderedRow bufferedRow : rowsForKey) {
                tuples.add(bufferedRow.getAsTuple());
            }
            this.outputBufferState.addAll(tuples);
        }
    }

    /**
     * Forces a flush of the output buffer when the firing timer is the most recently registered
     * flush timer; any other timer is ignored.
     *
     * @param j timestamp of the firing timer
     * @param onTimerContext timer context (unused)
     * @param collector receives the flushed rows
     */
    public void onTimer(long j, KeyedProcessFunction<RowData, RowData, RowData>.OnTimerContext onTimerContext, Collector<RowData> collector) throws Exception {
        if (j != this.outputBufferFlushingTime) {
            return;
        }
        checkAndFlushOutputBuffer(collector, true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Registers a processing-time timer at {@code j + flush interval} and records it as the
     * current flush time. Nothing happens unless timed flushing is enabled, the output buffer is
     * empty and {@code j} is later than the previously recorded flush time.
     *
     * @param context gives access to the timer service
     * @param j reference time the flush interval is added to
     */
    public void registerProcessingOutputBufferFlushingTimer(KeyedProcessFunction<RowData, RowData, RowData>.Context context, long j) {
        if (!this.outputBufferFlushingEnabled || !this.outputBuffer.isEmpty()) {
            return;
        }
        TimerService timers = context.timerService();
        if (j <= this.outputBufferFlushingTime) {
            return;
        }
        long flushAt = j + this.outputBufferFlushInterval;
        timers.registerProcessingTimeTimer(flushAt);
        this.outputBufferFlushingTime = flushAt;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the rank end when it is constant, otherwise the fixed default size.
     */
    public long getDefaultTopNSize() {
        if (this.isConstantRankEnd) {
            return this.rankEnd;
        }
        return DEFAULT_TOPN_SIZE;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resolves the rank end for the given row. A constant rank end is returned unchanged. For a
     * variable rank end, the value stored in state wins; if nothing is stored yet, the row's value
     * is used and stored. A stored value that differs from the row's value increments the
     * invalid counter.
     *
     * @param rowData input row the rank end is read from
     * @return the rank end now in effect
     * @throws Exception if state access fails
     */
    public long initRankEnd(RowData rowData) throws Exception {
        if (this.isConstantRankEnd) {
            return this.rankEnd;
        }
        Long storedRankEnd = this.rankEndState.value();
        long rowRankEnd = this.rankEndFetcher.apply(rowData).longValue();
        if (storedRankEnd != null) {
            this.rankEnd = storedRankEnd.longValue();
            if (rowRankEnd != this.rankEnd) {
                this.invalidCounter.inc();
            }
        } else {
            this.rankEnd = rowRankEnd;
            this.rankEndState.update(Long.valueOf(this.rankEnd));
        }
        return this.rankEnd;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Tells whether a sort key belongs in the given buffer: true when the buffer has no last
     * entry, when the key compares lower than the buffer's last key, or when the buffer holds
     * fewer rows than {@link #getDefaultTopNSize()}.
     *
     * @param rowData sort key to test
     * @param topNBuffer buffer supplying the comparator, last entry and current size
     */
    public boolean checkSortKeyInBufferRange(RowData rowData, TopNBuffer topNBuffer) {
        Comparator<RowData> bufferComparator = topNBuffer.getSortKeyComparator();
        Map.Entry<RowData, Collection<RowData>> worstEntry = topNBuffer.lastEntry();
        if (worstEntry == null) {
            return true;
        }
        if (bufferComparator.compare(rowData, worstEntry.getKey()) < 0) {
            return true;
        }
        return ((long) topNBuffer.getCurrentTopNum()) < getDefaultTopNSize();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Registers two gauges: "topn.cache.hitRate", which reports hitCount / requestCount (1.0 while
     * no requests have been counted), and "topn.cache.size", which always reports {@code j}.
     *
     * @param j value reported by the cache size gauge
     */
    public void registerMetric(long j) {
        getRuntimeContext().getMetricGroup().gauge("topn.cache.hitRate", () -> {
            if (this.requestCount == 0) {
                return Double.valueOf(1.0d);
            }
            return Double.valueOf(((double) this.hitCount) / this.requestCount);
        });
        getRuntimeContext().getMetricGroup().gauge("topn.cache.size", () -> Long.valueOf(j));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Emits an INSERT for a row at the given rank, either through the output buffer or directly.
     * Ranks outside the rank range are dropped.
     *
     * @param collector receives the row when buffering is disabled
     * @param rowData key passed on to the output buffer
     * @param rowData2 row to emit
     * @param j rank of the row
     */
    public void collectInsert(Collector<RowData> collector, RowData rowData, RowData rowData2, long j) {
        if (!isInRankRange(j)) {
            return;
        }
        if (!this.outputBufferEnabled) {
            collector.collect(createOutputRow(rowData2, j, RowKind.INSERT));
            return;
        }
        collectThroughOutputBuffer(rowData, rowData2, RowKind.INSERT, j);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Emits an INSERT for a row without a rank, either through the output buffer or directly
     * (in which case the row's kind is set in place).
     *
     * @param collector receives the row when buffering is disabled
     * @param rowData key passed on to the output buffer
     * @param rowData2 row to emit
     */
    public void collectInsert(Collector<RowData> collector, RowData rowData, RowData rowData2) {
        if (!this.outputBufferEnabled) {
            rowData2.setRowKind(RowKind.INSERT);
            collector.collect(rowData2);
            return;
        }
        collectThroughOutputBuffer(rowData, rowData2, RowKind.INSERT);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Emits a DELETE for a row at the given rank, either through the output buffer or directly.
     * Ranks outside the rank range are dropped.
     *
     * @param collector receives the row when buffering is disabled
     * @param rowData key passed on to the output buffer
     * @param rowData2 row to emit
     * @param j rank of the row
     */
    public void collectDelete(Collector<RowData> collector, RowData rowData, RowData rowData2, long j) {
        if (!isInRankRange(j)) {
            return;
        }
        if (!this.outputBufferEnabled) {
            collector.collect(createOutputRow(rowData2, j, RowKind.DELETE));
            return;
        }
        collectThroughOutputBuffer(rowData, rowData2, RowKind.DELETE, j);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Emits a DELETE for a row without a rank, either through the output buffer or directly
     * (in which case the row's kind is set in place).
     *
     * @param collector receives the row when buffering is disabled
     * @param rowData key passed on to the output buffer
     * @param rowData2 row to emit
     */
    public void collectDelete(Collector<RowData> collector, RowData rowData, RowData rowData2) {
        if (!this.outputBufferEnabled) {
            rowData2.setRowKind(RowKind.DELETE);
            collector.collect(rowData2);
            return;
        }
        collectThroughOutputBuffer(rowData, rowData2, RowKind.DELETE);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Emits an UPDATE_AFTER for a row at the given rank, either through the output buffer or
     * directly. Ranks outside the rank range are dropped.
     *
     * @param collector receives the row when buffering is disabled
     * @param rowData key passed on to the output buffer
     * @param rowData2 row to emit
     * @param j rank of the row
     */
    public void collectUpdateAfter(Collector<RowData> collector, RowData rowData, RowData rowData2, long j) {
        if (!isInRankRange(j)) {
            return;
        }
        if (!this.outputBufferEnabled) {
            collector.collect(createOutputRow(rowData2, j, RowKind.UPDATE_AFTER));
            return;
        }
        collectThroughOutputBuffer(rowData, rowData2, RowKind.UPDATE_AFTER, j);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Emits an UPDATE_AFTER for a row without a rank, either through the output buffer or
     * directly (in which case the row's kind is set in place).
     *
     * @param collector receives the row when buffering is disabled
     * @param rowData key passed on to the output buffer
     * @param rowData2 row to emit
     */
    public void collectUpdateAfter(Collector<RowData> collector, RowData rowData, RowData rowData2) {
        if (!this.outputBufferEnabled) {
            rowData2.setRowKind(RowKind.UPDATE_AFTER);
            collector.collect(rowData2);
            return;
        }
        collectThroughOutputBuffer(rowData, rowData2, RowKind.UPDATE_AFTER);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Emits an UPDATE_BEFORE for a row at the given rank. With buffering enabled the row always
     * goes into the output buffer; without buffering it is emitted only when UPDATE_BEFORE
     * generation is switched on. Ranks outside the rank range are dropped.
     *
     * @param collector receives the row when buffering is disabled
     * @param rowData key passed on to the output buffer
     * @param rowData2 row to emit
     * @param j rank of the row
     */
    public void collectUpdateBefore(Collector<RowData> collector, RowData rowData, RowData rowData2, long j) {
        if (!isInRankRange(j)) {
            return;
        }
        if (this.outputBufferEnabled) {
            collectThroughOutputBuffer(rowData, rowData2, RowKind.UPDATE_BEFORE, j);
            return;
        }
        if (this.generateUpdateBefore) {
            collector.collect(createOutputRow(rowData2, j, RowKind.UPDATE_BEFORE));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Emits an UPDATE_BEFORE for a row without a rank. With buffering enabled the row always goes
     * into the output buffer; without buffering it is emitted (with its kind set in place) only
     * when UPDATE_BEFORE generation is switched on.
     *
     * @param collector receives the row when buffering is disabled
     * @param rowData key passed on to the output buffer
     * @param rowData2 row to emit
     */
    public void collectUpdateBefore(Collector<RowData> collector, RowData rowData, RowData rowData2) {
        if (this.outputBufferEnabled) {
            collectThroughOutputBuffer(rowData, rowData2, RowKind.UPDATE_BEFORE);
            return;
        }
        if (this.generateUpdateBefore) {
            rowData2.setRowKind(RowKind.UPDATE_BEFORE);
            collector.collect(rowData2);
        }
    }

    /**
     * Buffers a row that carries no rank by delegating to the ranked overload with rank -1.
     */
    private void collectThroughOutputBuffer(RowData rowData, RowData rowData2, RowKind rowKind) {
        final long noRank = -1L;
        collectThroughOutputBuffer(rowData, rowData2, rowKind, noRank);
    }

    /**
     * Adds a row to the output buffer, unless it cancels out a row that is already buffered.
     *
     * <p>Both the key and the row are copied first. The copy is then offered to the buffer for
     * retraction under each of the two opposite row kinds in turn; if either retraction succeeds
     * nothing is added. Otherwise the copy is stored under the requested row kind. The ordinal
     * counter is advanced in every case.
     *
     * @param rowData key the buffer entry is filed under
     * @param rowData2 row to buffer
     * @param rowKind kind the row should be emitted with
     * @param j rank of the row; replaced by -1 when rank numbers are not output
     */
    private void collectThroughOutputBuffer(RowData rowData, RowData rowData2, RowKind rowKind, long j) {
        RowData keyCopy = this.sortKeySer.copy(rowData);
        RowData rowCopy = this.inputRowSer.copy(rowData2);
        long bufferedRank = this.outputRankNumber ? j : -1L;
        this.outputBufferCounter++;
        RankOrderedRow candidate = new RankOrderedRow(rowCopy, bufferedRank, this.outputBufferCounter);
        Tuple2<RowKind, RowKind> opposites = CacheBufferUtils.getOppositeRowKinds(rowKind);
        // The same entry object is re-tagged for each attempt, so the order of these calls matters.
        if (this.outputBuffer.retract(keyCopy, candidate.setRowKind(opposites.f0))) {
            return;
        }
        if (this.outputBuffer.retract(keyCopy, candidate.setRowKind(opposites.f1))) {
            return;
        }
        this.outputBuffer.collect(keyCopy, candidate.setRowKind(rowKind));
    }

    /**
     * Flushes the output buffer to the collector when buffering is enabled and either the flush is
     * forced or the buffer reports it is ready. UPDATE_BEFORE rows are skipped unless
     * UPDATE_BEFORE generation is switched on; rows with rank -1 are emitted as-is, all others are
     * emitted through {@code createOutputRow}. The ordinal counter is reset afterwards.
     *
     * @param collector receives the flushed rows
     * @param z true to flush regardless of the buffer's readiness
     */
    public void checkAndFlushOutputBuffer(Collector<RowData> collector, boolean z) {
        if (!this.outputBufferEnabled) {
            return;
        }
        if (!z && !this.outputBuffer.isReadyToFlush()) {
            return;
        }
        this.outputBuffer.flush(buffered -> {
            boolean isUpdateBefore = buffered.row.getRowKind().equals(RowKind.UPDATE_BEFORE);
            if (isUpdateBefore && !this.generateUpdateBefore) {
                return;
            }
            if (buffered.getRank() != -1) {
                collector.collect(createOutputRow(buffered.getRow(), buffered.getRank()));
            } else {
                collector.collect(buffered.getRow());
            }
        });
        this.outputBufferCounter = 0;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Tells whether the given rank does not exceed the rank end.
     */
    public boolean isInRankEnd(long j) {
        return this.rankEnd >= j;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Tells whether the given rank lies between the rank start and the rank end, both inclusive.
     */
    public boolean isInRankRange(long j) {
        if (j > this.rankEnd) {
            return false;
        }
        return j >= this.rankStart;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Tells whether the rank range starts after the first rank, i.e. the rank start is above 1.
     */
    public boolean hasOffset() {
        return 1 < this.rankStart;
    }

    /**
     * Builds the output row for the given rank, keeping the row kind the input row already has.
     */
    private RowData createOutputRow(RowData rowData, long j) {
        RowKind currentKind = rowData.getRowKind();
        return createOutputRow(rowData, j, currentKind);
    }

    /**
     * Builds the row to emit. When rank numbers are output, the shared joined row is pointed at
     * the input row plus a one-field row holding the rank, and returned; because that instance is
     * reused, each call overwrites the previous result. Otherwise the input row itself is
     * returned with its kind set in place.
     *
     * @param rowData input row
     * @param j rank to append
     * @param rowKind kind to set on the returned row
     */
    private RowData createOutputRow(RowData rowData, long j, RowKind rowKind) {
        if (this.outputRankNumber) {
            GenericRowData rankColumn = new GenericRowData(1);
            rankColumn.setField(0, Long.valueOf(j));
            this.outputRow.replace(rowData, rankColumn);
            this.outputRow.setRowKind(rowKind);
            return this.outputRow;
        }
        rowData.setRowKind(rowKind);
        return rowData;
    }

    /**
     * Stores the key context for later use by this function.
     *
     * @param keyContext key context to remember
     */
    public void setKeyContext(KeyContext keyContext) {
        this.keyContext = keyContext;
    }
}
