package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.util.Iterator;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.StateEntry;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateMap;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/ComplexStateMapSnapshot.class */
/**
 * Snapshot of a {@link ComplexStateMap} that is composed of two underlying snapshots: a
 * {@link CopyOnWriteStateMapSnapshot} (the "heap" part) and a {@link SpillStateMapSnapshot}
 * (the "spill" part).
 *
 * <p>Iteration and serialization always process the heap snapshot first and the spill snapshot
 * second.
 *
 * @param <K> type of the key
 * @param <N> type of the namespace
 * @param <S> type of the state
 */
public class ComplexStateMapSnapshot<K, N, S> extends StateMapSnapshot<K, N, S, ComplexStateMap<K, N, S>> {
    /** Snapshot of the copy-on-write state map; its entries are always visited/written first. */
    private final CopyOnWriteStateMapSnapshot<K, N, S> heapSnapshot;

    /** Snapshot of the spill state map; its entries are visited/written after the heap entries. */
    private final SpillStateMapSnapshot<K, N, S, ? extends SpillStateMap<K, N, S>> spillSnapshot;

    /**
     * Iterator that concatenates two iterators: every element of the first iterator is returned
     * before any element of the second one.
     *
     * <p>{@link #remove()} is not overridden, so the default behavior of {@link Iterator} applies.
     *
     * @param <T> element type
     */
    static class IteratorJoin<T> implements Iterator<T> {
        /** Iterator that is drained first. */
        private final Iterator<T> first;

        /** Iterator that is consulted only once {@link #first} reports no more elements. */
        private final Iterator<T> second;

        /**
         * @param it iterator whose elements are returned first
         * @param it2 iterator whose elements are returned after {@code it} is exhausted
         */
        public IteratorJoin(Iterator<T> it, Iterator<T> it2) {
            this.first = it;
            this.second = it2;
        }

        /** Returns {@code true} while either of the two underlying iterators has elements left. */
        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.first.hasNext() || this.second.hasNext();
        }

        /**
         * Returns the next element of the first iterator while it has any, otherwise delegates to
         * the second iterator (including whatever that iterator does when it is exhausted).
         */
        @Override // java.util.Iterator
        public T next() {
            return this.first.hasNext() ? this.first.next() : this.second.next();
        }
    }

    /**
     * Creates a snapshot that combines the two given partial snapshots.
     *
     * @param complexStateMap the state map this snapshot belongs to (passed to the super class)
     * @param copyOnWriteStateMapSnapshot snapshot of the copy-on-write (heap) part
     * @param spillStateMapSnapshot snapshot of the spill part
     */
    public ComplexStateMapSnapshot(ComplexStateMap<K, N, S> complexStateMap, CopyOnWriteStateMapSnapshot<K, N, S> copyOnWriteStateMapSnapshot, SpillStateMapSnapshot<K, N, S, ? extends SpillStateMap<K, N, S>> spillStateMapSnapshot) {
        super(complexStateMap);
        this.heapSnapshot = copyOnWriteStateMapSnapshot;
        this.spillSnapshot = spillStateMapSnapshot;
    }

    /**
     * Returns an iterator over the entries of the heap snapshot followed by the entries of the
     * spill snapshot. All arguments are forwarded unchanged to both underlying snapshots.
     *
     * @param typeSerializer serializer for the key type {@code K}
     * @param typeSerializer2 serializer for the namespace type {@code N}
     * @param typeSerializer3 serializer for the state type {@code S}
     * @param stateSnapshotTransformer optional transformer, may be {@code null}
     * @return concatenation of the heap iterator and the spill iterator
     */
    @Override // org.apache.flink.runtime.state.heap.StateMapSnapshot
    public Iterator<StateEntry<K, N, S>> getIterator(@NotNull TypeSerializer<K> typeSerializer, @NotNull TypeSerializer<N> typeSerializer2, @NotNull TypeSerializer<S> typeSerializer3, @Nullable StateSnapshotTransformer<S> stateSnapshotTransformer) {
        // Typed locals instead of raw casts / a raw IteratorJoin keep the join fully type-checked.
        Iterator<StateEntry<K, N, S>> heapEntries = this.heapSnapshot.getIterator(typeSerializer, typeSerializer2, typeSerializer3, stateSnapshotTransformer);
        Iterator<StateEntry<K, N, S>> spillEntries = this.spillSnapshot.getIterator(typeSerializer, typeSerializer2, typeSerializer3, stateSnapshotTransformer);
        return new IteratorJoin<>(heapEntries, spillEntries);
    }

    /**
     * Writes the combined state of both snapshots to the given output view.
     *
     * <p>Layout produced by this method: one {@code int} holding the heap iterator's size plus the
     * size calculated by the spill snapshot, then for every heap entry its namespace, key and
     * state (in that order), then whatever {@code spillSnapshot.writeState(...)} emits.
     *
     * @param typeSerializer serializer for the key type {@code K}
     * @param typeSerializer2 serializer for the namespace type {@code N}
     * @param typeSerializer3 serializer for the state type {@code S}
     * @param dataOutputView target the state is written to
     * @param stateSnapshotTransformer optional transformer, may be {@code null}
     * @throws IOException if writing to {@code dataOutputView} fails
     */
    @Override // org.apache.flink.runtime.state.heap.StateMapSnapshot
    public void writeState(TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2, TypeSerializer<S> typeSerializer3, @Nonnull DataOutputView dataOutputView, @Nullable StateSnapshotTransformer<S> stateSnapshotTransformer) throws IOException {
        CopyOnWriteStateMapSnapshot.SnapshotIterator<K, N, S> iterator = this.heapSnapshot.getIterator(typeSerializer, typeSerializer2, typeSerializer3, stateSnapshotTransformer);
        // A single entry count covering both parts is written up front.
        dataOutputView.writeInt(iterator.size() + this.spillSnapshot.calculateStateSize(this.spillSnapshot.getSnapshotNodeIterator(false), new SpillableValueSerializer<>(typeSerializer3), stateSnapshotTransformer));
        // Heap entries go first: namespace, then key, then state.
        while (iterator.hasNext()) {
            CopyOnWriteStateMap.StateMapEntry<K, N, S> next = iterator.next();
            typeSerializer2.serialize(next.getNamespace(), dataOutputView);
            typeSerializer.serialize(next.getKey(), dataOutputView);
            typeSerializer3.serialize(next.getState(), dataOutputView);
        }
        // NOTE(review): presumably switches the spill snapshot into a mode where it does not write
        // its own size header, since the combined count was already written above - confirm
        // against SpillStateMapSnapshot. Must stay before the writeState call below.
        this.spillSnapshot.complexModeEnable();
        this.spillSnapshot.writeState(typeSerializer, typeSerializer2, typeSerializer3, dataOutputView, stateSnapshotTransformer);
    }
}
