package org.apache.flink.runtime.checkpoint;

import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.class */
/**
 * Static helper that removes ("subsumes") old {@link CompletedCheckpoint}s from a deque once the
 * deque holds more entries than a given retention count.
 *
 * <p>The deque is iterated from its head (first element) towards its tail; the tail element is
 * treated as the latest checkpoint and is never subsumed.
 */
public class CheckpointSubsumeHelper {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSubsumeHelper.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper$SubsumeAction.class */
    /** Callback invoked once for every checkpoint that {@link #subsume} removes from the deque. */
    @FunctionalInterface
    public interface SubsumeAction {
        /**
         * Handles a checkpoint that has just been removed from the deque.
         *
         * @param completedCheckpoint the removed checkpoint
         * @throws Exception on failure; {@link CheckpointSubsumeHelper#subsume} logs it and continues
         */
        void subsume(CompletedCheckpoint completedCheckpoint) throws Exception;
    }

    CheckpointSubsumeHelper() {
    }

    /**
     * Removes subsumable checkpoints from the head of {@code deque} until at most {@code i}
     * entries remain or every entry has been visited, passing each removed checkpoint to
     * {@code subsumeAction}.
     *
     * <p>Before the action runs, every materialized state handle that also appears in one of the
     * {@code i} newest checkpoints is removed from the subsumed checkpoint's handle lists
     * (presumably so that the action does not discard state the retained checkpoints still
     * reference; confirm against the action implementations).
     *
     * <p>Failures while pruning handles or running the action are logged at WARN level and do not
     * abort the loop. The checkpoint has already been removed from the deque at that point.
     *
     * @param deque completed checkpoints, oldest first; modified in place
     * @param i number of checkpoints to retain; nothing happens if the deque holds no more than
     *     this many entries
     * @param subsumeAction callback invoked for each removed checkpoint
     * @throws Exception declared for interface compatibility; exceptions raised by the action are
     *     caught and logged here
     */
    public static void subsume(Deque<CompletedCheckpoint> deque, int i, SubsumeAction subsumeAction) throws Exception {
        if (deque.isEmpty() || deque.size() <= i) {
            return;
        }
        CompletedCheckpoint latest = deque.peekLast();
        Optional<CompletedCheckpoint> latestNotSavepoint = getLatestNotSavepoint(deque);

        // Gather the materialized handles of the i newest checkpoints. The guard above ensures
        // the deque has more than i entries, so next() cannot run off the end.
        // NOTE(review): a checkpoint that survives only because canSubsume() rejects it (and is
        // not among the i newest) does not contribute to this set - confirm this is intended.
        Collection<KeyedStateHandle> retainedHandles = new HashSet<>();
        Iterator<CompletedCheckpoint> newestFirst = deque.descendingIterator();
        for (int kept = 0; kept < i; kept++) {
            for (List<KeyedStateHandle> handles : getAllMaterializedHandles(newestFirst.next())) {
                retainedHandles.addAll(handles);
            }
        }

        Iterator<CompletedCheckpoint> oldestFirst = deque.iterator();
        while (deque.size() > i && oldestFirst.hasNext()) {
            CompletedCheckpoint candidate = oldestFirst.next();
            if (!canSubsume(candidate, latest, latestNotSavepoint)) {
                // Keep scanning: later entries may still be subsumable.
                continue;
            }
            oldestFirst.remove();
            try {
                // Pruning happens inside the try block: if it fails, the action is skipped
                // rather than being run against handle lists that were not pruned.
                for (List<KeyedStateHandle> handles : getAllMaterializedHandles(candidate)) {
                    handles.removeAll(retainedHandles);
                }
                subsumeAction.subsume(candidate);
            } catch (Exception e) {
                LOG.warn("Fail to subsume the old checkpoint.", e);
            }
        }
    }

    /**
     * Collects the materialized state handle lists of every {@link ChangelogStateBackendHandle}
     * found in the managed keyed state of all subtasks of all operators of the checkpoint.
     *
     * <p>The lists are the objects returned by
     * {@link ChangelogStateBackendHandle#getMaterializedStateHandles()}, not copies made here;
     * {@link #subsume} mutates them.
     *
     * @param completedCheckpoint checkpoint to inspect
     * @return one list per changelog handle, in stream encounter order
     */
    private static List<List<KeyedStateHandle>> getAllMaterializedHandles(CompletedCheckpoint completedCheckpoint) {
        return completedCheckpoint.getOperatorStates().values().stream()
                .flatMap(operatorState -> operatorState.getSubtaskStates().values().stream())
                .flatMap(subtaskState -> subtaskState.getManagedKeyedState().stream())
                .filter(ChangelogStateBackendHandle.class::isInstance)
                .map(ChangelogStateBackendHandle.class::cast)
                .map(ChangelogStateBackendHandle::getMaterializedStateHandles)
                .collect(Collectors.toList());
    }

    /**
     * Finds the entry closest to the tail of the deque whose properties do not mark it as a
     * savepoint.
     *
     * @param deque completed checkpoints to search, scanned from tail to head
     * @return the newest non-savepoint entry, or empty if every entry is a savepoint
     */
    private static Optional<CompletedCheckpoint> getLatestNotSavepoint(Deque<CompletedCheckpoint> deque) {
        Iterator<CompletedCheckpoint> newestFirst = deque.descendingIterator();
        while (newestFirst.hasNext()) {
            CompletedCheckpoint checkpoint = newestFirst.next();
            if (!checkpoint.getProperties().isSavepoint()) {
                return Optional.of(checkpoint);
            }
        }
        return Optional.empty();
    }

    /**
     * Decides whether {@code candidate} may be removed.
     *
     * <ul>
     *   <li>The latest checkpoint itself is never subsumed.
     *   <li>A savepoint is always subsumable.
     *   <li>Anything is subsumable when the latest checkpoint is synchronous.
     *   <li>Otherwise the candidate is subsumable only if a latest non-savepoint exists and is a
     *       different object, i.e. the newest non-savepoint checkpoint is kept.
     * </ul>
     *
     * <p>All comparisons are by reference identity.
     *
     * @param candidate checkpoint under consideration
     * @param latest tail element of the deque
     * @param latestNotSavepoint newest non-savepoint entry, if any
     * @return {@code true} if {@code candidate} may be removed
     */
    private static boolean canSubsume(CompletedCheckpoint candidate, CompletedCheckpoint latest, Optional<CompletedCheckpoint> latestNotSavepoint) {
        if (candidate == latest) {
            return false;
        }
        if (candidate.getProperties().isSavepoint() || latest.getProperties().isSynchronous()) {
            return true;
        }
        return latestNotSavepoint.filter(checkpoint -> checkpoint != candidate).isPresent();
    }
}
