package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import javax.annotation.Nullable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.class */
/**
 * {@link ResultSubpartitionView} that serves buffers read from a {@link PartitionedFile} through a
 * {@link PartitionedFileReader}.
 *
 * <p>The reader owns a fixed set of {@link #NUM_READ_BUFFERS} unpooled off-heap memory segments.
 * Segments are handed to the file reader to be filled, the resulting {@link Buffer}s are queued
 * for the consumer, and each segment comes back through {@link #recycle(MemorySegment)} (this
 * object is passed to the file reader as the {@link BufferRecycler}). Once all segments have been
 * returned, the next batch is read from the file.
 *
 * <p>No synchronization is performed inside this class.
 */
public class SortMergeSubpartitionReader implements ResultSubpartitionView, BufferRecycler {

    /** Number of memory segments allocated up front for reading data from the file. */
    private static final int NUM_READ_BUFFERS = 2;

    /** Partition that created this reader; informed via {@code releaseReader} on release. */
    private final SortMergeResultPartition partition;

    /** Listener notified when at least one read buffer is queued for consumption. */
    private final BufferAvailabilityListener availabilityListener;

    /** Free memory segments that can be handed to the file reader. */
    private final Queue<MemorySegment> readBuffers = new ArrayDeque<>();

    /** Buffers already read from the file and not yet polled by the consumer. */
    private final Queue<Buffer> buffersRead = new ArrayDeque<>();

    /** Reader producing the buffers served by this view. */
    private final PartitionedFileReader fileReader;

    /** Backlog counter; decremented for every polled element whose {@code isBuffer()} is true. */
    private int dataBufferBacklog;

    /** Set once {@link #releaseAllResources()} has been called. */
    private boolean isReleased;

    /** Sequence number attached to the next element returned by {@link #getNextBuffer()}. */
    private int sequenceNumber;

    /**
     * Creates the reader, allocates its read segments and eagerly reads the first buffers.
     *
     * @param subpartitionIndex value forwarded to the {@link PartitionedFileReader} constructor
     *     (presumably the index of the subpartition to read; confirm against that class)
     * @param dataBufferBacklog initial value of the data buffer backlog counter
     * @param bufferSize size passed to the allocator for each of the read segments
     * @param partition owning partition; must not be null
     * @param listener availability listener; must not be null
     * @param partitionedFile file the {@link PartitionedFileReader} is created for
     * @throws IOException if creating the file reader or the initial read fails
     */
    public SortMergeSubpartitionReader(
            int subpartitionIndex,
            int dataBufferBacklog,
            int bufferSize,
            SortMergeResultPartition partition,
            BufferAvailabilityListener listener,
            PartitionedFile partitionedFile)
            throws IOException {
        this.partition = Preconditions.checkNotNull(partition);
        this.availabilityListener = Preconditions.checkNotNull(listener);
        this.dataBufferBacklog = dataBufferBacklog;

        for (int n = 0; n < NUM_READ_BUFFERS; n++) {
            readBuffers.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(bufferSize, null));
        }

        this.fileReader = new PartitionedFileReader(partitionedFile, subpartitionIndex);
        try {
            readBuffers();
        } catch (Throwable failure) {
            // The constructor is about to fail, so nobody else could close the file reader.
            IOUtils.closeQuietly(fileReader);
            throw failure;
        }
    }

    /**
     * Polls the next queued buffer together with the current backlog, the data type of the
     * following queued buffer ({@link Buffer.DataType#NONE} if there is none) and a sequence
     * number that increases by one per returned element.
     *
     * @return the next element, or {@code null} if nothing is queued
     * @throws IllegalStateException if the reader has already been released
     */
    @Override
    @Nullable
    public ResultSubpartition.BufferAndBacklog getNextBuffer() {
        Preconditions.checkState(!isReleased, "Reader is already released.");

        Buffer buffer = buffersRead.poll();
        if (buffer == null) {
            return null;
        }

        if (buffer.isBuffer()) {
            dataBufferBacklog--;
        }

        Buffer lookAhead = buffersRead.peek();
        Buffer.DataType nextDataType =
                lookAhead == null ? Buffer.DataType.NONE : lookAhead.getDataType();
        return ResultSubpartition.BufferAndBacklog.fromBufferAndLookahead(
                buffer, nextDataType, dataBufferBacklog, sequenceNumber++);
    }

    /**
     * Fills free segments from the file reader and queues the resulting buffers. Stops when no
     * free segment is left or when the file reader returns {@code null}; in the latter case the
     * unused segment is put back into the free queue.
     *
     * @throws IOException if reading from the file fails
     */
    void readBuffers() throws IOException {
        MemorySegment segment;
        while ((segment = readBuffers.poll()) != null) {
            Buffer buffer = fileReader.readBuffer(segment, this);
            if (buffer == null) {
                // Nothing was read into this segment; keep it available for later use.
                readBuffers.add(segment);
                return;
            }
            buffersRead.add(buffer);
        }
    }

    /** Notifies the availability listener, but only if at least one read buffer is queued. */
    @Override
    public void notifyDataAvailable() {
        if (!buffersRead.isEmpty()) {
            availabilityListener.notifyDataAvailable();
        }
    }

    /**
     * Takes back a memory segment. After release the segment is simply dropped. Otherwise it is
     * returned to the free queue, and once all {@link #NUM_READ_BUFFERS} segments are free again
     * the next batch is read and the listener is notified if any buffer was produced.
     *
     * @param memorySegment the segment being returned
     */
    @Override
    public void recycle(MemorySegment memorySegment) {
        if (isReleased) {
            return;
        }

        readBuffers.add(memorySegment);
        if (readBuffers.size() != NUM_READ_BUFFERS) {
            // Some segment is still in use; wait until all of them have come back.
            return;
        }

        try {
            readBuffers();
        } catch (IOException e) {
            ExceptionUtils.rethrow(e, "Failed to read next buffer.");
        }
        notifyDataAvailable();
    }

    /**
     * Marks the reader as released, drops all queued buffers and free segments, quietly closes
     * the file reader and tells the owning partition to release this reader.
     */
    @Override
    public void releaseAllResources() {
        isReleased = true;

        buffersRead.clear();
        readBuffers.clear();

        IOUtils.closeQuietly(fileReader);
        partition.releaseReader(this);
    }

    /** Returns whether {@link #releaseAllResources()} has been called. */
    @Override
    public boolean isReleased() {
        return isReleased;
    }

    /**
     * Not supported by this reader.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void resumeConsumption() {
        throw new UnsupportedOperationException("Method should never be called.");
    }

    /** Always returns {@code null}; this reader does not record a failure cause. */
    @Override
    public Throwable getFailureCause() {
        return null;
    }

    /**
     * Tells whether the head of the read queue may be consumed.
     *
     * @param numCreditsAvailable if positive, any queued element counts as available; otherwise
     *     only a head element whose {@code isBuffer()} is false counts
     * @return {@code true} if an element is available under the rule above
     */
    @Override
    public boolean isAvailable(int numCreditsAvailable) {
        if (numCreditsAvailable > 0) {
            return !buffersRead.isEmpty();
        }
        return !buffersRead.isEmpty() && !buffersRead.peek().isBuffer();
    }

    /** Always reports {@code 0}; this reader does not track a queued-buffer count. */
    @Override
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        return 0;
    }
}
