package org.apache.flink.runtime.io.network.api.serialization;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Random;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.core.fs.RefCountedFile;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.FileBasedBufferIterator;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.class */
/**
 * Accumulates the bytes of one length-prefixed record that arrives in several chunks.
 *
 * <p>A record is preceded by a 4-byte big-endian length. Once the length is known, the payload is
 * gathered either in an in-memory byte array or, when the length exceeds
 * {@code thresholdForSpilling}, in a spill file created in one of the configured temp
 * directories. Bytes of a chunk that lie beyond the end of the current record are remembered as
 * "left over" data and can be handed on via {@link #transferLeftOverTo(NonSpanningWrapper)}.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its mutable state; it appears to
 * be intended for use by a single thread at a time — confirm against callers.
 */
public final class SpanningWrapper {
    private static final Logger LOG = LoggerFactory.getLogger(SpanningWrapper.class);

    /** Size in bytes of the length prefix that precedes every record. */
    private static final int LENGTH_BYTES = Integer.BYTES;

    /** Number of candidate spill-file names tried before giving up. */
    private static final int MAX_SPILL_FILE_ATTEMPTS = 10;

    /** Candidate directories for spill files; shrinks when a directory fails with an I/O error. */
    private String[] tempDirs;

    /** Read view over {@link #buffer} for records that were gathered in memory. */
    private final DataInputDeserializer serializationReadBuffer;

    /** Buffer size used when reading the spill file back. */
    private final int fileBufferSize;

    /** Open channel to the spill file while a spilled record is being gathered, else null. */
    private FileChannel spillingChannel;

    /** In-memory accumulation buffer; starts out as {@link #initialBuffer} and grows on demand. */
    private byte[] buffer;

    /** Length of the current record's payload, or -1 while it is not yet known. */
    private int recordLength;

    /** Number of payload bytes of the current record gathered so far. */
    private int accumulatedRecordBytes;

    /** Segment holding bytes that follow the current record, or null if there are none. */
    private MemorySegment leftOverData;

    /** Start offset (inclusive) of the left-over bytes inside {@link #leftOverData}. */
    private int leftOverStart;

    /** End offset (exclusive) of the left-over bytes inside {@link #leftOverData}. */
    private int leftOverLimit;

    /** Handle of the current spill file, or null when nothing is spilled. */
    private RefCountedFile spillFile;

    /** Reader over the completed spill file, or null while no spilled record is complete. */
    private DataInputViewStreamWrapper spillFileReader;

    /** Records strictly longer than this many bytes are spilled to disk. */
    private final int thresholdForSpilling;

    /** Reusable 1 KiB buffer that {@link #buffer} is reset to on {@link #clear()}. */
    private final byte[] initialBuffer = new byte[1024];

    /** Source of randomness for picking a temp directory and a spill-file name. */
    private final Random rnd = new Random();

    /** Gathers the 4 length bytes when the length prefix itself is split across chunks. */
    final ByteBuffer lengthBuffer = ByteBuffer.allocate(LENGTH_BYTES);

    /**
     * Creates an empty wrapper.
     *
     * @param tempDirs directories in which spill files may be created
     * @param thresholdForSpilling records strictly longer than this are spilled to a file
     * @param fileBufferSize buffer size used when reading a spill file back
     */
    public SpanningWrapper(String[] tempDirs, int thresholdForSpilling, int fileBufferSize) {
        this.tempDirs = tempDirs;
        this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
        this.recordLength = -1;
        this.serializationReadBuffer = new DataInputDeserializer();
        this.buffer = this.initialBuffer;
        this.thresholdForSpilling = thresholdForSpilling;
        this.fileBufferSize = fileBufferSize;
    }

    /**
     * Starts a new record of the given length and takes over the bytes currently held by
     * {@code partial}, which is cleared afterwards.
     *
     * @param partial wrapper holding the first bytes of the record
     * @param nextRecordLength payload length of the record being started
     * @throws IOException if the spill file cannot be created or written
     */
    public void transferFrom(NonSpanningWrapper partial, int nextRecordLength) throws IOException {
        updateLength(nextRecordLength);
        // Both helpers report how many bytes they took over from the partial wrapper
        // (presumably — copyContentTo is defined outside this file).
        accumulatedRecordBytes =
                isAboveSpillingThreshold() ? spill(partial) : partial.copyContentTo(buffer);
        partial.clear();
    }

    /** Returns whether the current record is long enough to be spilled to disk. */
    private boolean isAboveSpillingThreshold() {
        return recordLength > thresholdForSpilling;
    }

    /**
     * Consumes the next chunk of input: first completes a partially read length prefix (if any),
     * then copies as many payload bytes as the current record still needs, and finally remembers
     * any remaining bytes of the chunk as left-over data.
     *
     * @param segment segment containing the chunk
     * @param offset start of the chunk inside {@code segment}
     * @param numBytes number of bytes in the chunk
     * @throws IOException if spilling to disk fails
     */
    public void addNextChunkFromMemorySegment(MemorySegment segment, int offset, int numBytes)
            throws IOException {
        final int limit = offset + numBytes;
        final int lengthBytesRead = isReadingLength() ? readLength(segment, offset, numBytes) : 0;
        final int payloadOffset = offset + lengthBytesRead;
        final int payloadBytes = numBytes - lengthBytesRead;
        if (payloadBytes == 0) {
            return;
        }

        final int toCopy = Math.min(recordLength - accumulatedRecordBytes, payloadBytes);
        if (toCopy > 0) {
            copyFromSegment(segment, payloadOffset, toCopy);
        }
        if (payloadBytes > toCopy) {
            // The chunk extends past the end of the current record.
            leftOverData = segment;
            leftOverStart = payloadOffset + toCopy;
            leftOverLimit = limit;
        }
    }

    /** Routes payload bytes to the spill file if one is open, otherwise to the memory buffer. */
    private void copyFromSegment(MemorySegment segment, int offset, int length)
            throws IOException {
        if (spillingChannel == null) {
            copyIntoBuffer(segment, offset, length);
        } else {
            copyIntoFile(segment, offset, length);
        }
    }

    /**
     * Appends payload bytes to the spill file. When the record becomes complete, the write channel
     * is closed and a buffered reader over the spill file is opened.
     */
    private void copyIntoFile(MemorySegment segment, int offset, int length) throws IOException {
        FileUtils.writeCompletely(spillingChannel, segment.wrap(offset, length));
        accumulatedRecordBytes += length;
        if (hasFullRecord()) {
            spillingChannel.close();
            spillFileReader =
                    new DataInputViewStreamWrapper(
                            new BufferedInputStream(
                                    new FileInputStream(spillFile.getFile()), fileBufferSize));
        }
    }

    /**
     * Appends payload bytes to the in-memory buffer. When the record becomes complete, the read
     * view is pointed at the first {@code recordLength} bytes of the buffer.
     */
    private void copyIntoBuffer(MemorySegment segment, int offset, int length) {
        segment.get(offset, buffer, accumulatedRecordBytes, length);
        accumulatedRecordBytes += length;
        if (hasFullRecord()) {
            serializationReadBuffer.setBuffer(buffer, 0, recordLength);
        }
    }

    /**
     * Continues reading a split length prefix from the chunk.
     *
     * @return the number of bytes consumed from the chunk for the length prefix
     */
    private int readLength(MemorySegment segment, int offset, int numBytes) throws IOException {
        final int bytesToRead = Math.min(lengthBuffer.remaining(), numBytes);
        segment.get(offset, lengthBuffer, bytesToRead);
        if (!lengthBuffer.hasRemaining()) {
            // All four length bytes are present; absolute read leaves the position untouched.
            updateLength(lengthBuffer.getInt(0));
        }
        return bytesToRead;
    }

    /**
     * Records the length of the record being started and prepares its storage: a spill file for
     * records above the threshold, otherwise a sufficiently large in-memory buffer.
     */
    private void updateLength(int length) throws IOException {
        lengthBuffer.clear();
        recordLength = length;
        if (isAboveSpillingThreshold()) {
            spillingChannel = createSpillingChannel();
        } else {
            ensureBufferCapacity(length);
        }
    }

    /**
     * Returns the data gathered so far that has not been consumed as a record.
     *
     * <ul>
     *   <li>while a length prefix is partially read: the length bytes gathered so far;
     *   <li>for a spilled record: length prefix, spilled bytes and left-over bytes;
     *   <li>when no record is in progress: an empty iterator;
     *   <li>otherwise: a single buffer with length prefix, buffered bytes and left-over bytes.
     * </ul>
     *
     * @throws IOException if the spill file cannot be flushed or read
     */
    public CloseableIterator<Buffer> getUnconsumedSegment() throws IOException {
        if (isReadingLength()) {
            return NonSpanningWrapper.singleBufferIterator(
                    MemorySegmentFactory.wrapCopy(
                            lengthBuffer.array(), 0, lengthBuffer.position()));
        }
        if (isAboveSpillingThreshold()) {
            return createSpilledDataIterator();
        }
        if (recordLength == -1) {
            // Neither a partial length nor any payload has been gathered.
            return CloseableIterator.empty();
        }
        return NonSpanningWrapper.singleBufferIterator(copyDataBuffer());
    }

    /**
     * Builds an iterator over the length prefix, the bytes written to the spill file so far, and
     * any left-over bytes, in that order.
     */
    private CloseableIterator<Buffer> createSpilledDataIterator() throws IOException {
        if (spillingChannel != null && spillingChannel.isOpen()) {
            // Flush pending writes before the file is read back.
            spillingChannel.force(false);
        }
        final CloseableIterator<Buffer> lengthPrefix =
                toSingleBufferIterator(MemorySegmentFactory.wrapInt(recordLength));
        final CloseableIterator<Buffer> spilledBytes =
                new FileBasedBufferIterator(
                        spillFile, Math.min(accumulatedRecordBytes, recordLength), fileBufferSize);
        final CloseableIterator<Buffer> leftOver =
                leftOverData == null
                        ? CloseableIterator.<Buffer>empty()
                        : toSingleBufferIterator(
                                MemorySegmentFactory.wrapCopy(
                                        leftOverData.getArray(), leftOverStart, leftOverLimit));
        return CloseableIterator.flatten(lengthPrefix, spilledBytes, leftOver);
    }

    /**
     * Copies length prefix, buffered payload bytes and left-over bytes into one freshly allocated
     * unpooled segment.
     */
    private MemorySegment copyDataBuffer() throws IOException {
        final int leftOverSize = leftOverLimit - leftOverStart;
        final int unconsumedSize = LENGTH_BYTES + accumulatedRecordBytes + leftOverSize;
        final DataOutputSerializer serializer = new DataOutputSerializer(unconsumedSize);
        serializer.writeInt(recordLength);
        serializer.write(buffer, 0, accumulatedRecordBytes);
        if (leftOverData != null) {
            serializer.write(leftOverData, leftOverStart, leftOverSize);
        }
        final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize);
        segment.put(0, serializer.getSharedBuffer(), 0, segment.size());
        return segment;
    }

    /**
     * Clears {@code target}, hands the left-over bytes (if any) to it, and then resets this
     * wrapper via {@link #clear()}.
     *
     * @param target wrapper that receives the left-over data
     */
    public void transferLeftOverTo(NonSpanningWrapper target) {
        target.clear();
        if (leftOverData != null) {
            target.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
        }
        clear();
    }

    /** Returns whether the length is known and all payload bytes of the record are gathered. */
    public boolean hasFullRecord() {
        return recordLength >= 0 && accumulatedRecordBytes >= recordLength;
    }

    /**
     * Returns the number of bytes gathered so far: the payload bytes plus either the full length
     * prefix (once the length is known) or the length bytes read so far.
     */
    public int getNumGatheredBytes() {
        final int lengthBytes = recordLength >= 0 ? LENGTH_BYTES : lengthBuffer.position();
        return accumulatedRecordBytes + lengthBytes;
    }

    /**
     * Resets the wrapper to its initial, empty state. Any open spill channel and reader are
     * closed quietly and the spill file reference is released.
     */
    public void clear() {
        buffer = initialBuffer;
        serializationReadBuffer.releaseArrays();
        recordLength = -1;
        lengthBuffer.clear();
        leftOverData = null;
        leftOverStart = 0;
        leftOverLimit = 0;
        accumulatedRecordBytes = 0;

        if (spillingChannel != null) {
            IOUtils.closeQuietly(spillingChannel);
        }
        if (spillFileReader != null) {
            IOUtils.closeQuietly(spillFileReader);
        }
        if (spillFile != null) {
            IOUtils.closeQuietly(() -> {
                this.spillFile.release();
            });
        }
        spillingChannel = null;
        spillFileReader = null;
        spillFile = null;
    }

    /**
     * Returns the view from which the gathered record can be read: the spill-file reader if one
     * is open, otherwise the in-memory read buffer.
     */
    public DataInputView getInputView() {
        return spillFileReader == null ? serializationReadBuffer : spillFileReader;
    }

    /**
     * Grows the in-memory buffer to hold at least {@code minLength} bytes (at least doubling it),
     * preserving the bytes accumulated so far.
     */
    private void ensureBufferCapacity(int minLength) {
        if (buffer.length < minLength) {
            final byte[] grown = new byte[Math.max(minLength, buffer.length * 2)];
            System.arraycopy(buffer, 0, grown, 0, accumulatedRecordBytes);
            buffer = grown;
        }
    }

    /**
     * Creates a new, uniquely named spill file in one of the temp directories and opens it for
     * reading and writing.
     *
     * <p>Starting at a random directory, up to {@value #MAX_SPILL_FILE_ATTEMPTS} candidate names
     * are tried. If creating or opening a file fails with an {@link IOException} and more than one
     * directory is left, the failing directory is removed from the candidates and the next
     * attempt is made; with a single directory left the exception is rethrown.
     *
     * @return a channel to the newly created spill file
     * @throws IllegalStateException if a spill file already exists
     * @throws IOException if no spill file could be created
     */
    private FileChannel createSpillingChannel() throws IOException {
        if (spillFile != null) {
            throw new IllegalStateException("Spilling file already exists.");
        }

        final int initialDirIndex = rnd.nextInt(tempDirs.length);
        for (int attempt = 0; attempt < MAX_SPILL_FILE_ATTEMPTS; attempt++) {
            final int dirIndex = (initialDirIndex + attempt) % tempDirs.length;
            final String directory = tempDirs[dirIndex];
            final File file = new File(directory, randomString(rnd) + ".inputchannel");
            try {
                // The I/O calls must be inside the try so that a failing directory is handled
                // by the catch below instead of aborting the whole search.
                if (file.createNewFile()) {
                    spillFile = new RefCountedFile(file);
                    return new RandomAccessFile(file, "rw").getChannel();
                }
                // Name collision: fall through and try another random name.
            } catch (IOException e) {
                if (tempDirs.length <= 1) {
                    // No other directory left to try.
                    throw e;
                }
                LOG.warn("Caught an IOException when creating spill file: " + directory + ". Attempt " + attempt, e);
                tempDirs = ArrayUtils.remove(tempDirs, dirIndex);
            }
        }

        throw new IOException("Could not find a unique file channel name in '" + Arrays.toString(this.tempDirs) + "' for spilling large records during deserialization.");
    }

    /** Returns the hex representation of 20 random bytes, used as a spill-file base name. */
    private static String randomString(Random random) {
        final byte[] bytes = new byte[20];
        random.nextBytes(bytes);
        return StringUtils.byteToHexString(bytes);
    }

    /**
     * Writes the content of {@code partial} to the spill file.
     *
     * @return the number of bytes that were remaining in the partial wrapper's byte buffer
     */
    private int spill(NonSpanningWrapper partial) throws IOException {
        final ByteBuffer content = partial.wrapIntoByteBuffer();
        // Capture the size first: writing advances the buffer's position.
        final int length = content.remaining();
        FileUtils.writeCompletely(spillingChannel, content);
        return length;
    }

    /** Returns whether some, but not yet all, bytes of a length prefix have been gathered. */
    private boolean isReadingLength() {
        return lengthBuffer.position() > 0;
    }

    /**
     * Wraps the segment into a data buffer and returns an iterator over that single buffer; the
     * buffer is recycled through the callback passed to {@code CloseableIterator.ofElement}.
     */
    private static CloseableIterator<Buffer> toSingleBufferIterator(MemorySegment segment) {
        final NetworkBuffer networkBuffer =
                new NetworkBuffer(
                        segment,
                        FreeingBufferRecycler.INSTANCE,
                        Buffer.DataType.DATA_BUFFER,
                        segment.size());
        return CloseableIterator.ofElement(networkBuffer, Buffer::recycleBuffer);
    }
}
