/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.queue.impl;

import com.google.protobuf.GeneratedMessage;
import java.io.Closeable;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.impl.queue.impl.OffsetQueueImpl;
import org.apache.sling.distribution.journal.impl.queue.impl.QueueCacheSeeder;
import org.apache.sling.distribution.journal.impl.queue.impl.RangePoller;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cache of the package messages read from a journal topic, kept as one
 * {@link OffsetQueue} per publisher agent name.
 *
 * <p>A tail poller, started from a seed offset, feeds newly received packages
 * into the cache. When a caller asks for offsets older than the smallest
 * cached one, the missing range is fetched on demand with a {@link RangePoller}
 * and merged into the per-agent queues.</p>
 */
@ParametersAreNonnullByDefault
public class PubQueueCache {
    private static final Logger LOG = LoggerFactory.getLogger(PubQueueCache.class);

    /** Maximum time in milliseconds to wait for the head poller lock before giving up a fetch. */
    private static final long MAX_FETCH_WAIT_MS = 60000L;

    /** Key under which the seed offset is kept in the seed store. */
    private static final String STORE_KEY_OFFSET = "offset";

    /** Queues of cached items, keyed by publisher agent name. */
    private final Map<String, OffsetQueue<DistributionQueueItem>> agentQueues = new ConcurrentHashMap<>();

    /** Guards on-demand fetches of older offsets so that only one runs at a time. */
    private final Lock headPollerLock = new ReentrantLock();

    /** Smallest offset known to the cache; {@code Long.MAX_VALUE} until the first update. */
    private final AtomicLong minOffset = new AtomicLong(Long.MAX_VALUE);

    /** Largest offset received through the tail poller; {@code -1} until the first package. */
    private final AtomicLong maxOffset = new AtomicLong(-1L);

    /** Last seed offset loaded from, or successfully written to, the seed store. */
    private volatile long seedOffset = -1L;

    /*
     * Concurrent set: registrations are added from createQueue(), which runs inside
     * ConcurrentHashMap.computeIfAbsent and is reachable both from the tail poller
     * callback and from callers of getOffsetQueue(), so additions may overlap.
     */
    private final Set<JMXRegistration> jmxRegs = ConcurrentHashMap.newKeySet();

    private final MessagingProvider messagingProvider;
    private final EventAdmin eventAdmin;
    private volatile Closeable tailPoller;
    private final QueueCacheSeeder seeder;
    private final String topic;
    private final LocalStore seedStore;
    private final DistributionMetricsService distributionMetricsService;

    /**
     * Creates the cache and starts consuming the topic.
     *
     * <p>If the seed store holds an offset, the tail poller is started from it
     * right away and {@code seeder.seedOne()} is invoked. Otherwise the seeder
     * is asked to seed and is handed {@code startPoller} as its callback.</p>
     *
     * @param messagingProvider          used to create the tail poller and range pollers
     * @param eventAdmin                 receives a "package queued" event for every merged message
     * @param distributionMetricsService provides the fetch counter incremented on each on-demand fetch
     * @param topic                      name of the topic to read packages from
     * @param seedStore                  store from which the seed offset is loaded and to which it is written
     * @param seeder                     seeder used to obtain an initial offset
     */
    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, LocalStore seedStore, QueueCacheSeeder seeder) {
        this.messagingProvider = messagingProvider;
        this.eventAdmin = eventAdmin;
        this.distributionMetricsService = distributionMetricsService;
        this.topic = topic;
        this.seedStore = seedStore;
        this.seeder = seeder;
        Long offset = (Long) seedStore.load(STORE_KEY_OFFSET, Long.class);
        if (offset != null) {
            this.seedOffset = offset;
            startPoller(this.seedOffset);
            seeder.seedOne();
        } else {
            seeder.seed(this::startPoller);
        }
    }

    /**
     * Starts the tail poller on the topic, assigned to the given offset.
     *
     * @param offset offset the poller is assigned to
     */
    private void startPoller(long offset) {
        LOG.info("Seed with offset: {}", offset);
        String assignTo = messagingProvider.assignTo(offset);
        tailPoller = messagingProvider.createPoller(
                topic,
                Reset.earliest,
                assignTo,
                HandlerAdapter.create(Messages.PackageMessage.class, this::handlePackage));
    }

    /**
     * Returns the cached queue of the given agent, fetching older offsets
     * first when {@code minOffset} is smaller than the smallest cached offset.
     *
     * @param pubAgentName name of the publisher agent
     * @param minOffset    smallest offset the caller is interested in
     * @return the agent's queue, or a new empty queue when nothing is cached for the agent
     * @throws InterruptedException if interrupted while waiting for, or performing, a fetch
     * @throws RuntimeException     if the cache has no minimum offset yet, or the
     *                              fetch lock could not be obtained in time
     */
    @Nonnull
    public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) throws InterruptedException {
        if (!isSeeded()) {
            throw new RuntimeException("Gave up waiting for seeded cache");
        }
        fetchIfNeeded(minOffset);
        OffsetQueue<DistributionQueueItem> queue = agentQueues.get(pubAgentName);
        if (queue != null) {
            return queue;
        }
        // Only allocate the empty fallback when the agent has no cached queue.
        return new OffsetQueueImpl<>();
    }

    /**
     * Writes the largest offset received so far to the seed store when it is
     * newer than the last stored seed. The remembered seed only advances when
     * the write succeeded, so a failed write is attempted again on the next call.
     */
    public void storeSeed() {
        long newSeed = maxOffset.longValue();
        if (newSeed > seedOffset && storeSeed(newSeed)) {
            seedOffset = newSeed;
        }
    }

    /**
     * Writes the given offset to the seed store, logging a warning on failure.
     *
     * @param offset offset to write
     * @return {@code true} if the store call completed, {@code false} if it
     *         threw a {@link PersistenceException}
     */
    private boolean storeSeed(long offset) {
        LOG.info("Store seed offset {}", offset);
        try {
            seedStore.store(STORE_KEY_OFFSET, offset);
            return true;
        } catch (PersistenceException e) {
            LOG.warn("Failed to persist seed offset", e);
            return false;
        }
    }

    /**
     * @return the sum of the sizes of all cached agent queues
     */
    public int size() {
        return agentQueues.values().stream().mapToInt(OffsetQueue::getSize).sum();
    }

    /**
     * Quietly closes the tail poller, the seeder and all JMX registrations.
     */
    public void close() {
        IOUtils.closeQuietly(tailPoller);
        IOUtils.closeQuietly(seeder);
        jmxRegs.forEach(IOUtils::closeQuietly);
    }

    /**
     * Fetches the offsets between {@code requestedMinOffset} and the smallest
     * cached offset when the former is smaller.
     *
     * <p>The comparison is repeated after the lock is obtained because another
     * thread may have fetched the range while this one was waiting.</p>
     *
     * @param requestedMinOffset smallest offset the caller needs
     * @throws InterruptedException if interrupted while waiting for the lock or fetching
     * @throws RuntimeException     if the lock is not obtained within {@link #MAX_FETCH_WAIT_MS}
     */
    private void fetchIfNeeded(long requestedMinOffset) throws InterruptedException {
        long cachedMinOffset = getMinOffset();
        if (requestedMinOffset >= cachedMinOffset) {
            return;
        }
        LOG.debug("Requested min offset {} smaller than cached min offset {}", requestedMinOffset, cachedMinOffset);
        boolean locked = headPollerLock.tryLock(MAX_FETCH_WAIT_MS, TimeUnit.MILLISECONDS);
        if (!locked) {
            throw new RuntimeException("Gave up fetching queue state");
        }
        try {
            cachedMinOffset = getMinOffset();
            if (requestedMinOffset < cachedMinOffset) {
                fetch(requestedMinOffset, cachedMinOffset);
            }
        } finally {
            headPollerLock.unlock();
        }
    }

    /**
     * Fetches a range of messages with a {@link RangePoller}, merges them and
     * lowers the cached minimum offset to {@code requestedMinOffset}.
     *
     * @param requestedMinOffset first bound handed to the range poller
     * @param cachedMinOffset    second bound handed to the range poller
     * @throws InterruptedException if interrupted while fetching
     */
    private void fetch(long requestedMinOffset, long cachedMinOffset) throws InterruptedException {
        distributionMetricsService.getQueueCacheFetchCount().increment();
        RangePoller headPoller = new RangePoller(messagingProvider, topic, requestedMinOffset, cachedMinOffset);
        merge(headPoller.fetchRange());
        updateMinOffset(requestedMinOffset);
    }

    /**
     * @return {@code true} once the minimum offset has been set to anything
     *         other than its initial {@code Long.MAX_VALUE}
     */
    private boolean isSeeded() {
        return getMinOffset() != Long.MAX_VALUE;
    }

    /**
     * @return the smallest offset known to the cache
     */
    protected long getMinOffset() {
        return minOffset.longValue();
    }

    /** Atomically lowers the minimum offset to {@code offset} if it is smaller. */
    private void updateMinOffset(long offset) {
        minOffset.accumulateAndGet(offset, Math::min);
    }

    /** Atomically raises the maximum offset to {@code offset} if it is larger. */
    private void updateMaxOffset(long offset) {
        maxOffset.accumulateAndGet(offset, Math::max);
    }

    /**
     * Merges the given messages into the per-agent queues, skipping messages
     * of request type {@code TEST}, then lowers the minimum offset to the
     * offset of the first message of the (unfiltered) list, if any.
     *
     * @param messages messages to merge; may be empty
     */
    private void merge(List<FullMessage<Messages.PackageMessage>> messages) {
        LOG.debug("Merging fetched offsets");
        messages.stream()
                .filter(this::isNotTestMessage)
                .collect(Collectors.groupingBy(message -> message.getMessage().getPubAgentName()))
                .forEach(this::mergeByAgent);
        // Test messages are not queued but still move the minimum offset.
        messages.stream()
                .findFirst()
                .ifPresent(message -> updateMinOffset(message.getInfo().getOffset()));
    }

    /**
     * Adds the messages of one agent to that agent's queue and posts a
     * "package queued" event for each of them.
     *
     * @param pubAgentName name of the publisher agent the messages belong to
     * @param messages     messages of that agent
     */
    private void mergeByAgent(String pubAgentName, List<FullMessage<Messages.PackageMessage>> messages) {
        OffsetQueue<DistributionQueueItem> msgs = new OffsetQueueImpl<>();
        messages.forEach(message -> msgs.putItem(message.getInfo().getOffset(), QueueItemFactory.fromPackage(message)));
        getOrCreateQueue(pubAgentName).putItems(msgs);
        messages.forEach(this::sendQueuedEvent);
    }

    /** Posts a "package queued" event for the given message through the event admin. */
    private void sendQueuedEvent(FullMessage<Messages.PackageMessage> fMessage) {
        Messages.PackageMessage message = fMessage.getMessage();
        Event queuedEvent = DistributionEvent.eventPackageQueued(message, message.getPubAgentName());
        eventAdmin.postEvent(queuedEvent);
    }

    /** Returns the queue of the given agent, creating and registering it on first use. */
    private OffsetQueue<DistributionQueueItem> getOrCreateQueue(String pubAgentName) {
        return agentQueues.computeIfAbsent(pubAgentName, this::createQueue);
    }

    /** @return {@code true} unless the message's request type is {@code TEST} */
    private boolean isNotTestMessage(FullMessage<Messages.PackageMessage> message) {
        return message.getMessage().getReqType() != Messages.PackageMessage.ReqType.TEST;
    }

    /**
     * Creates an empty queue for the agent and records a JMX registration for
     * it so that the registration is closed in {@link #close()}.
     */
    private OffsetQueue<DistributionQueueItem> createQueue(String pubAgentName) {
        OffsetQueueImpl<DistributionQueueItem> agentQueue = new OffsetQueueImpl<>();
        jmxRegs.add(new JMXRegistration(agentQueue, OffsetQueue.class.getSimpleName(), pubAgentName));
        return agentQueue;
    }

    /**
     * Tail poller callback: merges a single received package and raises the
     * maximum offset to the offset of that package.
     */
    private void handlePackage(MessageInfo info, Messages.PackageMessage message) {
        FullMessage<Messages.PackageMessage> fullMessage = new FullMessage<>(info, message);
        merge(Collections.singletonList(fullMessage));
        updateMaxOffset(info.getOffset());
    }
}

