package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/CopyOnWriteHashTableStateMapSnapshot.class */
/**
 * Snapshot of a {@link CopyOnWriteHashTableStateMap}.
 *
 * <p>The snapshot keeps a {@link ResourceGuard.Lease} for as long as it is alive and gives it back
 * in {@link #release()}. State is written by walking the owning map's nodes through a
 * {@link SnapshotHashTableNodeIterator} and emitting the raw bytes the map hands out for each node.
 *
 * @param <K> type of key
 * @param <N> type of namespace
 * @param <S> type of state
 */
public class CopyOnWriteHashTableStateMapSnapshot<K, N, S> extends SpillStateMapSnapshot<K, N, S, CopyOnWriteHashTableStateMap<K, N, S>> {
    /** Sentinel used in this class for "no node" and "no value pointer". */
    private static final long NIL_POINTER = -1L;

    /** Lease handed in at construction time; closed exactly once by {@link #release()}. */
    private final ResourceGuard.Lease lease;

    /** Flipped to {@code true} by the first {@link #release()} call so later calls are no-ops. */
    private final AtomicBoolean closed;

    /**
     * Iterator over {@code (node, valuePointer)} pairs of the owning map as seen by this snapshot.
     *
     * <p>Nodes whose value pointer for this snapshot's version is {@code -1}, or whose value
     * length is {@code 0}, are skipped. The iterator always looks one element ahead: the next
     * pair is resolved in the constructor and again after every {@link #next()}.
     */
    public class SnapshotHashTableNodeIterator implements Iterator<Tuple2<Long, Long>> {
        /** When {@code true}, values are fetched via {@code getAndPruneValueForSnapshot}. */
        private final boolean isPrune;

        /**
         * Node returned by the next {@link #next()} call, or {@code -1} once exhausted. Starts at
         * the field default {@code 0}, which is not the sentinel, so the constructor's first
         * {@link #advance()} actually runs.
         */
        private long nextNode;

        /** Value pointer that belongs to {@link #nextNode}. */
        private long nextValuePointer;

        /** Underlying node iterator of the owning map, obtained via {@code nodeIterator(false)}. */
        private final CopyOnWriteHashTableStateMap<K, N, S>.HashTableNodeIterator stateIterator = getNodeIterator();

        /**
         * Creates the iterator and immediately positions it on the first non-empty node.
         *
         * @param z whether values should be fetched with the pruning accessor
         */
        SnapshotHashTableNodeIterator(boolean z) {
            this.isPrune = z;
            advance();
        }

        /**
         * Obtains the node iterator from the owning map.
         *
         * @throws IllegalStateException if the owning map is not a {@link CopyOnWriteHashTableStateMap}
         */
        private CopyOnWriteHashTableStateMap<K, N, S>.HashTableNodeIterator getNodeIterator() {
            Preconditions.checkState(CopyOnWriteHashTableStateMapSnapshot.this.owningStateMap instanceof CopyOnWriteHashTableStateMap, "Only CopyOnWriteHashTableStateMap can use");
            return owningHashTableMap().nodeIterator(false);
        }

        /**
         * Moves the look-ahead cursor to the next node that has a non-empty value for this
         * snapshot's version, or marks the iterator as exhausted if there is none.
         */
        private void advance() {
            // Already exhausted: never touch the underlying iterator again.
            if (this.nextNode == NIL_POINTER) {
                return;
            }
            final CopyOnWriteHashTableStateMap<K, N, S> map = owningHashTableMap();
            while (this.stateIterator.hasNext()) {
                final long node = this.stateIterator.next().longValue();
                final long valuePointer = this.isPrune
                        ? map.getAndPruneValueForSnapshot(node, CopyOnWriteHashTableStateMapSnapshot.this.snapshotVersion)
                        : map.getValueForSnapshot(node, CopyOnWriteHashTableStateMapSnapshot.this.snapshotVersion);
                // A missing pointer counts as length 0; only nodes with a non-empty value are exposed.
                if (valuePointer != NIL_POINTER && map.helpGetValueLen(valuePointer) != 0) {
                    this.nextNode = node;
                    this.nextValuePointer = valuePointer;
                    return;
                }
            }
            // Nothing left: reset both fields so no stale value pointer lingers.
            this.nextNode = NIL_POINTER;
            this.nextValuePointer = NIL_POINTER;
        }

        /** Returns {@code true} while a look-ahead pair is available. */
        @Override
        public boolean hasNext() {
            return this.nextNode != NIL_POINTER;
        }

        /**
         * Returns the current look-ahead pair and advances to the following one.
         *
         * @return tuple of {@code (node, valuePointer)}
         * @throws NoSuchElementException if the iterator is exhausted
         */
        @Override
        public Tuple2<Long, Long> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            final long node = this.nextNode;
            final long valuePointer = this.nextValuePointer;
            advance();
            return Tuple2.of(Long.valueOf(node), Long.valueOf(valuePointer));
        }
    }

    /**
     * Creates a snapshot for the given map.
     *
     * @param copyOnWriteHashTableStateMap the map this snapshot belongs to
     * @param lease lease that is closed when the snapshot is released
     */
    public CopyOnWriteHashTableStateMapSnapshot(CopyOnWriteHashTableStateMap<K, N, S> copyOnWriteHashTableStateMap, ResourceGuard.Lease lease) {
        super(copyOnWriteHashTableStateMap);
        this.lease = lease;
        this.closed = new AtomicBoolean(false);
    }

    /**
     * Releases this snapshot at the owning map and closes the lease. Only the first call has an
     * effect; subsequent calls return immediately.
     */
    @Override
    public void release() {
        if (this.closed.compareAndSet(false, true)) {
            try {
                owningHashTableMap().releaseSnapshot(this);
            } finally {
                // The closed flag is already set, so this is the only chance to return the lease;
                // close it even if releasing the snapshot at the map fails.
                this.lease.close();
            }
        }
    }

    /**
     * Writes the snapshot to {@code dataOutputView}, choosing the transforming or the
     * non-transforming path depending on whether a transformer is supplied.
     *
     * @param typeSerializer key serializer (not used by this implementation)
     * @param typeSerializer2 namespace serializer (not used by this implementation)
     * @param typeSerializer3 state serializer, used only on the transforming path
     * @param dataOutputView target of the serialized data
     * @param stateSnapshotTransformer optional transformer; {@code null} selects the plain path
     * @throws IOException if writing to {@code dataOutputView} fails
     */
    @Override
    public void writeState(TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2, TypeSerializer<S> typeSerializer3, @Nonnull DataOutputView dataOutputView, @Nullable StateSnapshotTransformer<S> stateSnapshotTransformer) throws IOException {
        if (stateSnapshotTransformer == null) {
            writeStateWithNoTransform(dataOutputView);
        } else {
            writeStateWithTransform(typeSerializer3, dataOutputView, stateSnapshotTransformer);
        }
    }

    /**
     * Writes every non-empty node as key/namespace bytes followed by value bytes, using the
     * pruning iterator. Outside complex mode the entry count recorded in the snapshot is written
     * first.
     */
    private void writeStateWithNoTransform(@Nonnull DataOutputView dataOutputView) throws IOException {
        if (!this.complexMode) {
            dataOutputView.writeInt(this.numberOfEntriesInSnapshotData);
        }
        final SnapshotHashTableNodeIterator nodes = new SnapshotHashTableNodeIterator(true);
        while (nodes.hasNext()) {
            final Tuple2<Long, Long> entry = nodes.next();
            writeKeyAndNamespace(entry.f0.longValue(), dataOutputView);
            writeValue(entry.f1.longValue(), dataOutputView);
        }
    }

    /**
     * Writes the snapshot through the inherited transforming writer. Outside complex mode the
     * size computed by {@code calculateStateSize} over a pruning iterator is written first; the
     * actual write then uses a fresh non-pruning iterator.
     */
    private void writeStateWithTransform(TypeSerializer<S> typeSerializer, @Nonnull DataOutputView dataOutputView, @Nonnull StateSnapshotTransformer<S> stateSnapshotTransformer) throws IOException {
        final SpillableValueSerializer<S> valueSerializer = new SpillableValueSerializer<>(typeSerializer);
        // NOTE(review): this pruning iterator is built even in complex mode, where it is never
        // consumed. Its constructor already advances (and therefore prunes) up to the first
        // non-empty node, so it is deliberately left in place to keep side effects unchanged.
        final SnapshotHashTableNodeIterator sizingIterator = new SnapshotHashTableNodeIterator(true);
        if (!this.complexMode) {
            dataOutputView.writeInt(calculateStateSize(sizingIterator, valueSerializer, stateSnapshotTransformer));
        }
        writeState(dataOutputView, new SnapshotHashTableNodeIterator(false), typeSerializer, valueSerializer, stateSnapshotTransformer);
    }

    /**
     * Writes the bytes the owning map returns for the node's key and namespace: first the second
     * tuple field, then the first.
     *
     * <p>NOTE(review): presumably {@code f0} holds the key bytes and {@code f1} the namespace
     * bytes, i.e. the namespace is written before the key — confirm against
     * {@code helpGetBytesForKeyAndNamespace}.
     *
     * @param j node whose key and namespace are written
     * @param dataOutputView target of the serialized data
     * @throws IOException if writing fails
     */
    @Override
    void writeKeyAndNamespace(long j, DataOutputView dataOutputView) throws IOException {
        final Tuple2<byte[], byte[]> keyAndNamespace = owningHashTableMap().helpGetBytesForKeyAndNamespace(j);
        dataOutputView.write(keyAndNamespace.f1);
        dataOutputView.write(keyAndNamespace.f0);
    }

    /**
     * Creates a new node iterator over this snapshot.
     *
     * @param z whether values should be fetched with the pruning accessor
     * @return iterator of {@code (node, valuePointer)} pairs
     */
    @Override
    public Iterator<Tuple2<Long, Long>> getSnapshotNodeIterator(boolean z) {
        return new SnapshotHashTableNodeIterator(z);
    }

    /** Writes the bytes the owning map returns for the given value pointer. */
    private void writeValue(long j, DataOutputView dataOutputView) throws IOException {
        dataOutputView.write(owningHashTableMap().helpGetBytesForState(j));
    }

    /**
     * Returns the owning state map typed as {@link CopyOnWriteHashTableStateMap}, so the cast
     * lives in exactly one place.
     */
    @SuppressWarnings("unchecked") // the snapshot is only ever constructed with a CopyOnWriteHashTableStateMap<K, N, S>
    private CopyOnWriteHashTableStateMap<K, N, S> owningHashTableMap() {
        return (CopyOnWriteHashTableStateMap<K, N, S>) this.owningStateMap;
    }
}
