package org.apache.flink.table.runtime.operators.aggregate.window.processors;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.aggregate.window.combines.WindowCombineFunction;
import org.apache.flink.table.runtime.operators.window.slicing.ClockService;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor;
import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.class */
/**
 * Base class for slicing window aggregate processors whose windows (slices) are identified by a
 * {@code Long} slice end.
 *
 * <p>Incoming elements are assigned to a slice by the {@link SliceAssigner} and handed to a {@link
 * WindowBuffer}; per-slice accumulators are kept in a {@link WindowValueState} registered under the
 * state name {@code "window-aggs"}. Results are emitted through {@link #collect(RowData)}.
 *
 * <p>All {@code transient} fields are initialised in {@link #open(SlicingWindowProcessor.Context)}
 * and must not be used before that call.
 */
public abstract class AbstractWindowAggProcessor implements SlicingWindowProcessor<Long> {
    private static final long serialVersionUID = 1L;

    /** Generated code that is instantiated into {@link #aggregator} in {@code open}. */
    protected final GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler;

    /** Factory used in {@code open} to create {@link #windowBuffer}. */
    protected final WindowBuffer.Factory windowBufferFactory;

    /** Factory for the combine function that is passed to the window buffer. */
    protected final WindowCombineFunction.Factory combineFactory;

    /** Assigns elements to slices and reports which slices expire with a window. */
    protected final SliceAssigner sliceAssigner;

    /** Field types of the accumulator row; used to build the state serializer. */
    protected final LogicalType[] accumulatorTypes;

    /** Cached result of {@code sliceAssigner.isEventTime()} taken at construction time. */
    protected final boolean isEventTime;

    /** Highest progress value seen by {@link #advanceProgress(long)}; starts at {@code Long.MIN_VALUE}. */
    protected transient long currentProgress;

    protected transient SlicingWindowProcessor.Context<Long> ctx;
    protected transient ClockService clockService;
    protected transient InternalTimerService<Long> timerService;
    protected transient NamespaceAggsHandleFunction<Long> aggregator;
    protected transient WindowBuffer windowBuffer;
    protected transient WindowValueState<Long> windowState;

    /** Reused output row that joins the current key with an aggregate result. */
    protected transient JoinedRowData reuseOutput;

    /**
     * Creates the processor. Only stores its arguments; all runtime resources are created in
     * {@link #open(SlicingWindowProcessor.Context)}.
     *
     * @param generatedNamespaceAggsHandleFunction generated aggregate handler to instantiate on open
     * @param factory factory for the window buffer
     * @param factory2 factory for the combine function used by the window buffer
     * @param sliceAssigner slice assigner; also determines whether event time is used
     * @param logicalTypeArr field types of the accumulator row
     */
    public AbstractWindowAggProcessor(GeneratedNamespaceAggsHandleFunction<Long> generatedNamespaceAggsHandleFunction, WindowBuffer.Factory factory, WindowCombineFunction.Factory factory2, SliceAssigner sliceAssigner, LogicalType[] logicalTypeArr) {
        this.genAggsHandler = generatedNamespaceAggsHandleFunction;
        this.windowBufferFactory = factory;
        this.combineFactory = factory2;
        this.sliceAssigner = sliceAssigner;
        this.accumulatorTypes = logicalTypeArr;
        this.isEventTime = sliceAssigner.isEventTime();
    }

    /**
     * Initialises state, timer/clock services, the aggregate handler, the window buffer and the
     * reusable output row, and resets {@link #currentProgress} to {@code Long.MIN_VALUE}.
     *
     * @param context runtime context supplying the keyed state backend, timer service, memory
     *     manager and runtime context
     * @throws Exception if any of the underlying components fails to initialise
     */
    @Override
    public void open(SlicingWindowProcessor.Context<Long> context) throws Exception {
        this.ctx = context;
        LongSerializer longSerializer = LongSerializer.INSTANCE;

        // NOTE(review): the raw-typed casts below are kept exactly as found; tightening them
        // depends on signatures declared outside this file.
        this.windowState = new WindowValueState<>((ValueState) this.ctx.getKeyedStateBackend().getOrCreateKeyedState(longSerializer, new ValueStateDescriptor("window-aggs", new RowDataSerializer(this.accumulatorTypes))));

        this.clockService = ClockService.of(this.ctx.getTimerService());
        this.timerService = this.ctx.getTimerService();

        this.aggregator = (NamespaceAggsHandleFunction) this.genAggsHandler.newInstance(this.ctx.getRuntimeContext().getUserCodeClassLoader());
        this.aggregator.open(new PerWindowStateDataViewStore(this.ctx.getKeyedStateBackend(), longSerializer, this.ctx.getRuntimeContext()));

        this.windowBuffer =
                this.windowBufferFactory.create(
                        this.ctx.getOperatorOwner(),
                        this.ctx.getMemoryManager(),
                        this.ctx.getMemorySize(),
                        this.combineFactory.create(
                                this.ctx.getRuntimeContext(),
                                this.ctx.getTimerService(),
                                this.ctx.getKeyedStateBackend(),
                                this.windowState,
                                this.isEventTime));

        this.reuseOutput = new JoinedRowData();
        this.currentProgress = Long.MIN_VALUE;
    }

    /**
     * Assigns the element to a slice and adds it to the window buffer.
     *
     * <p>When not in event-time mode, a processing-time timer is registered for every element at
     * {@code sliceEnd - 1} with the slice end as namespace. In event-time mode, an element whose
     * {@code sliceEnd - 1} is not greater than {@link #currentProgress} is not buffered.
     *
     * @param binaryRowData key of the element
     * @param rowData the element itself
     * @return {@code true} if the element was skipped (not added to the buffer) because its slice
     *     is already covered by the current progress; {@code false} if it was buffered
     * @throws Exception if slice assignment or buffering fails
     */
    @Override
    public boolean processElement(BinaryRowData binaryRowData, RowData rowData) throws Exception {
        long sliceEnd = this.sliceAssigner.assignSliceEnd(rowData, this.clockService);
        // Use an explicit offset of 1 here; this arithmetic must not be tied to
        // serialVersionUID, whose value is unrelated to window timing.
        // Presumably the slice end is exclusive, making this the last timestamp of the slice.
        long sliceMaxTimestamp = sliceEnd - 1;

        if (!this.isEventTime) {
            this.timerService.registerProcessingTimeTimer(Long.valueOf(sliceEnd), sliceMaxTimestamp);
        } else if (sliceMaxTimestamp <= this.currentProgress) {
            // Progress has already passed this slice; skip the element.
            return true;
        }

        this.windowBuffer.addElement(binaryRowData, sliceEnd, rowData);
        return false;
    }

    /**
     * Moves the progress forward and forwards the new value to the window buffer. Values that are
     * not strictly greater than the current progress are ignored.
     *
     * @param j the new progress value
     * @throws Exception if the window buffer fails while advancing
     */
    @Override
    public void advanceProgress(long j) throws Exception {
        if (j > this.currentProgress) {
            this.currentProgress = j;
            this.windowBuffer.advanceProgress(this.currentProgress);
        }
    }

    /**
     * Flushes the window buffer before a checkpoint is taken.
     *
     * @throws Exception if flushing fails
     */
    @Override
    public void prepareCheckpoint() throws Exception {
        this.windowBuffer.flush();
    }

    /**
     * Clears window state and aggregator resources for every slice that the slice assigner reports
     * as expired for the given window.
     *
     * @param l the window (slice end) being cleared
     * @throws Exception if clearing state or aggregator cleanup fails
     */
    @Override
    public void clearWindow(Long l) throws Exception {
        for (Long expiredSlice : this.sliceAssigner.expiredSlices(l.longValue())) {
            this.windowState.clear(expiredSlice);
            this.aggregator.cleanup(expiredSlice);
        }
    }

    /**
     * Closes the aggregator and the window buffer. The window buffer is closed even when closing
     * the aggregator throws, so that its resources are not leaked. Either component may be
     * {@code null} if {@code open} did not complete.
     *
     * @throws Exception if closing either component fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.aggregator != null) {
                this.aggregator.close();
            }
        } finally {
            if (this.windowBuffer != null) {
                this.windowBuffer.close();
            }
        }
    }

    /**
     * Returns the serializer for the window type, which is {@code Long} for this processor.
     *
     * @return {@link LongSerializer#INSTANCE}
     */
    @Override
    public TypeSerializer<Long> createWindowSerializer() {
        return LongSerializer.INSTANCE;
    }

    /**
     * Emits one result row consisting of the current key of the keyed state backend joined with
     * the given row. The same {@link JoinedRowData} instance is reused for every call.
     *
     * @param rowData the row to append after the current key
     */
    public void collect(RowData rowData) {
        this.reuseOutput.replace((RowData) this.ctx.getKeyedStateBackend().getCurrentKey(), rowData);
        this.ctx.output(this.reuseOutput);
    }
}
