package org.apache.flink.runtime.io.disk;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xbill.DNS.TTL;

/* loaded from: input_file:org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.class */
public class BatchShuffleReadBufferPool {
    private static final int NUM_BYTES_PER_REQUEST = 8388608;
    private final long totalBytes;
    private final int numTotalBuffers;
    private final int bufferSize;
    private final int numBuffersPerRequest;
    private final Set<Object> bufferRequesters = ConcurrentHashMap.newKeySet();

    @GuardedBy("buffers")
    private final Queue<MemorySegment> buffers = new ArrayDeque();

    @GuardedBy("buffers")
    private long lastBufferOperationTimestamp = System.nanoTime();

    @GuardedBy("buffers")
    private boolean destroyed;

    @GuardedBy("buffers")
    private boolean initialized;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BatchShuffleReadBufferPool.class);
    private static final Duration WAITING_TIME = Duration.ofSeconds(2);

    public BatchShuffleReadBufferPool(long j, int i) {
        Preconditions.checkArgument(j > 0, "Total memory size must be positive.");
        Preconditions.checkArgument(i > 0, "Size of buffer must be positive.");
        Preconditions.checkArgument(j >= ((long) i), String.format("Illegal configuration, config value for '%s' must be no smaller than '%s', please increase '%s' to at least %d bytes.", TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key(), TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(), TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key(), Integer.valueOf(i)));
        this.totalBytes = j;
        this.bufferSize = i;
        this.numTotalBuffers = (int) Math.min(j / i, TTL.MAX_VALUE);
        this.numBuffersPerRequest = Math.min(this.numTotalBuffers, Math.max(1, 8388608 / i));
    }

    @VisibleForTesting
    long getTotalBytes() {
        return this.totalBytes;
    }

    @VisibleForTesting
    public int getNumTotalBuffers() {
        return this.numTotalBuffers;
    }

    @VisibleForTesting
    public int getAvailableBuffers() {
        int size;
        synchronized (this.buffers) {
            size = this.buffers.size();
        }
        return size;
    }

    public int getNumBuffersPerRequest() {
        return this.numBuffersPerRequest;
    }

    public int getMaxConcurrentRequests() {
        if (this.numBuffersPerRequest > 0) {
            return this.numTotalBuffers / this.numBuffersPerRequest;
        }
        return 0;
    }

    public int getBufferSize() {
        return this.bufferSize;
    }

    public void initialize() {
        synchronized (this.buffers) {
            Preconditions.checkState(!this.destroyed, "Buffer pool is already destroyed.");
            if (this.initialized) {
                return;
            }
            this.initialized = true;
            for (int i = 0; i < this.numTotalBuffers; i++) {
                try {
                    this.buffers.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(this.bufferSize));
                } catch (OutOfMemoryError e) {
                    int size = this.buffers.size();
                    this.buffers.forEach((v0) -> {
                        v0.free();
                    });
                    this.buffers.clear();
                    throw new OutOfMemoryError(String.format("Can't allocate enough direct buffer for batch shuffle read buffer pool (bytes allocated: %d, bytes still needed: %d). To avoid the exception, you need to do one of the following adjustments: 1) If you have ever decreased %s, you need to undo the decrement; 2) If you ever increased %s, you should also increase %s; 3) If neither the above cases, it usually means some other parts of your application have consumed too many direct memory and the value of %s should be increased.", Integer.valueOf(size * this.bufferSize), Integer.valueOf((this.numTotalBuffers - size) * this.bufferSize), TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key(), TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key(), TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key(), TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key()));
                }
            }
            LOG.info("Batch shuffle IO buffer pool initialized: numBuffers={}, bufferSize={}.", Integer.valueOf(this.numTotalBuffers), Integer.valueOf(this.bufferSize));
        }
    }

    public void registerRequester(Object obj) {
        this.bufferRequesters.add(obj);
    }

    public void unregisterRequester(Object obj) {
        this.bufferRequesters.remove(obj);
    }

    public int getAverageBuffersPerRequester() {
        return Math.max(1, this.numTotalBuffers / Math.max(1, this.bufferRequesters.size()));
    }

    public List<MemorySegment> requestBuffers() throws Exception {
        ArrayList arrayList = new ArrayList(this.numBuffersPerRequest);
        synchronized (this.buffers) {
            Preconditions.checkState(!this.destroyed, "Buffer pool is already destroyed.");
            if (!this.initialized) {
                initialize();
            }
            Deadline fromNow = Deadline.fromNow(WAITING_TIME);
            while (this.buffers.size() < this.numBuffersPerRequest) {
                Preconditions.checkState(!this.destroyed, "Buffer pool is already destroyed.");
                this.buffers.wait(WAITING_TIME.toMillis());
                if (!fromNow.hasTimeLeft()) {
                    return arrayList;
                }
            }
            while (arrayList.size() < this.numBuffersPerRequest) {
                arrayList.add(this.buffers.poll());
            }
            this.lastBufferOperationTimestamp = System.nanoTime();
            return arrayList;
        }
    }

    public void recycle(MemorySegment memorySegment) {
        Preconditions.checkArgument(memorySegment != null, "Buffer must be not null.");
        recycle(Collections.singletonList(memorySegment));
    }

    public void recycle(Collection<MemorySegment> collection) {
        Preconditions.checkArgument(collection != null, "Buffer list must be not null.");
        if (collection.isEmpty()) {
            return;
        }
        synchronized (this.buffers) {
            Preconditions.checkState(this.initialized, "Recycling a buffer before initialization.");
            if (this.destroyed) {
                collection.forEach((v0) -> {
                    v0.free();
                });
                return;
            }
            this.buffers.addAll(collection);
            this.lastBufferOperationTimestamp = System.nanoTime();
            if (this.buffers.size() >= this.numBuffersPerRequest) {
                this.buffers.notifyAll();
            }
        }
    }

    public long getLastBufferOperationTimestamp() {
        long j;
        synchronized (this.buffers) {
            j = this.lastBufferOperationTimestamp;
        }
        return j;
    }

    public void destroy() {
        synchronized (this.buffers) {
            this.destroyed = true;
            this.buffers.clear();
            this.buffers.notifyAll();
        }
    }

    public boolean isDestroyed() {
        boolean z;
        synchronized (this.buffers) {
            z = this.destroyed;
        }
        return z;
    }
}
