package org.apache.flink.table.runtime.operators.window.groupwindow.internal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
import org.apache.flink.table.runtime.operators.window.Window;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowSet;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/window/groupwindow/internal/MergingWindowProcessFunction.class */
public class MergingWindowProcessFunction<K, W extends Window> extends InternalWindowProcessFunction<K, W> {
    private static final long serialVersionUID = -2866771637946397223L;
    private final MergingWindowAssigner<W> windowAssigner;
    private final TypeSerializer<W> windowSerializer;
    private transient MergingWindowSet<W> mergingWindows;
    private transient MergingWindowProcessFunction<K, W>.MergingFunctionImpl mergingFunction;
    private List<W> reuseActualWindows;

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/groupwindow/internal/MergingWindowProcessFunction$DefaultAccMergingConsumer.class */
    public static class DefaultAccMergingConsumer<W extends Window> implements BiConsumerWithException<W, Collection<W>, Throwable> {
        private final InternalWindowProcessFunction.Context<?, W> ctx;
        private final NamespaceAggsHandleFunctionBase<W> windowAggregator;

        public DefaultAccMergingConsumer(InternalWindowProcessFunction.Context<?, W> context, NamespaceAggsHandleFunctionBase<W> namespaceAggsHandleFunctionBase) {
            this.ctx = context;
            this.windowAggregator = namespaceAggsHandleFunctionBase;
        }

        public void accept(W w, Collection<W> collection) throws Throwable {
            RowData windowAccumulators = this.ctx.getWindowAccumulators(w);
            if (windowAccumulators == null) {
                windowAccumulators = this.windowAggregator.createAccumulators();
            }
            this.windowAggregator.setAccumulators(w, windowAccumulators);
            for (W w2 : collection) {
                RowData windowAccumulators2 = this.ctx.getWindowAccumulators(w2);
                if (windowAccumulators2 != null) {
                    this.windowAggregator.merge(w2, windowAccumulators2);
                }
                this.ctx.clearWindowState(w2);
                this.ctx.clearPreviousState(w2);
            }
            this.ctx.setWindowAccumulators(w, this.windowAggregator.getAccumulators());
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/groupwindow/internal/MergingWindowProcessFunction$MergingContext.class */
    public interface MergingContext<K, W extends Window> extends InternalWindowProcessFunction.Context<K, W> {
        BiConsumerWithException<W, Collection<W>, Throwable> getWindowStateMergingConsumer();
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/window/groupwindow/internal/MergingWindowProcessFunction$MergingFunctionImpl.class */
    private class MergingFunctionImpl implements MergingWindowSet.MergeFunction<W> {
        private final BiConsumerWithException<W, Collection<W>, Throwable> accMergingConsumer;

        public MergingFunctionImpl(BiConsumerWithException<W, Collection<W>, Throwable> biConsumerWithException) {
            this.accMergingConsumer = biConsumerWithException;
        }

        public void merge(W w, Collection<W> collection, W w2, Collection<W> collection2) throws Exception {
            long epochMillsForTimer = TimeWindowUtil.toEpochMillsForTimer(w.maxTimestamp(), MergingWindowProcessFunction.this.ctx.getShiftTimeZone());
            if (MergingWindowProcessFunction.this.windowAssigner.isEventTime() && epochMillsForTimer + MergingWindowProcessFunction.this.allowedLateness <= MergingWindowProcessFunction.this.ctx.currentWatermark()) {
                UnsupportedOperationException unsupportedOperationException = new UnsupportedOperationException("The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: " + MergingWindowProcessFunction.this.ctx.currentWatermark() + " window: " + unsupportedOperationException);
                throw unsupportedOperationException;
            }
            if (!MergingWindowProcessFunction.this.windowAssigner.isEventTime() && epochMillsForTimer <= MergingWindowProcessFunction.this.ctx.currentProcessingTime()) {
                UnsupportedOperationException unsupportedOperationException2 = new UnsupportedOperationException("The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: " + MergingWindowProcessFunction.this.ctx.currentProcessingTime() + " window: " + unsupportedOperationException2);
                throw unsupportedOperationException2;
            }
            MergingWindowProcessFunction.this.ctx.onMerge(w, collection2);
            for (W w3 : collection) {
                MergingWindowProcessFunction.this.ctx.clearTrigger(w3);
                MergingWindowProcessFunction.this.ctx.deleteCleanupTimer(w3);
            }
            if (collection2.isEmpty()) {
                return;
            }
            try {
                this.accMergingConsumer.accept(w2, collection2);
            } catch (Throwable th) {
                throw new RuntimeException("Should not happen", th);
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowSet.MergeFunction
        public /* bridge */ /* synthetic */ void merge(Object obj, Collection collection, Object obj2, Collection collection2) throws Exception {
            merge((Collection) obj, (Collection<Collection>) collection, (Collection) obj2, (Collection<Collection>) collection2);
        }
    }

    public MergingWindowProcessFunction(MergingWindowAssigner<W> mergingWindowAssigner, NamespaceAggsHandleFunctionBase<W> namespaceAggsHandleFunctionBase, TypeSerializer<W> typeSerializer, long j) {
        super(mergingWindowAssigner, namespaceAggsHandleFunctionBase, j);
        this.windowAssigner = mergingWindowAssigner;
        this.windowSerializer = typeSerializer;
    }

    @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction
    public void open(InternalWindowProcessFunction.Context<K, W> context) throws Exception {
        Preconditions.checkArgument(context instanceof MergingContext);
        super.open(context);
        this.mergingWindows = new MergingWindowSet<>(this.windowAssigner, context.getPartitionedState(new MapStateDescriptor("session-window-mapping", this.windowSerializer, this.windowSerializer)));
        this.mergingFunction = new MergingFunctionImpl(((MergingContext) context).getWindowStateMergingConsumer());
    }

    @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction
    public Collection<W> assignStateNamespace(RowData rowData, long j) throws Exception {
        Collection<W> assignWindows = this.windowAssigner.assignWindows(rowData, j);
        this.mergingWindows.initializeCache(this.ctx.currentKey());
        this.reuseActualWindows = new ArrayList(1);
        Iterator<W> it = assignWindows.iterator();
        while (it.hasNext()) {
            W addWindow = this.mergingWindows.addWindow(it.next(), this.mergingFunction);
            if (isWindowLate(addWindow)) {
                this.mergingWindows.retireWindow(addWindow);
            } else {
                this.reuseActualWindows.add(addWindow);
            }
        }
        ArrayList arrayList = new ArrayList(this.reuseActualWindows.size());
        Iterator<W> it2 = this.reuseActualWindows.iterator();
        while (it2.hasNext()) {
            arrayList.add(this.mergingWindows.getStateWindow(it2.next()));
        }
        return arrayList;
    }

    @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction
    public Collection<W> assignActualWindows(RowData rowData, long j) throws Exception {
        return this.reuseActualWindows;
    }

    @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction
    public void prepareAggregateAccumulatorForEmit(W w) throws Exception {
        W stateWindow = this.mergingWindows.getStateWindow(w);
        RowData windowAccumulators = this.ctx.getWindowAccumulators(stateWindow);
        if (windowAccumulators == null) {
            windowAccumulators = this.windowAggregator.createAccumulators();
        }
        this.windowAggregator.setAccumulators(stateWindow, windowAccumulators);
    }

    @Override // org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction
    public void cleanWindowIfNeeded(W w, long j) throws Exception {
        if (isCleanupTime(w, j)) {
            this.ctx.clearTrigger(w);
            this.ctx.clearWindowState(this.mergingWindows.getStateWindow(w));
            this.mergingWindows.initializeCache(this.ctx.currentKey());
            this.mergingWindows.retireWindow(w);
        }
    }

    public W getStateWindow(W w) throws Exception {
        return this.mergingWindows.getStateWindow(w);
    }
}
