package org.wcc.framework.util.thread.pubsub;

import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import org.wcc.framework.AppRuntimeException;
import org.wcc.framework.log.AppLogger;
import org.wcc.framework.util.queue.IQueue;
import org.wcc.framework.util.thread.Counter;
import org.wcc.framework.util.thread.RunWrapper;
import org.wcc.framework.util.thread.task.TaskImp;
import org.wcc.framework.util.thread.task.TaskThreadPool;

/* loaded from: input_file:org/wcc/framework/util/thread/pubsub/Subscriber.class */
/**
 * Consumer side of the pub/sub helper: a dispatcher thread pops {@link Document}s from a queue
 * and hands each one to a worker pool, where it is routed to the {@link SubWorker} that the
 * document selects from the supplied handler map.
 *
 * <p>Instances are created through {@link #newInstance(int, IQueue, Map)}.
 */
public final class Subscriber {
    private static final AppLogger LOGGER = AppLogger.getInstance((Class<?>) Subscriber.class);

    /** Keep-alive value handed to the worker pool constructor. */
    private static final long THREAD_KEEP_ALIVE_TIME = 300000L;

    /** Pause, in milliseconds, between two polls of the completion counter. */
    private static final long CHECK_FOR_DONE_INTERVAL = 50L;

    private final Dispatcher dp;
    private final TaskThreadPool wkpool;
    /** Incremented by {@link Woker#end()}; polled by {@link #runAndWaitForDone(long)}. */
    private final Counter concurrentCounter = new Counter(0);
    private final Map<String, SubWorker> docSubWorkerMap;

    /**
     * Endless loop that moves documents from the queue into the worker pool.
     */
    private static class Dispatcher extends RunWrapper {
        private final TaskThreadPool wtp;
        private final Counter concurrentCounter;
        private final IQueue queue;
        private final Map<String, SubWorker> docSubWorkerMap;

        /**
         * @param taskThreadPool pool that executes the per-document tasks
         * @param counter        counter passed on to every task
         * @param iQueue         source of documents
         * @param map            handler lookup passed on to every task
         */
        public Dispatcher(TaskThreadPool taskThreadPool, Counter counter, IQueue iQueue, Map<String, SubWorker> map) {
            this.wtp = taskThreadPool;
            this.concurrentCounter = counter;
            this.queue = iQueue;
            this.docSubWorkerMap = map;
        }

        /**
         * Pops documents forever and submits each one to the pool; when the pool reports that it
         * did not take the task, the document is handled inline on the dispatcher thread.
         */
        @Override // org.wcc.framework.util.thread.RunWrapper, java.lang.Runnable
        public void run() {
            while (true) {
                Document document = (Document) this.queue.pop();
                Woker task = new Woker(document, this.concurrentCounter, this.docSubWorkerMap);
                if (!this.wtp.runInPool(task)) {
                    // NOTE(review): this fallback path does not go through Woker.end(), so the
                    // document is not counted in concurrentCounter - confirm this is intended
                    // for callers of runAndWaitForDone.
                    Subscriber.handleDoc(document, this.docSubWorkerMap);
                }
            }
        }
    }

    /**
     * Callback that processes one kind of document.
     */
    public interface SubWorker {
        /**
         * Processes the given document.
         *
         * @param document the document to process
         * @throws AppRuntimeException if processing fails
         */
        void handleDocument(Document document) throws AppRuntimeException;
    }

    /**
     * Pool task that handles a single document and bumps the shared counter when it ends.
     */
    private static class Woker extends TaskImp {
        private final Document doc;
        private final Counter concurrentCounter;
        private final Map<String, SubWorker> docSubWorkerMap;

        /**
         * @param document document to handle
         * @param counter  counter increased once this task ends
         * @param map      handler lookup used to resolve the document's {@link SubWorker}
         */
        public Woker(Document document, Counter counter, Map<String, SubWorker> map) {
            this.doc = document;
            this.concurrentCounter = counter;
            this.docSubWorkerMap = map;
        }

        /**
         * Increases the shared counter, then delegates to {@link TaskImp#end()}.
         */
        @Override // org.wcc.framework.util.thread.task.TaskImp
        public void end() {
            this.concurrentCounter.increase();
            super.end();
        }

        /**
         * Handles the wrapped document.
         */
        @Override // org.wcc.framework.util.thread.task.TaskImp
        protected void routine() throws InterruptedException {
            Subscriber.handleDoc(this.doc, this.docSubWorkerMap);
        }
    }

    /**
     * @param i      value used for both size arguments of the worker pool
     * @param iQueue queue the dispatcher pops documents from
     * @param map    handler lookup used to resolve each document's {@link SubWorker}
     */
    private Subscriber(int i, IQueue iQueue, Map<String, SubWorker> map) {
        this.docSubWorkerMap = map;
        this.wkpool = new TaskThreadPool(i, i, THREAD_KEEP_ALIVE_TIME, false, new ThreadPoolExecutor.CallerRunsPolicy());
        this.dp = new Dispatcher(this.wkpool, this.concurrentCounter, iQueue, this.docSubWorkerMap);
    }

    /**
     * Creates a subscriber; call {@link #start()} to begin dispatching.
     *
     * @param i      value used for both size arguments of the worker pool
     * @param iQueue queue the dispatcher pops documents from
     * @param map    handler lookup used to resolve each document's {@link SubWorker}
     * @return a new, not yet started subscriber
     */
    public static Subscriber newInstance(int i, IQueue iQueue, Map<String, SubWorker> map) {
        return new Subscriber(i, iQueue, map);
    }

    /**
     * Blocks until the completion counter has reached {@code j}, polling it every
     * {@link #CHECK_FOR_DONE_INTERVAL} milliseconds. This method does not start the dispatcher
     * itself; see {@link #start()}.
     *
     * <p>An interrupt received while waiting is logged and does not end the wait, but the
     * thread's interrupt status is restored before this method returns so the caller can still
     * observe it.
     *
     * @param j counter value to wait for
     */
    public void runAndWaitForDone(long j) {
        boolean interrupted = false;
        try {
            while (this.concurrentCounter.getCurrentValue() < j) {
                try {
                    Thread.sleep(CHECK_FOR_DONE_INTERVAL);
                } catch (InterruptedException e) {
                    // Sleeping clears the flag when it throws; remember it instead of losing it.
                    interrupted = true;
                    LOGGER.error("runAndWaitForDone sleep err", (Throwable) e);
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Starts the dispatcher via {@code startAsDaemon()}.
     */
    public void start() {
        this.dp.startAsDaemon();
    }

    /**
     * Shuts down the worker pool and then stops the dispatcher; the dispatcher is stopped even
     * if the pool shutdown throws.
     */
    public void stop() {
        try {
            this.wkpool.shutdown();
        } finally {
            this.dp.stop();
        }
    }

    /**
     * Routes a document to its {@link SubWorker} and clears the document afterwards.
     *
     * <p>A {@code null} document is ignored. Any failure while resolving or running the handler,
     * including a missing handler, is logged and not propagated.
     *
     * @param document document to handle, may be {@code null}
     * @param map      handler lookup passed to {@code Document.getHandler}
     */
    public static void handleDoc(Document document, Map<String, SubWorker> map) {
        // Guard before the try block: the finally clause dereferences the document.
        if (document == null) {
            return;
        }
        try {
            SubWorker handler = document.getHandler(map);
            if (handler == null) {
                throw new AppRuntimeException("this doc subwork is null,illegal document!");
            }
            handler.handleDocument(document);
        } catch (Throwable th) {
            LOGGER.error("handle doc err", th);
        } finally {
            document.clear();
        }
    }
}
