package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.PartitionSortedBufferTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.class */
/**
 * Tests for {@link SortMergeResultPartition}.
 *
 * <p>Each test creates a partition backed by a local {@link BufferPool} carved out of a shared
 * {@link NetworkBufferPool} and a {@link FileChannelManager} rooted in a JUnit temporary folder.
 * Both are created in {@link #setUp()} and torn down in {@link #shutdown()}.
 */
public class SortMergeResultPartitionTest {
    private static final BufferAvailabilityListener listener = new NoOpBufferAvailablityListener();
    private static final int bufferSize = 1024;
    private static final int totalBuffers = 1000;
    private FileChannelManager fileChannelManager;
    private NetworkBufferPool globalPool;

    @Rule
    public final TemporaryFolder tmpFolder = new TemporaryFolder();

    /** Creates the file channel manager (in the temporary folder) and the global buffer pool. */
    @Before
    public void setUp() {
        fileChannelManager =
                new FileChannelManagerImpl(new String[] {tmpFolder.getRoot().getPath()}, "testing");
        globalPool = new NetworkBufferPool(totalBuffers, bufferSize);
    }

    /** Closes the file channel manager and destroys the global buffer pool. */
    @After
    public void shutdown() throws Exception {
        fileChannelManager.close();
        globalPool.destroy();
    }

    /**
     * Writes randomly sized records (randomly broadcast or sent to a single random subpartition),
     * finishes the partition, reads every subpartition back and delegates the comparison of
     * written versus read data to {@link PartitionSortedBufferTest#checkWriteReadResult}.
     */
    @Test
    public void testWriteAndRead() throws Exception {
        int numSubpartitions = 10;
        int numBuffers = 100;
        int numRecords = 1000;
        Random random = new Random();

        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(numSubpartitions, bufferPool);

        // Generic array creation is inherently unchecked; the arrays only ever hold the declared
        // element types, so the suppression is confined to these two declarations.
        @SuppressWarnings("unchecked")
        Queue<PartitionSortedBufferTest.DataAndType>[] dataWritten = new Queue[numSubpartitions];
        @SuppressWarnings("unchecked")
        Queue<Buffer>[] buffersRead = new Queue[numSubpartitions];
        for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
            dataWritten[subpartition] = new ArrayDeque<>();
            buffersRead[subpartition] = new ArrayDeque<>();
        }

        // Freshly allocated int arrays are already zero-initialized.
        int[] numBytesWritten = new int[numSubpartitions];
        int[] numBytesRead = new int[numSubpartitions];

        // Write phase: record sizes range from 1 byte up to twice the buffer size.
        for (int recordIndex = 0; recordIndex < numRecords; recordIndex++) {
            byte[] data = new byte[random.nextInt(2 * bufferSize) + 1];
            random.nextBytes(data);
            ByteBuffer record = ByteBuffer.wrap(data);

            if (random.nextBoolean()) {
                partition.broadcastRecord(record);
                for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
                    recordDataWritten(
                            record,
                            dataWritten,
                            subpartition,
                            numBytesWritten,
                            Buffer.DataType.DATA_BUFFER);
                }
            } else {
                int subpartition = random.nextInt(numSubpartitions);
                partition.emitRecord(record, subpartition);
                recordDataWritten(
                        record,
                        dataWritten,
                        subpartition,
                        numBytesWritten,
                        Buffer.DataType.DATA_BUFFER);
            }
        }

        partition.finish();
        partition.close();

        // Every subpartition is expected to end with a serialized EndOfPartitionEvent.
        for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
            ByteBuffer endOfPartition =
                    EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
            recordDataWritten(
                    endOfPartition,
                    dataWritten,
                    subpartition,
                    numBytesWritten,
                    Buffer.DataType.EVENT_BUFFER);
        }

        // Read phase: copy each buffer into an unpooled segment so the original can be recycled
        // immediately while the content is kept for the final comparison.
        for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
            ResultSubpartitionView view = partition.createSubpartitionView(subpartition, listener);
            while (view.isAvailable(Integer.MAX_VALUE)) {
                Buffer buffer = view.getNextBuffer().buffer();
                int numBytes = buffer.readableBytes();
                numBytesRead[subpartition] += numBytes;

                MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(numBytes);
                segment.put(0, buffer.getNioBufferReadable(), numBytes);
                // The copy is unpooled, so its recycler intentionally does nothing.
                buffersRead[subpartition].add(
                        new NetworkBuffer(
                                segment, memorySegment -> {}, buffer.getDataType(), numBytes));
                buffer.recycleBuffer();
            }
            view.releaseAllResources();
        }

        PartitionSortedBufferTest.checkWriteReadResult(
                numSubpartitions, numBytesWritten, numBytesRead, dataWritten, buffersRead);
    }

    /**
     * Rewinds {@code byteBuffer}, stores it (with its data type) in the expected-data queue of
     * subpartition {@code i} and adds its remaining bytes to that subpartition's byte counter.
     */
    private void recordDataWritten(
            ByteBuffer byteBuffer,
            Queue<PartitionSortedBufferTest.DataAndType>[] queueArr,
            int i,
            int[] iArr,
            Buffer.DataType dataType) {
        byteBuffer.rewind();
        queueArr[i].add(new PartitionSortedBufferTest.DataAndType(byteBuffer, dataType));
        iArr[i] += byteBuffer.remaining();
    }

    /**
     * Emits one record as large as the whole local buffer pool, asserts that no pool buffer
     * remains in use afterwards, and verifies the record is read back unchanged.
     */
    @Test
    public void testWriteLargeRecord() throws Exception {
        int numSubpartitions = 10;
        int numBuffers = 100;
        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(numSubpartitions, bufferPool);

        byte[] dataWritten = new byte[bufferSize * numBuffers];
        new Random().nextBytes(dataWritten);
        ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
        partition.emitRecord(recordWritten, 0);
        assertNumUsedBuffers(0, bufferPool);

        partition.finish();
        partition.close();

        ResultSubpartitionView view = partition.createSubpartitionView(0, listener);
        ByteBuffer recordRead = ByteBuffer.allocate(bufferSize * numBuffers);
        while (view.isAvailable(Integer.MAX_VALUE)) {
            Buffer buffer = view.getNextBuffer().buffer();
            // Only data buffers carry record bytes; events are skipped.
            if (buffer.isBuffer()) {
                recordRead.put(buffer.getNioBufferReadable());
            }
            buffer.recycleBuffer();
        }
        view.releaseAllResources();

        recordWritten.rewind();
        recordRead.flip();
        Assert.assertEquals(recordWritten, recordRead);
    }

    /**
     * Checks the number of used pool buffers around {@code flush} / {@code flushAll}, that no
     * result file is exposed before {@code finish()}, and that the finished file has 3 regions.
     */
    @Test
    public void testFlush() throws Exception {
        int numSubpartitions = 10;
        int numBuffers = 10;
        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(numSubpartitions, bufferPool);

        partition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
        partition.emitRecord(ByteBuffer.allocate(bufferSize), 1);
        assertNumUsedBuffers(3, bufferPool);

        partition.flush(0);
        assertNumUsedBuffers(0, bufferPool);

        partition.emitRecord(ByteBuffer.allocate(bufferSize), 2);
        partition.emitRecord(ByteBuffer.allocate(bufferSize), 3);
        assertNumUsedBuffers(3, bufferPool);

        partition.flushAll();
        assertNumUsedBuffers(0, bufferPool);

        Assert.assertNull(partition.getResultFile());
        partition.finish();
        Assert.assertEquals(3, partition.getResultFile().getNumRegions());

        partition.close();
    }

    /**
     * Releases a partition that has written data but is not finished, then verifies that a
     * further write fails with {@link IllegalStateException} and that the data directory is
     * empty at that point.
     */
    @Test(expected = IllegalStateException.class)
    public void testReleaseWhileWriting() throws Exception {
        int numSubpartitions = 10;
        int numBuffers = 10;
        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(numSubpartitions, bufferPool);

        partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffers - 1)), 0);
        partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffers - 1)), 1);
        partition.emitRecord(ByteBuffer.allocate(bufferSize), 2);

        Assert.assertNull(partition.getResultFile());
        Assert.assertEquals(2, fileChannelManager.getPaths()[0].list().length);

        partition.release();
        try {
            partition.emitRecord(ByteBuffer.allocate(bufferSize * numBuffers), 2);
            // Reaching this line means the write unexpectedly succeeded after release.
            Assert.fail("Should throw IllegalStateException.");
        } catch (IllegalStateException exception) {
            Assert.assertEquals(0, fileChannelManager.getPaths()[0].list().length);
            // Rethrow so the @Test(expected = ...) contract is satisfied.
            throw exception;
        }
    }

    /**
     * Releases a finished partition while a subpartition view is still open. Asserts the result
     * file and the on-disk files are still present until the view releases its resources, and
     * gone afterwards.
     */
    @Test
    public void testReleaseWhileReading() throws Exception {
        int numSubpartitions = 10;
        int numBuffers = 10;
        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(numSubpartitions, bufferPool);

        partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffers - 1)), 0);
        partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffers - 1)), 1);
        partition.finish();
        partition.close();

        Assert.assertEquals(2, partition.getResultFile().getNumRegions());
        Assert.assertEquals(2, fileChannelManager.getPaths()[0].list().length);

        ResultSubpartitionView view = partition.createSubpartitionView(0, listener);
        view.getNextBuffer().buffer().recycleBuffer();
        partition.release();

        // The open view is still reading, so nothing may have been deleted yet.
        Assert.assertEquals(2, partition.getResultFile().getNumRegions());
        Assert.assertEquals(2, fileChannelManager.getPaths()[0].list().length);

        while (view.isAvailable(Integer.MAX_VALUE)) {
            view.getNextBuffer().buffer().recycleBuffer();
        }
        view.releaseAllResources();

        Assert.assertNull(partition.getResultFile());
        Assert.assertEquals(0, fileChannelManager.getPaths()[0].list().length);
    }

    /**
     * Asserts that closing a partition holding every buffer of its local pool destroys that pool
     * and leaves all {@code totalBuffers} segments available in the global pool.
     */
    @Test
    public void testCloseReleasesAllBuffers() throws Exception {
        int numSubpartitions = 10;
        int numBuffers = 100;
        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
        SortMergeResultPartition partition = createSortMergedPartition(numSubpartitions, bufferPool);

        partition.emitRecord(ByteBuffer.allocate(bufferSize * (numBuffers - 1)), 5);
        assertNumUsedBuffers(numBuffers, bufferPool);

        partition.close();
        Assert.assertTrue(bufferPool.isDestroyed());
        Assert.assertEquals(totalBuffers, globalPool.getNumberOfAvailableMemorySegments());
    }

    /** Creating a subpartition view on an unfinished partition must fail. */
    @Test(expected = IllegalStateException.class)
    public void testReadUnfinishedPartition() throws Exception {
        BufferPool bufferPool = globalPool.createBufferPool(10, 10);
        try {
            SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool);
            partition.createSubpartitionView(0, listener);
        } finally {
            bufferPool.lazyDestroy();
        }
    }

    /** Creating a subpartition view on a finished but released partition must fail. */
    @Test(expected = IllegalStateException.class)
    public void testReadReleasedPartition() throws Exception {
        BufferPool bufferPool = globalPool.createBufferPool(10, 10);
        try {
            SortMergeResultPartition partition = createSortMergedPartition(10, bufferPool);
            partition.finish();
            partition.release();
            partition.createSubpartitionView(0, listener);
        } finally {
            bufferPool.lazyDestroy();
        }
    }

    /** Asserts the best-effort count of buffers currently in use in {@code bufferPool}. */
    private static void assertNumUsedBuffers(int expected, BufferPool bufferPool) {
        Assert.assertEquals(expected, bufferPool.bestEffortGetNumOfUsedBuffers());
    }

    /**
     * Builds and sets up a blocking {@link SortMergeResultPartition} with {@code
     * numSubpartitions} subpartitions, no buffer compressor, a fresh channel path from the file
     * channel manager, and {@code bufferPool} as its local buffer pool.
     *
     * @param numSubpartitions value passed for both subpartition-count constructor arguments
     * @param bufferPool pool handed to the partition through its buffer pool factory
     * @return the partition after {@code setup()} has been called
     * @throws IOException declared because construction or setup of the partition may throw it
     */
    private SortMergeResultPartition createSortMergedPartition(
            int numSubpartitions, BufferPool bufferPool) throws IOException {
        SortMergeResultPartition partition =
                new SortMergeResultPartition(
                        "SortMergedResultPartitionTest",
                        0,
                        new ResultPartitionID(),
                        ResultPartitionType.BLOCKING,
                        numSubpartitions,
                        numSubpartitions,
                        bufferSize,
                        new ResultPartitionManager(),
                        fileChannelManager.createChannel().getPath(),
                        (BufferCompressor) null,
                        () -> bufferPool);
        partition.setup();
        return partition;
    }
}
