package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.CloseableRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/AsyncSnapshotCallable.class */
public abstract class AsyncSnapshotCallable<T> implements Callable<T> {
    private static final String CANCELLATION_EXCEPTION_MSG = "Async snapshot was cancelled.";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AsyncSnapshotCallable.class);

    @Nonnull
    private final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();

    @Nonnull
    private final AtomicBoolean resourceCleanupOwnershipTaken = new AtomicBoolean(false);

    /* loaded from: input_file:org/apache/flink/runtime/state/AsyncSnapshotCallable$AsyncSnapshotTask.class */
    public class AsyncSnapshotTask extends FutureTask<T> {

        @Nonnull
        private final CloseableRegistry taskRegistry;

        @Nonnull
        private final Closeable cancelOnClose;

        private AsyncSnapshotTask(@Nonnull CloseableRegistry closeableRegistry) throws IOException {
            super(AsyncSnapshotCallable.this);
            this.cancelOnClose = () -> {
                cancel(true);
            };
            this.taskRegistry = closeableRegistry;
            closeableRegistry.registerCloseable(this.cancelOnClose);
        }

        @Override // java.util.concurrent.FutureTask, java.util.concurrent.Future
        public boolean cancel(boolean z) {
            boolean cancel = super.cancel(z);
            if (z) {
                AsyncSnapshotCallable.this.cancel();
            }
            return cancel;
        }

        @Override // java.util.concurrent.FutureTask
        protected void done() {
            super.done();
            this.taskRegistry.unregisterCloseable(this.cancelOnClose);
        }
    }

    @Override // java.util.concurrent.Callable
    public T call() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            if (this.resourceCleanupOwnershipTaken.compareAndSet(false, true)) {
                try {
                    T callInternal = callInternal();
                    logAsyncSnapshotComplete(currentTimeMillis);
                    closeSnapshotIO();
                    cleanup();
                    return callInternal;
                } catch (Exception e) {
                    if (!this.snapshotCloseableRegistry.isClosed()) {
                        throw e;
                    }
                    closeSnapshotIO();
                    cleanup();
                }
            }
            throw new CancellationException(CANCELLATION_EXCEPTION_MSG);
        } catch (Throwable th) {
            closeSnapshotIO();
            cleanup();
            throw th;
        }
    }

    @VisibleForTesting
    protected void cancel() {
        closeSnapshotIO();
        if (this.resourceCleanupOwnershipTaken.compareAndSet(false, true)) {
            cleanup();
        }
    }

    public AsyncSnapshotCallable<T>.AsyncSnapshotTask toAsyncSnapshotFutureTask(@Nonnull CloseableRegistry closeableRegistry) throws IOException {
        return new AsyncSnapshotTask(closeableRegistry);
    }

    protected abstract T callInternal() throws Exception;

    protected abstract void cleanupProvidedResources();

    protected void logAsyncSnapshotComplete(long j) {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void registerCloseableForCancellation(@Nullable Closeable closeable) throws IOException {
        this.snapshotCloseableRegistry.registerCloseable(closeable);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean unregisterCloseableFromCancellation(@Nullable Closeable closeable) {
        return this.snapshotCloseableRegistry.unregisterCloseable(closeable);
    }

    private void cleanup() {
        cleanupProvidedResources();
    }

    private void closeSnapshotIO() {
        try {
            this.snapshotCloseableRegistry.close();
        } catch (IOException e) {
            LOG.warn("Could not properly close incremental snapshot streams.", (Throwable) e);
        }
    }
}
