package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.IterableStateSnapshot;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyValueStateIterator;
import org.apache.flink.runtime.state.ListDelimitedSerializer;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.StateEntry;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.util.Preconditions;

@Internal
@NotThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator.class */
public final class HeapKeyValueStateIterator implements KeyValueStateIterator {
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
    private final Map<StateUID, Integer> stateNamesToId;
    private final Map<StateUID, StateSnapshot> stateStableSnapshots;
    private final int keyGroupPrefixBytes;
    private boolean isValid;
    private boolean newKeyGroup;
    private boolean newKVState;
    private byte[] currentKey;
    private byte[] currentValue;
    private final Iterator<Integer> keyGroupIterator;
    private int currentKeyGroup;
    private Iterator<StateUID> statesIterator;
    private StateUID currentState;
    private SingleStateIterator currentStateIterator;
    private final DataOutputSerializer valueOut = new DataOutputSerializer(64);
    private final ListDelimitedSerializer listDelimitedSerializer = new ListDelimitedSerializer();
    private final SerializedCompositeKeyBuilder<Object> compositeKeyBuilder;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.runtime.state.heap.HeapKeyValueStateIterator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$api$common$state$StateDescriptor$Type = new int[StateDescriptor.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$api$common$state$StateDescriptor$Type[StateDescriptor.Type.AGGREGATING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$state$StateDescriptor$Type[StateDescriptor.Type.REDUCING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$state$StateDescriptor$Type[StateDescriptor.Type.FOLDING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$state$StateDescriptor$Type[StateDescriptor.Type.VALUE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$state$StateDescriptor$Type[StateDescriptor.Type.LIST.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$state$StateDescriptor$Type[StateDescriptor.Type.MAP.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator$MapStateIterator.class */
    public final class MapStateIterator implements SingleStateIterator {
        private final Iterator<Map.Entry<Object, Object>> mapEntries;
        private final TypeSerializer<Object> userKeySerializer;
        private final TypeSerializer<Object> userValueSerializer;
        private final StateTableIterator parentIterator;
        static final /* synthetic */ boolean $assertionsDisabled;

        private MapStateIterator(Map<Object, Object> map, TypeSerializer<Object> typeSerializer, TypeSerializer<Object> typeSerializer2, StateTableIterator stateTableIterator) {
            if (!$assertionsDisabled && map.isEmpty()) {
                throw new AssertionError();
            }
            this.mapEntries = map.entrySet().iterator();
            this.userKeySerializer = typeSerializer;
            this.userValueSerializer = typeSerializer2;
            this.parentIterator = stateTableIterator;
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyValueStateIterator.SingleStateIterator
        public boolean hasNext() {
            if ($assertionsDisabled || this.mapEntries.hasNext()) {
                return true;
            }
            throw new AssertionError();
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyValueStateIterator.SingleStateIterator
        public boolean writeOutNext() throws IOException {
            Map.Entry<Object, Object> next = this.mapEntries.next();
            HeapKeyValueStateIterator.this.valueOut.clear();
            HeapKeyValueStateIterator.this.currentKey = HeapKeyValueStateIterator.this.compositeKeyBuilder.buildCompositeKeyUserKey(next.getKey(), this.userKeySerializer);
            Object value = next.getValue();
            HeapKeyValueStateIterator.this.valueOut.writeBoolean(value == null);
            this.userValueSerializer.serialize(value, HeapKeyValueStateIterator.this.valueOut);
            HeapKeyValueStateIterator.this.currentValue = HeapKeyValueStateIterator.this.valueOut.getCopyOfBuffer();
            if (this.mapEntries.hasNext()) {
                return true;
            }
            HeapKeyValueStateIterator.this.currentStateIterator = this.parentIterator;
            return true;
        }

        /* synthetic */ MapStateIterator(HeapKeyValueStateIterator heapKeyValueStateIterator, Map map, TypeSerializer typeSerializer, TypeSerializer typeSerializer2, StateTableIterator stateTableIterator, AnonymousClass1 anonymousClass1) {
            this(map, typeSerializer, typeSerializer2, stateTableIterator);
        }

        static {
            $assertionsDisabled = !HeapKeyValueStateIterator.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator$QueueIterator.class */
    public final class QueueIterator<T> implements SingleStateIterator {
        private final Iterator<T> elementsForKeyGroup;
        private final RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo;
        private final DataOutputSerializer keyOut = new DataOutputSerializer(128);
        private final int afterKeyMark;

        public QueueIterator(Iterator<T> it, RegisteredPriorityQueueStateBackendMetaInfo<T> registeredPriorityQueueStateBackendMetaInfo) throws IOException {
            this.elementsForKeyGroup = it;
            this.metaInfo = registeredPriorityQueueStateBackendMetaInfo;
            CompositeKeySerializationUtils.writeKeyGroup(HeapKeyValueStateIterator.this.keyGroup(), HeapKeyValueStateIterator.this.keyGroupPrefixBytes, this.keyOut);
            this.afterKeyMark = this.keyOut.length();
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyValueStateIterator.SingleStateIterator
        public boolean hasNext() {
            return this.elementsForKeyGroup.hasNext();
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyValueStateIterator.SingleStateIterator
        public boolean writeOutNext() throws IOException {
            HeapKeyValueStateIterator.this.currentValue = HeapKeyValueStateIterator.EMPTY_BYTE_ARRAY;
            this.keyOut.setPosition(this.afterKeyMark);
            this.metaInfo.getElementSerializer().serialize(this.elementsForKeyGroup.next(), this.keyOut);
            HeapKeyValueStateIterator.this.currentKey = this.keyOut.getCopyOfBuffer();
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator$SingleStateIterator.class */
    public interface SingleStateIterator {
        boolean hasNext();

        boolean writeOutNext() throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyValueStateIterator$StateTableIterator.class */
    public final class StateTableIterator implements SingleStateIterator {
        private final Iterator<? extends StateEntry<?, ?, ?>> entriesIterator;
        private final RegisteredKeyValueStateBackendMetaInfo<?, ?> stateSnapshot;

        private StateTableIterator(Iterator<? extends StateEntry<?, ?, ?>> it, RegisteredKeyValueStateBackendMetaInfo<?, ?> registeredKeyValueStateBackendMetaInfo) {
            this.entriesIterator = it;
            this.stateSnapshot = registeredKeyValueStateBackendMetaInfo;
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyValueStateIterator.SingleStateIterator
        public boolean hasNext() {
            return this.entriesIterator.hasNext();
        }

        @Override // org.apache.flink.runtime.state.heap.HeapKeyValueStateIterator.SingleStateIterator
        public boolean writeOutNext() throws IOException {
            StateEntry<?, ?, ?> next = this.entriesIterator.next();
            HeapKeyValueStateIterator.this.valueOut.clear();
            HeapKeyValueStateIterator.this.compositeKeyBuilder.setKeyAndKeyGroup(next.getKey(), HeapKeyValueStateIterator.this.keyGroup());
            HeapKeyValueStateIterator.this.compositeKeyBuilder.setNamespace(next.getNamespace(), HeapKeyValueStateIterator.castToType(this.stateSnapshot.getNamespaceSerializer()));
            TypeSerializer<?> stateSerializer = this.stateSnapshot.getStateSerializer();
            switch (AnonymousClass1.$SwitchMap$org$apache$flink$api$common$state$StateDescriptor$Type[this.stateSnapshot.getStateType().ordinal()]) {
                case 1:
                case 2:
                case 3:
                case QueryScopeInfo.INFO_CATEGORY_OPERATOR /* 4 */:
                    return writeOutValue(next, stateSerializer);
                case 5:
                    return writeOutList(next, stateSerializer);
                case 6:
                    return writeOutMap(next, stateSerializer);
                default:
                    throw new IllegalStateException("");
            }
        }

        private boolean writeOutValue(StateEntry<?, ?, ?> stateEntry, TypeSerializer<?> typeSerializer) throws IOException {
            HeapKeyValueStateIterator.this.currentKey = HeapKeyValueStateIterator.this.compositeKeyBuilder.build();
            HeapKeyValueStateIterator.castToType(typeSerializer).serialize(stateEntry.getState(), HeapKeyValueStateIterator.this.valueOut);
            HeapKeyValueStateIterator.this.currentValue = HeapKeyValueStateIterator.this.valueOut.getCopyOfBuffer();
            return true;
        }

        private boolean writeOutList(StateEntry<?, ?, ?> stateEntry, TypeSerializer<?> typeSerializer) throws IOException {
            List list = (List) stateEntry.getState();
            if (list.isEmpty()) {
                return false;
            }
            HeapKeyValueStateIterator.this.currentKey = HeapKeyValueStateIterator.this.compositeKeyBuilder.build();
            HeapKeyValueStateIterator.this.currentValue = HeapKeyValueStateIterator.this.listDelimitedSerializer.serializeList(list, ((ListSerializer) typeSerializer).getElementSerializer());
            return true;
        }

        private boolean writeOutMap(StateEntry<?, ?, ?> stateEntry, TypeSerializer<?> typeSerializer) throws IOException {
            Map map = (Map) stateEntry.getState();
            if (map.isEmpty()) {
                return false;
            }
            MapSerializer mapSerializer = (MapSerializer) typeSerializer;
            HeapKeyValueStateIterator.this.currentStateIterator = new MapStateIterator(HeapKeyValueStateIterator.this, map, mapSerializer.getKeySerializer(), mapSerializer.getValueSerializer(), this, null);
            return HeapKeyValueStateIterator.this.currentStateIterator.writeOutNext();
        }

        /* synthetic */ StateTableIterator(HeapKeyValueStateIterator heapKeyValueStateIterator, Iterator it, RegisteredKeyValueStateBackendMetaInfo registeredKeyValueStateBackendMetaInfo, AnonymousClass1 anonymousClass1) {
            this(it, registeredKeyValueStateBackendMetaInfo);
        }
    }

    public HeapKeyValueStateIterator(@Nonnull KeyGroupRange keyGroupRange, @Nonnull TypeSerializer<?> typeSerializer, @Nonnegative int i, @Nonnull Map<StateUID, Integer> map, @Nonnull Map<StateUID, StateSnapshot> map2) throws IOException {
        Preconditions.checkNotNull(keyGroupRange);
        Preconditions.checkNotNull(typeSerializer);
        this.stateNamesToId = (Map) Preconditions.checkNotNull(map);
        this.stateStableSnapshots = (Map) Preconditions.checkNotNull(map2);
        this.statesIterator = map2.keySet().iterator();
        this.keyGroupIterator = keyGroupRange.iterator();
        this.keyGroupPrefixBytes = CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(i);
        this.compositeKeyBuilder = new SerializedCompositeKeyBuilder<>(castToType(typeSerializer), this.keyGroupPrefixBytes, 32);
        if (!this.keyGroupIterator.hasNext() || !this.statesIterator.hasNext()) {
            this.isValid = false;
            return;
        }
        this.currentKeyGroup = this.keyGroupIterator.next().intValue();
        next();
        this.newKeyGroup = true;
    }

    @Override // org.apache.flink.runtime.state.KeyValueStateIterator
    public boolean isValid() {
        return this.isValid;
    }

    @Override // org.apache.flink.runtime.state.KeyValueStateIterator
    public boolean isNewKeyValueState() {
        return this.newKVState;
    }

    @Override // org.apache.flink.runtime.state.KeyValueStateIterator
    public boolean isNewKeyGroup() {
        return this.newKeyGroup;
    }

    @Override // org.apache.flink.runtime.state.KeyValueStateIterator
    public int keyGroup() {
        return this.currentKeyGroup;
    }

    @Override // org.apache.flink.runtime.state.KeyValueStateIterator
    public int kvStateId() {
        return this.stateNamesToId.get(this.currentState).intValue();
    }

    @Override // org.apache.flink.runtime.state.KeyValueStateIterator
    public void next() throws IOException {
        this.newKVState = false;
        this.newKeyGroup = false;
        boolean z = false;
        do {
            if (this.currentState == null && !moveToNextState()) {
                this.isValid = false;
                return;
            }
            boolean z2 = this.currentStateIterator != null && this.currentStateIterator.hasNext();
            if (!z2) {
                this.currentState = null;
            }
            if (z2) {
                z = this.currentStateIterator.writeOutNext();
            }
        } while (!z);
        this.isValid = true;
    }

    private boolean moveToNextState() throws IOException {
        if (this.statesIterator.hasNext()) {
            this.currentState = this.statesIterator.next();
            this.newKVState = true;
        } else {
            if (!this.keyGroupIterator.hasNext()) {
                return false;
            }
            this.currentKeyGroup = this.keyGroupIterator.next().intValue();
            resetStates();
            this.newKeyGroup = true;
            this.newKVState = true;
        }
        setCurrentStateIterator(this.stateStableSnapshots.get(this.currentState));
        return true;
    }

    private void resetStates() {
        this.statesIterator = this.stateStableSnapshots.keySet().iterator();
        this.currentState = this.statesIterator.next();
    }

    private void setCurrentStateIterator(StateSnapshot stateSnapshot) throws IOException {
        if (stateSnapshot instanceof IterableStateSnapshot) {
            this.currentStateIterator = new StateTableIterator(this, ((IterableStateSnapshot) stateSnapshot).getIterator(this.currentKeyGroup), new RegisteredKeyValueStateBackendMetaInfo(stateSnapshot.getMetaInfoSnapshot()), null);
        } else {
            if (!(stateSnapshot instanceof HeapPriorityQueueStateSnapshot)) {
                throw new IllegalStateException("Unknown snapshot type: " + stateSnapshot);
            }
            this.currentStateIterator = new QueueIterator(((HeapPriorityQueueStateSnapshot) stateSnapshot).getIteratorForKeyGroup(this.currentKeyGroup), new RegisteredPriorityQueueStateBackendMetaInfo(stateSnapshot.getMetaInfoSnapshot()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    @Nonnull
    public static <T> TypeSerializer<T> castToType(@Nonnull TypeSerializer<?> typeSerializer) {
        return typeSerializer;
    }

    @Override // org.apache.flink.runtime.state.KeyValueStateIterator
    public byte[] key() {
        return this.currentKey;
    }

    @Override // org.apache.flink.runtime.state.KeyValueStateIterator
    public byte[] value() {
        return this.currentValue;
    }

    @Override // org.apache.flink.runtime.state.KeyValueStateIterator, java.lang.AutoCloseable
    public void close() {
    }
}
