/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.queue.impl;

import com.google.protobuf.GeneratedMessage;
import java.io.Closeable;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.RunnableUtil;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.impl.queue.impl.OffsetQueueImpl;
import org.apache.sling.distribution.journal.impl.queue.impl.RangePoller;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
public class PubQueueCache {
    private static final Logger LOG = LoggerFactory.getLogger(PubQueueCache.class);
    private static final long MAX_FETCH_WAIT_MS = 60000L;
    private final Map<String, OffsetQueue<DistributionQueueItem>> agentQueues = new ConcurrentHashMap<String, OffsetQueue<DistributionQueueItem>>();
    private final CountDownLatch seeded = new CountDownLatch(1);
    private final Lock headPollerLock = new ReentrantLock();
    private final AtomicLong minOffset = new AtomicLong(Long.MAX_VALUE);
    private final Set<JMXRegistration> jmxRegs = new HashSet<JMXRegistration>();
    private final MessagingProvider messagingProvider;
    private final EventAdmin eventAdmin;
    private final Closeable tailPoller;
    private final String topic;
    private long seedingDelayMs;
    private final DistributionMetricsService distributionMetricsService;
    private volatile boolean closed;
    private final Thread seeder;

    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, long seedingDelayMs) {
        this.messagingProvider = messagingProvider;
        this.eventAdmin = eventAdmin;
        this.distributionMetricsService = distributionMetricsService;
        this.seedingDelayMs = seedingDelayMs;
        this.topic = topic;
        this.tailPoller = messagingProvider.createPoller(topic, Reset.latest, new HandlerAdapter[]{HandlerAdapter.create(Messages.PackageMessage.class, this::handlePackage)});
        this.seeder = RunnableUtil.startBackgroundThread(this::seedCache, (String)"queue seeding");
    }

    @Nonnull
    public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) throws InterruptedException {
        this.fetchIfNeeded(minOffset);
        return this.agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl());
    }

    public int size() {
        return this.agentQueues.values().stream().mapToInt(OffsetQueue::getSize).sum();
    }

    public void close() {
        this.closed = true;
        this.seeder.interrupt();
        IOUtils.closeQuietly((Closeable)this.tailPoller);
        this.jmxRegs.stream().forEach(IOUtils::closeQuietly);
    }

    private void seedCache() {
        LOG.info("Start message seeder");
        try {
            MessageSender sender = this.messagingProvider.createSender();
            this.sendSeedingMessages((MessageSender<Messages.PackageMessage>)sender);
        }
        finally {
            LOG.info("Stop message seeder");
        }
    }

    private void sendSeedingMessages(MessageSender<Messages.PackageMessage> sender) {
        while (!Thread.interrupted()) {
            Messages.PackageMessage pkgMsg = this.createTestMessage();
            LOG.info("Send seeding message");
            try {
                sender.send(this.topic, (GeneratedMessage)pkgMsg);
                this.sleep(this.seedingDelayMs);
            }
            catch (MessagingException e) {
                LOG.warn(e.getMessage(), (Throwable)e);
                this.sleep(this.seedingDelayMs * 10L);
            }
        }
    }

    private void sleep(long sleepMs) {
        try {
            Thread.sleep(sleepMs);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private Messages.PackageMessage createTestMessage() {
        String pkgId = UUID.randomUUID().toString();
        return Messages.PackageMessage.newBuilder().setPubSlingId("seeder").setPkgId(pkgId).setPkgType("seeder").setReqType(Messages.PackageMessage.ReqType.TEST).build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void fetchIfNeeded(long requestedMinOffset) throws InterruptedException {
        this.waitSeeded();
        long cachedMinOffset = this.getMinOffset();
        if (requestedMinOffset < cachedMinOffset) {
            LOG.debug("Requested min offset {} smaller than cached min offset {}", (Object)requestedMinOffset, (Object)cachedMinOffset);
            boolean locked = this.headPollerLock.tryLock(60000L, TimeUnit.MILLISECONDS);
            if (!locked) {
                throw new RuntimeException("Gave up fetching queue state");
            }
            try {
                cachedMinOffset = this.getMinOffset();
                if (requestedMinOffset < cachedMinOffset) {
                    this.fetch(requestedMinOffset, cachedMinOffset);
                }
            }
            finally {
                this.headPollerLock.unlock();
            }
        }
    }

    private void fetch(long requestedMinOffset, long cachedMinOffset) throws InterruptedException {
        this.distributionMetricsService.getQueueCacheFetchCount().increment();
        RangePoller headPoller = new RangePoller(this.messagingProvider, this.topic, requestedMinOffset, cachedMinOffset);
        this.merge(headPoller.fetchRange());
        this.updateMinOffset(requestedMinOffset);
    }

    private void waitSeeded() throws InterruptedException {
        long start = System.currentTimeMillis();
        while (!this.closed && System.currentTimeMillis() - start < 60000L) {
            if (this.seeded.await(this.seedingDelayMs, TimeUnit.MILLISECONDS)) {
                return;
            }
            LOG.debug("Waiting for seeded cache");
        }
        throw new RuntimeException("Gave up waiting for seeded cache");
    }

    protected long getMinOffset() {
        return this.minOffset.longValue();
    }

    private void updateMinOffset(long offset) {
        this.minOffset.accumulateAndGet(offset, Math::min);
    }

    private void merge(List<FullMessage<Messages.PackageMessage>> messages) {
        LOG.debug("Merging fetched offsets");
        messages.stream().filter(this::isNotTestMessage).collect(Collectors.groupingBy(message -> ((Messages.PackageMessage)message.getMessage()).getPubAgentName())).forEach(this::mergeByAgent);
        messages.stream().findFirst().ifPresent(message -> this.updateMinOffset(message.getInfo().getOffset()));
    }

    private void mergeByAgent(String pubAgentName, List<FullMessage<Messages.PackageMessage>> messages) {
        OffsetQueueImpl msgs = new OffsetQueueImpl();
        messages.stream().forEach(message -> msgs.putItem(message.getInfo().getOffset(), QueueItemFactory.fromPackage((FullMessage<Messages.PackageMessage>)message)));
        this.getOrCreateQueue(pubAgentName).putItems(msgs);
        messages.stream().forEach(this::sendQueuedEvent);
    }

    private void sendQueuedEvent(FullMessage<Messages.PackageMessage> fMessage) {
        Messages.PackageMessage message = (Messages.PackageMessage)fMessage.getMessage();
        Event queuedEvent = DistributionEvent.eventPackageQueued(message, message.getPubAgentName());
        this.eventAdmin.postEvent(queuedEvent);
    }

    private OffsetQueue<DistributionQueueItem> getOrCreateQueue(String pubAgentName) {
        return this.agentQueues.computeIfAbsent(pubAgentName, this::createQueue);
    }

    private boolean isNotTestMessage(FullMessage<Messages.PackageMessage> message) {
        return ((Messages.PackageMessage)message.getMessage()).getReqType() != Messages.PackageMessage.ReqType.TEST;
    }

    private OffsetQueue<DistributionQueueItem> createQueue(String pubAgentName) {
        OffsetQueueImpl<DistributionQueueItem> agentQueue = new OffsetQueueImpl<DistributionQueueItem>();
        this.jmxRegs.add(new JMXRegistration(agentQueue, OffsetQueue.class.getSimpleName(), pubAgentName));
        return agentQueue;
    }

    private void handlePackage(MessageInfo info, Messages.PackageMessage message) {
        this.merge(Collections.singletonList(new FullMessage(info, (GeneratedMessage)message)));
        if (this.seeded.getCount() > 0L) {
            LOG.info("Cache has been seeded");
        }
        this.seeded.countDown();
        this.seeder.interrupt();
    }
}

