package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.StateEntry;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateMap;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/CopyOnWriteStateMapSnapshot.class */
public class CopyOnWriteStateMapSnapshot<K, N, S> extends StateMapSnapshot<K, N, S, CopyOnWriteStateMap<K, N, S>> {
    private final int snapshotVersion;

    @Nonnull
    private final CopyOnWriteStateMap.StateMapEntry<K, N, S>[] snapshotData;

    @Nonnegative
    private final int numberOfEntriesInSnapshotData;
    private boolean released;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/CopyOnWriteStateMapSnapshot$NonTransformSnapshotIterator.class */
    public static class NonTransformSnapshotIterator<K, N, S> extends SnapshotIterator<K, N, S> {
        NonTransformSnapshotIterator(int i, CopyOnWriteStateMap.StateMapEntry<K, N, S>[] stateMapEntryArr) {
            super(i, stateMapEntryArr, null);
        }

        @Override // org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.SnapshotIterator
        void transform(@Nullable StateSnapshotTransformer<S> stateSnapshotTransformer) {
        }

        @Override // org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.SnapshotIterator
        public int size() {
            return this.numberOfEntriesInSnapshotData;
        }

        @Override // org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.SnapshotIterator
        Iterator<CopyOnWriteStateMap.StateMapEntry<K, N, S>> getChainIterator() {
            return Arrays.stream(this.snapshotData).filter((v0) -> {
                return Objects.nonNull(v0);
            }).iterator();
        }

        @Override // org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.SnapshotIterator
        Iterator<CopyOnWriteStateMap.StateMapEntry<K, N, S>> getEntryIterator(final CopyOnWriteStateMap.StateMapEntry<K, N, S> stateMapEntry) {
            return new Iterator<CopyOnWriteStateMap.StateMapEntry<K, N, S>>() { // from class: org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.NonTransformSnapshotIterator.1
                CopyOnWriteStateMap.StateMapEntry<K, N, S> nextEntry;

                {
                    this.nextEntry = stateMapEntry;
                }

                @Override // java.util.Iterator
                public boolean hasNext() {
                    return this.nextEntry != null;
                }

                @Override // java.util.Iterator
                public CopyOnWriteStateMap.StateMapEntry<K, N, S> next() {
                    if (this.nextEntry == null) {
                        throw new NoSuchElementException();
                    }
                    CopyOnWriteStateMap.StateMapEntry<K, N, S> stateMapEntry2 = this.nextEntry;
                    this.nextEntry = this.nextEntry.next;
                    return stateMapEntry2;
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/CopyOnWriteStateMapSnapshot$SnapshotIterator.class */
    public static abstract class SnapshotIterator<K, N, S> implements Iterator<StateEntry<K, N, S>> {
        int numberOfEntriesInSnapshotData;
        CopyOnWriteStateMap.StateMapEntry<K, N, S>[] snapshotData;
        Iterator<CopyOnWriteStateMap.StateMapEntry<K, N, S>> chainIterator;
        Iterator<CopyOnWriteStateMap.StateMapEntry<K, N, S>> entryIterator;

        SnapshotIterator(int i, CopyOnWriteStateMap.StateMapEntry<K, N, S>[] stateMapEntryArr, @Nullable StateSnapshotTransformer<S> stateSnapshotTransformer) {
            this.numberOfEntriesInSnapshotData = i;
            this.snapshotData = stateMapEntryArr;
            transform(stateSnapshotTransformer);
            this.chainIterator = getChainIterator();
            this.entryIterator = Collections.emptyIterator();
        }

        abstract int size();

        abstract void transform(@Nullable StateSnapshotTransformer<S> stateSnapshotTransformer);

        abstract Iterator<CopyOnWriteStateMap.StateMapEntry<K, N, S>> getChainIterator();

        abstract Iterator<CopyOnWriteStateMap.StateMapEntry<K, N, S>> getEntryIterator(CopyOnWriteStateMap.StateMapEntry<K, N, S> stateMapEntry);

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.entryIterator.hasNext() || this.chainIterator.hasNext();
        }

        @Override // java.util.Iterator
        public CopyOnWriteStateMap.StateMapEntry<K, N, S> next() {
            if (this.entryIterator.hasNext()) {
                return this.entryIterator.next();
            }
            this.entryIterator = getEntryIterator(this.chainIterator.next());
            return this.entryIterator.next();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/CopyOnWriteStateMapSnapshot$TransformedSnapshotIterator.class */
    public static class TransformedSnapshotIterator<K, N, S> extends SnapshotIterator<K, N, S> {
        TransformedSnapshotIterator(int i, CopyOnWriteStateMap.StateMapEntry<K, N, S>[] stateMapEntryArr, @Nonnull StateSnapshotTransformer<S> stateSnapshotTransformer) {
            super(i, stateMapEntryArr, stateSnapshotTransformer);
        }

        int moveChainsToBackOfArray() {
            int length = this.snapshotData.length - 1;
            while (length >= 0 && this.snapshotData[length] != null) {
                length--;
            }
            int i = length;
            while (true) {
                length--;
                if (length < 0) {
                    return i + 1;
                }
                CopyOnWriteStateMap.StateMapEntry<K, N, S> stateMapEntry = this.snapshotData[length];
                if (stateMapEntry != null) {
                    this.snapshotData[i] = stateMapEntry;
                    this.snapshotData[length] = null;
                    i--;
                }
            }
        }

        @Override // org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.SnapshotIterator
        void transform(@Nullable StateSnapshotTransformer<S> stateSnapshotTransformer) {
            Preconditions.checkNotNull(stateSnapshotTransformer);
            int i = 0;
            for (int moveChainsToBackOfArray = moveChainsToBackOfArray(); moveChainsToBackOfArray < this.snapshotData.length; moveChainsToBackOfArray++) {
                CopyOnWriteStateMap.StateMapEntry<K, N, S> stateMapEntry = this.snapshotData[moveChainsToBackOfArray];
                while (true) {
                    CopyOnWriteStateMap.StateMapEntry<K, N, S> stateMapEntry2 = stateMapEntry;
                    if (stateMapEntry2 != null) {
                        S filterOrTransform = stateSnapshotTransformer.filterOrTransform(stateMapEntry2.state);
                        if (filterOrTransform != null) {
                            CopyOnWriteStateMap.StateMapEntry<K, N, S> stateMapEntry3 = stateMapEntry2;
                            if (filterOrTransform != stateMapEntry2.state) {
                                stateMapEntry3 = new CopyOnWriteStateMap.StateMapEntry<>(stateMapEntry2, stateMapEntry2.entryVersion);
                                stateMapEntry3.state = filterOrTransform;
                            }
                            int i2 = i;
                            i++;
                            this.snapshotData[i2] = stateMapEntry3;
                        }
                        stateMapEntry = stateMapEntry2.next;
                    }
                }
            }
            this.numberOfEntriesInSnapshotData = i;
        }

        @Override // org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.SnapshotIterator
        public int size() {
            return this.numberOfEntriesInSnapshotData;
        }

        @Override // org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.SnapshotIterator
        Iterator<CopyOnWriteStateMap.StateMapEntry<K, N, S>> getChainIterator() {
            return Arrays.stream(this.snapshotData, 0, this.numberOfEntriesInSnapshotData).iterator();
        }

        @Override // org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.SnapshotIterator
        Iterator<CopyOnWriteStateMap.StateMapEntry<K, N, S>> getEntryIterator(CopyOnWriteStateMap.StateMapEntry<K, N, S> stateMapEntry) {
            return Collections.singleton(stateMapEntry).iterator();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CopyOnWriteStateMapSnapshot(CopyOnWriteStateMap<K, N, S> copyOnWriteStateMap) {
        super(copyOnWriteStateMap);
        this.snapshotData = copyOnWriteStateMap.snapshotMapArrays();
        this.snapshotVersion = copyOnWriteStateMap.getStateMapVersion();
        this.numberOfEntriesInSnapshotData = copyOnWriteStateMap.size();
        this.released = false;
    }

    @Override // org.apache.flink.runtime.state.heap.StateMapSnapshot
    public void release() {
        if (this.released) {
            return;
        }
        ((CopyOnWriteStateMap) this.owningStateMap).releaseSnapshot(this);
        this.released = true;
    }

    public boolean isReleased() {
        return this.released;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getSnapshotVersion() {
        return this.snapshotVersion;
    }

    SnapshotIterator<K, N, S> getSnapshotIterator(@Nullable StateSnapshotTransformer<S> stateSnapshotTransformer) {
        return stateSnapshotTransformer == null ? new NonTransformSnapshotIterator(this.numberOfEntriesInSnapshotData, this.snapshotData) : new TransformedSnapshotIterator(this.numberOfEntriesInSnapshotData, this.snapshotData, stateSnapshotTransformer);
    }

    @Override // org.apache.flink.runtime.state.heap.StateMapSnapshot
    public void writeState(TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2, TypeSerializer<S> typeSerializer3, @Nonnull DataOutputView dataOutputView, @Nullable StateSnapshotTransformer<S> stateSnapshotTransformer) throws IOException {
        SnapshotIterator<K, N, S> snapshotIterator = getSnapshotIterator(stateSnapshotTransformer);
        dataOutputView.writeInt(snapshotIterator.size());
        while (snapshotIterator.hasNext()) {
            CopyOnWriteStateMap.StateMapEntry<K, N, S> next = snapshotIterator.next();
            typeSerializer2.serialize(next.getNamespace(), dataOutputView);
            typeSerializer.serialize(next.getKey(), dataOutputView);
            typeSerializer3.serialize(next.getState(), dataOutputView);
        }
    }
}
