/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.queue.impl;

import com.google.protobuf.GeneratedMessage;
import java.io.Closeable;
import java.util.UUID;
import java.util.function.LongConsumer;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.RunnableUtil;
import org.apache.sling.distribution.journal.messages.Messages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueCacheSeeder
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(QueueCacheSeeder.class);
    private static final long CACHE_SEEDING_DELAY_MS = 10000L;
    private final String topic;
    private final MessagingProvider provider;
    private volatile Closeable poller;
    private volatile boolean closed;

    public QueueCacheSeeder(MessagingProvider provider, String topic) {
        this.provider = provider;
        this.topic = topic;
    }

    public void seedOne() {
        RunnableUtil.startBackgroundThread(this::sendSeedingMessage, (String)"Seeder thread - one seed");
    }

    public void seed(LongConsumer callback) {
        this.poller = this.provider.createPoller(this.topic, Reset.latest, new HandlerAdapter[]{HandlerAdapter.create(Messages.PackageMessage.class, (info, msg) -> {
            this.close();
            callback.accept(info.getOffset());
        })});
        RunnableUtil.startBackgroundThread(this::sendSeedingMessages, (String)"Seeder thread");
    }

    @Override
    public void close() {
        this.closed = true;
        IOUtils.closeQuietly((Closeable)this.poller);
    }

    private void sendSeedingMessages() {
        LOG.info("Start message seeder");
        try {
            MessageSender sender = this.provider.createSender();
            while (!this.closed) {
                this.sendSeedingMessage((MessageSender<Messages.PackageMessage>)sender);
                QueueCacheSeeder.delay(10000L);
            }
        }
        finally {
            LOG.info("Stop message seeder");
        }
    }

    private void sendSeedingMessage() {
        this.sendSeedingMessage((MessageSender<Messages.PackageMessage>)this.provider.createSender());
    }

    private void sendSeedingMessage(MessageSender<Messages.PackageMessage> sender) {
        Messages.PackageMessage pkgMsg = this.createTestMessage();
        LOG.info("Send seeding message");
        try {
            sender.send(this.topic, (GeneratedMessage)pkgMsg);
        }
        catch (MessagingException e) {
            LOG.warn(e.getMessage(), (Throwable)e);
            QueueCacheSeeder.delay(100000L);
        }
    }

    private static void delay(long sleepMs) {
        try {
            Thread.sleep(sleepMs);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    protected Messages.PackageMessage createTestMessage() {
        String pkgId = UUID.randomUUID().toString();
        return Messages.PackageMessage.newBuilder().setPubSlingId("seeder").setPkgId(pkgId).setPkgType("seeder").setReqType(Messages.PackageMessage.ReqType.TEST).build();
    }
}

