package org.apache.flink.contrib.streaming.state;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.PriorityQueueFlag;
import org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreResult;
import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
import org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy;
import org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy;
import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.class */
public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBuilder<K> {
    static final String DB_INSTANCE_DIR_STRING = "db";
    private final String operatorIdentifier;
    private final RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType;
    private final LocalRecoveryConfig localRecoveryConfig;
    private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
    private final RocksDBResourceContainer optionsContainer;
    private final File instanceBasePath;
    private final File instanceRocksDBPath;
    private final MetricGroup metricGroup;
    private boolean enableIncrementalCheckpointing;
    private RocksDBNativeMetricOptions nativeMetricOptions;
    private int numberOfTransferingThreads;
    private long writeBatchSize;
    private RocksDB injectedTestDB;
    private ColumnFamilyHandle injectedDefaultColumnFamilyHandle;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder$SnapshotStrategy.class */
    public static final class SnapshotStrategy<K> {
        final RocksDBSnapshotStrategyBase<K> checkpointSnapshotStrategy;
        final RocksDBSnapshotStrategyBase<K> savepointSnapshotStrategy;

        SnapshotStrategy(RocksDBSnapshotStrategyBase<K> rocksDBSnapshotStrategyBase, RocksDBSnapshotStrategyBase<K> rocksDBSnapshotStrategyBase2) {
            this.checkpointSnapshotStrategy = rocksDBSnapshotStrategyBase;
            this.savepointSnapshotStrategy = rocksDBSnapshotStrategyBase2;
        }
    }

    public RocksDBKeyedStateBackendBuilder(String str, ClassLoader classLoader, File file, RocksDBResourceContainer rocksDBResourceContainer, Function<String, ColumnFamilyOptions> function, TaskKvStateRegistry taskKvStateRegistry, TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, LocalRecoveryConfig localRecoveryConfig, RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> collection, StreamCompressionDecorator streamCompressionDecorator, CloseableRegistry closeableRegistry) {
        super(taskKvStateRegistry, typeSerializer, classLoader, i, keyGroupRange, executionConfig, ttlTimeProvider, collection, streamCompressionDecorator, closeableRegistry);
        this.writeBatchSize = RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes();
        this.operatorIdentifier = str;
        this.priorityQueueStateType = priorityQueueStateType;
        this.localRecoveryConfig = localRecoveryConfig;
        this.columnFamilyOptionsFactory = (Function) Preconditions.checkNotNull(function);
        this.optionsContainer = rocksDBResourceContainer;
        this.instanceBasePath = file;
        this.instanceRocksDBPath = new File(file, DB_INSTANCE_DIR_STRING);
        this.metricGroup = metricGroup;
        this.enableIncrementalCheckpointing = false;
        this.nativeMetricOptions = new RocksDBNativeMetricOptions();
        this.numberOfTransferingThreads = RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue().intValue();
    }

    @VisibleForTesting
    RocksDBKeyedStateBackendBuilder(String str, ClassLoader classLoader, File file, RocksDBResourceContainer rocksDBResourceContainer, Function<String, ColumnFamilyOptions> function, TaskKvStateRegistry taskKvStateRegistry, TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, LocalRecoveryConfig localRecoveryConfig, RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> collection, StreamCompressionDecorator streamCompressionDecorator, RocksDB rocksDB, ColumnFamilyHandle columnFamilyHandle, CloseableRegistry closeableRegistry) {
        this(str, classLoader, file, rocksDBResourceContainer, function, taskKvStateRegistry, typeSerializer, i, keyGroupRange, executionConfig, localRecoveryConfig, priorityQueueStateType, ttlTimeProvider, metricGroup, collection, streamCompressionDecorator, closeableRegistry);
        this.injectedTestDB = rocksDB;
        this.injectedDefaultColumnFamilyHandle = columnFamilyHandle;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RocksDBKeyedStateBackendBuilder<K> setEnableIncrementalCheckpointing(boolean z) {
        this.enableIncrementalCheckpointing = z;
        return this;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RocksDBKeyedStateBackendBuilder<K> setNativeMetricOptions(RocksDBNativeMetricOptions rocksDBNativeMetricOptions) {
        this.nativeMetricOptions = rocksDBNativeMetricOptions;
        return this;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RocksDBKeyedStateBackendBuilder<K> setNumberOfTransferingThreads(int i) {
        this.numberOfTransferingThreads = i;
        return this;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RocksDBKeyedStateBackendBuilder<K> setWriteBatchSize(long j) {
        Preconditions.checkArgument(j >= 0, "Write batch size should be non negative.");
        this.writeBatchSize = j;
        return this;
    }

    private static void checkAndCreateDirectory(File file) throws IOException {
        if (file.exists()) {
            if (!file.isDirectory()) {
                throw new IOException("Not a directory: " + file);
            }
        } else if (!file.mkdirs()) {
            throw new IOException(String.format("Could not create RocksDB data directory at %s.", file));
        }
    }

    @Override // org.apache.flink.runtime.state.StateBackendBuilder
    /* renamed from: build */
    public AbstractKeyedStateBackend build2() throws BackendBuildingException {
        RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper = null;
        ColumnFamilyHandle columnFamilyHandle = null;
        RocksDBNativeMetricMonitor rocksDBNativeMetricMonitor = null;
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> linkedHashMap = new LinkedHashMap<>();
        RocksDB rocksDB = null;
        AbstractRocksDBRestoreOperation<K> abstractRocksDBRestoreOperation = null;
        RocksDbTtlCompactFiltersManager rocksDbTtlCompactFiltersManager = new RocksDbTtlCompactFiltersManager(this.ttlTimeProvider);
        ResourceGuard resourceGuard = new ResourceGuard();
        int computeRequiredBytesInKeyGroupPrefix = RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(this.numberOfKeyGroups);
        try {
            UUID randomUUID = UUID.randomUUID();
            SortedMap<Long, Set<StateHandleID>> treeMap = new TreeMap();
            long j = -1;
            if (this.injectedTestDB != null) {
                rocksDB = this.injectedTestDB;
                columnFamilyHandle = this.injectedDefaultColumnFamilyHandle;
                rocksDBNativeMetricMonitor = this.nativeMetricOptions.isEnabled() ? new RocksDBNativeMetricMonitor(this.nativeMetricOptions, this.metricGroup, rocksDB) : null;
            } else {
                prepareDirectories();
                abstractRocksDBRestoreOperation = getRocksDBRestoreOperation(computeRequiredBytesInKeyGroupPrefix, this.cancelStreamRegistry, linkedHashMap, rocksDbTtlCompactFiltersManager);
                RocksDBRestoreResult restore = abstractRocksDBRestoreOperation.restore();
                rocksDB = restore.getDb();
                columnFamilyHandle = restore.getDefaultColumnFamilyHandle();
                rocksDBNativeMetricMonitor = restore.getNativeMetricMonitor();
                if (abstractRocksDBRestoreOperation instanceof RocksDBIncrementalRestoreOperation) {
                    randomUUID = restore.getBackendUID();
                    treeMap = restore.getRestoredSstFiles();
                    j = restore.getLastCompletedCheckpointId();
                }
            }
            rocksDBWriteBatchWrapper = new RocksDBWriteBatchWrapper(rocksDB, this.optionsContainer.getWriteOptions(), this.writeBatchSize);
            RocksDBSerializedCompositeKeyBuilder rocksDBSerializedCompositeKeyBuilder = new RocksDBSerializedCompositeKeyBuilder(this.keySerializerProvider.currentSchemaSerializer(), computeRequiredBytesInKeyGroupPrefix, 32);
            SnapshotStrategy<K> initializeSavepointAndCheckpointStrategies = initializeSavepointAndCheckpointStrategies(closeableRegistry, resourceGuard, linkedHashMap, computeRequiredBytesInKeyGroupPrefix, rocksDB, randomUUID, treeMap, j);
            PriorityQueueSetFactory initPriorityQueueFactory = initPriorityQueueFactory(computeRequiredBytesInKeyGroupPrefix, linkedHashMap, rocksDB, rocksDBWriteBatchWrapper, rocksDBNativeMetricMonitor);
            InternalKeyContextImpl internalKeyContextImpl = new InternalKeyContextImpl(this.keyGroupRange, this.numberOfKeyGroups);
            this.logger.info("Finished building RocksDB keyed state-backend at {}.", this.instanceBasePath);
            return new RocksDBKeyedStateBackend(this.userCodeClassLoader, this.instanceBasePath, this.optionsContainer, this.columnFamilyOptionsFactory, this.kvStateRegistry, this.keySerializerProvider.currentSchemaSerializer(), this.executionConfig, this.ttlTimeProvider, rocksDB, linkedHashMap, computeRequiredBytesInKeyGroupPrefix, closeableRegistry, this.keyGroupCompressionDecorator, resourceGuard, initializeSavepointAndCheckpointStrategies.checkpointSnapshotStrategy, initializeSavepointAndCheckpointStrategies.savepointSnapshotStrategy, rocksDBWriteBatchWrapper, columnFamilyHandle, rocksDBNativeMetricMonitor, rocksDBSerializedCompositeKeyBuilder, initPriorityQueueFactory, rocksDbTtlCompactFiltersManager, internalKeyContextImpl, this.writeBatchSize);
        } catch (Throwable th) {
            ArrayList arrayList = new ArrayList(linkedHashMap.values().size());
            IOUtils.closeQuietly(closeableRegistry);
            IOUtils.closeQuietly(rocksDBWriteBatchWrapper);
            RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(arrayList, columnFamilyHandle);
            IOUtils.closeQuietly(columnFamilyHandle);
            IOUtils.closeQuietly(rocksDBNativeMetricMonitor);
            for (RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo : linkedHashMap.values()) {
                RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(arrayList, rocksDbKvStateInfo.columnFamilyHandle);
                IOUtils.closeQuietly(rocksDbKvStateInfo.columnFamilyHandle);
            }
            IOUtils.closeQuietly(rocksDB);
            IOUtils.closeQuietly(abstractRocksDBRestoreOperation);
            IOUtils.closeAllQuietly(arrayList);
            IOUtils.closeQuietly(this.optionsContainer);
            rocksDbTtlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
            linkedHashMap.clear();
            try {
                FileUtils.deleteDirectory(this.instanceBasePath);
            } catch (Exception e) {
                this.logger.warn("Failed to delete base path for RocksDB: " + this.instanceBasePath, (Throwable) e);
            }
            if (th instanceof BackendBuildingException) {
                throw ((BackendBuildingException) th);
            }
            this.logger.error("Caught unexpected exception.", th);
            throw new BackendBuildingException("Caught unexpected exception.", th);
        }
    }

    private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation(int i, CloseableRegistry closeableRegistry, LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> linkedHashMap, RocksDbTtlCompactFiltersManager rocksDbTtlCompactFiltersManager) {
        DBOptions dbOptions = this.optionsContainer.getDbOptions();
        if (this.restoreStateHandles.isEmpty()) {
            return new RocksDBNoneRestoreOperation(this.keyGroupRange, i, this.numberOfTransferingThreads, closeableRegistry, this.userCodeClassLoader, linkedHashMap, this.keySerializerProvider, this.instanceBasePath, this.instanceRocksDBPath, dbOptions, this.columnFamilyOptionsFactory, this.nativeMetricOptions, this.metricGroup, this.restoreStateHandles, rocksDbTtlCompactFiltersManager, this.optionsContainer.getWriteBufferManagerCapacity());
        }
        KeyedStateHandle next = this.restoreStateHandles.iterator().next();
        PriorityQueueFlag priorityQueueFlag = this.priorityQueueStateType == RocksDBStateBackend.PriorityQueueStateType.HEAP ? PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE : PriorityQueueFlag.RESTORE_PRIORITY_QUEUE;
        return next instanceof IncrementalKeyedStateHandle ? new RocksDBIncrementalRestoreOperation(this.operatorIdentifier, this.keyGroupRange, i, this.numberOfTransferingThreads, closeableRegistry, this.userCodeClassLoader, linkedHashMap, this.keySerializerProvider, this.instanceBasePath, this.instanceRocksDBPath, dbOptions, this.columnFamilyOptionsFactory, this.nativeMetricOptions, this.metricGroup, this.restoreStateHandles, rocksDbTtlCompactFiltersManager, this.writeBatchSize, this.optionsContainer.getWriteBufferManagerCapacity(), priorityQueueFlag) : new RocksDBFullRestoreOperation(this.keyGroupRange, i, this.numberOfTransferingThreads, closeableRegistry, this.userCodeClassLoader, linkedHashMap, this.keySerializerProvider, this.instanceBasePath, this.instanceRocksDBPath, dbOptions, this.columnFamilyOptionsFactory, this.nativeMetricOptions, this.metricGroup, this.restoreStateHandles, rocksDbTtlCompactFiltersManager, this.writeBatchSize, this.optionsContainer.getWriteBufferManagerCapacity(), priorityQueueFlag);
    }

    private SnapshotStrategy<K> initializeSavepointAndCheckpointStrategies(CloseableRegistry closeableRegistry, ResourceGuard resourceGuard, LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> linkedHashMap, int i, RocksDB rocksDB, UUID uuid, SortedMap<Long, Set<StateHandleID>> sortedMap, long j) {
        RocksDBSnapshotStrategyBase rocksFullSnapshotStrategy = new RocksFullSnapshotStrategy(rocksDB, resourceGuard, this.keySerializerProvider.currentSchemaSerializer(), linkedHashMap, this.keyGroupRange, i, this.localRecoveryConfig, closeableRegistry, this.keyGroupCompressionDecorator);
        return new SnapshotStrategy<>(this.enableIncrementalCheckpointing ? new RocksIncrementalSnapshotStrategy(rocksDB, resourceGuard, this.keySerializerProvider.currentSchemaSerializer(), linkedHashMap, this.keyGroupRange, i, this.localRecoveryConfig, closeableRegistry, this.instanceBasePath, uuid, sortedMap, j, this.numberOfTransferingThreads) : rocksFullSnapshotStrategy, rocksFullSnapshotStrategy);
    }

    private PriorityQueueSetFactory initPriorityQueueFactory(int i, Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> map, RocksDB rocksDB, RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper, RocksDBNativeMetricMonitor rocksDBNativeMetricMonitor) {
        PriorityQueueSetFactory rocksDBPriorityQueueSetFactory;
        switch (this.priorityQueueStateType) {
            case HEAP:
                rocksDBPriorityQueueSetFactory = new HeapPriorityQueueSetFactory(this.keyGroupRange, this.numberOfKeyGroups, 128);
                break;
            case ROCKSDB:
                rocksDBPriorityQueueSetFactory = new RocksDBPriorityQueueSetFactory(this.keyGroupRange, i, this.numberOfKeyGroups, map, rocksDB, this.optionsContainer.getReadOptions(), rocksDBWriteBatchWrapper, rocksDBNativeMetricMonitor, this.columnFamilyOptionsFactory, this.optionsContainer.getWriteBufferManagerCapacity());
                break;
            default:
                throw new IllegalArgumentException("Unknown priority queue state type: " + this.priorityQueueStateType);
        }
        return rocksDBPriorityQueueSetFactory;
    }

    private void prepareDirectories() throws IOException {
        checkAndCreateDirectory(this.instanceBasePath);
        if (this.instanceRocksDBPath.exists()) {
            FileUtils.deleteDirectory(this.instanceBasePath);
        }
    }
}
