package org.apache.flink.runtime.state.heap;

import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AsyncSnapshotCallable;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.SnapshotExecutionType;
import org.apache.flink.runtime.state.SnapshotResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.StateObject;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/SpillableSnapshotStrategyRunner.class */
/**
 * A {@link SnapshotStrategyRunner} that, in addition to building the snapshot future task,
 * registers that task with a {@link CheckpointManager} under its checkpoint id and unregisters it
 * again when the task cleans up its provided resources.
 *
 * @param <T> type of the state object produced by a snapshot
 * @param <SR> type of the resources prepared in the synchronous part of a snapshot
 */
public class SpillableSnapshotStrategyRunner<T extends StateObject, SR extends SnapshotResources> extends SnapshotStrategyRunner<T, SR> {

    /** Receives every snapshot task on creation and is told when the task has cleaned up. */
    @NotNull
    private final CheckpointManager<T> checkpointManager;

    /**
     * Creates a runner; the first four arguments are forwarded unchanged to the
     * {@link SnapshotStrategyRunner} constructor.
     *
     * @param str description of this runner (used by {@link #toString()})
     * @param snapshotStrategy strategy that prepares resources and produces the snapshot
     * @param closeableRegistry registry passed to the superclass
     * @param snapshotExecutionType whether the created task is run inside {@link #snapshot}
     * @param checkpointManager manager with which snapshot tasks are registered/unregistered
     */
    public SpillableSnapshotStrategyRunner(@NotNull String str, @NotNull SnapshotStrategy<T, SR> snapshotStrategy, @NotNull CloseableRegistry closeableRegistry, @NotNull SnapshotExecutionType snapshotExecutionType, @NotNull CheckpointManager<T> checkpointManager) {
        super(str, snapshotStrategy, closeableRegistry, snapshotExecutionType);
        this.checkpointManager = checkpointManager;
    }

    /**
     * Prepares the snapshot resources synchronously, wraps the asynchronous part into a future
     * task, registers that task with the {@link CheckpointManager} and, for
     * {@link SnapshotExecutionType#SYNCHRONOUS}, runs it before returning.
     *
     * <p>If anything fails after the resources were prepared but before the task exists, the
     * resources are released here, because no task would ever do it.
     *
     * @param j checkpoint id; also the key used for (un)registering with the manager
     * @param j2 forwarded to {@code SnapshotStrategy#asyncSnapshot}
     *     (NOTE(review): presumably the checkpoint timestamp; confirm against the superclass)
     * @param checkpointStreamFactory stream factory handed to the strategy and to the log calls
     * @param checkpointOptions options handed to the strategy
     * @return the snapshot task (already executed when the execution type is synchronous)
     * @throws Exception if preparing the resources or building the task fails
     */
    @Override // org.apache.flink.runtime.state.SnapshotStrategyRunner
    @NotNull
    public RunnableFuture<SnapshotResult<T>> snapshot(final long j, long j2, @Nonnull final CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        final SR syncPrepareResources = this.snapshotStrategy.syncPrepareResources(j);

        final AsyncSnapshotCallable<SnapshotResult<T>>.AsyncSnapshotTask asyncSnapshotFutureTask;
        try {
            logCompletedInternal("{} ({}, synchronous part) in thread {} took {} ms.", checkpointStreamFactory, currentTimeMillis);
            final SnapshotStrategy.SnapshotResultSupplier<T> asyncSnapshot = this.snapshotStrategy.asyncSnapshot(syncPrepareResources, j, j2, checkpointStreamFactory, checkpointOptions);
            asyncSnapshotFutureTask = new AsyncSnapshotCallable<SnapshotResult<T>>() {
                @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
                public SnapshotResult<T> callInternal() throws Exception {
                    return asyncSnapshot.get(this.snapshotCloseableRegistry);
                }

                @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
                protected void cleanupProvidedResources() {
                    // The release must happen even when the manager throws, otherwise the
                    // prepared snapshot resources would stay allocated forever.
                    try {
                        SpillableSnapshotStrategyRunner.this.checkpointManager.unRegisterCheckpoint(j);
                    } finally {
                        if (syncPrepareResources != null) {
                            syncPrepareResources.release();
                        }
                    }
                }

                @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
                protected void logAsyncSnapshotComplete(long asyncStartTime) {
                    SpillableSnapshotStrategyRunner.this.logCompletedInternal("{} ({}, asynchronous part) in thread {} took {} ms.", checkpointStreamFactory, asyncStartTime);
                }
            }.toAsyncSnapshotFutureTask(this.cancelStreamRegistry);
        } catch (Throwable failure) {
            // No task exists yet, so nothing else will ever release the prepared resources.
            releaseAfterFailure(syncPrepareResources, failure);
            throw failure;
        }

        // From here on the task owns the resources and frees them in cleanupProvidedResources().
        this.checkpointManager.registerCheckpoint(j, asyncSnapshotFutureTask);
        if (this.executionType == SnapshotExecutionType.SYNCHRONOUS) {
            asyncSnapshotFutureTask.run();
        }
        return asyncSnapshotFutureTask;
    }

    /**
     * Releases {@code resources} (if any) while a failure is propagating; a failure of the release
     * itself is attached to {@code primary} as a suppressed exception instead of replacing it.
     */
    private void releaseAfterFailure(SR resources, Throwable primary) {
        if (resources == null) {
            return;
        }
        try {
            resources.release();
        } catch (RuntimeException releaseFailure) {
            primary.addSuppressed(releaseFailure);
        }
    }

    /** Returns the runner's description wrapped in a {@code SpillableSnapshotStrategy {...}} tag. */
    @Override // org.apache.flink.runtime.state.SnapshotStrategyRunner
    public String toString() {
        return "SpillableSnapshotStrategy {" + this.description + "}";
    }
}
