package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.util.Preconditions;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.class */
public class PipelinedSubpartitionWithReadViewTest {
    /** Partition that owns the subpartition under test; assigned by {@link #setup}. */
    ResultPartition resultPartition;
    /** Subpartition under test; created with index 0 on {@link #resultPartition} in {@link #before()}. */
    PipelinedSubpartition subpartition;
    /** Listener passed to {@code createReadView}; tests read its notification and priority-event counters. */
    AwaitableBufferAvailablityListener availablityListener;
    /** Read view obtained from {@link #subpartition} in {@link #before()}. */
    PipelinedSubpartitionView readView;

    /** Test parameter; forwarded to {@code PartitionTestUtils.createPartition} in {@link #setup}. */
    @Parameterized.Parameter
    public boolean compressionEnabled;

    /**
     * Supplies the values injected into {@link #compressionEnabled}: first {@code false}, then
     * {@code true}.
     *
     * @return the two compression settings the parameterized run iterates over
     */
    @Parameterized.Parameters(name = "compressionEnabled = {0}")
    public static Boolean[] parameters() {
        final Boolean[] compressionSettings = {Boolean.FALSE, Boolean.TRUE};
        return compressionSettings;
    }

    /**
     * Creates a fresh pipelined result partition, a subpartition with index 0 on top of it, and a
     * read view wired to a new availability listener.
     *
     * @throws IOException if setting up the result partition fails
     */
    @Before
    public void before() throws IOException {
        setup(ResultPartitionType.PIPELINED);

        subpartition = new PipelinedSubpartition(0, resultPartition);
        availablityListener = new AwaitableBufferAvailablityListener();
        readView = subpartition.createReadView(availablityListener);
    }

    /** Releases the read view first and the subpartition afterwards. */
    @After
    public void tearDown() {
        readView.releaseAllResources();
        subpartition.release();
    }

    /**
     * Queues two consumers created from builders that were never finished and then polls the read
     * view once; the test only passes if an {@link IllegalStateException} is raised along the way.
     */
    @Test(expected = IllegalStateException.class)
    public void testAddTwoNonFinishedBuffer() throws IOException {
        // Neither builder is finished before its consumer is handed to the subpartition.
        subpartition.add(BufferBuilderTestUtils.createBufferBuilder().createBufferConsumer());
        subpartition.add(BufferBuilderTestUtils.createBufferBuilder().createBufferConsumer());

        Assert.assertNull(readView.getNextBuffer());
    }

    /**
     * After the read view is released and the partition is closed, the partition id must no longer
     * be listed among the partition manager's unreleased partitions.
     */
    @Test
    public void testRelease() {
        readView.releaseAllResources();
        resultPartition.close();

        final boolean stillUnreleased =
                resultPartition
                        .getPartitionManager()
                        .getUnreleasedPartitions()
                        .contains(resultPartition.getPartitionId());
        Assert.assertFalse(stillUnreleased);
    }

    /**
     * Checks notifications, backlog and readability while an empty buffer is first unfinished and
     * then finished with a second unfinished buffer queued behind it.
     */
    @Test
    public void testAddEmptyNonFinishedBuffer() throws IOException {
        Assert.assertEquals(0L, availablityListener.getNumNotifications());

        // Empty and still open: no notification is expected and nothing can be read.
        final BufferBuilder firstBuilder = BufferBuilderTestUtils.createBufferBuilder();
        subpartition.add(firstBuilder.createBufferConsumer());
        Assert.assertEquals(0L, availablityListener.getNumNotifications());
        Assert.assertNull(readView.getNextBuffer());

        // Finish the first builder and queue a second, unfinished one behind it.
        firstBuilder.finish();
        subpartition.add(BufferBuilderTestUtils.createBufferBuilder().createBufferConsumer());
        Assert.assertEquals(1L, subpartition.getBuffersInBacklog());
        Assert.assertEquals(1L, availablityListener.getNumNotifications());

        // Polling returns nothing, and the backlog is expected to drop back to zero.
        Assert.assertNull(readView.getNextBuffer());
        Assert.assertEquals(0L, subpartition.getBuffersInBacklog());
    }

    /**
     * A single filled but unfinished buffer does not count towards the backlog, yet its 1024 bytes
     * can be read through the view.
     */
    @Test
    public void testAddNonEmptyNotFinishedBuffer() throws Exception {
        Assert.assertEquals(0L, availablityListener.getNumNotifications());

        subpartition.add(BufferBuilderTestUtils.createFilledUnfinishedBufferConsumer(1024));
        Assert.assertEquals(0L, subpartition.getBuffersInBacklog());

        // Expect: 1024 bytes, no more data, backlog 0, no event next, not recycled after recycling.
        assertNextBuffer(readView, 1024, false, 0, false, false);
    }

    /**
     * A finished buffer followed by an unfinished one: the backlog is 1, at least one notification
     * was sent, and both buffers can be read in insertion order.
     */
    @Test
    public void testUnfinishedBufferBehindFinished() throws Exception {
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1025));
        subpartition.add(BufferBuilderTestUtils.createFilledUnfinishedBufferConsumer(1024));

        Assert.assertEquals(1L, subpartition.getBuffersInBacklog());
        MatcherAssert.assertThat(
                Long.valueOf(availablityListener.getNumNotifications()),
                Matchers.greaterThan(0L));

        // The finished buffer comes out first and is recycled; the unfinished one follows.
        assertNextBuffer(readView, 1025, false, 0, false, true);
        assertNextBuffer(readView, 1024, false, 0, false, false);
        assertNoNextBuffer(readView);
    }

    /**
     * Flushing when a finished buffer is already queued ahead of an unfinished one must not send
     * another notification, but the backlog is expected to grow from 1 to 2.
     */
    @Test
    public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1025));
        subpartition.add(BufferBuilderTestUtils.createFilledUnfinishedBufferConsumer(1024));

        // Snapshot the notification count before flushing so both can be compared afterwards.
        final long notificationsBeforeFlush = availablityListener.getNumNotifications();
        Assert.assertEquals(1L, subpartition.getBuffersInBacklog());

        subpartition.flush();

        MatcherAssert.assertThat(Long.valueOf(notificationsBeforeFlush), Matchers.greaterThan(0L));
        Assert.assertEquals(notificationsBeforeFlush, availablityListener.getNumNotifications());
        Assert.assertEquals(2L, subpartition.getBuffersInBacklog());

        // First read: more data available and a remaining backlog of 1.
        assertNextBuffer(readView, 1025, true, 1, false, true);
        assertNextBuffer(readView, 1024, false, 0, false, false);
        assertNoNextBuffer(readView);
    }

    /**
     * Flushing after the finished buffer has already been consumed: the first flush adds exactly one
     * notification, a repeated flush adds none, and the unfinished buffer becomes readable.
     */
    @Test
    public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
        // A flush on the empty subpartition is expected to stay silent.
        subpartition.flush();
        Assert.assertEquals(0L, availablityListener.getNumNotifications());

        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1025));
        subpartition.add(BufferBuilderTestUtils.createFilledUnfinishedBufferConsumer(1024));
        Assert.assertEquals(1L, subpartition.getBuffersInBacklog());

        assertNextBuffer(readView, 1025, false, 0, false, true);

        final long notificationsBeforeFlush = availablityListener.getNumNotifications();
        final long notificationsAfterFlush = notificationsBeforeFlush + 1;

        subpartition.flush();
        Assert.assertEquals(notificationsAfterFlush, availablityListener.getNumNotifications());

        // The second flush must not notify again.
        subpartition.flush();
        Assert.assertEquals(notificationsAfterFlush, availablityListener.getNumNotifications());

        Assert.assertEquals(1L, subpartition.getBuffersInBacklog());
        assertNextBuffer(readView, 1024, false, 0, false, false);
        assertNoNextBuffer(readView);
    }

    /**
     * Queues three finished zero-length buffers followed by a 1024-byte one and checks the
     * notification count after each step; the first buffer handed out must be the 1024-byte one.
     */
    @Test
    public void testMultipleEmptyBuffers() throws Exception {
        Assert.assertEquals(0L, availablityListener.getNumNotifications());

        // First empty buffer: still no notification.
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(0));
        Assert.assertEquals(0L, availablityListener.getNumNotifications());

        // Second empty buffer: exactly one notification.
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(0));
        Assert.assertEquals(1L, availablityListener.getNumNotifications());

        // Third empty buffer: the count stays at one and the backlog reads 2.
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(0));
        Assert.assertEquals(1L, availablityListener.getNumNotifications());
        Assert.assertEquals(2L, subpartition.getBuffersInBacklog());

        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1024));
        Assert.assertEquals(1L, availablityListener.getNumNotifications());

        assertNextBuffer(readView, 1024, false, 0, false, true);
    }

    /** Flushing a subpartition that holds no buffers must not notify the listener. */
    @Test
    public void testEmptyFlush() {
        subpartition.flush();

        final long notifications = availablityListener.getNumNotifications();
        Assert.assertEquals(0L, notifications);
    }

    /**
     * Walks through produce/consume rounds on the pipelined subpartition and, after every step,
     * checks availability, buffer and byte statistics, backlog, and the listener's notification
     * count.
     */
    @Test
    public void testBasicPipelinedProduceConsumeLogic() throws Exception {
        final int bufferSize = 32768;

        // Nothing has been produced yet.
        Assert.assertFalse(readView.isAvailable(0));
        assertNoNextBuffer(readView);
        Assert.assertFalse(readView.isAvailable(0));
        Assert.assertEquals(0L, availablityListener.getNumNotifications());

        // Round 1: a single finished buffer; bytes are only counted once it has been read.
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(bufferSize));
        Assert.assertFalse(readView.isAvailable(0));
        Assert.assertEquals(1L, subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(0L, subpartition.getBuffersInBacklog());
        Assert.assertEquals(0L, subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(0L, availablityListener.getNumNotifications());

        assertNextBuffer(readView, bufferSize, false, 0, false, true);
        Assert.assertEquals(1L * bufferSize, subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(0L, subpartition.getBuffersInBacklog());
        assertNoNextBuffer(readView);
        Assert.assertEquals(0L, subpartition.getBuffersInBacklog());

        // Round 2: another single finished buffer.
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(bufferSize));
        Assert.assertFalse(readView.isAvailable(0));
        Assert.assertEquals(2L, subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(0L, subpartition.getBuffersInBacklog());
        Assert.assertEquals(1L * bufferSize, subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(0L, availablityListener.getNumNotifications());

        assertNextBuffer(readView, bufferSize, false, 0, false, true);
        Assert.assertEquals(2L * bufferSize, subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(0L, subpartition.getBuffersInBacklog());
        assertNoNextBuffer(readView);
        Assert.assertEquals(0L, subpartition.getBuffersInBacklog());

        // Round 3: data buffer, event buffer, data buffer.
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(bufferSize));
        Assert.assertFalse(readView.isAvailable(0));
        subpartition.add(BufferBuilderTestUtils.createEventBufferConsumer(bufferSize, Buffer.DataType.EVENT_BUFFER));
        Assert.assertFalse(readView.isAvailable(0));
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(bufferSize));
        Assert.assertFalse(readView.isAvailable(0));
        Assert.assertEquals(5L, subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(1L, subpartition.getBuffersInBacklog());
        Assert.assertEquals(2L * bufferSize, subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(1L, availablityListener.getNumNotifications());

        // The first data buffer reports that more data follows and that the next element is an event.
        assertNextBuffer(readView, bufferSize, true, 0, true, true);
        Assert.assertEquals(3L * bufferSize, subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(0L, subpartition.getBuffersInBacklog());

        assertNextEvent(readView, bufferSize, null, false, 0, false, true);
        Assert.assertEquals(4L * bufferSize, subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(0L, subpartition.getBuffersInBacklog());

        assertNextBuffer(readView, bufferSize, false, 0, false, true);
        Assert.assertEquals(5L * bufferSize, subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(0L, subpartition.getBuffersInBacklog());

        // Everything has been drained; the statistics keep their final values.
        assertNoNextBuffer(readView);
        Assert.assertEquals(0L, subpartition.getBuffersInBacklog());
        Assert.assertEquals(5L, subpartition.getTotalNumberOfBuffers());
        Assert.assertEquals(5L * bufferSize, subpartition.getTotalNumberOfBytes());
        Assert.assertEquals(1L, availablityListener.getNumNotifications());
    }

    /**
     * Verifies that a priority checkpoint barrier overtakes everything queued before it.
     *
     * <p>The subpartition receives data buffers of size 1 and 2, a non-priority
     * {@link EndOfSuperstepEvent}, a data buffer of size 4 and finally a priority
     * {@link CheckpointBarrier}. The test then checks that
     *
     * <ul>
     *   <li>only the barrier triggers a priority-event notification,
     *   <li>the channel state writer recorded the data buffers of sizes 1, 2 and 4 as output, and
     *   <li>the read view hands out the barrier first, followed by the remaining elements in their
     *       insertion order.
     * </ul>
     */
    @Test
    public void testBarrierOvertaking() throws Exception {
        final RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
        subpartition.setChannelStateWriter(channelStateWriter);

        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1));
        Assert.assertEquals(0L, availablityListener.getNumNotifications());
        Assert.assertEquals(0L, availablityListener.getNumPriorityEvents());

        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(2));
        Assert.assertEquals(1L, availablityListener.getNumNotifications());
        Assert.assertEquals(0L, availablityListener.getNumPriorityEvents());

        // Regular (non-priority) event: no additional notification.
        final BufferConsumer eventBuffer = EventSerializer.toBufferConsumer(EndOfSuperstepEvent.INSTANCE, false);
        subpartition.add(eventBuffer);
        Assert.assertEquals(1L, availablityListener.getNumNotifications());
        Assert.assertEquals(0L, availablityListener.getNumPriorityEvents());

        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4));
        Assert.assertEquals(1L, availablityListener.getNumNotifications());
        Assert.assertEquals(0L, availablityListener.getNumPriorityEvents());

        // Priority barrier: counted as a priority event, the plain notification count is unchanged.
        final CheckpointOptions options =
                new CheckpointOptions(
                        CheckpointType.CHECKPOINT,
                        new CheckpointStorageLocationReference(new byte[] {0, 1, 2}),
                        true,
                        true,
                        0L);
        channelStateWriter.start(0L, options);
        final BufferConsumer barrierBuffer =
                EventSerializer.toBufferConsumer(new CheckpointBarrier(0L, 0L, options), true);
        subpartition.add(barrierBuffer);
        Assert.assertEquals(1L, availablityListener.getNumNotifications());
        Assert.assertEquals(1L, availablityListener.getNumPriorityEvents());

        // The element type must be explicit: with a raw List the stream elements are typed as
        // Object and getSize()/recycleBuffer() cannot be resolved.
        final List<Buffer> inflight =
                channelStateWriter.getAddedOutput().get(subpartition.getSubpartitionInfo());
        Assert.assertEquals(
                Arrays.asList(1, 2, 4),
                inflight.stream().map(Buffer::getSize).collect(Collectors.toList()));
        inflight.forEach(Buffer::recycleBuffer);

        // The barrier is read first; the rest follows in insertion order.
        assertNextEvent(readView, barrierBuffer.getWrittenBytes(), CheckpointBarrier.class, true, 2, false, true);
        assertNextBuffer(readView, 1, true, 1, false, true);
        assertNextBuffer(readView, 2, true, 0, true, true);
        assertNextEvent(readView, eventBuffer.getWrittenBytes(), EndOfSuperstepEvent.class, false, 0, false, true);
        assertNextBuffer(readView, 4, false, 0, false, true);
        assertNoNextBuffer(readView);
    }

    /**
     * Queues a priority checkpoint barrier ahead of two data buffers and checks that the first data
     * buffer adds one more notification while the second adds none; everything is then read back in
     * insertion order.
     */
    @Test
    public void testAvailabilityAfterPriority() throws Exception {
        subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);

        final CheckpointStorageLocationReference location =
                new CheckpointStorageLocationReference(new byte[] {0, 1, 2});
        final CheckpointOptions options =
                new CheckpointOptions(CheckpointType.CHECKPOINT, location, true, true, 0L);
        final CheckpointBarrier barrier = new CheckpointBarrier(0L, 0L, options);
        final BufferConsumer barrierBuffer = EventSerializer.toBufferConsumer(barrier, true);

        // The priority barrier counts both as a notification and as a priority event.
        subpartition.add(barrierBuffer);
        Assert.assertEquals(1L, availablityListener.getNumNotifications());
        Assert.assertEquals(1L, availablityListener.getNumPriorityEvents());

        // The first data buffer behind the barrier triggers a second notification.
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1));
        Assert.assertEquals(2L, availablityListener.getNumNotifications());
        Assert.assertEquals(1L, availablityListener.getNumPriorityEvents());

        // A further data buffer leaves both counters unchanged.
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(2));
        Assert.assertEquals(2L, availablityListener.getNumNotifications());
        Assert.assertEquals(1L, availablityListener.getNumPriorityEvents());

        assertNextEvent(readView, barrierBuffer.getWrittenBytes(), CheckpointBarrier.class, true, 1, false, true);
        assertNextBuffer(readView, 1, false, 0, false, true);
        assertNextBuffer(readView, 2, false, 0, false, true);
        assertNoNextBuffer(readView);
    }

    /** Backlog consistency check without flushing and without finishing the subpartition. */
    @Test
    public void testBacklogConsistentWithNumberOfConsumableBuffers() throws Exception {
        final boolean flush = false;
        final boolean finish = false;
        testBacklogConsistentWithNumberOfConsumableBuffers(flush, finish);
    }

    /** Backlog consistency check for a subpartition that is flushed but not finished. */
    @Test
    public void testBacklogConsistentWithConsumableBuffersForFlushedPartition() throws Exception {
        final boolean flush = true;
        final boolean finish = false;
        testBacklogConsistentWithNumberOfConsumableBuffers(flush, finish);
    }

    /** Backlog consistency check for a subpartition that is finished but not explicitly flushed. */
    @Test
    public void testBacklogConsistentWithConsumableBuffersForFinishedPartition() throws Exception {
        final boolean flush = false;
        final boolean finish = true;
        testBacklogConsistentWithNumberOfConsumableBuffers(flush, finish);
    }

    /**
     * Adds five 1024-byte buffers, optionally flushes and/or finishes the subpartition, and checks
     * that the reported backlog equals the number of data buffers that can actually be read from
     * the view.
     *
     * <p>The last of the five buffers is left unfinished unless {@code isFinished} is set.
     *
     * @param isFlushRequested whether to call {@code flush()} after adding the buffers
     * @param isFinished whether all five buffers are finished and {@code finish()} is called
     * @throws Exception if reading from the view or closing the registry fails
     */
    private void testBacklogConsistentWithNumberOfConsumableBuffers(boolean isFlushRequested, boolean isFinished) throws Exception {
        final int numberOfAddedBuffers = 5;

        for (int i = 1; i <= numberOfAddedBuffers; i++) {
            if (i < numberOfAddedBuffers || isFinished) {
                subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1024));
            } else {
                subpartition.add(BufferBuilderTestUtils.createFilledUnfinishedBufferConsumer(1024));
            }
        }

        if (isFlushRequested) {
            subpartition.flush();
        }
        if (isFinished) {
            subpartition.finish();
        }

        // Capture the backlog before draining; it is compared with what was really consumable.
        final int backlog = subpartition.getBuffersInBacklog();

        int numberOfConsumableBuffers = 0;
        // try-with-resources recycles every polled buffer even when an assertion below fails.
        try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
            while (readView.isAvailable(Integer.MAX_VALUE)) {
                final ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
                Assert.assertNotNull(bufferAndBacklog);

                final Buffer buffer = bufferAndBacklog.buffer();
                // Events do not count as consumable data buffers.
                if (buffer.isBuffer()) {
                    numberOfConsumableBuffers++;
                }
                closeableRegistry.registerCloseable(buffer::recycleBuffer);
            }

            MatcherAssert.assertThat(
                    Integer.valueOf(backlog),
                    Matchers.is(Integer.valueOf(numberOfConsumableBuffers)));
        }
    }

    /**
     * Blocks the subpartition three times with an aligned checkpoint barrier. While it is blocked,
     * newly added elements must not be readable and must not notify; after resuming, they become
     * readable.
     */
    @Test
    public void testBlockedByCheckpointAndResumeConsumption() throws IOException, InterruptedException {
        final int bufferSize = 32768;

        // Round 1: an event is added while blocked.
        blockSubpartitionByCheckpoint(1);
        subpartition.add(BufferBuilderTestUtils.createEventBufferConsumer(bufferSize, Buffer.DataType.EVENT_BUFFER));
        checkNumNotificationsAndAvailability(1);
        resumeConsumptionAndCheckAvailability(0, true);
        assertNextEvent(readView, bufferSize, null, false, 0, false, true);

        // Round 2: a single data buffer is added and flushed while blocked.
        blockSubpartitionByCheckpoint(2);
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(bufferSize));
        subpartition.flush();
        checkNumNotificationsAndAvailability(2);
        resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, false);
        assertNextBuffer(readView, bufferSize, false, 0, false, true);

        // Round 3: two data buffers are added while blocked.
        blockSubpartitionByCheckpoint(3);
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(bufferSize));
        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(bufferSize));
        checkNumNotificationsAndAvailability(3);
        resumeConsumptionAndCheckAvailability(Integer.MAX_VALUE, true);
        assertNextBuffer(readView, bufferSize, false, 0, false, true);
        assertNextBuffer(readView, bufferSize, false, 0, false, true);
    }

    /**
     * Adds a 32768-byte aligned checkpoint barrier event, checks the notification count, and reads
     * the barrier back from the view.
     *
     * @param expectedNotifications notification count expected right after the barrier was added
     */
    private void blockSubpartitionByCheckpoint(int expectedNotifications) throws IOException, InterruptedException {
        final int barrierSize = 32768;
        subpartition.add(BufferBuilderTestUtils.createEventBufferConsumer(barrierSize, Buffer.DataType.ALIGNED_CHECKPOINT_BARRIER));

        Assert.assertEquals(expectedNotifications, availablityListener.getNumNotifications());
        assertNextEvent(readView, barrierSize, null, false, 0, false, true);
    }

    /**
     * Asserts the listener's notification count and that the view currently offers nothing to read.
     *
     * @param expectedNotifications notification count the listener must report
     */
    private void checkNumNotificationsAndAvailability(int expectedNotifications) throws IOException, InterruptedException {
        Assert.assertEquals(expectedNotifications, availablityListener.getNumNotifications());

        final boolean available = readView.isAvailable(Integer.MAX_VALUE);
        Assert.assertFalse(available);
        assertNoNextBuffer(readView);
    }

    /**
     * Resumes consumption on the read view and asserts the availability it reports afterwards.
     *
     * @param availabilityArgument value passed to {@code readView.isAvailable(int)}
     * @param expectedAvailability result that {@code isAvailable} must return after resuming
     */
    private void resumeConsumptionAndCheckAvailability(int availabilityArgument, boolean expectedAvailability) {
        readView.resumeConsumption();

        final Boolean actualAvailability = Boolean.valueOf(readView.isAvailable(availabilityArgument));
        Assert.assertEquals(Boolean.valueOf(expectedAvailability), actualAvailability);
    }

    /**
     * Polls the view and asserts that the next element is a data buffer with the given properties.
     *
     * @param resultSubpartitionView view to poll
     * @param i expected number of readable bytes
     * @param z expected "data available" flag after the poll
     * @param i2 expected number of buffers in backlog
     * @param z2 expected "event available" flag after the poll
     * @param z3 expected value of {@code isRecycled()} after the buffer has been recycled
     */
    static void assertNextBuffer(ResultSubpartitionView resultSubpartitionView, int i, boolean z, int i2, boolean z2, boolean z3) throws IOException, InterruptedException {
        final boolean expectDataBuffer = true;
        final Class<? extends AbstractEvent> noEventClass = null;
        assertNextBufferOrEvent(resultSubpartitionView, i, expectDataBuffer, noEventClass, z, i2, z2, z3);
    }

    /**
     * Polls the view and asserts that the next element is an event with the given properties.
     *
     * @param resultSubpartitionView view to poll
     * @param i expected number of readable bytes
     * @param cls expected event class, or {@code null} to skip the type check
     * @param z expected "data available" flag after the poll
     * @param i2 expected number of buffers in backlog
     * @param z2 expected "event available" flag after the poll
     * @param z3 expected value of {@code isRecycled()} after the buffer has been recycled
     */
    static void assertNextEvent(ResultSubpartitionView resultSubpartitionView, int i, Class<? extends AbstractEvent> cls, boolean z, int i2, boolean z2, boolean z3) throws IOException, InterruptedException {
        final boolean expectDataBuffer = false;
        assertNextBufferOrEvent(resultSubpartitionView, i, expectDataBuffer, cls, z, i2, z2, z3);
    }

    /**
     * Polls the next element from the view, checks it against the expectations and recycles it.
     *
     * <p>The polled buffer is recycled exactly once, whether or not an assertion fails. The final
     * "recycled" assertion runs after that single recycle, so a failure there cannot trigger a
     * second {@code recycleBuffer()} call.
     *
     * @param view view to poll
     * @param expectedReadableBytes expected {@code readableBytes()} of the polled buffer
     * @param expectIsBuffer {@code true} if a data buffer is expected, {@code false} for an event
     * @param expectedEventClass expected event class; must be {@code null} when a data buffer is
     *     expected, and may be {@code null} for events to skip the type check
     * @param expectMoreAvailable expected {@code isDataAvailable()}, also checked against
     *     {@code view.isAvailable(Integer.MAX_VALUE)}
     * @param expectedBuffersInBacklog expected {@code buffersInBacklog()}
     * @param expectNextIsEvent expected {@code isEventAvailable()}, also checked against
     *     {@code view.isAvailable(0)}
     * @param expectRecycledAfterRecycle expected {@code isRecycled()} after the buffer was recycled
     * @throws IllegalArgumentException if an event class is given although a data buffer is expected
     */
    private static void assertNextBufferOrEvent(
            ResultSubpartitionView view,
            int expectedReadableBytes,
            boolean expectIsBuffer,
            @Nullable Class<? extends AbstractEvent> expectedEventClass,
            boolean expectMoreAvailable,
            int expectedBuffersInBacklog,
            boolean expectNextIsEvent,
            boolean expectRecycledAfterRecycle) throws IOException, InterruptedException {
        Preconditions.checkArgument(expectedEventClass == null || !expectIsBuffer);

        final ResultSubpartition.BufferAndBacklog bufferAndBacklog = view.getNextBuffer();
        Assert.assertNotNull(bufferAndBacklog);
        final Buffer buffer = bufferAndBacklog.buffer();

        try {
            Assert.assertEquals("buffer size", expectedReadableBytes, buffer.readableBytes());
            Assert.assertEquals("buffer or event", Boolean.valueOf(expectIsBuffer), Boolean.valueOf(buffer.isBuffer()));
            if (expectedEventClass != null) {
                MatcherAssert.assertThat(
                        EventSerializer.fromBuffer(buffer, ClassLoader.getSystemClassLoader()),
                        IsInstanceOf.instanceOf(expectedEventClass));
            }
            Assert.assertEquals("data available", Boolean.valueOf(expectMoreAvailable), Boolean.valueOf(bufferAndBacklog.isDataAvailable()));
            Assert.assertEquals("data available", Boolean.valueOf(expectMoreAvailable), Boolean.valueOf(view.isAvailable(Integer.MAX_VALUE)));
            Assert.assertEquals("backlog", expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
            Assert.assertEquals("event available", Boolean.valueOf(expectNextIsEvent), Boolean.valueOf(bufferAndBacklog.isEventAvailable()));
            Assert.assertEquals("event available", Boolean.valueOf(expectNextIsEvent), Boolean.valueOf(view.isAvailable(0)));
            Assert.assertFalse("not recycled", buffer.isRecycled());
        } finally {
            // Single recycle point for both the success path and the failure path.
            buffer.recycleBuffer();
        }

        // Deliberately outside the try: a failure here must not recycle the buffer a second time.
        Assert.assertEquals("recycled", Boolean.valueOf(expectRecycledAfterRecycle), Boolean.valueOf(buffer.isRecycled()));
    }

    /**
     * Asserts that polling the view yields {@code null}.
     *
     * @param resultSubpartitionView view to poll
     */
    static void assertNoNextBuffer(ResultSubpartitionView resultSubpartitionView) throws IOException, InterruptedException {
        final ResultSubpartition.BufferAndBacklog polled = resultSubpartitionView.getNextBuffer();
        Assert.assertNull(polled);
    }

    /**
     * Creates the result partition for the given type, honouring {@code compressionEnabled}, and
     * sets it up.
     *
     * <p>NOTE(review): the decompiler marked this method as originally package-private; it is left
     * {@code public} here so existing callers keep compiling.
     *
     * @param resultPartitionType partition type passed to {@code PartitionTestUtils.createPartition}
     * @throws IOException if setting up the partition fails
     */
    public void setup(ResultPartitionType resultPartitionType) throws IOException {
        resultPartition =
                PartitionTestUtils.createPartition(
                        resultPartitionType,
                        NoOpFileChannelManager.INSTANCE,
                        compressionEnabled,
                        32768);
        resultPartition.setup();
    }
}
