package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.PullingAsyncDataInput;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/CheckpointedInputGate.class */
public class CheckpointedInputGate implements PullingAsyncDataInput<BufferOrEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointedInputGate.class);
    private final CheckpointBarrierHandler barrierHandler;
    private final InputGate inputGate;
    private final int channelIndexOffset;
    private final BufferStorage bufferStorage;
    private boolean endOfInputGate;
    private boolean isFinished;

    public CheckpointedInputGate(InputGate inputGate, BufferStorage bufferStorage, String str, @Nullable AbstractInvokable abstractInvokable) {
        this(inputGate, bufferStorage, new CheckpointBarrierAligner(inputGate.getNumberOfInputChannels(), str, abstractInvokable));
    }

    public CheckpointedInputGate(InputGate inputGate, BufferStorage bufferStorage, CheckpointBarrierHandler checkpointBarrierHandler) {
        this(inputGate, bufferStorage, checkpointBarrierHandler, 0);
    }

    public CheckpointedInputGate(InputGate inputGate, BufferStorage bufferStorage, CheckpointBarrierHandler checkpointBarrierHandler, int i) {
        this.inputGate = inputGate;
        this.channelIndexOffset = i;
        this.bufferStorage = (BufferStorage) Preconditions.checkNotNull(bufferStorage);
        this.barrierHandler = checkpointBarrierHandler;
    }

    public CompletableFuture<?> getAvailableFuture() {
        return this.bufferStorage.isEmpty() ? this.inputGate.getAvailableFuture() : AVAILABLE;
    }

    public Optional<BufferOrEvent> pollNext() throws Exception {
        Optional<BufferOrEvent> pollNext;
        while (true) {
            if (this.bufferStorage.isEmpty()) {
                pollNext = this.inputGate.pollNext();
            } else {
                pollNext = this.bufferStorage.pollNext();
                if (!pollNext.isPresent()) {
                    return pollNext();
                }
            }
            if (!pollNext.isPresent()) {
                return handleEmptyBuffer();
            }
            BufferOrEvent bufferOrEvent = pollNext.get();
            if (this.barrierHandler.isBlocked(offsetChannelIndex(bufferOrEvent.getChannelIndex()))) {
                this.bufferStorage.add(bufferOrEvent);
                if (this.bufferStorage.isFull()) {
                    this.barrierHandler.checkpointSizeLimitExceeded(this.bufferStorage.getMaxBufferedBytes());
                    this.bufferStorage.rollOver();
                }
            } else {
                if (bufferOrEvent.isBuffer()) {
                    return pollNext;
                }
                if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
                    CheckpointBarrier event = bufferOrEvent.getEvent();
                    if (!this.endOfInputGate && this.barrierHandler.processBarrier(event, offsetChannelIndex(bufferOrEvent.getChannelIndex()), this.bufferStorage.getPendingBytes())) {
                        this.bufferStorage.rollOver();
                    }
                } else {
                    if (bufferOrEvent.getEvent().getClass() != CancelCheckpointMarker.class) {
                        if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class && this.barrierHandler.processEndOfPartition()) {
                            this.bufferStorage.rollOver();
                        }
                        return pollNext;
                    }
                    if (this.barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent())) {
                        this.bufferStorage.rollOver();
                    }
                }
            }
        }
    }

    private int offsetChannelIndex(int i) {
        return i + this.channelIndexOffset;
    }

    private Optional<BufferOrEvent> handleEmptyBuffer() throws Exception {
        if (!this.inputGate.isFinished()) {
            return Optional.empty();
        }
        if (this.endOfInputGate) {
            this.isFinished = true;
            return Optional.empty();
        }
        this.endOfInputGate = true;
        this.barrierHandler.releaseBlocksAndResetBarriers();
        this.bufferStorage.rollOver();
        return pollNext();
    }

    public boolean isEmpty() {
        return this.bufferStorage.isEmpty();
    }

    public boolean isFinished() {
        return this.isFinished;
    }

    public void cleanup() throws IOException {
        this.bufferStorage.close();
    }

    public long getLatestCheckpointId() {
        return this.barrierHandler.getLatestCheckpointId();
    }

    public long getAlignmentDurationNanos() {
        return this.barrierHandler.getAlignmentDurationNanos();
    }

    public int getNumberOfInputChannels() {
        return this.inputGate.getNumberOfInputChannels();
    }

    public String toString() {
        return this.barrierHandler.toString();
    }
}
