package org.apache.flink.table.runtime.operators.aggregate.window;

import java.time.ZoneId;
import java.util.TimeZone;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.over.utils.LateDataWrapStartEnd;
import org.apache.flink.table.runtime.operators.window.latedata.LateDataCollectorProvider;
import org.apache.flink.table.runtime.operators.window.tvf.common.ClockService;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.class */
/**
 * One-input stream operator that assigns every incoming row to a slice, buffers it in a
 * {@link WindowBuffer} under the row's key, and advances that buffer whenever the watermark
 * reaches the next trigger point.
 *
 * <p>Rows whose slice end and corresponding last window end have both already fired relative to
 * the current watermark are additionally handed to a late-data collector, after being passed
 * through {@link LateDataWrapStartEnd} together with a window start/end pair.
 */
public class LocalSlicingWindowAggOperator extends AbstractStreamOperator<RowData> implements OneInputStreamOperator<RowData, RowData> {
    private static final long serialVersionUID = 1;

    /** Clock handed to the slice assigner when it computes a row's slice end. */
    private static final ClockService CLOCK_SERVICE = ClockService.ofSystem();

    /** Extracts the buffering key from an input row. */
    private final RowDataKeySelector keySelector;

    /** Maps an input row to the end timestamp of its slice. */
    private final SliceAssigner sliceAssigner;

    /** Cached result of {@code sliceAssigner.getSliceEndInterval()}. */
    private final long windowInterval;

    /** Creates the in-memory window buffer in {@link #open()}. */
    private final WindowBuffer.LocalFactory windowBufferFactory;

    /** Wraps the regular output collector into the collector used for late rows. */
    private final LateDataCollectorProvider lateDataCollectorProvider;

    /** Decorates a late row with a window start/end pair before it is emitted. */
    private final LateDataWrapStartEnd lateDataWrapper;

    /** Time zone passed to the {@link TimeWindowUtil} helpers and to the window buffer. */
    private final ZoneId shiftTimezone;

    /** Whether {@link #shiftTimezone} observes daylight saving time. */
    private final boolean useDayLightSaving;

    /** Collector over the operator output; its timestamp is erased in {@link #open()}. */
    protected transient TimestampedCollector<RowData> collector;

    /** Collector that receives late rows; built from {@link #collector} in {@link #open()}. */
    protected transient Collector<StreamRecord<RowData>> lateDataCollector;

    /** Largest watermark timestamp seen so far by {@link #processWatermark(Watermark)}. */
    private transient long currentWatermark;

    /** Watermark value at or above which the window buffer is advanced next. */
    private transient long nextTriggerWatermark;

    /** Buffer that accumulates rows per key and slice end. */
    private transient WindowBuffer windowBuffer;

    /**
     * Creates the operator and sets its chaining strategy to {@link ChainingStrategy#ALWAYS}.
     *
     * @param rowDataKeySelector extracts the key of each input row
     * @param sliceAssigner assigns rows to slices; also supplies the slice end interval
     * @param localFactory factory for the window buffer created in {@link #open()}
     * @param lateDataCollectorProvider wraps the output collector for late rows
     * @param zoneId time zone used for window firing checks and trigger computation
     * @param lateDataWrapStartEnd decorates late rows with window bounds
     */
    public LocalSlicingWindowAggOperator(RowDataKeySelector rowDataKeySelector, SliceAssigner sliceAssigner, WindowBuffer.LocalFactory localFactory, LateDataCollectorProvider lateDataCollectorProvider, ZoneId zoneId, LateDataWrapStartEnd lateDataWrapStartEnd) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.keySelector = rowDataKeySelector;
        this.sliceAssigner = sliceAssigner;
        this.windowInterval = sliceAssigner.getSliceEndInterval();
        this.windowBufferFactory = localFactory;
        this.lateDataCollectorProvider = lateDataCollectorProvider;
        this.shiftTimezone = zoneId;
        this.useDayLightSaving = TimeZone.getTimeZone(zoneId).useDaylightTime();
        this.lateDataWrapper = lateDataWrapStartEnd;
    }

    /**
     * Initializes the output collectors and creates the window buffer.
     *
     * @throws Exception if the superclass initialization or the buffer creation fails
     */
    public void open() throws Exception {
        super.open();
        this.collector = new TimestampedCollector<>(this.output);
        this.collector.eraseTimestamp();
        this.lateDataCollector = this.lateDataCollectorProvider.wrap(this.collector);
        this.windowBuffer = this.windowBufferFactory.create(
                getContainingTask(),
                getContainingTask().getEnvironment().getMemoryManager(),
                computeMemorySize(),
                getRuntimeContext(),
                this.collector,
                this.shiftTimezone);
    }

    /**
     * Assigns the row to a slice, emits it to the late-data collector if its windows have
     * already fired, and adds it to the window buffer.
     *
     * @param streamRecord the incoming record
     * @throws Exception if slice assignment, late emission or buffering fails
     */
    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        RowData inputRow = streamRecord.getValue();
        RowData key = this.keySelector.getKey(inputRow);
        long sliceEnd = this.sliceAssigner.assignSliceEnd(inputRow, CLOCK_SERVICE);

        // The last-window check is only evaluated when the slice itself has already fired.
        boolean late = TimeWindowUtil.isWindowFired(sliceEnd, this.currentWatermark, this.shiftTimezone)
                && TimeWindowUtil.isWindowFired(this.sliceAssigner.getLastWindowEnd(sliceEnd), this.currentWatermark, this.shiftTimezone);
        if (late) {
            emitLateRecord(streamRecord, inputRow);
        }

        // NOTE(review): a late row is still added to the buffer after being emitted as late
        // data; this mirrors the existing behavior - confirm that the double handling is intended.
        this.windowBuffer.addElement(key, sliceEnd, inputRow);
    }

    /** Wraps a late row with window bounds and forwards it to the late-data collector. */
    private void emitLateRecord(StreamRecord<RowData> streamRecord, RowData inputRow) {
        // presumably nextTriggerWatermark is "window end - 1", so adding one yields the window
        // end - TODO confirm against TimeWindowUtil.getNextTriggerWatermark.
        long windowEnd = this.nextTriggerWatermark + 1L;
        long windowStart = windowEnd - this.windowInterval;
        RowData wrapped = this.lateDataWrapper.apply(inputRow, windowStart, windowEnd);
        if (wrapped == inputRow) {
            // The wrapper returned the same row instance: reuse the incoming record as-is.
            this.lateDataCollector.collect(streamRecord);
        } else {
            this.lateDataCollector.collect(new StreamRecord<>(wrapped));
        }
    }

    /**
     * Tracks the highest watermark seen and advances the window buffer once the watermark
     * reaches the next trigger point; the watermark is always passed on to the superclass.
     *
     * @param watermark the incoming watermark
     * @throws Exception if advancing the buffer or forwarding the watermark fails
     */
    public void processWatermark(Watermark watermark) throws Exception {
        long timestamp = watermark.getTimestamp();
        if (timestamp > this.currentWatermark) {
            this.currentWatermark = timestamp;
            if (this.currentWatermark >= this.nextTriggerWatermark) {
                this.windowBuffer.advanceProgress(this.currentWatermark);
                this.nextTriggerWatermark = TimeWindowUtil.getNextTriggerWatermark(
                        this.currentWatermark, this.windowInterval, this.shiftTimezone, this.useDayLightSaving);
            }
        }
        super.processWatermark(watermark);
    }

    /**
     * Flushes the window buffer before a checkpoint barrier is emitted.
     *
     * @param checkpointId id of the checkpoint being prepared (unused here)
     * @throws Exception if flushing fails
     */
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        this.windowBuffer.flush();
    }

    /**
     * Flushes the window buffer.
     *
     * <p>NOTE(review): presumably invoked by a state-migration hook of the runtime; the caller
     * is not visible in this file.
     *
     * @throws Exception if flushing fails
     */
    public void persistStateForMigration() throws Exception {
        this.windowBuffer.flush();
    }

    /**
     * Closes the operator and releases the window buffer.
     *
     * @throws Exception if closing the superclass or the window buffer fails
     */
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            // Release the buffer even when super.close() fails, so it is never left open.
            this.collector = null;
            if (this.windowBuffer != null) {
                this.windowBuffer.close();
            }
        }
    }

    /**
     * Computes the amount of managed memory to hand to the window buffer, using the
     * {@link ManagedMemoryUseCase#OPERATOR} fraction from the operator configuration.
     *
     * @return the memory size returned by the task's memory manager for that fraction
     */
    private long computeMemorySize() {
        Environment environment = getContainingTask().getEnvironment();
        double operatorFraction = getOperatorConfig().getManagedMemoryFractionOperatorUseCaseOfSlot(
                ManagedMemoryUseCase.OPERATOR,
                environment.getJobConfiguration(),
                environment.getTaskManagerInfo().getConfiguration(),
                environment.getUserCodeClassLoader().asClassLoader());
        return environment.getMemoryManager().computeMemorySize(operatorFraction);
    }
}
