package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.Serializable;
import java.util.Collection;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalAppendingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMergingState;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.windowing.assigners.BaseAlignedWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindowPaneAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.values.MergeListStateValues;
import org.apache.flink.streaming.runtime.operators.windowing.values.MergeStateValues;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.class */
public class WindowOperator<K, IN, ACC, OUT, W extends Window> extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {
    private static final long serialVersionUID = 1;
    protected final WindowAssigner<? super IN, W> windowAssigner;
    private final KeySelector<IN, K> keySelector;
    private final Trigger<? super IN, ? super W> trigger;
    private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
    protected final TypeSerializer<K> keySerializer;
    protected final TypeSerializer<W> windowSerializer;
    protected final long allowedLateness;
    protected final OutputTag<IN> lateDataOutputTag;
    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    protected transient Counter numLateRecordsDropped;
    private transient InternalAppendingState<K, W, IN, ACC, ACC> windowState;
    private transient InternalMergingState<K, W, IN, ACC, ACC> windowMergingState;
    private transient InternalListState<K, VoidNamespace, Tuple2<W, W>> mergingSetsState;
    protected transient TimestampedCollector<OUT> timestampedCollector;
    protected transient WindowOperator<K, IN, ACC, OUT, W>.Context triggerContext;
    protected transient WindowOperator<K, IN, ACC, OUT, W>.WindowContext processContext;
    protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
    protected transient InternalTimerService<W> internalTimerService;
    private SlidingTimeWindowPaneAssigner<W> slidingTimeWindowPaneAssigner;
    private MergeStateValues<Collection<IN>, AppendingState<IN, ACC>, Collection<IN>> mergeListStateValues;
    private long paneSize;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator$AbstractPerWindowStateStore.class */
    public abstract class AbstractPerWindowStateStore extends DefaultKeyedStateStore {
        protected W window;

        public AbstractPerWindowStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig) {
            super(keyedStateBackend, executionConfig);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator$Context.class */
    public class Context implements Trigger.OnMergeContext {
        protected K key;
        protected W window;
        protected Collection<W> mergedWindows;

        public Context(K k, W w) {
            this.key = k;
            this.window = w;
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public MetricGroup getMetricGroup() {
            return WindowOperator.this.getMetricGroup();
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public long getCurrentWatermark() {
            return WindowOperator.this.internalTimerService.currentWatermark();
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public <S extends Serializable> ValueState<S> getKeyValueState(String str, Class<S> cls, S s) {
            Preconditions.checkNotNull(cls, "The state type class must not be null");
            try {
                return getKeyValueState(str, (TypeInformation<TypeInformation<S>>) TypeExtractor.getForClass(cls), (TypeInformation<S>) s);
            } catch (Exception e) {
                throw new RuntimeException("Cannot analyze type '" + cls.getName() + "' from the class alone, due to generic type parameters. Please specify the TypeInformation directly.", e);
            }
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public <S extends Serializable> ValueState<S> getKeyValueState(String str, TypeInformation<S> typeInformation, S s) {
            Preconditions.checkNotNull(str, "The name of the state must not be null");
            Preconditions.checkNotNull(typeInformation, "The state type information must not be null");
            return getPartitionedState(new ValueStateDescriptor(str, typeInformation.createSerializer(WindowOperator.this.getExecutionConfig()), s));
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                return (S) WindowOperator.this.getPartitionedState(this.window, WindowOperator.this.windowSerializer, stateDescriptor);
            } catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.OnMergeContext
        public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            if (this.mergedWindows == null || this.mergedWindows.size() <= 0) {
                return;
            }
            try {
                InternalMergingState internalMergingState = (MergingState) WindowOperator.this.getKeyedStateBackend().getOrCreateKeyedState(WindowOperator.this.windowSerializer, stateDescriptor);
                if (!(internalMergingState instanceof InternalMergingState)) {
                    throw new IllegalArgumentException("The given state descriptor does not refer to a mergeable state (MergingState)");
                }
                internalMergingState.mergeNamespaces(this.window, this.mergedWindows);
            } catch (Exception e) {
                throw new RuntimeException("Error while merging state.", e);
            }
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public long getCurrentProcessingTime() {
            return WindowOperator.this.internalTimerService.currentProcessingTime();
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void registerProcessingTimeTimer(long j) {
            WindowOperator.this.internalTimerService.registerProcessingTimeTimer(this.window, j);
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void registerEventTimeTimer(long j) {
            WindowOperator.this.internalTimerService.registerEventTimeTimer(this.window, j);
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void deleteProcessingTimeTimer(long j) {
            WindowOperator.this.internalTimerService.deleteProcessingTimeTimer(this.window, j);
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void deleteEventTimeTimer(long j) {
            WindowOperator.this.internalTimerService.deleteEventTimeTimer(this.window, j);
        }

        public TriggerResult onElement(StreamRecord<IN> streamRecord) throws Exception {
            return WindowOperator.this.trigger.onElement(streamRecord.getValue(), streamRecord.getTimestamp(), this.window, this);
        }

        public TriggerResult onProcessingTime(long j) throws Exception {
            return WindowOperator.this.trigger.onProcessingTime(j, this.window, this);
        }

        public TriggerResult onEventTime(long j) throws Exception {
            return WindowOperator.this.trigger.onEventTime(j, this.window, this);
        }

        public void onMerge(Collection<W> collection) throws Exception {
            this.mergedWindows = collection;
            WindowOperator.this.trigger.onMerge(this.window, this);
        }

        public void clear() throws Exception {
            WindowOperator.this.trigger.clear(this.window, this);
        }

        public String toString() {
            return "Context{key=" + this.key + ", window=" + this.window + '}';
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator$MergingWindowStateStore.class */
    public class MergingWindowStateStore extends WindowOperator<K, IN, ACC, OUT, W>.AbstractPerWindowStateStore {
        public MergingWindowStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig) {
            super(keyedStateBackend, executionConfig);
        }

        public <T> ValueState<T> getState(ValueStateDescriptor<T> valueStateDescriptor) {
            throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
        }

        public <T> ListState<T> getListState(ListStateDescriptor<T> listStateDescriptor) {
            throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
        }

        public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> reducingStateDescriptor) {
            throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
        }

        public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> aggregatingStateDescriptor) {
            throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
        }

        public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> foldingStateDescriptor) {
            throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
        }

        public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> mapStateDescriptor) {
            throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator$PerWindowStateStore.class */
    public class PerWindowStateStore extends WindowOperator<K, IN, ACC, OUT, W>.AbstractPerWindowStateStore {
        public PerWindowStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig) {
            super(keyedStateBackend, executionConfig);
        }

        protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
            return (S) this.keyedStateBackend.getPartitionedState(this.window, WindowOperator.this.windowSerializer, stateDescriptor);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator$Timer.class */
    protected static class Timer<K, W extends Window> implements Comparable<Timer<K, W>> {
        protected long timestamp;
        protected K key;
        protected W window;

        public Timer(long j, K k, W w) {
            this.timestamp = j;
            this.key = k;
            this.window = w;
        }

        @Override // java.lang.Comparable
        public int compareTo(Timer<K, W> timer) {
            return Long.compare(this.timestamp, timer.timestamp);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Timer timer = (Timer) obj;
            return this.timestamp == timer.timestamp && this.key.equals(timer.key) && this.window.equals(timer.window);
        }

        public int hashCode() {
            return (31 * ((31 * ((int) (this.timestamp ^ (this.timestamp >>> 32)))) + this.key.hashCode())) + this.window.hashCode();
        }

        public String toString() {
            return "Timer{timestamp=" + this.timestamp + ", key=" + this.key + ", window=" + this.window + '}';
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator$WindowContext.class */
    public class WindowContext implements InternalWindowFunction.InternalWindowContext {
        protected W window;
        protected WindowOperator<K, IN, ACC, OUT, W>.AbstractPerWindowStateStore windowState;

        public WindowContext(W w) {
            this.window = w;
            this.windowState = WindowOperator.this.windowAssigner instanceof MergingWindowAssigner ? new MergingWindowStateStore(WindowOperator.this.getKeyedStateBackend(), WindowOperator.this.getExecutionConfig()) : new PerWindowStateStore(WindowOperator.this.getKeyedStateBackend(), WindowOperator.this.getExecutionConfig());
        }

        public String toString() {
            return "WindowContext{Window = " + this.window.toString() + "}";
        }

        public void clear() throws Exception {
            ((InternalWindowFunction) WindowOperator.this.userFunction).clear(this.window, this);
        }

        @Override // org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction.InternalWindowContext
        public long currentProcessingTime() {
            return WindowOperator.this.internalTimerService.currentProcessingTime();
        }

        @Override // org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction.InternalWindowContext
        public long currentWatermark() {
            return WindowOperator.this.internalTimerService.currentWatermark();
        }

        @Override // org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction.InternalWindowContext
        public KeyedStateStore windowState() {
            this.windowState.window = this.window;
            return this.windowState;
        }

        @Override // org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction.InternalWindowContext
        public KeyedStateStore globalState() {
            return WindowOperator.this.getKeyedStateStore();
        }

        @Override // org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction.InternalWindowContext
        public <X> void output(OutputTag<X> outputTag, X x) {
            if (outputTag == null) {
                throw new IllegalArgumentException("OutputTag must not be null.");
            }
            WindowOperator.this.output.collect(outputTag, new StreamRecord<>(x, this.window.maxTimestamp()));
        }
    }

    public WindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> typeSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> typeSerializer2, StateDescriptor<? extends AppendingState<IN, ACC>, ?> stateDescriptor, InternalWindowFunction<ACC, OUT, K, W> internalWindowFunction, Trigger<? super IN, ? super W> trigger, long j, OutputTag<IN> outputTag) {
        super(internalWindowFunction);
        this.triggerContext = new Context(null, null);
        this.paneSize = 1L;
        Preconditions.checkArgument(!(windowAssigner instanceof BaseAlignedWindowAssigner), "The " + windowAssigner.getClass().getSimpleName() + " cannot be used with a WindowOperator. This assigner is only used with the AccumulatingProcessingTimeWindowOperator and the AggregatingProcessingTimeWindowOperator");
        Preconditions.checkArgument(j >= 0);
        Preconditions.checkArgument(stateDescriptor == null || stateDescriptor.isSerializerInitialized(), "window state serializer is not properly initialized");
        this.windowAssigner = (WindowAssigner) Preconditions.checkNotNull(windowAssigner);
        this.windowSerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer);
        this.keySelector = (KeySelector) Preconditions.checkNotNull(keySelector);
        this.keySerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer2);
        this.windowStateDescriptor = stateDescriptor;
        this.trigger = (Trigger) Preconditions.checkNotNull(trigger);
        this.allowedLateness = j;
        this.lateDataOutputTag = outputTag;
        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        this.numLateRecordsDropped = this.metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
        this.timestampedCollector = new TimestampedCollector<>(this.output);
        this.internalTimerService = (InternalTimerService<W>) getInternalTimerService("window-timers", this.windowSerializer, this);
        this.triggerContext = new Context(null, null);
        this.processContext = new WindowContext(null);
        this.windowAssignerContext = new WindowAssigner.WindowAssignerContext() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.1
            private long currentElementTimestamp;

            @Override // org.apache.flink.streaming.api.windowing.assigners.WindowAssigner.WindowAssignerContext
            public long getCurrentProcessingTime() {
                return WindowOperator.this.internalTimerService.currentProcessingTime();
            }

            @Override // org.apache.flink.streaming.api.windowing.assigners.WindowAssigner.WindowAssignerContext
            public void setCurrentElementTimestamp(long j) {
                this.currentElementTimestamp = j;
            }

            @Override // org.apache.flink.streaming.api.windowing.assigners.WindowAssigner.WindowAssignerContext
            public long getCurrentElementTimestamp() {
                return this.currentElementTimestamp;
            }
        };
        if (this.windowStateDescriptor != null) {
            this.windowState = getOrCreateKeyedState(this.windowSerializer, this.windowStateDescriptor);
        }
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            if (this.windowState instanceof InternalMergingState) {
                this.windowMergingState = this.windowState;
            }
            this.mergingSetsState = getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor("merging-window-set", new TupleSerializer(Tuple2.class, new TypeSerializer[]{this.windowSerializer, this.windowSerializer})));
            this.mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
        }
        if (!(this.windowAssigner instanceof SlidingTimeWindowPaneAssigner) || !(this.windowStateDescriptor instanceof ListStateDescriptor)) {
            this.mergeListStateValues = null;
            this.slidingTimeWindowPaneAssigner = null;
        } else {
            this.mergeListStateValues = new MergeListStateValues();
            this.slidingTimeWindowPaneAssigner = (SlidingTimeWindowPaneAssigner) this.windowAssigner;
            this.paneSize = this.slidingTimeWindowPaneAssigner.getPaneSize();
        }
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        super.close();
        this.timestampedCollector = null;
        this.triggerContext = null;
        this.processContext = null;
        this.windowAssignerContext = null;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void dispose() throws Exception {
        super.dispose();
        this.timestampedCollector = null;
        this.triggerContext = null;
        this.processContext = null;
        this.windowAssignerContext = null;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        Collection<W> assignWindows = this.windowAssigner.assignWindows(streamRecord.getValue(), streamRecord.getTimestamp(), this.windowAssignerContext);
        boolean z = true;
        final K k = (K) getKeyedStateBackend().getCurrentKey();
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet mergingWindowSet = getMergingWindowSet();
            for (W w : assignWindows) {
                W w2 = (W) mergingWindowSet.addWindow(w, new MergingWindowSet.MergeFunction<W>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.2
                    public void merge(W w3, Collection<W> collection, W w4, Collection<W> collection2) throws Exception {
                        if (WindowOperator.this.windowAssigner.isEventTime() && w3.maxTimestamp() + WindowOperator.this.allowedLateness <= WindowOperator.this.internalTimerService.currentWatermark()) {
                            throw new UnsupportedOperationException("The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: " + WindowOperator.this.internalTimerService.currentWatermark() + " window: " + w3);
                        }
                        if (!WindowOperator.this.windowAssigner.isEventTime()) {
                            long currentProcessingTime = WindowOperator.this.internalTimerService.currentProcessingTime();
                            if (w3.maxTimestamp() <= currentProcessingTime) {
                                throw new UnsupportedOperationException("The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: " + currentProcessingTime + " window: " + w3);
                            }
                        }
                        WindowOperator.this.triggerContext.key = (K) k;
                        WindowOperator.this.triggerContext.window = w3;
                        WindowOperator.this.triggerContext.onMerge(collection);
                        for (W w5 : collection) {
                            WindowOperator.this.triggerContext.window = w5;
                            WindowOperator.this.triggerContext.clear();
                            WindowOperator.this.deleteCleanupTimer(w5);
                        }
                        WindowOperator.this.windowMergingState.mergeNamespaces(w4, collection2);
                    }

                    /* JADX WARN: Multi-variable type inference failed */
                    @Override // org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.MergeFunction
                    public /* bridge */ /* synthetic */ void merge(Object obj, Collection collection, Object obj2, Collection collection2) throws Exception {
                        merge((Collection) obj, (Collection<Collection>) collection, (Collection) obj2, (Collection<Collection>) collection2);
                    }
                });
                if (isWindowLate(w2)) {
                    mergingWindowSet.retireWindow(w2);
                } else {
                    z = false;
                    Window stateWindow = mergingWindowSet.getStateWindow(w2);
                    if (stateWindow == null) {
                        throw new IllegalStateException("Window " + w + " is not in in-flight window set.");
                    }
                    this.windowState.setCurrentNamespace(stateWindow);
                    this.windowState.add(streamRecord.getValue());
                    this.triggerContext.key = k;
                    this.triggerContext.window = w2;
                    TriggerResult onElement = this.triggerContext.onElement(streamRecord);
                    if (onElement.isFire()) {
                        Object obj = this.windowState.get();
                        if (obj != null) {
                            emitWindowContents(w2, obj);
                        }
                    }
                    if (onElement.isPurge()) {
                        this.windowState.clear();
                    }
                    registerCleanupTimer(w2);
                }
            }
            mergingWindowSet.persist();
        } else if ((this.windowAssigner instanceof SlidingTimeWindowPaneAssigner) && (this.windowStateDescriptor instanceof ListStateDescriptor)) {
            long windowSize = this.slidingTimeWindowPaneAssigner.getWindowSize();
            long windowSlide = this.slidingTimeWindowPaneAssigner.getWindowSlide();
            long windowOffset = this.slidingTimeWindowPaneAssigner.getWindowOffset();
            long currentElementTimestamp = this.windowAssignerContext.getCurrentElementTimestamp();
            long j = currentElementTimestamp - (((currentElementTimestamp - windowOffset) + windowSlide) % this.paneSize);
            W assignPane = this.slidingTimeWindowPaneAssigner.assignPane(j, j + this.paneSize);
            boolean z2 = false;
            for (W w3 : assignWindows) {
                if (!isWindowLate(w3)) {
                    if (!z2) {
                        this.windowState.setCurrentNamespace(assignPane);
                        this.windowState.add(streamRecord.getValue());
                        z2 = true;
                    }
                    this.triggerContext.key = k;
                    this.triggerContext.window = w3;
                    TriggerResult onElement2 = this.triggerContext.onElement(streamRecord);
                    if (onElement2.isFire()) {
                        emitWindowListStateContents(w3, windowSize);
                    }
                    if (onElement2.isPurge()) {
                        purgeListStatePanes(w3, windowSize, windowSlide);
                    }
                    registerCleanupTimer(w3);
                }
            }
        } else {
            for (W w4 : assignWindows) {
                if (!isWindowLate(w4)) {
                    z = false;
                    this.windowState.setCurrentNamespace(w4);
                    this.windowState.add(streamRecord.getValue());
                    this.triggerContext.key = k;
                    this.triggerContext.window = w4;
                    TriggerResult onElement3 = this.triggerContext.onElement(streamRecord);
                    if (onElement3.isFire()) {
                        Object obj2 = this.windowState.get();
                        if (obj2 != null) {
                            emitWindowContents(w4, obj2);
                        }
                    }
                    if (onElement3.isPurge()) {
                        this.windowState.clear();
                    }
                    registerCleanupTimer(w4);
                }
            }
        }
        if (z && isElementLate(streamRecord)) {
            if (this.lateDataOutputTag != null) {
                sideOutput(streamRecord);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void onEventTime(InternalTimer<K, W> internalTimer) throws Exception {
        MergingWindowSet mergingWindowSet;
        Object obj;
        this.triggerContext.key = internalTimer.getKey();
        this.triggerContext.window = internalTimer.getNamespace();
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            mergingWindowSet = getMergingWindowSet();
            Window stateWindow = mergingWindowSet.getStateWindow(this.triggerContext.window);
            if (stateWindow == null) {
                return;
            } else {
                this.windowState.setCurrentNamespace(stateWindow);
            }
        } else if ((this.windowAssigner instanceof SlidingTimeWindowPaneAssigner) && (this.windowStateDescriptor instanceof ListStateDescriptor)) {
            mergingWindowSet = null;
        } else {
            this.windowState.setCurrentNamespace(this.triggerContext.window);
            mergingWindowSet = null;
        }
        TriggerResult onEventTime = this.triggerContext.onEventTime(internalTimer.getTimestamp());
        if ((this.windowAssigner instanceof SlidingTimeWindowPaneAssigner) && (this.windowStateDescriptor instanceof ListStateDescriptor)) {
            long windowSize = this.slidingTimeWindowPaneAssigner.getWindowSize();
            long windowSlide = this.slidingTimeWindowPaneAssigner.getWindowSlide();
            if (onEventTime.isFire()) {
                emitWindowListStateContents(this.triggerContext.window, windowSize);
            }
            if (onEventTime.isPurge()) {
                purgeListStatePanes(this.triggerContext.window, windowSize, windowSlide);
            }
            if (this.windowAssigner.isEventTime() && isCleanupTime(this.triggerContext.window, internalTimer.getTimestamp())) {
                clearupListStatePanes(this.triggerContext.window, windowSize, windowSlide, mergingWindowSet);
                return;
            }
            return;
        }
        if (onEventTime.isFire() && (obj = this.windowState.get()) != null) {
            emitWindowContents(this.triggerContext.window, obj);
        }
        if (onEventTime.isPurge()) {
            this.windowState.clear();
        }
        if (this.windowAssigner.isEventTime() && isCleanupTime(this.triggerContext.window, internalTimer.getTimestamp())) {
            clearAllState(this.triggerContext.window, this.windowState, mergingWindowSet);
        }
        if (mergingWindowSet != null) {
            mergingWindowSet.persist();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void onProcessingTime(InternalTimer<K, W> internalTimer) throws Exception {
        MergingWindowSet mergingWindowSet;
        Object obj;
        this.triggerContext.key = internalTimer.getKey();
        this.triggerContext.window = internalTimer.getNamespace();
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            mergingWindowSet = getMergingWindowSet();
            Window stateWindow = mergingWindowSet.getStateWindow(this.triggerContext.window);
            if (stateWindow == null) {
                return;
            } else {
                this.windowState.setCurrentNamespace(stateWindow);
            }
        } else if ((this.windowAssigner instanceof SlidingTimeWindowPaneAssigner) && (this.windowStateDescriptor instanceof ListStateDescriptor)) {
            mergingWindowSet = null;
        } else {
            this.windowState.setCurrentNamespace(this.triggerContext.window);
            mergingWindowSet = null;
        }
        TriggerResult onProcessingTime = this.triggerContext.onProcessingTime(internalTimer.getTimestamp());
        if ((this.windowAssigner instanceof SlidingTimeWindowPaneAssigner) && (this.windowStateDescriptor instanceof ListStateDescriptor)) {
            long windowSize = this.slidingTimeWindowPaneAssigner.getWindowSize();
            long windowSlide = this.slidingTimeWindowPaneAssigner.getWindowSlide();
            if (onProcessingTime.isFire()) {
                emitWindowListStateContents(this.triggerContext.window, windowSize);
            }
            if (onProcessingTime.isPurge()) {
                purgeListStatePanes(this.triggerContext.window, windowSize, windowSlide);
            }
            if (this.windowAssigner.isEventTime() || !isCleanupTime(this.triggerContext.window, internalTimer.getTimestamp())) {
                return;
            }
            clearupListStatePanes(this.triggerContext.window, windowSize, windowSlide, mergingWindowSet);
            return;
        }
        if (onProcessingTime.isFire() && (obj = this.windowState.get()) != null) {
            emitWindowContents(this.triggerContext.window, obj);
        }
        if (onProcessingTime.isPurge()) {
            this.windowState.clear();
        }
        if (!this.windowAssigner.isEventTime() && isCleanupTime(this.triggerContext.window, internalTimer.getTimestamp())) {
            clearAllState(this.triggerContext.window, this.windowState, mergingWindowSet);
        }
        if (mergingWindowSet != null) {
            mergingWindowSet.persist();
        }
    }

    private void clearAllState(W w, AppendingState<IN, ACC> appendingState, MergingWindowSet<W> mergingWindowSet) throws Exception {
        appendingState.clear();
        this.triggerContext.clear();
        this.processContext.window = w;
        this.processContext.clear();
        if (mergingWindowSet != null) {
            mergingWindowSet.retireWindow(w);
            mergingWindowSet.persist();
        }
    }

    private void purgeListStatePanes(W w, long j, long j2) {
        long maxTimestamp = (w.maxTimestamp() + 1) - j;
        int i = (int) (j2 / this.paneSize);
        for (int i2 = 0; i2 < i; i2++) {
            this.windowState.setCurrentNamespace(this.slidingTimeWindowPaneAssigner.assignPane(maxTimestamp, maxTimestamp + this.paneSize));
            this.windowState.clear();
            maxTimestamp += this.paneSize;
        }
    }

    private void clearupListStatePanes(W w, long j, long j2, MergingWindowSet<W> mergingWindowSet) throws Exception {
        long maxTimestamp = (w.maxTimestamp() + 1) - j;
        int i = (int) (j2 / this.paneSize);
        for (int i2 = 0; i2 < i; i2++) {
            this.windowState.setCurrentNamespace(this.slidingTimeWindowPaneAssigner.assignPane(maxTimestamp, maxTimestamp + this.paneSize));
            clearAllState(w, this.windowState, mergingWindowSet);
            maxTimestamp += this.paneSize;
        }
    }

    private void emitWindowContents(W w, ACC acc) throws Exception {
        this.timestampedCollector.setAbsoluteTimestamp(w.maxTimestamp());
        this.processContext.window = w;
        ((InternalWindowFunction) this.userFunction).process(this.triggerContext.key, w, this.processContext, acc, this.timestampedCollector);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v21, types: [java.util.Collection] */
    private void emitWindowListStateContents(W w, long j) throws Exception {
        long maxTimestamp = (w.maxTimestamp() + 1) - j;
        ACC acc = null;
        int i = (int) (j / this.paneSize);
        for (int i2 = 0; i2 < i; i2++) {
            this.windowState.setCurrentNamespace(this.slidingTimeWindowPaneAssigner.assignPane(maxTimestamp, maxTimestamp + this.paneSize));
            acc = this.mergeListStateValues.mergeValues(acc, this.windowState);
            maxTimestamp += this.paneSize;
        }
        if (acc != null) {
            emitWindowContents(w, acc);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sideOutput(StreamRecord<IN> streamRecord) {
        this.output.collect(this.lateDataOutputTag, streamRecord);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MergingWindowSet<W> getMergingWindowSet() throws Exception {
        return new MergingWindowSet<>((MergingWindowAssigner) this.windowAssigner, this.mergingSetsState);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isWindowLate(W w) {
        return this.windowAssigner.isEventTime() && cleanupTime(w) <= this.internalTimerService.currentWatermark();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isElementLate(StreamRecord<IN> streamRecord) {
        return this.windowAssigner.isEventTime() && streamRecord.getTimestamp() + this.allowedLateness <= this.internalTimerService.currentWatermark();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void registerCleanupTimer(W w) {
        long cleanupTime = cleanupTime(w);
        if (cleanupTime == Long.MAX_VALUE) {
            return;
        }
        if (this.windowAssigner.isEventTime()) {
            this.triggerContext.registerEventTimeTimer(cleanupTime);
        } else {
            this.triggerContext.registerProcessingTimeTimer(cleanupTime);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void deleteCleanupTimer(W w) {
        long cleanupTime = cleanupTime(w);
        if (cleanupTime == Long.MAX_VALUE) {
            return;
        }
        if (this.windowAssigner.isEventTime()) {
            this.triggerContext.deleteEventTimeTimer(cleanupTime);
        } else {
            this.triggerContext.deleteProcessingTimeTimer(cleanupTime);
        }
    }

    private long cleanupTime(W w) {
        if (!this.windowAssigner.isEventTime()) {
            return w.maxTimestamp();
        }
        long maxTimestamp = w.maxTimestamp() + this.allowedLateness;
        if (maxTimestamp >= w.maxTimestamp()) {
            return maxTimestamp;
        }
        return Long.MAX_VALUE;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final boolean isCleanupTime(W w, long j) {
        return j == cleanupTime(w);
    }

    @VisibleForTesting
    public Trigger<? super IN, ? super W> getTrigger() {
        return this.trigger;
    }

    @VisibleForTesting
    public KeySelector<IN, K> getKeySelector() {
        return this.keySelector;
    }

    @VisibleForTesting
    public WindowAssigner<? super IN, W> getWindowAssigner() {
        return this.windowAssigner;
    }

    @VisibleForTesting
    public StateDescriptor<? extends AppendingState<IN, ACC>, ?> getStateDescriptor() {
        return this.windowStateDescriptor;
    }
}
