/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.queue.impl.simple;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FilenameFilter;
import java.io.Reader;
import java.io.StringReader;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonException;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.json.JsonString;
import javax.json.JsonValue;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.distribution.queue.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.DistributionQueueProvider;
import org.apache.sling.distribution.queue.DistributionQueueType;
import org.apache.sling.distribution.queue.impl.simple.SimpleDistributionQueue;
import org.apache.sling.distribution.queue.impl.simple.SimpleDistributionQueueCheckpoint;
import org.apache.sling.distribution.queue.impl.simple.SimpleDistributionQueueProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DistributionQueueProvider} that hands out in-memory
 * {@link SimpleDistributionQueue}s and schedules their processing through a Sling
 * {@link Scheduler}.
 *
 * <p>When created with {@code checkpoint == true} the provider additionally:
 * <ul>
 *   <li>ensures a checkpoint directory named {@code <name>-simple-queues-checkpoints} exists,</li>
 *   <li>re-populates queues from {@code <queueName>-checkpoint} files found in that directory when
 *       processing is enabled, and</li>
 *   <li>schedules a {@link SimpleDistributionQueueCheckpoint} job per queue.</li>
 * </ul>
 */
public class SimpleDistributionQueueProvider
implements DistributionQueueProvider {
    public static final String TYPE = "simple";
    public static final String TYPE_CHECKPOINT = "simple-checkpoint";
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final String name;
    private final Scheduler scheduler;
    // Declared with the concrete type so that the atomic putIfAbsent is available on every
    // Java version this class may be compiled against.
    private final ConcurrentHashMap<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<String, SimpleDistributionQueue>();
    private final boolean checkpoint;
    private File checkpointDirectory;

    /**
     * Creates a provider.
     *
     * @param scheduler  scheduler used to run queue processing and checkpoint jobs; required
     * @param name       provider name, used as prefix for queue keys, job names and the checkpoint
     *                   directory; required
     * @param checkpoint whether queues are checkpointed to (and recovered from) disk
     * @throws IllegalArgumentException if {@code scheduler} or {@code name} is {@code null}
     */
    public SimpleDistributionQueueProvider(Scheduler scheduler, String name, boolean checkpoint) {
        this.checkpoint = checkpoint;
        if (name == null || scheduler == null) {
            throw new IllegalArgumentException("all arguments are required");
        }
        if (checkpoint) {
            this.checkpointDirectory = new File(name + "-simple-queues-checkpoints");
            this.log.info("creating checkpoint directory {}", this.checkpointDirectory.getAbsoluteFile());
            if (this.checkpointDirectory.exists() && !this.checkpointDirectory.isDirectory()) {
                // The delete must not live inside an assert: assertions are disabled by default,
                // in which case the stale file would never be removed.
                if (!this.checkpointDirectory.delete()) {
                    this.log.warn("could not delete non-directory file {}", this.checkpointDirectory.getAbsolutePath());
                }
            }
            boolean created = false;
            if (!this.checkpointDirectory.exists()) {
                created = this.checkpointDirectory.mkdir();
            }
            boolean usable = this.checkpointDirectory.isDirectory() && this.checkpointDirectory.exists();
            this.log.info("checkpoint directory created: {}, exists {}", created, usable);
        }
        this.scheduler = scheduler;
        this.name = name;
    }

    /**
     * Returns the queue registered under {@code queueName}, creating and registering it on first
     * access. Registration is atomic: concurrent callers asking for the same name all receive the
     * same instance.
     *
     * @param queueName the queue name
     * @return the (possibly newly created) queue, never {@code null}
     */
    @Override
    @Nonnull
    public DistributionQueue getQueue(@Nonnull String queueName) {
        String key = this.name + queueName;
        SimpleDistributionQueue queue = this.queueMap.get(key);
        if (queue == null) {
            this.log.debug("creating a queue with key {}", key);
            SimpleDistributionQueue candidate = new SimpleDistributionQueue(this.name, queueName);
            // putIfAbsent keeps the instance registered by a racing thread, if there is one
            SimpleDistributionQueue existing = this.queueMap.putIfAbsent(key, candidate);
            queue = existing != null ? existing : candidate;
            this.log.debug("queue created {}", queue);
        }
        return queue;
    }

    /**
     * Returns the queue registered under {@code queueName}; the {@code type} argument is ignored
     * and the call is delegated to {@link #getQueue(String)}.
     *
     * @param queueName the queue name
     * @param type      ignored by this implementation
     * @return the (possibly newly created) queue
     */
    @Override
    public DistributionQueue getQueue(@Nonnull String queueName, @Nonnull DistributionQueueType type) {
        return this.getQueue(queueName);
    }

    /**
     * @return a live view of all queues created so far by this provider
     */
    Collection<SimpleDistributionQueue> getQueues() {
        return this.queueMap.values();
    }

    /**
     * Starts processing of the given queues.
     *
     * <p>With checkpointing enabled, every queue is first re-populated from its checkpoint file
     * (if any) and a {@link SimpleDistributionQueueCheckpoint} job is scheduled for it. Afterwards
     * a {@link SimpleDistributionQueueProcessor} job is scheduled for every queue. Jobs are
     * scheduled via {@code Scheduler.NOW(-1, period)} with concurrent execution disabled; see the
     * Sling scheduler documentation for the exact meaning of these arguments.
     *
     * @param queueProcessor the processor invoked by the scheduled processing jobs
     * @param queueNames     names of the queues to enable
     */
    @Override
    public void enableQueueProcessing(@Nonnull DistributionQueueProcessor queueProcessor, String ... queueNames) {
        if (this.checkpoint) {
            this.log.debug("recovering from checkpoints if needed");
            for (String queueName : queueNames) {
                this.recoverFromCheckpoints(queueName);
            }
            for (String queueName : queueNames) {
                ScheduleOptions checkpointOptions = this.scheduler.NOW(-1, 15L).canRunConcurrently(false).name(this.getJobName(queueName + "-checkpoint"));
                this.scheduler.schedule(new SimpleDistributionQueueCheckpoint(this.getQueue(queueName), this.checkpointDirectory), checkpointOptions);
            }
        }
        for (String queueName : queueNames) {
            ScheduleOptions processingOptions = this.scheduler.NOW(-1, 1L).canRunConcurrently(false).name(this.getJobName(queueName));
            this.scheduler.schedule(new SimpleDistributionQueueProcessor(this.getQueue(queueName), queueProcessor), processingOptions);
        }
    }

    /**
     * Re-populates the queue named {@code queueName} from every file called
     * {@code <queueName>-checkpoint} in the checkpoint directory.
     */
    private void recoverFromCheckpoints(final String queueName) {
        this.log.debug("recovering for queue {}", queueName);
        DistributionQueue queue = this.getQueue(queueName);
        FilenameFilter checkpointFilter = new FilenameFilter(){

            @Override
            public boolean accept(File dir, String fileName) {
                return fileName.equals(queueName + "-checkpoint");
            }
        };
        File[] checkpointFiles = this.checkpointDirectory.listFiles(checkpointFilter);
        if (checkpointFiles == null) {
            // listFiles yields null when the path is not a directory or cannot be read
            this.log.warn("could not list checkpoint directory {}", this.checkpointDirectory.getAbsolutePath());
            return;
        }
        for (File checkpointFile : checkpointFiles) {
            this.recoverFromCheckpointFile(queue, queueName, checkpointFile);
        }
    }

    /**
     * Reads one checkpoint file and adds one item to {@code queue} per line. Each line is expected
     * to be {@code <id> <json-object>}; the file is always closed, also on failure.
     */
    private void recoverFromCheckpointFile(DistributionQueue queue, String queueName, File checkpointFile) {
        this.log.info("recovering from checkpoint {}", checkpointFile);
        LineIterator lineIterator = null;
        try {
            lineIterator = IOUtils.lineIterator(new FileReader(checkpointFile));
            while (lineIterator.hasNext()) {
                String line = lineIterator.nextLine();
                // Split on the first space only: the JSON payload may itself contain spaces.
                String[] parts = line.split(" ", 2);
                if (parts.length < 2) {
                    this.log.warn("skipping malformed line in checkpoint file {}", checkpointFile.getAbsolutePath());
                    continue;
                }
                queue.add(new DistributionQueueItem(parts[0], SimpleDistributionQueueProvider.parseInfo(parts[1])));
            }
            this.log.info("recovered {} items from queue {}", queue.getStatus().getItemsCount(), queueName);
        }
        catch (FileNotFoundException e) {
            this.log.warn("could not read checkpoint file {}", checkpointFile.getAbsolutePath(), e);
        }
        catch (JsonException e) {
            // As before, a line that cannot be parsed aborts recovery of the remaining lines.
            this.log.warn("could not parse info from checkpoint file {}", checkpointFile.getAbsolutePath(), e);
        }
        finally {
            LineIterator.closeQuietly(lineIterator);
        }
    }

    /**
     * Converts the JSON object in {@code infoString} into an item info map: JSON arrays become
     * {@code String[]}, JSON null becomes {@code null}, everything else is read as a JSON string.
     *
     * @throws JsonException if {@code infoString} is not a valid JSON object
     */
    private static Map<String, Object> parseInfo(String infoString) {
        Map<String, Object> info = new HashMap<String, Object>();
        JsonReader reader = Json.createReader(new StringReader(infoString));
        try {
            JsonObject jsonObject = reader.readObject();
            for (Map.Entry<String, JsonValue> entry : jsonObject.entrySet()) {
                String key = entry.getKey();
                JsonValue value = entry.getValue();
                if (value.getValueType().equals(JsonValue.ValueType.ARRAY)) {
                    JsonArray array = jsonObject.getJsonArray(key);
                    String[] strings = new String[array.size()];
                    for (int i = 0; i < strings.length; ++i) {
                        strings[i] = array.getString(i);
                    }
                    info.put(key, strings);
                } else if (JsonValue.NULL.equals(value)) {
                    info.put(key, null);
                } else {
                    // NOTE(review): assumes every remaining value is a JSON string, as in the
                    // original code; other scalar types would fail the cast - confirm against
                    // the checkpoint writer.
                    info.put(key, ((JsonString)value).getString());
                }
            }
        }
        finally {
            reader.close();
        }
        return info;
    }

    /**
     * Unschedules the processing job, and when checkpointing is enabled the checkpoint job, of
     * every queue created so far. Failures to unschedule are logged, not thrown.
     */
    @Override
    public void disableQueueProcessing() {
        for (DistributionQueue distributionQueue : this.getQueues()) {
            String queueName = distributionQueue.getName();
            if (this.scheduler.unschedule(this.getJobName(queueName))) {
                this.log.debug("queue processing on {} stopped", distributionQueue);
            } else {
                this.log.warn("could not disable queue processing on {}", distributionQueue);
            }
            if (!this.checkpoint) {
                continue;
            }
            if (this.scheduler.unschedule(this.getJobName(queueName) + "-checkpoint")) {
                this.log.debug("checkpoint on {} stopped", distributionQueue);
            } else {
                this.log.warn("could not disable checkpoint on {}", distributionQueue);
            }
        }
    }

    /**
     * Builds the scheduler job name for a queue: {@code simple-queueProcessor-<name>-<queueName>}.
     */
    private String getJobName(String queueName) {
        return "simple-queueProcessor-" + this.name + "-" + queueName;
    }
}

