package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RestoreOperation;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/SpillableHeapRestoreOperation.class */
/**
 * {@link RestoreOperation} that repopulates the registered key/value state tables and
 * priority-queue states from a collection of {@link KeyGroupsStateHandle}s.
 *
 * <p>Each handle is opened as a stream, its {@link KeyedBackendSerializationProxy} header is
 * read, missing states are registered, and the data of every key group listed in the handle is
 * then read into the matching registered state.
 *
 * @param <K> type of the key
 */
public class SpillableHeapRestoreOperation<K> implements RestoreOperation<Void> {
    private final Collection<KeyedStateHandle> restoreStateHandles;
    private final StateSerializerProvider<K> keySerializerProvider;
    private final ClassLoader userCodeClassLoader;
    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates;
    private final CloseableRegistry cancelStreamRegistry;
    private final HeapPriorityQueueSetFactory priorityQueueSetFactory;

    @Nonnull
    private final KeyGroupRange keyGroupRange;

    @Nonnegative
    private final int numberOfKeyGroups;
    private final SpillableSnapshotStrategy<K> snapshotStrategy;
    private final InternalKeyContext<K> keyContext;

    /**
     * Creates the restore operation. All arguments are stored as-is; nothing is read until
     * {@link #restore()} is called.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally
     * package-private; it is kept {@code public} here so existing callers are unaffected.
     *
     * @param restoreStateHandles state handles to restore from; {@code null} entries are skipped
     * @param keySerializerProvider provider that receives the restored key serializer snapshot
     * @param userCodeClassLoader class loader handed to the serialization proxy
     * @param registeredKVStates map of key/value state tables by state name; cleared and refilled
     *     by {@link #restore()}
     * @param registeredPQStates map of priority-queue states by state name; cleared and refilled
     *     by {@link #restore()}
     * @param cancelStreamRegistry registry every opened input stream is registered with while it
     *     is being read
     * @param priorityQueueSetFactory factory used to create restored priority-queue sets
     * @param keyGroupRange key-group range every restored key group must be contained in
     * @param numberOfKeyGroups key-group count passed to newly created priority-queue wrappers
     * @param snapshotStrategy strategy used to create new state tables
     * @param keyContext key context passed to newly created state tables
     */
    public SpillableHeapRestoreOperation(
            @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
            StateSerializerProvider<K> keySerializerProvider,
            ClassLoader userCodeClassLoader,
            Map<String, StateTable<K, ?, ?>> registeredKVStates,
            Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates,
            CloseableRegistry cancelStreamRegistry,
            HeapPriorityQueueSetFactory priorityQueueSetFactory,
            @Nonnull KeyGroupRange keyGroupRange,
            int numberOfKeyGroups,
            SpillableSnapshotStrategy<K> snapshotStrategy,
            InternalKeyContext<K> keyContext) {
        this.restoreStateHandles = restoreStateHandles;
        this.keySerializerProvider = keySerializerProvider;
        this.userCodeClassLoader = userCodeClassLoader;
        this.registeredKVStates = registeredKVStates;
        this.registeredPQStates = registeredPQStates;
        this.cancelStreamRegistry = cancelStreamRegistry;
        this.priorityQueueSetFactory = priorityQueueSetFactory;
        this.keyGroupRange = keyGroupRange;
        this.numberOfKeyGroups = numberOfKeyGroups;
        this.snapshotStrategy = snapshotStrategy;
        this.keyContext = keyContext;
    }

    /**
     * Clears the registered states and restores them from all non-null state handles.
     *
     * @return always {@code null}
     * @throws IllegalStateException if a handle is not a {@link KeyGroupsStateHandle}
     * @throws StateMigrationException if the restored key serializer is incompatible with the
     *     current one or would require migration
     * @throws Exception if opening or reading a state handle fails
     */
    @Override // org.apache.flink.runtime.state.RestoreOperation
    public Void restore() throws Exception {
        registeredKVStates.clear();
        registeredPQStates.clear();

        // The key serializer compatibility check only needs to run for the first handle.
        boolean keySerializerRestored = false;

        for (KeyedStateHandle keyedStateHandle : restoreStateHandles) {
            if (keyedStateHandle == null) {
                continue;
            }
            if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
                throw new IllegalStateException("Unexpected state handle type, expected: " + KeyGroupsStateHandle.class + ", but found: " + keyedStateHandle.getClass());
            }

            KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
            FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
            cancelStreamRegistry.registerCloseable(fsDataInputStream);
            try {
                DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
                KeyedBackendSerializationProxy<K> serializationProxy =
                        new KeyedBackendSerializationProxy<>(userCodeClassLoader);
                serializationProxy.read(inView);

                if (!keySerializerRestored) {
                    TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
                            keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(
                                    serializationProxy.getKeySerializerSnapshot());
                    if (keySerializerSchemaCompat.isCompatibleAfterMigration()
                            || keySerializerSchemaCompat.isIncompatible()) {
                        throw new StateMigrationException("The new key serializer must be compatible.");
                    }
                    keySerializerRestored = true;
                }

                List<StateMetaInfoSnapshot> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots();
                Map<Integer, StateMetaInfoSnapshot> kvStatesById = new HashMap<>();
                createOrCheckStateForMetaInfo(restoredMetaInfos, kvStatesById);

                readStateHandleStateData(
                        fsDataInputStream,
                        inView,
                        keyGroupsStateHandle.getGroupRangeOffsets(),
                        kvStatesById,
                        restoredMetaInfos.size(),
                        serializationProxy.getReadVersion(),
                        serializationProxy.isUsingKeyGroupCompression());
            } finally {
                // Only close the stream ourselves if the registry still owned it; runs on both
                // the success and the failure path.
                if (cancelStreamRegistry.unregisterCloseable(fsDataInputStream)) {
                    IOUtils.closeQuietly(fsDataInputStream);
                }
            }
        }
        return null;
    }

    /**
     * Registers a state for every restored meta info whose name is not registered yet and, for
     * exactly those, records the meta info in {@code kvStatesById} under the next free index.
     *
     * @param restoredMetaInfo meta info snapshots read from the state handle
     * @param kvStatesById output map from state id to meta info snapshot
     * @throws IllegalStateException if a snapshot has an unexpected backend state type
     */
    private void createOrCheckStateForMetaInfo(
            List<StateMetaInfoSnapshot> restoredMetaInfo,
            Map<Integer, StateMetaInfoSnapshot> kvStatesById) {
        for (StateMetaInfoSnapshot metaInfoSnapshot : restoredMetaInfo) {
            final String stateName = metaInfoSnapshot.getName();
            final boolean newlyRegistered;

            switch (metaInfoSnapshot.getBackendStateType()) {
                case KEY_VALUE:
                    newlyRegistered = registeredKVStates.get(stateName) == null;
                    if (newlyRegistered) {
                        RegisteredKeyValueStateBackendMetaInfo<?, ?> kvMetaInfo =
                                new RegisteredKeyValueStateBackendMetaInfo<>(metaInfoSnapshot);
                        registeredKVStates.put(
                                stateName,
                                snapshotStrategy.newStateTable(
                                        keyContext,
                                        kvMetaInfo,
                                        keySerializerProvider.currentSchemaSerializer()));
                    }
                    break;
                case PRIORITY_QUEUE:
                    newlyRegistered = registeredPQStates.get(stateName) == null;
                    if (newlyRegistered) {
                        createInternal(new RegisteredPriorityQueueStateBackendMetaInfo<>(metaInfoSnapshot));
                    }
                    break;
                default:
                    throw new IllegalStateException("Unexpected state type: " + metaInfoSnapshot.getBackendStateType() + ".");
            }

            if (newlyRegistered) {
                kvStatesById.put(kvStatesById.size(), metaInfoSnapshot);
            }
        }
    }

    /**
     * Creates a priority-queue set for the given meta info and registers it, wrapped in a
     * {@link HeapPriorityQueueSnapshotRestoreWrapper}, under the meta info's name.
     *
     * @param metaInfo meta info of the priority-queue state to create
     * @param <T> element type of the priority queue
     */
    // Raw types are kept deliberately: the generic signatures of the factory and the wrapper
    // are not visible from this file.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> void createInternal(RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
        final String stateName = metaInfo.getName();
        final HeapPriorityQueueSnapshotRestoreWrapper wrapper =
                new HeapPriorityQueueSnapshotRestoreWrapper(
                        priorityQueueSetFactory.create(stateName, (TypeSerializer) metaInfo.getElementSerializer()),
                        metaInfo,
                        KeyExtractorFunction.forKeyedObjects(),
                        keyGroupRange,
                        numberOfKeyGroups);
        registeredPQStates.put(stateName, wrapper);
    }

    /**
     * Reads the data of every key group listed in {@code keyGroupOffsets} from the stream.
     *
     * <p>For each key group the stream is positioned at the recorded offset, the key-group id
     * written there is verified, and the remaining data is read through the (optionally
     * Snappy-decompressing) decorated stream.
     *
     * @param fsDataInputStream seekable stream of the state handle
     * @param inView data view over {@code fsDataInputStream}
     * @param keyGroupOffsets key-group ids with their offsets in the stream
     * @param kvStatesById meta info snapshots by state id
     * @param numStates number of states stored per key group
     * @param readVersion version passed to the key-group readers
     * @param isCompressed whether key-group data is Snappy-compressed
     * @throws IOException if seeking or reading fails
     * @throws IllegalStateException if a key group is outside this operation's range or the id
     *     found in the stream does not match
     */
    private void readStateHandleStateData(
            FSDataInputStream fsDataInputStream,
            DataInputViewStreamWrapper inView,
            KeyGroupRangeOffsets keyGroupOffsets,
            Map<Integer, StateMetaInfoSnapshot> kvStatesById,
            int numStates,
            int readVersion,
            boolean isCompressed) throws IOException {
        final StreamCompressionDecorator streamCompressionDecorator =
                isCompressed ? SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;

        for (Tuple2<Integer, Long> groupOffset : keyGroupOffsets) {
            final int keyGroupIndex = groupOffset.f0;
            final long offset = groupOffset.f1;

            Preconditions.checkState(keyGroupRange.contains(keyGroupIndex), "The key group must belong to the backend.");
            fsDataInputStream.seek(offset);
            Preconditions.checkState(inView.readInt() == keyGroupIndex, "Unexpected key-group in restore.");

            // try-with-resources closes the decorated stream exactly once and attaches a close
            // failure as a suppressed exception instead of masking the primary one.
            try (InputStream kgCompressionInStream = streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
                readKeyGroupStateData(kgCompressionInStream, kvStatesById, keyGroupIndex, numStates, readVersion);
            }
        }
    }

    /**
     * Reads {@code numStates} state entries of one key group. Each entry starts with a short
     * state id, which selects the registered state whose key-group reader consumes the entry.
     *
     * @param inputStream stream positioned at the first state entry of the key group
     * @param kvStatesById meta info snapshots by state id
     * @param keyGroupIndex key group being read
     * @param numStates number of state entries to read
     * @param readVersion version passed to the key-group readers
     * @throws IOException if reading fails
     * @throws IllegalStateException if a snapshot has an unexpected backend state type
     */
    private void readKeyGroupStateData(
            InputStream inputStream,
            Map<Integer, StateMetaInfoSnapshot> kvStatesById,
            int keyGroupIndex,
            int numStates,
            int readVersion) throws IOException {
        final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inputStream);

        for (int stateIdx = 0; stateIdx < numStates; stateIdx++) {
            final int kvStateId = inView.readShort();
            final StateMetaInfoSnapshot metaInfoSnapshot = kvStatesById.get(kvStateId);

            // The reader is obtained inside each branch: a state table and a priority-queue
            // wrapper are unrelated classes and must not be cast to one another.
            switch (metaInfoSnapshot.getBackendStateType()) {
                case KEY_VALUE:
                    registeredKVStates
                            .get(metaInfoSnapshot.getName())
                            .keyGroupReader(readVersion)
                            .readMappingsInKeyGroup(inView, keyGroupIndex);
                    break;
                case PRIORITY_QUEUE:
                    registeredPQStates
                            .get(metaInfoSnapshot.getName())
                            .keyGroupReader(readVersion)
                            .readMappingsInKeyGroup(inView, keyGroupIndex);
                    break;
                default:
                    throw new IllegalStateException("Unexpected state type: " + metaInfoSnapshot.getBackendStateType() + ".");
            }
        }
    }
}
