/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.publisher;

import java.io.Closeable;
import java.util.Dictionary;
import java.util.Hashtable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.publisher.AgentId;
import org.apache.sling.distribution.journal.impl.publisher.State;
import org.apache.sling.distribution.journal.impl.publisher.TopologyChangeHandler;
import org.apache.sling.distribution.journal.impl.publisher.TopologyView;
import org.apache.sling.distribution.journal.impl.publisher.TopologyViewManager;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
@Component(service={DiscoveryService.class})
public class DiscoveryService
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(DiscoveryService.class);
    private static final long REFRESH_TTL_MS = 30000L;
    @Reference
    JournalAvailable journalAvailable;
    @Reference
    private MessagingProvider messagingProvider;
    @Reference
    private Topics topics;
    @Reference
    private TopologyChangeHandler topologyChangeHandler;
    private volatile ServiceRegistration<?> reg;
    private final TopologyViewManager viewManager = new TopologyViewManager(30000L);
    private Closeable poller;

    public DiscoveryService() {
    }

    public DiscoveryService(MessagingProvider messagingProvider, TopologyChangeHandler topologyChangeHandler, Topics topics) {
        this.messagingProvider = messagingProvider;
        this.topologyChangeHandler = topologyChangeHandler;
        this.topics = topics;
    }

    @Activate
    public void activate(BundleContext context) {
        this.poller = this.messagingProvider.createPoller(this.topics.getDiscoveryTopic(), Reset.latest, new HandlerAdapter[]{HandlerAdapter.create(Messages.DiscoveryMessage.class, (MessageHandler)new DiscoveryMessageHandler())});
        this.startTopologyViewUpdaterTask(context);
        LOG.info("Discovery service started");
    }

    @Deactivate
    public void deactivate() {
        if (this.reg != null) {
            this.reg.unregister();
        }
        IOUtils.closeQuietly((Closeable)this.poller);
        LOG.info("Discovery service stopped");
    }

    public TopologyView getTopologyView() {
        return this.viewManager.getCurrentView();
    }

    @Override
    public void run() {
        TopologyView oldView = this.viewManager.updateView();
        TopologyView newView = this.viewManager.getCurrentView();
        if (!newView.equals(oldView)) {
            LOG.debug(String.format("TopologyView changed from %s to %s", oldView, newView));
            this.topologyChangeHandler.changed(oldView, newView);
        }
    }

    private void startTopologyViewUpdaterTask(BundleContext context) {
        Hashtable<String, Comparable<Boolean>> props = new Hashtable<String, Comparable<Boolean>>();
        ((Dictionary)props).put("scheduler.concurrent", false);
        ((Dictionary)props).put("scheduler.period", 5L);
        this.reg = context.registerService(Runnable.class.getName(), (Object)this, props);
    }

    private final class DiscoveryMessageHandler
    implements MessageHandler<Messages.DiscoveryMessage> {
        private DiscoveryMessageHandler() {
        }

        public void handle(MessageInfo info, Messages.DiscoveryMessage disMsg) {
            long now = System.currentTimeMillis();
            AgentId subAgentId = new AgentId(disMsg.getSubSlingId(), disMsg.getSubAgentName());
            for (Messages.SubscriberState subStateMsg : disMsg.getSubscriberStateList()) {
                Messages.SubscriberConfiguration subConfig = disMsg.getSubscriberConfiguration();
                State subState = new State(subStateMsg.getPubAgentName(), subAgentId.getAgentId(), now, subStateMsg.getOffset(), subStateMsg.getRetries(), subConfig.getMaxRetries(), subConfig.getEditable());
                DiscoveryService.this.viewManager.refreshState(subState);
            }
        }
    }
}

