package org.apache.flink.cep.operator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.EventComparator;
import org.apache.flink.cep.configuration.SharedBufferCacheConfig;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.NFAState;
import org.apache.flink.cep.nfa.NFAStateSerializer;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.time.TimerService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/cep/operator/CepOperator.class */
public class CepOperator<IN, KEY, OUT> extends AbstractUdfStreamOperator<OUT, PatternProcessFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace> {
    public static final String PATTERN_MATCHED_TIMES_METRIC_NAME = "patternMatchedTimes";
    public static final String PATTERN_MATCHING_AVG_TIME_METRIC_NAME = "patternMatchingAvgTime";
    public static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    private static final long serialVersionUID = -4166778210774160757L;
    private final boolean isProcessingTime;
    private final TypeSerializer<IN> inputSerializer;
    private static final String NFA_STATE_NAME = "nfaStateName";
    private static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName";
    private final NFACompiler.NFAFactory<IN> nfaFactory;
    private transient ValueState<NFAState> computationStates;
    private transient MapState<Long, List<IN>> elementQueueState;
    private transient SharedBuffer<IN> partialMatches;
    private transient InternalTimerService<VoidNamespace> timerService;
    private transient NFA<IN> nfa;
    private final EventComparator<IN> comparator;
    private final OutputTag<IN> lateDataOutputTag;
    private final AfterMatchSkipStrategy afterMatchSkipStrategy;
    private transient CepOperator<IN, KEY, OUT>.ContextFunctionImpl context;
    private transient TimestampedCollector<OUT> collector;
    private transient CepRuntimeContext cepRuntimeContext;
    private transient TimerService cepTimerService;
    private transient Counter patternMatchedTimes;
    private transient Counter numLateRecordsDropped;
    private transient SimpleGauge<Long> patternMatchingAvgTime;
    private transient Long patternMatchingTimes;
    private transient Long patternMatchingTotalTime;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cep/operator/CepOperator$ContextFunctionImpl.class */
    public class ContextFunctionImpl implements PatternProcessFunction.Context {
        private Long timestamp;

        private ContextFunctionImpl() {
        }

        @Override // org.apache.flink.cep.functions.PatternProcessFunction.Context
        public <X> void output(OutputTag<X> outputTag, X x) {
            CepOperator.this.output.collect(outputTag, CepOperator.this.isProcessingTime ? new StreamRecord(x) : new StreamRecord(x, timestamp()));
        }

        void setTimestamp(long j) {
            this.timestamp = Long.valueOf(j);
        }

        @Override // org.apache.flink.cep.time.TimeContext
        public long timestamp() {
            return this.timestamp.longValue();
        }

        @Override // org.apache.flink.cep.time.TimeContext
        public long currentProcessingTime() {
            return CepOperator.this.timerService.currentProcessingTime();
        }
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/CepOperator$SimpleGauge.class */
    public static class SimpleGauge<T> implements Gauge<T> {
        private T value;

        public SimpleGauge(T t) {
            this.value = t;
        }

        public void update(T t) {
            this.value = t;
        }

        public T getValue() {
            return this.value;
        }
    }

    /* loaded from: input_file:org/apache/flink/cep/operator/CepOperator$TimerServiceImpl.class */
    private class TimerServiceImpl implements TimerService {
        private TimerServiceImpl() {
        }

        @Override // org.apache.flink.cep.time.TimerService
        public long currentProcessingTime() {
            return CepOperator.this.timerService.currentProcessingTime();
        }
    }

    public CepOperator(TypeSerializer<IN> typeSerializer, boolean z, NFACompiler.NFAFactory<IN> nFAFactory, @Nullable EventComparator<IN> eventComparator, @Nullable AfterMatchSkipStrategy afterMatchSkipStrategy, PatternProcessFunction<IN, OUT> patternProcessFunction, @Nullable OutputTag<IN> outputTag) {
        super(patternProcessFunction);
        this.inputSerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer);
        this.nfaFactory = (NFACompiler.NFAFactory) Preconditions.checkNotNull(nFAFactory);
        this.isProcessingTime = z;
        this.comparator = eventComparator;
        this.lateDataOutputTag = outputTag;
        if (afterMatchSkipStrategy == null) {
            this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();
        } else {
            this.afterMatchSkipStrategy = afterMatchSkipStrategy;
        }
    }

    public boolean useSplittableTimers() {
        return true;
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
        super.setup(streamTask, streamConfig, output);
        this.cepRuntimeContext = new CepRuntimeContext(getRuntimeContext());
        FunctionUtils.setFunctionRuntimeContext(getUserFunction(), this.cepRuntimeContext);
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.computationStates = stateInitializationContext.getKeyedStateStore().getState(new ValueStateDescriptor(NFA_STATE_NAME, new NFAStateSerializer()));
        this.partialMatches = new SharedBuffer<>(stateInitializationContext.getKeyedStateStore(), this.inputSerializer, SharedBufferCacheConfig.of(getOperatorConfig().getConfiguration()));
        this.elementQueueState = stateInitializationContext.getKeyedStateStore().getMapState(new MapStateDescriptor(EVENT_QUEUE_STATE_NAME, LongSerializer.INSTANCE, new ListSerializer(this.inputSerializer)));
        if (stateInitializationContext.isRestored()) {
            this.partialMatches.migrateOldState(getKeyedStateBackend(), this.computationStates);
        }
    }

    public void open() throws Exception {
        super.open();
        this.timerService = getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);
        this.nfa = this.nfaFactory.createNFA();
        this.nfa.open(this.cepRuntimeContext, new Configuration());
        this.context = new ContextFunctionImpl();
        this.collector = new TimestampedCollector<>(this.output);
        this.cepTimerService = new TimerServiceImpl();
        this.patternMatchedTimes = this.metrics.counter(PATTERN_MATCHED_TIMES_METRIC_NAME);
        this.patternMatchingAvgTime = (SimpleGauge) this.metrics.gauge(PATTERN_MATCHING_AVG_TIME_METRIC_NAME, new SimpleGauge(0L));
        this.numLateRecordsDropped = this.metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
        this.patternMatchingTimes = 0L;
        this.patternMatchingTotalTime = 0L;
    }

    public void close() throws Exception {
        super.close();
        if (this.nfa != null) {
            this.nfa.close();
        }
        if (this.partialMatches != null) {
            this.partialMatches.releaseCacheStatisticsTimer();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        if (this.isProcessingTime) {
            if (this.comparator != null) {
                bufferEvent(streamRecord.getValue(), this.timerService.currentProcessingTime());
                return;
            }
            NFAState nFAState = getNFAState();
            long currentProcessingTime = getProcessingTimeService().getCurrentProcessingTime();
            advanceTime(nFAState, currentProcessingTime);
            processEvent(nFAState, streamRecord.getValue(), currentProcessingTime);
            updateNFA(nFAState);
            return;
        }
        long timestamp = streamRecord.getTimestamp();
        Object value = streamRecord.getValue();
        if (timestamp > this.timerService.currentWatermark()) {
            bufferEvent(value, timestamp);
        } else if (this.lateDataOutputTag != null) {
            this.output.collect(this.lateDataOutputTag, streamRecord);
        } else {
            this.numLateRecordsDropped.inc();
        }
    }

    private void registerTimer(long j) {
        if (this.isProcessingTime) {
            this.timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, j + 1);
        } else {
            this.timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, j);
        }
    }

    private void bufferEvent(IN in, long j) throws Exception {
        List list = (List) this.elementQueueState.get(Long.valueOf(j));
        if (list == null) {
            list = new ArrayList();
            registerTimer(j);
        }
        list.add(in);
        this.elementQueueState.put(Long.valueOf(j), list);
    }

    public void onEventTime(InternalTimer<KEY, VoidNamespace> internalTimer) throws Exception {
        PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
        NFAState nFAState = getNFAState();
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek().longValue() <= this.timerService.currentWatermark()) {
            long longValue = sortedTimestamps.poll().longValue();
            advanceTime(nFAState, longValue);
            Stream<IN> sort = sort((Collection) this.elementQueueState.get(Long.valueOf(longValue)));
            try {
                sort.forEachOrdered(obj -> {
                    try {
                        processEvent(nFAState, obj, longValue);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
                if (sort != null) {
                    sort.close();
                }
                this.elementQueueState.remove(Long.valueOf(longValue));
            } catch (Throwable th) {
                if (sort != null) {
                    try {
                        sort.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        advanceTime(nFAState, this.timerService.currentWatermark());
        updateNFA(nFAState);
        if (nFAState.getPartialMatches().size() == 1 && nFAState.getCompletedMatches().isEmpty()) {
            this.computationStates.clear();
        }
    }

    public void onProcessingTime(InternalTimer<KEY, VoidNamespace> internalTimer) throws Exception {
        PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
        NFAState nFAState = getNFAState();
        while (!sortedTimestamps.isEmpty()) {
            long longValue = sortedTimestamps.poll().longValue();
            advanceTime(nFAState, longValue);
            Stream<IN> sort = sort((Collection) this.elementQueueState.get(Long.valueOf(longValue)));
            try {
                sort.forEachOrdered(obj -> {
                    try {
                        processEvent(nFAState, obj, longValue);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
                if (sort != null) {
                    sort.close();
                }
                this.elementQueueState.remove(Long.valueOf(longValue));
            } catch (Throwable th) {
                if (sort != null) {
                    try {
                        sort.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        advanceTime(nFAState, this.timerService.currentProcessingTime());
        updateNFA(nFAState);
    }

    private Stream<IN> sort(Collection<IN> collection) {
        Stream<IN> stream = collection.stream();
        return this.comparator == null ? stream : stream.sorted(this.comparator);
    }

    private NFAState getNFAState() throws IOException {
        NFAState nFAState = (NFAState) this.computationStates.value();
        return nFAState != null ? nFAState : this.nfa.createInitialNFAState();
    }

    private void updateNFA(NFAState nFAState) throws IOException {
        if (nFAState.isStateChanged()) {
            nFAState.resetStateChanged();
            nFAState.resetNewStartPartialMatch();
            this.computationStates.update(nFAState);
        }
    }

    private PriorityQueue<Long> getSortedTimestamps() throws Exception {
        PriorityQueue<Long> priorityQueue = new PriorityQueue<>();
        Iterator it = this.elementQueueState.keys().iterator();
        while (it.hasNext()) {
            priorityQueue.offer((Long) it.next());
        }
        return priorityQueue;
    }

    private void processEvent(NFAState nFAState, IN in, long j) throws Exception {
        SharedBufferAccessor<IN> accessor = this.partialMatches.getAccessor();
        try {
            long nanoTime = System.nanoTime();
            Collection<Map<String, List<IN>>> process = this.nfa.process(accessor, nFAState, in, j, this.afterMatchSkipStrategy, this.cepTimerService);
            long nanoTime2 = System.nanoTime();
            Long l = this.patternMatchingTimes;
            this.patternMatchingTimes = Long.valueOf(this.patternMatchingTimes.longValue() + 1);
            this.patternMatchingTotalTime = Long.valueOf((this.patternMatchingTotalTime.longValue() + nanoTime2) - nanoTime);
            this.patternMatchingAvgTime.update(Long.valueOf(this.patternMatchingTotalTime.longValue() / this.patternMatchingTimes.longValue()));
            if (this.nfa.getWindowTime() > 0 && nFAState.isNewStartPartialMatch()) {
                registerTimer(j + this.nfa.getWindowTime());
            }
            processMatchedSequences(process, j);
            if (accessor != null) {
                accessor.close();
            }
        } catch (Throwable th) {
            if (accessor != null) {
                try {
                    accessor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void advanceTime(NFAState nFAState, long j) throws Exception {
        SharedBufferAccessor<IN> accessor = this.partialMatches.getAccessor();
        try {
            Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> advanceTime = this.nfa.advanceTime(accessor, nFAState, j, this.afterMatchSkipStrategy);
            Collection collection = (Collection) advanceTime.f0;
            Collection<Tuple2<Map<String, List<IN>>, Long>> collection2 = (Collection) advanceTime.f1;
            if (!collection.isEmpty()) {
                processMatchedSequences(collection, j);
            }
            if (!collection2.isEmpty()) {
                processTimedOutSequences(collection2);
            }
            if (accessor != null) {
                accessor.close();
            }
        } catch (Throwable th) {
            if (accessor != null) {
                try {
                    accessor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void processMatchedSequences(Iterable<Map<String, List<IN>>> iterable, long j) throws Exception {
        PatternProcessFunction userFunction = getUserFunction();
        setTimestamp(j);
        for (Map<String, List<IN>> map : iterable) {
            this.patternMatchedTimes.inc();
            userFunction.processMatch(map, this.context, this.collector);
        }
    }

    private void processTimedOutSequences(Collection<Tuple2<Map<String, List<IN>>, Long>> collection) throws Exception {
        AbstractRichFunction abstractRichFunction = (PatternProcessFunction) getUserFunction();
        if (abstractRichFunction instanceof TimedOutPartialMatchHandler) {
            TimedOutPartialMatchHandler timedOutPartialMatchHandler = (TimedOutPartialMatchHandler) abstractRichFunction;
            for (Tuple2<Map<String, List<IN>>, Long> tuple2 : collection) {
                setTimestamp(((Long) tuple2.f1).longValue());
                timedOutPartialMatchHandler.processTimedOutMatch((Map) tuple2.f0, this.context);
            }
        }
    }

    private void setTimestamp(long j) {
        if (!this.isProcessingTime) {
            this.collector.setAbsoluteTimestamp(j);
        }
        this.context.setTimestamp(j);
    }

    @VisibleForTesting
    boolean hasNonEmptySharedBuffer(KEY key) throws Exception {
        setCurrentKey(key);
        return !this.partialMatches.isEmpty();
    }

    @VisibleForTesting
    boolean hasNonEmptyPQ(KEY key) throws Exception {
        setCurrentKey(key);
        return !this.elementQueueState.isEmpty();
    }

    @VisibleForTesting
    int getPQSize(KEY key) throws Exception {
        setCurrentKey(key);
        int i = 0;
        Iterator it = this.elementQueueState.values().iterator();
        while (it.hasNext()) {
            i += ((List) it.next()).size();
        }
        return i;
    }

    @VisibleForTesting
    long getLateRecordsNumber() {
        return this.numLateRecordsDropped.getCount();
    }
}
