package org.apache.spark.streaming.kafka010;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Random;
import org.apache.hadoop.hbase.master.InconsistencyReporter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.class */
public class JavaKafkaRDDSuite implements Serializable {
    private transient JavaSparkContext sc = null;
    private transient KafkaTestUtils kafkaTestUtils = null;

    @Before
    public void setUp() {
        this.kafkaTestUtils = new KafkaTestUtils();
        this.kafkaTestUtils.setup();
        this.sc = new JavaSparkContext(new SparkConf().setMaster("local[4]").setAppName(getClass().getSimpleName()));
    }

    @After
    public void tearDown() {
        if (this.sc != null) {
            this.sc.stop();
            this.sc = null;
        }
        if (this.kafkaTestUtils != null) {
            this.kafkaTestUtils.teardown();
            this.kafkaTestUtils = null;
        }
    }

    @Test
    public void testKafkaRDD() throws InterruptedException {
        Random random = new Random();
        createTopicAndSendData("topic1");
        createTopicAndSendData("topic2");
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", this.kafkaTestUtils.brokerAddress());
        hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        hashMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        hashMap.put("group.id", "java-test-consumer-" + random.nextInt() + "-" + System.currentTimeMillis());
        OffsetRange[] offsetRangeArr = {OffsetRange.create("topic1", 0, 0L, 1L), OffsetRange.create("topic2", 0, 0L, 1L)};
        HashMap hashMap2 = new HashMap();
        String str = this.kafkaTestUtils.brokerAddress().split(":")[0];
        hashMap2.put(offsetRangeArr[0].topicPartition(), str);
        hashMap2.put(offsetRangeArr[1].topicPartition(), str);
        Function<ConsumerRecord<String, String>, String> function = new Function<ConsumerRecord<String, String>, String>() { // from class: org.apache.spark.streaming.kafka010.JavaKafkaRDDSuite.1
            public String call(ConsumerRecord<String, String> consumerRecord) {
                return consumerRecord.value();
            }
        };
        JavaRDD map = KafkaUtils.createRDD(this.sc, hashMap, offsetRangeArr, LocationStrategies.PreferFixed(hashMap2)).map(function);
        JavaRDD map2 = KafkaUtils.createRDD(this.sc, hashMap, offsetRangeArr, LocationStrategies.PreferConsistent()).map(function);
        long count = map.count();
        long count2 = map2.count();
        Assert.assertTrue(count > 0);
        Assert.assertEquals(count, count2);
    }

    private String[] createTopicAndSendData(String str) {
        String[] strArr = {str + InconsistencyReporter.MINUS_ONE, str + "-2", str + "-3"};
        this.kafkaTestUtils.createTopic(str);
        this.kafkaTestUtils.sendMessages(str, strArr);
        return strArr;
    }
}
