/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.event.impl.jobs.queues;

import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.event.impl.EventingThreadPool;
import org.apache.sling.event.impl.jobs.InternalJobState;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
import org.apache.sling.event.impl.jobs.queues.JobExecutionContextImpl;
import org.apache.sling.event.impl.jobs.queues.JobExecutionResultImpl;
import org.apache.sling.event.impl.jobs.queues.OutdatedJobQueueInfo;
import org.apache.sling.event.impl.jobs.queues.QueueJobCache;
import org.apache.sling.event.impl.jobs.queues.QueueServices;
import org.apache.sling.event.impl.support.BatchResourceRemover;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.Statistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobQueueImpl
implements Queue {
    private static final long MAX_SUSPEND_TIME = 3600000L;
    private final Logger logger;
    private final InternalQueueConfiguration configuration;
    private volatile String queueName;
    private volatile boolean running;
    private final AtomicLong suspendedSince = new AtomicLong(-1L);
    private final QueueServices services;
    private final Map<String, JobHandler> processingJobsLists = new HashMap<String, JobHandler>();
    private final ThreadPool threadPool;
    private final AtomicInteger asyncCounter = new AtomicInteger();
    private final AtomicBoolean isOutdated = new AtomicBoolean(false);
    private final AtomicBoolean closeMarker = new AtomicBoolean(false);
    private final AtomicBoolean doFullCacheSearch = new AtomicBoolean(false);
    private final AtomicInteger waitCounter = new AtomicInteger();
    private final QueueJobCache cache;
    private final Semaphore available;
    private final int maxParallel;
    private final Semaphore drainage;
    private final AtomicBoolean startJobsGuard = new AtomicBoolean(false);
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile long isSleepingUntil = -1L;

    public static JobQueueImpl createQueue(String name, InternalQueueConfiguration config, QueueServices services, Set<String> topics, Set<String> haltedTopicsBackRef, OutdatedJobQueueInfo outdatedQueueInfo) {
        QueueJobCache cache = new QueueJobCache(services.configuration, name, services.statisticsManager, config.getType(), topics);
        if (!cache.getNewlyHaltedTopics().isEmpty()) {
            for (String haltedTopic : cache.getNewlyHaltedTopics()) {
                if (!haltedTopicsBackRef.add(haltedTopic)) continue;
                LoggerFactory.getLogger((String)(JobQueueImpl.class.getName() + '.' + name)).warn("createQueue : topic halted due to ClassNotFoundExceptions : " + haltedTopic);
            }
        }
        if (cache.isEmpty()) {
            return null;
        }
        return new JobQueueImpl(name, config, services, cache, outdatedQueueInfo);
    }

    private JobQueueImpl(String name, InternalQueueConfiguration config, QueueServices services, QueueJobCache cache, OutdatedJobQueueInfo outdatedQueue) {
        this.threadPool = config.getOwnThreadPoolSize() > 0 ? new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize()) : services.eventingThreadPool;
        this.queueName = name;
        this.configuration = config;
        this.services = services;
        this.logger = LoggerFactory.getLogger((String)(this.getClass().getName() + '.' + name));
        this.running = true;
        this.cache = cache;
        this.maxParallel = config.getMaxParallel();
        if (outdatedQueue == null) {
            this.available = new Semaphore(this.maxParallel, true);
            this.drainage = new Semaphore(0, true);
        } else {
            this.available = outdatedQueue.getAvailable();
            this.drainage = outdatedQueue.getDrainage();
            int oldMaxParallel = outdatedQueue.getMaxParallel();
            int maxParallelDiff = this.maxParallel - oldMaxParallel;
            int drainedOldDrainage = 0;
            int drainedOldAvailable = 0;
            if (maxParallelDiff != 0) {
                drainedOldDrainage = this.drainage.drainPermits();
                drainedOldAvailable = this.available.drainPermits();
                int netNewPermits = drainedOldAvailable - drainedOldDrainage + maxParallelDiff;
                if (netNewPermits > 0) {
                    this.available.release(netNewPermits);
                } else if (netNewPermits < 0) {
                    this.drainage.release(-netNewPermits);
                }
            }
            this.logger.info("<init> reused outdated queue info: queueName : {}, old available : {}, old drainage : {}, old maxParallel : {}, new available : {}, new drainage : {}, new maxParallel : {}", new Object[]{this.queueName, drainedOldAvailable, drainedOldDrainage, oldMaxParallel, this.available.availablePermits(), this.drainage.availablePermits(), this.maxParallel});
        }
        this.logger.info("Starting job queue {}", (Object)this.queueName);
        this.logger.debug("Configuration for job queue={}", (Object)this.configuration);
    }

    @Override
    public InternalQueueConfiguration getConfiguration() {
        return this.configuration;
    }

    @Override
    public String getName() {
        return this.queueName;
    }

    @Override
    public Statistics getStatistics() {
        return this.services.statisticsManager.getQueueStatistics(this.queueName);
    }

    public void startJobs() {
        if (this.startJobsGuard.compareAndSet(false, true)) {
            while (this.running && !this.isOutdated.get() && !this.isSuspended() && this.available.tryAcquire()) {
                boolean started = false;
                this.lock.writeLock().lock();
                try {
                    final JobHandler handler = this.cache.getNextJob(this.services.jobConsumerManager, this.services.statisticsManager, this, this.doFullCacheSearch.getAndSet(false));
                    if (handler == null) break;
                    started = true;
                    this.threadPool.execute(new Runnable(){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        @Override
                        public void run() {
                            Thread currentThread = Thread.currentThread();
                            String oldName = currentThread.getName();
                            int oldPriority = currentThread.getPriority();
                            currentThread.setName(oldName + "-" + handler.getJob().getQueueName() + "(" + handler.getJob().getTopic() + ")");
                            if (JobQueueImpl.this.configuration.getThreadPriority() != null) {
                                switch (JobQueueImpl.this.configuration.getThreadPriority()) {
                                    case NORM: {
                                        currentThread.setPriority(5);
                                        break;
                                    }
                                    case MIN: {
                                        currentThread.setPriority(1);
                                        break;
                                    }
                                    case MAX: {
                                        currentThread.setPriority(10);
                                    }
                                }
                            }
                            try {
                                JobQueueImpl.this.startJob(handler);
                            }
                            finally {
                                currentThread.setPriority(oldPriority);
                                currentThread.setName(oldName);
                            }
                            JobQueueImpl.this.startJobs();
                        }
                    });
                }
                finally {
                    if (!started) {
                        this.available.release();
                    }
                    this.lock.writeLock().unlock();
                }
            }
            this.startJobsGuard.set(false);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void startJob(JobHandler handler) {
        block31: {
            try {
                this.closeMarker.set(false);
                try {
                    final JobImpl job = handler.getJob();
                    handler.started = System.currentTimeMillis();
                    this.services.configuration.getAuditLogger().debug("START OK : {}", (Object)job.getId());
                    Calendar queued = (Calendar)((Object)job.getProperty("event.job.queued.time", Calendar.class));
                    if (queued == null) {
                        queued = Calendar.getInstance();
                        queued.setTimeInMillis(System.currentTimeMillis() - 10000L);
                    }
                    long queueTime = handler.started - queued.getTimeInMillis();
                    this.services.statisticsManager.jobStarted(this.queueName, job.getTopic(), queueTime);
                    NotificationUtility.sendNotification(this.services.eventAdmin, "org/apache/sling/event/notification/job/START", job, queueTime);
                    Map<String, JobHandler> map = this.processingJobsLists;
                    synchronized (map) {
                        this.processingJobsLists.put(job.getId(), handler);
                    }
                    JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED;
                    Job.JobState resultState = Job.JobState.ERROR;
                    JobExecutionContextImpl ctx = new JobExecutionContextImpl(handler, new JobExecutionContextImpl.ASyncHandler(){

                        @Override
                        public void finished(Job.JobState state) {
                            ((JobQueueImpl)JobQueueImpl.this).services.jobConsumerManager.unregisterListener(job.getId());
                            JobQueueImpl.this.finishedJob(job.getId(), state, true);
                            JobQueueImpl.this.asyncCounter.decrementAndGet();
                        }
                    });
                    try {
                        JobExecutionContextImpl jobExecutionContextImpl = ctx;
                        synchronized (jobExecutionContextImpl) {
                            result = (JobExecutionResultImpl)handler.getConsumer().process(job, ctx);
                            if (result == null) {
                                this.services.jobConsumerManager.registerListener(job.getId(), handler.getConsumer(), ctx);
                                this.asyncCounter.incrementAndGet();
                                ctx.markAsync();
                            } else if (result.succeeded()) {
                                resultState = Job.JobState.SUCCEEDED;
                            } else if (result.failed()) {
                                resultState = Job.JobState.QUEUED;
                            } else if (result.cancelled()) {
                                resultState = handler.isStopped() ? Job.JobState.STOPPED : Job.JobState.ERROR;
                            }
                        }
                        if (result == null) break block31;
                        if (result.getRetryDelayInMs() != null) {
                            job.setProperty(":slingevent:delayOverride", result.getRetryDelayInMs());
                        }
                        if (result.getMessage() != null) {
                            job.setProperty("slingevent:resultMessage", result.getMessage());
                        }
                        this.finishedJob(job.getId(), resultState, false);
                    }
                    catch (Throwable t) {
                        try {
                            this.logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
                            result = JobExecutionResultImpl.CANCELLED;
                            resultState = Job.JobState.ERROR;
                        }
                        catch (Throwable throwable) {
                            throw throwable;
                        }
                        finally {
                            if (result != null) {
                                if (result.getRetryDelayInMs() != null) {
                                    job.setProperty(":slingevent:delayOverride", result.getRetryDelayInMs());
                                }
                                if (result.getMessage() != null) {
                                    job.setProperty("slingevent:resultMessage", result.getMessage());
                                }
                                this.finishedJob(job.getId(), resultState, false);
                            }
                        }
                    }
                }
                catch (Exception re) {
                    this.logger.error("Exception during job processing.", (Throwable)re);
                }
            }
            finally {
                if (this.drainage.tryAcquire()) {
                    int approxPermits = this.drainage.availablePermits();
                    this.logger.debug("startJobHandler: drained 1 permit for {}, approx left to drain: {}", (Object)this.queueName, (Object)approxPermits);
                } else {
                    this.available.release();
                }
            }
        }
    }

    public void outdate() {
        if (this.isOutdated.compareAndSet(false, true)) {
            String name = this.getName() + "<outdated>(" + this.hashCode() + ")";
            this.logger.info("Outdating queue {}, renaming to {}.", (Object)this.queueName, (Object)name);
            this.queueName = name;
        }
    }

    public boolean tryToClose() {
        this.resume();
        this.lock.writeLock().lock();
        try {
            if (this.canBeClosed()) {
                if (this.closeMarker.get()) {
                    this.close();
                    boolean bl = true;
                    return bl;
                }
                this.closeMarker.set(true);
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return false;
    }

    private boolean canBeClosed() {
        return !this.isSuspended() && this.asyncCounter.get() == 0 && this.waitCounter.get() == 0 && this.available.availablePermits() == this.configuration.getMaxParallel();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        this.running = false;
        this.logger.debug("Shutting down job queue {}", (Object)this.queueName);
        this.resume();
        Map<String, JobHandler> map = this.processingJobsLists;
        synchronized (map) {
            this.processingJobsLists.clear();
        }
        if (this.configuration.getOwnThreadPoolSize() > 0) {
            ((EventingThreadPool)this.threadPool).release();
        }
        this.logger.info("Stopped job queue {}", (Object)this.queueName);
    }

    public void maintain() {
        long since = this.suspendedSince.get();
        if (since != -1L && since + 3600000L < System.currentTimeMillis()) {
            this.logger.info("Waking up suspended queue. It has been suspended for more than {}ms", (Object)3600000L);
            this.resume();
        }
        this.doFullCacheSearch.set(true);
        this.startJobs();
    }

    public void wakeUpQueue(Set<String> topics) {
        this.cache.handleNewTopics(topics);
    }

    private void requeue(JobHandler handler) {
        this.cache.reschedule(this.queueName, handler, this.services.statisticsManager);
        this.startJobs();
    }

    private RescheduleInfo handleReschedule(JobHandler handler, Job.JobState resultState) {
        RescheduleInfo info = new RescheduleInfo();
        info.state = resultState;
        switch (resultState) {
            case SUCCEEDED: {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Finished job {}", (Object)Utility.toString(handler.getJob()));
                }
                info.processingTime = System.currentTimeMillis() - handler.started;
                info.finalState = InternalJobState.SUCCEEDED;
                break;
            }
            case QUEUED: {
                int retries = handler.getJob().getProperty("event.job.retries", 0);
                int retryCount = handler.getJob().getProperty("event.job.retrycount", 0);
                if (retries != -1 && ++retryCount > retries) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.error("Cancelled job {} after {} unsuccessful retries", (Object)Utility.toString(handler.getJob()), (Object)retries);
                    }
                    info.finalState = InternalJobState.CANCELLED;
                    break;
                }
                info.reschedule = true;
                handler.getJob().retry();
                this.logger.warn("Failed job {}, will retry {} more time(s), retryCount={}", new Object[]{Utility.toString(handler.getJob()), retries - retryCount, retryCount});
                info.finalState = InternalJobState.FAILED;
                break;
            }
            default: {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Cancelled job {}", (Object)Utility.toString(handler.getJob()));
                }
                info.finalState = InternalJobState.CANCELLED;
            }
        }
        if (info.state == Job.JobState.QUEUED && !info.reschedule) {
            info.state = Job.JobState.GIVEN_UP;
        }
        return info;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean finishedJob(String jobId, Job.JobState resultState, boolean isAsync) {
        JobHandler handler;
        this.services.configuration.getAuditLogger().debug("FINISHED {} : {}", (Object)resultState, (Object)jobId);
        this.logger.debug("Received finish for job {}, resultState={}", (Object)jobId, (Object)resultState);
        Map<String, JobHandler> map = this.processingJobsLists;
        synchronized (map) {
            handler = this.processingJobsLists.remove(jobId);
        }
        if (!this.running) {
            this.logger.warn("Queue is not running anymore. Discarding finish for {}", (Object)jobId);
            return false;
        }
        if (handler == null) {
            this.logger.warn("This job has never been started by this queue: {}", (Object)jobId);
            return false;
        }
        RescheduleInfo rescheduleInfo = this.handleReschedule(handler, resultState);
        if (!rescheduleInfo.reschedule) {
            boolean keepJobs = rescheduleInfo.state != Job.JobState.SUCCEEDED || this.configuration.isKeepJobs();
            handler.finished(rescheduleInfo.state, keepJobs, rescheduleInfo.processingTime);
        } else {
            this.reschedule(handler);
        }
        this.services.statisticsManager.jobEnded(this.queueName, handler.getJob().getTopic(), rescheduleInfo.finalState, rescheduleInfo.processingTime);
        NotificationUtility.sendNotification(this.services.eventAdmin, rescheduleInfo.finalState.getTopic(), handler.getJob(), rescheduleInfo.processingTime);
        return rescheduleInfo.reschedule;
    }

    @Override
    public void resume() {
        if (this.suspendedSince.getAndSet(-1L) != -1L) {
            this.logger.debug("Waking up suspended queue {}", (Object)this.queueName);
            this.startJobs();
        }
    }

    @Override
    public void suspend() {
        if (this.suspendedSince.compareAndSet(-1L, System.currentTimeMillis())) {
            this.logger.debug("Suspending queue {}", (Object)this.queueName);
        }
    }

    @Override
    public boolean isSuspended() {
        return this.suspendedSince.get() != -1L;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void removeAll() {
        Set<String> topics = this.cache.getTopics();
        this.logger.debug("Removing all jobs for queue {} : {}", (Object)this.queueName, topics);
        if (!topics.isEmpty()) {
            try (ResourceResolver resolver = this.services.configuration.createResourceResolver();){
                Resource baseResource = resolver.getResource(this.services.configuration.getLocalJobsPath());
                if (baseResource != null) {
                    final BatchResourceRemover brr = new BatchResourceRemover();
                    for (String t : topics) {
                        final Resource topicResource = baseResource.getChild(t.replace('/', '.'));
                        if (topicResource == null) continue;
                        JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.JobCallback(){

                            @Override
                            public boolean handle(JobImpl job) {
                                Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
                                if (jobResource != null) {
                                    try {
                                        brr.delete(jobResource);
                                    }
                                    catch (PersistenceException ignore) {
                                        JobQueueImpl.this.logger.error("Unable to remove job " + job, (Throwable)ignore);
                                        topicResource.getResourceResolver().revert();
                                        topicResource.getResourceResolver().refresh();
                                    }
                                }
                                return true;
                            }
                        });
                    }
                    try {
                        resolver.commit();
                    }
                    catch (PersistenceException ignore) {
                        this.logger.error("Unable to remove jobs", (Throwable)ignore);
                    }
                }
            }
        }
    }

    @Override
    public Object getState(String key) {
        if (this.configuration.getType() == QueueConfiguration.Type.ORDERED && "isSleepingUntil".equals(key)) {
            return this.isSleepingUntil;
        }
        return null;
    }

    @Override
    public String getStateInfo() {
        return "outdated=" + this.isOutdated.get() + ", suspendedSince=" + this.suspendedSince.get() + ", asyncJobs=" + this.asyncCounter.get() + ", waitCount=" + this.waitCounter.get() + ", jobCount=" + String.valueOf(this.configuration.getMaxParallel() - this.available.availablePermits() + (this.configuration.getType() == QueueConfiguration.Type.ORDERED ? ", isSleepingUntil=" + this.isSleepingUntil : ""));
    }

    private long getRetryDelay(JobHandler handler) {
        long delay = this.configuration.getRetryDelayInMs();
        if (handler.getJob().getProperty(":slingevent:delayOverride") != null) {
            delay = (Long)((Object)handler.getJob().getProperty(":slingevent:delayOverride", Long.class));
        } else if (handler.getJob().getProperty("event.job.retrydelay") != null) {
            delay = (Long)((Object)handler.getJob().getProperty("event.job.retrydelay", Long.class));
        }
        return delay;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean stopJob(JobImpl job) {
        JobHandler handler;
        Map<String, JobHandler> map = this.processingJobsLists;
        synchronized (map) {
            handler = this.processingJobsLists.get(job.getId());
        }
        if (handler != null) {
            handler.stop();
        }
        return handler != null;
    }

    private void reschedule(final JobHandler handler) {
        long delay = this.getRetryDelay(handler);
        if (delay > 0L) {
            if (this.configuration.getType() == QueueConfiguration.Type.ORDERED) {
                this.cache.setIsBlocked(true);
            }
            handler.addToRetryList();
            Date fireDate = new Date();
            fireDate.setTime(System.currentTimeMillis() + delay);
            if (this.configuration.getType() == QueueConfiguration.Type.ORDERED) {
                this.isSleepingUntil = fireDate.getTime();
            }
            final Runnable t = new Runnable(){

                @Override
                public void run() {
                    try {
                        if (handler.removeFromRetryList()) {
                            JobQueueImpl.this.requeue(handler);
                        }
                        JobQueueImpl.this.waitCounter.decrementAndGet();
                    }
                    finally {
                        if (JobQueueImpl.this.configuration.getType() == QueueConfiguration.Type.ORDERED) {
                            JobQueueImpl.this.isSleepingUntil = -1L;
                            JobQueueImpl.this.cache.setIsBlocked(false);
                            JobQueueImpl.this.startJobs();
                        }
                    }
                }
            };
            this.waitCounter.incrementAndGet();
            Timer timer = new Timer();
            timer.schedule(new TimerTask(){

                @Override
                public void run() {
                    t.run();
                }
            }, delay);
        } else {
            this.requeue(handler);
        }
    }

    QueueJobCache getCache() {
        return this.cache;
    }

    OutdatedJobQueueInfo getOutdatedJobQueueInfo() {
        return new OutdatedJobQueueInfo(this.available, this.maxParallel, this.drainage);
    }

    private static final class RescheduleInfo {
        public boolean reschedule = false;
        public long processingTime;
        public Job.JobState state;
        public InternalJobState finalState;

        private RescheduleInfo() {
        }
    }
}

