/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.subscriber;

import com.google.protobuf.GeneratedMessage;
import java.io.Closeable;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.RunnableUtil;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
import org.apache.sling.distribution.journal.impl.shared.AgentState;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.PackageBrowser;
import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.impl.subscriber.Announcer;
import org.apache.sling.distribution.journal.impl.subscriber.ContentPackageExtractor;
import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
import org.apache.sling.distribution.journal.impl.subscriber.Precondition;
import org.apache.sling.distribution.journal.impl.subscriber.SubscriberConfiguration;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.serviceusermapping.ServiceUserMapped;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * Subscriber-side {@link DistributionAgent} registered as an OSGi component.
 *
 * <p>On activation it creates a poller on the package topic, buffers the accepted package
 * messages in a bounded in-memory queue and handles them one at a time on a background
 * "queue processor" thread: each package is either imported or removed, and the processed
 * offset (plus, for editable agents, a status record) is persisted through a {@link LocalStore}.
 * Stored statuses that were not sent yet are published to the status topic before the next
 * item is processed.
 *
 * <p>{@link #execute} never distributes anything: every request is answered as dropped.
 */
@ParametersAreNonnullByDefault
@Component(service={}, immediate=true, property={"announceDelay=10000"}, configurationPid={"org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory"})
@Designate(ocd=SubscriberConfiguration.class, factory=true)
public class DistributionSubscriber
implements DistributionAgent {
    // Same value as the literal 60 passed to Precondition.canProcess in cannotProcess;
    // the unit is not visible in this file (presumably seconds - TODO confirm).
    private static final int PRECONDITION_TIMEOUT = 60;
    // Same value as the 1000 ms pause between status send attempts in sendStoredStatus.
    private static final int RETRY_SEND_DELAY = 1000;
    // Milliseconds slept after a failed processing attempt. Non-final and package-private,
    // presumably so tests can shorten it (NOTE(review): confirm).
    static int RETRY_DELAY = 5000;
    // Milliseconds slept between polls of the empty items buffer.
    static int QUEUE_FETCH_DELAY = 1000;
    private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
    // Empty: this agent supports no request type, see executeUnsupported.
    private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = Collections.emptySet();
    @Reference(name="packageBuilder")
    private DistributionPackageBuilder packageBuilder;
    @Reference
    private SlingSettingsService slingSettings;
    @Reference
    private ResourceResolverFactory resolverFactory;
    @Reference
    private MessagingProvider messagingProvider;
    @Reference
    private Topics topics;
    @Reference
    private EventAdmin eventAdmin;
    // Not used by any code in this class; NOTE(review): presumably referenced only so the
    // component activates when the journal is available - confirm.
    @Reference
    private JournalAvailable journalAvailable;
    @Reference(name="precondition")
    private Precondition precondition;
    @Reference
    private DistributionMetricsService distributionMetricsService;
    // Not used by any code in this class; NOTE(review): presumably guards activation on the
    // service user mapping being present - confirm.
    @Reference
    private ServiceUserMapped mappedUser;
    @Reference
    private Packaging packaging;
    // Registration of this instance as a DistributionAgent service (created in activate).
    private ServiceRegistration<DistributionAgent> componentReg;
    // Per publisher-agent retry counters for the package currently being processed.
    private final PackageRetries packageRetries = new PackageRetries();
    private DistributionMetricsService.GaugeService<Integer> retriesGauge;
    private Closeable packagePoller;
    // Only created for editable agents; stays null otherwise.
    private Closeable commandPoller;
    // Persists the offset of the last processed package.
    private LocalStore processedOffsets;
    // Persists the last processing status together with its "sent" flag.
    private LocalStore processedStatuses;
    // Highest offset received through a clear command; packages at or below it are removed
    // instead of imported. -1 means nothing was cleared.
    private final AtomicLong clearOffset = new AtomicLong(-1L);
    // Bounded hand-over buffer (capacity 8) between the poller callback and the queue processor.
    private final BlockingQueue<DistributionQueueItem> queueItemsBuffer = new LinkedBlockingQueue<DistributionQueueItem>(8);
    // Publisher agent names this subscriber accepts packages from.
    private Set<String> queueNames = Collections.emptySet();
    private MessageSender<Messages.PackageStatusMessage> sender;
    private Announcer announcer;
    private String subAgentName;
    private String subSlingId;
    private String pkgType;
    private int maxRetries;
    // True when maxRetries >= 0: packages failing more often than that are removed.
    private boolean errorQueueEnabled;
    private boolean editable;
    // Cleared in deactivate; checked by enqueue to stop waiting for buffer space.
    private volatile boolean running = true;
    private volatile Thread queueProcessor;
    private ContentPackageExtractor extractor;

    /**
     * Activates the agent: reads the configuration, opens the local stores, starts polling
     * the journal, starts the queue processor thread and the announcer, and finally registers
     * this instance as a {@link DistributionAgent} service.
     *
     * <p>All state read by the message handlers and by the queue processor (package type,
     * extractor, status sender, stores) is initialised <em>before</em> the pollers and the
     * background thread are started, so a message delivered right after poller creation never
     * observes a half-initialised agent.
     *
     * @param config     the subscriber configuration
     * @param context    bundle context used to register the agent service
     * @param properties component properties; {@code announceDelay} is read from them (default 10000)
     * @throws NullPointerException if a parameter, a mandatory reference, the Sling id,
     *                              the agent name or the package type is {@code null}
     */
    @Activate
    public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
        // Validate everything before the first dereference so a missing collaborator is
        // reported by name instead of as an anonymous NPE further down.
        Objects.requireNonNull(config, "config");
        Objects.requireNonNull(context, "context");
        Objects.requireNonNull(this.packageBuilder, "packageBuilder");
        Objects.requireNonNull(this.slingSettings, "slingSettings");
        Objects.requireNonNull(this.resolverFactory, "resolverFactory");
        Objects.requireNonNull(this.messagingProvider, "messagingProvider");
        Objects.requireNonNull(this.topics, "topics");
        Objects.requireNonNull(this.eventAdmin, "eventAdmin");
        Objects.requireNonNull(this.precondition, "precondition");

        this.subSlingId = Objects.requireNonNull(this.slingSettings.getSlingId());
        this.subAgentName = Objects.requireNonNull(config.name());
        this.queueNames = this.getNotEmpty(config.agentNames());
        this.maxRetries = config.maxRetries();
        this.errorQueueEnabled = this.maxRetries >= 0;
        this.editable = config.editable();

        // State consumed by handlePackageMessage / the queue processor: must exist before
        // any poller or thread is started.
        this.pkgType = Objects.requireNonNull(this.packageBuilder.getType());
        this.extractor = new ContentPackageExtractor(this.packaging, config.packageHandling());
        this.processedOffsets = new LocalStore(this.resolverFactory, "packages", this.subAgentName);
        // Resume right after the last processed offset (offset 0 when nothing is stored).
        long startOffset = this.processedOffsets.load("offset", -1L) + 1L;
        this.processedStatuses = new LocalStore(this.resolverFactory, "statuses", this.subAgentName);
        this.sender = this.messagingProvider.createSender();
        String nameRetries = "distribution.journal.subscriber.current_retries;sub_name=" + config.name();
        this.retriesGauge = this.distributionMetricsService.createGauge(nameRetries, "Retries of current package", this.packageRetries::getSum);

        // From here on messages may arrive and be processed concurrently.
        String assign = this.messagingProvider.assignTo(startOffset);
        this.packagePoller = this.messagingProvider.createPoller(this.topics.getPackageTopic(), Reset.earliest, assign, new HandlerAdapter[]{HandlerAdapter.create(Messages.PackageMessage.class, this::handlePackageMessage)});
        if (this.editable) {
            this.commandPoller = this.messagingProvider.createPoller(this.topics.getCommandTopic(), Reset.earliest, new HandlerAdapter[]{HandlerAdapter.create(Messages.CommandMessage.class, this::handleCommandMessage)});
        }
        this.queueProcessor = RunnableUtil.startBackgroundThread(this::processQueue, String.format("Queue Processor for Subscriber agent %s", this.subAgentName));

        int announceDelay = PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
        MessageSender<Messages.DiscoveryMessage> disSender = this.messagingProvider.createSender();
        this.announcer = new Announcer(this.subSlingId, this.subAgentName, this.topics.getDiscoveryTopic(), this.queueNames, disSender, this.processedOffsets, this.packageRetries, this.maxRetries, config.editable(), announceDelay);

        String msg = String.format("Started Subscriber agent %s at offset %s, subscribed to agent names %s with package builder %s editable %s maxRetries %s errorQueueEnabled %s", this.subAgentName, startOffset, this.queueNames, this.pkgType, config.editable(), this.maxRetries, this.errorQueueEnabled);
        LOG.info(msg);

        // Publish the agent service last, once the instance is fully usable.
        Dictionary<String, Object> props = this.createServiceProps(config);
        this.componentReg = context.registerService(DistributionAgent.class, this, props);
    }

    /**
     * Collects the non-blank entries of the configured agent names into a set.
     *
     * @param agentNames configured publisher agent names, possibly containing blank entries
     * @return the distinct names that are not blank
     */
    private Set<String> getNotEmpty(String[] agentNames) {
        return Arrays.stream(agentNames)
                .filter(StringUtils::isNotBlank)
                .collect(Collectors.toSet());
    }

    /**
     * Builds the service properties under which this agent is registered, copied from the
     * component configuration.
     *
     * @param config the subscriber configuration
     * @return the service registration properties
     */
    private Dictionary<String, Object> createServiceProps(SubscriberConfiguration config) {
        Dictionary<String, Object> serviceProps = new Hashtable<>();
        // name, title and details all carry the configured agent name.
        serviceProps.put("name", config.name());
        serviceProps.put("title", config.name());
        serviceProps.put("details", config.name());
        serviceProps.put("agentNames", config.agentNames());
        serviceProps.put("editable", config.editable());
        serviceProps.put("maxRetries", config.maxRetries());
        serviceProps.put("packageBuilder.target", config.packageBuilder_target());
        serviceProps.put("precondition.target", config.precondition_target());
        serviceProps.put("webconsole.configurationFactory.nameHint", config.webconsole_configurationFactory_nameHint());
        return serviceProps;
    }

    /**
     * Shuts the agent down: closes the gauge and the announcer, unregisters the agent service,
     * closes the pollers and stops the queue processor thread.
     *
     * <p>Unregistering is guarded so that a missing or already unregistered service
     * registration cannot abort the shutdown and leave the pollers open or the queue
     * processor thread running.
     */
    @Deactivate
    public void deactivate() {
        IOUtils.closeQuietly(this.retriesGauge);
        IOUtils.closeQuietly(this.announcer);
        ServiceRegistration<DistributionAgent> registration = this.componentReg;
        if (registration != null) {
            try {
                registration.unregister();
            }
            catch (IllegalStateException e) {
                // Thrown when the registration was already unregistered; nothing left to undo.
                LOG.debug("Service registration of Subscriber agent {} was already unregistered", this.subAgentName, e);
            }
        }
        IOUtils.closeQuietly(this.packagePoller);
        IOUtils.closeQuietly(this.commandPoller);
        // Lets enqueue() give up waiting for buffer space and processQueue() observe the stop.
        this.running = false;
        Thread interrupter = this.queueProcessor;
        if (interrupter != null) {
            interrupter.interrupt();
        }
        String msg = String.format("Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s", this.subAgentName, this.queueNames, this.pkgType);
        LOG.info(msg);
    }

    /**
     * @return the publisher agent names this subscriber is subscribed to; each one doubles
     *         as a queue name
     */
    @Nonnull
    @Override
    public Iterable<String> getQueueNames() {
        return queueNames;
    }

    /**
     * Returns a view of the queue with the given name, backed by the first buffered item
     * that belongs to that publisher agent (or by no item when none is buffered).
     *
     * @param queueName name of the queue, i.e. a publisher agent name
     * @return a {@link SubQueue} for that name
     */
    @Override
    public DistributionQueue getQueue(@Nonnull String queueName) {
        DistributionQueueItem firstForQueue = this.queueItemsBuffer.stream()
                .filter(buffered -> this.isIn(queueName, buffered))
                .findFirst()
                .orElse(null);
        return new SubQueue(queueName, firstForQueue, this.packageRetries);
    }

    /**
     * Tells whether the queue item carries a package published by the given agent.
     *
     * @param queueName publisher agent name to match
     * @param queueItem buffered item holding the package message under {@code "packageMessage"}
     * @return {@code true} if the package's publisher agent name equals {@code queueName}
     */
    private boolean isIn(String queueName, DistributionQueueItem queueItem) {
        Messages.PackageMessage pkg = queueItem.get("packageMessage", Messages.PackageMessage.class);
        String publisher = pkg.getPubAgentName();
        return queueName.equals(publisher);
    }

    /**
     * @return a distribution log whose lines are always empty; this agent keeps no log
     */
    @Nonnull
    @Override
    public DistributionLog getLog() {
        return () -> this.emptyDistributionLog();
    }

    /**
     * @return the (always empty) list of log lines served by {@link #getLog()}
     */
    private List<String> emptyDistributionLog() {
        List<String> noLines = Collections.emptyList();
        return noLines;
    }

    /**
     * @return the agent state as computed by {@link AgentState#getState} for this agent
     */
    @Nonnull
    @Override
    public DistributionAgentState getState() {
        DistributionAgentState current = AgentState.getState(this);
        return current;
    }

    /**
     * Rejects the request: a subscriber agent does not execute distribution requests.
     *
     * @param resourceResolver unused
     * @param request          the request to reject
     * @return a response in state {@code DROPPED}
     */
    @Nonnull
    @Override
    public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest request) {
        DistributionResponse rejected = this.executeUnsupported(request);
        return rejected;
    }

    /**
     * Logs that the request type is not supported and builds the matching response.
     *
     * @param request the rejected request
     * @return a response in state {@code DROPPED} carrying the explanatory message
     */
    @Nonnull
    private DistributionResponse executeUnsupported(DistributionRequest request) {
        DistributionRequestType requested = request.getRequestType();
        String explanation = String.format("Request type %s is not supported by this agent, expected one of %s", requested, SUPPORTED_REQ_TYPES);
        LOG.info(explanation);
        return new SimpleDistributionResponse(DistributionRequestState.DROPPED, explanation);
    }

    /**
     * Poller callback for package messages. Messages from publisher agents this subscriber
     * is not subscribed to, or with a package type other than the configured builder's type,
     * are skipped; everything else is turned into a queue item and put into the items buffer,
     * waiting for space if the buffer is full.
     *
     * @param info    journal metadata of the message
     * @param message the package message
     * @throws RuntimeException if buffering is interrupted or the agent is stopped while
     *                          waiting; the thread's interrupt flag is set and the
     *                          {@link InterruptedException} is attached as cause
     */
    private void handlePackageMessage(MessageInfo info, Messages.PackageMessage message) {
        if (!this.queueNames.contains(message.getPubAgentName())) {
            LOG.info("Skipping package for Publisher agent {} (not subscribed)", message.getPubAgentName());
            return;
        }
        if (!this.pkgType.equals(message.getPkgType())) {
            LOG.warn("Skipping package with type {}", message.getPkgType());
            return;
        }
        DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
        try {
            this.enqueue(queueItem);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Keep the cause and say which package was affected instead of throwing a bare exception.
            throw new RuntimeException(String.format("Interrupted while buffering distribution package %s", message.getPkgId()), e);
        }
    }

    /**
     * Puts the item into the bounded items buffer, retrying in one second slices for as long
     * as the agent is running, and bumps the buffer size metric on success.
     *
     * @param queueItem the item to buffer
     * @throws InterruptedException if the thread is interrupted while waiting, or if the agent
     *                              stops running before the item could be buffered
     */
    private void enqueue(DistributionQueueItem queueItem) throws InterruptedException {
        while (this.running) {
            boolean accepted = this.queueItemsBuffer.offer(queueItem, 1000L, TimeUnit.MILLISECONDS);
            if (accepted) {
                this.distributionMetricsService.getItemsBufferSize().increment();
                return;
            }
        }
        // Agent was stopped while the buffer stayed full.
        throw new InterruptedException();
    }

    /**
     * Body of the queue processor thread: keeps processing buffered items until the agent is
     * stopped or the thread is interrupted.
     *
     * <p>The {@code running} flag is checked in addition to the interrupt status, so the loop
     * also ends after {@code deactivate()} when the interrupt was consumed somewhere inside the
     * processing code (for example by a callee that catches and clears it).
     */
    private void processQueue() {
        LOG.info("Started Queue processor");
        while (this.running && !Thread.interrupted()) {
            try {
                this.processQueueItems();
            }
            catch (InterruptedException e) {
                // Restore the flag so the loop condition terminates the thread.
                Thread.currentThread().interrupt();
            }
        }
        LOG.info("Stopped Queue processor");
    }

    /**
     * Runs one processing round: first publishes a stored status that was not sent yet, then
     * waits for the head of the items buffer and processes it. Both steps are timed.
     *
     * <p>Any failure other than an interruption is logged and followed by a pause of
     * {@code RETRY_DELAY} milliseconds; the item stays in the buffer for the next round.
     *
     * @throws InterruptedException if the thread is interrupted while waiting or sleeping
     */
    private void processQueueItems() throws InterruptedException {
        try {
            try (Timer.Context statusTimer = this.distributionMetricsService.getSendStoredStatusDuration().time()) {
                this.sendStoredStatus();
            }
            DistributionQueueItem head = this.blockingPeekQueueItem();
            try (Timer.Context itemTimer = this.distributionMetricsService.getProcessQueueItemDuration().time()) {
                this.processQueueItem(head);
            }
        }
        catch (InterruptedException interrupted) {
            // Interruptions must reach the caller untouched.
            throw interrupted;
        }
        catch (Throwable failure) {
            LOG.error("Error processing queue item", failure);
            Thread.sleep(RETRY_DELAY);
        }
    }

    /**
     * Waits until the items buffer is non-empty and returns its head without removing it,
     * polling every {@code QUEUE_FETCH_DELAY} milliseconds.
     *
     * @return the current head of the items buffer
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
        DistributionQueueItem head = this.queueItemsBuffer.peek();
        while (head == null) {
            Thread.sleep(QUEUE_FETCH_DELAY);
            head = this.queueItemsBuffer.peek();
        }
        return head;
    }

    /**
     * Processes one buffered item. The package is removed without import when its offset was
     * cleared or the precondition says it cannot be processed; otherwise it is imported.
     * On success the item is taken off the buffer.
     *
     * <p>If the decision itself fails with an {@link IllegalStateException}, the message is
     * logged, the thread sleeps {@code RETRY_DELAY} milliseconds and the item is left in place.
     *
     * @param queueItem the item at the head of the buffer
     * @throws Exception if removing or importing the package fails
     */
    private void processQueueItem(DistributionQueueItem queueItem) throws Exception {
        long offset = queueItem.get("recordOffset", Long.class);
        final boolean skip;
        try {
            skip = this.isCleared(offset) || this.cannotProcess(offset);
        }
        catch (IllegalStateException undecided) {
            LOG.info(undecided.getMessage());
            Thread.sleep(RETRY_DELAY);
            return;
        }
        Messages.PackageMessage pkgMsg = queueItem.get("packageMessage", Messages.PackageMessage.class);
        String pubAgentName = pkgMsg.getPubAgentName();
        if (!skip) {
            long createdTime = queueItem.get("recordTimestamp", Long.class);
            this.importPackage(pkgMsg, offset, createdTime);
        } else {
            this.removePackage(pkgMsg, offset);
        }
        this.queueItemProcessed(pubAgentName);
    }

    /**
     * Marks a package as removed without importing it: stores a {@code REMOVED} status
     * (editable agents only) and the processed offset through a "bookkeeper" service resolver
     * and commits. The duration timer is stopped only after a successful commit.
     *
     * @param pkgMsg the package being skipped
     * @param offset journal offset of the package
     * @throws Exception if the resolver cannot be obtained or the changes cannot be persisted
     */
    private void removePackage(Messages.PackageMessage pkgMsg, long offset) throws Exception {
        LOG.info(String.format("Removing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
        Timer.Context removalTimer = this.distributionMetricsService.getRemovedPackageDuration().time();
        try (ResourceResolver bookkeeper = this.getServiceResolver("bookkeeper")) {
            if (this.editable) {
                String pubAgentName = pkgMsg.getPubAgentName();
                this.storeStatus(bookkeeper, Messages.PackageStatusMessage.Status.REMOVED, offset, pubAgentName);
            }
            this.storeOffset(bookkeeper, offset);
            bookkeeper.commit();
            removalTimer.stop();
        }
    }

    /**
     * Gives up on a package whose import kept failing: stores a {@code REMOVED_FAILED} status
     * and the processed offset through a "bookkeeper" service resolver and commits. Unlike
     * {@code removePackage}, the status is stored regardless of the editable flag. The duration
     * timer is stopped only after a successful commit.
     *
     * @param pkgMsg the package that failed to import
     * @param offset journal offset of the package
     * @throws Exception if the resolver cannot be obtained or the changes cannot be persisted
     */
    private void removeFailedPackage(Messages.PackageMessage pkgMsg, long offset) throws Exception {
        LOG.info(String.format("Removing failed distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
        Timer.Context removalTimer = this.distributionMetricsService.getRemovedFailedPackageDuration().time();
        try (ResourceResolver bookkeeper = this.getServiceResolver("bookkeeper")) {
            String pubAgentName = pkgMsg.getPubAgentName();
            this.storeStatus(bookkeeper, Messages.PackageStatusMessage.Status.REMOVED_FAILED, offset, pubAgentName);
            this.storeOffset(bookkeeper, offset);
            bookkeeper.commit();
            removalTimer.stop();
        }
    }

    /**
     * Imports one package: installs it with an "importer" service resolver, stores the status
     * (editable agents only) and the processed offset, commits, updates the metrics and posts
     * the "imported" event.
     *
     * <p>On failure the failed-imports meter is marked. An {@link Error} is rethrown as is.
     * Otherwise, when the error queue is enabled and the retry budget is used up, the package
     * is removed as failed (the cause is logged with the warning); in every other case the
     * retry counter of the publisher agent is increased and a {@link DistributionException}
     * wrapping the cause is thrown so the package is retried.
     *
     * <p>The logging MDC populated for the import is cleared on every exit path.
     *
     * @param pkgMsg      the package to import
     * @param offset      journal offset of the package
     * @param createdTime creation timestamp of the journal record in epoch milliseconds, used
     *                    for the end-to-end distribution duration metric
     * @throws Exception if the import fails and must be retried, or if removing the failed
     *                   package fails
     */
    private void importPackage(Messages.PackageMessage pkgMsg, long offset, long createdTime) throws Exception {
        String pubAgentName = pkgMsg.getPubAgentName();
        LOG.info(String.format("Importing distribution package %s of type %s at offset %s", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset));
        this.addPackageMDC(pkgMsg);
        Timer.Context context = this.distributionMetricsService.getImportedPackageDuration().time();
        try (ResourceResolver importerResolver = this.getServiceResolver("importer")) {
            this.installPackage(importerResolver, pkgMsg);
            if (this.editable) {
                this.storeStatus(importerResolver, Messages.PackageStatusMessage.Status.IMPORTED, offset, pubAgentName);
            }
            this.storeOffset(importerResolver, offset);
            importerResolver.commit();
            // Only successful imports are timed.
            context.stop();
            this.distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
            this.distributionMetricsService.getPackageDistributedDuration().update(System.currentTimeMillis() - createdTime, TimeUnit.MILLISECONDS);
            Event event = DistributionEvent.eventImporterImported(pkgMsg, this.subAgentName);
            this.eventAdmin.postEvent(event);
        }
        catch (Throwable e) {
            this.distributionMetricsService.getFailedPackageImports().mark();
            if (e instanceof Error) {
                throw (Error)e;
            }
            int retries = this.packageRetries.get(pubAgentName);
            if (this.errorQueueEnabled && retries >= this.maxRetries) {
                // Log the cause as well: this is the last chance to see why the package was dropped.
                LOG.warn(String.format("Failed to import distribution package %s at offset %s after %s retries, removing the package.", pkgMsg.getPkgId(), offset, retries), e);
                this.removeFailedPackage(pkgMsg, offset);
            } else {
                this.packageRetries.increase(pubAgentName);
                String msg = String.format("Error processing distribution package %s. Retry attempts %s/%s.", pkgMsg.getPkgId(), retries, this.errorQueueEnabled ? Integer.toString(this.maxRetries) : "infinite");
                throw new DistributionException(msg, e);
            }
        }
        finally {
            MDC.clear();
        }
    }

    /**
     * Writes the processed offset to the offsets store using the given resolver. The caller
     * is responsible for committing.
     *
     * @param importerResolver resolver the change is written with
     * @param offset           offset of the package that was just handled
     * @throws PersistenceException if the store cannot be written
     */
    private void storeOffset(ResourceResolver importerResolver, long offset) throws PersistenceException {
        processedOffsets.store(importerResolver, "offset", offset);
    }

    /**
     * Finishes the handling of the head item: resets the retry counter of its publisher agent,
     * removes the head from the items buffer and decrements the buffer size metric.
     *
     * @param pubAgentName publisher agent name of the processed package
     */
    private void queueItemProcessed(String pubAgentName) {
        packageRetries.clear(pubAgentName);
        // remove() (not poll()): an empty buffer at this point is an error and throws.
        queueItemsBuffer.remove();
        distributionMetricsService.getItemsBufferSize().decrement();
    }

    /**
     * Populates the logging MDC with details of the package about to be imported (ids, paths,
     * publisher and subscriber identity, request type and current retry count).
     *
     * @param pkgMsg the package being imported
     */
    private void addPackageMDC(Messages.PackageMessage pkgMsg) {
        MDC.put("module", "distribution");
        MDC.put("package-id", pkgMsg.getPkgId());
        // Comma separated list of all paths contained in the package.
        MDC.put("paths", String.join(",", pkgMsg.getPathsList()));
        MDC.put("pub-sling-id", pkgMsg.getPubSlingId());
        String publisher = pkgMsg.getPubAgentName();
        MDC.put("pub-agent-name", publisher);
        MDC.put("distribution-message-type", pkgMsg.getReqType().name());
        int currentRetries = this.packageRetries.get(publisher);
        MDC.put("retries", Integer.toString(currentRetries));
        MDC.put("sub-sling-id", this.subSlingId);
        MDC.put("sub-agent-name", this.subAgentName);
    }

    /**
     * Dispatches on the request type of the package: {@code ADD} installs the content,
     * {@code DELETE} deletes the listed paths and {@code TEST} does nothing.
     *
     * @param resolver resolver the changes are applied with
     * @param pkgMsg   the package to install
     * @throws DistributionException         if installing an {@code ADD} package fails
     * @throws PersistenceException          if deleting a resource fails
     * @throws UnsupportedOperationException for any other request type
     */
    private void installPackage(ResourceResolver resolver, Messages.PackageMessage pkgMsg) throws DistributionException, PersistenceException {
        Messages.PackageMessage.ReqType reqType = pkgMsg.getReqType();
        switch (reqType) {
            case ADD:
                this.installAddPackage(resolver, pkgMsg);
                return;
            case DELETE:
                this.installDeletePackage(resolver, pkgMsg);
                return;
            case TEST:
                // Test packages carry nothing to install.
                return;
            default:
                throw new UnsupportedOperationException(String.format("Unable to process messages with type: %s", reqType));
        }
    }

    /**
     * Installs the content of an {@code ADD} package: opens the package stream, hands it to
     * the package builder and then lets the content package extractor handle the paths.
     * The stream is always closed, and failures while closing are ignored.
     *
     * @param resolver resolver the content is installed with
     * @param pkgMsg   the package to install
     * @throws DistributionException if the installation fails
     */
    private void installAddPackage(ResourceResolver resolver, Messages.PackageMessage pkgMsg) throws DistributionException {
        LOG.info("Importing paths " + pkgMsg.getPathsList());
        // Stays null when opening the stream fails; closeQuietly tolerates that.
        InputStream content = null;
        try {
            content = PackageBrowser.pkgStream(resolver, pkgMsg);
            this.packageBuilder.installPackage(resolver, content);
            this.extractor.handle(resolver, pkgMsg.getPathsList());
        }
        finally {
            IOUtils.closeQuietly(content);
        }
    }

    /**
     * Applies a {@code DELETE} package: deletes every listed path that currently resolves to
     * a resource; paths without a resource are ignored.
     *
     * @param resolver resolver the deletions are applied with
     * @param pkgMsg   the package listing the paths to delete
     * @throws PersistenceException if a resource cannot be deleted
     */
    private void installDeletePackage(ResourceResolver resolver, Messages.PackageMessage pkgMsg) throws PersistenceException {
        LOG.info("Deleting paths " + pkgMsg.getPathsList());
        for (String path : pkgMsg.getPathsList()) {
            Resource target = resolver.getResource(path);
            if (target != null) {
                resolver.delete(target);
            }
        }
    }

    /**
     * Writes the processing status of a package to the status store, flagged as not yet sent.
     * The caller is responsible for committing.
     *
     * @param resolver     resolver the change is written with
     * @param status       outcome of the processing
     * @param offset       journal offset of the package
     * @param pubAgentName publisher agent name of the package
     * @throws PersistenceException if the store cannot be written
     */
    private void storeStatus(ResourceResolver resolver, Messages.PackageStatusMessage.Status status, long offset, String pubAgentName) throws PersistenceException {
        Map<String, Object> statusProps = new HashMap<>();
        statusProps.put("pubAgentName", pubAgentName);
        statusProps.put("statusNumber", status.getNumber());
        statusProps.put("offset", offset);
        // Picked up later by sendStoredStatus, which flips the flag once the message is out.
        statusProps.put("sent", false);
        this.processedStatuses.store(resolver, statusProps);
        LOG.info("Stored status {}", statusProps);
    }

    /**
     * Publishes the stored status if it has not been sent yet (a store without a
     * {@code "sent"} entry counts as sent). Sending is retried without limit, pausing
     * {@code RETRY_SEND_DELAY} milliseconds after each failed attempt.
     *
     * @throws InterruptedException if the thread is interrupted while pausing between attempts
     */
    private void sendStoredStatus() throws InterruptedException {
        ValueMap status = this.processedStatuses.load();
        boolean sent = status.get("sent", true);
        for (int retry = 0; !sent; retry++) {
            try {
                this.sendStatusMessage(status);
                this.markStatusSent();
                sent = true;
            }
            catch (Exception e) {
                LOG.warn("Cannot send status (retry {})", retry, e);
                Thread.sleep(RETRY_SEND_DELAY);
            }
        }
    }

    /**
     * Flags the stored status as sent, using a "bookkeeper" service resolver. A failure is
     * only logged: the flag then stays unset.
     */
    private void markStatusSent() {
        try (ResourceResolver bookkeeper = this.getServiceResolver("bookkeeper")) {
            this.processedStatuses.store(bookkeeper, "sent", true);
            bookkeeper.commit();
        }
        catch (Exception failure) {
            LOG.warn("Failed to mark status as sent", failure);
        }
    }

    /**
     * Builds a package status message from the stored status properties and sends it to the
     * status topic.
     *
     * @param status stored status holding {@code pubAgentName}, {@code offset} and
     *               {@code statusNumber}
     */
    private void sendStatusMessage(ValueMap status) {
        Messages.PackageStatusMessage.Builder builder = Messages.PackageStatusMessage.newBuilder();
        builder.setSubSlingId(this.subSlingId);
        builder.setSubAgentName(this.subAgentName);
        builder.setPubAgentName(status.get("pubAgentName", String.class));
        builder.setOffset(status.get("offset", Long.class).longValue());
        // The status is persisted by its protobuf number, see storeStatus.
        builder.setStatus(Messages.PackageStatusMessage.Status.valueOf((int)status.get("statusNumber", Integer.class)));
        Messages.PackageStatusMessage pkgStatMsg = builder.build();
        this.sender.send(this.topics.getStatusTopic(), pkgStatMsg);
        LOG.info("Sent status message {}", status);
    }

    /**
     * Poller callback for command messages. Commands addressed to another Sling instance or
     * agent are skipped; of the rest only clear commands are supported.
     *
     * @param info    journal metadata of the message (unused)
     * @param message the command message
     */
    private void handleCommandMessage(MessageInfo info, Messages.CommandMessage message) {
        boolean addressedToThisAgent = this.subSlingId.equals(message.getSubSlingId())
                && this.subAgentName.equals(message.getSubAgentName());
        if (!addressedToThisAgent) {
            LOG.debug(String.format("Skip command for subSlingId %s", message.getSubSlingId()));
            return;
        }
        if (!message.hasClearCommand()) {
            LOG.warn("Unsupported command {}", message);
            return;
        }
        this.handleClearCommand(message.getClearCommand().getOffset());
    }

    /**
     * @param offset journal offset of a package
     * @return {@code true} if the offset is at or below the highest offset received through
     *         a clear command
     */
    private boolean isCleared(long offset) {
        long highestCleared = this.clearOffset.get();
        return highestCleared >= offset;
    }

    /**
     * Asks the precondition whether the package at the given offset may be processed, passing
     * {@code PRECONDITION_TIMEOUT} as the timeout.
     *
     * @param offset journal offset of a package
     * @return {@code true} if the precondition denies processing
     */
    private boolean cannotProcess(long offset) {
        boolean processable = this.precondition.canProcess(offset, PRECONDITION_TIMEOUT);
        return !processable;
    }

    /**
     * Applies a clear command: on an editable agent the clear offset is raised to the given
     * offset (it never decreases); on a non editable agent the command is only logged.
     *
     * @param offset offset up to which buffered packages are to be cleared
     */
    private void handleClearCommand(long offset) {
        if (!this.editable) {
            LOG.warn("Unexpected ClearCommand for non editable subscriber");
            return;
        }
        this.clearOffset.accumulateAndGet(offset, Math::max);
        LOG.info("Handled clear command for offset {}", offset);
    }

    /**
     * Opens a service resource resolver for the given sub service. The caller must close it.
     *
     * @param subService sub service name, passed as {@code sling.service.subservice}
     * @return a new service resource resolver
     * @throws LoginException if the service login fails
     */
    private ResourceResolver getServiceResolver(String subService) throws LoginException {
        Map<String, Object> authInfo = Collections.singletonMap("sling.service.subservice", subService);
        return this.resolverFactory.getServiceResourceResolver(authInfo);
    }
}

