package org.apache.flink.streaming.api.windowing.triggers;

import java.time.Duration;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * A {@link Trigger} that wraps another trigger and adds a processing-time timeout to it.
 *
 * <p>Every element is first handed to the nested trigger. If the nested trigger does not fire, a
 * processing-time timer is registered at {@code currentProcessingTime + interval} (unless one is
 * already pending), and its timestamp is remembered in partitioned state under the name
 * {@code "timeout"}. When a timer callback reaches this trigger, the window is fired.
 *
 * <p>Instances are created through the {@code of(...)} factory methods.
 *
 * @param <T> The type of elements on which this trigger works.
 * @param <W> The type of {@link Window} on which this trigger operates.
 */
@PublicEvolving
public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;

    /** The wrapped trigger; consulted first in every callback. */
    private final Trigger<T, W> nestedTrigger;

    /** Timeout length in milliseconds, added to the current processing time. */
    private final long interval;

    /** If {@code true}, each non-firing element replaces the pending timeout timer. */
    private final boolean resetTimerOnNewRecord;

    /** If {@code true}, {@link #clear} is invoked whenever a timer callback is handled. */
    private final boolean shouldClearOnTimeout;

    /** Descriptor of the state holding the timestamp of the currently registered timeout timer. */
    private final ValueStateDescriptor<Long> timeoutStateDesc =
            new ValueStateDescriptor<>("timeout", LongSerializer.INSTANCE);

    private ProcessingTimeoutTrigger(
            Trigger<T, W> nestedTrigger,
            long interval,
            boolean resetTimerOnNewRecord,
            boolean shouldClearOnTimeout) {
        this.nestedTrigger = nestedTrigger;
        this.interval = interval;
        this.resetTimerOnNewRecord = resetTimerOnNewRecord;
        this.shouldClearOnTimeout = shouldClearOnTimeout;
    }

    /**
     * Delegates to the nested trigger and, if it does not fire, makes sure a timeout timer is
     * pending.
     *
     * @return the nested trigger's result, unchanged
     */
    @Override
    public TriggerResult onElement(
            T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult nestedResult = this.nestedTrigger.onElement(element, timestamp, window, ctx);
        if (nestedResult.isFire()) {
            // The nested trigger fired on its own: drop the pending timeout and nested state.
            clear(window, ctx);
            return nestedResult;
        }

        ValueState<Long> timeoutState = ctx.getPartitionedState(this.timeoutStateDesc);
        long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval;
        Long pendingTimeout = timeoutState.value();

        if (pendingTimeout != null && this.resetTimerOnNewRecord) {
            // Restart the countdown: remove the old timer so a fresh one is registered below.
            ctx.deleteProcessingTimeTimer(pendingTimeout);
            timeoutState.clear();
            pendingTimeout = null;
        }

        if (pendingTimeout == null) {
            // Remember the timestamp so the timer can be deleted later in clear()/reset.
            timeoutState.update(nextFireTimestamp);
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
        }
        return nestedResult;
    }

    /**
     * Handles a processing-time timer. The nested trigger is notified, but its result is only used
     * to decide whether to purge; the window is always fired.
     *
     * <p>When {@code shouldClearOnTimeout} is {@code false}, the stored timeout timestamp is left
     * in state, so later elements register a new timer only if {@code resetTimerOnNewRecord} is
     * {@code true}.
     *
     * @return {@link TriggerResult#FIRE_AND_PURGE} if the nested result purges, otherwise
     *     {@link TriggerResult#FIRE}
     */
    @Override
    public TriggerResult onProcessingTime(long timestamp, W window, Trigger.TriggerContext ctx)
            throws Exception {
        TriggerResult nestedResult = this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
        if (this.shouldClearOnTimeout) {
            clear(window, ctx);
        }
        return nestedResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
    }

    /**
     * Handles an event-time timer in the same way as {@link #onProcessingTime}: the nested trigger
     * is notified and the window is always fired.
     *
     * <p>NOTE(review): this fires even when the nested trigger returns a non-firing result for the
     * event-time timer; confirm this is intended before relying on it.
     *
     * @return {@link TriggerResult#FIRE_AND_PURGE} if the nested result purges, otherwise
     *     {@link TriggerResult#FIRE}
     */
    @Override
    public TriggerResult onEventTime(long timestamp, W window, Trigger.TriggerContext ctx)
            throws Exception {
        TriggerResult nestedResult = this.nestedTrigger.onEventTime(timestamp, window, ctx);
        if (this.shouldClearOnTimeout) {
            clear(window, ctx);
        }
        return nestedResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
    }

    /**
     * Deletes the pending timeout timer (if any), clears the stored timestamp, and then clears
     * the nested trigger.
     */
    @Override
    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ValueState<Long> timeoutState = ctx.getPartitionedState(this.timeoutStateDesc);
        Long pendingTimeout = timeoutState.value();
        if (pendingTimeout != null) {
            ctx.deleteProcessingTimeTimer(pendingTimeout);
            timeoutState.clear();
        }
        this.nestedTrigger.clear(window, ctx);
    }

    @Override
    public String toString() {
        return "TimeoutTrigger(" + this.nestedTrigger.toString() + ")";
    }

    /**
     * Creates a timeout trigger that does not reset the timer on new records and clears its state
     * when a timer fires.
     *
     * @param nestedTrigger the trigger to wrap
     * @param timeout the timeout; converted to milliseconds
     * @param <T> The type of elements on which the trigger works.
     * @param <W> The type of {@link Window} on which the trigger operates.
     * @return a new {@code ProcessingTimeoutTrigger} wrapping {@code nestedTrigger}
     */
    public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(
            Trigger<T, W> nestedTrigger, Duration timeout) {
        return new ProcessingTimeoutTrigger<>(nestedTrigger, timeout.toMillis(), false, true);
    }

    /**
     * Creates a timeout trigger with explicit timer-reset and clear-on-timeout behaviour.
     *
     * @param nestedTrigger the trigger to wrap
     * @param timeout the timeout; converted to milliseconds
     * @param resetTimerOnNewRecord whether each non-firing element replaces the pending timer
     * @param shouldClearOnTimeout whether {@link #clear} is called when a timer callback is handled
     * @param <T> The type of elements on which the trigger works.
     * @param <W> The type of {@link Window} on which the trigger operates.
     * @return a new {@code ProcessingTimeoutTrigger} wrapping {@code nestedTrigger}
     */
    public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(
            Trigger<T, W> nestedTrigger,
            Duration timeout,
            boolean resetTimerOnNewRecord,
            boolean shouldClearOnTimeout) {
        return new ProcessingTimeoutTrigger<>(
                nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout);
    }
}
