/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.queue.impl;

import com.google.protobuf.GeneratedMessage;
import java.io.Closeable;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.RunnableUtil;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.impl.queue.impl.OffsetQueueImpl;
import org.apache.sling.distribution.journal.impl.queue.impl.RangePoller;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cache of the package messages read from a journal topic, kept as one
 * {@link OffsetQueue} per publisher agent name.
 *
 * <p>The cache is fed from two sides:
 * <ul>
 *   <li>a tail poller, created with {@code Reset.latest}, pushes every
 *       incoming package message into the cache via {@link #handlePackage};</li>
 *   <li>when a caller asks for an offset smaller than the smallest cached one,
 *       the missing range is fetched with a {@link RangePoller} and merged in.</li>
 * </ul>
 *
 * <p>Until the tail poller has delivered a first message the cache counts as
 * "not seeded" and {@link #getOffsetQueue(String, long)} blocks. To end that
 * state, a background thread keeps sending {@code TEST} messages to the topic
 * until one message has been handled.
 *
 * <p>NOTE(review): the tail poller handler and the seeding thread are handed
 * {@code this} from inside the constructor, i.e. before construction has
 * finished. All fields they read are assigned before that point, but confirm
 * this ordering is preserved when adding fields.
 */
@ParametersAreNonnullByDefault
public class PubQueueCache {
    private static final Logger LOG = LoggerFactory.getLogger(PubQueueCache.class);

    /** Delay in milliseconds between seeding attempts and between checks of the seeded latch. */
    private static final long SEEDING_DELAY_MS = 1000L;

    /** Cached queues, keyed by publisher agent name. */
    private final Map<String, OffsetQueue<DistributionQueueItem>> agentQueues = new ConcurrentHashMap<>();

    /** Released by {@link #handlePackage} once the tail poller delivered a first message. */
    private final CountDownLatch seeded = new CountDownLatch(1);

    /** Serializes range fetches so that a missing range is fetched by one thread at a time. */
    private final Lock headPollerLock = new ReentrantLock();

    /** Smallest offset covered by the cache; {@code Long.MAX_VALUE} while nothing is cached. */
    private final AtomicLong minOffset = new AtomicLong(Long.MAX_VALUE);

    /**
     * JMX registrations of the created queues. Concurrent set because queues are
     * created from the tail poller thread as well as from fetching caller threads,
     * and the set is iterated in {@link #close()}.
     */
    private final Set<JMXRegistration> jmxRegs = ConcurrentHashMap.newKeySet();

    private final MessagingProvider messagingProvider;
    private final EventAdmin eventAdmin;
    private final Closeable tailPoller;
    private final String topic;
    private final DistributionMetricsService distributionMetricsService;
    private volatile boolean closed;

    /**
     * Creates the cache, starts the tail poller on {@code topic} and starts the
     * background thread that sends seeding messages.
     *
     * @param messagingProvider used to create the pollers and the seeding sender
     * @param eventAdmin receives a "package queued" event for every merged message
     * @param distributionMetricsService counts the range fetches
     * @param topic journal topic to read package messages from and to send seeding messages to
     */
    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic) {
        this.messagingProvider = messagingProvider;
        this.eventAdmin = eventAdmin;
        this.distributionMetricsService = distributionMetricsService;
        this.topic = topic;
        this.tailPoller = messagingProvider.createPoller(topic, Reset.latest, new HandlerAdapter[]{HandlerAdapter.create(Messages.PackageMessage.class, this::handlePackage)});
        RunnableUtil.startBackgroundThread(this::sendSeedingMessages, "queue seeding");
    }

    /**
     * Returns the queue cached for a publisher agent, fetching older offsets
     * first when {@code minOffset} is smaller than what the cache covers.
     * Blocks until the cache is seeded.
     *
     * @param pubAgentName name of the publisher agent
     * @param minOffset smallest offset the caller needs the cache to cover
     * @return the cached queue, or a new empty queue (not stored in the cache)
     *         when nothing is cached for the agent
     * @throws RuntimeException if the cache is closed or the thread is interrupted
     *         while waiting for seeding, or if fetching the missing range fails
     */
    @Nonnull
    public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) {
        fetchIfNeeded(minOffset);
        OffsetQueue<DistributionQueueItem> queue = agentQueues.get(pubAgentName);
        if (queue != null) {
            return queue;
        }
        return new OffsetQueueImpl<DistributionQueueItem>();
    }

    /**
     * @return the sum of the sizes of all cached agent queues
     */
    public int size() {
        return agentQueues.values().stream().mapToInt(OffsetQueue::getSize).sum();
    }

    /**
     * Marks the cache as closed, which ends the seeding loop and any wait for
     * seeding, then closes the tail poller and all JMX registrations quietly.
     */
    public void close() {
        closed = true;
        IOUtils.closeQuietly(tailPoller);
        jmxRegs.forEach(IOUtils::closeQuietly);
    }

    /**
     * Body of the seeding thread: sends a test message roughly every
     * {@link #SEEDING_DELAY_MS} ms until the seeded latch is released, the cache
     * is closed, or the thread is interrupted.
     */
    private void sendSeedingMessages() {
        LOG.info("Send seeding messages");
        MessageSender sender = messagingProvider.createSender();
        while (!closed && !Thread.currentThread().isInterrupted()) {
            Messages.PackageMessage pkgMsg = createTestMessage();
            try {
                LOG.debug("Send seeding message");
                sender.send(topic, pkgMsg);
                if (seeded.await(SEEDING_DELAY_MS, TimeUnit.MILLISECONDS)) {
                    LOG.info("Cache has been seeded");
                    return;
                }
            } catch (MessagingException e) {
                // Sending is retried on the next iteration; keep the stack trace at debug
                // level only, as this may repeat every SEEDING_DELAY_MS.
                LOG.info("Failed to send seeding message, retrying in {} ms: {}", SEEDING_DELAY_MS, e.getMessage());
                LOG.debug("Seeding message failure", e);
                sleep(SEEDING_DELAY_MS);
            } catch (InterruptedException e) {
                // Restore the flag and stop: retrying would make await() fail immediately again.
                Thread.currentThread().interrupt();
                LOG.info("Interrupted while sending seeding messages");
                return;
            }
        }
    }

    /**
     * Builds a seeding message: request type {@code TEST}, a random package id,
     * and {@code "seeder"} as both publisher sling id and package type.
     */
    private Messages.PackageMessage createTestMessage() {
        String pkgId = UUID.randomUUID().toString();
        return Messages.PackageMessage.newBuilder()
                .setPubSlingId("seeder")
                .setPkgId(pkgId)
                .setPkgType("seeder")
                .setReqType(Messages.PackageMessage.ReqType.TEST)
                .build();
    }

    /**
     * Waits for the cache to be seeded and fetches the range
     * {@code [requestedMinOffset, cachedMinOffset[} when the requested offset is
     * smaller than the smallest cached one.
     *
     * @param requestedMinOffset smallest offset the caller needs
     * @throws RuntimeException if waiting for seeding is aborted or the fetch fails
     */
    private void fetchIfNeeded(long requestedMinOffset) {
        waitSeeded();
        long cachedMinOffset = getMinOffset();
        if (requestedMinOffset >= cachedMinOffset) {
            return;
        }
        LOG.debug("Requested min offset {} smaller than cached min offset {}", requestedMinOffset, cachedMinOffset);
        headPollerLock.lock();
        try {
            // Read again under the lock: another thread may have fetched the range meanwhile.
            cachedMinOffset = getMinOffset();
            if (requestedMinOffset < cachedMinOffset) {
                fetch(requestedMinOffset, cachedMinOffset);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(String.format("Failed to fetch offsets [%s,%s[, %s", requestedMinOffset, cachedMinOffset, e.getMessage()), e);
        } catch (Exception e) {
            throw new RuntimeException(String.format("Failed to fetch offsets [%s,%s[, %s", requestedMinOffset, cachedMinOffset, e.getMessage()), e);
        } finally {
            headPollerLock.unlock();
        }
    }

    /**
     * Fetches the given offset range with a {@link RangePoller}, merges the
     * result and lowers the cached minimum offset to {@code requestedMinOffset}.
     * Increments the queue cache fetch counter.
     *
     * @throws InterruptedException declared because {@code RangePoller.fetchRange()} may be interrupted
     */
    private void fetch(long requestedMinOffset, long cachedMinOffset) throws InterruptedException {
        distributionMetricsService.getQueueCacheFetchCount().increment();
        RangePoller headPoller = new RangePoller(messagingProvider, topic, requestedMinOffset, cachedMinOffset);
        merge(headPoller.fetchRange());
        updateMinOffset(requestedMinOffset);
    }

    /**
     * Blocks until the seeded latch is released.
     *
     * @throws RuntimeException if the cache is closed or the calling thread is
     *         interrupted before the cache is seeded (the interrupt flag is restored)
     */
    private void waitSeeded() {
        try {
            while (!closed) {
                if (seeded.await(SEEDING_DELAY_MS, TimeUnit.MILLISECONDS)) {
                    return;
                }
                LOG.debug("Waiting for seeded cache");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for seeded cache", e);
        }
        throw new RuntimeException("Gave up waiting for seeded cache, the cache is closed");
    }

    /**
     * @return the smallest offset covered by the cache, {@code Long.MAX_VALUE} if none yet
     */
    protected long getMinOffset() {
        return minOffset.longValue();
    }

    /** Lowers the cached minimum offset to {@code offset} if it is smaller than the current value. */
    private void updateMinOffset(long offset) {
        minOffset.accumulateAndGet(offset, Math::min);
    }

    /**
     * Adds the messages to the queues of their publisher agents. Test (seeding)
     * messages are not queued, but the offset of the first message of the list
     * is applied to the cached minimum offset whether or not it is a test message.
     */
    private void merge(List<FullMessage<Messages.PackageMessage>> messages) {
        LOG.debug("Merging fetched offsets");
        messages.stream()
                .filter(this::isNotTestMessage)
                .collect(Collectors.groupingBy(message -> message.getMessage().getPubAgentName()))
                .forEach(this::mergeByAgent);
        // NOTE(review): uses the first element only, so this presumably relies on the
        // list being ordered by ascending offset - confirm against RangePoller.
        messages.stream().findFirst().ifPresent(message -> updateMinOffset(message.getInfo().getOffset()));
    }

    /**
     * Converts the messages of one agent into queue items, puts them into the
     * agent's queue in one call, then posts a "package queued" event per message.
     */
    private void mergeByAgent(String pubAgentName, List<FullMessage<Messages.PackageMessage>> messages) {
        OffsetQueue<DistributionQueueItem> msgs = new OffsetQueueImpl<DistributionQueueItem>();
        messages.forEach(message -> msgs.putItem(message.getInfo().getOffset(), QueueItemFactory.fromPackage(message)));
        getOrCreateQueue(pubAgentName).putItems(msgs);
        messages.forEach(this::sendQueuedEvent);
    }

    /** Posts a "package queued" event for the message through the event admin. */
    private void sendQueuedEvent(FullMessage<Messages.PackageMessage> fMessage) {
        Messages.PackageMessage message = fMessage.getMessage();
        Event queuedEvent = DistributionEvent.eventPackageQueued(message, message.getPubAgentName());
        eventAdmin.postEvent(queuedEvent);
    }

    /** Returns the queue of the agent, creating and registering it on first use. */
    private OffsetQueue<DistributionQueueItem> getOrCreateQueue(String pubAgentName) {
        return agentQueues.computeIfAbsent(pubAgentName, this::createQueue);
    }

    /** @return {@code true} unless the message has request type {@code TEST} */
    private boolean isNotTestMessage(FullMessage<Messages.PackageMessage> message) {
        return message.getMessage().getReqType() != Messages.PackageMessage.ReqType.TEST;
    }

    /**
     * Creates an empty queue for the agent and records a JMX registration for it
     * so that {@link #close()} can release it.
     */
    private OffsetQueue<DistributionQueueItem> createQueue(String pubAgentName) {
        OffsetQueueImpl<DistributionQueueItem> agentQueue = new OffsetQueueImpl<DistributionQueueItem>();
        jmxRegs.add(new JMXRegistration(agentQueue, OffsetQueue.class.getSimpleName(), pubAgentName));
        return agentQueue;
    }

    /**
     * Sleeps for {@code delay} milliseconds. When interrupted, returns early with
     * the interrupt flag restored so that the caller's loop can observe it.
     */
    private void sleep(long delay) {
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Tail poller callback: merges the received message, then marks the cache as
     * seeded. The merge comes first so that the minimum offset is set before any
     * thread waiting on the latch is released.
     */
    private void handlePackage(MessageInfo info, Messages.PackageMessage message) {
        merge(Collections.singletonList(new FullMessage<Messages.PackageMessage>(info, message)));
        seeded.countDown();
    }
}

