/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.publisher;

import com.google.protobuf.GeneratedMessage;
import java.util.Arrays;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.management.NotCompliantMBeanException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.publisher.AgentId;
import org.apache.sling.distribution.journal.impl.publisher.DiscoveryService;
import org.apache.sling.distribution.journal.impl.publisher.DistPublisherJMX;
import org.apache.sling.distribution.journal.impl.publisher.PackageMessageFactory;
import org.apache.sling.distribution.journal.impl.publisher.PackageQueuedNotifier;
import org.apache.sling.distribution.journal.impl.publisher.PublisherConfiguration;
import org.apache.sling.distribution.journal.impl.publisher.State;
import org.apache.sling.distribution.journal.impl.publisher.TopologyView;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.impl.shared.AgentState;
import org.apache.sling.distribution.journal.impl.shared.DefaultDistributionLog;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;

/**
 * Publisher-side {@link DistributionAgent} that turns distribution requests into
 * {@link Messages.PackageMessage}s and sends them to the package topic of the journal.
 *
 * <p>The component registers itself as a {@link DistributionAgent} service in
 * {@link #activate(PublisherConfiguration, BundleContext)} and unregisters in
 * {@link #deactivate()}. The queues it exposes are derived from the subscriber agents
 * reported by the {@link DiscoveryService} for this publisher agent name.
 */
@ParametersAreNonnullByDefault
@Component(service={}, immediate=true, configurationPid={"org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory"})
@Designate(ocd=PublisherConfiguration.class, factory=true)
public class DistributionPublisher
implements DistributionAgent {
    public static final String FACTORY_PID = "org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory";

    /** Suffix appended to a subscriber agent id to name that subscriber's error queue. */
    private static final String ERROR_QUEUE_SUFFIX = "-error";

    /** Supported request types mapped to the strategy used to send the resulting package message. */
    private final Map<DistributionRequestType, Consumer<Messages.PackageMessage>> requestHandlers = new HashMap<>();

    /**
     * Agent log. Created in {@link #activate(PublisherConfiguration, BundleContext)} because the
     * agent name it is labelled with is only known once the configuration is available.
     */
    private DefaultDistributionLog log;

    @Reference
    private MessagingProvider messagingProvider;
    @Reference(name="packageBuilder")
    private DistributionPackageBuilder packageBuilder;
    @Reference
    private PackageQueuedNotifier queuedNotifier;
    @Reference
    private PubQueueProvider pubQueueProvider;
    @Reference
    private DiscoveryService discoveryService;
    @Reference
    private PackageMessageFactory factory;
    @Reference
    private EventAdmin eventAdmin;
    @Reference
    private Topics topics;
    // Not read anywhere in this class; presumably the mandatory reference only gates
    // activation on journal availability -- TODO confirm against JournalAvailable.
    @Reference
    JournalAvailable journalAvailable;
    @Reference
    private DistributionMetricsService distributionMetricsService;

    private String pubAgentName;
    private String pkgType;
    private long queuedTimeout;
    private ServiceRegistration<DistributionAgent> componentReg;
    private MessageSender<Messages.PackageMessage> sender;
    private JMXRegistration reg;
    private DistributionMetricsService.GaugeService<Integer> subscriberCountGauge;

    /**
     * Sets up the request type dispatch: ADD and DELETE wait for the package to be
     * reported as queued, TEST is sent without waiting.
     */
    public DistributionPublisher() {
        requestHandlers.put(DistributionRequestType.ADD, this::sendAndWait);
        requestHandlers.put(DistributionRequestType.DELETE, this::sendAndWait);
        requestHandlers.put(DistributionRequestType.TEST, this::send);
    }

    /**
     * Reads the configuration, creates the message sender, registers this agent as a
     * {@link DistributionAgent} service, and sets up JMX and the subscriber count gauge.
     *
     * @param config  publisher configuration; its name must not be {@code null}
     * @param context bundle context used to register the agent service
     * @throws RuntimeException if the JMX bean is not a compliant MBean
     */
    @Activate
    public void activate(PublisherConfiguration config, BundleContext context) {
        Objects.requireNonNull(factory);
        Objects.requireNonNull(distributionMetricsService);
        pubAgentName = Objects.requireNonNull(config.name());
        // The log must be created after the name is known, otherwise it is labelled with null.
        log = new DefaultDistributionLog(pubAgentName, getClass(), DefaultDistributionLog.LogLevel.INFO);
        queuedTimeout = config.queuedTimeout();
        pkgType = packageBuilder.getType();
        sender = messagingProvider.createSender();

        // Build the JMX bean before registering the service so that a non-compliant bean
        // aborts activation without leaving a registered service behind.
        DistPublisherJMX bean;
        try {
            bean = new DistPublisherJMX(pubAgentName, discoveryService, this);
        }
        catch (NotCompliantMBeanException e) {
            throw new RuntimeException(e);
        }

        Dictionary<String, Object> props = createServiceProps(config);
        componentReg = Objects.requireNonNull(context.registerService(DistributionAgent.class, this, props));
        reg = new JMXRegistration(bean, "agent", pubAgentName);

        // NOTE(review): the gauge uses the no-arg getSubscribedAgentIds() while getQueueNames()
        // filters by pubAgentName -- confirm the gauge is meant to count all subscribers.
        subscriberCountGauge = distributionMetricsService.createGauge(
                "distribution.journal.publisher.subscriber_count;pub_name=" + pubAgentName,
                "Current number of publish subscribers",
                () -> discoveryService.getTopologyView().getSubscribedAgentIds().size());

        log.info(String.format("Started Publisher agent %s with packageBuilder %s, queuedTimeout %s", pubAgentName, pkgType, queuedTimeout));
    }

    /**
     * Releases everything acquired in {@link #activate(PublisherConfiguration, BundleContext)}:
     * the JMX registration, the agent service registration and the subscriber count gauge.
     * Each resource is released even if releasing an earlier one fails.
     */
    @Deactivate
    public void deactivate() {
        try {
            reg.close();
        }
        finally {
            try {
                componentReg.unregister();
            }
            finally {
                IOUtils.closeQuietly(subscriberCountGauge);
            }
        }
        log.info(String.format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s", pubAgentName, pkgType, queuedTimeout));
    }

    /**
     * Builds the service properties the agent is registered with.
     *
     * @param config publisher configuration supplying the property values
     * @return the properties; name, title and details all carry the configured agent name
     */
    private Dictionary<String, Object> createServiceProps(PublisherConfiguration config) {
        Hashtable<String, Object> props = new Hashtable<>();
        props.put("name", config.name());
        props.put("title", config.name());
        props.put("details", config.name());
        props.put("packageBuilder.target", config.packageBuilder_target());
        props.put("webconsole.configurationFactory.nameHint", config.webconsole_configurationFactory_nameHint());
        return props;
    }

    /**
     * Lists the queues of this agent: one per subscriber agent subscribed to this publisher,
     * plus a {@code <subAgentId>-error} queue for every subscriber whose state reports a
     * non-negative maximum number of retries.
     *
     * @return an unmodifiable view of the queue names
     */
    @Nonnull
    public Iterable<String> getQueueNames() {
        HashSet<String> queueNames = new HashSet<>();
        TopologyView view = discoveryService.getTopologyView();
        for (String subAgentId : view.getSubscribedAgentIds(pubAgentName)) {
            queueNames.add(subAgentId);
            State subState = view.getState(subAgentId, pubAgentName);
            boolean errorQueueEnabled = subState != null && subState.getMaxRetries() >= 0;
            if (errorQueueEnabled) {
                queueNames.add(subAgentId + ERROR_QUEUE_SUFFIX);
            }
        }
        return Collections.unmodifiableCollection(queueNames);
    }

    /**
     * Looks up a queue by name.
     *
     * @param queueName one of the names returned by {@link #getQueueNames()}
     * @return the queue, or {@code null} if the name is not currently among the queue names
     *         or no subscriber state is available for it
     */
    public DistributionQueue getQueue(String queueName) {
        // The set of names is small; a sequential scan avoids the overhead of a parallel stream.
        boolean unknownQueue = StreamSupport.stream(getQueueNames().spliterator(), false).noneMatch(queueName::equals);
        if (unknownQueue) {
            return null;
        }
        return queueName.endsWith(ERROR_QUEUE_SUFFIX) ? getErrorQueue(queueName) : getPubQueue(queueName);
    }

    /**
     * Resolves the error queue of the subscriber whose agent id is the queue name without
     * its trailing {@code -error} suffix.
     */
    @Nonnull
    private DistributionQueue getErrorQueue(String queueName) {
        AgentId subAgentId = new AgentId(StringUtils.substringBeforeLast(queueName, ERROR_QUEUE_SUFFIX));
        return pubQueueProvider.getErrorQueue(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName);
    }

    /**
     * Resolves the regular queue of the subscriber identified by the queue name.
     *
     * @return the queue starting one past the subscriber's reported offset, or {@code null}
     *         if the topology view holds no state for that subscriber and this publisher
     */
    @CheckForNull
    private DistributionQueue getPubQueue(String queueName) {
        AgentId subAgentId = new AgentId(queueName);
        State state = discoveryService.getTopologyView().getState(subAgentId.getAgentId(), pubAgentName);
        if (state == null) {
            return null;
        }
        return pubQueueProvider.getQueue(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName, state.getOffset() + 1L, state.getRetries(), state.isEditable());
    }

    /**
     * @return the agent log; only available once the component has been activated
     */
    @Nonnull
    public DistributionLog getLog() {
        return log;
    }

    /**
     * @return the agent state as computed by {@link AgentState#getState(DistributionAgent)}
     */
    @Nonnull
    public DistributionAgentState getState() {
        return AgentState.getState(this);
    }

    /**
     * Executes a distribution request. Requests of type ADD, DELETE and TEST are packaged and
     * sent to the journal; any other type is answered with a DROPPED response.
     *
     * @param resourceResolver resolver handed to the package message factory
     * @param request          the distribution request
     * @return ACCEPTED once the package message has been sent, DROPPED for unsupported types
     * @throws DistributionException if building or sending the package fails
     */
    @Nonnull
    public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest request) throws DistributionException {
        Consumer<Messages.PackageMessage> handler = requestHandlers.get(request.getRequestType());
        if (handler == null) {
            return executeUnsupported(request);
        }
        return execute(resourceResolver, request, handler);
    }

    /**
     * Builds the package message for the request, hands it to the given handler and updates
     * the metrics for accepted or dropped requests.
     *
     * @param handler send strategy selected for the request type
     * @throws DistributionException wrapping any non-{@link Error} failure; {@link Error}s are rethrown as is
     */
    private DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest request, Consumer<Messages.PackageMessage> handler) throws DistributionException {
        try {
            Messages.PackageMessage pkg = DistributionMetricsService.timed(distributionMetricsService.getBuildPackageDuration(), () -> factory.create(packageBuilder, resourceResolver, pubAgentName, request));
            DistributionMetricsService.timed(distributionMetricsService.getEnqueuePackageDuration(), () -> handler.accept(pkg));
            distributionMetricsService.getExportedPackageSize().update(pkg.getPkgLength());
            distributionMetricsService.getAcceptedRequests().mark();
            String msg = String.format("Distribution request accepted with type %s paths %s ", request.getRequestType(), Arrays.toString(request.getPaths()));
            log.info(msg);
            return new SimpleDistributionResponse(DistributionRequestState.ACCEPTED, msg);
        }
        catch (Throwable e) {
            distributionMetricsService.getDroppedRequests().mark();
            String msg = String.format("Failed to queue distribution request %s", e.getMessage());
            log.error(msg, e);
            if (e instanceof Error) {
                throw (Error)e;
            }
            throw new DistributionException(msg, e);
        }
    }

    /**
     * Sends the package and blocks until the queued notifier completes the wait registered
     * for the package id, or until {@code queuedTimeout} milliseconds have elapsed. A
     * "package created" event is posted before the message is sent.
     *
     * @throws RuntimeException wrapping any failure, including timeout and interruption;
     *         the wait registration is removed before the exception is thrown
     */
    private void sendAndWait(Messages.PackageMessage pkg) {
        try {
            CompletableFuture<Void> received = queuedNotifier.registerWait(pkg.getPkgId());
            Event createdEvent = DistributionEvent.eventPackageCreated(pkg, pubAgentName);
            eventAdmin.postEvent(createdEvent);
            send(pkg);
            received.get(queuedTimeout, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt status so callers further up can observe the interruption.
                Thread.currentThread().interrupt();
            }
            queuedNotifier.unRegisterWait(pkg.getPkgId());
            throw new RuntimeException(e);
        }
    }

    /** Sends the package message to the package topic without waiting for it to be queued. */
    private void send(Messages.PackageMessage pipePackage) {
        String topicName = topics.getPackageTopic();
        sender.send(topicName, pipePackage);
    }

    /**
     * Logs and reports that the request type is not handled by this agent.
     *
     * @return a DROPPED response naming the supported request types
     */
    @Nonnull
    private DistributionResponse executeUnsupported(DistributionRequest request) {
        String msg = String.format("Request type %s is not supported by this agent, expected one of %s", request.getRequestType(), requestHandlers.keySet());
        log.info(msg);
        return new SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
    }
}

