package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.class */
public class CheckpointBarrierTracker extends CheckpointBarrierHandler {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointBarrierTracker.class);
    private static final int MAX_CHECKPOINTS_TO_TRACK = 50;
    private final CheckpointableInputs inputs;
    private int numOpenChannels;
    private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
    private long latestPendingCheckpointID;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker$CheckpointBarrierCount.class */
    /**
     * Bookkeeping entry for a single checkpoint ID: remembers which input channels have already
     * been counted for that checkpoint and whether the checkpoint has been marked as aborted.
     *
     * <p>Channels are stored in a set, so marking the same channel more than once does not
     * increase the count returned by {@link #markChannelAligned(InputChannelInfo)}.
     */
    public static final class CheckpointBarrierCount {
        /** ID of the checkpoint this entry tracks. */
        private final long checkpointId;

        /** Channel count that callers compare against the number of marked channels. */
        private final int targetChannelCount;

        /** Channels marked so far. Typed (not raw) so that additions are checked by the compiler. */
        private final Set<InputChannelInfo> alignedChannels = new HashSet<>();

        /**
         * Barrier this entry was created from, or {@code null} when the entry was created from a
         * bare checkpoint ID.
         */
        @Nullable
        private final CheckpointBarrier pendingCheckpoint;

        /** Set once by {@link #markAborted()}; never reset. */
        private boolean aborted;

        /**
         * Creates an entry from a received barrier and counts the given channel immediately.
         *
         * @param checkpointBarrier barrier that supplies the checkpoint ID; must not be null
         * @param inputChannelInfo first channel to count for this checkpoint
         * @param i target channel count for this entry
         */
        CheckpointBarrierCount(CheckpointBarrier checkpointBarrier, InputChannelInfo inputChannelInfo, int i) {
            Preconditions.checkNotNull(checkpointBarrier);
            this.checkpointId = checkpointBarrier.getId();
            this.pendingCheckpoint = checkpointBarrier;
            this.targetChannelCount = i;
            this.alignedChannels.add(inputChannelInfo);
        }

        /**
         * Creates an entry that carries no barrier (its pending checkpoint is {@code null}) and
         * counts the given channel immediately.
         *
         * @param j checkpoint ID to track
         * @param inputChannelInfo first channel to count for this checkpoint
         * @param i target channel count for this entry
         */
        CheckpointBarrierCount(long j, InputChannelInfo inputChannelInfo, int i) {
            this.checkpointId = j;
            this.pendingCheckpoint = null;
            this.targetChannelCount = i;
            this.alignedChannels.add(inputChannelInfo);
        }

        /** Returns the ID of the tracked checkpoint. */
        public long checkpointId() {
            return this.checkpointId;
        }

        /** Returns the barrier this entry was created from, or {@code null} if there is none. */
        @Nullable
        public CheckpointBarrier getPendingCheckpoint() {
            return this.pendingCheckpoint;
        }

        /** Returns the target channel count given at construction time. */
        public int getTargetChannelCount() {
            return this.targetChannelCount;
        }

        /**
         * Records the given channel for this checkpoint.
         *
         * @param inputChannelInfo channel to record
         * @return number of distinct channels recorded so far, including this one
         */
        public int markChannelAligned(InputChannelInfo inputChannelInfo) {
            this.alignedChannels.add(inputChannelInfo);
            return this.alignedChannels.size();
        }

        /** Returns whether {@link #markAborted()} has been called on this entry. */
        public boolean isAborted() {
            return this.aborted;
        }

        /**
         * Flags this entry as aborted.
         *
         * @return {@code true} if this call changed the state, {@code false} if the entry was
         *     already aborted
         */
        public boolean markAborted() {
            final boolean firstAbort = !this.aborted;
            this.aborted = true;
            return firstAbort;
        }

        @Override
        public String toString() {
            if (isAborted()) {
                return String.format("checkpointID=%d - ABORTED", this.checkpointId);
            }
            return String.format(
                    "checkpointID=%d, count=%d, targetChannelCount=%d",
                    this.checkpointId, this.alignedChannels.size(), this.targetChannelCount);
        }
    }

    /**
     * Creates a tracker over the given inputs.
     *
     * <p>The initial open-channel count is the sum of {@code getNumberOfInputChannels()} over all
     * inputs returned by {@code checkpointableInputs.get()}. The latest pending checkpoint ID
     * starts at {@code -1} and the queue of pending checkpoints starts empty.
     *
     * @param checkpointableInputs source of the inputs whose channels are counted
     * @param checkpointableTask forwarded unchanged to the superclass constructor
     * @param clock forwarded unchanged to the superclass constructor
     * @param z forwarded unchanged to the superclass constructor
     */
    public CheckpointBarrierTracker(CheckpointableInputs checkpointableInputs, CheckpointableTask checkpointableTask, Clock clock, boolean z) {
        super(checkpointableTask, clock, z);
        this.inputs = checkpointableInputs;
        this.pendingCheckpoints = new ArrayDeque<>();
        this.latestPendingCheckpointID = -1L;
        this.numOpenChannels =
                Arrays.stream(checkpointableInputs.get())
                        .mapToInt(input -> input.getNumberOfInputChannels())
                        .sum();
    }

    /**
     * Handles a checkpoint barrier received on one input channel.
     *
     * <ul>
     *   <li>With exactly one open channel and a barrier ID newer than the latest pending ID, the
     *       checkpoint is notified right away and nothing is queued.
     *   <li>If an entry for the barrier's ID is already queued, the channel is counted; once the
     *       count equals the entry's target, that entry and every older entry are dropped and,
     *       unless the entry is aborted, the checkpoint is triggered.
     *   <li>Otherwise a new entry is queued only when the ID is newer than the latest pending ID;
     *       older IDs are ignored.
     * </ul>
     *
     * @param checkpointBarrier the received barrier
     * @param inputChannelInfo channel the barrier arrived on
     * @param z not read by this method
     * @throws IOException declared by the contract; presumably raised by the notification helpers
     */
    @Override // org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler
    public void processBarrier(CheckpointBarrier checkpointBarrier, InputChannelInfo inputChannelInfo, boolean z) throws IOException {
        final long barrierId = checkpointBarrier.getId();

        // Single-channel shortcut: there is nothing to wait for.
        if (this.numOpenChannels == 1 && barrierId > this.latestPendingCheckpointID) {
            markAlignmentStartAndEnd(barrierId, checkpointBarrier.getTimestamp());
            notifyCheckpoint(checkpointBarrier);
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, inputChannelInfo);
        }

        // Locate the queued entry for this ID and remember its distance from the queue head.
        CheckpointBarrierCount matching = null;
        int position = 0;
        for (CheckpointBarrierCount candidate : this.pendingCheckpoints) {
            if (candidate.checkpointId() == barrierId) {
                matching = candidate;
                break;
            }
            position++;
        }

        if (matching != null) {
            final int alignedSoFar = matching.markChannelAligned(inputChannelInfo);
            if (alignedSoFar != matching.getTargetChannelCount()) {
                return;
            }
            // Drop the completed entry together with every entry queued ahead of it.
            for (int remaining = position + 1; remaining > 0; remaining--) {
                this.pendingCheckpoints.pollFirst();
            }
            if (!matching.isAborted()) {
                triggerCheckpointOnAligned(matching);
            }
            return;
        }

        // First barrier seen for this ID: only IDs newer than the latest pending one are tracked.
        if (barrierId <= this.latestPendingCheckpointID) {
            return;
        }
        markAlignmentStart(barrierId, checkpointBarrier.getTimestamp());
        this.latestPendingCheckpointID = barrierId;
        this.pendingCheckpoints.addLast(new CheckpointBarrierCount(checkpointBarrier, inputChannelInfo, this.numOpenChannels));

        // Keep the queue bounded by evicting the oldest entry.
        if (this.pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
            this.pendingCheckpoints.pollFirst();
        }
    }

    /**
     * Receives a barrier announcement. This implementation has an empty body, so announcements
     * have no effect on the tracker's state.
     *
     * @param checkpointBarrier announced barrier (unused)
     * @param i unused by this implementation
     * @param inputChannelInfo channel the announcement arrived on (unused)
     */
    @Override // org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler
    public void processBarrierAnnouncement(CheckpointBarrier checkpointBarrier, int i, InputChannelInfo inputChannelInfo) throws IOException {
        // No-op: nothing is recorded for announcements.
    }

    /**
     * Handles a cancellation marker received on one input channel.
     *
     * <ul>
     *   <li>With exactly one open channel and a marker ID newer than the latest pending ID, the
     *       alignment is reset and the abort is notified immediately.
     *   <li>Otherwise every queued entry with a smaller ID is removed from the head of the queue
     *       and its abort is notified if it was not already marked as aborted.
     *   <li>If the head entry then carries the cancelled ID, it is marked as aborted (notifying
     *       on the first abort only), the channel is counted, and the entry is removed once its
     *       target channel count is reached.
     *   <li>If no such entry exists and the ID is newer than the latest pending ID, the abort is
     *       notified and an already-aborted entry is placed at the head of the queue. Markers
     *       for older IDs are ignored.
     * </ul>
     *
     * @param cancelCheckpointMarker the received cancellation marker
     * @param inputChannelInfo channel the marker arrived on
     * @throws IOException declared by the contract; presumably raised by the notification helpers
     */
    @Override // org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler
    public void processCancellationBarrier(CancelCheckpointMarker cancelCheckpointMarker, InputChannelInfo inputChannelInfo) throws IOException {
        final long cancelledId = cancelCheckpointMarker.getCheckpointId();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received cancellation barrier for checkpoint {}", cancelledId);
        }

        // Single-channel shortcut: nothing is queued, so abort directly.
        if (this.numOpenChannels == 1 && cancelledId > this.latestPendingCheckpointID) {
            resetAlignment();
            notifyAbortOnCancellationBarrier(cancelledId);
            return;
        }

        // Abort and discard everything queued ahead of the cancelled checkpoint.
        CheckpointBarrierCount head = this.pendingCheckpoints.peekFirst();
        while (head != null && head.checkpointId() < cancelledId) {
            this.pendingCheckpoints.removeFirst();
            if (head.markAborted()) {
                notifyAbortOnCancellationBarrier(head.checkpointId());
            }
            head = this.pendingCheckpoints.peekFirst();
        }

        if (head != null && head.checkpointId() == cancelledId) {
            if (head.markAborted()) {
                notifyAbortOnCancellationBarrier(cancelledId);
            }
            // Channels are still counted so the entry can be removed once all have been seen.
            final int seenChannels = head.markChannelAligned(inputChannelInfo);
            if (seenChannels == head.getTargetChannelCount()) {
                this.pendingCheckpoints.removeFirst();
            }
            return;
        }

        if (cancelledId <= this.latestPendingCheckpointID) {
            // Marker for an ID that is not newer than the latest pending one: nothing to do.
            return;
        }

        notifyAbortOnCancellationBarrier(cancelledId);
        this.latestPendingCheckpointID = cancelledId;
        final CheckpointBarrierCount abortedEntry = new CheckpointBarrierCount(cancelledId, inputChannelInfo, this.numOpenChannels);
        abortedEntry.markAborted();
        this.pendingCheckpoints.addFirst(abortedEntry);
    }

    /**
     * Handles the end of one input channel by decrementing the open-channel count.
     *
     * <p>When {@code isCheckpointAfterTasksFinishedEnabled()} returns true, the finished channel
     * is counted towards the pending checkpoints via
     * {@link #checkAlignmentOnEndOfPartitionIfEnabled(InputChannelInfo)}. Otherwise every pending
     * checkpoint is removed, an abort with reason
     * {@code CHECKPOINT_DECLINED_INPUT_END_OF_STREAM} is notified for each one not already marked
     * as aborted, and the alignment is reset.
     *
     * @param inputChannelInfo channel that reached its end
     * @throws IOException declared by the contract; presumably raised by the notification helpers
     */
    @Override // org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler
    public void processEndOfPartition(InputChannelInfo inputChannelInfo) throws IOException {
        this.numOpenChannels--;

        if (isCheckpointAfterTasksFinishedEnabled()) {
            checkAlignmentOnEndOfPartitionIfEnabled(inputChannelInfo);
        } else {
            // Drain the queue from the head; pollFirst() yields null once it is empty.
            CheckpointBarrierCount dropped;
            while ((dropped = this.pendingCheckpoints.pollFirst()) != null) {
                if (dropped.markAborted()) {
                    notifyAbort(
                            dropped.checkpointId(),
                            new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
                }
            }
            resetAlignment();
        }
    }

    /**
     * Counts a finished channel towards the pending checkpoints, newest first.
     *
     * <p>Walking from the tail of the queue to its head, the channel is recorded on each entry
     * until one reaches its target channel count. That entry and every entry queued ahead of it
     * are then removed, and the checkpoint is triggered unless the entry is aborted. Entries
     * ahead of the completed one are removed without having the channel recorded on them. If no
     * entry reaches its target, the queue is left unchanged apart from the recorded channel.
     *
     * @param inputChannelInfo channel that reached its end
     * @throws IOException if triggering the checkpoint fails
     */
    private void checkAlignmentOnEndOfPartitionIfEnabled(InputChannelInfo inputChannelInfo) throws IOException {
        CheckpointBarrierCount completed = null;
        // Counts down while walking backwards, so it ends up as the completed entry's index from the head.
        int indexFromHead = this.pendingCheckpoints.size();
        final Iterator<CheckpointBarrierCount> newestFirst = this.pendingCheckpoints.descendingIterator();
        while (completed == null && newestFirst.hasNext()) {
            final CheckpointBarrierCount entry = newestFirst.next();
            indexFromHead--;
            if (entry.markChannelAligned(inputChannelInfo) == entry.getTargetChannelCount()) {
                completed = entry;
            }
        }

        if (completed == null) {
            return;
        }

        // Remove the completed entry and everything queued ahead of it.
        for (int remaining = indexFromHead + 1; remaining > 0; remaining--) {
            this.pendingCheckpoints.pollFirst();
        }
        if (!completed.isAborted()) {
            triggerCheckpointOnAligned(completed);
        }
    }

    /**
     * Notifies the checkpoint for an entry whose channels have all been counted.
     *
     * <p>The alignment end is marked only when the entry belongs to the latest pending
     * checkpoint ID. The entry must carry a barrier; entries created from a bare checkpoint ID
     * have none and fail the state check.
     *
     * @param checkpointBarrierCount the completed, non-aborted entry
     * @throws IOException declared because the notification helper is called; presumably raised
     *     by it
     */
    private void triggerCheckpointOnAligned(CheckpointBarrierCount checkpointBarrierCount) throws IOException {
        final long alignedId = checkpointBarrierCount.checkpointId();
        if (LOG.isDebugEnabled()) {
            LOG.debug("All the channels are aligned for checkpoint {}", alignedId);
        }
        if (alignedId == this.latestPendingCheckpointID) {
            markAlignmentEnd();
        }
        // Read the barrier once so the checked value is the one that is notified.
        final CheckpointBarrier barrier = checkpointBarrierCount.getPendingCheckpoint();
        Preconditions.checkState(barrier != null, "Pending checkpoint barrier must exist for non-aborted checkpoints.");
        notifyCheckpoint(barrier);
    }

    /**
     * Returns the checkpoint ID of the entry at the tail of the pending queue, or {@code -1} when
     * no checkpoint is pending.
     */
    @Override // org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler
    public long getLatestCheckpointId() {
        // peekLast() yields null for an empty deque.
        final CheckpointBarrierCount newest = this.pendingCheckpoints.peekLast();
        return newest == null ? -1L : newest.checkpointId();
    }

    /**
     * Recomputes the open-channel count as the sum of
     * {@code getNumberOfUnfinishedInputChannels()} over the current inputs.
     *
     * <p>Note that the constructor sums {@code getNumberOfInputChannels()} instead.
     */
    @Override // org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler
    public void updateInputs() {
        final int unfinishedChannels =
                Arrays.stream(this.inputs.get())
                        .mapToInt(input -> input.getNumberOfUnfinishedInputChannels())
                        .sum();
        this.numOpenChannels = unfinishedChannels;
    }

    /** Returns {@code true} while at least one checkpoint entry is queued. */
    @Override // org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler
    public boolean isCheckpointPending() {
        final boolean nothingQueued = this.pendingCheckpoints.isEmpty();
        return !nothingQueued;
    }

    /** Returns the current open-channel count; exposed for tests. */
    @VisibleForTesting
    int getNumOpenChannels() {
        return numOpenChannels;
    }

    /**
     * Returns the IDs of all queued checkpoint entries in queue order (head first); exposed for
     * tests.
     *
     * @return a newly collected list of checkpoint IDs
     */
    @VisibleForTesting
    List<Long> getPendingCheckpointIds() {
        // A typed pipeline avoids the raw (List) cast and its unchecked conversion.
        return this.pendingCheckpoints.stream()
                .map(CheckpointBarrierCount::checkpointId)
                .collect(Collectors.toList());
    }
}
