/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.subscriber;

import com.google.protobuf.GeneratedMessage;
import java.io.Closeable;
import java.io.IOException;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps an in-memory cache of the package status messages received on the status topic
 * for a single sub agent.
 *
 * <p>Entries are keyed by the package offset carried in each status message
 * ({@code PackageStatusMessage.getOffset()}). Messages whose sub agent name differs from
 * the one given at construction time are ignored.</p>
 *
 * <p>The cache is a {@link ConcurrentSkipListMap}, so lookups, {@link #clear(long)} and
 * {@link #handle(MessageInfo, Messages.PackageStatusMessage)} operate on a concurrent,
 * offset-sorted map.</p>
 */
public class PackageStatusWatcher implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(PackageStatusWatcher.class);

    private final Closeable poller;
    private final String subAgentName;
    private final NavigableMap<Long, FullMessage<Messages.PackageStatusMessage>> cache = new ConcurrentSkipListMap<>();

    /**
     * Creates the watcher and starts a poller on the status topic, reading from
     * {@code Reset.earliest} and dispatching each status message to
     * {@link #handle(MessageInfo, Messages.PackageStatusMessage)}.
     *
     * @param messagingProvider provider used to create the poller
     * @param topics            source of the status topic name
     * @param subAgentName      name of the sub agent whose status messages are cached
     */
    public PackageStatusWatcher(MessagingProvider messagingProvider, Topics topics, String subAgentName) {
        // Assign before creating the poller: handle() reads this field and may be
        // invoked as soon as the poller exists.
        this.subAgentName = subAgentName;
        String topicName = topics.getStatusTopic();
        this.poller = messagingProvider.createPoller(
                topicName,
                Reset.earliest,
                new HandlerAdapter<?>[] {
                    HandlerAdapter.create(Messages.PackageStatusMessage.class, this::handle)
                });
    }

    /**
     * Returns the cached status for a package.
     *
     * @param pkgOffset offset of the package
     * @return the status stored for {@code pkgOffset}, or {@code null} if none is cached
     */
    public Messages.PackageStatusMessage.Status getStatus(long pkgOffset) {
        FullMessage<Messages.PackageStatusMessage> cached = this.cache.get(pkgOffset);
        if (cached == null) {
            return null;
        }
        return cached.getMessage().getStatus();
    }

    /**
     * Returns the offset of the cached status message itself (taken from its
     * {@link MessageInfo}), as opposed to the offset of the package it refers to.
     *
     * @param pkgOffset offset of the package
     * @return the offset of the status message stored for {@code pkgOffset},
     *         or {@code null} if none is cached
     */
    public Long getStatusOffset(long pkgOffset) {
        FullMessage<Messages.PackageStatusMessage> cached = this.cache.get(pkgOffset);
        if (cached == null) {
            return null;
        }
        return cached.getInfo().getOffset();
    }

    /**
     * Removes every cached entry whose package offset is strictly smaller than
     * {@code pkgOffset}; the entry for {@code pkgOffset} itself is kept.
     *
     * @param pkgOffset exclusive upper bound of the package offsets to remove
     */
    public void clear(long pkgOffset) {
        // headMap returns a live view; clearing the view removes the entries from the cache.
        NavigableMap<Long, FullMessage<Messages.PackageStatusMessage>> removed = this.cache.headMap(pkgOffset, false);
        if (!removed.isEmpty()) {
            LOG.info("Remove package offsets {} from status cache", removed.keySet());
        }
        removed.clear();
    }

    /**
     * Closes the underlying poller.
     *
     * @throws IOException if closing the poller fails
     */
    @Override
    public void close() throws IOException {
        this.poller.close();
    }

    /**
     * Stores a received status message in the cache, keyed by the package offset it carries.
     * Messages addressed to a different sub agent are dropped. If an entry already exists
     * for the package offset, a warning is logged and the entry is replaced.
     *
     * @param info metadata of the received message
     * @param msg  the received status message
     */
    public void handle(MessageInfo info, Messages.PackageStatusMessage msg) {
        // Filter first so no wrapper is allocated for messages of other sub agents.
        if (!this.subAgentName.equals(msg.getSubAgentName())) {
            return;
        }
        final Long pkgOffset = msg.getOffset();
        // put() returns the previous mapping, so the duplicate check and the store
        // happen in one map operation instead of containsKey() followed by put().
        FullMessage<Messages.PackageStatusMessage> previous =
                this.cache.put(pkgOffset, new FullMessage<>(info, msg));
        if (previous != null) {
            LOG.warn("Package offset {} already exists", pkgOffset);
        }
    }
}

