package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/PendingCheckpoint.class */
public class PendingCheckpoint {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
    private final Object lock = new Object();
    private final JobID jobId;
    private final long checkpointId;
    private final long checkpointTimestamp;
    private final Map<OperatorID, OperatorState> operatorStates;
    private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
    private final List<MasterState> masterState;
    private final Set<ExecutionAttemptID> acknowledgedTasks;
    private final CheckpointProperties props;
    private final CheckpointStorageLocation targetLocation;
    private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;
    private final Executor executor;
    private int numAcknowledgedTasks;
    private boolean discarded;

    @Nullable
    private PendingCheckpointStats statsCallback;
    private volatile ScheduledFuture<?> cancellerHandle;

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/PendingCheckpoint$TaskAcknowledgeResult.class */
    public enum TaskAcknowledgeResult {
        SUCCESS,
        DUPLICATE,
        UNKNOWN,
        DISCARDED
    }

    public PendingCheckpoint(JobID jobID, long j, long j2, Map<ExecutionAttemptID, ExecutionVertex> map, CheckpointProperties checkpointProperties, CheckpointStorageLocation checkpointStorageLocation, Executor executor) {
        Preconditions.checkArgument(map.size() > 0, "Checkpoint needs at least one vertex that commits the checkpoint");
        this.jobId = (JobID) Preconditions.checkNotNull(jobID);
        this.checkpointId = j;
        this.checkpointTimestamp = j2;
        this.notYetAcknowledgedTasks = (Map) Preconditions.checkNotNull(map);
        this.props = (CheckpointProperties) Preconditions.checkNotNull(checkpointProperties);
        this.targetLocation = (CheckpointStorageLocation) Preconditions.checkNotNull(checkpointStorageLocation);
        this.executor = (Executor) Preconditions.checkNotNull(executor);
        this.operatorStates = new HashMap();
        this.masterState = new ArrayList();
        this.acknowledgedTasks = new HashSet(map.size());
        this.onCompletionPromise = new CompletableFuture<>();
    }

    public JobID getJobId() {
        return this.jobId;
    }

    public long getCheckpointId() {
        return this.checkpointId;
    }

    public long getCheckpointTimestamp() {
        return this.checkpointTimestamp;
    }

    public int getNumberOfNonAcknowledgedTasks() {
        return this.notYetAcknowledgedTasks.size();
    }

    public int getNumberOfAcknowledgedTasks() {
        return this.numAcknowledgedTasks;
    }

    public Map<OperatorID, OperatorState> getOperatorStates() {
        return this.operatorStates;
    }

    public boolean isFullyAcknowledged() {
        return this.notYetAcknowledgedTasks.isEmpty() && !this.discarded;
    }

    public boolean isAcknowledgedBy(ExecutionAttemptID executionAttemptID) {
        return !this.notYetAcknowledgedTasks.containsKey(executionAttemptID);
    }

    public boolean isDiscarded() {
        return this.discarded;
    }

    public boolean canBeSubsumed() {
        return !this.props.forceCheckpoint();
    }

    CheckpointProperties getProps() {
        return this.props;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setStatsCallback(@Nullable PendingCheckpointStats pendingCheckpointStats) {
        this.statsCallback = pendingCheckpointStats;
    }

    public boolean setCancellerHandle(ScheduledFuture<?> scheduledFuture) {
        synchronized (this.lock) {
            if (this.cancellerHandle != null) {
                throw new IllegalStateException("A canceller handle was already set");
            }
            if (this.discarded) {
                return false;
            }
            this.cancellerHandle = scheduledFuture;
            return true;
        }
    }

    public CompletableFuture<CompletedCheckpoint> getCompletionFuture() {
        return this.onCompletionPromise;
    }

    /* JADX WARN: Type inference failed for: r0v14, types: [java.io.OutputStream, org.apache.flink.runtime.state.CheckpointMetadataOutputStream] */
    public CompletedCheckpoint finalizeCheckpoint() throws IOException {
        CompletedCheckpoint completedCheckpoint;
        synchronized (this.lock) {
            Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
            try {
                SavepointV2 savepointV2 = new SavepointV2(this.checkpointId, this.operatorStates.values(), this.masterState);
                ?? createMetadataOutputStream = this.targetLocation.createMetadataOutputStream();
                Throwable th = null;
                try {
                    try {
                        Checkpoints.storeCheckpointMetadata(savepointV2, (OutputStream) createMetadataOutputStream);
                        CompletedCheckpointStorageLocation closeAndFinalizeCheckpoint = createMetadataOutputStream.closeAndFinalizeCheckpoint();
                        if (createMetadataOutputStream != 0) {
                            if (0 != 0) {
                                try {
                                    createMetadataOutputStream.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                createMetadataOutputStream.close();
                            }
                        }
                        completedCheckpoint = new CompletedCheckpoint(this.jobId, this.checkpointId, this.checkpointTimestamp, System.currentTimeMillis(), this.operatorStates, this.masterState, this.props, closeAndFinalizeCheckpoint);
                        this.onCompletionPromise.complete(completedCheckpoint);
                        PendingCheckpointStats pendingCheckpointStats = this.statsCallback;
                        if (pendingCheckpointStats != null) {
                            completedCheckpoint.setDiscardCallback(pendingCheckpointStats.reportCompletedCheckpoint(closeAndFinalizeCheckpoint.getExternalPointer()));
                        }
                        dispose(false);
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (createMetadataOutputStream != 0) {
                        if (th != null) {
                            try {
                                createMetadataOutputStream.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            createMetadataOutputStream.close();
                        }
                    }
                    throw th3;
                }
            } catch (Throwable th5) {
                this.onCompletionPromise.completeExceptionally(th5);
                ExceptionUtils.rethrowIOException(th5);
                return null;
            }
        }
        return completedCheckpoint;
    }

    public TaskAcknowledgeResult acknowledgeTask(ExecutionAttemptID executionAttemptID, TaskStateSnapshot taskStateSnapshot, CheckpointMetrics checkpointMetrics) {
        synchronized (this.lock) {
            if (this.discarded) {
                return TaskAcknowledgeResult.DISCARDED;
            }
            ExecutionVertex remove = this.notYetAcknowledgedTasks.remove(executionAttemptID);
            if (remove == null) {
                if (this.acknowledgedTasks.contains(executionAttemptID)) {
                    return TaskAcknowledgeResult.DUPLICATE;
                }
                return TaskAcknowledgeResult.UNKNOWN;
            }
            this.acknowledgedTasks.add(executionAttemptID);
            List<OperatorID> operatorIDs = remove.getJobVertex().getOperatorIDs();
            int parallelSubtaskIndex = remove.getParallelSubtaskIndex();
            long currentTimeMillis = System.currentTimeMillis();
            long j = 0;
            if (taskStateSnapshot != null) {
                for (OperatorID operatorID : operatorIDs) {
                    OperatorSubtaskState subtaskStateByOperatorID = taskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
                    if (subtaskStateByOperatorID == null) {
                        subtaskStateByOperatorID = new OperatorSubtaskState();
                    }
                    OperatorState operatorState = this.operatorStates.get(operatorID);
                    if (operatorState == null) {
                        operatorState = new OperatorState(operatorID, remove.getTotalNumberOfParallelSubtasks(), remove.getMaxParallelism());
                        this.operatorStates.put(operatorID, operatorState);
                    }
                    operatorState.putState(parallelSubtaskIndex, subtaskStateByOperatorID);
                    j += subtaskStateByOperatorID.getStateSize();
                }
            }
            this.numAcknowledgedTasks++;
            PendingCheckpointStats pendingCheckpointStats = this.statsCallback;
            if (pendingCheckpointStats != null) {
                pendingCheckpointStats.reportSubtaskStats(remove.getJobvertexId(), new SubtaskStateStats(parallelSubtaskIndex, currentTimeMillis, j, checkpointMetrics.getSyncDurationMillis(), checkpointMetrics.getAsyncDurationMillis(), checkpointMetrics.getBytesBufferedInAlignment(), checkpointMetrics.getAlignmentDurationNanos() / 1000000));
            }
            return TaskAcknowledgeResult.SUCCESS;
        }
    }

    public void addMasterState(MasterState masterState) {
        Preconditions.checkNotNull(masterState);
        synchronized (this.lock) {
            if (!this.discarded) {
                this.masterState.add(masterState);
            }
        }
    }

    public void abortExpired() {
        try {
            Exception exc = new Exception("Checkpoint expired before completing");
            this.onCompletionPromise.completeExceptionally(exc);
            reportFailedCheckpoint(exc);
        } finally {
            dispose(true);
        }
    }

    public void abortSubsumed() {
        try {
            Exception exc = new Exception("Checkpoints has been subsumed");
            this.onCompletionPromise.completeExceptionally(exc);
            reportFailedCheckpoint(exc);
            if (this.props.forceCheckpoint()) {
                throw new IllegalStateException("Bug: forced checkpoints must never be subsumed");
            }
        } finally {
            dispose(true);
        }
    }

    public void abortDeclined() {
        abortWithCause(new Exception("Checkpoint was declined (tasks not ready)"));
    }

    public void abortError(@Nonnull Throwable th) {
        abortWithCause(new Exception("Checkpoint failed: " + th.getMessage(), th));
    }

    private void abortWithCause(@Nonnull Exception exc) {
        try {
            this.onCompletionPromise.completeExceptionally(exc);
            reportFailedCheckpoint(exc);
        } finally {
            dispose(true);
        }
    }

    private void dispose(boolean z) {
        synchronized (this.lock) {
            try {
                this.numAcknowledgedTasks = -1;
                if (!this.discarded && z) {
                    this.executor.execute(new Runnable() { // from class: org.apache.flink.runtime.checkpoint.PendingCheckpoint.1
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                StateUtil.bestEffortDiscardAllStateObjects(PendingCheckpoint.this.operatorStates.values());
                                PendingCheckpoint.this.targetLocation.disposeOnFailure();
                            } catch (Throwable th) {
                                PendingCheckpoint.LOG.warn("Could not properly dispose the private states in the pending checkpoint {} of job {}.", new Object[]{Long.valueOf(PendingCheckpoint.this.checkpointId), PendingCheckpoint.this.jobId, th});
                            } finally {
                                PendingCheckpoint.this.operatorStates.clear();
                            }
                        }
                    });
                }
                this.discarded = true;
                this.notYetAcknowledgedTasks.clear();
                this.acknowledgedTasks.clear();
                cancelCanceller();
            } catch (Throwable th) {
                this.discarded = true;
                this.notYetAcknowledgedTasks.clear();
                this.acknowledgedTasks.clear();
                cancelCanceller();
                throw th;
            }
        }
    }

    private void cancelCanceller() {
        try {
            ScheduledFuture<?> scheduledFuture = this.cancellerHandle;
            if (scheduledFuture != null) {
                scheduledFuture.cancel(false);
            }
        } catch (Exception e) {
            LOG.warn("Error while cancelling checkpoint timeout task", e);
        }
    }

    private void reportFailedCheckpoint(Exception exc) {
        PendingCheckpointStats pendingCheckpointStats = this.statsCallback;
        if (pendingCheckpointStats != null) {
            pendingCheckpointStats.reportFailedCheckpoint(System.currentTimeMillis(), exc);
        }
    }

    public String toString() {
        return String.format("Pending Checkpoint %d @ %d - confirmed=%d, pending=%d", Long.valueOf(this.checkpointId), Long.valueOf(this.checkpointTimestamp), Integer.valueOf(getNumberOfAcknowledgedTasks()), Integer.valueOf(getNumberOfNonAcknowledgedTasks()));
    }
}
