/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.subscriber;

import com.google.protobuf.GeneratedMessage;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.RunnableUtil;
import org.apache.sling.distribution.journal.impl.precondition.Precondition;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
import org.apache.sling.distribution.journal.impl.shared.AgentState;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.impl.subscriber.Announcer;
import org.apache.sling.distribution.journal.impl.subscriber.BookKeeper;
import org.apache.sling.distribution.journal.impl.subscriber.CommandPoller;
import org.apache.sling.distribution.journal.impl.subscriber.ContentPackageExtractor;
import org.apache.sling.distribution.journal.impl.subscriber.PackageHandler;
import org.apache.sling.distribution.journal.impl.subscriber.SubscriberConfiguration;
import org.apache.sling.distribution.journal.impl.subscriber.SubscriberIdle;
import org.apache.sling.distribution.journal.impl.subscriber.SubscriberReadyStore;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
@Component(service={}, immediate=true, property={"announceDelay=10000"}, configurationPid={"org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory"})
@Designate(ocd=SubscriberConfiguration.class, factory=true)
public class DistributionSubscriber
implements DistributionAgent {
    private static final int PRECONDITION_TIMEOUT = 60;
    static int RETRY_DELAY = 5000;
    static int QUEUE_FETCH_DELAY = 1000;
    private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
    private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = Collections.emptySet();
    private static final DistributionQueueItem STOPPED_ITEM = new DistributionQueueItem("stop-item", Collections.emptyMap());
    @Reference(name="packageBuilder")
    private DistributionPackageBuilder packageBuilder;
    @Reference
    private SlingSettingsService slingSettings;
    @Reference
    private ResourceResolverFactory resolverFactory;
    @Reference
    private MessagingProvider messagingProvider;
    @Reference
    private Topics topics;
    @Reference
    private EventAdmin eventAdmin;
    @Reference
    private JournalAvailable journalAvailable;
    @Reference(name="precondition")
    private Precondition precondition;
    @Reference
    private DistributionMetricsService distributionMetricsService;
    @Reference
    private Packaging packaging;
    @Reference
    private SubscriberReadyStore subscriberReadyStore;
    private Optional<SubscriberIdle> subscriberIdle;
    private ServiceRegistration<DistributionAgent> componentReg;
    private Closeable packagePoller;
    private CommandPoller commandPoller;
    private BookKeeper bookKeeper;
    private final BlockingQueue<DistributionQueueItem> queueItemsBuffer = new LinkedBlockingQueue<DistributionQueueItem>(8);
    private Set<String> queueNames = Collections.emptySet();
    private Announcer announcer;
    private String subAgentName;
    private String pkgType;
    private volatile boolean running = true;

    @Activate
    public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) {
        String subSlingId = Objects.requireNonNull(this.slingSettings.getSlingId());
        this.subAgentName = Objects.requireNonNull(config.name());
        Objects.requireNonNull(config);
        Objects.requireNonNull(context);
        Objects.requireNonNull(this.packageBuilder);
        Objects.requireNonNull(this.slingSettings);
        Objects.requireNonNull(this.resolverFactory);
        Objects.requireNonNull(this.messagingProvider);
        Objects.requireNonNull(this.topics);
        Objects.requireNonNull(this.eventAdmin);
        Objects.requireNonNull(this.precondition);
        if (config.subscriberIdleCheck()) {
            Integer idleMillies = (Integer)properties.getOrDefault("idleMillies", 10000);
            AtomicBoolean readyHolder = this.subscriberReadyStore.getReadyHolder(this.subAgentName);
            this.subscriberIdle = Optional.of(new SubscriberIdle(context, idleMillies, readyHolder));
        } else {
            this.subscriberIdle = Optional.empty();
        }
        this.queueNames = this.getNotEmpty(config.agentNames());
        int maxRetries = config.maxRetries();
        boolean editable = config.editable();
        ContentPackageExtractor extractor = new ContentPackageExtractor(this.packaging, config.packageHandling());
        PackageHandler packageHandler = new PackageHandler(this.packageBuilder, extractor);
        this.bookKeeper = new BookKeeper(this.resolverFactory, this.distributionMetricsService, packageHandler, this.eventAdmin, this.sender(this.topics.getStatusTopic()), this.subAgentName, subSlingId, editable, maxRetries);
        long startOffset = this.bookKeeper.loadOffset() + 1L;
        String assign = this.messagingProvider.assignTo(startOffset);
        this.packagePoller = this.messagingProvider.createPoller(this.topics.getPackageTopic(), Reset.earliest, assign, new HandlerAdapter[]{HandlerAdapter.create(Messages.PackageMessage.class, this::handlePackageMessage)});
        this.commandPoller = new CommandPoller(this.messagingProvider, this.topics, subSlingId, this.subAgentName, editable);
        RunnableUtil.startBackgroundThread(this::processQueue, (String)String.format("Queue Processor for Subscriber agent %s", this.subAgentName));
        int announceDelay = PropertiesUtil.toInteger((Object)properties.get("announceDelay"), (int)10000);
        this.announcer = new Announcer(subSlingId, this.subAgentName, this.queueNames, this.sender(this.topics.getDiscoveryTopic()), this.bookKeeper, maxRetries, config.editable(), announceDelay);
        this.pkgType = Objects.requireNonNull(this.packageBuilder.getType());
        boolean errorQueueEnabled = maxRetries >= 0;
        String msg = String.format("Started Subscriber agent %s at offset %s, subscribed to agent names %s with package builder %s editable %s maxRetries %s errorQueueEnabled %s", this.subAgentName, startOffset, this.queueNames, this.pkgType, config.editable(), maxRetries, errorQueueEnabled);
        LOG.info(msg);
        Dictionary<String, Object> props = this.createServiceProps(config);
        this.componentReg = context.registerService(DistributionAgent.class, (Object)this, props);
    }

    private <T extends GeneratedMessage> Consumer<T> sender(String topic) {
        MessageSender sender = this.messagingProvider.createSender();
        return msg -> sender.send(topic, msg);
    }

    private Set<String> getNotEmpty(String[] agentNames) {
        return Arrays.stream(agentNames).filter(StringUtils::isNotBlank).collect(Collectors.toSet());
    }

    private Dictionary<String, Object> createServiceProps(SubscriberConfiguration config) {
        Hashtable<String, Object> props = new Hashtable<String, Object>();
        ((Dictionary)props).put("name", config.name());
        ((Dictionary)props).put("title", config.name());
        ((Dictionary)props).put("details", config.name());
        ((Dictionary)props).put("agentNames", config.agentNames());
        ((Dictionary)props).put("editable", config.editable());
        ((Dictionary)props).put("maxRetries", config.maxRetries());
        ((Dictionary)props).put("packageBuilder.target", config.packageBuilder_target());
        ((Dictionary)props).put("precondition.target", config.precondition_target());
        ((Dictionary)props).put("webconsole.configurationFactory.nameHint", config.webconsole_configurationFactory_nameHint());
        return props;
    }

    @Deactivate
    public void deactivate() {
        this.componentReg.unregister();
        IOUtils.closeQuietly((Closeable[])new Closeable[]{this.announcer, this.bookKeeper, this.packagePoller, this.commandPoller});
        this.subscriberIdle.ifPresent(IOUtils::closeQuietly);
        this.running = false;
        String msg = String.format("Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s", this.subAgentName, this.queueNames, this.pkgType);
        LOG.info(msg);
    }

    @Nonnull
    public Iterable<String> getQueueNames() {
        return this.queueNames;
    }

    public DistributionQueue getQueue(@Nonnull String queueName) {
        DistributionQueueItem head = this.queueItemsBuffer.stream().filter(item -> this.isIn(queueName, (DistributionQueueItem)item)).findFirst().orElse(null);
        return new SubQueue(queueName, head, this.bookKeeper.getPackageRetries());
    }

    private boolean isIn(String queueName, DistributionQueueItem queueItem) {
        Messages.PackageMessage packageMsg = (Messages.PackageMessage)queueItem.get("packageMessage", Messages.PackageMessage.class);
        return queueName.equals(packageMsg.getPubAgentName());
    }

    @Nonnull
    public DistributionLog getLog() {
        return this::emptyDistributionLog;
    }

    private List<String> emptyDistributionLog() {
        return Collections.emptyList();
    }

    @Nonnull
    public DistributionAgentState getState() {
        return AgentState.getState(this);
    }

    @Nonnull
    public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest request) {
        return this.executeUnsupported(request);
    }

    @Nonnull
    private DistributionResponse executeUnsupported(DistributionRequest request) {
        String msg = String.format("Request type %s is not supported by this agent, expected one of %s", request.getRequestType(), SUPPORTED_REQ_TYPES);
        LOG.info(msg);
        return new SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
    }

    private void handlePackageMessage(MessageInfo info, Messages.PackageMessage message) {
        if (this.shouldEnqueue(info, message)) {
            DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true);
            this.enqueue(queueItem);
        } else {
            try {
                this.bookKeeper.skipPackage(info.getOffset());
            }
            catch (LoginException | PersistenceException e) {
                LOG.info("Error marking message at offset {} as skipped", (Object)info.getOffset(), (Object)e);
            }
        }
    }

    private boolean shouldEnqueue(MessageInfo info, Messages.PackageMessage message) {
        if (!this.queueNames.contains(message.getPubAgentName())) {
            LOG.info("Skipping package for Publisher agent {} at offset {} (not subscribed)", (Object)message.getPubAgentName(), (Object)info.getOffset());
            return false;
        }
        if (!this.pkgType.equals(message.getPkgType())) {
            LOG.warn("Skipping package with type {} at offset {}", (Object)message.getPkgType(), (Object)info.getOffset());
            return false;
        }
        return true;
    }

    private void enqueue(DistributionQueueItem queueItem) {
        try {
            while (this.running) {
                if (!this.queueItemsBuffer.offer(queueItem, 1000L, TimeUnit.MILLISECONDS)) continue;
                this.distributionMetricsService.getItemsBufferSize().increment();
                return;
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        throw new RuntimeException();
    }

    private void processQueue() {
        LOG.info("Started Queue processor");
        while (this.running) {
            this.fetchAndProcessQueueItem();
        }
        LOG.info("Stopped Queue processor");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void fetchAndProcessQueueItem() {
        try {
            if (!this.blockingSendStoredStatus()) {
                return;
            }
            DistributionQueueItem item = this.blockingPeekQueueItem();
            if (STOPPED_ITEM == item) {
                return;
            }
            try (Timer.Context context = this.distributionMetricsService.getProcessQueueItemDuration().time();){
                this.processQueueItem(item);
            }
            finally {
                this.subscriberIdle.ifPresent(SubscriberIdle::idle);
            }
        }
        catch (TimeoutException e) {
            LOG.info(e.getMessage());
            DistributionSubscriber.delay(RETRY_DELAY);
        }
        catch (IllegalStateException e) {
            throw e;
        }
        catch (Exception e) {
            LOG.error("Error processing queue item", (Throwable)e);
            DistributionSubscriber.delay(RETRY_DELAY);
        }
    }

    private boolean blockingSendStoredStatus() {
        try (Timer.Context context = this.distributionMetricsService.getSendStoredStatusDuration().time();){
            int retry = 0;
            while (this.running && !this.bookKeeper.sendStoredStatus(retry)) {
                ++retry;
            }
        }
        catch (IOException e) {
            LOG.warn("Error in timer close", (Throwable)e);
        }
        return this.running;
    }

    private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
        while (this.running) {
            DistributionQueueItem queueItem = (DistributionQueueItem)this.queueItemsBuffer.peek();
            if (queueItem != null) {
                return queueItem;
            }
            Thread.sleep(QUEUE_FETCH_DELAY);
        }
        return STOPPED_ITEM;
    }

    private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, TimeoutException {
        long offset = (Long)queueItem.get("recordOffset", Long.class);
        Messages.PackageMessage pkgMsg = (Messages.PackageMessage)queueItem.get("packageMessage", Messages.PackageMessage.class);
        boolean skip = this.shouldSkip(offset);
        this.subscriberIdle.ifPresent(SubscriberIdle::busy);
        if (skip) {
            this.bookKeeper.removePackage(pkgMsg, offset);
        } else {
            long createdTime = (Long)queueItem.get("recordTimestamp", Long.class);
            this.bookKeeper.importPackage(pkgMsg, offset, createdTime);
        }
        this.queueItemsBuffer.remove();
        this.distributionMetricsService.getItemsBufferSize().decrement();
    }

    private boolean shouldSkip(long offset) throws TimeoutException {
        return this.commandPoller.isCleared(offset) || !this.precondition.canProcess(this.subAgentName, offset, 60);
    }

    private static void delay(long delayInMs) {
        try {
            Thread.sleep(delayInMs);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

