package org.apache.spark.sql.hudi.streaming;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.AvroConversionUtils$;
import org.apache.hudi.DataSourceReadOptions$;
import org.apache.hudi.IncrementalRelation;
import org.apache.hudi.MergeOnReadIncrementalRelation;
import org.apache.hudi.MergeOnReadIncrementalRelation$;
import org.apache.hudi.SparkAdapterSupport;
import org.apache.hudi.cdc.CDCRelation$;
import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.streaming.Offset;
import org.apache.spark.sql.execution.streaming.Source;
import org.apache.spark.sql.hudi.SparkAdapter;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;

/* compiled from: HoodieStreamSource.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=g\u0001B\f\u0019\u0001\u0015B\u0001B\u0012\u0001\u0003\u0002\u0003\u0006Ia\u0012\u0005\t\u0017\u0002\u0011\t\u0011)A\u0005\u0019\"Aq\u000b\u0001B\u0001B\u0003%\u0001\f\u0003\u0005b\u0001\t\u0005\t\u0015!\u0003c\u0011!)\u0007A!A!\u0002\u00131\u0007\"\u00026\u0001\t\u0003Y\u0007b\u0002:\u0001\u0005\u0004%Ia\u001d\u0005\b\u0003\u000b\u0001\u0001\u0015!\u0003u\u0011)\ty\u0001\u0001EC\u0002\u0013%\u0011\u0011\u0003\u0005\u000b\u00033\u0001\u0001R1A\u0005\n\u0005m\u0001BCA\u0017\u0001!\u0015\r\u0011\"\u0003\u00020!I\u0011Q\b\u0001C\u0002\u0013%\u0011q\b\u0005\t\u0003\u000f\u0002\u0001\u0015!\u0003\u0002B!I\u0011\u0011\n\u0001C\u0002\u0013%\u00111\n\u0005\t\u0003o\u0002\u0001\u0015!\u0003\u0002N!Q\u0011\u0011\u0010\u0001\t\u0006\u0004%I!a\u001f\t\u000f\u0005\u0015\u0005\u0001\"\u0011\u0002\b\"9\u0011\u0011\u0012\u0001\u0005\n\u0005-\u0005bBAH\u0001\u0011\u0005\u0013\u0011\u0013\u0005\b\u00037\u0003A\u0011IAO\u0011\u001d\ti\f\u0001C\u0005\u0003\u007fCq!!2\u0001\t\u0003\n9M\u0001\nI_>$\u0017.Z*ue\u0016\fWnU8ve\u000e,'BA\r\u001b\u0003%\u0019HO]3b[&twM\u0003\u0002\u001c9\u0005!\u0001.\u001e3j\u0015\tib$A\u0002tc2T!a\b\u0011\u0002\u000bM\u0004\u0018M]6\u000b\u0005\u0005\u0012\u0013AB1qC\u000eDWMC\u0001$\u0003\ry'oZ\u0002\u0001'\u0019\u0001aEL\u001b<\u0003B\u0011q\u0005L\u0007\u0002Q)\u0011\u0011FK\u0001\u0005Y\u0006twMC\u0001,\u0003\u0011Q\u0017M^1\n\u00055B#AB(cU\u0016\u001cG\u000f\u0005\u00020g5\t\u0001G\u0003\u0002\u001ac)\u0011!\u0007H\u0001\nKb,7-\u001e;j_:L!\u0001\u000e\u0019\u0003\rM{WO]2f!\t1\u0014(D\u00018\u0015\tAd$\u0001\u0005j]R,'O\\1m\u0013\tQtGA\u0004M_\u001e<\u0017N\\4\u0011\u0005qzT\"A\u001f\u000b\u0003y\nQa]2bY\u0006L!\u0001Q\u001f\u0003\u0019M+'/[1mSj\f'\r\\3\u0011\u0005\t#U\"A\"\u000b\u0005m\u0001\u0013BA#D\u0005M\u0019\u0006/\u0019:l\u0003\u0012\f\u0007\u000f^3s'V\u0004\bo\u001c:u\u0003)\u0019\u0018\u000f\\\"p]R,\u0007\u0010\u001e\t\u0003\u0011&k\u0011\u0001H\u0005\u0003\u0015r\u0011!bU)M\u0007>tG/\u001a=u\u00031iW\r^1eCR\f\u0007+\u0019;i!\tiEK\u0004\u0002O%B\u0011q*P\u0007\u0002!*\u0011\u0011\u000bJ\u0001\u0007yI|w\u000e\u001e \n\u0005Mk\u0014A\u0002)sK\u0012,g-\u0003\u0002V-\n11\u000b\u001e:j]\u001eT!aU\u001f\u0002\u0019M\u001c\u0007.Z7b\u001fB$\u0018n\u001c8\u0011\u0007qJ6,\u0003\u0002[{\t1q\n\u001d;j_:\u0004\"\u0001X0\u000e\u0003uS!A\u0018\u000f\u0002\u000bQL\b/Z:\n\u0005\u0001l&AC*ueV\u001cG\u000fV=qK\u0006Q\u0001/\u0019:b[\u0016$XM]:\u0011\t5\u001bG\nT\u0005\u0003IZ\u00131!T1q\u0003AygMZ:fiJ\u000bgnZ3MS6LG\u000f\u0005\u0002hQ6\t\u0001$\u0003\u0002j1\t1\u0002j\\8eS\u0016|eMZ:fiJ\u000bgnZ3MS6LG/\u0001\u0004=S:LGO\u0010\u000b\u0007Y6tw\u000e]9\u0011\u0005\u001d\u0004\u0001\"\u0002$\u0007\u0001\u00049\u0005\"B&\u0007\u0001\u0004a\u0005\"B,\u0007\u0001\u0004A\u0006\"B1\u0007\u0001\u0004\u0011\u0007\"B3\u0007\u0001\u00041\u0017aC:u_J\fw-Z\"p]\u001a,\u0012\u0001\u001e\t\u0004kbTX\"\u0001<\u000b\u0005]\u001c\u0015aB:u_J\fw-Z\u0005\u0003sZ\u0014Ac\u0015;pe\u0006<WmQ8oM&<WO]1uS>t\u0007cA>\u0002\u00025\tAP\u0003\u0002~}\u0006!1m\u001c8g\u0015\ty\b%\u0001\u0004iC\u0012|w\u000e]\u0005\u0004\u0003\u0007a(!D\"p]\u001aLw-\u001e:bi&|g.\u0001\u0007ti>\u0014\u0018mZ3D_:4\u0007\u0005K\u0002\t\u0003\u0013\u00012\u0001PA\u0006\u0013\r\ti!\u0010\u0002\niJ\fgn]5f]R\f\u0011\u0002^1cY\u0016\u0004\u0016\r\u001e5\u0016\u0005\u0005M\u0001cA;\u0002\u0016%\u0019\u0011q\u0003<\u0003\u0017M#xN]1hKB\u000bG\u000f[\u0001\u000b[\u0016$\u0018m\u00117jK:$XCAA\u000f!\u0011\ty\"!\u000b\u000e\u0005\u0005\u0005\"\u0002BA\u0012\u0003K\tQ\u0001^1cY\u0016T1!a\nD\u0003\u0019\u0019w.\\7p]&!\u00111FA\u0011\u0005UAun\u001c3jKR\u000b'\r\\3NKR\f7\t\\5f]R\f\u0011\u0002^1cY\u0016$\u0016\u0010]3\u0016\u0005\u0005E\u0002\u0003BA\u001a\u0003si!!!\u000e\u000b\t\u0005]\u0012QE\u0001\u0006[>$W\r\\\u0005\u0005\u0003w\t)DA\bI_>$\u0017.\u001a+bE2,G+\u001f9f\u0003)I7o\u0011#D#V,'/_\u000b\u0003\u0003\u0003\u00022\u0001PA\"\u0013\r\t)%\u0010\u0002\b\u0005>|G.Z1o\u0003-I7o\u0011#D#V,'/\u001f\u0011\u0002)!|G\u000e\\8x\u0007>lW.\u001b;IC:$G.\u001b8h+\t\ti\u0005\u0005\u0003\u0002P\u0005Ed\u0002BA)\u0003WrA!a\u0015\u0002h9!\u0011QKA3\u001d\u0011\t9&a\u0019\u000f\t\u0005e\u0013\u0011\r\b\u0005\u00037\nyFD\u0002P\u0003;J\u0011aI\u0005\u0003C\tJ!a\u0007\u0011\n\u0007\u0005\u001d2)\u0003\u0003\u0002$\u0005\u0015\u0012\u0002BA5\u0003C\t\u0001\u0002^5nK2Lg.Z\u0005\u0005\u0003[\ny'A\u0007US6,G.\u001b8f+RLGn\u001d\u0006\u0005\u0003S\n\t#\u0003\u0003\u0002t\u0005U$\u0001\u0006%pY2|woQ8n[&$\b*\u00198eY&twM\u0003\u0003\u0002n\u0005=\u0014!\u00065pY2|woQ8n[&$\b*\u00198eY&tw\rI\u0001\u000fS:LG/[1m\u001f\u001a47/\u001a;t+\t\ti\bE\u0002h\u0003\u007fJ1!!!\u0019\u0005IAun\u001c3jKN{WO]2f\u001f\u001a47/\u001a;)\u0007A\tI!\u0001\u0004tG\",W.Y\u000b\u00027\u0006yq-\u001a;MCR,7\u000f^(gMN,G/\u0006\u0002\u0002\u000eB!A(WA?\u0003%9W\r^(gMN,G/\u0006\u0002\u0002\u0014B!A(WAK!\ry\u0013qS\u0005\u0004\u00033\u0003$AB(gMN,G/\u0001\u0005hKR\u0014\u0015\r^2i)\u0019\ty*!.\u0002:B!\u0011\u0011UAX\u001d\u0011\t\u0019+a+\u000f\t\u0005\u0015\u0016\u0011\u0016\b\u0005\u00033\n9+\u0003\u0002 A%\u0011QDH\u0005\u0004\u0003[c\u0012a\u00029bG.\fw-Z\u0005\u0005\u0003c\u000b\u0019LA\u0005ECR\fgI]1nK*\u0019\u0011Q\u0016\u000f\t\u000f\u0005]F\u00031\u0001\u0002\u0014\u0006)1\u000f^1si\"9\u00111\u0018\u000bA\u0002\u0005U\u0015aA3oI\u0006y1\u000f^1si\u000e{W.\\5u)&lW\rF\u0002M\u0003\u0003Dq!a1\u0016\u0001\u0004\ti(A\u0006ti\u0006\u0014Ho\u00144gg\u0016$\u0018\u0001B:u_B$\"!!3\u0011\u0007q\nY-C\u0002\u0002Nv\u0012A!\u00168ji\u0002")
/* loaded from: input_file:org/apache/spark/sql/hudi/streaming/HoodieStreamSource.class */
public class HoodieStreamSource implements Source, Logging, Serializable, SparkAdapterSupport {
    private StoragePath tablePath;
    private HoodieTableMetaClient metaClient;
    private HoodieTableType tableType;
    private transient HoodieSourceOffset initialOffsets;
    private final SQLContext sqlContext;
    private final String metadataPath;
    private final Option<StructType> schemaOption;
    private final Map<String, String> parameters;
    private final HoodieOffsetRangeLimit offsetRangeLimit;
    private final transient StorageConfiguration<Configuration> storageConf;
    private final boolean isCDCQuery;
    private final TimelineUtils.HollowCommitHandling hollowCommitHandling;
    private SparkAdapter sparkAdapter;
    private transient Logger org$apache$spark$internal$Logging$$log_;
    private volatile byte bitmap$0;
    private volatile transient boolean bitmap$trans$0;

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public void commit(Offset offset) {
        Source.commit$(this, offset);
    }

    public org.apache.spark.sql.connector.read.streaming.Offset initialOffset() {
        return Source.initialOffset$(this);
    }

    public org.apache.spark.sql.connector.read.streaming.Offset deserializeOffset(String str) {
        return Source.deserializeOffset$(this, str);
    }

    public void commit(org.apache.spark.sql.connector.read.streaming.Offset offset) {
        Source.commit$(this, offset);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.spark.sql.hudi.streaming.HoodieStreamSource] */
    private SparkAdapter sparkAdapter$lzycompute() {
        SparkAdapter sparkAdapter;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 8)) == 0) {
                sparkAdapter = sparkAdapter();
                this.sparkAdapter = sparkAdapter;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 8);
            }
        }
        return this.sparkAdapter;
    }

    @Override // org.apache.hudi.SparkAdapterSupport
    public SparkAdapter sparkAdapter() {
        return ((byte) (this.bitmap$0 & 8)) == 0 ? sparkAdapter$lzycompute() : this.sparkAdapter;
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    private StorageConfiguration<Configuration> storageConf() {
        return this.storageConf;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.spark.sql.hudi.streaming.HoodieStreamSource] */
    private StoragePath tablePath$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                StoragePath storagePath = new StoragePath((String) this.parameters.getOrElse("path", () -> {
                    return "Missing 'path' option";
                }));
                this.tablePath = TablePathUtils.getTablePath(new HoodieHadoopStorage(storagePath, (StorageConfiguration<?>) storageConf()), storagePath).get();
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.tablePath;
    }

    private StoragePath tablePath() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? tablePath$lzycompute() : this.tablePath;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.spark.sql.hudi.streaming.HoodieStreamSource] */
    private HoodieTableMetaClient metaClient$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.metaClient = HoodieTableMetaClient.builder().setConf(storageConf().newInstance()).setBasePath(tablePath().toString()).build();
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.metaClient;
    }

    private HoodieTableMetaClient metaClient() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? metaClient$lzycompute() : this.metaClient;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.spark.sql.hudi.streaming.HoodieStreamSource] */
    private HoodieTableType tableType$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 4)) == 0) {
                this.tableType = metaClient().getTableType();
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 4);
            }
        }
        return this.tableType;
    }

    private HoodieTableType tableType() {
        return ((byte) (this.bitmap$0 & 4)) == 0 ? tableType$lzycompute() : this.tableType;
    }

    private boolean isCDCQuery() {
        return this.isCDCQuery;
    }

    private TimelineUtils.HollowCommitHandling hollowCommitHandling() {
        return this.hollowCommitHandling;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [org.apache.spark.sql.hudi.streaming.HoodieStreamSource] */
    private HoodieSourceOffset initialOffsets$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                HoodieMetadataLog hoodieMetadataLog = new HoodieMetadataLog(this.sqlContext.sparkSession(), this.metadataPath);
                this.initialOffsets = (HoodieSourceOffset) hoodieMetadataLog.get(0L).getOrElse(() -> {
                    HoodieSourceOffset hoodieSourceOffset;
                    HoodieOffsetRangeLimit hoodieOffsetRangeLimit = this.offsetRangeLimit;
                    if (HoodieEarliestOffsetRangeLimit$.MODULE$.equals(hoodieOffsetRangeLimit)) {
                        hoodieSourceOffset = HoodieSourceOffset$.MODULE$.INIT_OFFSET();
                    } else if (HoodieLatestOffsetRangeLimit$.MODULE$.equals(hoodieOffsetRangeLimit)) {
                        hoodieSourceOffset = (HoodieSourceOffset) this.getLatestOffset().getOrElse(() -> {
                            return HoodieSourceOffset$.MODULE$.INIT_OFFSET();
                        });
                    } else {
                        if (!(hoodieOffsetRangeLimit instanceof HoodieSpecifiedOffsetRangeLimit)) {
                            throw new MatchError(hoodieOffsetRangeLimit);
                        }
                        hoodieSourceOffset = new HoodieSourceOffset(((HoodieSpecifiedOffsetRangeLimit) hoodieOffsetRangeLimit).instantTime());
                    }
                    HoodieSourceOffset hoodieSourceOffset2 = hoodieSourceOffset;
                    hoodieMetadataLog.add(0L, hoodieSourceOffset2);
                    this.logInfo(() -> {
                        return new StringBuilder(22).append("The initial offset is ").append(hoodieSourceOffset2).toString();
                    });
                    return hoodieSourceOffset2;
                });
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.initialOffsets;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public HoodieSourceOffset initialOffsets() {
        return !this.bitmap$trans$0 ? initialOffsets$lzycompute() : this.initialOffsets;
    }

    public StructType schema() {
        return isCDCQuery() ? CDCRelation$.MODULE$.FULL_CDC_SPARK_SCHEMA() : (StructType) this.schemaOption.getOrElse(() -> {
            return AvroConversionUtils$.MODULE$.convertAvroSchemaToStructType(new TableSchemaResolver(this.metaClient()).getTableAvroSchema());
        });
    }

    private Option<HoodieSourceOffset> getLatestOffset() {
        Some some;
        metaClient().reloadActiveTimeline();
        HoodieTimeline handleHollowCommitIfNeeded = TimelineUtils.handleHollowCommitIfNeeded(metaClient().getActiveTimeline().filterCompletedInstants(), metaClient(), hollowCommitHandling());
        if (handleHollowCommitIfNeeded.empty()) {
            some = None$.MODULE$;
        } else {
            TimelineUtils.HollowCommitHandling hollowCommitHandling = hollowCommitHandling();
            TimelineUtils.HollowCommitHandling hollowCommitHandling2 = TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME;
            some = new Some(new HoodieSourceOffset((hollowCommitHandling != null ? !hollowCommitHandling.equals(hollowCommitHandling2) : hollowCommitHandling2 != null) ? handleHollowCommitIfNeeded.lastInstant().get().getTimestamp() : handleHollowCommitIfNeeded.getInstantsOrderedByStateTransitionTime().skip(handleHollowCommitIfNeeded.countInstants() - 1).findFirst().get().getStateTransitionTime()));
        }
        return some;
    }

    public Option<Offset> getOffset() {
        return getLatestOffset();
    }

    public Dataset<Row> getBatch(Option<Offset> option, Offset offset) {
        RDD<Row> buildScan;
        HoodieSourceOffset hoodieSourceOffset = (HoodieSourceOffset) option.map(offset2 -> {
            return HoodieSourceOffset$.MODULE$.apply(offset2);
        }).getOrElse(() -> {
            return this.initialOffsets();
        });
        HoodieSourceOffset apply = HoodieSourceOffset$.MODULE$.apply(offset);
        if (hoodieSourceOffset != null ? hoodieSourceOffset.equals(apply) : apply == null) {
            return this.sqlContext.internalCreateDataFrame(this.sqlContext.sparkContext().emptyRDD(ClassTag$.MODULE$.apply(InternalRow.class)).setName("empty"), schema(), true);
        }
        if (isCDCQuery()) {
            return this.sqlContext.sparkSession().internalCreateDataFrame(CDCRelation$.MODULE$.getCDCRelation(this.sqlContext, metaClient(), Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key()), startCommitTime(hoodieSourceOffset)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key()), apply.commitTime())}))).buildScan0(HoodieCDCUtils.CDC_COLUMNS, (Filter[]) Array$.MODULE$.empty(ClassTag$.MODULE$.apply(Filter.class))), CDCRelation$.MODULE$.FULL_CDC_SPARK_SCHEMA(), true);
        }
        Map $plus$plus = this.parameters.$plus$plus(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key()), DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceReadOptions$.MODULE$.BEGIN_INSTANTTIME().key()), startCommitTime(hoodieSourceOffset)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceReadOptions$.MODULE$.END_INSTANTTIME().key()), apply.commitTime()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(DataSourceReadOptions$.MODULE$.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key()), hollowCommitHandling().name())})));
        HoodieTableType tableType = tableType();
        if (HoodieTableType.COPY_ON_WRITE.equals(tableType) ? true : HoodieTableType.MERGE_ON_WRITE.equals(tableType)) {
            SparkRowSerDe createSparkRowSerDe = sparkAdapter().createSparkRowSerDe(schema());
            buildScan = new IncrementalRelation(this.sqlContext, $plus$plus, new Some(schema()), metaClient()).buildScan().map(row -> {
                return createSparkRowSerDe.serializeRow(row);
            }, ClassTag$.MODULE$.apply(InternalRow.class));
        } else {
            if (!HoodieTableType.MERGE_ON_READ.equals(tableType)) {
                throw new IllegalArgumentException(new StringBuilder(21).append("UnSupport tableType: ").append(tableType()).toString());
            }
            buildScan = new MergeOnReadIncrementalRelation(this.sqlContext, $plus$plus, metaClient(), new Some(schema()), MergeOnReadIncrementalRelation$.MODULE$.$lessinit$greater$default$5()).buildScan((String[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(schema().fields())).map(structField -> {
                return structField.name();
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))), (Filter[]) Array$.MODULE$.empty(ClassTag$.MODULE$.apply(Filter.class)));
        }
        return this.sqlContext.internalCreateDataFrame(buildScan, schema(), true);
    }

    private String startCommitTime(HoodieSourceOffset hoodieSourceOffset) {
        String commitTime;
        HoodieSourceOffset INIT_OFFSET = HoodieSourceOffset$.MODULE$.INIT_OFFSET();
        if (INIT_OFFSET != null ? INIT_OFFSET.equals(hoodieSourceOffset) : hoodieSourceOffset == null) {
            commitTime = hoodieSourceOffset.commitTime();
        } else {
            if (hoodieSourceOffset == null) {
                throw new IllegalStateException("UnKnow offset type.");
            }
            commitTime = hoodieSourceOffset.commitTime();
        }
        return commitTime;
    }

    public void stop() {
    }

    public HoodieStreamSource(SQLContext sQLContext, String str, Option<StructType> option, Map<String, String> map, HoodieOffsetRangeLimit hoodieOffsetRangeLimit) {
        this.sqlContext = sQLContext;
        this.metadataPath = str;
        this.schemaOption = option;
        this.parameters = map;
        this.offsetRangeLimit = hoodieOffsetRangeLimit;
        Source.$init$(this);
        Logging.$init$(this);
        SparkAdapterSupport.$init$(this);
        this.storageConf = HadoopFSUtils.getStorageConf(sQLContext.sparkSession().sessionState().newHadoopConf());
        this.isCDCQuery = CDCRelation$.MODULE$.isCDCEnabled(metaClient()) && map.get(DataSourceReadOptions$.MODULE$.QUERY_TYPE().key()).contains(DataSourceReadOptions$.MODULE$.QUERY_TYPE_INCREMENTAL_OPT_VAL()) && map.get(DataSourceReadOptions$.MODULE$.INCREMENTAL_FORMAT().key()).contains(DataSourceReadOptions$.MODULE$.INCREMENTAL_FORMAT_CDC_VAL());
        this.hollowCommitHandling = (TimelineUtils.HollowCommitHandling) map.get(DataSourceReadOptions$.MODULE$.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT().key()).map(str2 -> {
            return TimelineUtils.HollowCommitHandling.valueOf(str2);
        }).getOrElse(() -> {
            return TimelineUtils.HollowCommitHandling.BLOCK;
        });
    }
}
