/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.publisher;

import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.LongStream;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.distribution.journal.JsonMessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.publisher.TopologyChangeHandler;
import org.apache.sling.distribution.journal.impl.publisher.TopologyView;
import org.apache.sling.distribution.journal.impl.publisher.TopologyViewDiff;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Topology change handler that announces packages which have been processed
 * since the previous topology view.
 *
 * <p>For every queue item resolved from the processed offsets, an OSGi event is
 * sent through {@link EventAdmin}. In addition, when an event topic is
 * configured (non-blank), a {@link PackageDistributedMessage} is sent to that
 * topic.</p>
 */
@ParametersAreNonnullByDefault
@Component(immediate=true)
public class PackageDistributedNotifier
implements TopologyChangeHandler {
    private static final Logger LOG = LoggerFactory.getLogger(PackageDistributedNotifier.class);
    @Reference
    private EventAdmin eventAdmin;
    @Reference
    private PubQueueCacheService pubQueueCacheService;
    @Reference
    private MessagingProvider messagingProvider;
    @Reference
    private Topics topics;
    /** Sender for the event topic; only created when {@link #sendMsg} is {@code true}. */
    private JsonMessageSender<PackageDistributedMessage> sender;
    /** {@code true} when a non-blank event topic is configured. */
    private boolean sendMsg;

    /**
     * Decides whether journal messages are sent (an event topic is configured)
     * and, if so, creates the JSON sender used for them.
     */
    @Activate
    public void activate() {
        String eventTopic = this.topics.getEventTopic();
        this.sendMsg = StringUtils.isNotBlank(eventTopic);
        if (this.sendMsg) {
            this.sender = this.messagingProvider.createJsonSender();
        }
        LOG.info("Started package distributed notifier with event message topic {}", eventTopic);
    }

    /**
     * Computes the difference between the two views and notifies about every
     * offset reported as processed by that difference.
     *
     * @param oldView the topology view before the change
     * @param newView the topology view after the change
     */
    @Override
    public void changed(TopologyView oldView, TopologyView newView) {
        TopologyViewDiff diffView = new TopologyViewDiff(oldView, newView);
        diffView.getProcessedOffsets().forEach(this::processOffsets);
    }

    /**
     * Resolves the queue items for the given offsets and notifies about each of them.
     *
     * <p>Nothing happens when the supplied stream is empty. Previously an empty
     * stream raised {@link java.util.NoSuchElementException}, which aborted the
     * notification of all remaining agents of the same topology change.</p>
     *
     * @param pubAgentName name of the publisher agent the offsets belong to
     * @param offsets supplier of the processed offsets; it is invoked twice, so
     *                it must hand out a fresh stream on every call
     */
    private void processOffsets(String pubAgentName, Supplier<LongStream> offsets) {
        // The first offset is used as the minimum offset for the queue lookup;
        // assumes the stream is ordered ascending -- TODO confirm against TopologyViewDiff.
        offsets.get().findFirst().ifPresent(minOffset -> {
            OffsetQueue<DistributionQueueItem> offsetQueue = this.pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
            offsets.get()
                    .mapToObj(offsetQueue::getItem)
                    // Offsets without a queue item are skipped.
                    .filter(Objects::nonNull)
                    .forEach(queueItem -> this.processOffset(pubAgentName, queueItem));
        });
    }

    /**
     * Notifies about a single distributed package: first the OSGi event, then
     * the journal message (the latter only when an event topic is configured).
     *
     * @param pubAgentName name of the publisher agent
     * @param queueItem the queue item describing the distributed package
     */
    protected void processOffset(String pubAgentName, DistributionQueueItem queueItem) {
        this.sendEvt(pubAgentName, queueItem);
        this.sendMsg(pubAgentName, queueItem);
    }

    /**
     * Sends a {@link PackageDistributedMessage} built from the queue item to the
     * event topic. Does nothing when no event topic is configured.
     */
    private void sendMsg(String pubAgentName, DistributionQueueItem queueItem) {
        if (!this.sendMsg) {
            return;
        }
        PackageDistributedMessage msg = new PackageDistributedMessage();
        msg.pubAgentName = pubAgentName;
        msg.packageId = queueItem.getPackageId();
        msg.offset = (Long)queueItem.get("recordOffset");
        msg.paths = (String[])queueItem.get("request.paths");
        msg.deepPaths = (String[])queueItem.get("request.deepPaths");
        this.sender.send(this.topics.getEventTopic(), msg);
    }

    /**
     * Builds the "package distributed" event for the queue item and hands it to
     * the {@link EventAdmin} via {@code sendEvent}.
     */
    private void sendEvt(String pubAgentName, DistributionQueueItem queueItem) {
        Event distributed = DistributionEvent.eventPackageDistributed(queueItem, pubAgentName);
        this.eventAdmin.sendEvent(distributed);
    }
}

