package org.apache.flink.cep.nfa.sharedbuffer;

import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.function.Function;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.cep.configuration.SharedBufferCacheConfig;
import org.apache.flink.cep.dynamic.processor.PatternProcessor;
import org.apache.flink.cep.nfa.DeweyNumber;
import org.apache.flink.cep.nfa.NFAState;
import org.apache.flink.cep.nfa.sharedbuffer.EventId;
import org.apache.flink.cep.nfa.sharedbuffer.Lockable;
import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNode;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.RemovalCause;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.util.WrappingRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.class */
/**
 * State-backed buffer of events and {@link SharedBufferNode}s, fronted by two bounded in-memory
 * caches.
 *
 * <p>Reads consult the cache first and fall back to keyed {@link MapState}. Writes
 * ({@link #upsertEvent}, {@link #upsertEntry}, {@link #registerEvent}) land in the cache only; a
 * cached value reaches state either when it is evicted because the cache is full or when
 * {@link #flushCache()} is called.
 *
 * <p>Every instance owns a daemon timer thread that periodically logs cache statistics; call
 * {@link #releaseCacheStatisticsTimer()} when the buffer is no longer needed.
 *
 * @param <V> type of the buffered events
 */
public class SharedBuffer<V> {
    private static final Logger LOG = LoggerFactory.getLogger(SharedBuffer.class);
    private static final String LEGACY_ENTRIES_STATE_NAME = "sharedBuffer-entries";
    private static final String ENTRIES_STATE_NAME = "sharedBuffer-entries-with-lockable-edges";
    private static final String EVENTS_STATE_NAME = "sharedBuffer-events";
    private static final String EVENTS_COUNT_STATE_NAME = "sharedBuffer-events-count";

    /** Name given to the statistics timer thread so it is identifiable in thread dumps. */
    private static final String CACHE_STATISTICS_THREAD_NAME = "SharedBuffer-cache-statistics";

    /** Persistent store of events, keyed by event id. */
    private final MapState<EventId, Lockable<V>> eventsBuffer;

    /** Number of events registered so far per timestamp. */
    private final MapState<Long, Integer> eventsCount;

    /** Persistent store of buffer nodes, keyed by node id. */
    private final MapState<NodeId, Lockable<SharedBufferNode>> entries;

    /** In-memory cache in front of {@link #eventsBuffer}. */
    private final Cache<EventId, Lockable<V>> eventsBufferCache;

    /** In-memory cache in front of {@link #entries}. */
    private final Cache<NodeId, Lockable<SharedBufferNode>> entryCache;

    /** Timer that periodically logs the statistics of both caches. */
    private final Timer cacheStatisticsTimer;

    /**
     * Creates a buffer with a default {@link SharedBufferCacheConfig}.
     *
     * @param keyedStateStore store the backing map states are obtained from
     * @param typeSerializer serializer for the buffered events
     */
    @VisibleForTesting
    public SharedBuffer(KeyedStateStore keyedStateStore, TypeSerializer<V> typeSerializer) {
        this(keyedStateStore, typeSerializer, new SharedBufferCacheConfig());
    }

    /**
     * Creates a buffer that uses the state names and cache sizes exactly as configured.
     *
     * @param keyedStateStore store the backing map states are obtained from
     * @param typeSerializer serializer for the buffered events
     * @param sharedBufferCacheConfig cache sizes and statistics logging interval
     */
    public SharedBuffer(
            KeyedStateStore keyedStateStore,
            TypeSerializer<V> typeSerializer,
            SharedBufferCacheConfig sharedBufferCacheConfig) {
        this(
                keyedStateStore,
                typeSerializer,
                sharedBufferCacheConfig,
                Function.<String>identity(),
                Function.<Integer>identity());
    }

    /**
     * Creates a buffer dedicated to one pattern processor.
     *
     * <p>Every state name is suffixed with the processor's id and version, and each configured
     * cache size is divided by {@code i}, never going below one slot.
     *
     * @param keyedStateStore store the backing map states are obtained from
     * @param typeSerializer serializer for the buffered events
     * @param sharedBufferCacheConfig cache sizes and statistics logging interval
     * @param patternProcessor processor whose id and version are appended to the state names
     * @param i divisor applied to the configured cache sizes
     * @throws ArithmeticException if {@code i} is zero
     */
    public SharedBuffer(
            KeyedStateStore keyedStateStore,
            TypeSerializer<V> typeSerializer,
            SharedBufferCacheConfig sharedBufferCacheConfig,
            PatternProcessor<?> patternProcessor,
            int i) {
        this(
                keyedStateStore,
                typeSerializer,
                sharedBufferCacheConfig,
                (String baseName) ->
                        String.format(
                                "%s-%s-%s",
                                baseName,
                                patternProcessor.getId(),
                                patternProcessor.getVersion()),
                (Integer configuredSlots) -> Math.max(1, configuredSlots / i));
    }

    /**
     * Shared initialisation: registers the three map states, builds both caches and starts the
     * statistics timer.
     *
     * @param stateNameMapper maps a base state name to the name actually registered
     * @param cacheSizeMapper maps a configured cache size to the size actually used
     */
    private SharedBuffer(
            KeyedStateStore keyedStateStore,
            TypeSerializer<V> typeSerializer,
            SharedBufferCacheConfig sharedBufferCacheConfig,
            Function<String, String> stateNameMapper,
            Function<Integer, Integer> cacheSizeMapper) {
        this.eventsBuffer =
                keyedStateStore.getMapState(
                        new MapStateDescriptor<>(
                                stateNameMapper.apply(EVENTS_STATE_NAME),
                                EventId.EventIdSerializer.INSTANCE,
                                new Lockable.LockableTypeSerializer<>(typeSerializer)));
        this.entries =
                keyedStateStore.getMapState(
                        new MapStateDescriptor<>(
                                stateNameMapper.apply(ENTRIES_STATE_NAME),
                                new NodeId.NodeIdSerializer(),
                                new Lockable.LockableTypeSerializer<>(
                                        new SharedBufferNodeSerializer())));
        this.eventsCount =
                keyedStateStore.getMapState(
                        new MapStateDescriptor<>(
                                stateNameMapper.apply(EVENTS_COUNT_STATE_NAME),
                                LongSerializer.INSTANCE,
                                IntSerializer.INSTANCE));

        this.eventsBufferCache =
                newWriteBackCache(
                        cacheSizeMapper.apply(sharedBufferCacheConfig.getEventsBufferCacheSlots()),
                        this.eventsBuffer,
                        "Error in putting value into eventsBuffer.");
        this.entryCache =
                newWriteBackCache(
                        cacheSizeMapper.apply(sharedBufferCacheConfig.getEntryCacheSlots()),
                        this.entries,
                        "Error in putting value into entries.");

        // Read the interval before starting the timer thread so that a failing getter cannot
        // leave a running thread behind.
        final long statisticsIntervalMillis =
                sharedBufferCacheConfig.getCacheStatisticsInterval().toMillis();
        // Daemon thread: a buffer that is never released must not keep the JVM alive.
        this.cacheStatisticsTimer = new Timer(CACHE_STATISTICS_THREAD_NAME, true);
        try {
            this.cacheStatisticsTimer.schedule(
                    new TimerTask() {
                        @Override
                        public void run() {
                            SharedBuffer.LOG.info(
                                    "Statistics details of eventsBufferCache: {}, statistics details of entryCache: {}.",
                                    SharedBuffer.this.eventsBufferCache.stats(),
                                    SharedBuffer.this.entryCache.stats());
                        }
                    },
                    statisticsIntervalMillis,
                    statisticsIntervalMillis);
        } catch (RuntimeException e) {
            // schedule() rejects negative delays and non-positive periods; stop the thread that
            // was already started before propagating the failure.
            this.cacheStatisticsTimer.cancel();
            throw e;
        }
    }

    /**
     * Builds a size-bounded cache whose size-based evictions are written to {@code backingState}.
     *
     * <p>Statistics recording is enabled; without it {@code Cache.stats()} only ever reports
     * zeros, which would make the periodic statistics log meaningless.
     *
     * @param maximumSize maximum number of cached values
     * @param backingState state that receives values evicted because the cache is full
     * @param writeFailureMessage message logged when writing an evicted value to state fails
     */
    private static <K, T> Cache<K, T> newWriteBackCache(
            int maximumSize, MapState<K, T> backingState, String writeFailureMessage) {
        return CacheBuilder.newBuilder()
                .maximumSize(maximumSize)
                .recordStats()
                .<K, T>removalListener(
                        notification -> {
                            // Only capacity evictions are persisted; explicit invalidations and
                            // replacements are deliberately not written to state.
                            if (RemovalCause.SIZE == notification.getCause()) {
                                try {
                                    backingState.put(
                                            notification.getKey(), notification.getValue());
                                } catch (Exception e) {
                                    LOG.error(writeFailureMessage, e);
                                }
                            }
                        })
                .build();
    }

    /**
     * Moves the nodes stored under the legacy entries state name into the current entries state,
     * for every key of the backend, and locks the edges referenced by the migrated nodes and by
     * the partial and completed matches of the NFA state.
     *
     * @param keyedStateBackend backend whose keys are visited
     * @param valueState NFA state read for each visited key
     * @throws Exception if accessing state fails
     */
    public void migrateOldState(KeyedStateBackend<?> keyedStateBackend, ValueState<NFAState> valueState)
            throws Exception {
        keyedStateBackend.applyToAllKeys(
                VoidNamespace.INSTANCE,
                VoidNamespaceSerializer.INSTANCE,
                new MapStateDescriptor<>(
                        LEGACY_ENTRIES_STATE_NAME,
                        new NodeId.NodeIdSerializer(),
                        new Lockable.LockableTypeSerializer<>(
                                new SharedBufferNode.SharedBufferNodeSerializer())),
                (key, legacyEntries) -> {
                    copyEntries(legacyEntries);
                    legacyEntries.entries().forEach(this::lockPredecessorEdges);
                    legacyEntries.clear();

                    NFAState nfaState = valueState.value();
                    nfaState.getPartialMatches()
                            .forEach(
                                    partialMatch ->
                                            lockEdges(
                                                    partialMatch.getPreviousBufferEntry(),
                                                    partialMatch.getVersion()));
                    nfaState.getCompletedMatches()
                            .forEach(
                                    completedMatch ->
                                            lockEdges(
                                                    completedMatch.getPreviousBufferEntry(),
                                                    completedMatch.getVersion()));
                });
    }

    /** Copies every mapping of {@code mapState} into {@link #entries}. */
    private void copyEntries(MapState<NodeId, Lockable<SharedBufferNode>> mapState) throws Exception {
        mapState.entries()
                .forEach(
                        legacyEntry -> {
                            try {
                                this.entries.put(legacyEntry.getKey(), legacyEntry.getValue());
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        });
    }

    /** Locks, for each edge of the given node, the compatible edges of that edge's target. */
    private void lockPredecessorEdges(Map.Entry<NodeId, Lockable<SharedBufferNode>> entry) {
        entry.getValue()
                .getElement()
                .getEdges()
                .forEach(
                        lockableEdge -> {
                            SharedBufferEdge edge = lockableEdge.getElement();
                            lockEdges(edge.getTarget(), edge.getDeweyNumber());
                        });
    }

    /**
     * Locks every edge of the node {@code nodeId} whose dewey number {@code deweyNumber} is
     * compatible with. Does nothing for a {@code null} node id.
     */
    private void lockEdges(NodeId nodeId, DeweyNumber deweyNumber) {
        if (nodeId == null) {
            return;
        }
        try {
            // NOTE(review): the lock is applied to the object returned by entries.get() and is
            // not put back; whether it is persisted depends on the state backend - confirm.
            Lockable<SharedBufferNode> node = this.entries.get(nodeId);
            node.getElement()
                    .getEdges()
                    .forEach(
                            lockableEdge -> {
                                if (deweyNumber.isCompatibleWith(
                                        lockableEdge.getElement().getDeweyNumber())) {
                                    lockableEdge.lock();
                                }
                            });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns a new accessor operating on this buffer. */
    public SharedBufferAccessor<V> getAccessor() {
        return new SharedBufferAccessor<>(this);
    }

    /**
     * Drops the per-timestamp event counters of all timestamps strictly below {@code j}.
     *
     * @param j timestamp below which counters are discarded
     * @throws Exception if accessing state fails
     */
    public void advanceTime(long j) throws Exception {
        Iterator<Long> timestamps = this.eventsCount.keys().iterator();
        while (timestamps.hasNext()) {
            if (timestamps.next() < j) {
                timestamps.remove();
            }
        }
        // Once no counter is left, clear the state itself as well.
        if (this.eventsCount.isEmpty()) {
            this.eventsCount.clear();
        }
    }

    /**
     * Registers an event and returns the id assigned to it.
     *
     * <p>The id combines the number of events already registered for timestamp {@code j} with the
     * timestamp itself. The counter for {@code j} is incremented in state; the event itself is
     * placed in the cache only.
     *
     * @param v event to register
     * @param j timestamp of the event
     * @return id under which the event can be looked up
     * @throws Exception if accessing state fails
     */
    public EventId registerEvent(V v, long j) throws Exception {
        Integer alreadyRegistered = this.eventsCount.get(j);
        int sequenceNumber = alreadyRegistered == null ? 0 : alreadyRegistered;
        EventId eventId = new EventId(sequenceNumber, j);
        // Second argument is presumably the initial reference count - TODO confirm.
        Lockable<V> lockableEvent = new Lockable<>(v, 1);
        this.eventsCount.put(j, sequenceNumber + 1);
        this.eventsBufferCache.put(eventId, lockableEvent);
        return eventId;
    }

    /**
     * Returns whether neither the events cache nor the events state holds any event.
     *
     * @throws Exception if accessing state fails
     */
    public boolean isEmpty() throws Exception {
        return Iterables.isEmpty(this.eventsBufferCache.asMap().keySet())
                && Iterables.isEmpty(this.eventsBuffer.keys());
    }

    /** Stops the timer that logs cache statistics. Safe to call more than once. */
    public void releaseCacheStatisticsTimer() {
        this.cacheStatisticsTimer.cancel();
    }

    /** Inserts or replaces an event in the cache. */
    public void upsertEvent(EventId eventId, Lockable<V> lockable) {
        this.eventsBufferCache.put(eventId, lockable);
    }

    /** Inserts or replaces a node in the cache. */
    public void upsertEntry(NodeId nodeId, Lockable<SharedBufferNode> lockable) {
        this.entryCache.put(nodeId, lockable);
    }

    /**
     * Removes an event from both the cache and state.
     *
     * @throws Exception if accessing state fails
     */
    public void removeEvent(EventId eventId) throws Exception {
        this.eventsBufferCache.invalidate(eventId);
        this.eventsBuffer.remove(eventId);
    }

    /**
     * Removes a node from both the cache and state.
     *
     * @throws Exception if accessing state fails
     */
    public void removeEntry(NodeId nodeId) throws Exception {
        this.entryCache.invalidate(nodeId);
        this.entries.remove(nodeId);
    }

    /**
     * Looks up a node, first in the cache and then in state; a value found in state is cached.
     *
     * @return the node, or {@code null} if it is in neither the cache nor state
     * @throws WrappingRuntimeException if the lookup fails
     */
    public Lockable<SharedBufferNode> getEntry(NodeId nodeId) {
        return readThrough(this.entryCache, this.entries, nodeId);
    }

    /**
     * Looks up an event, first in the cache and then in state; a value found in state is cached.
     *
     * @return the event, or {@code null} if it is in neither the cache nor state
     * @throws WrappingRuntimeException if the lookup fails
     */
    public Lockable<V> getEvent(EventId eventId) {
        return readThrough(this.eventsBufferCache, this.eventsBuffer, eventId);
    }

    /** Returns the cached value for {@code key}, loading it from {@code state} on a miss. */
    private static <K, T> T readThrough(Cache<K, T> cache, MapState<K, T> state, K key) {
        try {
            T cached = cache.getIfPresent(key);
            if (cached != null) {
                return cached;
            }
            T stored = state.get(key);
            if (stored != null) {
                cache.put(key, stored);
            }
            return stored;
        } catch (Exception e) {
            throw new WrappingRuntimeException(e);
        }
    }

    /**
     * Writes every cached node and event to state and empties both caches.
     *
     * @throws Exception if writing to state fails
     */
    public void flushCache() throws Exception {
        writeBack(this.entryCache, this.entries);
        writeBack(this.eventsBufferCache, this.eventsBuffer);
    }

    /** Copies all cached values into {@code state}, then empties the cache. */
    private static <K, T> void writeBack(Cache<K, T> cache, MapState<K, T> state) throws Exception {
        Map<K, T> cachedValues = cache.asMap();
        if (!cachedValues.isEmpty()) {
            state.putAll(cachedValues);
            cache.invalidateAll();
        }
    }

    /**
     * Discards all nodes, events and event counters, both cached and persisted.
     *
     * <p>The caches are emptied as well; otherwise values still cached would survive the clear
     * and be written back to state by the next {@link #flushCache()}. Explicit invalidation does
     * not trigger a write to state.
     */
    public void clear() {
        this.entryCache.invalidateAll();
        this.eventsBufferCache.invalidateAll();
        this.entries.clear();
        this.eventsBuffer.clear();
        this.eventsCount.clear();
    }

    /** Returns an iterator over the per-timestamp event counters held in state. */
    @VisibleForTesting
    Iterator<Map.Entry<Long, Integer>> getEventCounters() throws Exception {
        return this.eventsCount.iterator();
    }

    /** Returns the number of events currently held in the cache. */
    @VisibleForTesting
    public int getEventsBufferCacheSize() {
        return (int) this.eventsBufferCache.size();
    }

    /** Returns the number of events currently held in state. */
    @VisibleForTesting
    public int getEventsBufferSize() throws Exception {
        return Iterables.size(this.eventsBuffer.entries());
    }

    /** Returns the number of nodes currently held in state. */
    @VisibleForTesting
    public int getSharedBufferNodeSize() throws Exception {
        return Iterables.size(this.entries.entries());
    }

    /** Returns the number of nodes currently held in the cache. */
    @VisibleForTesting
    public int getSharedBufferNodeCacheSize() throws Exception {
        return (int) this.entryCache.size();
    }
}
