package org.apache.flink.runtime.io.network.buffer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsCollectionContaining;
import org.hamcrest.core.IsNot;
import org.hamcrest.core.IsNull;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.class */
public class NetworkBufferPoolTest extends TestLogger {

    /**
     * JUnit rule through which individual tests in this class declare the exception type and
     * message fragment they expect to be thrown.
     */
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /** JUnit timeout rule configured with a limit of 10 seconds. */
    @Rule
    public Timeout timeout = new Timeout(10, TimeUnit.SECONDS);

    /**
     * Verifies that a destroyed {@link NetworkBufferPool} rejects every attempt to create a
     * local buffer pool with an {@link IllegalStateException}.
     *
     * @throws Exception if anything other than the expected {@link IllegalStateException} is
     *     raised; it is propagated to JUnit so the full stack trace is reported
     */
    @Test
    public void testCreatePoolAfterDestroy() throws Exception {
        NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
        Assert.assertThat(globalPool.getNumberOfRegisteredBufferPools(), CoreMatchers.is(0));

        globalPool.destroy();
        Assert.assertTrue(globalPool.isDestroyed());

        // Same first argument each time, three different second arguments.
        for (int maxBuffers : new int[] {2, 10, Integer.MAX_VALUE}) {
            try {
                globalPool.createBufferPool(2, maxBuffers);
                Assert.fail("Should throw an IllegalStateException");
            } catch (IllegalStateException expected) {
                // This is the outcome under test; nothing else to check.
            }
        }
    }

    /**
     * Checks the segment and byte accounting reported by a freshly created pool: every segment
     * is reported as available and none as used.
     */
    @Test
    public void testMemoryUsageInTheContextOfMemoryPoolCreation() {
        final int numBuffers = 10;
        final int bufferSize = 128;
        final long totalBytes = (long) numBuffers * bufferSize;

        NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize);
        try {
            Assert.assertThat(globalPool.getTotalNumberOfMemorySegments(), CoreMatchers.is(numBuffers));
            Assert.assertThat(globalPool.getNumberOfAvailableMemorySegments(), CoreMatchers.is(numBuffers));
            Assert.assertThat(globalPool.getNumberOfUsedMemorySegments(), CoreMatchers.is(0));

            Assert.assertThat(globalPool.getTotalMemory(), CoreMatchers.is(totalBytes));
            Assert.assertThat(globalPool.getAvailableMemory(), CoreMatchers.is(totalBytes));
            Assert.assertThat(globalPool.getUsedMemory(), CoreMatchers.is(0L));
        } finally {
            // Destroy the pool on every exit path, consistent with the other tests in this class.
            globalPool.destroy();
        }
    }

    /**
     * Checks the segment and byte accounting reported by the pool after exactly one memory
     * segment has been requested from it.
     */
    @Test
    public void testMemoryUsageInTheContextOfMemorySegmentAllocation() {
        final int numBuffers = 10;
        final int bufferSize = 128;

        NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize);
        MemorySegment segment = null;
        try {
            segment = globalPool.requestMemorySegment();
            Assert.assertThat(segment, CoreMatchers.is(IsNull.notNullValue()));

            Assert.assertThat(globalPool.getTotalNumberOfMemorySegments(), CoreMatchers.is(numBuffers));
            Assert.assertThat(globalPool.getNumberOfAvailableMemorySegments(), CoreMatchers.is(numBuffers - 1));
            Assert.assertThat(globalPool.getNumberOfUsedMemorySegments(), CoreMatchers.is(1));

            Assert.assertThat(globalPool.getTotalMemory(), CoreMatchers.is((long) numBuffers * bufferSize));
            Assert.assertThat(globalPool.getAvailableMemory(), CoreMatchers.is((long) (numBuffers - 1) * bufferSize));
            Assert.assertThat(globalPool.getUsedMemory(), CoreMatchers.is((long) bufferSize));
        } finally {
            // Hand the segment back and destroy the pool even if an assertion failed.
            if (segment != null) {
                globalPool.recycle(segment);
            }
            globalPool.destroy();
        }
    }

    /**
     * Checks that, once the pool has been destroyed, all segment counters and all byte
     * counters it reports are zero.
     */
    @Test
    public void testMemoryUsageInTheContextOfMemoryPoolDestruction() {
        final int numBuffers = 10;
        final int bufferSize = 128;
        NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize);

        globalPool.destroy();

        // Segment counters.
        Assert.assertThat(globalPool.getTotalNumberOfMemorySegments(), CoreMatchers.is(0));
        Assert.assertThat(globalPool.getNumberOfAvailableMemorySegments(), CoreMatchers.is(0));
        Assert.assertThat(globalPool.getNumberOfUsedMemorySegments(), CoreMatchers.is(0));

        // Byte counters.
        Assert.assertThat(globalPool.getTotalMemory(), CoreMatchers.is(0L));
        Assert.assertThat(globalPool.getAvailableMemory(), CoreMatchers.is(0L));
        Assert.assertThat(globalPool.getUsedMemory(), CoreMatchers.is(0L));
    }

    /**
     * Exercises {@code destroyAllBufferPools()}: three local pools drain the global pool, all
     * local pools are then destroyed at once, the outstanding buffers are recycled, and the
     * global pool must still be usable for creating a new local pool afterwards.
     *
     * @throws IOException declared by the pool-creation and buffer-request calls
     */
    @Test
    public void testDestroyAll() throws IOException {
        NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);

        BufferPool firstPool = globalPool.createBufferPool(2, 2);
        BufferPool secondPool = globalPool.createBufferPool(1, 1);
        BufferPool thirdPool = globalPool.createBufferPool(5, Integer.MAX_VALUE);
        BufferPool[] localPools = {firstPool, secondPool, thirdPool};

        Assert.assertEquals(2, firstPool.getNumberOfRequiredMemorySegments());
        Assert.assertEquals(1, secondPool.getNumberOfRequiredMemorySegments());
        Assert.assertEquals(5, thirdPool.getNumberOfRequiredMemorySegments());

        // Ten rounds; in each round take one buffer from the first pool that can still supply one.
        List<Buffer> buffers = new ArrayList<>(globalPool.getTotalNumberOfMemorySegments());
        for (int round = 0; round < 10; round++) {
            for (BufferPool localPool : localPools) {
                Buffer buffer = localPool.requestBuffer();
                if (buffer != null) {
                    Assert.assertNotNull(buffer.getMemorySegment());
                    buffers.add(buffer);
                    break;
                }
            }
        }

        // Every segment of the global pool is now held, so no local pool can hand out more.
        Assert.assertEquals(globalPool.getTotalNumberOfMemorySegments(), buffers.size());
        for (BufferPool localPool : localPools) {
            Assert.assertNull(localPool.requestBuffer());
        }

        globalPool.destroyAllBufferPools();

        // Only the local pools are destroyed, not the global pool itself.
        Assert.assertFalse(globalPool.isDestroyed());
        for (BufferPool localPool : localPools) {
            Assert.assertTrue(localPool.isDestroyed());
        }
        Assert.assertEquals(0, globalPool.getNumberOfRegisteredBufferPools());
        Assert.assertEquals(0, globalPool.getNumberOfAvailableMemorySegments());

        // Recycling the outstanding buffers must bring every segment back to the global pool.
        for (Buffer buffer : buffers) {
            buffer.recycleBuffer();
        }
        Assert.assertEquals(
                globalPool.getTotalNumberOfMemorySegments(),
                globalPool.getNumberOfAvailableMemorySegments());

        // Destroyed local pools must refuse further requests.
        for (BufferPool localPool : localPools) {
            try {
                localPool.requestBuffer();
                Assert.fail("Should fail with an IllegalStateException");
            } catch (IllegalStateException expected) {
                // expected
            }
        }

        Assert.assertNotNull(globalPool.createBufferPool(10, Integer.MAX_VALUE));
    }

    /**
     * Requests half of the pool's segments in one call, recycles them, and checks that the
     * pool reports all segments as available again.
     *
     * @throws IOException declared by {@code requestMemorySegments}
     */
    @Test
    public void testRequestMemorySegmentsLessThanTotalBuffers() throws IOException {
        final int numBuffers = 10;
        final int numRequested = numBuffers / 2;

        NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
        List<MemorySegment> segments = Collections.emptyList();
        try {
            segments = globalPool.requestMemorySegments(numRequested);
            Assert.assertEquals(numRequested, segments.size());

            globalPool.recycleMemorySegments(segments);
            segments.clear();
            Assert.assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments());
        } finally {
            // On the success path the list was cleared above, so this only returns segments
            // that are still held because an assertion or the request itself failed.
            globalPool.recycleMemorySegments(segments);
            globalPool.destroy();
        }
    }

    /**
     * Requests one segment more than the pool owns and expects an {@link IOException}, after
     * which the pool must still report all of its segments as available.
     */
    @Test
    public void testRequestMemorySegmentsMoreThanTotalBuffers() {
        final int numBuffers = 10;

        NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
        try {
            globalPool.requestMemorySegments(numBuffers + 1);
            Assert.fail("Should throw an IOException");
        } catch (IOException expected) {
            // The failed request must not leave any segment checked out.
            Assert.assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments());
        } finally {
            globalPool.destroy();
        }
    }

    /**
     * Requesting zero segments must be rejected with an {@link IllegalArgumentException}
     * (checked through the {@code expected} attribute of the test annotation).
     *
     * @throws IOException declared by {@code requestMemorySegments}
     */
    @Test(expected = IllegalArgumentException.class)
    public void testRequestMemorySegmentsWithInvalidArgument() throws IOException {
        NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
        try {
            globalPool.requestMemorySegments(0);
            Assert.fail("Should throw an IllegalArgumentException");
        } finally {
            // Runs on the expected-exception path too, so the pool is always destroyed.
            globalPool.destroy();
        }
    }

    /**
     * Requests segments from the global pool while a local pool holds every buffer; a helper
     * thread recycles those buffers shortly afterwards, and the request must then return a
     * list without {@code null} entries.
     *
     * @throws IOException declared by the pool-creation and request calls
     * @throws InterruptedException if waiting for the helper thread is interrupted
     */
    @Test
    public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException {
        final int numBuffers = 10;

        NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
        final List<Buffer> buffers = new ArrayList<>(numBuffers);
        List<MemorySegment> segments = Collections.emptyList();
        Thread bufferRecycler = null;
        BufferPool localPool = null;
        try {
            localPool = globalPool.createBufferPool(numBuffers / 2, numBuffers);

            // Take every buffer through the local pool.
            for (int i = 0; i < numBuffers; i++) {
                Buffer buffer = localPool.requestBuffer();
                buffers.add(buffer);
                Assert.assertNotNull(buffer);
            }

            final OneShotLatch isRunning = new OneShotLatch();
            bufferRecycler = new Thread(() -> {
                try {
                    isRunning.trigger();
                    // Presumably gives the main thread time to enter requestMemorySegments
                    // before any buffer is handed back.
                    Thread.sleep(100L);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: the buffers are recycled either way.
                }
                for (Buffer buffer : buffers) {
                    buffer.recycleBuffer();
                }
            });
            bufferRecycler.start();

            isRunning.await();
            segments = globalPool.requestMemorySegments(numBuffers / 2);
            Assert.assertThat(segments, IsNot.not(IsCollectionContaining.hasItem(CoreMatchers.nullValue())));
        } finally {
            if (bufferRecycler != null) {
                bufferRecycler.join();
            }
            if (localPool != null) {
                localPool.lazyDestroy();
            }
            globalPool.recycleMemorySegments(segments);
            globalPool.destroy();
        }
    }

    /**
     * Destroys the pool while another thread is calling {@code requestMemorySegments} and
     * expects that call to fail with an {@link IllegalStateException} whose message contains
     * "destroyed".
     *
     * @throws Exception the expected exception, rethrown by {@code CheckedThread.sync()} and
     *     verified by the {@code expectedException} rule
     */
    @Test
    public void testRequestMemorySegmentsInterruptable() throws Exception {
        final int numBuffers = 10;
        final NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);

        // One segment is held here, so only numBuffers - 1 remain for the request below.
        MemorySegment segment = globalPool.requestMemorySegment();
        Assert.assertNotNull(segment);

        final OneShotLatch isRunning = new OneShotLatch();
        CheckedThread asyncRequest = new CheckedThread() {
            @Override
            public void go() throws IOException {
                isRunning.trigger();
                globalPool.requestMemorySegments(numBuffers);
            }
        };
        asyncRequest.start();

        // The latch only proves the thread has started; the short sleep makes it likely
        // (not certain) that it is already inside requestMemorySegments when destroy() runs.
        isRunning.await();
        Thread.sleep(10L);
        globalPool.destroy();

        segment.free();

        expectedException.expect(IllegalStateException.class);
        expectedException.expectMessage("destroyed");
        try {
            asyncRequest.sync();
        } finally {
            globalPool.destroy();
        }
    }

    /**
     * Interrupts a thread that is calling {@code requestMemorySegments}. If the call fails
     * with an {@link IOException}, its cause must be an {@link InterruptedException} and the
     * pool must still allow creating a local pool that requires all buffers.
     *
     * <p>If the asynchronous request finishes without an exception, no further checks are made.
     *
     * @throws Exception any unexpected failure rethrown by {@code CheckedThread.sync()}
     */
    @Test
    public void testRequestMemorySegmentsInterruptable2() throws Exception {
        final int numBuffers = 10;
        final NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);

        // One segment is held here, so only numBuffers - 1 remain for the request below.
        MemorySegment segment = globalPool.requestMemorySegment();
        Assert.assertNotNull(segment);

        final OneShotLatch isRunning = new OneShotLatch();
        CheckedThread asyncRequest = new CheckedThread() {
            @Override
            public void go() throws IOException {
                isRunning.trigger();
                globalPool.requestMemorySegments(numBuffers);
            }
        };
        asyncRequest.start();

        // The latch only proves the thread has started; the short sleep makes it likely
        // (not certain) that it is already inside requestMemorySegments when interrupted.
        isRunning.await();
        Thread.sleep(10L);
        asyncRequest.interrupt();

        globalPool.recycle(segment);

        try {
            asyncRequest.sync();
        } catch (IOException e) {
            Assert.assertThat(e, Matchers.hasProperty("cause", CoreMatchers.instanceOf(InterruptedException.class)));

            // Creating a pool that requires every buffer must still succeed after the
            // interrupted request.
            globalPool.createBufferPool(numBuffers, numBuffers);
        } finally {
            globalPool.destroy();
        }
    }

    /**
     * Builds a pool with a 50 ms duration argument, drains it through a local pool, and
     * expects a further {@code requestMemorySegments} call on another thread to fail with an
     * {@link IOException} whose message contains "Timeout".
     *
     * @throws Exception the expected exception, rethrown by {@code CheckedThread.sync()} and
     *     verified by the {@code expectedException} rule
     */
    @Test
    public void testRequestMemorySegmentsTimeout() throws Exception {
        final int numBuffers = 10;
        final int numSegmentsToRequest = 2;
        final Duration requestTimeout = Duration.ofMillis(50L);

        final NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, requestTimeout);

        // Take every buffer via a local pool so the global pool has nothing left to give.
        BufferPool localPool = globalPool.createBufferPool(1, numBuffers);
        for (int i = 0; i < numBuffers; i++) {
            localPool.requestBuffer();
        }
        Assert.assertEquals(0, globalPool.getNumberOfAvailableMemorySegments());

        CheckedThread asyncRequest = new CheckedThread() {
            @Override
            public void go() throws Exception {
                globalPool.requestMemorySegments(numSegmentsToRequest);
            }
        };
        asyncRequest.start();

        expectedException.expect(IOException.class);
        expectedException.expectMessage("Timeout");
        try {
            asyncRequest.sync();
        } finally {
            globalPool.destroy();
        }
    }

    /**
     * Tracks the pool's availability future while the two segments of a two-segment pool are
     * requested and recycled one at a time.
     */
    @Test
    public void testIsAvailableOrNotAfterRequestAndRecycleSingleSegment() {
        NetworkBufferPool globalPool = new NetworkBufferPool(2, 128);
        try {
            // Both segments free: available.
            Assert.assertTrue(globalPool.getAvailableFuture().isDone());

            // One of two segments taken: still available.
            MemorySegment first = (MemorySegment) Preconditions.checkNotNull(globalPool.requestMemorySegment());
            Assert.assertTrue(globalPool.getAvailableFuture().isDone());

            // Both segments taken: not available.
            MemorySegment second = (MemorySegment) Preconditions.checkNotNull(globalPool.requestMemorySegment());
            Assert.assertFalse(globalPool.getAvailableFuture().isDone());

            // Recycling one segment must complete the future obtained while the pool was empty.
            CompletableFuture<?> pendingFuture = globalPool.getAvailableFuture();
            globalPool.recycle(first);
            Assert.assertTrue(pendingFuture.isDone());
            Assert.assertTrue(globalPool.getAvailableFuture().isDone());

            globalPool.recycle(second);
            Assert.assertTrue(globalPool.getAvailableFuture().isDone());
        } finally {
            globalPool.destroy();
        }
    }

    /**
     * Tracks the pool's availability future while segments are requested and recycled in
     * batches of five, including one batch requested from another thread while the pool is
     * empty.
     *
     * @throws InterruptedException if waiting for the asynchronous request is interrupted
     * @throws IOException declared by {@code requestMemorySegments}
     */
    @Test(timeout = 10000)
    public void testIsAvailableOrNotAfterRequestAndRecycleMultiSegments() throws InterruptedException, IOException {
        final int numBuffers = 10;
        final int batchSize = 5;

        final NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
        try {
            Assert.assertTrue(globalPool.getAvailableFuture().isDone());

            // First batch: half of the pool is still free afterwards.
            List<MemorySegment> firstBatch = globalPool.requestMemorySegments(batchSize);
            Assert.assertTrue(globalPool.getAvailableFuture().isDone());
            Assert.assertEquals(batchSize, firstBatch.size());

            // Second batch: the pool is now empty.
            List<MemorySegment> secondBatch = globalPool.requestMemorySegments(batchSize);
            Assert.assertFalse(globalPool.getAvailableFuture().isDone());
            Assert.assertEquals(batchSize, secondBatch.size());

            // Third batch is requested from another thread while nothing is available.
            final CountDownLatch requestDone = new CountDownLatch(1);
            final List<MemorySegment> thirdBatch = new ArrayList<>(batchSize);
            new CheckedThread() {
                @Override
                public void go() throws Exception {
                    thirdBatch.addAll(globalPool.requestMemorySegments(batchSize));
                    requestDone.countDown();
                }
            }.start();

            // Recycling the first batch must complete the future obtained while the pool was empty.
            CompletableFuture<?> pendingFuture = globalPool.getAvailableFuture();
            globalPool.recycleMemorySegments(firstBatch);
            Assert.assertTrue(pendingFuture.isDone());

            // Once the third request has been served the pool is empty again.
            requestDone.await();
            Assert.assertFalse(globalPool.getAvailableFuture().isDone());
            Assert.assertEquals(batchSize, thirdBatch.size());

            globalPool.recycleMemorySegments(secondBatch);
            Assert.assertTrue(globalPool.getAvailableFuture().isDone());

            globalPool.recycleMemorySegments(thirdBatch);
            Assert.assertTrue(globalPool.getAvailableFuture().isDone());
        } finally {
            globalPool.destroy();
        }
    }

    /**
     * Runs blocking buffer requests from two local pools concurrently while most segments of
     * the global pool are held directly by the test, then recycles those segments and checks
     * that the availability futures complete and that every local pool ends up with its
     * maximum number of buffers.
     *
     * <p>All clean-up (destroying the local pools, shutting down the executor, destroying the
     * global pool) happens in a single {@code finally} around the whole test body, so it runs
     * exactly once and only after the test logic has finished.
     *
     * @throws IOException declared by the pool-creation and request calls
     * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
     */
    @Test
    public void testBlockingRequestFromMultiLocalBufferPool() throws IOException, InterruptedException {
        final int localPoolRequiredSize = 5;
        final int localPoolMaxSize = 10;
        final int numLocalBufferPools = 2;
        final int numSegmentsToRequest = 10;
        final int numBuffers = numLocalBufferPools * localPoolMaxSize;

        final ExecutorService executor = Executors.newFixedThreadPool(numLocalBufferPools);
        final NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
        final List<BufferPool> localPools = new ArrayList<>(numLocalBufferPools);
        try {
            // Create the local pools; each must be available right after creation.
            for (int i = 0; i < numLocalBufferPools; i++) {
                BufferPool localPool = globalPool.createBufferPool(localPoolRequiredSize, localPoolMaxSize);
                localPools.add(localPool);
                Assert.assertTrue(localPool.getAvailableFuture().isDone());
            }

            // Take segments from the global pool in two ways: one by one, then all but one
            // of the remainder in a single batch request.
            final List<MemorySegment> segments = new ArrayList<>(numSegmentsToRequest - 1);
            for (int i = 0; i < numSegmentsToRequest - 1; i++) {
                segments.add(globalPool.requestMemorySegment());
            }
            final List<MemorySegment> exclusiveSegments =
                    globalPool.requestMemorySegments(globalPool.getNumberOfAvailableMemorySegments() - 1);
            Assert.assertTrue(globalPool.getAvailableFuture().isDone());
            for (BufferPool localPool : localPools) {
                Assert.assertTrue(localPool.getAvailableFuture().isDone());
            }

            // Each local pool asks for its maximum number of buffers using blocking requests.
            final CountDownLatch latch = new CountDownLatch(numLocalBufferPools);
            final ArrayBlockingQueue<BufferBuilder> requestedBuilders = new ArrayBlockingQueue<>(numBuffers);
            final AtomicReference<Throwable> failure = new AtomicReference<>();
            for (final BufferPool localPool : localPools) {
                executor.submit(() -> {
                    try {
                        for (int remaining = localPoolMaxSize; remaining > 0; remaining--) {
                            requestedBuilders.add(localPool.requestBufferBuilderBlocking());
                        }
                    } catch (Exception e) {
                        // Recorded here and asserted on by the test thread.
                        failure.set(e);
                    } finally {
                        latch.countDown();
                    }
                });
            }

            // Wait until every segment of the global pool has been handed out.
            while (requestedBuilders.size() + segments.size() + exclusiveSegments.size() < numBuffers) {
                Thread.sleep(100L);
                Assert.assertNull(failure.get());
            }

            // Nothing is left, so neither the global pool nor any local pool is available.
            final CompletableFuture<?> globalAvailableFuture = globalPool.getAvailableFuture();
            Assert.assertFalse(globalAvailableFuture.isDone());

            final List<CompletableFuture<?>> localAvailableFutures = new ArrayList<>(numLocalBufferPools);
            for (BufferPool localPool : localPools) {
                CompletableFuture<?> localAvailableFuture = localPool.getAvailableFuture();
                localAvailableFutures.add(localAvailableFuture);
                Assert.assertFalse(localAvailableFuture.isDone());
            }

            // Recycle the segments held by the test; the previously obtained futures must complete.
            for (MemorySegment segment : segments) {
                globalPool.recycle(segment);
            }
            globalPool.recycleMemorySegments(exclusiveSegments);

            Assert.assertTrue(globalAvailableFuture.isDone());
            for (CompletableFuture<?> localAvailableFuture : localAvailableFutures) {
                Assert.assertTrue(localAvailableFuture.isDone());
            }

            // Wait for both request tasks to finish.
            latch.await();

            Assert.assertNull(failure.get());
            Assert.assertEquals(0, globalPool.getNumberOfAvailableMemorySegments());
            Assert.assertFalse(globalPool.getAvailableFuture().isDone());
            for (BufferPool localPool : localPools) {
                Assert.assertFalse(localPool.getAvailableFuture().isDone());
                Assert.assertEquals(localPoolMaxSize, localPool.bestEffortGetNumOfUsedBuffers());
            }

            // Release every buffer obtained by the request tasks.
            for (BufferBuilder bufferBuilder : requestedBuilders) {
                bufferBuilder.createBufferConsumer().close();
            }
        } finally {
            for (BufferPool localPool : localPools) {
                localPool.lazyDestroy();
            }
            executor.shutdown();
            globalPool.destroy();
        }
    }
}
