package org.apache.hudi.table;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.adapter.DataStreamScanProviderAdapter;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.prune.PartitionPruner;
import org.apache.hudi.common.prune.condition.EqualsCondition;
import org.apache.hudi.common.prune.condition.GreaterThanCondition;
import org.apache.hudi.common.prune.condition.GreaterThanOrEqualsCondition;
import org.apache.hudi.common.prune.condition.LessThanCondition;
import org.apache.hudi.common.prune.condition.LessThanOrEqualsCondition;
import org.apache.hudi.common.prune.condition.NotEqualsCondition;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.source.FileIndex;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.source.StreamReadOperator;
import org.apache.hudi.source.StreamReadWithHistoricalDataMonitoringFunction;
import org.apache.hudi.source.filters.BinaryExpression;
import org.apache.hudi.source.filters.FilterExpressionParser;
import org.apache.hudi.source.prune.DataPruner;
import org.apache.hudi.source.prune.PrimaryKeyPruners;
import org.apache.hudi.source.rebalance.partitioner.StreamReadAppendPartitioner;
import org.apache.hudi.source.rebalance.partitioner.StreamReadBucketIndexPartitioner;
import org.apache.hudi.source.rebalance.selector.StreamReadAppendKeySelector;
import org.apache.hudi.source.rebalance.selector.StreamReadBucketIndexKeySelector;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.HoodieInputFormat;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.table.format.cdc.CdcInputFormat;
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.table.format.mow.MergeOnWriteInputFormat;
import org.apache.hudi.table.lookup.HoodieLookupFunction;
import org.apache.hudi.table.lookup.HoodieLookupTableReader;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.ChangelogModes;
import org.apache.hudi.util.ExpressionUtils;
import org.apache.hudi.util.InputFormats;
import org.apache.hudi.util.RowDataToJavaConverter;
import org.apache.hudi.util.SerializableSchema;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/table/HoodieTableSource.class */
public class HoodieTableSource implements ScanTableSource, SupportsProjectionPushDown, SupportsLimitPushDown, SupportsFilterPushDown, LookupTableSource, Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(HoodieTableSource.class);
    private static final int NO_LIMIT_CONSTANT = -1;
    private final StorageConfiguration<Configuration> hadoopConf;
    private final HoodieTableMetaClient metaClient;
    private final long maxCompactionMemoryInBytes;
    private final SerializableSchema schema;
    private final RowType tableRowType;
    private final StoragePath path;
    private final List<String> partitionKeys;
    private final String defaultPartName;
    private final org.apache.flink.configuration.Configuration conf;
    private final InternalSchemaManager internalSchemaManager;
    private int[] requiredPos;
    private long limit;
    private List<ExpressionPredicates.Predicate> predicates;
    private DataPruner dataPruner;
    private PartitionPruner partitionPruner;
    private int dataBucket;
    private transient FileIndex fileIndex;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hudi.table.HoodieTableSource$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/hudi/table/HoodieTableSource$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$hudi$common$model$HoodieTableType = new int[HoodieTableType.values().length];

        static {
            try {
                $SwitchMap$org$apache$hudi$common$model$HoodieTableType[HoodieTableType.MERGE_ON_READ.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$HoodieTableType[HoodieTableType.COPY_ON_WRITE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$HoodieTableType[HoodieTableType.MERGE_ON_WRITE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public HoodieTableSource(SerializableSchema serializableSchema, StoragePath storagePath, List<String> list, String str, org.apache.flink.configuration.Configuration configuration) {
        this(serializableSchema, storagePath, list, str, configuration, null, null, null, -1, null, null, null, null);
    }

    public HoodieTableSource(SerializableSchema serializableSchema, StoragePath storagePath, List<String> list, String str, org.apache.flink.configuration.Configuration configuration, @Nullable List<ExpressionPredicates.Predicate> list2, @Nullable DataPruner dataPruner, @Nullable PartitionPruner partitionPruner, int i, @Nullable int[] iArr, @Nullable Long l, @Nullable HoodieTableMetaClient hoodieTableMetaClient, @Nullable InternalSchemaManager internalSchemaManager) {
        this.schema = serializableSchema;
        this.tableRowType = this.schema.toSourceRowDataType().notNull().getLogicalType();
        this.path = storagePath;
        this.partitionKeys = list;
        this.defaultPartName = str;
        this.conf = configuration;
        this.predicates = list2 == null ? Collections.emptyList() : list2;
        this.dataPruner = dataPruner;
        this.partitionPruner = partitionPruner;
        this.dataBucket = i;
        this.requiredPos = iArr == null ? IntStream.range(0, this.tableRowType.getFieldCount()).toArray() : iArr;
        this.limit = l == null ? -1L : l.longValue();
        this.hadoopConf = new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(configuration));
        this.metaClient = hoodieTableMetaClient == null ? StreamerUtil.metaClientForReader(configuration, (Configuration) this.hadoopConf.unwrap()) : hoodieTableMetaClient;
        this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(configuration);
        this.internalSchemaManager = internalSchemaManager == null ? InternalSchemaManager.get(this.conf, this.metaClient) : internalSchemaManager;
    }

    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        return new DataStreamScanProviderAdapter() { // from class: org.apache.hudi.table.HoodieTableSource.1
            public boolean isBounded() {
                return !HoodieTableSource.this.conf.getBoolean(FlinkOptions.READ_AS_STREAMING);
            }

            public DataStream<RowData> produceDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
                TypeInformation fromDataTypeToTypeInfo = TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(HoodieTableSource.this.getProducedDataType());
                OptionsInference.setupSourceTasks(HoodieTableSource.this.conf, streamExecutionEnvironment.getParallelism());
                if (!HoodieTableSource.this.conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
                    return streamExecutionEnvironment.addSource(new InputFormatSourceFunction(HoodieTableSource.this.getInputFormat(), fromDataTypeToTypeInfo), HoodieTableSource.this.asSummaryString(), fromDataTypeToTypeInfo).name(HoodieTableSource.this.getSourceOperatorName("bounded_source")).setParallelism(HoodieTableSource.this.conf.getInteger(FlinkOptions.READ_TASKS));
                }
                return new DataStreamSource(HoodieTableSource.this.addFileDistributionStrategy(streamExecutionEnvironment.addSource(streamExecutionEnvironment.getConfig().isHistoricalDataProcessing() ? new StreamReadWithHistoricalDataMonitoringFunction(HoodieTableSource.this.conf, FilePathUtils.toFlinkPath(HoodieTableSource.this.path), HoodieTableSource.this.tableRowType, HoodieTableSource.this.maxCompactionMemoryInBytes, HoodieTableSource.this.partitionPruner) : new StreamReadMonitoringFunction(HoodieTableSource.this.conf, FilePathUtils.toFlinkPath(HoodieTableSource.this.path), HoodieTableSource.this.tableRowType, HoodieTableSource.this.maxCompactionMemoryInBytes, HoodieTableSource.this.partitionPruner), HoodieTableSource.this.getSourceOperatorName("split_monitor")).uid(Pipelines.opUID("split_monitor", HoodieTableSource.this.conf)).setParallelism(1).setMaxParallelism(1)).keyBy((v0) -> {
                    return v0.getFileId();
                }).transform("split_reader", fromDataTypeToTypeInfo, StreamReadOperator.factory((MergeOnReadInputFormat) HoodieTableSource.this.getInputFormat(true), HoodieTableSource.this.conf)).uid(Pipelines.opUID("split_reader", HoodieTableSource.this.conf)).setParallelism(HoodieTableSource.this.conf.getInteger(FlinkOptions.READ_TASKS)));
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case 421507757:
                        if (implMethodName.equals("getFileId")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/format/mor/MergeOnReadInputSplit") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                            return (v0) -> {
                                return v0.getFileId();
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public DataStream<MergeOnReadInputSplit> addFileDistributionStrategy(SingleOutputStreamOperator<MergeOnReadInputSplit> singleOutputStreamOperator) {
        return OptionsResolver.isMorWithBucketIndexUpsert(this.conf) ? singleOutputStreamOperator.partitionCustom(new StreamReadBucketIndexPartitioner(this.conf.getInteger(FlinkOptions.READ_TASKS)), new StreamReadBucketIndexKeySelector()) : OptionsResolver.isAppendMode(this.conf) ? singleOutputStreamOperator.partitionCustom(new StreamReadAppendPartitioner(this.conf.getInteger(FlinkOptions.READ_TASKS)), new StreamReadAppendKeySelector()) : singleOutputStreamOperator.keyBy((v0) -> {
            return v0.getFileId();
        });
    }

    public ChangelogMode getChangelogMode() {
        return OptionsResolver.emitChangelog(this.conf) ? ChangelogModes.FULL : ChangelogMode.insertOnly();
    }

    /* renamed from: copy, reason: merged with bridge method [inline-methods] */
    public HoodieTableSource m92copy() {
        return new HoodieTableSource(this.schema, this.path, this.partitionKeys, this.defaultPartName, this.conf, this.predicates, this.dataPruner, this.partitionPruner, this.dataBucket, this.requiredPos, Long.valueOf(this.limit), this.metaClient, this.internalSchemaManager);
    }

    public String asSummaryString() {
        return "HudiTableSource";
    }

    public SupportsFilterPushDown.Result applyFilters(List<ResolvedExpression> list) {
        List<ResolvedExpression> filterSimpleCallExpression = ExpressionUtils.filterSimpleCallExpression(list);
        Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> splitExprByPartitionCall = ExpressionUtils.splitExprByPartitionCall(filterSimpleCallExpression, this.partitionKeys, this.tableRowType);
        this.predicates = ExpressionPredicates.fromExpression((List<ResolvedExpression>) splitExprByPartitionCall.f0);
        this.dataPruner = DataPruner.newInstance((List) splitExprByPartitionCall.f0);
        this.partitionPruner = cratePartitionPruner(filterSimpleCallExpression);
        this.dataBucket = getDataBucket((List) splitExprByPartitionCall.f0);
        return SupportsFilterPushDown.Result.of(new ArrayList((Collection) splitExprByPartitionCall.f1), new ArrayList(list));
    }

    public boolean supportsNestedProjection() {
        return false;
    }

    public void applyProjection(int[][] iArr) {
        this.requiredPos = Arrays.stream(iArr).mapToInt(iArr2 -> {
            return iArr2[0];
        }).toArray();
    }

    public void applyLimit(long j) {
        this.limit = j;
    }

    public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.LookupContext lookupContext) {
        return TableFunctionProvider.of(new HoodieLookupFunction(new HoodieLookupTableReader(this::getBatchInputFormat, this.conf), getProducedDataType().notNull().getLogicalType(), getLookupKeys(lookupContext.getKeys()), (Duration) this.conf.get(FlinkOptions.LOOKUP_JOIN_CACHE_TTL)));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public DataType getProducedDataType() {
        String[] strArr = (String[]) this.schema.getColumnNames().toArray(new String[0]);
        DataType[] dataTypeArr = (DataType[]) this.schema.getColumnDataTypes().toArray(new DataType[0]);
        return DataTypes.ROW((DataTypes.Field[]) Arrays.stream(this.requiredPos).mapToObj(i -> {
            return DataTypes.FIELD(strArr[i], dataTypeArr[i]);
        }).toArray(i2 -> {
            return new DataTypes.Field[i2];
        })).bridgedTo(RowData.class);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getSourceOperatorName(String str) {
        String[] strArr = (String[]) this.schema.getColumnNames().toArray(new String[0]);
        List list = (List) Arrays.stream(this.requiredPos).mapToObj(i -> {
            return strArr[i];
        }).collect(Collectors.toList());
        StringBuilder sb = new StringBuilder();
        sb.append(str).append("(").append("table=").append(Collections.singletonList(this.conf.getString(FlinkOptions.TABLE_NAME))).append(", ").append("fields=").append(list).append(")");
        return sb.toString();
    }

    @Nullable
    private PartitionPruner cratePartitionPruner(List<ResolvedExpression> list) {
        if (list.isEmpty()) {
            return null;
        }
        StringJoiner stringJoiner = new StringJoiner(" and ");
        list.forEach(resolvedExpression -> {
            stringJoiner.add(resolvedExpression.asSummaryString());
        });
        LOG.info("Partition pruner for hoodie source, condition is:\n" + stringJoiner);
        RowDataToJavaConverter rowDataToJavaConverter = new RowDataToJavaConverter(this.schema);
        List list2 = (List) FilterExpressionParser.parse(list).stream().map(filterExpression -> {
            if (!(filterExpression instanceof BinaryExpression)) {
                return null;
            }
            BinaryExpression binaryExpression = (BinaryExpression) filterExpression;
            String column = binaryExpression.getColumn();
            String function = binaryExpression.getFunction();
            try {
                Object convert = rowDataToJavaConverter.convert(column, binaryExpression.getValue());
                if (function.equals(BuiltInFunctionDefinitions.GREATER_THAN.getName())) {
                    return new GreaterThanCondition(column, convert);
                }
                if (function.equals(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.getName())) {
                    return new GreaterThanOrEqualsCondition(column, convert);
                }
                if (function.equals(BuiltInFunctionDefinitions.LESS_THAN.getName())) {
                    return new LessThanCondition(column, convert);
                }
                if (function.equals(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.getName())) {
                    return new LessThanOrEqualsCondition(column, convert);
                }
                if (function.equals(BuiltInFunctionDefinitions.EQUALS.getName())) {
                    return new EqualsCondition(column, convert);
                }
                if (function.equals(BuiltInFunctionDefinitions.NOT_EQUALS.getName())) {
                    return new NotEqualsCondition(column, convert);
                }
                return null;
            } catch (IllegalArgumentException e) {
                LOG.warn("Unable to create PartitionPruner", e);
                return null;
            }
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toList());
        return this.conf.getBoolean(FlinkOptions.HIDDEN_PARTITIONING_ENABLED) ? PartitionPruner.getInstance(list2, this.conf.getString(FlinkOptions.HIDDEN_PARTITIONING_RULE)) : PartitionPruner.getInstance(list2, String.join(",", this.partitionKeys));
    }

    private int getDataBucket(List<ResolvedExpression> list) {
        if (!OptionsResolver.isBucketIndexType(this.conf) || list.isEmpty()) {
            return -1;
        }
        Set set = (Set) Arrays.stream(OptionsResolver.getIndexKeys(this.conf)).collect(Collectors.toSet());
        List list2 = (List) list.stream().filter(resolvedExpression -> {
            return ExpressionUtils.isEqualsLitExpr(resolvedExpression, set);
        }).collect(Collectors.toList());
        if (ExpressionUtils.isFilteringByAllFields(list2, set)) {
            return PrimaryKeyPruners.getBucketId(list2, this.conf);
        }
        return -1;
    }

    private List<MergeOnReadInputSplit> buildInputSplits() {
        FileIndex orBuildFileIndex = getOrBuildFileIndex();
        List<String> orBuildPartitionPaths = orBuildFileIndex.getOrBuildPartitionPaths();
        if (orBuildPartitionPaths.isEmpty()) {
            return Collections.emptyList();
        }
        List<StoragePathInfo> filesInPartitions = orBuildFileIndex.getFilesInPartitions();
        if (filesInPartitions.isEmpty()) {
            throw new HoodieException("No files found for reading in user provided path.");
        }
        HoodieTableFileSystemView hoodieTableFileSystemView = new HoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(), filesInPartitions);
        if (!hoodieTableFileSystemView.getLastInstant().isPresent()) {
            return Collections.emptyList();
        }
        String timestamp = ((HoodieInstant) hoodieTableFileSystemView.getLastInstant().get()).getTimestamp();
        String string = this.conf.getString(FlinkOptions.MERGE_TYPE);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        return (List) orBuildPartitionPaths.stream().map(str -> {
            return (List) hoodieTableFileSystemView.getLatestMergedFileSlicesBeforeOrOn(str, timestamp).map(fileSlice -> {
                return new MergeOnReadInputSplit(atomicInteger.getAndAdd(1), (String) fileSlice.getBaseFile().map((v0) -> {
                    return v0.getPath();
                }).orElse((Object) null), Option.ofNullable(fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(hoodieLogFile -> {
                    return hoodieLogFile.getPath().toString();
                }).collect(Collectors.toList())), timestamp, this.metaClient.getBasePath(), this.maxCompactionMemoryInBytes, string, null, fileSlice.getFileId());
            }).collect(Collectors.toList());
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
    }

    public HoodieInputFormat<?> getInputFormat() {
        return getInputFormat(false);
    }

    @VisibleForTesting
    public HoodieInputFormat<?> getInputFormat(boolean z) {
        return z ? getStreamInputFormat() : getBatchInputFormat();
    }

    private HoodieInputFormat<?> getBatchInputFormat() {
        Schema tableAvroSchema = getTableAvroSchema();
        DataType convertToDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
        RowType rowType = (RowType) convertToDataType.getLogicalType();
        RowType rowType2 = (RowType) getProducedDataType().notNull().getLogicalType();
        String string = this.conf.getString(FlinkOptions.QUERY_TYPE);
        boolean z = -1;
        switch (string.hashCode()) {
            case -683877954:
                if (string.equals(FlinkOptions.QUERY_TYPE_READ_OPTIMIZED)) {
                    z = true;
                    break;
                }
                break;
            case 284874180:
                if (string.equals(FlinkOptions.QUERY_TYPE_SNAPSHOT)) {
                    z = false;
                    break;
                }
                break;
            case 1085372378:
                if (string.equals(FlinkOptions.QUERY_TYPE_INCREMENTAL)) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                HoodieTableType valueOf = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
                List<HoodieCommitMetadata> allCommitsMetadata = FormatUtils.getAllCommitsMetadata(this.metaClient, this.conf, FilePathUtils.toFlinkPath(this.path));
                switch (AnonymousClass2.$SwitchMap$org$apache$hudi$common$model$HoodieTableType[valueOf.ordinal()]) {
                    case 1:
                        List<MergeOnReadInputSplit> buildInputSplits = buildInputSplits();
                        if (!buildInputSplits.isEmpty()) {
                            return mergeOnReadInputFormat(rowType, rowType2, tableAvroSchema, convertToDataType, buildInputSplits, allCommitsMetadata, false);
                        }
                        LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
                        return InputFormats.EMPTY_INPUT_FORMAT;
                    case 2:
                    case 3:
                        return baseFileOnlyInputFormat();
                    default:
                        throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE));
                }
            case true:
                return baseFileOnlyInputFormat();
            case true:
                if (OptionsResolver.isMowTable(this.conf)) {
                    return baseFileOnlyInputFormat();
                }
                IncrementalInputSplits build = IncrementalInputSplits.builder().conf(this.conf).path(FilePathUtils.toFlinkPath(this.path)).rowType(this.tableRowType).maxCompactionMemoryInBytes(this.maxCompactionMemoryInBytes).partitionPruner(this.partitionPruner).build();
                boolean z2 = this.conf.getBoolean(FlinkOptions.CDC_ENABLED);
                IncrementalInputSplits.Result inputSplits = build.inputSplits(this.metaClient, z2);
                List<HoodieCommitMetadata> commitsMetadata = inputSplits.getCommitsMetadata() != null ? inputSplits.getCommitsMetadata() : FormatUtils.getAllCommitsMetadata(this.metaClient, this.conf, FilePathUtils.toFlinkPath(this.path));
                if (!inputSplits.isEmpty()) {
                    return z2 ? cdcInputFormat(rowType, rowType2, tableAvroSchema, convertToDataType, inputSplits.getInputSplits(), commitsMetadata) : mergeOnReadInputFormat(rowType, rowType2, tableAvroSchema, convertToDataType, inputSplits.getInputSplits(), commitsMetadata, false);
                }
                LOG.warn("No input splits generate for incremental read, returns empty collection instead");
                return InputFormats.EMPTY_INPUT_FORMAT;
            default:
                throw new HoodieException(String.format("Invalid query type : '%s', options ['%s', '%s', '%s'] are supported now", string, FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, FlinkOptions.QUERY_TYPE_INCREMENTAL));
        }
    }

    private HoodieInputFormat<?> getStreamInputFormat() {
        Schema inferSchemaFromDdl = (this.metaClient == null || !tableDataExists()) ? inferSchemaFromDdl() : getTableAvroSchema();
        DataType convertToDataType = AvroSchemaConverter.convertToDataType(inferSchemaFromDdl);
        RowType rowType = (RowType) convertToDataType.getLogicalType();
        RowType rowType2 = (RowType) getProducedDataType().notNull().getLogicalType();
        String string = this.conf.getString(FlinkOptions.QUERY_TYPE);
        boolean z = -1;
        switch (string.hashCode()) {
            case 284874180:
                if (string.equals(FlinkOptions.QUERY_TYPE_SNAPSHOT)) {
                    z = false;
                    break;
                }
                break;
            case 1085372378:
                if (string.equals(FlinkOptions.QUERY_TYPE_INCREMENTAL)) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
            case true:
                return this.conf.getBoolean(FlinkOptions.CDC_ENABLED) ? cdcInputFormat(rowType, rowType2, inferSchemaFromDdl, convertToDataType, Collections.emptyList(), Collections.emptyList()) : mergeOnReadInputFormat(rowType, rowType2, inferSchemaFromDdl, convertToDataType, Collections.emptyList(), Collections.emptyList(), HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)) == HoodieTableType.MERGE_ON_READ);
            default:
                throw new HoodieException(String.format("Invalid query type : '%s', options ['%s', '%s'] are supported now", string, FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_INCREMENTAL));
        }
    }

    private boolean tableDataExists() {
        return this.metaClient.getActiveTimeline().getLastCommitMetadataWithValidData().isPresent();
    }

    private MergeOnReadInputFormat cdcInputFormat(RowType rowType, RowType rowType2, Schema schema, DataType dataType, List<MergeOnReadInputSplit> list, List<HoodieCommitMetadata> list2) {
        return CdcInputFormat.builder().config(this.conf).tableState(new MergeOnReadTableState(rowType, rowType2, schema.toString(), AvroSchemaConverter.convertToSchema(rowType2).toString(), list, list2, this.conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","))).fieldTypes(dataType.getChildren()).defaultPartName(this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME)).predicates(this.predicates).limit(this.limit).emitDelete(false).build();
    }

    private MergeOnReadInputFormat mergeOnReadInputFormat(RowType rowType, RowType rowType2, Schema schema, DataType dataType, List<MergeOnReadInputSplit> list, List<HoodieCommitMetadata> list2, boolean z) {
        return MergeOnReadInputFormat.builder().config(this.conf).tableState(new MergeOnReadTableState(rowType, rowType2, schema.toString(), AvroSchemaConverter.convertToSchema(rowType2).toString(), list, list2, this.conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","))).fieldTypes(dataType.getChildren()).defaultPartName(this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME)).predicates(this.predicates).limit(this.limit).emitDelete(z).internalSchemaManager(this.internalSchemaManager).build();
    }

    private HoodieInputFormat<?> baseFileOnlyInputFormat() {
        List<StoragePathInfo> readFiles = getReadFiles();
        if (readFiles.isEmpty()) {
            return InputFormats.EMPTY_INPUT_FORMAT;
        }
        HoodieTableFileSystemView hoodieTableFileSystemView = new HoodieTableFileSystemView(this.metaClient, this.metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(), readFiles);
        if (OptionsResolver.isMowTable(this.conf)) {
            StoragePath[] storagePathArr = (StoragePath[]) hoodieTableFileSystemView.getLatestBaseFilesInRange(Objects.equals(this.conf.getString(FlinkOptions.READ_START_COMMIT), FlinkOptions.START_COMMIT_EARLIEST) ? null : this.conf.getString(FlinkOptions.READ_START_COMMIT), this.conf.getString(FlinkOptions.READ_END_COMMIT)).map((v0) -> {
                return v0.getPathInfo();
            }).map((v0) -> {
                return v0.getPath();
            }).toArray(i -> {
                return new StoragePath[i];
            });
            if (storagePathArr.length == 0) {
                return InputFormats.EMPTY_INPUT_FORMAT;
            }
            return new MergeOnWriteInputFormat(FilePathUtils.toFlinkPaths(storagePathArr), (String[]) this.schema.getColumnNames().toArray(new String[0]), (DataType[]) this.schema.getColumnDataTypes().toArray(new DataType[0]), this.requiredPos, this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD), this.conf.getBoolean(FlinkOptions.HIDDEN_PARTITIONING_ENABLED), this.conf.getString(FlinkOptions.HIDDEN_PARTITIONING_RULE), this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING), this.predicates, this.limit == -1 ? Long.MAX_VALUE : this.limit, this.conf, HadoopConfigurations.getParquetConf(this.conf, (Configuration) this.hadoopConf.unwrap()), this.conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE), this.internalSchemaManager);
        }
        Path[] pathArr = (Path[]) hoodieTableFileSystemView.getLatestBaseFiles().map((v0) -> {
            return v0.getPathInfo();
        }).map(storagePathInfo -> {
            return new Path(storagePathInfo.getPath().toUri());
        }).toArray(i2 -> {
            return new Path[i2];
        });
        if (pathArr.length == 0) {
            return InputFormats.EMPTY_INPUT_FORMAT;
        }
        return new CopyOnWriteInputFormat(FilePathUtils.toFlinkPaths(pathArr), (String[]) this.schema.getColumnNames().toArray(new String[0]), (DataType[]) this.schema.getColumnDataTypes().toArray(new DataType[0]), this.requiredPos, this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME), this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD), this.conf.getBoolean(FlinkOptions.HIDDEN_PARTITIONING_ENABLED), this.conf.getString(FlinkOptions.HIDDEN_PARTITIONING_RULE), this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING), this.predicates, this.limit == -1 ? Long.MAX_VALUE : this.limit, this.conf, HadoopConfigurations.getParquetConf(this.conf, (Configuration) this.hadoopConf.unwrap()), this.conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE), this.internalSchemaManager);
    }

    private Schema inferSchemaFromDdl() {
        return HoodieAvroUtils.addMetadataFields(AvroSchemaConverter.convertToSchema(this.tableRowType), this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
    }

    private FileIndex getOrBuildFileIndex() {
        if (this.fileIndex == null) {
            this.fileIndex = FileIndex.builder().path(this.path).conf(this.conf).rowType(this.tableRowType).dataPruner(this.dataPruner).partitionPruner(this.partitionPruner).dataBucket(this.dataBucket).build();
        }
        return this.fileIndex;
    }

    private int[] getLookupKeys(int[][] iArr) {
        int[] iArr2 = new int[iArr.length];
        int i = 0;
        for (int[] iArr3 : iArr) {
            if (iArr3.length > 1) {
                throw new UnsupportedOperationException("Hive lookup can not support nested key now.");
            }
            iArr2[i] = iArr3[0];
            i++;
        }
        return iArr2;
    }

    @VisibleForTesting
    public Schema getTableAvroSchema() {
        try {
            return new TableSchemaResolver(this.metaClient).getTableAvroSchema();
        } catch (Throwable th) {
            LOG.warn("Get table avro schema error, use schema from the DDL instead", th);
            return inferSchemaFromDdl();
        }
    }

    @VisibleForTesting
    public HoodieTableMetaClient getMetaClient() {
        return this.metaClient;
    }

    @VisibleForTesting
    public org.apache.flink.configuration.Configuration getConf() {
        return this.conf;
    }

    @VisibleForTesting
    public void reset() {
        this.metaClient.reloadActiveTimeline();
        this.fileIndex = null;
    }

    @VisibleForTesting
    public List<StoragePathInfo> getReadFiles() {
        FileIndex orBuildFileIndex = getOrBuildFileIndex();
        return orBuildFileIndex.getOrBuildPartitionPaths().isEmpty() ? Collections.emptyList() : orBuildFileIndex.getFilesInPartitions();
    }

    @VisibleForTesting
    public List<ExpressionPredicates.Predicate> getPredicates() {
        return this.predicates;
    }

    @VisibleForTesting
    public DataPruner getDataPruner() {
        return this.dataPruner;
    }

    @VisibleForTesting
    public int getDataBucket() {
        return this.dataBucket;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -35648451:
                if (implMethodName.equals("getBatchInputFormat")) {
                    z = true;
                    break;
                }
                break;
            case 421507757:
                if (implMethodName.equals("getFileId")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/format/mor/MergeOnReadInputSplit") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.getFileId();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/hudi/common/function/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/HoodieTableSource") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/hudi/table/format/HoodieInputFormat;")) {
                    HoodieTableSource hoodieTableSource = (HoodieTableSource) serializedLambda.getCapturedArg(0);
                    return hoodieTableSource::getBatchInputFormat;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
