package org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.quorum;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooDefs;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.common.Time;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.Request;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.RequestProcessor;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.ServerMetrics;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.WorkerService;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.ZooKeeperCriticalThread;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.ZooKeeperServerListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/shaded/zookeeper3/org/apache/zookeeper/server/quorum/CommitProcessor.class */
public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(CommitProcessor.class);
    public static final String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS = "zookeeper.commitProcessor.numWorkerThreads";
    public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT = "zookeeper.commitProcessor.shutdownTimeout";
    public static final String ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE = "zookeeper.commitProcessor.maxReadBatchSize";
    public static final String ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE = "zookeeper.commitProcessor.maxCommitBatchSize";
    protected LinkedBlockingQueue<Request> queuedRequests;
    protected final LinkedBlockingQueue<Request> queuedWriteRequests;
    private AtomicInteger numReadQueuedRequests;
    private AtomicInteger numWriteQueuedRequests;
    protected final LinkedBlockingQueue<Request> committedRequests;
    protected final Map<Long, Deque<Request>> pendingRequests;
    protected final AtomicInteger numRequestsProcessing;
    RequestProcessor nextProcessor;
    protected volatile boolean stoppedMainLoop;
    protected volatile boolean stopped;
    private long workerShutdownTimeoutMS;
    protected WorkerService workerPool;
    private Object emptyPoolSync;
    private static volatile int maxReadBatchSize;
    private static volatile int maxCommitBatchSize;
    boolean matchSyncs;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/shaded/zookeeper3/org/apache/zookeeper/server/quorum/CommitProcessor$CommitWorkRequest.class */
    public class CommitWorkRequest extends WorkerService.WorkRequest {
        private final Request request;

        CommitWorkRequest(Request request) {
            this.request = request;
        }

        @Override // org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.WorkerService.WorkRequest
        public void cleanup() {
            if (CommitProcessor.this.stopped) {
                return;
            }
            CommitProcessor.LOG.error("Exception thrown by downstream processor, unable to continue.");
            CommitProcessor.this.halt();
        }

        @Override // org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.WorkerService.WorkRequest
        public void doWork() throws RequestProcessor.RequestProcessorException {
            try {
                CommitProcessor.processCommitMetrics(this.request, CommitProcessor.this.needCommit(this.request));
                long currentElapsedTime = Time.currentElapsedTime();
                CommitProcessor.this.nextProcessor.processRequest(this.request);
                if (CommitProcessor.this.needCommit(this.request)) {
                    ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.add(Time.currentElapsedTime() - currentElapsedTime);
                } else {
                    ServerMetrics.getMetrics().READ_FINAL_PROC_TIME.add(Time.currentElapsedTime() - currentElapsedTime);
                }
            } finally {
                if (CommitProcessor.this.numRequestsProcessing.decrementAndGet() == 0) {
                    CommitProcessor.this.wakeupOnEmpty();
                }
            }
        }
    }

    public CommitProcessor(RequestProcessor requestProcessor, String str, boolean z, ZooKeeperServerListener zooKeeperServerListener) {
        super("CommitProcessor:" + str, zooKeeperServerListener);
        this.queuedRequests = new LinkedBlockingQueue<>();
        this.queuedWriteRequests = new LinkedBlockingQueue<>();
        this.numReadQueuedRequests = new AtomicInteger(0);
        this.numWriteQueuedRequests = new AtomicInteger(0);
        this.committedRequests = new LinkedBlockingQueue<>();
        this.pendingRequests = new HashMap(10000);
        this.numRequestsProcessing = new AtomicInteger(0);
        this.stoppedMainLoop = true;
        this.stopped = true;
        this.emptyPoolSync = new Object();
        this.nextProcessor = requestProcessor;
        this.matchSyncs = z;
    }

    private boolean isProcessingRequest() {
        return this.numRequestsProcessing.get() != 0;
    }

    protected boolean needCommit(Request request) {
        switch (request.type) {
            case ZooDefs.OpCode.closeSession /* -11 */:
            case ZooDefs.OpCode.createSession /* -10 */:
                return !request.isLocalSession();
            case 1:
            case 2:
            case 5:
            case 7:
            case 13:
            case 14:
            case 15:
            case 16:
            case 19:
            case 20:
            case 21:
            case 1001:
            case ZooDefs.OpCode.deleteQuota /* 1002 */:
                return true;
            case 9:
                return this.matchSyncs;
            default:
                return false;
        }
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        Request poll;
        do {
            try {
                boolean z = !this.committedRequests.isEmpty();
                int size = this.queuedRequests.size();
                if (size == 0 && !z) {
                    synchronized (this) {
                        while (!this.stopped && size == 0 && !z) {
                            wait();
                            z = !this.committedRequests.isEmpty();
                            size = this.queuedRequests.size();
                        }
                    }
                }
                ServerMetrics.getMetrics().READS_QUEUED_IN_COMMIT_PROCESSOR.add(this.numReadQueuedRequests.get());
                ServerMetrics.getMetrics().WRITES_QUEUED_IN_COMMIT_PROCESSOR.add(this.numWriteQueuedRequests.get());
                ServerMetrics.getMetrics().COMMITS_QUEUED_IN_COMMIT_PROCESSOR.add(this.committedRequests.size());
                long currentElapsedTime = Time.currentElapsedTime();
                int i = 0;
                while (true) {
                    if (this.stopped || size <= 0 || ((maxReadBatchSize >= 0 && i > maxReadBatchSize) || (poll = this.queuedRequests.poll()) == null)) {
                        break;
                    }
                    size--;
                    if (needCommit(poll) || this.pendingRequests.containsKey(Long.valueOf(poll.sessionId))) {
                        this.pendingRequests.computeIfAbsent(Long.valueOf(poll.sessionId), l -> {
                            return new ArrayDeque();
                        }).addLast(poll);
                        ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(r0.size());
                    } else {
                        i++;
                        this.numReadQueuedRequests.decrementAndGet();
                        sendToNextProcessor(poll);
                    }
                    if (maxReadBatchSize < 0 && !this.pendingRequests.isEmpty() && !this.committedRequests.isEmpty()) {
                        z = true;
                        break;
                    }
                }
                ServerMetrics.getMetrics().READS_ISSUED_IN_COMMIT_PROC.add(i);
                if (!z) {
                    z = !this.committedRequests.isEmpty();
                }
                if (z && !this.stopped) {
                    waitForEmptyPool();
                    if (this.stopped) {
                        return;
                    }
                    int i2 = maxCommitBatchSize;
                    HashSet<Long> hashSet = new HashSet();
                    long currentElapsedTime2 = Time.currentElapsedTime();
                    int i3 = 0;
                    while (z && !this.stopped && i2 > 0) {
                        Request peek = this.committedRequests.peek();
                        if (!this.queuedWriteRequests.isEmpty() && this.queuedWriteRequests.peek().sessionId == peek.sessionId && this.queuedWriteRequests.peek().cxid == peek.cxid) {
                            Deque<Request> deque = this.pendingRequests.get(Long.valueOf(peek.sessionId));
                            ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(this.pendingRequests.size());
                            if (deque == null || deque.isEmpty() || !needCommit(deque.peek())) {
                                break;
                            }
                            ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(deque.size());
                            Request poll2 = deque.poll();
                            poll2.setHdr(peek.getHdr());
                            poll2.setTxn(peek.getTxn());
                            poll2.setTxnDigest(peek.getTxnDigest());
                            poll2.zxid = peek.zxid;
                            poll2.commitRecvTime = peek.commitRecvTime;
                            peek = poll2;
                            this.numWriteQueuedRequests.decrementAndGet();
                            this.queuedWriteRequests.poll();
                            hashSet.add(Long.valueOf(peek.sessionId));
                        }
                        this.committedRequests.remove();
                        i2--;
                        i3++;
                        processWrite(peek);
                        z = !this.committedRequests.isEmpty();
                    }
                    ServerMetrics.getMetrics().WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR.add(Time.currentElapsedTime() - currentElapsedTime2);
                    ServerMetrics.getMetrics().WRITES_ISSUED_IN_COMMIT_PROC.add(i3);
                    int i4 = 0;
                    for (Long l2 : hashSet) {
                        Deque<Request> deque2 = this.pendingRequests.get(l2);
                        int i5 = 0;
                        while (!this.stopped && !deque2.isEmpty() && !needCommit(deque2.peek())) {
                            this.numReadQueuedRequests.decrementAndGet();
                            sendToNextProcessor(deque2.poll());
                            i5++;
                        }
                        ServerMetrics.getMetrics().READS_AFTER_WRITE_IN_SESSION_QUEUE.add(i5);
                        i4 += i5;
                        if (deque2.isEmpty()) {
                            this.pendingRequests.remove(l2);
                        }
                    }
                    ServerMetrics.getMetrics().SESSION_QUEUES_DRAINED.add(hashSet.size());
                    ServerMetrics.getMetrics().READ_ISSUED_FROM_SESSION_QUEUE.add(i4);
                }
                ServerMetrics.getMetrics().COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - currentElapsedTime);
                endOfIteration();
            } catch (Throwable th) {
                handleException(getName(), th);
            }
        } while (!this.stoppedMainLoop);
        LOG.info("CommitProcessor exited loop!");
    }

    protected void endOfIteration() {
    }

    protected void waitForEmptyPool() throws InterruptedException {
        int i = this.numRequestsProcessing.get();
        if (i != 0) {
            ServerMetrics.getMetrics().CONCURRENT_REQUEST_PROCESSING_IN_COMMIT_PROCESSOR.add(i);
        }
        long currentElapsedTime = Time.currentElapsedTime();
        synchronized (this.emptyPoolSync) {
            while (!this.stopped && isProcessingRequest()) {
                this.emptyPoolSync.wait();
            }
        }
        ServerMetrics.getMetrics().TIME_WAITING_EMPTY_POOL_IN_COMMIT_PROCESSOR_READ.add(Time.currentElapsedTime() - currentElapsedTime);
    }

    @Override // java.lang.Thread
    public void start() {
        int intValue = Integer.getInteger(ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, Runtime.getRuntime().availableProcessors()).intValue();
        this.workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000L).longValue();
        initBatchSizes();
        LOG.info("Configuring CommitProcessor with {} worker threads.", intValue > 0 ? Integer.valueOf(intValue) : BooleanUtils.NO);
        if (this.workerPool == null) {
            this.workerPool = new WorkerService("CommitProcWork", intValue, true);
        }
        this.stopped = false;
        this.stoppedMainLoop = false;
        super.start();
    }

    private void sendToNextProcessor(Request request) {
        this.numRequestsProcessing.incrementAndGet();
        this.workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
    }

    private void processWrite(Request request) throws RequestProcessor.RequestProcessorException {
        processCommitMetrics(request, true);
        long currentElapsedTime = Time.currentElapsedTime();
        this.nextProcessor.processRequest(request);
        ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.add(Time.currentElapsedTime() - currentElapsedTime);
    }

    private static void initBatchSizes() {
        maxReadBatchSize = Integer.getInteger(ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE, -1).intValue();
        maxCommitBatchSize = Integer.getInteger(ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE, 1).intValue();
        if (maxCommitBatchSize <= 0) {
            throw new IllegalArgumentException("maxCommitBatchSize must be positive, was " + maxCommitBatchSize);
        }
        LOG.info("Configuring CommitProcessor with readBatchSize {} commitBatchSize {}", Integer.valueOf(maxReadBatchSize), Integer.valueOf(maxCommitBatchSize));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void processCommitMetrics(Request request, boolean z) {
        if (!z) {
            if (request.commitProcQueueStartTime != -1) {
                ServerMetrics.getMetrics().READ_COMMITPROC_TIME.add(Time.currentElapsedTime() - request.commitProcQueueStartTime);
            }
        } else if (request.commitProcQueueStartTime == -1 || request.commitRecvTime == -1) {
            if (request.commitRecvTime != -1) {
                ServerMetrics.getMetrics().SERVER_WRITE_COMMITTED_TIME.add(Time.currentElapsedTime() - request.commitRecvTime);
            }
        } else {
            long currentElapsedTime = Time.currentElapsedTime();
            ServerMetrics.getMetrics().WRITE_COMMITPROC_TIME.add(currentElapsedTime - request.commitProcQueueStartTime);
            ServerMetrics.getMetrics().LOCAL_WRITE_COMMITTED_TIME.add(currentElapsedTime - request.commitRecvTime);
        }
    }

    public static int getMaxReadBatchSize() {
        return maxReadBatchSize;
    }

    public static int getMaxCommitBatchSize() {
        return maxCommitBatchSize;
    }

    public static void setMaxReadBatchSize(int i) {
        maxReadBatchSize = i;
        LOG.info("Configuring CommitProcessor with readBatchSize {}", Integer.valueOf(maxReadBatchSize));
    }

    public static void setMaxCommitBatchSize(int i) {
        if (i > 0) {
            maxCommitBatchSize = i;
            LOG.info("Configuring CommitProcessor with commitBatchSize {}", Integer.valueOf(maxCommitBatchSize));
        }
    }

    @SuppressFBWarnings({"NN_NAKED_NOTIFY"})
    private synchronized void wakeup() {
        notifyAll();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void wakeupOnEmpty() {
        synchronized (this.emptyPoolSync) {
            this.emptyPoolSync.notifyAll();
        }
    }

    public void commit(Request request) {
        if (this.stopped || request == null) {
            return;
        }
        LOG.debug("Committing request:: {}", request);
        request.commitRecvTime = Time.currentElapsedTime();
        ServerMetrics.getMetrics().COMMITS_QUEUED.add(1L);
        this.committedRequests.add(request);
        wakeup();
    }

    @Override // org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.RequestProcessor
    public void processRequest(Request request) {
        if (this.stopped) {
            return;
        }
        LOG.debug("Processing request:: {}", request);
        request.commitProcQueueStartTime = Time.currentElapsedTime();
        this.queuedRequests.add(request);
        if (needCommit(request)) {
            this.queuedWriteRequests.add(request);
            this.numWriteQueuedRequests.incrementAndGet();
        } else {
            this.numReadQueuedRequests.incrementAndGet();
        }
        wakeup();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void halt() {
        this.stoppedMainLoop = true;
        this.stopped = true;
        wakeupOnEmpty();
        wakeup();
        this.queuedRequests.clear();
        if (this.workerPool != null) {
            this.workerPool.stop();
        }
    }

    @Override // org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.RequestProcessor
    public void shutdown() {
        LOG.info("Shutting down");
        halt();
        if (this.workerPool != null) {
            this.workerPool.join(this.workerShutdownTimeoutMS);
        }
        if (this.nextProcessor != null) {
            this.nextProcessor.shutdown();
        }
    }
}
