package org.apache.flink.runtime.state.heap;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/SpillableStateBackend.class */
public class SpillableStateBackend extends AbstractStateBackend implements ConfigurableStateBackend {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SpillableStateBackend.class);
    private Configuration configuration;
    private transient boolean isInitialized;

    @Nullable
    private File[] localDirectories;
    private transient File[] initializedBasePaths;

    public SpillableStateBackend() {
        this.configuration = new Configuration();
    }

    private SpillableStateBackend(SpillableStateBackend spillableStateBackend, ReadableConfig readableConfig, ClassLoader classLoader) {
        this.configuration = new Configuration();
        this.configuration = new Configuration();
        this.configuration.addAll(SpillableOptions.filter(readableConfig));
        this.configuration.addAll(spillableStateBackend.configuration);
        this.latencyTrackingConfigBuilder = spillableStateBackend.latencyTrackingConfigBuilder.configure(readableConfig);
    }

    @Override // org.apache.flink.runtime.state.ConfigurableStateBackend
    public SpillableStateBackend configure(ReadableConfig readableConfig, ClassLoader classLoader) {
        return new SpillableStateBackend(this, readableConfig, classLoader);
    }

    @Override // org.apache.flink.runtime.state.AbstractStateBackend, org.apache.flink.runtime.state.StateBackend
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment environment, JobID jobID, String str, TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, TaskKvStateRegistry taskKvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> collection, CloseableRegistry closeableRegistry) throws IOException {
        initHeapStatusMonitor();
        lazyInitializeForJob(environment, str);
        LocalRecoveryConfig createLocalRecoveryConfig = environment.getTaskStateManager().createLocalRecoveryConfig();
        HeapPriorityQueueSetFactory heapPriorityQueueSetFactory = new HeapPriorityQueueSetFactory(keyGroupRange, i, 128);
        return new SpillableKeyedStateBackendBuilder(taskKvStateRegistry, typeSerializer, environment.getUserCodeClassLoader().asClassLoader(), i, keyGroupRange, environment.getExecutionConfig(), ttlTimeProvider, this.latencyTrackingConfigBuilder.setMetricGroup(metricGroup).build(), collection, AbstractStateBackend.getCompressionDecorator(environment.getExecutionConfig()), createLocalRecoveryConfig, heapPriorityQueueSetFactory, closeableRegistry, this.configuration, this.initializedBasePaths).build2();
    }

    @Override // org.apache.flink.runtime.state.AbstractStateBackend, org.apache.flink.runtime.state.StateBackend
    public OperatorStateBackend createOperatorStateBackend(Environment environment, String str, @Nonnull Collection<OperatorStateHandle> collection, CloseableRegistry closeableRegistry) throws Exception {
        return new DefaultOperatorStateBackendBuilder(environment.getUserCodeClassLoader().asClassLoader(), environment.getExecutionConfig(), true, collection, closeableRegistry).build2();
    }

    @Override // org.apache.flink.runtime.state.StateBackend
    public boolean supportsNoClaimRestoreMode() {
        return true;
    }

    @Override // org.apache.flink.runtime.state.StateBackend
    public boolean supportsSavepointFormat(SavepointFormatType savepointFormatType) {
        return true;
    }

    public void setDbStoragePaths(String... strArr) {
        String str;
        if (strArr == null) {
            this.localDirectories = null;
            return;
        }
        if (strArr.length == 0) {
            throw new IllegalArgumentException("empty paths");
        }
        File[] fileArr = new File[strArr.length];
        for (int i = 0; i < strArr.length; i++) {
            String str2 = strArr[i];
            if (str2 == null) {
                throw new IllegalArgumentException("null path");
            }
            URI uri = null;
            try {
                uri = new Path(str2).toUri();
            } catch (Exception e) {
            }
            if (uri == null || uri.getScheme() == null) {
                str = str2;
            } else {
                if (!"file".equalsIgnoreCase(uri.getScheme())) {
                    throw new IllegalArgumentException("Path " + str2 + " has a non-local scheme");
                }
                str = uri.getPath();
            }
            fileArr[i] = new File(str);
            if (!fileArr[i].isAbsolute()) {
                throw new IllegalArgumentException("Relative paths are not supported");
            }
        }
        this.localDirectories = fileArr;
    }

    public String[] getDbStoragePaths() {
        if (this.localDirectories == null) {
            return null;
        }
        String[] strArr = new String[this.localDirectories.length];
        for (int i = 0; i < strArr.length; i++) {
            strArr[i] = this.localDirectories[i].toString();
        }
        return strArr;
    }

    private void lazyInitializeForJob(Environment environment, String str) throws IOException {
        if (this.isInitialized) {
            return;
        }
        if (this.localDirectories == null) {
            String replaceAll = str.replaceAll("[^a-zA-Z0-9\\-]", Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA);
            this.initializedBasePaths = (File[]) Stream.of((Object[]) environment.getIOManager().getSpillingDirectories()).map(file -> {
                return new File(file.getAbsolutePath(), replaceAll);
            }).toArray(i -> {
                return new File[i];
            });
        } else {
            ArrayList arrayList = new ArrayList(this.localDirectories.length);
            StringBuilder sb = new StringBuilder();
            for (File file2 : this.localDirectories) {
                File file3 = new File(file2, UUID.randomUUID().toString());
                if (file3.mkdirs()) {
                    arrayList.add(file2);
                    if (file3.delete()) {
                        LOG.error("Created local files directory '" + file2 + "' cannot be deleted. ");
                    }
                } else {
                    String str2 = "Local files directory '" + file2 + "' does not exist and cannot be created. ";
                    LOG.error(str2);
                    sb.append(str2);
                }
            }
            if (arrayList.isEmpty()) {
                throw new IOException("No local storage directories available. " + ((Object) sb));
            }
            this.initializedBasePaths = (File[]) arrayList.toArray(new File[0]);
        }
        this.isInitialized = true;
    }

    private void initHeapStatusMonitor() {
        long millis = ((Duration) this.configuration.get(SpillableOptions.HEAP_STATUS_CHECK_INTERVAL)).toMillis();
        Preconditions.checkArgument(millis > 0, "Heap status check interval should be larger than 0.");
        HeapStatusMonitor.initStatusMonitor(millis);
    }

    @Override // org.apache.flink.runtime.state.AbstractStateBackend, org.apache.flink.runtime.state.StateBackend
    public /* bridge */ /* synthetic */ CheckpointableKeyedStateBackend createKeyedStateBackend(Environment environment, JobID jobID, String str, TypeSerializer typeSerializer, int i, KeyGroupRange keyGroupRange, TaskKvStateRegistry taskKvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection collection, CloseableRegistry closeableRegistry) throws Exception {
        return createKeyedStateBackend(environment, jobID, str, typeSerializer, i, keyGroupRange, taskKvStateRegistry, ttlTimeProvider, metricGroup, (Collection<KeyedStateHandle>) collection, closeableRegistry);
    }
}
