package org.apache.flink.runtime.state.heap;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateEntry;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.heap.estimate.SampleStateMemoryEstimator;
import org.apache.flink.runtime.state.heap.estimate.StateMemoryEstimatorFactory;
import org.apache.flink.runtime.state.heap.space.Allocator;
import org.apache.flink.util.IOUtils;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/SpillableStateTable.class */
public abstract class SpillableStateTable<K, N, S> extends StateTable<K, N, S> implements Closeable {
    private static final int DEFAULT_NUM_KEYS_TO_DELETED_ONE_TIME = 2;
    private static final float DEFAULT_LOGICAL_REMOVE_KEYS_RATIO = 0.2f;
    protected static final int DEFAULT_ESTIMATE_SAMPLE_COUNT = 1000;
    private final Allocator spaceAllocator;
    protected final SpillAndLoadManager spillAndLoadManager;
    protected final StateTransformationFunctionWrapper transformationWrapper;
    private final SpillStateType spillStateType;
    private final long[] numRequests;
    private final SampleStateMemoryEstimator<K, N, S> stateMemoryEstimator;

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/SpillableStateTable$KeyGroupType.class */
    public enum KeyGroupType {
        SIMPLE,
        COMPLEX
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/SpillableStateTable$SpillStateType.class */
    public enum SpillStateType {
        HASH_TABLE,
        SKIP_LIST
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/SpillableStateTable$StateMapMeta.class */
    public static class StateMapMeta {
        private final SpillableStateTable stateTable;
        private final int keyGroupIndex;
        private final boolean isOnHeap;
        private final boolean isComplex;
        private final int size;
        private final long numRequests;
        private long estimatedMemorySize = -1;

        public StateMapMeta(SpillableStateTable spillableStateTable, int i, boolean z, boolean z2, int i2, long j) {
            this.stateTable = spillableStateTable;
            this.keyGroupIndex = i;
            this.isOnHeap = z;
            this.isComplex = z2;
            this.size = i2;
            this.numRequests = j;
        }

        public SpillableStateTable getStateTable() {
            return this.stateTable;
        }

        public boolean isOnHeap() {
            return this.isOnHeap;
        }

        public boolean isComplex() {
            return this.isComplex;
        }

        public int getSize() {
            return this.size;
        }

        public int getKeyGroupIndex() {
            return this.keyGroupIndex;
        }

        public long getNumRequests() {
            return this.numRequests;
        }

        public long getEstimatedMemorySize() {
            return this.estimatedMemorySize;
        }

        public void setEstimatedMemorySize(long j) {
            this.estimatedMemorySize = j;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/SpillableStateTable$StateTransformationFunctionWrapper.class */
    class StateTransformationFunctionWrapper<T> implements StateTransformationFunction<S, T> {
        private N namespace;
        private StateTransformationFunction<S, T> stateTransformationFunction;

        StateTransformationFunctionWrapper() {
        }

        public StateTransformationFunction<S, T> getStateTransformationFunction() {
            return this.stateTransformationFunction;
        }

        public void setStateTransformationFunction(N n, StateTransformationFunction<S, T> stateTransformationFunction) {
            this.namespace = n;
            this.stateTransformationFunction = stateTransformationFunction;
        }

        @Override // org.apache.flink.runtime.state.StateTransformationFunction
        public S apply(S s, T t) throws Exception {
            S apply = this.stateTransformationFunction.apply(s, t);
            if (apply != null) {
                SpillableStateTable.this.updateStateEstimate(this.namespace, apply);
            }
            return apply;
        }
    }

    public SpillableStateTable(InternalKeyContext<K> internalKeyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> registeredKeyValueStateBackendMetaInfo, TypeSerializer<K> typeSerializer, Allocator allocator, SpillAndLoadManager spillAndLoadManager, SpillStateType spillStateType, int i) {
        super(internalKeyContext, registeredKeyValueStateBackendMetaInfo, typeSerializer);
        this.spaceAllocator = allocator;
        this.transformationWrapper = new StateTransformationFunctionWrapper();
        this.spillStateType = spillStateType;
        this.spillAndLoadManager = spillAndLoadManager;
        this.numRequests = new long[internalKeyContext.getKeyGroupRange().getNumberOfKeyGroups()];
        for (int i2 = 0; i2 < this.numRequests.length; i2++) {
            this.numRequests[i2] = 0;
        }
        this.stateMemoryEstimator = StateMemoryEstimatorFactory.createSampleEstimator(typeSerializer, registeredKeyValueStateBackendMetaInfo, i);
    }

    @Override // org.apache.flink.runtime.state.heap.StateTable
    public void put(N n, S s) {
        super.put(n, s);
        updateStateEstimate(n, s);
        this.spillAndLoadManager.checkResource();
    }

    @Override // org.apache.flink.runtime.state.heap.StateTable
    public void put(K k, int i, N n, S s) {
        super.put(k, i, n, s);
        this.stateMemoryEstimator.updateEstimatedSize(k, n, s);
        this.spillAndLoadManager.checkResource();
    }

    public long getStateEstimatedSize(boolean z) {
        long estimatedSize = this.stateMemoryEstimator.getEstimatedSize();
        if (estimatedSize == -1 && z) {
            for (StateMap<K, N, S> stateMap : this.keyGroupedStateMaps) {
                if (!stateMap.isEmpty()) {
                    StateEntry<K, N, S> next = stateMap.iterator().next();
                    this.stateMemoryEstimator.forceUpdateEstimatedSize(next.getKey(), next.getNamespace(), next.getState());
                }
            }
            estimatedSize = this.stateMemoryEstimator.getEstimatedSize();
        }
        return estimatedSize;
    }

    public void updateStateEstimate(N n, S s) {
        this.stateMemoryEstimator.updateEstimatedSize(this.keyContext.getCurrentKey(), n, s);
    }

    public void checkResource() {
        this.spillAndLoadManager.checkResource();
    }

    public abstract void spillState(int i);

    public abstract void loadState(int i);

    public abstract float getSpilledRatio();

    @Override // org.apache.flink.runtime.state.heap.StateTable
    public StateMap<K, N, S> getMapForKeyGroup(int i) {
        int indexToOffset = indexToOffset(i);
        if (indexToOffset < 0 || indexToOffset >= this.keyGroupedStateMaps.length) {
            return null;
        }
        this.numRequests[indexToOffset] = this.numRequests[indexToOffset] + 1;
        return this.keyGroupedStateMaps[indexToOffset];
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setMapForKeyGroup(int i, StateMap<K, N, S> stateMap) {
        this.keyGroupedStateMaps[indexToOffset(i)] = stateMap;
    }

    private int indexToOffset(int i) {
        return i - getKeyGroupOffset();
    }

    public Iterator<StateMapMeta> stateMapIterator() {
        return new Iterator<StateMapMeta>() { // from class: org.apache.flink.runtime.state.heap.SpillableStateTable.1
            int next = 0;

            @Override // java.util.Iterator
            public boolean hasNext() {
                return this.next < SpillableStateTable.this.keyGroupedStateMaps.length;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public StateMapMeta next() {
                StateMapMeta stateMapMeta = new StateMapMeta(SpillableStateTable.this, this.next + SpillableStateTable.this.getKeyGroupOffset(), SpillableStateTable.this.keyGroupedStateMaps[this.next] instanceof CopyOnWriteStateMap, SpillableStateTable.this.keyGroupedStateMaps[this.next] instanceof ComplexStateMap, SpillableStateTable.this.keyGroupedStateMaps[this.next].size(), SpillableStateTable.this.numRequests[this.next]);
                this.next++;
                return stateMapMeta;
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CopyOnWriteStateMap<K, N, S> createHashStateMap() {
        return new CopyOnWriteStateMap<>(getStateSerializer());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SpillStateMap<K, N, S> createSpillStateMap() {
        switch (this.spillStateType) {
            case HASH_TABLE:
                return new CopyOnWriteHashTableStateMap(getKeySerializer(), getNamespaceSerializer(), getStateSerializer(), this.spaceAllocator, 2, DEFAULT_LOGICAL_REMOVE_KEYS_RATIO);
            case SKIP_LIST:
                return new CopyOnWriteSkipListStateMap(getKeySerializer(), getNamespaceSerializer(), getStateSerializer(), this.spaceAllocator, 2, DEFAULT_LOGICAL_REMOVE_KEYS_RATIO);
            default:
                throw new RuntimeException("Illegal type of base structure for spill state");
        }
    }

    @Override // org.apache.flink.runtime.state.heap.StateTable
    public <T> void transform(N n, T t, StateTransformationFunction<S, T> stateTransformationFunction) throws Exception {
        this.transformationWrapper.setStateTransformationFunction(n, stateTransformationFunction);
        super.transform(n, t, this.transformationWrapper);
        this.spillAndLoadManager.checkResource();
    }

    public InternalKeyContext<K> getInternalKeyContext() {
        return this.keyContext;
    }

    public StateMap<K, N, S> getCurrentStateMap() {
        return getMapForKeyGroup(this.keyContext.getCurrentKeyGroupIndex());
    }

    @Override // org.apache.flink.runtime.state.heap.StateTable, org.apache.flink.runtime.state.StateSnapshotRestore
    @Nonnull
    public SpillableStateTableSnapshot<K, N, S> stateSnapshot() {
        return new SpillableStateTableSnapshot<>(this, getKeySerializer().duplicate2(), getNamespaceSerializer().duplicate2(), getStateSerializer().duplicate2(), getMetaInfo().getStateSnapshotTransformFactory().createForDeserializedState().orElse(null));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<StateMapSnapshot<K, N, S, ? extends StateMap<K, N, S>>> getStateMapSnapshotList() {
        ArrayList arrayList = new ArrayList(this.keyGroupedStateMaps.length);
        for (int i = 0; i < this.keyGroupedStateMaps.length; i++) {
            arrayList.add(this.keyGroupedStateMaps[i].stateSnapshot());
        }
        return arrayList;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        for (StateMap<K, N, S> stateMap : this.keyGroupedStateMaps) {
            if (stateMap instanceof SpillStateMap) {
                IOUtils.closeQuietly((SpillStateMap) stateMap);
            }
        }
    }
}
