package org.apache.flink.streaming.kinesis.test;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/kinesis/test/KinesisExampleTest.class */
/**
 * Command-line driver that exercises {@link KinesisExample} end to end.
 *
 * <p>It creates the input and output streams named by the required {@code input-stream} and
 * {@code output-stream} arguments, starts {@code KinesisExample.main} on a separate thread,
 * publishes a fixed set of records to the input stream, and then polls the output stream until
 * the expected number of records has arrived, the example has failed, or the 60 second deadline
 * has passed. The collected output is then checked against the expected records.
 */
public class KinesisExampleTest {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisExampleTest.class);

    /**
     * Runs the end-to-end check and terminates the JVM with exit code 0 on success.
     *
     * @param args program arguments; must contain {@code input-stream} and {@code output-stream},
     *     and are also forwarded unchanged to {@code KinesisExample.main}
     * @throws Exception if the example thread reported an exception, or if any of the stream
     *     operations or the polling sleep fails
     */
    public static void main(String[] args) throws Exception {
        LOG.info("System properties: {}", System.getProperties());
        ParameterTool parameters = ParameterTool.fromArgs(args);
        String inputStream = parameters.getRequired("input-stream");
        String outputStream = parameters.getRequired("output-stream");

        KinesisPubsubClient pubsub = new KinesisPubsubClient(parameters.getProperties());
        pubsub.createTopic(inputStream, 2, parameters.getProperties());
        pubsub.createTopic(outputStream, 2, parameters.getProperties());

        // The example runs on its own thread so that this thread can publish the input and
        // poll the output concurrently. An exception thrown by the example is handed back
        // through this reference and rethrown below.
        AtomicReference<Exception> jobFailure = new AtomicReference<>();
        new Thread(() -> {
            try {
                KinesisExample.main(args);
                LOG.info("executed program");
            } catch (Exception e) {
                jobFailure.set(e);
            }
        }).start();

        String[] messages = {
            "elephant,5,45218",
            "squirrel,12,46213",
            "bee,3,51348",
            "squirrel,22,52444",
            "bee,10,53412",
            "elephant,9,54867"
        };
        for (String message : messages) {
            pubsub.sendMessage(inputStream, message);
        }
        LOG.info("generated records");

        // Poll once per second until enough records arrived, the example failed, or time ran out.
        Deadline deadline = Deadline.fromNow(Duration.ofSeconds(60L));
        List<String> results = pubsub.readAllMessages(outputStream);
        while (deadline.hasTimeLeft()
                && jobFailure.get() == null
                && results.size() < messages.length) {
            LOG.info("waiting for results..");
            Thread.sleep(1000L);
            results = pubsub.readAllMessages(outputStream);
        }

        // A failure of the example takes precedence over any result validation.
        Exception failure = jobFailure.get();
        if (failure != null) {
            throw failure;
        }

        LOG.info("results: {}", results);
        Assert.assertEquals(
                "Results received from '" + outputStream + "': " + results,
                messages.length,
                results.size());

        String[] expectedResults = {
            "elephant,5,45218",
            "elephant,14,54867",
            "squirrel,12,46213",
            "squirrel,34,52444",
            "bee,3,51348",
            "bee,13,53412"
        };
        for (String expected : expectedResults) {
            Assert.assertTrue(expected, results.contains(expected));
        }

        System.out.println("test finished");
        // Exit explicitly: the thread started above is a non-daemon thread and is never joined,
        // so the JVM would otherwise stay alive for as long as the example keeps running.
        System.exit(0);
    }
}
