/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.queue.impl;

import com.google.protobuf.GeneratedMessage;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.messages.Messages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collects the package messages of a topic whose offsets lie below a given upper bound.
 *
 * <p>The constructor registers a poller that feeds every received
 * {@code PackageMessage} to {@link #handlePackage}. Messages with an offset strictly
 * below {@code maxOffset} are accumulated; the first message seen at or beyond
 * {@code maxOffset} marks the range as complete and releases {@link #fetchRange()}.
 *
 * <p>NOTE(review): the lower bound is not checked by this class; it relies on the
 * assignment obtained from {@code messagingProvider.assignTo(minOffset)} to start
 * delivery at {@code minOffset} - confirm against the provider implementation.
 */
@ParametersAreNonnullByDefault
public class RangePoller {
    private static final Logger LOG = LoggerFactory.getLogger(RangePoller.class);

    /** Exclusive upper bound: messages at or beyond this offset end the fetch. */
    private final long maxOffset;

    /** Offset passed to the provider when requesting the assignment; also used for logging. */
    private final long minOffset;

    /** Handle of the poller created in the constructor; closed by {@link #fetchRange()}. */
    private final Closeable headPoller;

    /** Released once a message with an offset {@code >= maxOffset} has been observed. */
    private final CountDownLatch fetched = new CountDownLatch(1);

    /** Messages collected so far, in the order in which the handler received them. */
    private final List<FullMessage<Messages.PackageMessage>> messages;

    /**
     * Creates the poller and immediately starts listening on the given topic.
     *
     * @param messagingProvider provider used to build the assignment and the poller
     * @param packageTopic      name of the topic to read package messages from
     * @param minOffset         offset handed to {@code assignTo}
     * @param maxOffset         exclusive upper bound of the offsets to collect
     */
    public RangePoller(MessagingProvider messagingProvider, String packageTopic, long minOffset, long maxOffset) {
        this.maxOffset = maxOffset;
        this.minOffset = minOffset;
        this.messages = new ArrayList<>();
        String assign = messagingProvider.assignTo(minOffset);
        LOG.info("Fetching offsets [{},{}[", minOffset, maxOffset);
        // Registering the handler is the last step so that every field it touches is already set.
        this.headPoller = messagingProvider.createPoller(
                packageTopic,
                Reset.earliest,
                assign,
                new HandlerAdapter[]{HandlerAdapter.create(Messages.PackageMessage.class, this::handlePackage)});
    }

    /**
     * Blocks until the handler has seen a message at or beyond {@code maxOffset}, then
     * returns the messages collected up to that point.
     *
     * <p>There is no timeout: the call waits indefinitely if no such message arrives.
     * The poller is closed when this method exits, whether normally or by interruption.
     *
     * @return the internal list of collected messages (not a copy)
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public List<FullMessage<Messages.PackageMessage>> fetchRange() throws InterruptedException {
        try {
            this.fetched.await();
            LOG.info("Fetched offsets [{},{}[", this.minOffset, this.maxOffset);
            return this.messages;
        } finally {
            // Always release the poller, including when await() was interrupted.
            IOUtils.closeQuietly(this.headPoller);
        }
    }

    /**
     * Handler invoked for each received package message: stores it while its offset is
     * below {@code maxOffset}, otherwise signals that the range is complete.
     *
     * @param info    metadata of the received message, used here for its offset
     * @param message the package message payload
     */
    private void handlePackage(MessageInfo info, Messages.PackageMessage message) {
        long offset = info.getOffset();
        // Parameterized logging: nothing is formatted unless debug is enabled.
        LOG.debug("Reading offset {}", offset);
        if (offset < this.maxOffset) {
            FullMessage<Messages.PackageMessage> fullMessage = new FullMessage<>(info, message);
            this.messages.add(fullMessage);
        } else {
            // The message at or beyond maxOffset is not stored; it only ends the fetch.
            this.fetched.countDown();
        }
    }
}

