/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.subscriber;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
import org.apache.sling.distribution.journal.impl.subscriber.PackageHandler;
import org.apache.sling.distribution.journal.messages.Messages;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
 * Book keeping for a distribution subscriber agent.
 *
 * <p>For every package message handed to it, this class either imports, removes or skips the
 * package and records the offset of the message in a {@link LocalStore}. When the agent is
 * {@code editable} (and always for packages dropped after too many failed imports) it also
 * records a {@link PackageStatus}, which {@link #sendStoredStatus()} later delivers through the
 * {@code sender} callback.
 *
 * <p>Failed imports are counted per publisher agent name in a {@link PackageRetries} instance.
 * The current sum of retries is exposed through a metrics gauge that is released by
 * {@link #close()}.
 *
 * <p>NOTE(review): only {@link #shouldCommitSkipped()} is synchronized; whether the remaining
 * methods may be called concurrently cannot be determined from this class alone.
 */
public class BookKeeper implements Closeable {
    /** Property name under which an offset is stored, both for processed offsets and in a status map. */
    private static final String KEY_OFFSET = "offset";
    /** Sub service name used for the resolver that imports packages. */
    private static final String SUBSERVICE_IMPORTER = "importer";
    /** Sub service name used for the resolver that only writes book keeping data. */
    private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
    /** Pause in milliseconds between two attempts to send a stored status. */
    private static final int RETRY_SEND_DELAY = 1000;
    /** Number of skipped packages after which the skipped offset is persisted. */
    private static final int COMMIT_AFTER_NUM_SKIPPED = 10;

    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final ResourceResolverFactory resolverFactory;
    private final DistributionMetricsService distributionMetricsService;
    private final PackageHandler packageHandler;
    private final EventAdmin eventAdmin;
    private final Consumer<Messages.PackageStatusMessage> sender;
    private final boolean editable;
    private final int maxRetries;
    private final boolean errorQueueEnabled;
    private final PackageRetries packageRetries = new PackageRetries();
    private final LocalStore statusStore;
    private final LocalStore processedOffsets;
    private final String subAgentName;
    private final String subSlingId;
    private final DistributionMetricsService.GaugeService<Integer> retriesGauge;
    /** Skipped packages counted by {@link #shouldCommitSkipped()} since the last reset. */
    private int skippedCounter = 0;

    /**
     * Creates the book keeper and its two local stores ("statuses" and "packages").
     *
     * @param resolverFactory factory used to obtain service resource resolvers
     * @param distributionMetricsService source of the timers, meters and the retries gauge
     * @param packageHandler applies a package message to the repository during import
     * @param eventAdmin receives the "imported" event after a successful import
     * @param sender callback that delivers a status message
     * @param subAgentName name of this subscriber agent
     * @param subSlingId sling id of this subscriber instance
     * @param editable when {@code true}, a status is stored for imported and removed packages
     * @param maxRetries number of failed imports after which a package is removed instead of
     *        retried; a negative value disables that limit (retries are then unbounded)
     */
    public BookKeeper(ResourceResolverFactory resolverFactory, DistributionMetricsService distributionMetricsService, PackageHandler packageHandler, EventAdmin eventAdmin, Consumer<Messages.PackageStatusMessage> sender, String subAgentName, String subSlingId, boolean editable, int maxRetries) {
        this.resolverFactory = resolverFactory;
        this.distributionMetricsService = distributionMetricsService;
        this.packageHandler = packageHandler;
        this.eventAdmin = eventAdmin;
        this.sender = sender;
        this.subAgentName = subAgentName;
        this.subSlingId = subSlingId;
        this.editable = editable;
        this.maxRetries = maxRetries;
        this.errorQueueEnabled = maxRetries >= 0;
        this.statusStore = new LocalStore(resolverFactory, "statuses", subAgentName);
        this.processedOffsets = new LocalStore(resolverFactory, "packages", subAgentName);
        // Created last so that an exception thrown earlier in the constructor cannot leave an
        // unclosed gauge behind (close() is never reachable for a half-constructed instance).
        String nameRetries = "distribution.journal.subscriber.current_retries;sub_name=" + subAgentName;
        this.retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", this.packageRetries::getSum);
    }

    /**
     * Imports a package and, in the same commit, stores the offset (and the IMPORTED status if
     * the agent is editable). On success the import metrics are updated, the retry counter of
     * the publisher agent is cleared and an "imported" event is posted.
     *
     * <p>Any {@link IOException}, {@link RuntimeException} or {@link LoginException} is routed to
     * the failure handling, which either rethrows it wrapped in a {@link DistributionException}
     * (retry) or removes the package once the retry limit is reached. The logging MDC is
     * cleared when the method returns.
     *
     * <p>NOTE(review): the decompiler flagged this method with "Removed try catching itself -
     * possible behaviour change", so the exception handling may differ from the original
     * source; confirm against it.
     *
     * @param pkgMsg the package message to import
     * @param offset offset of the message
     * @param createdTime creation time in epoch milliseconds, used for the distribution duration metric
     * @throws DistributionException if the import failed and should be retried, or if removing
     *         the failed package failed
     */
    public void importPackage(Messages.PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException {
        log.info("Importing distribution package {} of type {} at offset {}", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
        addPackageMDC(pkgMsg);
        try (Timer.Context importTimer = distributionMetricsService.getImportedPackageDuration().time();
             ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
            packageHandler.apply(importerResolver, pkgMsg);
            if (editable) {
                storeStatus(importerResolver, new PackageStatus(Messages.PackageStatusMessage.Status.IMPORTED, offset, pkgMsg.getPubAgentName()));
            }
            storeOffset(importerResolver, offset);
            // Content, status and offset are persisted together by this single commit.
            importerResolver.commit();
            distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
            distributionMetricsService.getPackageDistributedDuration().update(System.currentTimeMillis() - createdTime, TimeUnit.MILLISECONDS);
            packageRetries.clear(pkgMsg.getPubAgentName());
            Event event = DistributionEvent.eventImporterImported(pkgMsg, subAgentName);
            eventAdmin.postEvent(event);
        } catch (IOException | RuntimeException | LoginException e) {
            failure(pkgMsg, offset, e);
        } finally {
            MDC.clear();
        }
    }

    /**
     * Puts the identifying attributes of the package and of this subscriber into the logging MDC.
     *
     * @param pkgMsg the package message that is about to be imported
     */
    private void addPackageMDC(Messages.PackageMessage pkgMsg) {
        String pubAgentName = pkgMsg.getPubAgentName();
        MDC.put("module", "distribution");
        MDC.put("package-id", pkgMsg.getPkgId());
        MDC.put("paths", String.join(",", pkgMsg.getPathsList()));
        MDC.put("pub-sling-id", pkgMsg.getPubSlingId());
        MDC.put("pub-agent-name", pubAgentName);
        MDC.put("distribution-message-type", pkgMsg.getReqType().name());
        MDC.put("retries", Integer.toString(packageRetries.get(pubAgentName)));
        MDC.put("sub-sling-id", subSlingId);
        MDC.put("sub-agent-name", subAgentName);
    }

    /**
     * Handles a failed import. Marks the failure meter, then either increases the retry counter
     * and rethrows (no retry limit, or limit not yet reached) or removes the package as failed.
     *
     * @param pkgMsg the package message whose import failed
     * @param offset offset of the message
     * @param e the cause of the failure
     * @throws DistributionException to signal that the package should be retried, or if
     *         removing the failed package failed
     */
    private void failure(Messages.PackageMessage pkgMsg, long offset, Exception e) throws DistributionException {
        distributionMetricsService.getFailedPackageImports().mark();
        String pubAgentName = pkgMsg.getPubAgentName();
        int retries = packageRetries.get(pubAgentName);
        boolean giveUp = errorQueueEnabled && retries >= maxRetries;
        if (giveUp) {
            log.warn("Failed to import distribution package {} at offset {} after {} retries, removing the package.", pkgMsg.getPkgId(), offset, retries);
            removeFailedPackage(pkgMsg, offset);
            return;
        }
        packageRetries.increase(pubAgentName);
        String limit = errorQueueEnabled ? Integer.toString(maxRetries) : "infinite";
        String msg = String.format("Error processing distribution package %s. Retry attempts %s/%s.", pkgMsg.getPkgId(), retries, limit);
        throw new DistributionException(msg, e);
    }

    /**
     * Records a package as removed without importing it: stores the offset (and the REMOVED
     * status if the agent is editable), commits, and clears the retry counter of the publisher
     * agent. The removal timer is stopped whether or not the operation succeeds.
     *
     * @param pkgMsg the package message to remove
     * @param offset offset of the message
     * @throws LoginException if the book keeper service resolver cannot be obtained
     * @throws PersistenceException if storing or committing fails
     */
    public void removePackage(Messages.PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException {
        log.info("Removing distribution package {} of type {} at offset {}", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
        Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
        try {
            try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
                if (editable) {
                    storeStatus(resolver, new PackageStatus(Messages.PackageStatusMessage.Status.REMOVED, offset, pkgMsg.getPubAgentName()));
                }
                storeOffset(resolver, offset);
                resolver.commit();
            }
            packageRetries.clear(pkgMsg.getPubAgentName());
        } finally {
            // stop() instead of try-with-resources: it declares no checked exception, so the
            // method signature stays as it is while the timer is still released on failure.
            context.stop();
        }
    }

    /**
     * Skips the package at the given offset. The offset is only persisted when
     * {@link #shouldCommitSkipped()} says so, which avoids a commit per skipped package.
     *
     * @param offset offset of the skipped message
     * @throws LoginException if the book keeper service resolver cannot be obtained
     * @throws PersistenceException if storing or committing the offset fails
     */
    public void skipPackage(long offset) throws LoginException, PersistenceException {
        log.info("Skipping package at offset {}", offset);
        if (!shouldCommitSkipped()) {
            return;
        }
        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
            storeOffset(resolver, offset);
            resolver.commit();
        }
    }

    /**
     * Counts one skipped package and tells whether the skipped offset should be persisted now.
     *
     * @return {@code true} once the counter exceeds {@code COMMIT_AFTER_NUM_SKIPPED}; the
     *         counter is then reset to 1, so the call that triggered the commit counts as the
     *         first of the next round
     */
    public synchronized boolean shouldCommitSkipped() {
        skippedCounter++;
        if (skippedCounter <= COMMIT_AFTER_NUM_SKIPPED) {
            return false;
        }
        skippedCounter = 1;
        return true;
    }

    /**
     * Loads the stored status and, unless it is already marked as sent, keeps trying to send it
     * until one attempt succeeds. There is no upper bound on the number of attempts.
     *
     * @throws InterruptedException if the thread is interrupted while waiting between attempts
     */
    public void sendStoredStatus() throws InterruptedException {
        try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
            PackageStatus status = new PackageStatus(statusStore.load());
            boolean sent = status.sent;
            for (int retry = 0; !sent; retry++) {
                sent = sendStoredStatusOnce(status, retry);
            }
        } catch (IOException e) {
            log.warn("Error in timer close", e);
        }
    }

    /**
     * Performs a single attempt to send the status and mark it as sent.
     *
     * @param status the status to send
     * @param retry zero based number of this attempt, only used for logging
     * @return {@code true} if the status was sent; {@code false} if sending failed, in which
     *         case the method has already slept for {@code RETRY_SEND_DELAY} milliseconds
     * @throws InterruptedException if the thread is interrupted during that sleep
     */
    private boolean sendStoredStatusOnce(PackageStatus status, int retry) throws InterruptedException {
        try {
            sendStatusMessage(status);
            markStatusSent();
            return true;
        } catch (Exception e) {
            // The exception is the trailing argument, so SLF4J logs it with its stack trace.
            log.warn("Cannot send status (retry {})", retry, e);
            Thread.sleep(RETRY_SEND_DELAY);
            return false;
        }
    }

    /**
     * Builds a status message from the given status and passes it to the sender callback.
     *
     * @param status the status to send; its offset and status are dereferenced here, so both
     *        must be non-null
     */
    private void sendStatusMessage(PackageStatus status) {
        Messages.PackageStatusMessage pkgStatMsg = Messages.PackageStatusMessage.newBuilder()
                .setSubSlingId(subSlingId)
                .setSubAgentName(subAgentName)
                .setPubAgentName(status.pubAgentName)
                .setOffset(status.offset.longValue())
                .setStatus(status.status)
                .build();
        sender.accept(pkgStatMsg);
        log.info("Sent status message {}", pkgStatMsg);
    }

    /**
     * Marks the stored status as sent. This is best effort: a failure is logged as a warning
     * and not propagated to the caller.
     */
    public void markStatusSent() {
        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
            statusStore.store(resolver, "sent", true);
            resolver.commit();
        } catch (Exception e) {
            log.warn("Failed to mark status as sent", e);
        }
    }

    /**
     * Loads the offset stored for processed packages.
     *
     * @return the value loaded from the store; {@code -1} is passed as the default
     */
    public long loadOffset() {
        return processedOffsets.load(KEY_OFFSET, -1L);
    }

    /**
     * @param pubAgentName name of the publisher agent
     * @return the retry count currently tracked for that publisher agent
     */
    public int getRetries(String pubAgentName) {
        return packageRetries.get(pubAgentName);
    }

    /**
     * @return the live retry tracker used by this book keeper (not a copy)
     */
    public PackageRetries getPackageRetries() {
        return packageRetries;
    }

    /**
     * Closes the retries gauge created in the constructor, ignoring any error while closing.
     */
    @Override
    public void close() throws IOException {
        IOUtils.closeQuietly(retriesGauge);
    }

    /**
     * Records a package as removed after its import failed too often: stores the REMOVED_FAILED
     * status (regardless of the editable flag) and the offset, then commits. The timer is
     * stopped whether or not the operation succeeds.
     *
     * @param pkgMsg the package message that could not be imported
     * @param offset offset of the message
     * @throws DistributionException wrapping any exception raised while storing or committing
     */
    private void removeFailedPackage(Messages.PackageMessage pkgMsg, long offset) throws DistributionException {
        log.info("Removing failed distribution package {} of type {} at offset {}", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
        Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
            storeStatus(resolver, new PackageStatus(Messages.PackageStatusMessage.Status.REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
            storeOffset(resolver, offset);
            resolver.commit();
        } catch (Exception e) {
            throw new DistributionException("Error removing failed package", e);
        } finally {
            context.stop();
        }
    }

    /**
     * Writes the status into the status store using the given resolver. Nothing is committed
     * here; the caller commits.
     */
    private void storeStatus(ResourceResolver resolver, PackageStatus packageStatus) throws PersistenceException {
        Map<String, Object> statusMap = packageStatus.asMap();
        statusStore.store(resolver, statusMap);
        log.info("Stored status {}", statusMap);
    }

    /**
     * Writes the offset into the processed offsets store using the given resolver. Nothing is
     * committed here; the caller commits.
     */
    private void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException {
        processedOffsets.store(resolver, KEY_OFFSET, offset);
    }

    /**
     * Obtains a service resource resolver for the given sub service name. The caller is
     * responsible for closing it.
     */
    private ResourceResolver getServiceResolver(String subService) throws LoginException {
        return resolverFactory.getServiceResourceResolver(Collections.singletonMap("sling.service.subservice", subService));
    }

    /**
     * Status of the last handled package, in the form in which it is written to and read from
     * the status store.
     */
    class PackageStatus {
        final Messages.PackageStatusMessage.Status status;
        final Long offset;
        final String pubAgentName;
        final Boolean sent;

        /**
         * Creates a new status that has not been sent yet.
         */
        PackageStatus(Messages.PackageStatusMessage.Status status, long offset, String pubAgentName) {
            this.status = status;
            this.offset = offset;
            this.pubAgentName = pubAgentName;
            this.sent = false;
        }

        /**
         * Restores a status from stored properties. Missing properties yield {@code null}
         * fields, except for "sent", which defaults to {@code true} so that a store without a
         * status does not trigger a send.
         */
        PackageStatus(ValueMap statusMap) {
            Integer statusNum = statusMap.get("statusNumber", Integer.class);
            this.status = statusNum != null ? Messages.PackageStatusMessage.Status.valueOf(statusNum.intValue()) : null;
            this.offset = statusMap.get(KEY_OFFSET, Long.class);
            this.pubAgentName = statusMap.get("pubAgentName", String.class);
            this.sent = statusMap.get("sent", true);
        }

        /**
         * @return a new mutable map holding the four properties of this status; requires a
         *         non-null {@code status}
         */
        Map<String, Object> asMap() {
            Map<String, Object> properties = new HashMap<>();
            properties.put("pubAgentName", pubAgentName);
            properties.put("statusNumber", status.getNumber());
            properties.put(KEY_OFFSET, offset);
            properties.put("sent", sent);
            return properties;
        }
    }
}

