package org.apache.flink.streaming.api.operators.async.queue;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bounded {@link StreamElementQueue} that emits its entries strictly in insertion order.
 *
 * <p>Only the head of the queue is ever considered for emission: an entry that has completed
 * is held back until every entry inserted before it has been emitted.
 *
 * <p>This class performs no synchronization of its own.
 * NOTE(review): presumably all accesses are confined to a single thread by the caller - confirm.
 *
 * @param <OUT> type of the results emitted by the queue entries
 */
@Internal
public final class OrderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);

    /** Maximum number of entries this queue accepts at any one time. */
    private final int capacity;

    /** Entries in insertion order; the head is the next candidate for emission. */
    private final Queue<StreamElementQueueEntry<OUT>> queue;

    /**
     * Creates an empty queue with the given capacity.
     *
     * @param capacity maximum number of entries; must be larger than 0 (enforced through
     *     {@code Preconditions.checkArgument})
     */
    public OrderedStreamElementQueue(int capacity) {
        Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
        this.capacity = capacity;
        this.queue = new ArrayDeque<>(capacity);
    }

    /**
     * Checks whether the head entry can be emitted.
     *
     * @return {@code true} if the queue is non-empty and its head entry reports being done
     */
    @Override
    public boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peek().isDone();
    }

    /**
     * Removes the head entry and emits its result to the given collector, provided the head
     * entry is done. Does nothing otherwise.
     *
     * @param timestampedCollector collector the head entry emits its result to
     */
    @Override
    public void emitCompletedElement(TimestampedCollector<OUT> timestampedCollector) {
        if (hasCompletedElements()) {
            // Non-null: hasCompletedElements() has just confirmed that a head entry exists.
            final StreamElementQueueEntry<OUT> head = queue.poll();
            head.emitResult(timestampedCollector);
        }
    }

    /**
     * Returns the input elements of all entries currently in the queue.
     *
     * @return a newly created list of the input elements, in queue order
     */
    @Override
    public List<StreamElement> values() {
        final List<StreamElement> inputElements = new ArrayList<>(queue.size());
        for (StreamElementQueueEntry<OUT> entry : queue) {
            inputElements.add(entry.getInputElement());
        }
        return inputElements;
    }

    /** Returns {@code true} if the queue currently holds no entries. */
    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    /** Returns the number of entries currently held by the queue. */
    @Override
    public int size() {
        return queue.size();
    }

    /**
     * Tries to append a new entry for the given stream element.
     *
     * <p>The capacity check happens before the entry is created, so a full queue yields an
     * empty result even for element kinds that could not be enqueued at all.
     *
     * @param streamElement element to enqueue
     * @return the newly created entry, or an empty {@link Optional} if the queue is full
     * @throws UnsupportedOperationException if the queue has room but the element is neither a
     *     record nor a watermark
     */
    @Override
    public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
        if (queue.size() >= capacity) {
            LOG.debug(
                    "Failed to put element into ordered stream element queue because it was full ({}/{}).",
                    queue.size(),
                    capacity);
            return Optional.empty();
        }

        final StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);
        queue.add(queueEntry);
        LOG.debug(
                "Put element into ordered stream element queue. New filling degree ({}/{}).",
                queue.size(),
                capacity);
        return Optional.of(queueEntry);
    }

    /**
     * Creates the queue entry matching the kind of the given stream element.
     *
     * @param streamElement element to wrap
     * @return a record entry for records, a watermark entry for watermarks
     * @throws UnsupportedOperationException for any other kind of stream element
     */
    private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) {
        if (streamElement.isRecord()) {
            return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);
        }
        if (streamElement.isWatermark()) {
            return new WatermarkQueueEntry<>((Watermark) streamElement);
        }
        throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
    }
}
