package org.apache.flink.table.runtime.operators.aggregate.window.buffers;

import java.io.EOFException;
import java.util.Iterator;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.aggregate.window.combines.WindowCombineFunction;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.typeutils.WindowKeySerializer;
import org.apache.flink.table.runtime.util.KeyValueIterator;
import org.apache.flink.table.runtime.util.WindowKey;
import org.apache.flink.table.runtime.util.collections.binary.WindowBytesMultiMap;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.class */
/**
 * A {@link WindowBuffer} that collects raw input records in a {@link WindowBytesMultiMap},
 * grouped by {@link WindowKey}, and hands every group to a {@link WindowCombineFunction} when
 * the buffer is flushed.
 *
 * <p>A flush happens when {@link #flush()} is called directly, when {@link #advanceProgress(long)}
 * sees a progress value that has reached the tracked minimum trigger time, or when appending a
 * record fails with an {@link EOFException}.
 */
public final class RecordsWindowBuffer implements WindowBuffer {
    /** Receives every buffered (key, records) group when the buffer is flushed. */
    private final WindowCombineFunction combineFunction;

    /** Backing multi-map holding the buffered records per window key. */
    private final WindowBytesMultiMap recordsBuffer;

    /** Single key instance that is re-pointed via {@code replace(...)} before every lookup. */
    private final WindowKey reuseWindowKey;

    /** Converts incoming rows to binary rows before they are appended to the buffer. */
    private final RowDataSerializer recordSerializer;

    /**
     * Smallest {@code window - 1} seen by {@link #addElement} since the buffer was last drained;
     * {@code Long.MAX_VALUE} while nothing is tracked.
     */
    private long minTriggerTime = Long.MAX_VALUE;

    /** Factory that remembers the key and input types and creates {@link RecordsWindowBuffer}s. */
    public static final class Factory implements WindowBuffer.Factory {
        private static final long serialVersionUID = 1;
        private final LogicalType[] keyTypes;
        private final RowType inputType;

        /**
         * @param logicalTypeArr types of the key fields
         * @param rowType row type of the input records
         */
        public Factory(LogicalType[] logicalTypeArr, RowType rowType) {
            this.keyTypes = logicalTypeArr;
            this.inputType = rowType;
        }

        /**
         * Creates a new buffer; all arguments are forwarded unchanged to the
         * {@link RecordsWindowBuffer} constructor together with the stored key and input types.
         */
        @Override
        public WindowBuffer create(Object obj, MemoryManager memoryManager, long j, WindowCombineFunction windowCombineFunction) {
            return new RecordsWindowBuffer(obj, memoryManager, j, windowCombineFunction, this.keyTypes, this.inputType);
        }
    }

    /**
     * Builds the buffer and its backing {@link WindowBytesMultiMap}.
     *
     * @param obj forwarded as-is to the multi-map (presumably the owner of the memory
     *     allocation; confirm against {@code WindowBytesMultiMap})
     * @param memoryManager memory manager handed to the multi-map
     * @param j forwarded as-is to the multi-map (presumably its memory budget; confirm)
     * @param windowCombineFunction function that receives the buffered records on flush
     * @param logicalTypeArr types of the key fields
     * @param rowType row type of the input records; its field types define the value layout
     */
    public RecordsWindowBuffer(Object obj, MemoryManager memoryManager, long j, WindowCombineFunction windowCombineFunction, LogicalType[] logicalTypeArr, RowType rowType) {
        this.combineFunction = windowCombineFunction;
        LogicalType[] inputFieldTypes =
                rowType.getFields().stream()
                        .map(field -> field.getType())
                        .toArray(LogicalType[]::new);
        this.recordsBuffer = new WindowBytesMultiMap(obj, memoryManager, j, logicalTypeArr, inputFieldTypes);
        this.recordSerializer = new RowDataSerializer(inputFieldTypes);
        this.reuseWindowKey = new WindowKeySerializer(logicalTypeArr.length).m169createInstance();
    }

    /**
     * Buffers one record under the given key and window.
     *
     * <p>If the append fails with an {@link EOFException}, the buffer is flushed and the append is
     * retried exactly once.
     *
     * @param binaryRowData key of the record
     * @param j window value of the record; {@code j - 1} is tracked as its trigger time
     * @param rowData the record to buffer
     * @throws EOFException if the record still cannot be appended right after a flush
     * @throws Exception if the lookup, append or flush fails for any other reason
     */
    @Override
    public void addElement(BinaryRowData binaryRowData, long j, RowData rowData) throws Exception {
        try {
            appendToBuffer(binaryRowData, j, rowData);
        } catch (EOFException firstFailure) {
            // Hand the buffered records to the combine function, then try this record again.
            flush();
            try {
                appendToBuffer(binaryRowData, j, rowData);
            } catch (EOFException secondFailure) {
                // Flushing again cannot help here; retrying without a bound would only recurse
                // until the stack overflows, so report the problem instead.
                EOFException failure =
                        new EOFException(
                                "Record could not be appended to the window buffer even after flushing it.");
                failure.initCause(secondFailure);
                throw failure;
            }
        }
    }

    /**
     * Performs a single append attempt: updates the minimum trigger time, points the reusable key
     * at (window, key), and appends the binary form of the element to the looked-up entry.
     */
    private void appendToBuffer(BinaryRowData key, long window, RowData element) throws Exception {
        // Must be redone on every attempt because flush() may have reset minTriggerTime.
        this.minTriggerTime = Math.min(window - 1, this.minTriggerTime);
        this.reuseWindowKey.replace(window, key);
        this.recordsBuffer.append(
                this.recordsBuffer.lookup(this.reuseWindowKey),
                this.recordSerializer.toBinaryRow(element));
    }

    /**
     * Flushes the buffer if {@code j} has reached the smallest trigger time tracked so far.
     *
     * @param j the current progress value
     */
    @Override
    public void advanceProgress(long j) throws Exception {
        if (j >= this.minTriggerTime) {
            flush();
        }
    }

    /**
     * Passes every buffered (key, records) group to the combine function, then resets the buffer
     * and the tracked minimum trigger time. Does nothing when the buffer holds no keys.
     */
    @Override
    public void flush() throws Exception {
        if (this.recordsBuffer.getNumKeys() > 0) {
            KeyValueIterator<WindowKey, Iterator<RowData>> entryIterator = this.recordsBuffer.getEntryIterator();
            while (entryIterator.advanceNext()) {
                this.combineFunction.combine(entryIterator.getKey(), entryIterator.getValue());
            }
            this.recordsBuffer.reset();
            this.minTriggerTime = Long.MAX_VALUE;
        }
    }

    /** Frees the backing multi-map; buffered records are not flushed by this method. */
    @Override
    public void close() throws Exception {
        this.recordsBuffer.free();
    }
}
