package org.apache.flink.contrib.streaming.state.restore;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil;
import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDBException;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.class */
public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperation<K> {
    private KeyGroupsStateHandle currentKeyGroupsStateHandle;
    private FSDataInputStream currentStateHandleInStream;
    private DataInputView currentStateHandleInView;
    private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
    private StreamCompressionDecorator keygroupStreamCompressionDecorator;
    private final long writeBatchSize;
    private final PriorityQueueFlag queueRestoreEnabled;

    public RocksDBFullRestoreOperation(KeyGroupRange keyGroupRange, int i, int i2, CloseableRegistry closeableRegistry, ClassLoader classLoader, Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> map, StateSerializerProvider<K> stateSerializerProvider, File file, File file2, DBOptions dBOptions, Function<String, ColumnFamilyOptions> function, RocksDBNativeMetricOptions rocksDBNativeMetricOptions, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> collection, @Nonnull RocksDbTtlCompactFiltersManager rocksDbTtlCompactFiltersManager, @Nonnegative long j, Long l, PriorityQueueFlag priorityQueueFlag) {
        super(keyGroupRange, i, i2, closeableRegistry, classLoader, map, stateSerializerProvider, file, file2, dBOptions, function, rocksDBNativeMetricOptions, metricGroup, collection, rocksDbTtlCompactFiltersManager, l);
        Preconditions.checkArgument(j >= 0, "Write batch size have to be no negative.");
        this.writeBatchSize = j;
        this.queueRestoreEnabled = priorityQueueFlag;
    }

    @Override // org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation, org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreOperation
    /* renamed from: restore */
    public RocksDBRestoreResult mo33restore() throws IOException, StateMigrationException, RocksDBException {
        openDB();
        Iterator<KeyedStateHandle> it = this.restoreStateHandles.iterator();
        while (it.hasNext()) {
            KeyGroupsStateHandle keyGroupsStateHandle = (KeyedStateHandle) it.next();
            if (keyGroupsStateHandle != null) {
                if (!(keyGroupsStateHandle instanceof KeyGroupsStateHandle)) {
                    throw StateUtil.unexpectedStateHandleException(KeyGroupsStateHandle.class, keyGroupsStateHandle.getClass());
                }
                this.currentKeyGroupsStateHandle = keyGroupsStateHandle;
                restoreKeyGroupsInStateHandle();
            }
        }
        return new RocksDBRestoreResult(this.db, this.defaultColumnFamilyHandle, this.nativeMetricMonitor, -1L, null, null);
    }

    private void restoreKeyGroupsInStateHandle() throws IOException, StateMigrationException, RocksDBException {
        try {
            this.logger.info("Starting to restore from state handle: {}.", this.currentKeyGroupsStateHandle);
            this.currentStateHandleInStream = this.currentKeyGroupsStateHandle.openInputStream();
            this.cancelStreamRegistry.registerCloseable(this.currentStateHandleInStream);
            this.currentStateHandleInView = new DataInputViewStreamWrapper(this.currentStateHandleInStream);
            restoreKVStateMetaData();
            restoreKVStateData();
            this.logger.info("Finished restoring from state handle: {}.", this.currentKeyGroupsStateHandle);
        } finally {
            if (this.cancelStreamRegistry.unregisterCloseable(this.currentStateHandleInStream)) {
                IOUtils.closeQuietly(this.currentStateHandleInStream);
            }
        }
    }

    private void restoreKVStateMetaData() throws IOException, StateMigrationException {
        KeyedBackendSerializationProxy<K> readMetaData = readMetaData(this.currentStateHandleInView);
        this.keygroupStreamCompressionDecorator = readMetaData.isUsingKeyGroupCompression() ? SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
        List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = readMetaData.getStateMetaInfoSnapshots();
        this.currentStateHandleKVStateColumnFamilies = new ArrayList(stateMetaInfoSnapshots.size());
        for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
            if (stateMetaInfoSnapshot.getBackendStateType() == StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE && this.queueRestoreEnabled == PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE) {
                throw new StateMigrationException("Can not restore savepoint taken with RocksDB timers enabled with Heap timers!");
            }
            this.currentStateHandleKVStateColumnFamilies.add(getOrRegisterStateColumnFamilyHandle(null, stateMetaInfoSnapshot).columnFamilyHandle);
        }
    }

    private void restoreKVStateData() throws IOException, RocksDBException {
        RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper = new RocksDBWriteBatchWrapper(this.db, this.writeBatchSize);
        Throwable th = null;
        try {
            Iterator it = this.currentKeyGroupsStateHandle.getGroupRangeOffsets().iterator();
            while (it.hasNext()) {
                Tuple2 tuple2 = (Tuple2) it.next();
                Preconditions.checkState(this.keyGroupRange.contains(((Integer) tuple2.f0).intValue()), "The key group must belong to the backend");
                long longValue = ((Long) tuple2.f1).longValue();
                if (0 != longValue) {
                    this.currentStateHandleInStream.seek(longValue);
                    InputStream decorateWithCompression = this.keygroupStreamCompressionDecorator.decorateWithCompression(this.currentStateHandleInStream);
                    Throwable th2 = null;
                    try {
                        try {
                            DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(decorateWithCompression);
                            ColumnFamilyHandle columnFamilyHandle = this.currentStateHandleKVStateColumnFamilies.get(dataInputViewStreamWrapper.readShort());
                            boolean z = true;
                            while (z) {
                                byte[] deserialize = BytePrimitiveArraySerializer.INSTANCE.deserialize(dataInputViewStreamWrapper);
                                byte[] deserialize2 = BytePrimitiveArraySerializer.INSTANCE.deserialize(dataInputViewStreamWrapper);
                                if (RocksSnapshotUtil.hasMetaDataFollowsFlag(deserialize)) {
                                    RocksSnapshotUtil.clearMetaDataFollowsFlag(deserialize);
                                    rocksDBWriteBatchWrapper.put(columnFamilyHandle, deserialize, deserialize2);
                                    int readShort = 65535 & dataInputViewStreamWrapper.readShort();
                                    if (65535 == readShort) {
                                        z = false;
                                    } else {
                                        columnFamilyHandle = this.currentStateHandleKVStateColumnFamilies.get(readShort);
                                    }
                                } else {
                                    rocksDBWriteBatchWrapper.put(columnFamilyHandle, deserialize, deserialize2);
                                }
                            }
                            if (decorateWithCompression != null) {
                                if (0 != 0) {
                                    try {
                                        decorateWithCompression.close();
                                    } catch (Throwable th3) {
                                        th2.addSuppressed(th3);
                                    }
                                } else {
                                    decorateWithCompression.close();
                                }
                            }
                        } catch (Throwable th4) {
                            th2 = th4;
                            throw th4;
                        }
                    } catch (Throwable th5) {
                        if (decorateWithCompression != null) {
                            if (th2 != null) {
                                try {
                                    decorateWithCompression.close();
                                } catch (Throwable th6) {
                                    th2.addSuppressed(th6);
                                }
                            } else {
                                decorateWithCompression.close();
                            }
                        }
                        throw th5;
                    }
                }
            }
            if (rocksDBWriteBatchWrapper != null) {
                if (0 == 0) {
                    rocksDBWriteBatchWrapper.close();
                    return;
                }
                try {
                    rocksDBWriteBatchWrapper.close();
                } catch (Throwable th7) {
                    th.addSuppressed(th7);
                }
            }
        } catch (Throwable th8) {
            if (rocksDBWriteBatchWrapper != null) {
                if (0 != 0) {
                    try {
                        rocksDBWriteBatchWrapper.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    rocksDBWriteBatchWrapper.close();
                }
            }
            throw th8;
        }
    }
}
