package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

/**
 * {@link ResultPartition} implementation which collects emitted records and events in a
 * {@link DataBuffer}, spills the buffered data into one {@link PartitionedFile} through a
 * {@link PartitionedFileWriter}, and hands the finished file to a
 * {@link SortMergeResultPartitionReadScheduler} for reading.
 *
 * <p>Unicast and broadcast data are kept in two separate data buffers. Asking for the buffer of
 * one kind first flushes and releases the buffer of the other kind, so at most one of them holds
 * data at any time.
 *
 * <p>The class is declared {@link NotThreadSafe}. Only {@code resultFile} and {@code fileWriter}
 * are annotated as guarded by {@code lock}; the emit and flush paths touch {@code fileWriter}
 * without taking the lock.
 */
@NotThreadSafe
public class SortMergeResultPartition extends ResultPartition {

    /**
     * Upper bound, in bytes, of the memory reserved as write buffers in {@link #setup()}
     * (8 MiB).
     */
    private static final int NUM_WRITE_BUFFER_BYTES = 8 * 1024 * 1024;

    /**
     * Maximum number of buffers passed to the file writer in one batch. It is also the cap on the
     * number of write buffers reserved in {@link #setup()}.
     */
    private static final int EXPECTED_WRITE_BATCH_SIZE = 512;

    /** Monitor protecting {@link #resultFile} and {@link #fileWriter}; shared with the scheduler. */
    private final Object lock = new Object();

    /** The produced file; stays {@code null} until {@link #finish()} and is reset on deletion. */
    @GuardedBy("lock")
    private PartitionedFile resultFile;

    /** Memory segments taken from the buffer pool in {@link #setup()} and used for file writes. */
    private final List<MemorySegment> writeSegments = new ArrayList<>();

    /** Whether {@link #notifyEndOfData(StopMode)} has already broadcast its event. */
    private boolean hasNotifiedEndOfUserRecords;

    /** Buffer size reported by the read buffer pool; also the chunk size for large records. */
    private final int networkBufferSize;

    /** Writer of the partitioned file; created in {@link #setup()}. */
    @GuardedBy("lock")
    private PartitionedFileWriter fileWriter;

    /** Base path handed to the {@link PartitionedFileWriter}. */
    private final String resultFileBasePath;

    /** Rotated subpartition order, see {@link #getRandomSubpartitionOrder(int)}. */
    private final int[] subpartitionOrder;

    /** Pool that is initialized eagerly in {@link #setup()} and passed to the read scheduler. */
    private final BatchShuffleReadBufferPool readBufferPool;

    /** Creates the subpartition readers and is released together with this partition. */
    private final SortMergeResultPartitionReadScheduler readScheduler;

    /** Number of buffers each newly created data buffer may use; computed in {@link #setup()}. */
    private int numBuffersForSort;

    /** Selects {@link HashBasedDataBuffer} over {@link SortBasedDataBuffer}. */
    private boolean useHashBuffer;

    /** Data buffer currently collecting broadcast data, or {@code null}. */
    private DataBuffer broadcastDataBuffer;

    /** Data buffer currently collecting unicast data, or {@code null}. */
    private DataBuffer unicastDataBuffer;

    /**
     * Creates the partition. {@code str}, {@code i}, {@code resultPartitionID},
     * {@code resultPartitionType}, {@code i2}, {@code i3}, {@code resultPartitionManager},
     * {@code bufferCompressor} and {@code supplierWithException} are forwarded unchanged to the
     * {@link ResultPartition} constructor.
     *
     * @param i2 additionally used as the subpartition count for the subpartition order and the
     *     read scheduler
     * @param batchShuffleReadBufferPool read buffer pool; must not be {@code null}; its buffer
     *     size becomes the network buffer size of this partition
     * @param executor passed to the read scheduler
     * @param str2 base path of the result file; must not be {@code null}
     */
    public SortMergeResultPartition(
            String str,
            int i,
            ResultPartitionID resultPartitionID,
            ResultPartitionType resultPartitionType,
            int i2,
            int i3,
            BatchShuffleReadBufferPool batchShuffleReadBufferPool,
            Executor executor,
            ResultPartitionManager resultPartitionManager,
            String str2,
            @Nullable BufferCompressor bufferCompressor,
            SupplierWithException<BufferPool, IOException> supplierWithException) {
        super(
                str,
                i,
                resultPartitionID,
                resultPartitionType,
                i2,
                i3,
                resultPartitionManager,
                bufferCompressor,
                supplierWithException);
        this.resultFileBasePath = Preconditions.checkNotNull(str2);
        this.readBufferPool = Preconditions.checkNotNull(batchShuffleReadBufferPool);
        this.networkBufferSize = batchShuffleReadBufferPool.getBufferSize();
        this.subpartitionOrder = getRandomSubpartitionOrder(i2);
        this.readScheduler =
                new SortMergeResultPartitionReadScheduler(
                        i2, batchShuffleReadBufferPool, executor, this.lock);
    }

    /**
     * Creates the file writer, initializes the read buffer pool, sets up the parent partition and
     * splits the required buffers of the buffer pool into sort buffers and write buffers.
     *
     * <p>If the pool requires at least two buffers per subpartition, the hash-based data buffer
     * is used and no write buffers are reserved. Otherwise up to
     * {@link #EXPECTED_WRITE_BATCH_SIZE} write buffers totalling at most
     * {@link #NUM_WRITE_BUFFER_BYTES} are reserved, but never more than half of the required
     * buffers.
     *
     * @throws IOException if the partition is already released, the file writer cannot be
     *     created, fewer than two buffers are required, or the thread is interrupted while
     *     requesting write buffers
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition, org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void setup() throws IOException {
        synchronized (lock) {
            if (isReleased()) {
                throw new IOException("Result partition has been released.");
            }
            try {
                // 4194304 = 4 MiB. NOTE(review): presumably a memory budget of the writer;
                // confirm against PartitionedFileWriter.
                fileWriter = new PartitionedFileWriter(numSubpartitions, 4194304, resultFileBasePath);
            } catch (Throwable throwable) {
                throw new IOException("Failed to create file writer.", throwable);
            }
        }

        readBufferPool.initialize();
        super.setup();

        int numRequiredBuffers = bufferPool.getNumberOfRequiredMemorySegments();
        if (numRequiredBuffers < 2) {
            throw new IOException(
                    String.format(
                            "Too few sort buffers, please increase %s.",
                            NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS));
        }

        int expectedWriteBuffers = 0;
        if (numRequiredBuffers >= 2 * numSubpartitions) {
            useHashBuffer = true;
        } else if (networkBufferSize >= NUM_WRITE_BUFFER_BYTES) {
            expectedWriteBuffers = 1;
        } else {
            expectedWriteBuffers =
                    Math.min(EXPECTED_WRITE_BATCH_SIZE, NUM_WRITE_BUFFER_BYTES / networkBufferSize);
        }

        int numBuffersForWrite = Math.min(numRequiredBuffers / 2, expectedWriteBuffers);
        numBuffersForSort = numRequiredBuffers - numBuffersForWrite;

        try {
            for (int n = 0; n < numBuffersForWrite; n++) {
                writeSegments.add(bufferPool.requestMemorySegmentBlocking());
            }
        } catch (InterruptedException e) {
            // setup() cannot throw InterruptedException, so keep the interrupt visible to the
            // caller by restoring the flag before wrapping the exception.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }

        LOG.info(
                "Sort-merge partition {} initialized, num sort buffers: {}, num write buffers: {}.",
                getPartitionId(),
                numBuffersForSort,
                numBuffersForWrite);
    }

    /**
     * Releases the file writer if no result file has been produced yet, then releases the read
     * scheduler. A produced result file is deleted only once the future returned by the
     * scheduler's release completes.
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition
    protected void releaseInternal() {
        synchronized (lock) {
            if (resultFile == null && fileWriter != null) {
                fileWriter.releaseQuietly();
            }

            readScheduler
                    .release()
                    .thenRun(
                            () -> {
                                synchronized (lock) {
                                    if (resultFile != null) {
                                        resultFile.deleteQuietly();
                                        resultFile = null;
                                    }
                                }
                            });
        }
    }

    /**
     * Emits a record to one subpartition as a {@code DATA_BUFFER}.
     *
     * @param byteBuffer the serialized record
     * @param i index of the target subpartition
     * @throws IOException if buffering or spilling the record fails
     */
    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void emitRecord(ByteBuffer byteBuffer, int i) throws IOException {
        emit(byteBuffer, i, Buffer.DataType.DATA_BUFFER, false);
    }

    /**
     * Emits a record as broadcast data of type {@code DATA_BUFFER}.
     *
     * @param byteBuffer the serialized record
     * @throws IOException if buffering or spilling the record fails
     */
    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void broadcastRecord(ByteBuffer byteBuffer) throws IOException {
        broadcast(byteBuffer, Buffer.DataType.DATA_BUFFER);
    }

    /**
     * Serializes the event and emits it as broadcast data. The serialized buffer is recycled
     * exactly once, whether or not the broadcast succeeds.
     *
     * @param abstractEvent the event to broadcast
     * @param z forwarded as second argument to {@link EventSerializer#toBuffer}
     * @throws IOException if buffering or spilling the event fails
     */
    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void broadcastEvent(AbstractEvent abstractEvent, boolean z) throws IOException {
        Buffer buffer = EventSerializer.toBuffer(abstractEvent, z);
        try {
            broadcast(buffer.getNioBufferReadable(), buffer.getDataType());
        } finally {
            buffer.recycleBuffer();
        }
    }

    /** Emits the given data as broadcast data; the target subpartition index passed on is 0. */
    private void broadcast(ByteBuffer record, Buffer.DataType dataType) throws IOException {
        emit(record, 0, dataType, true);
    }

    /**
     * Appends the record to the unicast or broadcast data buffer, spilling whenever the buffer
     * asks for it.
     *
     * <p>When {@code append} returns {@code true} the buffer has to be drained. If it holds
     * nothing to flush, the record is written directly to the file as a large record.
     * Otherwise the buffer is flushed and reset, and the loop continues with whatever is left of
     * the record.
     */
    private void emit(
            ByteBuffer record,
            int targetSubpartition,
            Buffer.DataType dataType,
            boolean isBroadcast)
            throws IOException {
        do {
            checkInProduceState();

            DataBuffer dataBuffer =
                    isBroadcast ? getBroadcastDataBuffer() : getUnicastDataBuffer();
            if (!dataBuffer.append(record, targetSubpartition, dataType)) {
                return;
            }

            if (!dataBuffer.hasRemaining()) {
                dataBuffer.reset();
                writeLargeRecord(record, targetSubpartition, dataType, isBroadcast);
                return;
            }

            flushDataBuffer(dataBuffer, isBroadcast);
            dataBuffer.reset();
        } while (record.hasRemaining());
    }

    /** Releases the given data buffer; a {@code null} argument is ignored. */
    private void releaseDataBuffer(DataBuffer dataBuffer) {
        if (dataBuffer == null) {
            return;
        }
        dataBuffer.release();
    }

    /**
     * Flushes the broadcast data buffer and returns the current unicast data buffer, creating a
     * new one if there is none or the current one is finished.
     */
    private DataBuffer getUnicastDataBuffer() throws IOException {
        flushBroadcastDataBuffer();

        if (unicastDataBuffer == null || unicastDataBuffer.isFinished()) {
            unicastDataBuffer = createNewDataBuffer();
        }
        return unicastDataBuffer;
    }

    /**
     * Flushes the unicast data buffer and returns the current broadcast data buffer, creating a
     * new one if there is none or the current one is finished.
     */
    private DataBuffer getBroadcastDataBuffer() throws IOException {
        flushUnicastDataBuffer();

        if (broadcastDataBuffer == null || broadcastDataBuffer.isFinished()) {
            broadcastDataBuffer = createNewDataBuffer();
        }
        return broadcastDataBuffer;
    }

    /** Creates a hash-based or sort-based data buffer depending on {@link #useHashBuffer}. */
    private DataBuffer createNewDataBuffer() {
        if (useHashBuffer) {
            return new HashBasedDataBuffer(
                    bufferPool, numSubpartitions, numBuffersForSort, subpartitionOrder);
        }
        return new SortBasedDataBuffer(
                bufferPool, numSubpartitions, networkBufferSize, numBuffersForSort, subpartitionOrder);
    }

    /**
     * Drains the data buffer into a new region of the result file, writing in batches.
     *
     * <p>Nothing happens for a {@code null}, released or empty data buffer. Each buffer taken
     * from the data buffer is offered one of the reserved write segments ({@code null} once the
     * current set is used up or none were reserved); the set is replenished after every written
     * batch.
     */
    private void flushDataBuffer(DataBuffer dataBuffer, boolean isBroadcast) throws IOException {
        if (dataBuffer == null || dataBuffer.isReleased() || !dataBuffer.hasRemaining()) {
            return;
        }

        ArrayDeque<MemorySegment> freeSegments = new ArrayDeque<>(writeSegments);
        int batchSize =
                useHashBuffer
                        ? EXPECTED_WRITE_BATCH_SIZE
                        : Math.min(EXPECTED_WRITE_BATCH_SIZE, freeSegments.size());
        List<BufferWithChannel> toWrite = new ArrayList<>(batchSize);

        fileWriter.startNewRegion(isBroadcast);
        while (true) {
            if (toWrite.size() >= batchSize) {
                writeBuffers(toWrite);
                freeSegments = new ArrayDeque<>(writeSegments);
            }

            BufferWithChannel next = dataBuffer.getNextBuffer(freeSegments.poll());
            if (next == null) {
                // The data buffer is drained; write out the last, possibly partial, batch.
                writeBuffers(toWrite);
                return;
            }

            updateStatistics(next.getBuffer(), isBroadcast);
            toWrite.add(compressBufferIfPossible(next));
        }
    }

    /** Finishes, flushes and releases the broadcast data buffer, if there is one. */
    private void flushBroadcastDataBuffer() throws IOException {
        if (broadcastDataBuffer == null) {
            return;
        }
        broadcastDataBuffer.finish();
        flushDataBuffer(broadcastDataBuffer, true);
        broadcastDataBuffer.release();
        broadcastDataBuffer = null;
    }

    /** Finishes, flushes and releases the unicast data buffer, if there is one. */
    private void flushUnicastDataBuffer() throws IOException {
        if (unicastDataBuffer == null) {
            return;
        }
        unicastDataBuffer.finish();
        flushDataBuffer(unicastDataBuffer, false);
        unicastDataBuffer.release();
        unicastDataBuffer = null;
    }

    /**
     * Returns the argument unchanged if its buffer cannot be compressed; otherwise returns a new
     * {@link BufferWithChannel} holding the compressed buffer and the same channel index.
     */
    private BufferWithChannel compressBufferIfPossible(BufferWithChannel bufferWithChannel) {
        Buffer buffer = bufferWithChannel.getBuffer();
        if (!canBeCompressed(buffer)) {
            return bufferWithChannel;
        }

        Buffer compressed =
                Preconditions.checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);
        return new BufferWithChannel(compressed, bufferWithChannel.getChannelIndex());
    }

    /**
     * Updates the output counters for one buffer. For broadcast data the buffer count and the
     * outgoing byte count are multiplied by the number of subpartitions, while the produced
     * byte count is increased by the readable bytes only once.
     */
    private void updateStatistics(Buffer buffer, boolean isBroadcast) {
        numBuffersOut.inc(isBroadcast ? numSubpartitions : 1L);
        long readableBytes = buffer.readableBytes();
        numBytesProduced.inc(readableBytes);
        numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions : readableBytes);
    }

    /**
     * Writes the remainder of a record directly into a new region of the result file, copying it
     * chunk by chunk (at most {@link #networkBufferSize} bytes each) into the reserved write
     * segments.
     *
     * <p>The wrapping buffers use a no-op recycler, so the write segments stay owned by this
     * partition and are reused as soon as a batch has been written.
     *
     * @throws IllegalStateException if the hash-based data buffer is in use
     */
    private void writeLargeRecord(
            ByteBuffer record,
            int targetSubpartition,
            Buffer.DataType dataType,
            boolean isBroadcast)
            throws IOException {
        Preconditions.checkState(!useHashBuffer, "No buffers available for writing.");

        fileWriter.startNewRegion(isBroadcast);

        List<BufferWithChannel> toWrite = new ArrayList<>();
        ArrayDeque<MemorySegment> freeSegments = new ArrayDeque<>(writeSegments);

        while (record.hasRemaining()) {
            if (freeSegments.isEmpty()) {
                // All write segments are in use: write the batch out and start over.
                fileWriter.writeBuffers(toWrite);
                toWrite.clear();
                freeSegments = new ArrayDeque<>(writeSegments);
            }

            int toCopy = Math.min(record.remaining(), networkBufferSize);
            MemorySegment segment = Preconditions.checkNotNull(freeSegments.poll());
            segment.put(0, record, toCopy);

            NetworkBuffer buffer = new NetworkBuffer(segment, ignored -> {}, dataType, toCopy);
            BufferWithChannel bufferWithChannel = new BufferWithChannel(buffer, targetSubpartition);
            updateStatistics(buffer, isBroadcast);
            toWrite.add(compressBufferIfPossible(bufferWithChannel));
        }

        fileWriter.writeBuffers(toWrite);
    }

    /**
     * Writes the batch to the file, then recycles every buffer of the batch and clears the list.
     * Recycling and clearing also happen when the write fails, so the buffers of a failed batch
     * are still recycled.
     */
    private void writeBuffers(List<BufferWithChannel> buffers) throws IOException {
        try {
            fileWriter.writeBuffers(buffers);
        } finally {
            for (BufferWithChannel bufferWithChannel : buffers) {
                bufferWithChannel.getBuffer().recycleBuffer();
            }
            buffers.clear();
        }
    }

    /**
     * Broadcasts an {@link EndOfData} event carrying the given stop mode. Only the first call
     * has an effect; later calls return immediately.
     *
     * @throws IOException if broadcasting the event fails
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition, org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void notifyEndOfData(StopMode stopMode) throws IOException {
        if (!hasNotifiedEndOfUserRecords) {
            broadcastEvent(new EndOfData(stopMode), false);
            hasNotifiedEndOfUserRecords = true;
        }
    }

    /**
     * Broadcasts the {@link EndOfPartitionEvent}, flushes the remaining broadcast data and
     * finishes the file writer, which yields the result file.
     *
     * @throws IOException if broadcasting, flushing or finishing the file fails
     * @throws IllegalStateException if a unicast data buffer is still present or the partition
     *     has already been released
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition, org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void finish() throws IOException {
        broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
        // Broadcasting goes through getBroadcastDataBuffer(), which flushes and clears the
        // unicast data buffer, so it must be gone by now.
        Preconditions.checkState(
                unicastDataBuffer == null,
                "The unicast sort buffer should be either null or released.");
        flushBroadcastDataBuffer();

        synchronized (lock) {
            Preconditions.checkState(!isReleased(), "Result partition is already released.");

            resultFile = fileWriter.finish();
            super.finish();
            LOG.info("New partitioned file produced: {}.", resultFile);
        }
    }

    /**
     * Returns all reserved write segments to the buffer pool. Does nothing, and keeps the list
     * untouched, while the buffer pool is {@code null}.
     */
    private void releaseWriteBuffers() {
        if (bufferPool == null) {
            return;
        }
        for (MemorySegment segment : writeSegments) {
            bufferPool.recycle(segment);
        }
        writeSegments.clear();
    }

    /**
     * Returns the write segments, releases both data buffers, closes the parent partition and
     * finally closes the file writer quietly. The file writer is closed even if one of the
     * earlier steps throws.
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition, org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter, java.lang.AutoCloseable
    public void close() {
        try {
            releaseWriteBuffers();
            releaseDataBuffer(unicastDataBuffer);
            releaseDataBuffer(broadcastDataBuffer);
            super.close();
        } finally {
            IOUtils.closeQuietly(fileWriter);
        }
    }

    /**
     * Creates a reader for one subpartition of the finished result file.
     *
     * @param i index of the subpartition to read
     * @param bufferAvailabilityListener listener handed to the created reader
     * @return the reader created by the read scheduler
     * @throws IndexOutOfBoundsException if {@code i} is not a valid subpartition index
     * @throws IllegalStateException if the partition is released or not yet finished
     * @throws PartitionNotFoundException if the result file is not readable
     * @throws IOException declared by the overridden method and by the read scheduler call
     */
    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public ResultSubpartitionView createSubpartitionView(int i, BufferAvailabilityListener bufferAvailabilityListener) throws IOException {
        synchronized (lock) {
            Preconditions.checkElementIndex(i, numSubpartitions, "Subpartition not found.");
            Preconditions.checkState(!isReleased(), "Partition released.");
            Preconditions.checkState(isFinished(), "Trying to read unfinished blocking partition.");

            if (!resultFile.isReadable()) {
                throw new PartitionNotFoundException(getPartitionId());
            }

            return readScheduler.createSubpartitionReader(bufferAvailabilityListener, i, resultFile);
        }
    }

    /** Flushes both data buffers; an {@link IOException} is logged and not propagated. */
    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void flushAll() {
        flushDataBuffersLoggingFailure();
    }

    /**
     * Flushes both data buffers regardless of the given subpartition index; an
     * {@link IOException} is logged and not propagated.
     *
     * @param i ignored
     */
    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void flush(int i) {
        flushDataBuffersLoggingFailure();
    }

    /** Flushes the unicast and then the broadcast data buffer, logging any I/O failure. */
    private void flushDataBuffersLoggingFailure() {
        try {
            flushUnicastDataBuffer();
            flushBroadcastDataBuffer();
        } catch (IOException e) {
            LOG.error("Failed to flush the current sort buffer.", e);
        }
    }

    /** Always returns the {@code AVAILABLE} future. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition, org.apache.flink.runtime.io.AvailabilityProvider
    public CompletableFuture<?> getAvailableFuture() {
        return AVAILABLE;
    }

    /** Always returns 0. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition
    public int getNumberOfQueuedBuffers() {
        return 0;
    }

    /** Always returns 0. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition
    public long getSizeOfQueuedBuffersUnsafe() {
        return 0L;
    }

    /** Always returns 0, whatever subpartition index is given. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition
    public int getNumberOfQueuedBuffers(int i) {
        return 0;
    }

    /**
     * Builds a rotation of {@code 0 .. numSubpartitions - 1}: a random shift is drawn once and
     * index {@code k} is stored at position {@code (k + shift) % numSubpartitions}.
     *
     * @param numSubpartitions number of subpartitions; must be positive because
     *     {@link Random#nextInt(int)} rejects a non-positive bound
     * @return the rotated order
     */
    private static int[] getRandomSubpartitionOrder(int numSubpartitions) {
        int[] order = new int[numSubpartitions];
        int shift = new Random().nextInt(numSubpartitions);
        for (int channel = 0; channel < numSubpartitions; channel++) {
            order[(channel + shift) % numSubpartitions] = channel;
        }
        return order;
    }

    /** Returns the current result file, read under {@code lock}; may be {@code null}. */
    @VisibleForTesting
    PartitionedFile getResultFile() {
        synchronized (lock) {
            return resultFile;
        }
    }
}
