package org.apache.flink.runtime.operators;

import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/GroupReduceCombineDriver.class */
/**
 * Driver for the {@link DriverStrategy#SORTED_GROUP_COMBINE} strategy.
 *
 * <p>Input records are written into an in-memory sorter. Whenever the sorter cannot accept
 * another record, and once more after the input is exhausted, the buffered records are sorted
 * and the {@link GroupCombineFunction} is invoked once per key group. Driver comparator 0 is
 * used for sorting, driver comparator 1 for grouping.
 *
 * @param <IN> type of the records consumed by the combiner
 * @param <OUT> type of the records emitted by the combiner
 */
public class GroupReduceCombineDriver<IN, OUT> implements Driver<GroupCombineFunction<IN, OUT>, OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(GroupReduceCombineDriver.class);

    /** Largest fixed record length (as reported by the serializer) for which the fixed-length sorter is used. */
    private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;

    private TaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
    private InMemorySorter<IN> sorter;
    private GroupCombineFunction<IN, OUT> combiner;
    private TypeSerializer<IN> serializer;
    private TypeComparator<IN> groupingComparator;
    private Collector<OUT> output;
    private List<MemorySegment> memory;

    /** Number of records that did not fit even into a freshly reset sort buffer. */
    private long oversizedRecordCount;

    private final QuickSort sortAlgo = new QuickSort();

    /** Cleared by {@link #cancel()}; checked by the read and combine loops. */
    private volatile boolean running = true;

    private boolean objectReuseEnabled = false;

    /**
     * Stores the task context and marks the driver as running.
     *
     * @param taskContext context that supplies configuration, inputs, memory and output
     */
    @Override
    public void setup(TaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext) {
        this.taskContext = taskContext;
        this.running = true;
    }

    /** @return always 1; this driver reads a single input */
    @Override
    public int getNumberOfInputs() {
        return 1;
    }

    /** @return the {@link GroupCombineFunction} class token */
    @Override
    public Class<GroupCombineFunction<IN, OUT>> getStubType() {
        // A class literal is always raw; going through Class<?> is the only way to obtain the
        // parameterized token. Safe because generic parameters are erased at runtime.
        @SuppressWarnings("unchecked")
        final Class<GroupCombineFunction<IN, OUT>> stubType =
                (Class<GroupCombineFunction<IN, OUT>>) (Class<?>) GroupCombineFunction.class;
        return stubType;
    }

    /** @return always 2: one comparator for sorting, one for grouping */
    @Override
    public int getNumberOfDriverComparators() {
        return 2;
    }

    /**
     * Validates the driver strategy, allocates the driver's memory pages and creates the sorter.
     *
     * @throws Exception if the configured strategy is not {@code SORTED_GROUP_COMBINE}, or if
     *     memory allocation or sorter construction fails
     */
    @Override
    public void prepare() throws Exception {
        final DriverStrategy driverStrategy = this.taskContext.getTaskConfig().getDriverStrategy();
        if (driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE) {
            throw new Exception("Invalid strategy " + driverStrategy + " for group reduce combiner.");
        }

        // Explicit type witness: without it the chained call loses the record type.
        this.serializer = this.taskContext.<IN>getInputSerializer(0).getSerializer();
        final TypeComparator<IN> sortingComparator = this.taskContext.getDriverComparator(0);
        this.groupingComparator = this.taskContext.getDriverComparator(1);
        this.combiner = this.taskContext.getStub();
        this.output = this.taskContext.getOutputCollector();

        final MemoryManager memoryManager = this.taskContext.getMemoryManager();
        final int numPages =
                memoryManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver());
        this.memory = memoryManager.allocatePages(this.taskContext.getContainingTask(), numPages);

        // Use the fixed-length sorter only for short fixed-length records whose comparator
        // supports serialization with key normalization; otherwise use the normalized-key sorter.
        final int recordLength = this.serializer.getLength();
        final boolean useFixedLengthSorter =
                sortingComparator.supportsSerializationWithKeyNormalization()
                        && recordLength > 0
                        && recordLength <= THRESHOLD_FOR_IN_PLACE_SORTING;
        if (useFixedLengthSorter) {
            this.sorter = new FixedLengthRecordSorter<>(this.serializer, sortingComparator.duplicate2(), this.memory);
        } else {
            this.sorter = new NormalizedKeySorter<>(this.serializer, sortingComparator.duplicate2(), this.memory);
        }

        this.objectReuseEnabled = this.taskContext.getExecutionConfig().isObjectReuseEnabled();
        if (LOG.isDebugEnabled()) {
            LOG.debug("GroupReduceCombineDriver object reuse: {}.", this.objectReuseEnabled ? "ENABLED" : "DISABLED");
        }
    }

    /**
     * Reads the input into the sorter, combining whenever the sorter is full, and combines the
     * remaining buffered records at the end unless the driver was canceled.
     *
     * @throws Exception if reading, sorting or the user combine function fails
     */
    @Override
    public void run() throws Exception {
        LOG.debug("Combiner starting.");

        final MutableObjectIterator<IN> input = this.taskContext.getInput(0);

        if (this.objectReuseEnabled) {
            // The iterator may fill and return the instance passed in, so keep feeding back
            // whatever it returned.
            IN reuse = this.serializer.mo6078createInstance();
            while (this.running && (reuse = input.next(reuse)) != null) {
                if (!this.sorter.write(reuse)) {
                    sortAndCombineAndRetryWrite(reuse);
                }
            }
        } else {
            IN record;
            while (this.running && (record = input.next()) != null) {
                if (!this.sorter.write(record)) {
                    sortAndCombineAndRetryWrite(record);
                }
            }
        }

        // Combine whatever is still buffered, unless we were canceled.
        if (this.running) {
            sortAndCombine();
        }
    }

    /**
     * Sorts the buffered records and calls the combiner once per key group. Does nothing when the
     * sorter is empty. The sorter is not reset here.
     */
    private void sortAndCombine() throws Exception {
        final InMemorySorter<IN> buffer = this.sorter;
        if (buffer.isEmpty()) {
            return;
        }

        this.sortAlgo.sort(buffer);

        final GroupCombineFunction<IN, OUT> function = this.combiner;
        final Collector<OUT> collector = this.output;

        if (this.objectReuseEnabled) {
            final ReusingKeyGroupedIterator<IN> groups =
                    new ReusingKeyGroupedIterator<>(buffer.getIterator(), this.serializer, this.groupingComparator);
            while (this.running && groups.nextKey()) {
                function.combine(groups.getValues(), collector);
            }
        } else {
            final NonReusingKeyGroupedIterator<IN> groups =
                    new NonReusingKeyGroupedIterator<>(buffer.getIterator(), this.groupingComparator);
            while (this.running && groups.nextKey()) {
                function.combine(groups.getValues(), collector);
            }
        }
    }

    /**
     * Combines the current buffer contents, resets the sorter and retries writing {@code in}.
     * If the record does not fit even into the empty buffer, it is counted as oversized and
     * handed to the combiner on its own.
     *
     * @param in the record that could not be written to the full sorter
     */
    private void sortAndCombineAndRetryWrite(IN in) throws Exception {
        sortAndCombine();
        this.sorter.reset();

        if (this.sorter.write(in)) {
            return;
        }

        this.oversizedRecordCount++;
        LOG.debug("Cannot write record to fresh sort buffer, record is too large. Oversized record count: {}", this.oversizedRecordCount);

        // Bypass the sorter: combine the single record directly.
        this.combiner.combine(Collections.singleton(in), this.output);
        this.sorter.reset();
    }

    /**
     * Disposes the sorter (if one was created) and releases the driver's memory pages. The pages
     * are released even when disposing the sorter throws.
     *
     * @throws Exception if disposing the sorter or releasing the memory fails
     */
    @Override
    public void cleanup() throws Exception {
        try {
            if (this.sorter != null) {
                this.sorter.dispose();
            }
        } finally {
            this.taskContext.getMemoryManager().release(this.memory);
        }
    }

    /**
     * Stops the processing loops, disposes the sorter on a best-effort basis and releases the
     * driver's memory pages.
     */
    @Override
    public void cancel() {
        this.running = false;
        try {
            if (this.sorter != null) {
                this.sorter.dispose();
            }
        } catch (Exception e) {
            // Best effort: a failure while disposing must not prevent cancellation from
            // completing, so it is only recorded for diagnostics.
            LOG.debug("Ignoring exception while disposing the sorter during cancel.", e);
        } finally {
            this.taskContext.getMemoryManager().release(this.memory);
        }
    }

    /** @return how many records were too large for an empty sort buffer and were combined individually */
    public long getOversizedRecordCount() {
        return this.oversizedRecordCount;
    }
}
