package org.apache.hudi.utilities.sources;

import java.lang.invoke.SerializedLambda;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

/* loaded from: input_file:org/apache/hudi/utilities/sources/JsonKafkaSource.class */
/**
 * Kafka-backed {@link JsonSource} that reads record values as strings.
 *
 * <p>Both the key and the value deserializer are forced to {@link StringDeserializer}, so every
 * record value handed downstream is a {@code String}. Offset bookkeeping is delegated to
 * {@link KafkaOffsetGen}.
 */
public class JsonKafkaSource extends JsonSource {
    private static final Logger LOG = LogManager.getLogger(JsonKafkaSource.class);

    /** Computes the offset ranges to read and exposes the topic name and Kafka consumer params. */
    private final KafkaOffsetGen offsetGen;

    /** Metrics sink handed to {@link KafkaOffsetGen#getNextOffsetRanges} on every fetch. */
    private final HoodieDeltaStreamerMetrics metrics;

    /**
     * Creates the source and configures string deserialization for Kafka keys and values.
     *
     * <p>Note: this constructor mutates {@code typedProperties} by putting the
     * {@code key.deserializer} and {@code value.deserializer} entries into it before building
     * the {@link KafkaOffsetGen}.
     *
     * @param typedProperties source configuration; also passed to {@link KafkaOffsetGen}
     * @param javaSparkContext Spark context forwarded to the parent source
     * @param sparkSession Spark session forwarded to the parent source
     * @param schemaProvider schema provider forwarded to the parent source
     * @param hoodieDeltaStreamerMetrics metrics object passed along when computing offset ranges
     */
    public JsonKafkaSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics hoodieDeltaStreamerMetrics) {
        super(typedProperties, javaSparkContext, sparkSession, schemaProvider);
        this.metrics = hoodieDeltaStreamerMetrics;
        typedProperties.put("key.deserializer", StringDeserializer.class);
        typedProperties.put("value.deserializer", StringDeserializer.class);
        this.offsetGen = new KafkaOffsetGen(typedProperties);
    }

    /**
     * Fetches the next batch of Kafka record values.
     *
     * <p>The returned batch always carries the string form of the computed offset ranges as its
     * checkpoint. When the ranges contain no new messages the batch has an empty payload and no
     * RDD is created.
     *
     * @param lastCheckpointStr forwarded unchanged to {@link KafkaOffsetGen#getNextOffsetRanges};
     *     presumably the checkpoint string of the previous batch, if any (confirm against
     *     {@code Source})
     * @param sourceLimit forwarded unchanged to {@link KafkaOffsetGen#getNextOffsetRanges};
     *     presumably an upper bound on how much to read (confirm against {@code Source})
     * @return the new batch: an RDD of record values (or empty) plus the new checkpoint string
     */
    @Override // org.apache.hudi.utilities.sources.Source
    protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
        OffsetRange[] offsetRanges = this.offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, this.metrics);
        long totalNewMessages = KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
        LOG.info("About to read " + totalNewMessages + " from Kafka for topic :" + this.offsetGen.getTopicName());

        // The checkpoint is the same whether or not there is anything to read.
        String checkpoint = KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges);
        if (totalNewMessages <= 0) {
            return new InputBatch<>(Option.empty(), checkpoint);
        }
        return new InputBatch<>(Option.of(toRDD(offsetRanges)), checkpoint);
    }

    /**
     * Builds an RDD over the given Kafka offset ranges and projects each record to its value.
     *
     * <p>The mapping lambda is serialized by Spark; the compiler generates the matching
     * {@code $deserializeLambda$} support method automatically, so it must not be hand-written
     * in this class.
     *
     * @param offsetRangeArr offset ranges to read
     * @return RDD of the record values, cast to {@code String}
     */
    private JavaRDD<String> toRDD(OffsetRange[] offsetRangeArr) {
        return KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRangeArr, LocationStrategies.PreferConsistent())
                .map(consumerRecord -> (String) consumerRecord.value());
    }
}
