package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.flink.runtime.operators.sort.StageRunner;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/operators/sort/SorterInputGateway.class */
/**
 * Feeds records into the sort pipeline.
 *
 * <p>The gateway takes buffers from the {@code READ} stage of the dispatcher, writes incoming
 * records into the buffer's {@link InMemorySorter}, and forwards the buffer to the {@code SORT}
 * stage once a record no longer fits. Records that do not fit even into a fresh buffer are handed
 * to the optional {@link LargeRecordHandler}. A spilling marker is sent to the {@code SORT} stage
 * once the configured byte budget has been used up.
 *
 * <p>The mutable state ({@code currentBuffer}, {@code bytesUntilSpilling}) is accessed without
 * any synchronization in this class.
 *
 * @param <E> type of the records written through this gateway
 */
public final class SorterInputGateway<E> {
    private static final Logger LOG = LoggerFactory.getLogger(SorterInputGateway.class);

    /** Receiver for records too large for a sort buffer; may be {@code null}. */
    private final LargeRecordHandler<E> largeRecords;

    /** Channel used to obtain empty buffers and to pass filled buffers and markers on. */
    private final StageRunner.StageMessageDispatcher<E> dispatcher;

    /** Remaining byte budget before the spilling marker is sent; pinned to 0 afterwards. */
    private long bytesUntilSpilling;

    /** Buffer currently being filled, or {@code null} if a new one has to be taken first. */
    private CircularElement<E> currentBuffer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the gateway and, if the byte budget is already below 1, immediately sends the
     * spilling marker to the {@code SORT} stage.
     *
     * @param dispatcher dispatcher used to exchange buffers and markers; must not be null
     * @param largeRecordHandler handler for records exceeding a fresh buffer, or {@code null}
     * @param startSpillingBytes number of written bytes after which spilling is signalled
     */
    public SorterInputGateway(
            StageRunner.StageMessageDispatcher<E> dispatcher,
            @Nullable LargeRecordHandler<E> largeRecordHandler,
            long startSpillingBytes) {
        this.bytesUntilSpilling = startSpillingBytes;
        this.largeRecords = largeRecordHandler;
        // No cast needed: checkNotNull is generic, so the parameterized type is preserved and
        // no raw-type / unchecked conversion is introduced.
        this.dispatcher = Preconditions.checkNotNull(dispatcher);

        if (this.bytesUntilSpilling < 1) {
            this.dispatcher.send(StageRunner.SortStage.SORT, CircularElement.spillingMarker());
        }
    }

    /**
     * Writes the given record into the current sort buffer.
     *
     * <p>If the record does not fit into a partially filled buffer, that buffer is sent to the
     * {@code SORT} stage and the write is retried with a newly taken buffer. If it does not fit
     * into a buffer whose occupancy was 0, it is passed to the large record handler instead and
     * the buffer is reset.
     *
     * @param record the record to write
     * @throws IOException if a newly taken buffer is not empty, or if the record is too large
     *     and no large record handler is configured
     * @throws InterruptedException declared for the dispatcher's {@code take} call
     */
    public void writeRecord(E record) throws IOException, InterruptedException {
        if (this.currentBuffer == null) {
            this.currentBuffer = this.dispatcher.take(StageRunner.SortStage.READ);
            if (!this.currentBuffer.getBuffer().isEmpty()) {
                throw new IOException("New buffer is not empty.");
            }
        }

        final InMemorySorter<E> sorter = this.currentBuffer.getBuffer();
        final long occupancyBeforeWrite = sorter.getOccupancy();

        if (sorter.write(record)) {
            // Successful write: account for the bytes the record actually consumed.
            signalSpillingIfNecessary(sorter.getOccupancy() - occupancyBeforeWrite);
            return;
        }

        // Failed write: the remaining space of this buffer is counted against the budget.
        signalSpillingIfNecessary(sorter.getCapacity() - occupancyBeforeWrite);

        final boolean bufferWasUnused = occupancyBeforeWrite == 0;
        if (bufferWasUnused) {
            // Did not fit although nothing had been written yet, so treat it as a large record.
            writeLarge(record, sorter);
            this.currentBuffer.getBuffer().reset();
        } else {
            // Hand the filled buffer on and retry; the retry takes a new buffer from READ.
            this.dispatcher.send(StageRunner.SortStage.SORT, this.currentBuffer);
            this.currentBuffer = null;
            writeRecord(record);
        }
    }

    /**
     * Signals the end of the input: forwards the current buffer to the {@code SORT} stage if it
     * holds any data and then sends the end marker.
     */
    public void finishReading() {
        final boolean hasPendingData =
                this.currentBuffer != null && !this.currentBuffer.getBuffer().isEmpty();
        if (hasPendingData) {
            this.dispatcher.send(StageRunner.SortStage.SORT, this.currentBuffer);
        }

        // The end marker tells the receiving stage that no further buffers will follow.
        this.dispatcher.send(StageRunner.SortStage.SORT, CircularElement.endMarker());
        LOG.debug("Reading thread done.");
    }

    /**
     * Adds the record to the large record handler.
     *
     * @param record the record that did not fit into a fresh sort buffer
     * @param sorter the sorter whose capacity is reported in the error message
     * @throws IOException if no large record handler is configured
     */
    private void writeLarge(E record, InMemorySorter<E> sorter) throws IOException {
        if (this.largeRecords == null) {
            throw new IOException("The record exceeds the maximum size of a sort buffer (current maximum: " + sorter.getCapacity() + " bytes).");
        }
        LOG.debug("Large record did not fit into a fresh sort buffer. Putting into large record store.");
        this.largeRecords.addRecord(record);
    }

    /**
     * Deducts the given size from the spilling budget and sends the spilling marker to the
     * {@code SORT} stage when the budget drops below 1. Does nothing once the budget is
     * already at or below 0, so the marker is sent at most once from this method.
     *
     * @param writtenSize number of bytes to deduct from the budget
     */
    private void signalSpillingIfNecessary(long writtenSize) {
        if (this.bytesUntilSpilling <= 0) {
            return;
        }

        this.bytesUntilSpilling -= writtenSize;
        if (this.bytesUntilSpilling < 1) {
            this.dispatcher.send(StageRunner.SortStage.SORT, CircularElement.spillingMarker());
            this.bytesUntilSpilling = 0L;
        }
    }
}
