/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.shared;

import com.google.protobuf.GeneratedMessage;
import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.messages.Messages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * One-shot reader that collects up to {@code maxMessages} package messages whose
 * offset is at least {@code minOffset} from a topic.
 *
 * <p>The poller is created in the constructor and closed at the end of
 * {@link #fetch(Duration)}, so an instance supports a single {@code fetch} call.
 *
 * <p>NOTE(review): the use of a concurrent queue and a semaphore suggests that
 * {@code handlePackage} is invoked from a thread owned by the poller; confirm
 * against the {@code MessagingProvider} implementation.
 */
@ParametersAreNonnullByDefault
public class LimitPoller {
    /** Maximum time to wait for the very first message after the poller was created. */
    private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(30L);
    private static final Logger LOG = LoggerFactory.getLogger(LimitPoller.class);

    private final long minOffset;
    private final long maxMessages;
    private final Closeable headPoller;
    /** Messages accepted so far; filled by {@link #handlePackage}. */
    private final Queue<FullMessage<Messages.PackageMessage>> messages;
    /** Receives one permit per handled message (accepted or not) to wake up {@link #fetch}. */
    private final Semaphore nextMessage;

    /**
     * Creates the poller on {@code packageTopic}, assigned to {@code minOffset}.
     *
     * @param messagingProvider provider used to compute the assignment and create the poller
     * @param packageTopic topic to read package messages from
     * @param minOffset lowest offset a message must have to be collected
     * @param maxMessages maximum number of messages to collect
     */
    public LimitPoller(MessagingProvider messagingProvider, String packageTopic, long minOffset, long maxMessages) {
        this.minOffset = minOffset;
        this.maxMessages = maxMessages;
        this.messages = new ConcurrentLinkedQueue<>();
        this.nextMessage = new Semaphore(0);
        String assign = messagingProvider.assignTo(minOffset);
        LOG.info("Fetching {} messages starting from {}", maxMessages, minOffset);
        this.headPoller = messagingProvider.createPoller(packageTopic, Reset.earliest, assign, new HandlerAdapter[]{HandlerAdapter.create(Messages.PackageMessage.class, this::handlePackage)});
    }

    /**
     * Waits for messages and returns the ones collected.
     *
     * <p>The first message is awaited for up to {@code CONNECT_TIMEOUT}; every
     * following message for up to {@code timeOut}. Waiting stops as soon as a wait
     * times out or {@code maxMessages} messages have been collected. The poller is
     * closed before this method returns, whether it returns normally or throws.
     *
     * @param timeOut maximum time to wait between two consecutive messages
     * @return a snapshot of the collected messages, possibly empty
     * @throws RuntimeException if the calling thread is interrupted while waiting;
     *         the interrupt flag is restored before throwing
     */
    public List<FullMessage<Messages.PackageMessage>> fetch(Duration timeOut) {
        try {
            // tryAcquire returns true when a permit WAS obtained, so a timeout is its negation.
            boolean timedOut = !this.nextMessage.tryAcquire(CONNECT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
            while (!timedOut && (long) this.messages.size() < this.maxMessages) {
                timedOut = !this.nextMessage.tryAcquire(timeOut.toMillis(), TimeUnit.MILLISECONDS);
            }
            List<FullMessage<Messages.PackageMessage>> result = new ArrayList<>(this.messages);
            LOG.info("Fetched {} messages", result.size());
            return result;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e.getMessage(), e);
        }
        finally {
            IOUtils.closeQuietly(this.headPoller);
        }
    }

    /**
     * Message handler registered with the poller: stores the message when the
     * limit is not yet reached and its offset is at least {@code minOffset}, then
     * releases one permit so that {@link #fetch(Duration)} re-evaluates its exit
     * condition.
     */
    private void handlePackage(MessageInfo info, Messages.PackageMessage message) {
        long offset = info.getOffset();
        LOG.debug("Reading offset {}", offset);
        if ((long) this.messages.size() < this.maxMessages && offset >= this.minOffset) {
            this.messages.add(new FullMessage<>(info, message));
        }
        // Released for skipped messages too, so each handled message restarts the waiter's timeout.
        this.nextMessage.release();
    }
}

