/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.queue.impl;

import com.google.protobuf.GeneratedMessage;
import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.impl.queue.impl.ClearCallback;
import org.apache.sling.distribution.journal.impl.queue.impl.OffsetQueueImpl;
import org.apache.sling.distribution.journal.impl.queue.impl.PubErrQueue;
import org.apache.sling.distribution.journal.impl.queue.impl.PubQueue;
import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * OSGi component implementing {@link PubQueueProvider}.
 *
 * <p>It hands out {@link DistributionQueue} views backed by the offset queues
 * obtained from {@link PubQueueCacheService}, and keeps an in-memory map of
 * error queues that is filled from {@code PackageStatusMessage}s received on
 * the status topic.
 */
@ParametersAreNonnullByDefault
@Component
public class PubQueueProviderImpl implements PubQueueProvider {
    private static final Logger LOG = LoggerFactory.getLogger(PubQueueProviderImpl.class);

    /**
     * Error queues keyed by {@link #errorQueueKey(String, String, String)}.
     * Each entry maps the offset of a status message to the package offset
     * carried by that status message.
     */
    private final Map<String, OffsetQueue<Long>> errorQueues = new ConcurrentHashMap<String, OffsetQueue<Long>>();

    @Reference
    private MessagingProvider messagingProvider;

    @Reference
    private Topics topics;

    @Reference
    private PubQueueCacheService pubQueueCacheService;

    /** Poller on the status topic; created in {@link #activate()}, closed in {@link #deactivate()}. */
    private Closeable statusPoller;

    /** Sender used by editable queues to publish clear commands; created in {@link #activate()}. */
    private MessageSender<Messages.CommandMessage> sender;

    /** No-argument constructor; the {@code @Reference} fields are expected to be injected. */
    public PubQueueProviderImpl() {
    }

    /**
     * Creates an instance with explicitly supplied collaborators.
     *
     * @param pubQueueCacheService source of the per-agent offset queues
     * @param messagingProvider    used to create the status poller and the command sender
     * @param topics               supplies the status and command topic names
     */
    public PubQueueProviderImpl(PubQueueCacheService pubQueueCacheService, MessagingProvider messagingProvider, Topics topics) {
        this.pubQueueCacheService = pubQueueCacheService;
        this.messagingProvider = messagingProvider;
        this.topics = topics;
    }

    /**
     * Creates the status-topic poller (with {@code Reset.earliest}) that feeds
     * {@link #handleStatus(MessageInfo, Messages.PackageStatusMessage)}, and the command sender.
     */
    @Activate
    public void activate() {
        this.statusPoller = this.messagingProvider.createPoller(
                this.topics.getStatusTopic(),
                Reset.earliest,
                new HandlerAdapter[]{HandlerAdapter.create(Messages.PackageStatusMessage.class, this::handleStatus)});
        this.sender = this.messagingProvider.createSender();
        LOG.info("Started Publisher queue provider service");
    }

    /** Closes the status poller, ignoring any error raised while closing. */
    @Deactivate
    public void deactivate() {
        IOUtils.closeQuietly(this.statusPoller);
        LOG.info("Stopped Publisher queue provider service");
    }

    /**
     * Builds a queue view for a subscriber agent.
     *
     * @param pubAgentName name of the publisher agent whose offset queue is looked up
     * @param subSlingId   subscriber Sling id, used only for the clear callback
     * @param subAgentName subscriber agent name, used only for the clear callback
     * @param queueName    name given to the returned queue
     * @param minOffset    offset passed to the cache lookup and to {@code getMinOffsetQueue}
     * @param headRetries  retry count handed to the returned queue
     * @param editable     when {@code true} the queue receives a callback that sends clear
     *                     commands; otherwise the callback is {@code null}
     * @return a new {@link PubQueue}
     */
    @Override
    @Nonnull
    public DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable) {
        OffsetQueue<DistributionQueueItem> agentQueue = this.pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
        ClearCallback callback = editable ? this.editableCallback(subSlingId, subAgentName) : null;
        return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), headRetries, callback);
    }

    /**
     * Builds the error-queue view for a subscriber agent.
     *
     * <p>If no error queue is registered for the key, an empty one is used and is
     * not stored in the map.
     *
     * @param pubAgentName name of the publisher agent
     * @param subSlingId   subscriber Sling id
     * @param subAgentName subscriber agent name
     * @param queueName    name given to the returned queue
     * @return a new {@link PubErrQueue}
     */
    @Override
    @Nonnull
    public DistributionQueue getErrorQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName) {
        String errorQueueKey = this.errorQueueKey(pubAgentName, subSlingId, subAgentName);
        OffsetQueue<Long> errorQueue = this.errorQueues.getOrDefault(errorQueueKey, new OffsetQueueImpl<Long>());
        long headOffset = errorQueue.getHeadOffset();
        OffsetQueue<DistributionQueueItem> agentQueue;
        if (headOffset < 0L) {
            // A negative head offset is treated as "no entries": nothing to look up in the cache.
            agentQueue = new OffsetQueueImpl<DistributionQueueItem>();
        } else {
            // The head entry's value is the package offset from which the agent queue is fetched.
            long minReferencedOffset = errorQueue.getItem(headOffset);
            agentQueue = this.pubQueueCacheService.getOffsetQueue(pubAgentName, minReferencedOffset);
        }
        return new PubErrQueue(queueName, agentQueue, errorQueue);
    }

    /**
     * Records a status message in the matching error queue when its status is
     * {@code REMOVED_FAILED}; any other status is ignored.
     *
     * @param info    message metadata; its offset is the key of the stored entry
     * @param message status message; its offset is the value of the stored entry
     */
    public void handleStatus(MessageInfo info, Messages.PackageStatusMessage message) {
        if (message.getStatus() != Messages.PackageStatusMessage.Status.REMOVED_FAILED) {
            return;
        }
        String errorQueueKey = this.errorQueueKey(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName());
        // computeIfAbsent makes lookup-or-create a single atomic map operation, so two
        // concurrent callers can never register different queues for the same key.
        OffsetQueue<Long> errorQueue = this.errorQueues.computeIfAbsent(errorQueueKey, key -> new OffsetQueueImpl<Long>());
        errorQueue.putItem(info.getOffset(), message.getOffset());
    }

    /**
     * Builds the map key for an error queue.
     *
     * @return the three arguments joined with {@code '#'}
     */
    @Nonnull
    private String errorQueueKey(String pubAgentName, String subSlingId, String subAgentName) {
        return String.format("%s#%s#%s", pubAgentName, subSlingId, subAgentName);
    }

    /**
     * Creates the callback used by editable queues: for a given offset it sends a
     * {@code CommandMessage} carrying a {@code ClearCommand} for that offset,
     * addressed to the given subscriber, on the command topic.
     */
    private ClearCallback editableCallback(String subSlingId, String subAgentName) {
        return offset -> {
            Messages.ClearCommand clearCommand = Messages.ClearCommand.newBuilder()
                    .setOffset(offset)
                    .build();
            Messages.CommandMessage commandMessage = Messages.CommandMessage.newBuilder()
                    .setSubSlingId(subSlingId)
                    .setSubAgentName(subAgentName)
                    .setClearCommand(clearCommand)
                    .build();
            this.sender.send(this.topics.getCommandTopic(), commandMessage);
        };
    }
}

