package org.apache.flink.contrib.streaming.state.snapshot;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
import org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AsyncSnapshotCallable;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.function.SupplierWithException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.class */
public class RocksFullSnapshotStrategy<K> extends RocksDBSnapshotStrategyBase<K> {
    private static final String DESCRIPTION = "Asynchronous full RocksDB snapshot";

    @Nonnull
    private final StreamCompressionDecorator keyGroupCompressionDecorator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy$MetaData.class */
    public static class MetaData {
        final RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo;
        final StateSnapshotTransformer<byte[]> stateSnapshotTransformer;

        private MetaData(RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo, StateSnapshotTransformer<byte[]> stateSnapshotTransformer) {
            this.rocksDbKvStateInfo = rocksDbKvStateInfo;
            this.stateSnapshotTransformer = stateSnapshotTransformer;
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.class */
    private class SnapshotAsynchronousPartCallable extends AsyncSnapshotCallable<SnapshotResult<KeyedStateHandle>> {

        @Nonnull
        private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier;

        @Nonnull
        private final ResourceGuard.Lease dbLease;

        @Nonnull
        private final Snapshot snapshot;

        @Nonnull
        private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;

        @Nonnull
        private List<MetaData> metaData;

        @Nonnull
        private final String logPathString;
        static final /* synthetic */ boolean $assertionsDisabled;

        SnapshotAsynchronousPartCallable(@Nonnull SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplierWithException, @Nonnull ResourceGuard.Lease lease, @Nonnull Snapshot snapshot, @Nonnull List<StateMetaInfoSnapshot> list, @Nonnull List<RocksDBKeyedStateBackend.RocksDbKvStateInfo> list2, @Nonnull String str) {
            this.checkpointStreamSupplier = supplierWithException;
            this.dbLease = lease;
            this.snapshot = snapshot;
            this.stateMetaInfoSnapshots = list;
            this.metaData = RocksFullSnapshotStrategy.fillMetaData(list2);
            this.logPathString = str;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
        public SnapshotResult<KeyedStateHandle> callInternal() throws Exception {
            KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(RocksFullSnapshotStrategy.this.keyGroupRange);
            CheckpointStreamWithResultProvider checkpointStreamWithResultProvider = this.checkpointStreamSupplier.get();
            this.snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
            writeSnapshotToOutputStream(checkpointStreamWithResultProvider, keyGroupRangeOffsets);
            if (this.snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider)) {
                return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult(), keyGroupRangeOffsets);
            }
            throw new IOException("Stream is already unregistered/closed.");
        }

        @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
        protected void cleanupProvidedResources() {
            RocksFullSnapshotStrategy.this.db.releaseSnapshot(this.snapshot);
            IOUtils.closeQuietly(this.snapshot);
            IOUtils.closeQuietly(this.dbLease);
        }

        @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
        protected void logAsyncSnapshotComplete(long j) {
            RocksFullSnapshotStrategy.this.logAsyncCompleted(this.logPathString, j);
        }

        private void writeSnapshotToOutputStream(@Nonnull CheckpointStreamWithResultProvider checkpointStreamWithResultProvider, @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets) throws IOException, InterruptedException {
            ArrayList arrayList = new ArrayList(this.metaData.size());
            DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(checkpointStreamWithResultProvider.getCheckpointOutputStream());
            ReadOptions readOptions = new ReadOptions();
            try {
                readOptions.setSnapshot(this.snapshot);
                writeKVStateMetaData(arrayList, readOptions, dataOutputViewStreamWrapper);
                writeKVStateData(arrayList, checkpointStreamWithResultProvider, keyGroupRangeOffsets);
                Iterator<Tuple2<RocksIteratorWrapper, Integer>> it = arrayList.iterator();
                while (it.hasNext()) {
                    IOUtils.closeQuietly(it.next().f0);
                }
                IOUtils.closeQuietly(readOptions);
            } catch (Throwable th) {
                Iterator<Tuple2<RocksIteratorWrapper, Integer>> it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    IOUtils.closeQuietly(it2.next().f0);
                }
                IOUtils.closeQuietly(readOptions);
                throw th;
            }
        }

        private void writeKVStateMetaData(List<Tuple2<RocksIteratorWrapper, Integer>> list, ReadOptions readOptions, DataOutputView dataOutputView) throws IOException {
            int i = 0;
            for (MetaData metaData : this.metaData) {
                list.add(Tuple2.of(RocksFullSnapshotStrategy.getRocksIterator(RocksFullSnapshotStrategy.this.db, metaData.rocksDbKvStateInfo.columnFamilyHandle, metaData.stateSnapshotTransformer, readOptions), Integer.valueOf(i)));
                i++;
            }
            new KeyedBackendSerializationProxy(RocksFullSnapshotStrategy.this.keySerializer, this.stateMetaInfoSnapshots, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, RocksFullSnapshotStrategy.this.keyGroupCompressionDecorator)).write(dataOutputView);
        }

        private void writeKVStateData(List<Tuple2<RocksIteratorWrapper, Integer>> list, CheckpointStreamWithResultProvider checkpointStreamWithResultProvider, KeyGroupRangeOffsets keyGroupRangeOffsets) throws IOException, InterruptedException {
            byte[] bArr = null;
            byte[] bArr2 = null;
            DataOutputViewStreamWrapper dataOutputViewStreamWrapper = null;
            OutputStream outputStream = null;
            CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream = checkpointStreamWithResultProvider.getCheckpointOutputStream();
            try {
                RocksStatesPerKeyGroupMergeIterator rocksStatesPerKeyGroupMergeIterator = new RocksStatesPerKeyGroupMergeIterator(list, RocksFullSnapshotStrategy.this.keyGroupPrefixBytes);
                Throwable th = null;
                try {
                    try {
                        if (rocksStatesPerKeyGroupMergeIterator.isValid()) {
                            keyGroupRangeOffsets.setKeyGroupOffset(rocksStatesPerKeyGroupMergeIterator.keyGroup(), checkpointOutputStream.getPos());
                            outputStream = RocksFullSnapshotStrategy.this.keyGroupCompressionDecorator.decorateWithCompression(checkpointOutputStream);
                            dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(outputStream);
                            dataOutputViewStreamWrapper.writeShort(rocksStatesPerKeyGroupMergeIterator.kvStateId());
                            bArr = rocksStatesPerKeyGroupMergeIterator.key();
                            bArr2 = rocksStatesPerKeyGroupMergeIterator.value();
                            rocksStatesPerKeyGroupMergeIterator.next();
                        }
                        while (rocksStatesPerKeyGroupMergeIterator.isValid()) {
                            if (!$assertionsDisabled && RocksSnapshotUtil.hasMetaDataFollowsFlag(bArr)) {
                                throw new AssertionError();
                            }
                            if (rocksStatesPerKeyGroupMergeIterator.isNewKeyGroup() || rocksStatesPerKeyGroupMergeIterator.isNewKeyValueState()) {
                                checkInterrupted();
                                RocksSnapshotUtil.setMetaDataFollowsFlagInKey(bArr);
                            }
                            writeKeyValuePair(bArr, bArr2, dataOutputViewStreamWrapper);
                            if (rocksStatesPerKeyGroupMergeIterator.isNewKeyGroup()) {
                                dataOutputViewStreamWrapper.writeShort(65535);
                                outputStream.close();
                                keyGroupRangeOffsets.setKeyGroupOffset(rocksStatesPerKeyGroupMergeIterator.keyGroup(), checkpointOutputStream.getPos());
                                outputStream = RocksFullSnapshotStrategy.this.keyGroupCompressionDecorator.decorateWithCompression(checkpointOutputStream);
                                dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(outputStream);
                                dataOutputViewStreamWrapper.writeShort(rocksStatesPerKeyGroupMergeIterator.kvStateId());
                            } else if (rocksStatesPerKeyGroupMergeIterator.isNewKeyValueState()) {
                                dataOutputViewStreamWrapper.writeShort(rocksStatesPerKeyGroupMergeIterator.kvStateId());
                            }
                            bArr = rocksStatesPerKeyGroupMergeIterator.key();
                            bArr2 = rocksStatesPerKeyGroupMergeIterator.value();
                            rocksStatesPerKeyGroupMergeIterator.next();
                        }
                        if (rocksStatesPerKeyGroupMergeIterator != null) {
                            if (0 != 0) {
                                try {
                                    rocksStatesPerKeyGroupMergeIterator.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                rocksStatesPerKeyGroupMergeIterator.close();
                            }
                        }
                        if (bArr != null) {
                            if (!$assertionsDisabled && RocksSnapshotUtil.hasMetaDataFollowsFlag(bArr)) {
                                throw new AssertionError();
                            }
                            RocksSnapshotUtil.setMetaDataFollowsFlagInKey(bArr);
                            writeKeyValuePair(bArr, bArr2, dataOutputViewStreamWrapper);
                            dataOutputViewStreamWrapper.writeShort(65535);
                            outputStream.close();
                            outputStream = null;
                        }
                    } finally {
                    }
                } finally {
                }
            } finally {
                IOUtils.closeQuietly(outputStream);
            }
        }

        private void writeKeyValuePair(byte[] bArr, byte[] bArr2, DataOutputView dataOutputView) throws IOException {
            BytePrimitiveArraySerializer.INSTANCE.serialize(bArr, dataOutputView);
            BytePrimitiveArraySerializer.INSTANCE.serialize(bArr2, dataOutputView);
        }

        private void checkInterrupted() throws InterruptedException {
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException("RocksDB snapshot interrupted.");
            }
        }

        static {
            $assertionsDisabled = !RocksFullSnapshotStrategy.class.desiredAssertionStatus();
        }
    }

    public RocksFullSnapshotStrategy(@Nonnull RocksDB rocksDB, @Nonnull ResourceGuard resourceGuard, @Nonnull TypeSerializer<K> typeSerializer, @Nonnull LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> linkedHashMap, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int i, @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull CloseableRegistry closeableRegistry, @Nonnull StreamCompressionDecorator streamCompressionDecorator) {
        super(DESCRIPTION, rocksDB, resourceGuard, typeSerializer, linkedHashMap, keyGroupRange, i, localRecoveryConfig, closeableRegistry);
        this.keyGroupCompressionDecorator = streamCompressionDecorator;
    }

    @Override // org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> doSnapshot(long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier = createCheckpointStreamSupplier(j, checkpointStreamFactory, checkpointOptions);
        ArrayList arrayList = new ArrayList(this.kvStateInformation.size());
        ArrayList arrayList2 = new ArrayList(this.kvStateInformation.size());
        for (RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo : this.kvStateInformation.values()) {
            arrayList.add(rocksDbKvStateInfo.metaInfo.snapshot());
            arrayList2.add(rocksDbKvStateInfo);
        }
        return new SnapshotAsynchronousPartCallable(createCheckpointStreamSupplier, this.rocksDBResourceGuard.acquireResource(), this.db.getSnapshot(), arrayList, arrayList2, checkpointStreamFactory.toString()).toAsyncSnapshotFutureTask(this.cancelStreamRegistry);
    }

    @Override // org.apache.flink.api.common.state.CheckpointListener
    public void notifyCheckpointComplete(long j) {
    }

    @Override // org.apache.flink.api.common.state.CheckpointListener
    public void notifyCheckpointAborted(long j) {
    }

    private SupplierWithException<CheckpointStreamWithResultProvider, Exception> createCheckpointStreamSupplier(long j, CheckpointStreamFactory checkpointStreamFactory, CheckpointOptions checkpointOptions) {
        return (!this.localRecoveryConfig.isLocalRecoveryEnabled() || checkpointOptions.getCheckpointType().isSavepoint()) ? () -> {
            return CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory);
        } : () -> {
            return CheckpointStreamWithResultProvider.createDuplicatingStream(j, CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory, this.localRecoveryConfig.getLocalStateDirectoryProvider());
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static List<MetaData> fillMetaData(List<RocksDBKeyedStateBackend.RocksDbKvStateInfo> list) {
        ArrayList arrayList = new ArrayList(list.size());
        for (RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo : list) {
            StateSnapshotTransformer<byte[]> stateSnapshotTransformer = null;
            if (rocksDbKvStateInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
                stateSnapshotTransformer = ((RegisteredKeyValueStateBackendMetaInfo) rocksDbKvStateInfo.metaInfo).getStateSnapshotTransformFactory().createForSerializedState().orElse(null);
            }
            arrayList.add(new MetaData(rocksDbKvStateInfo, stateSnapshotTransformer));
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static RocksIteratorWrapper getRocksIterator(RocksDB rocksDB, ColumnFamilyHandle columnFamilyHandle, StateSnapshotTransformer<byte[]> stateSnapshotTransformer, ReadOptions readOptions) {
        RocksIterator newIterator = rocksDB.newIterator(columnFamilyHandle, readOptions);
        return stateSnapshotTransformer == null ? new RocksIteratorWrapper(newIterator) : new RocksTransformingIteratorWrapper(newIterator, stateSnapshotTransformer);
    }
}
