/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tika.pipes.async;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.PipesClient;
import org.apache.tika.pipes.PipesException;
import org.apache.tika.pipes.PipesResult;
import org.apache.tika.pipes.async.AsyncConfig;
import org.apache.tika.pipes.async.AsyncEmitter;
import org.apache.tika.pipes.async.OfferLargerThanQueueSize;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.pipesiterator.PipesIterator;
import org.xml.sax.SAXException;

public class AsyncProcessor
implements Closeable {
    static final int PARSER_FUTURE_CODE = 1;
    private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
    private final ArrayBlockingQueue<EmitData> emitData;
    private final ExecutorCompletionService<Integer> executorCompletionService;
    private final ExecutorService executorService;
    private final AsyncConfig asyncConfig;
    private int numParserThreadsFinished = 0;
    private boolean addedEmitterSemaphores = false;
    private int finished = 0;
    boolean isShuttingDown = false;

    public AsyncProcessor(Path tikaConfigPath) throws TikaException, IOException, SAXException {
        this.asyncConfig = AsyncConfig.load(tikaConfigPath);
        this.fetchEmitTuples = new ArrayBlockingQueue(this.asyncConfig.getQueueSize());
        this.emitData = new ArrayBlockingQueue(100);
        this.executorService = Executors.newFixedThreadPool(this.asyncConfig.getNumClients() + this.asyncConfig.getNumEmitters());
        this.executorCompletionService = new ExecutorCompletionService(this.executorService);
        for (int i = 0; i < this.asyncConfig.getNumClients(); ++i) {
            this.executorCompletionService.submit(new FetchEmitWorker(this.asyncConfig, this.fetchEmitTuples, this.emitData));
        }
        EmitterManager emitterManager = EmitterManager.load(tikaConfigPath);
        for (int i = 0; i < this.asyncConfig.getNumEmitters(); ++i) {
            this.executorCompletionService.submit(new AsyncEmitter(this.asyncConfig, this.emitData, emitterManager));
        }
    }

    public synchronized boolean offer(List<FetchEmitTuple> newFetchEmitTuples, long offerMs) throws PipesException, InterruptedException {
        if (this.isShuttingDown) {
            throw new IllegalStateException("Can't call offer after calling close() or shutdownNow()");
        }
        if (newFetchEmitTuples.size() > this.asyncConfig.getQueueSize()) {
            throw new OfferLargerThanQueueSize(newFetchEmitTuples.size(), this.asyncConfig.getQueueSize());
        }
        long start = System.currentTimeMillis();
        long elapsed = System.currentTimeMillis() - start;
        while (elapsed < offerMs) {
            if (this.fetchEmitTuples.remainingCapacity() > newFetchEmitTuples.size()) {
                try {
                    this.fetchEmitTuples.addAll(newFetchEmitTuples);
                    return true;
                }
                catch (IllegalStateException e) {
                    e.printStackTrace();
                }
            }
            Thread.sleep(100L);
            elapsed = System.currentTimeMillis() - start;
        }
        return false;
    }

    public int getCapacity() {
        return this.fetchEmitTuples.remainingCapacity();
    }

    public synchronized boolean offer(FetchEmitTuple t, long offerMs) throws PipesException, InterruptedException {
        if (this.fetchEmitTuples == null) {
            throw new IllegalStateException("queue hasn't been initialized yet.");
        }
        if (this.isShuttingDown) {
            throw new IllegalStateException("Can't call offer after calling close() or shutdownNow()");
        }
        this.checkActive();
        return this.fetchEmitTuples.offer(t, offerMs, TimeUnit.MILLISECONDS);
    }

    public void finished() throws InterruptedException {
        for (int i = 0; i < this.asyncConfig.getNumClients(); ++i) {
            this.fetchEmitTuples.offer(PipesIterator.COMPLETED_SEMAPHORE);
        }
    }

    public boolean checkActive() {
        Future<Integer> future = this.executorCompletionService.poll();
        if (future != null) {
            try {
                Integer i = future.get();
                if (i == 1) {
                    ++this.numParserThreadsFinished;
                }
            }
            catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
            ++this.finished;
        }
        if (this.numParserThreadsFinished == this.asyncConfig.getNumClients() && !this.addedEmitterSemaphores) {
            for (int i = 0; i < this.asyncConfig.getNumEmitters(); ++i) {
                this.emitData.offer(AsyncEmitter.EMIT_DATA_STOP_SEMAPHORE);
            }
            this.addedEmitterSemaphores = true;
        }
        return this.finished != this.asyncConfig.getNumClients() + this.asyncConfig.getNumEmitters();
    }

    @Override
    public void close() throws IOException {
        this.executorService.shutdownNow();
    }

    private class FetchEmitWorker
    implements Callable<Integer> {
        private final AsyncConfig asyncConfig;
        private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
        private final ArrayBlockingQueue<EmitData> emitDataQueue;

        private FetchEmitWorker(AsyncConfig asyncConfig, ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples, ArrayBlockingQueue<EmitData> emitDataQueue) {
            this.asyncConfig = asyncConfig;
            this.fetchEmitTuples = fetchEmitTuples;
            this.emitDataQueue = emitDataQueue;
        }

        /*
         * Unable to fully structure code
         */
        @Override
        public Integer call() throws Exception {
            pipesClient = new PipesClient(this.asyncConfig);
            var2_2 = null;
            try {
                while (true) {
                    if ((t = this.fetchEmitTuples.poll(1L, TimeUnit.SECONDS)) == null) ** GOTO lbl23
                    if (t == PipesIterator.COMPLETED_SEMAPHORE) {
                        var4_5 = 1;
                        return var4_5;
                    }
                    result = null;
                    try {
                        result = pipesClient.process(t);
                    }
                    catch (IOException e) {
                        result = PipesResult.UNSPECIFIED_CRASH;
                    }
                    switch (1.$SwitchMap$org$apache$tika$pipes$PipesResult$STATUS[result.getStatus().ordinal()]) {
                        case 1: {
                            this.emitDataQueue.offer(result.getEmitData());
                            break;
                        }
                        case 2: {
                            break;
                        }
                    }
lbl23:
                    // 4 sources

                    AsyncProcessor.this.checkActive();
                    continue;
                    break;
                }
                catch (Throwable var3_4) {
                    var2_2 = var3_4;
                    throw var3_4;
                }
            }
            finally {
                if (pipesClient != null) {
                    if (var2_2 != null) {
                        try {
                            pipesClient.close();
                        }
                        catch (Throwable var5_7) {
                            var2_2.addSuppressed(var5_7);
                        }
                    } else {
                        pipesClient.close();
                    }
                }
            }
        }
    }
}

