package org.apache.flink.cep.dynamic.operator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.EventComparator;
import org.apache.flink.cep.configuration.SharedBufferCacheConfig;
import org.apache.flink.cep.dynamic.DynamicCEPOptions;
import org.apache.flink.cep.dynamic.processor.PatternProcessor;
import org.apache.flink.cep.dynamic.processor.PatternProcessorId;
import org.apache.flink.cep.event.UpdatePatternProcessorEvent;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.NFAState;
import org.apache.flink.cep.nfa.NFAStateSerializer;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.operator.CepOperator;
import org.apache.flink.cep.operator.CepRuntimeContext;
import org.apache.flink.cep.time.TimerService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/cep/dynamic/operator/DynamicCepOperator.class */
public class DynamicCepOperator<IN, KEY, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace>, OperatorEventHandler {
    /** Logger shared by all instances of this operator. */
    private static final Logger LOG = LoggerFactory.getLogger(DynamicCepOperator.class);
    private static final long serialVersionUID = 1;
    /** Base name of the late-record counter; the registration code in this class currently passes the same literal directly. */
    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    /** Base name of the per-pattern-processor NFA value state (suffixed with id and version). */
    private static final String NFA_STATE_NAME = "nfaStateName";
    /** Base name of the per-pattern-processor event queue map state (suffixed with id and version). */
    private static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName";
    /** Decides whether {@code open()} waits for the first pattern processors or returns immediately. */
    private final DynamicCEPOptions.OperatorStartupBehaviour startupBehaviour;
    /** {@code true} when the operator works in processing time, {@code false} for event time. */
    private final boolean isProcessingTime;
    /** Serializer of input elements; used for the shared buffers and the event queue state. */
    private final TypeSerializer<IN> inputSerializer;
    /** Optional comparator used to order events that share a timestamp; may be {@code null}. */
    private final EventComparator<IN> comparator;
    /** State initialization context captured in {@code initializeState} and reused when pattern processors are updated. */
    private transient StateInitializationContext initializationContext;
    /** NFA state handle per compiled NFA. */
    private transient Map<NFA<IN>, ValueState<NFAState>> computationStates;
    /** Buffered events per compiled NFA, grouped by timestamp. */
    private transient Map<NFA<IN>, MapState<Long, List<IN>>> elementQueueStates;
    /** Shared buffer holding partial matches per compiled NFA. */
    private transient Map<NFA<IN>, SharedBuffer<IN>> partialMatches;
    /** Internal timer service per compiled NFA. */
    private transient Map<NFA<IN>, InternalTimerService<VoidNamespace>> timerServices;
    /** Currently registered pattern processors and the NFA compiled from each; {@code null} until the first update event arrives. */
    private transient Map<PatternProcessor<IN>, NFA<IN>> patternProcessors;
    /** Context instance passed to the pattern process functions. */
    private transient DynamicCepOperator<IN, KEY, OUT>.ContextFunctionImpl context;
    /** Collector passed to the pattern process functions for emitting results. */
    private transient TimestampedCollector<OUT> collector;
    /** Runtime context handed to pattern process functions and NFAs. */
    private transient CepRuntimeContext cepRuntimeContext;
    /** CEP-level timer service per pattern processor id, passed to {@code NFA.process}. */
    private transient Map<PatternProcessorId, TimerService> cepTimerServices;
    /** Counter of matches per pattern processor id. */
    private transient Map<PatternProcessorId, Counter> patternMatchedTimesMetricMap;
    /** Counter of dropped late records per pattern processor id. */
    private transient Map<PatternProcessorId, Counter> numLateRecordsDroppedMetricMap;
    /** Gauge holding the average {@code NFA.process} duration (nanoseconds) per pattern processor id. */
    private transient Map<PatternProcessorId, CepOperator.SimpleGauge<Long>> patternMatchingAvgTimeMetricMap;
    /** Number of {@code NFA.process} invocations per pattern processor id. */
    private transient Map<PatternProcessorId, Long> patternMatchingTimesMap;
    /** Accumulated {@code NFA.process} duration (nanoseconds) per pattern processor id. */
    private transient Map<PatternProcessorId, Long> patternMatchingTotalTimeMap;

    /* loaded from: input_file:org/apache/flink/cep/dynamic/operator/DynamicCepOperator$ContextFunctionImpl.class */
    /**
     * Context handed to pattern process functions. It exposes the timestamp of the match being
     * processed, the current processing time and side-output emission.
     */
    public class ContextFunctionImpl implements PatternProcessFunction.Context {
        /** Timestamp of the match currently being processed; unset until {@link #setTimestamp(long)} is called. */
        private Long timestamp;
        /** Timer service used to answer {@link #currentProcessingTime()}. */
        private InternalTimerService<?> timerService;

        public ContextFunctionImpl() {
        }

        /**
         * Emits {@code x} to the given side output. In event time the record carries the current
         * timestamp; in processing time it is emitted without one.
         */
        @Override
        public <X> void output(OutputTag<X> outputTag, X x) {
            final StreamRecord<X> record;
            if (DynamicCepOperator.this.isProcessingTime) {
                record = new StreamRecord<>(x);
            } else {
                record = new StreamRecord<>(x, timestamp());
            }
            DynamicCepOperator.this.output.collect(outputTag, record);
        }

        void setTimerService(InternalTimerService<?> internalTimerService) {
            timerService = internalTimerService;
        }

        void setTimestamp(long j) {
            timestamp = j;
        }

        /** Returns the timestamp set by the operator for the current match. */
        @Override
        public long timestamp() {
            return timestamp;
        }

        /** Returns the processing time reported by the configured timer service. */
        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cep/dynamic/operator/DynamicCepOperator$TimerServiceImpl.class */
    /** CEP timer service that answers processing-time queries through the timer service registered for one NFA. */
    public class TimerServiceImpl implements TimerService {
        private final NFA<?> nfa;

        public TimerServiceImpl(NFA<?> nfa) {
            this.nfa = nfa;
        }

        /** Looks up the internal timer service of the wrapped NFA and returns its current processing time. */
        @Override
        public long currentProcessingTime() {
            InternalTimerService<VoidNamespace> delegate = DynamicCepOperator.this.timerServices.get(nfa);
            return delegate.currentProcessingTime();
        }
    }

    /**
     * Creates the operator.
     *
     * @param typeSerializer serializer for input elements; must not be {@code null}
     * @param processingTimeService processing time service stored on the operator
     * @param z {@code true} to run in processing time, {@code false} for event time
     * @param operatorStartupBehaviour behaviour of {@code open()} while no pattern processors are known
     * @param eventComparator optional comparator for events with equal timestamps
     */
    public DynamicCepOperator(TypeSerializer<IN> typeSerializer, ProcessingTimeService processingTimeService, boolean z, DynamicCEPOptions.OperatorStartupBehaviour operatorStartupBehaviour, @Nullable EventComparator<IN> eventComparator) {
        this.inputSerializer = Preconditions.checkNotNull(typeSerializer);
        this.startupBehaviour = operatorStartupBehaviour;
        this.comparator = eventComparator;
        this.isProcessingTime = z;
        this.processingTimeService = processingTimeService;
    }

    /** Performs the base operator setup and then wraps the runtime context for use by CEP functions. */
    @Override
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
        super.setup(streamTask, streamConfig, output);
        cepRuntimeContext = new CepRuntimeContext(getRuntimeContext());
    }

    /** Keeps a reference to the initialization context so state can be created later, when pattern processors arrive. */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        initializationContext = stateInitializationContext;
    }

    /**
     * Creates the function context, the collector and the empty per-pattern-processor bookkeeping
     * maps, then runs the startup handling of {@link #initPatternProcessors()}.
     */
    @Override
    public void open() throws Exception {
        super.open();
        context = new ContextFunctionImpl();
        collector = new TimestampedCollector<>(output);

        cepTimerServices = new HashMap<>();
        numLateRecordsDroppedMetricMap = new HashMap<>();
        patternMatchedTimesMetricMap = new HashMap<>();
        patternMatchingAvgTimeMetricMap = new HashMap<>();
        patternMatchingTimesMap = new HashMap<>();
        patternMatchingTotalTimeMap = new HashMap<>();

        initPatternProcessors();
    }

    /**
     * Closes every registered pattern process function and its NFA, then releases the cache
     * statistics timer of every shared buffer.
     */
    @Override
    public void close() throws Exception {
        super.close();
        if (patternProcessors != null) {
            for (Map.Entry<PatternProcessor<IN>, NFA<IN>> registered : patternProcessors.entrySet()) {
                FunctionUtils.closeFunction(registered.getKey().getPatternProcessFunction());
                registered.getValue().close();
            }
        }
        if (partialMatches != null) {
            for (Map.Entry<NFA<IN>, SharedBuffer<IN>> buffered : partialMatches.entrySet()) {
                buffered.getValue().releaseCacheStatisticsTimer();
            }
        }
    }

    /**
     * Feeds one input record to every registered pattern processor.
     *
     * <ul>
     *   <li>Event time: records newer than the current watermark are buffered and a watermark
     *       timer is registered; older records only increment the late-record counter.
     *   <li>Processing time without comparator: the record is run through the NFA immediately.
     *   <li>Processing time with comparator: the record is buffered under the current processing
     *       time and a processing-time timer is registered so that {@code onProcessingTime}
     *       later sorts and processes the buffer.
     * </ul>
     *
     * <p>When no pattern processors have been received yet the record is skipped.
     *
     * @param streamRecord the incoming record
     * @throws Exception if state access or pattern processing fails
     */
    @Override
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        if (this.patternProcessors == null) {
            LOG.info("There are no available pattern processors, skip element");
            return;
        }
        for (Map.Entry<PatternProcessor<IN>, NFA<IN>> entry : this.patternProcessors.entrySet()) {
            final PatternProcessor<IN> patternProcessor = entry.getKey();
            final NFA<IN> nfa = entry.getValue();
            if (!this.isProcessingTime) {
                final long timestamp = streamRecord.getTimestamp();
                final IN event = streamRecord.getValue();
                if (timestamp > this.timerServices.get(nfa).currentWatermark()) {
                    saveRegisterWatermarkTimer(nfa);
                    bufferEvent(nfa, event, timestamp);
                } else {
                    this.numLateRecordsDroppedMetricMap.get(PatternProcessorId.of(patternProcessor)).inc();
                }
            } else if (this.comparator != null) {
                final InternalTimerService<VoidNamespace> timerService = this.timerServices.get(nfa);
                final long currentTime = timerService.currentProcessingTime();
                bufferEvent(nfa, streamRecord.getValue(), currentTime);
                // Without this timer nothing ever triggers onProcessingTime, so the buffered
                // events would stay in state forever. Fire on the next millisecond so all
                // events sharing this timestamp are sorted together.
                timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, currentTime + 1);
            } else {
                final AfterMatchSkipStrategy skipStrategy = getAfterMatchSkipStrategy(patternProcessor);
                final PatternProcessFunction<IN, ?> function = patternProcessor.getPatternProcessFunction();
                final NFAState nfaState = getNFAState(nfa);
                final long now = getProcessingTimeService().getCurrentProcessingTime();
                advanceTime(patternProcessor, nfa, nfaState, now, function);
                processEvent(nfa, nfaState, streamRecord.getValue(), now, skipStrategy, patternProcessor);
                updateNFA(nfa, nfaState);
            }
        }
    }

    /**
     * Event-time timer callback. For every pattern processor it processes all buffered events
     * whose timestamp is not after the current watermark (in timestamp order), advances the NFA
     * to the watermark, persists the NFA state and re-registers a watermark timer while buffered
     * events or partial matches remain.
     *
     * @param internalTimer the timer that fired (not inspected)
     * @throws Exception if state access or pattern processing fails
     */
    @Override
    public void onEventTime(InternalTimer<KEY, VoidNamespace> internalTimer) throws Exception {
        if (this.patternProcessors == null) {
            return;
        }
        for (Map.Entry<PatternProcessor<IN>, NFA<IN>> entry : this.patternProcessors.entrySet()) {
            PatternProcessor<IN> patternProcessor = entry.getKey();
            AfterMatchSkipStrategy skipStrategy = getAfterMatchSkipStrategy(patternProcessor);
            PatternProcessFunction<IN, ?> function = patternProcessor.getPatternProcessFunction();
            NFA<IN> nfa = entry.getValue();
            PriorityQueue<Long> pendingTimestamps = getSortedTimestamps(nfa);
            NFAState nfaState = getNFAState(nfa);

            while (!pendingTimestamps.isEmpty()
                    && pendingTimestamps.peek() <= this.timerServices.get(nfa).currentWatermark()) {
                long timestamp = pendingTimestamps.poll();
                advanceTime(patternProcessor, nfa, nfaState, timestamp, function);
                List<IN> events = maybeSort(this.elementQueueStates.get(nfa).get(timestamp));
                for (IN event : events) {
                    try {
                        processEvent(nfa, nfaState, event, timestamp, skipStrategy, patternProcessor);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                this.elementQueueStates.get(nfa).remove(timestamp);
            }

            advanceTime(patternProcessor, nfa, nfaState, this.timerServices.get(nfa).currentWatermark(), function);
            updateNFA(nfa, nfaState);

            boolean workRemaining = !pendingTimestamps.isEmpty() || !this.partialMatches.get(nfa).isEmpty();
            if (workRemaining) {
                saveRegisterWatermarkTimer(nfa);
            }
        }
    }

    /**
     * Processing-time timer callback. For every pattern processor it drains all buffered events
     * in timestamp order, running them through the NFA, and then persists the NFA state.
     *
     * @param internalTimer the timer that fired (not inspected)
     * @throws Exception if state access or pattern processing fails
     */
    @Override
    public void onProcessingTime(InternalTimer<KEY, VoidNamespace> internalTimer) throws Exception {
        if (this.patternProcessors == null) {
            return;
        }
        for (Map.Entry<PatternProcessor<IN>, NFA<IN>> entry : this.patternProcessors.entrySet()) {
            PatternProcessor<IN> patternProcessor = entry.getKey();
            PatternProcessFunction<IN, ?> function = patternProcessor.getPatternProcessFunction();
            NFA<IN> nfa = entry.getValue();
            PriorityQueue<Long> pendingTimestamps = getSortedTimestamps(nfa);
            NFAState nfaState = getNFAState(nfa);

            while (!pendingTimestamps.isEmpty()) {
                long timestamp = pendingTimestamps.poll();
                advanceTime(patternProcessor, nfa, nfaState, timestamp, function);
                List<IN> events = maybeSort(this.elementQueueStates.get(nfa).get(timestamp));
                for (IN event : events) {
                    try {
                        // The skip strategy is resolved per event, as in the original flow.
                        processEvent(nfa, nfaState, event, timestamp, getAfterMatchSkipStrategy(patternProcessor), patternProcessor);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                this.elementQueueStates.get(nfa).remove(timestamp);
            }

            updateNFA(nfa, nfaState);
        }
    }

    /**
     * Handles an event sent to this operator. Only {@link UpdatePatternProcessorEvent} is
     * accepted; it replaces the registered pattern processors.
     *
     * @param operatorEvent the received event
     * @throws IllegalStateException if the event is of any other type
     * @throws FlinkRuntimeException if applying the update fails
     */
    @Override
    public void handleOperatorEvent(OperatorEvent operatorEvent) {
        if (operatorEvent instanceof UpdatePatternProcessorEvent) {
            try {
                @SuppressWarnings("unchecked")
                UpdatePatternProcessorEvent<IN> update = (UpdatePatternProcessorEvent<IN>) operatorEvent;
                handleUpdatePatternProcessorEvent(update);
            } catch (IOException ioFailure) {
                throw new FlinkRuntimeException("Failed to deserialize the pattern processors.", ioFailure);
            } catch (Exception failure) {
                throw new FlinkRuntimeException("Failed to create the NFA from the pattern processors.", failure);
            }
            return;
        }
        throw new IllegalStateException("Received unexpected operator event " + operatorEvent);
    }

    /**
     * Builds {@code <str>-<id>-<version>} for the given pattern processor id.
     *
     * @param str base name
     * @param patternProcessorId id whose id and version form the suffix
     * @return the suffixed name
     */
    @VisibleForTesting
    static String getNameSuffixedWithPatternProcessor(String str, PatternProcessorId patternProcessorId) {
        return str + "-" + patternProcessorId.getId() + "-" + patternProcessorId.getVersion();
    }

    /**
     * Unless the startup behaviour is {@code SKIP_ELEMENTS}, yields to the task's main mailbox
     * until the first set of pattern processors has been installed.
     *
     * @throws InterruptedException if yielding to the mailbox is interrupted
     */
    private void initPatternProcessors() throws InterruptedException {
        if (this.startupBehaviour != DynamicCEPOptions.OperatorStartupBehaviour.SKIP_ELEMENTS) {
            MailboxExecutor mailbox = getContainingTask().getEnvironment().getMainMailboxExecutor();
            LOG.info("Start waiting for initial pattern processors");
            while (this.patternProcessors == null) {
                mailbox.yield();
            }
        }
    }

    /**
     * Registers an event-time timer for {@code currentWatermark + 1} on the NFA's timer service,
     * skipping the registration when that addition would overflow.
     */
    private void saveRegisterWatermarkTimer(NFA<IN> nfa) {
        final long watermark = this.timerServices.get(nfa).currentWatermark();
        // watermark + 1 only fails to exceed watermark at Long.MAX_VALUE.
        if (watermark == Long.MAX_VALUE) {
            return;
        }
        this.timerServices.get(nfa).registerEventTimeTimer(VoidNamespace.INSTANCE, watermark + 1);
    }

    /**
     * Appends {@code in} to the list of events buffered for timestamp {@code j} in the NFA's
     * event queue state, creating the list if none exists yet.
     *
     * @throws Exception if the state access fails
     */
    private void bufferEvent(NFA<IN> nfa, IN in, long j) throws Exception {
        List<IN> bucket = this.elementQueueStates.get(nfa).get(j);
        if (bucket == null) {
            bucket = new ArrayList<>();
        }
        bucket.add(in);
        this.elementQueueStates.get(nfa).put(j, bucket);
    }

    /** Returns the skip strategy configured on the processor's pattern, or {@code noSkip()} when none is set. */
    private AfterMatchSkipStrategy getAfterMatchSkipStrategy(PatternProcessor<IN> patternProcessor) {
        AfterMatchSkipStrategy configured = patternProcessor.getPattern(getUserCodeClassloader()).getAfterMatchSkipStrategy();
        AfterMatchSkipStrategy fallback = AfterMatchSkipStrategy.noSkip();
        return configured != null ? configured : fallback;
    }

    /**
     * Reads the stored NFA state for the given NFA, falling back to a fresh initial state when
     * nothing has been stored.
     *
     * @throws IOException if reading the state fails
     */
    private NFAState getNFAState(NFA<IN> nfa) throws IOException {
        NFAState stored = this.computationStates.get(nfa).value();
        if (stored == null) {
            return nfa.createInitialNFAState();
        }
        return stored;
    }

    /**
     * Writes the NFA state back to keyed state, but only when it is flagged as changed; the
     * flag is reset before the write.
     *
     * @throws IOException if updating the state fails
     */
    private void updateNFA(NFA<IN> nfa, NFAState nFAState) throws IOException {
        if (!nFAState.isStateChanged()) {
            return;
        }
        nFAState.resetStateChanged();
        this.computationStates.get(nfa).update(nFAState);
    }

    /**
     * Collects all timestamps that currently have buffered events for the NFA into a min-heap.
     *
     * @throws Exception if reading the state keys fails
     */
    private PriorityQueue<Long> getSortedTimestamps(NFA<IN> nfa) throws Exception {
        PriorityQueue<Long> ordered = new PriorityQueue<>();
        for (Long timestamp : this.elementQueueStates.get(nfa).keys()) {
            ordered.offer(timestamp);
        }
        return ordered;
    }

    /**
     * Runs one event through the NFA, updates the matching-time metrics of the pattern processor
     * and forwards any completed matches to its process function.
     *
     * <p>The shared buffer accessor is closed exactly once through try-with-resources, whether
     * processing succeeds or throws.
     *
     * @param nfa the NFA compiled for {@code patternProcessor}
     * @param nFAState current NFA state, mutated by the NFA
     * @param in the event to process
     * @param j timestamp of the event
     * @param afterMatchSkipStrategy skip strategy to apply after a match
     * @param patternProcessor pattern processor the NFA belongs to
     * @throws Exception if the NFA, the process function or closing the accessor fails
     */
    private void processEvent(NFA<IN> nfa, NFAState nFAState, IN in, long j, AfterMatchSkipStrategy afterMatchSkipStrategy, PatternProcessor<IN> patternProcessor) throws Exception {
        try (SharedBufferAccessor<IN> accessor = this.partialMatches.get(nfa).getAccessor()) {
            PatternProcessorId id = PatternProcessorId.of(patternProcessor);
            TimerService timerService = this.cepTimerServices.get(id);

            long start = System.nanoTime();
            Collection<Map<String, List<IN>>> matches = nfa.process(accessor, nFAState, in, j, afterMatchSkipStrategy, timerService);
            long elapsed = System.nanoTime() - start;

            // Running total and invocation count feed the average-time gauge.
            long totalTime = this.patternMatchingTotalTimeMap.merge(id, elapsed, Long::sum);
            long invocations = this.patternMatchingTimesMap.merge(id, 1L, Long::sum);
            this.patternMatchingAvgTimeMetricMap.get(id).update(totalTime / invocations);

            processMatchedSequences(patternProcessor, matches, j);
        }
    }

    /**
     * Advances the NFA to time {@code j} and dispatches the results: completed matches go to
     * the pattern processor's process function, timed-out partial matches to
     * {@link #processTimedOutSequences}.
     *
     * <p>The shared buffer accessor is closed exactly once through try-with-resources. Before
     * timed-out matches are dispatched the function context is pointed at this NFA's timer
     * service, so {@code currentProcessingTime()} works even if no full match has been
     * processed yet.
     *
     * @param patternProcessor pattern processor the NFA belongs to
     * @param nfa the NFA to advance
     * @param nFAState current NFA state, mutated by the NFA
     * @param j the time to advance to
     * @param patternProcessFunction function that may handle timed-out partial matches
     * @throws Exception if the NFA, a function or closing the accessor fails
     */
    private void advanceTime(PatternProcessor<IN> patternProcessor, NFA<IN> nfa, NFAState nFAState, long j, PatternProcessFunction<IN, ?> patternProcessFunction) throws Exception {
        try (SharedBufferAccessor<IN> accessor = this.partialMatches.get(nfa).getAccessor()) {
            Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> result = nfa.advanceTime(accessor, nFAState, j);
            Collection<Map<String, List<IN>>> matches = result.f0;
            Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut = result.f1;

            if (!matches.isEmpty()) {
                processMatchedSequences(patternProcessor, matches, j);
            }
            if (!timedOut.isEmpty()) {
                // The context's timer service is otherwise only set while processing full matches.
                this.context.setTimerService(this.timerServices.get(nfa));
                processTimedOutSequences(patternProcessFunction, timedOut);
            }
        }
    }

    /**
     * Sets the output timestamp to {@code j} and, for every match, increments the processor's
     * match counter, points the context at the processor's timer service and invokes its
     * process function.
     *
     * @throws Exception if the process function fails
     */
    private void processMatchedSequences(PatternProcessor<IN> patternProcessor, Iterable<Map<String, List<IN>>> iterable, long j) throws Exception {
        setTimestamp(j);
        for (Map<String, List<IN>> match : iterable) {
            Counter matchedCounter = this.patternMatchedTimesMetricMap.get(PatternProcessorId.of(patternProcessor));
            matchedCounter.inc();
            NFA<IN> nfa = this.patternProcessors.get(patternProcessor);
            this.context.setTimerService(this.timerServices.get(nfa));
            patternProcessor.getPatternProcessFunction().processMatch(match, this.context, this.collector);
        }
    }

    /**
     * Passes each timed-out partial match to the function, provided the function implements
     * {@link TimedOutPartialMatchHandler}; otherwise does nothing. The timestamp stored with each
     * partial match becomes the current timestamp before the handler is called.
     *
     * @throws Exception if the handler fails
     */
    private void processTimedOutSequences(PatternProcessFunction<IN, ?> patternProcessFunction, Collection<Tuple2<Map<String, List<IN>>, Long>> collection) throws Exception {
        if (!(patternProcessFunction instanceof TimedOutPartialMatchHandler)) {
            return;
        }
        @SuppressWarnings("unchecked")
        TimedOutPartialMatchHandler<IN> handler = (TimedOutPartialMatchHandler<IN>) patternProcessFunction;
        for (Tuple2<Map<String, List<IN>>, Long> timedOut : collection) {
            setTimestamp(timedOut.f1);
            handler.processTimedOutMatch(timedOut.f0, this.context);
        }
    }

    /** Stores {@code j} on the function context and, in event time only, also on the output collector. */
    private void setTimestamp(long j) {
        if (this.isProcessingTime) {
            this.context.setTimestamp(j);
            return;
        }
        this.collector.setAbsoluteTimestamp(j);
        this.context.setTimestamp(j);
    }

    /**
     * Replaces the registered pattern processors with the ones carried by the event.
     *
     * <p>Steps: clear the keyed state of outdated processors (using the maps that are still in
     * place), rebuild all per-NFA maps from scratch, and for every new processor open its
     * function, compile and open its NFA, create its shared buffer, state handles and timer
     * service, and register or carry over its metrics.
     *
     * <p>NOTE(review): functions and NFAs of the previously registered processors are not closed
     * here, and {@code patternMatchingTimesMap}/{@code patternMatchingTotalTimeMap} are not
     * pruned — confirm whether that is intended.
     *
     * @param updatePatternProcessorEvent event carrying the new list of pattern processors
     * @throws Exception if opening a function, compiling an NFA or accessing state fails
     */
    private void handleUpdatePatternProcessorEvent(UpdatePatternProcessorEvent<IN> updatePatternProcessorEvent) throws Exception {
        List<PatternProcessor<IN>> patternProcessors = updatePatternProcessorEvent.patternProcessors();
        int size = patternProcessors.size();
        // These must run before the maps below are replaced: they resolve state via the current maps.
        clearOutdatedElementQueueState(patternProcessors);
        clearOutdatedComputationState(patternProcessors);
        clearOutdatedSharedBufferState(patternProcessors);
        this.patternProcessors = new HashMap(size);
        this.computationStates = new HashMap(size);
        this.elementQueueStates = new HashMap(size);
        this.partialMatches = new HashMap(size);
        this.timerServices = new HashMap(size);
        // Replacement maps: hashMap = CEP timer services, hashMap2 = late-record counters,
        // hashMap3 = match counters, hashMap4 = average matching time gauges.
        HashMap hashMap = new HashMap(size);
        HashMap hashMap2 = new HashMap(size);
        HashMap hashMap3 = new HashMap(size);
        HashMap hashMap4 = new HashMap(size);
        for (PatternProcessor<IN> patternProcessor : patternProcessors) {
            PatternProcessorId of = PatternProcessorId.of(patternProcessor);
            PatternProcessFunction<IN, ?> patternProcessFunction = patternProcessor.getPatternProcessFunction();
            FunctionUtils.setFunctionRuntimeContext(patternProcessFunction, this.cepRuntimeContext);
            FunctionUtils.openFunction(patternProcessFunction, new Configuration());
            // The second argument is true when the function implements TimedOutPartialMatchHandler.
            NFA<IN> createNFA = NFACompiler.compileFactory(patternProcessor.getPattern(getUserCodeClassloader()), patternProcessFunction instanceof TimedOutPartialMatchHandler).createNFA();
            createNFA.open(this.cepRuntimeContext, new Configuration());
            SharedBuffer<IN> sharedBuffer = new SharedBuffer<>(this.initializationContext.getKeyedStateStore(), this.inputSerializer, SharedBufferCacheConfig.of(getOperatorConfig().getConfiguration()), (PatternProcessor<?>) patternProcessor, size);
            // State names are suffixed with the processor id and version, so each version gets its own state.
            ValueState<NFAState> state = this.initializationContext.getKeyedStateStore().getState(new ValueStateDescriptor(getNameSuffixedWithPatternProcessor(NFA_STATE_NAME, of), new NFAStateSerializer()));
            MapState<Long, List<IN>> mapState = this.initializationContext.getKeyedStateStore().getMapState(new MapStateDescriptor(getNameSuffixedWithPatternProcessor(EVENT_QUEUE_STATE_NAME, of), LongSerializer.INSTANCE, new ListSerializer(this.inputSerializer)));
            if (this.initializationContext.isRestored()) {
                sharedBuffer.migrateOldState(getKeyedStateBackend(), state);
            }
            // One internal timer service per processor id and version; this operator is the callback target.
            InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService(String.format("watermark-callbacks-%s-%s", patternProcessor.getId(), Integer.valueOf(patternProcessor.getVersion())), VoidNamespaceSerializer.INSTANCE, this);
            TimerServiceImpl timerServiceImpl = new TimerServiceImpl(createNFA);
            LOG.info("Use patternProcessors version: " + patternProcessor.getVersion() + " with " + patternProcessor.getPattern(getUserCodeClassloader()));
            this.patternProcessors.put(patternProcessor, createNFA);
            this.computationStates.put(createNFA, state);
            this.elementQueueStates.put(createNFA, mapState);
            this.partialMatches.put(createNFA, sharedBuffer);
            this.timerServices.put(createNFA, internalTimerService);
            // The put directly above the read means the new map always receives timerServiceImpl.
            this.cepTimerServices.put(of, timerServiceImpl);
            hashMap.put(of, this.cepTimerServices.getOrDefault(of, timerServiceImpl));
            // Metrics already registered for this id are carried over; otherwise new ones are created.
            addCounterMetric("numLateRecordsDropped", this.numLateRecordsDroppedMetricMap, hashMap2, of);
            addCounterMetric(CepOperator.PATTERN_MATCHED_TIMES_METRIC_NAME, this.patternMatchedTimesMetricMap, hashMap3, of);
            addLongGaugeMetric(CepOperator.PATTERN_MATCHING_AVG_TIME_METRIC_NAME, this.patternMatchingAvgTimeMetricMap, hashMap4, of);
        }
        // Swap in the rebuilt maps so entries of processors that are no longer present are dropped.
        this.numLateRecordsDroppedMetricMap = hashMap2;
        this.patternMatchedTimesMetricMap = hashMap3;
        this.patternMatchingAvgTimeMetricMap = hashMap4;
        this.cepTimerServices = hashMap;
    }

    /** Returns the number of entries in the pattern processor map (test hook). */
    @VisibleForTesting
    int getNumOfPatternProcessors() {
        final Map<PatternProcessor<IN>, NFA<IN>> registered = this.patternProcessors;
        return registered.size();
    }

    /** Returns the current value of the match counter for the given pattern processor id (test hook). */
    @VisibleForTesting
    long getPatternMatchedTimes(PatternProcessorId patternProcessorId) {
        final Counter matched = this.patternMatchedTimesMetricMap.get(patternProcessorId);
        return matched.getCount();
    }

    /** Returns the current value of the late-record counter for the given pattern processor id (test hook). */
    @VisibleForTesting
    long getLateRecordsNumber(PatternProcessorId patternProcessorId) {
        final Counter dropped = this.numLateRecordsDroppedMetricMap.get(patternProcessorId);
        return dropped.getCount();
    }

    /** Returns the current value of the average matching time gauge for the given pattern processor id (test hook). */
    @VisibleForTesting
    long getPatternMatchingAvgTime(PatternProcessorId patternProcessorId) {
        final CepOperator.SimpleGauge<Long> average = this.patternMatchingAvgTimeMetricMap.get(patternProcessorId);
        return average.getValue().longValue();
    }

    /** Exposes the shared buffer map itself, not a copy (test hook). */
    @VisibleForTesting
    Map<NFA<IN>, SharedBuffer<IN>> getPartialMatches() {
        return partialMatches;
    }

    /** Exposes the NFA state map itself, not a copy (test hook). */
    @VisibleForTesting
    Map<NFA<IN>, ValueState<NFAState>> getComputationStates() {
        return computationStates;
    }

    /** Exposes the event queue state map itself, not a copy (test hook). */
    @VisibleForTesting
    Map<NFA<IN>, MapState<Long, List<IN>>> getElementQueueStates() {
        return elementQueueStates;
    }

    /** Exposes the pattern processor map itself, not a copy; {@code null} until the first update (test hook). */
    @VisibleForTesting
    Map<PatternProcessor<IN>, NFA<IN>> getPatternProcessors() {
        return patternProcessors;
    }

    /** Returns the state initialization context captured in {@code initializeState} (test hook). */
    @VisibleForTesting
    StateInitializationContext getInitializationContext() {
        return initializationContext;
    }

    /**
     * Puts the counter for {@code patternProcessorId} into {@code map2}: the one already present
     * in {@code map} if there is one, otherwise a newly registered counter named from
     * {@code str} plus the id suffix.
     */
    private void addCounterMetric(String str, Map<PatternProcessorId, Counter> map, Map<PatternProcessorId, Counter> map2, PatternProcessorId patternProcessorId) {
        final Counter counter;
        if (map.containsKey(patternProcessorId)) {
            counter = map.get(patternProcessorId);
        } else {
            counter = this.metrics.counter(getNameSuffixedWithPatternProcessor(str, patternProcessorId));
        }
        map2.put(patternProcessorId, counter);
    }

    /**
     * Puts the gauge for {@code patternProcessorId} into {@code map2}: the one already present
     * in {@code map} if there is one, otherwise a newly registered gauge initialised to 0 and
     * named from {@code str} plus the id suffix.
     */
    private void addLongGaugeMetric(String str, Map<PatternProcessorId, CepOperator.SimpleGauge<Long>> map, Map<PatternProcessorId, CepOperator.SimpleGauge<Long>> map2, PatternProcessorId patternProcessorId) {
        final CepOperator.SimpleGauge<Long> gauge;
        if (map.containsKey(patternProcessorId)) {
            gauge = map.get(patternProcessorId);
        } else {
            String metricName = getNameSuffixedWithPatternProcessor(str, patternProcessorId);
            gauge = this.metrics.gauge(metricName, new CepOperator.SimpleGauge<>(0L));
        }
        map2.put(patternProcessorId, gauge);
    }

    /** Clears the NFA state of every registered processor that {@link #clearState} selects, given the new list. */
    private void clearOutdatedComputationState(List<PatternProcessor<IN>> list) {
        if (this.computationStates != null && !this.computationStates.isEmpty()) {
            clearState(list, this.patternProcessors, outdated -> {
                NFA<IN> nfa = this.patternProcessors.get(outdated);
                this.computationStates.get(nfa).clear();
            });
        }
    }

    /** Clears the buffered events of every registered processor that {@link #clearState} selects, given the new list. */
    private void clearOutdatedElementQueueState(List<PatternProcessor<IN>> list) {
        if (this.elementQueueStates != null && !this.elementQueueStates.isEmpty()) {
            clearState(list, this.patternProcessors, outdated -> {
                NFA<IN> nfa = this.patternProcessors.get(outdated);
                this.elementQueueStates.get(nfa).clear();
            });
        }
    }

    /** Clears the shared buffer of every registered processor that {@link #clearState} selects, given the new list. */
    private void clearOutdatedSharedBufferState(List<PatternProcessor<IN>> list) {
        if (this.partialMatches != null && !this.partialMatches.isEmpty()) {
            clearState(list, this.patternProcessors, outdated -> {
                NFA<IN> nfa = this.patternProcessors.get(outdated);
                this.partialMatches.get(nfa).clear();
            });
        }
    }

    /**
     * Sorts the list in place with the configured comparator, if one is set.
     *
     * @return the same list instance that was passed in
     */
    private List<IN> maybeSort(List<IN> list) {
        if (this.comparator == null) {
            return list;
        }
        list.sort(this.comparator);
        return list;
    }

    /**
     * Invokes {@code consumer} for every currently registered pattern processor whose state is
     * outdated with respect to the new list.
     *
     * <p>A current processor is outdated when the new list contains a processor with the same id
     * and a higher version, or when no processor in the new list has its id. If the new list
     * contains the same id with an equal or lower version the state is kept; a lower version
     * additionally logs a warning.
     *
     * @param list pattern processors from the incoming update
     * @param map currently registered pattern processors
     * @param consumer action that clears the state of one outdated processor
     */
    private static <T> void clearState(List<PatternProcessor<T>> list, Map<PatternProcessor<T>, NFA<T>> map, Consumer<PatternProcessor<T>> consumer) {
        map.keySet().forEach(current -> {
            boolean absentFromUpdate = true;
            for (PatternProcessor<T> candidate : list) {
                if (!current.getId().equals(candidate.getId())) {
                    continue;
                }
                if (candidate.getVersion() > current.getVersion()) {
                    // Superseded by a newer version: clear once and stop scanning.
                    consumer.accept(current);
                    return;
                }
                if (candidate.getVersion() < current.getVersion()) {
                    LOG.warn("Detect newPatternProcessor in new operator event(id: {} version: {}) has smaller version than current one(id: {} version: {}).", candidate.getId(), candidate.getVersion(), current.getId(), current.getVersion());
                }
                absentFromUpdate = false;
            }
            if (absentFromUpdate) {
                consumer.accept(current);
            }
        });
    }
}
