package org.apache.flink.runtime.state.heap;

import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.state.heap.space.Allocator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/SpillStateMap.class */
public abstract class SpillStateMap<K, N, S> extends StateMap<K, N, S> implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SpillStateMap.class);
    static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
    static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
    protected final SpillableKeySerializer<K, N> spillableKeySerializer;
    protected final SpillableValueSerializer<S> spillableValueSerializer;
    protected final Allocator spaceAllocator;
    protected volatile int stateMapVersion;
    protected int requestCount;
    protected volatile int highestRequiredSnapshotVersionPlusOne;
    protected volatile int highestFinishedSnapshotVersion;
    protected final TreeSet<Integer> snapshotVersions;
    protected float logicalRemovedKeysRatio;
    protected int numKeysToDeleteOneTime;
    protected final Set<Long> pruningValueNodes;
    private final AtomicBoolean closed;
    private final ResourceGuard resourceGuard;
    protected long lastReqCnt;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/SpillStateMap$Node.class */
    public static class Node {
        MemorySegment nodeSegment;
        int nodeOffset;

        /* JADX INFO: Access modifiers changed from: package-private */
        public Node(MemorySegment memorySegment, int i) {
            this.nodeSegment = memorySegment;
            this.nodeOffset = i;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/SpillStateMap$SpillNodePointers.class */
    static class SpillNodePointers {
        long prevNode;
        long currentNode;
        long nextNode;

        /* JADX INFO: Access modifiers changed from: package-private */
        public SpillNodePointers(long j, long j2, long j3) {
            this.prevNode = j;
            this.currentNode = j2;
            this.nextNode = j3;
        }
    }

    public SpillStateMap(@Nonnull TypeSerializer<K> typeSerializer, @Nonnull TypeSerializer<N> typeSerializer2, @Nonnull TypeSerializer<S> typeSerializer3, @Nonnull Allocator allocator, int i, float f) {
        this.spillableKeySerializer = new SpillableKeySerializer<>(typeSerializer, typeSerializer2);
        this.spillableValueSerializer = new SpillableValueSerializer<>(typeSerializer3);
        this.spaceAllocator = allocator;
        Preconditions.checkArgument(i >= 0, "numKeysToDeleteOneTime should be non-negative, but is " + i);
        this.numKeysToDeleteOneTime = i;
        Preconditions.checkArgument(f >= 0.0f && f <= 1.0f, "logicalRemovedKeysRatio should be in [0, 1], but is " + f);
        this.logicalRemovedKeysRatio = f;
        this.pruningValueNodes = ConcurrentHashMap.newKeySet();
        this.closed = new AtomicBoolean(false);
        this.resourceGuard = new ResourceGuard();
        this.snapshotVersions = new TreeSet<>();
        this.stateMapVersion = 0;
        this.highestRequiredSnapshotVersionPlusOne = 0;
        this.highestFinishedSnapshotVersion = 0;
        this.requestCount = 0;
        this.lastReqCnt = 0L;
    }

    @VisibleForTesting
    Set<Integer> getSnapshotVersions() {
        return this.snapshotVersions;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getStateMapVersion() {
        return this.stateMapVersion;
    }

    @Override // org.apache.flink.runtime.state.heap.StateMap
    public void releaseSnapshot(StateMapSnapshot<K, N, S, ? extends StateMap<K, N, S>> stateMapSnapshot) {
        SpillStateMapSnapshot spillStateMapSnapshot = (SpillStateMapSnapshot) stateMapSnapshot;
        int snapshotVersion = spillStateMapSnapshot.getSnapshotVersion();
        Preconditions.checkArgument(spillStateMapSnapshot.isOwner(this), "Cannot release snapshot which is owned by a different state map.");
        synchronized (this.snapshotVersions) {
            Preconditions.checkState(this.snapshotVersions.remove(Integer.valueOf(snapshotVersion)), "Attempt to release unknown snapshot version");
            this.highestRequiredSnapshotVersionPlusOne = this.snapshotVersions.isEmpty() ? 0 : this.snapshotVersions.last().intValue();
            this.highestFinishedSnapshotVersion = this.snapshotVersions.isEmpty() ? this.stateMapVersion : this.snapshotVersions.first().intValue() - 1;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isClosed() {
        return this.closed.get();
    }

    @VisibleForTesting
    ResourceGuard getResourceGuard() {
        return this.resourceGuard;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void tryToDeleteNodesPhysically() {
        int i;
        int size;
        if (this.highestRequiredSnapshotVersionPlusOne == 0 && (size = getLogicallyRemovedNodes().size()) > (i = (int) (totalSize() * this.logicalRemovedKeysRatio))) {
            deleteLogicallyRemovedNodes(size - i);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.state.heap.StateMap
    public long getRequestCountForLastRemovedKey() {
        return this.lastReqCnt;
    }

    abstract void deleteLogicallyRemovedNodes(int i);

    abstract S helpGetState(long j);

    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract S helpGetState(long j, SpillableValueSerializer<S> spillableValueSerializer);

    abstract int totalSize();

    public abstract int getRequestCount();

    abstract Set<?> getLogicallyRemovedNodes();

    abstract Set<Long> getPruningValueNodes();

    abstract Iterator<Long> getNodeIterator();

    protected abstract void releaseAllResource();

    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract Tuple2<byte[], byte[]> helpGetBytesForKeyAndNamespace(long j);

    abstract Iterator<Long> getNamespaceIterator(MemorySegment memorySegment, int i, int i2);

    @VisibleForTesting
    int getHighestRequiredSnapshotVersionPlusOne() {
        return this.highestRequiredSnapshotVersionPlusOne;
    }

    @VisibleForTesting
    int getHighestFinishedSnapshotVersion() {
        return this.highestFinishedSnapshotVersion;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            LOG.warn("State map has been closed");
        } else {
            this.resourceGuard.close();
            releaseAllResource();
        }
    }

    @Override // org.apache.flink.runtime.state.heap.StateMap
    @Nonnull
    public abstract SpillStateMapSnapshot<K, N, S, ? extends SpillStateMap<K, N, S>> stateSnapshot();

    /* JADX INFO: Access modifiers changed from: package-private */
    public ResourceGuard.Lease tryGetLease() {
        try {
            return getResourceGuard().acquireResource();
        } catch (Exception e) {
            throw new RuntimeException("Acquire resource failed, and can't make snapshot of state map", e);
        }
    }
}
