package org.apache.hudi.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Serializable;

/* loaded from: input_file:org/apache/hudi/source/IncrementalInputSplits.class */
public class IncrementalInputSplits implements Serializable {
    /** Serialization version identifier of this class. */
    private static final long serialVersionUID = 1;
    /** Logger shared by all instances. */
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalInputSplits.class);
    /**
     * Flink configuration. This class reads the read start/end commit, table name, merge type and
     * the streaming / streaming-scan-files flags from it, and also hands it to the file index.
     */
    private final Configuration conf;
    /** Path of the table; converted to a Hadoop path for the file index and used when loading commit metadata. */
    private final Path path;
    /** Row type handed to the file index. */
    private final RowType rowType;
    /** Value passed unchanged to every {@link MergeOnReadInputSplit} created by this class. */
    private final long maxCompactionMemoryInBytes;
    /** Optional partition pruner; when {@code null} the written partitions are returned unfiltered. */
    private final PartitionPruners.PartitionPruner partitionPruner;
    /** When {@code true} and the table is MOR, instants whose action is the commit action are dropped. */
    private final boolean skipCompaction;

    /* loaded from: input_file:org/apache/hudi/source/IncrementalInputSplits$Builder.class */
    /**
     * Fluent builder for {@link IncrementalInputSplits}.
     *
     * <p>{@code conf}, {@code path} and {@code rowType} are mandatory: {@link #build()} throws a
     * {@link NullPointerException} if any of them was never set. The partition pruner may stay
     * {@code null} and compaction skipping is disabled unless switched on.
     */
    public static class Builder {
        // Mandatory settings, null-checked in build().
        private Configuration conf;
        private Path path;
        private RowType rowType;

        // Optional settings.
        private long maxCompactionMemoryInBytes;
        private PartitionPruners.PartitionPruner partitionPruner;
        private boolean skipCompaction = false;

        /** Sets the Flink configuration and returns this builder. */
        public Builder conf(Configuration configuration) {
            this.conf = configuration;
            return this;
        }

        /** Sets the table path and returns this builder. */
        public Builder path(Path path) {
            this.path = path;
            return this;
        }

        /** Sets the row type and returns this builder. */
        public Builder rowType(RowType rowType) {
            this.rowType = rowType;
            return this;
        }

        /** Sets the max compaction memory (in bytes) and returns this builder. */
        public Builder maxCompactionMemoryInBytes(long j) {
            this.maxCompactionMemoryInBytes = j;
            return this;
        }

        /** Sets the partition pruner, which may be {@code null}, and returns this builder. */
        public Builder partitionPruner(@Nullable PartitionPruners.PartitionPruner partitionPruner) {
            this.partitionPruner = partitionPruner;
            return this;
        }

        /** Sets whether compaction instants are skipped and returns this builder. */
        public Builder skipCompaction(boolean z) {
            this.skipCompaction = z;
            return this;
        }

        /**
         * Creates the {@link IncrementalInputSplits} from the collected settings.
         *
         * @return a new instance
         * @throws NullPointerException if the configuration, path or row type is missing
         */
        public IncrementalInputSplits build() {
            // Checked in this order: configuration, path, row type.
            final Configuration requiredConf = Objects.requireNonNull(this.conf);
            final Path requiredPath = Objects.requireNonNull(this.path);
            final RowType requiredRowType = Objects.requireNonNull(this.rowType);
            return new IncrementalInputSplits(
                    requiredConf,
                    requiredPath,
                    requiredRowType,
                    this.maxCompactionMemoryInBytes,
                    this.partitionPruner,
                    this.skipCompaction);
        }
    }

    /* loaded from: input_file:org/apache/hudi/source/IncrementalInputSplits$Result.class */
    /**
     * Outcome of an input split computation: the generated splits together with the end instant
     * they were generated for.
     */
    public static class Result {
        /** Shared result that carries no splits and an empty end instant. */
        public static final Result EMPTY = instance(Collections.emptyList(), "");

        private final List<MergeOnReadInputSplit> inputSplits;
        private final String endInstant;

        private Result(List<MergeOnReadInputSplit> list, String str) {
            this.inputSplits = list;
            this.endInstant = str;
        }

        /**
         * Factory method.
         *
         * @param list the input splits; stored as given, not copied
         * @param str the end instant
         * @return a new result wrapping both values
         */
        public static Result instance(List<MergeOnReadInputSplit> list, String str) {
            return new Result(list, str);
        }

        /** Returns {@code true} when this result holds no input splits. */
        public boolean isEmpty() {
            return this.inputSplits.isEmpty();
        }

        /** Returns the input splits exactly as they were passed to {@link #instance}. */
        public List<MergeOnReadInputSplit> getInputSplits() {
            return this.inputSplits;
        }

        /** Returns the end instant of this result. */
        public String getEndInstant() {
            return this.endInstant;
        }
    }

    /**
     * Stores the given settings as-is; the constructor is private and is invoked by
     * {@link Builder#build()}.
     *
     * @param configuration Flink configuration
     * @param path table path
     * @param rowType row type handed to the file index
     * @param j max compaction memory in bytes
     * @param partitionPruner partition pruner, may be {@code null}
     * @param z whether compaction instants are skipped
     */
    private IncrementalInputSplits(Configuration configuration, Path path, RowType rowType, long j, @Nullable PartitionPruners.PartitionPruner partitionPruner, boolean z) {
        // Table location and schema.
        this.path = path;
        this.rowType = rowType;

        // Read options.
        this.conf = configuration;
        this.skipCompaction = z;
        this.partitionPruner = partitionPruner;
        this.maxCompactionMemoryInBytes = j;
    }

    /**
     * Returns a fresh {@link Builder} with no settings applied yet.
     */
    public static Builder builder() {
        final Builder freshBuilder = new Builder();
        return freshBuilder;
    }

    /**
     * Creates a {@code FileIndex} for the table path, using this instance's configuration, row type
     * and partition pruner.
     */
    private FileIndex getFileIndex() {
        // The file index takes a Hadoop path, so the Flink path is converted through its URI.
        final org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(this.path.toUri());
        // NOTE(review): the fourth argument is deliberately null; its meaning is not visible in this file.
        return FileIndex.instance(hadoopPath, this.conf, this.rowType, null, this.partitionPruner);
    }

    /**
     * Resolves the partitions to read from the given commit metadata.
     *
     * <p>The written partition paths are extracted from the metadata. When a partition pruner is
     * configured they are filtered through it and the pruning ratio is logged; otherwise they are
     * returned unchanged.
     *
     * @param list commit metadata to extract the written partitions from
     * @return the partitions to read
     */
    private Set<String> getReadPartitions(List<HoodieCommitMetadata> list) {
        final Set<String> writtenPartitions = HoodieInputFormatUtils.getWritePartitionPaths(list);
        if (this.partitionPruner != null) {
            final Set<String> selectedPartitions = this.partitionPruner.filter(writtenPartitions);
            // Counts are kept as doubles: they feed the percentage and are logged in that form.
            final double total = writtenPartitions.size();
            final double selected = selectedPartitions.size();
            final double percentPruned;
            if (total == 0.0d) {
                // Nothing was written, so nothing could be pruned; also avoids dividing by zero.
                percentPruned = 0.0d;
            } else {
                percentPruned = (1.0d - (selected / total)) * 100.0d;
            }
            LOG.info("Selected " + selected + " partitions out of " + total + ", pruned " + percentPruned + "% partitions.");
            return selectedPartitions;
        }
        return writtenPartitions;
    }

    /**
     * Computes the input splits when no instant has been issued before.
     *
     * <p>Delegates to {@link #inputSplits(HoodieTableMetaClient, org.apache.hadoop.conf.Configuration, String)}
     * with {@code null} as the third argument.
     *
     * @param hoodieTableMetaClient meta client of the table
     * @param configuration Hadoop configuration
     * @return the result of the delegated call
     */
    public Result inputSplits(HoodieTableMetaClient hoodieTableMetaClient, org.apache.hadoop.conf.Configuration configuration) {
        final String noIssuedInstant = null;
        return inputSplits(hoodieTableMetaClient, configuration, noIssuedInstant);
    }

    /**
     * Computes the merge-on-read input splits for the instants selected by
     * {@link #filterInstantsWithRange(HoodieTimeline, String)}.
     *
     * <p>Steps:
     * <ol>
     *   <li>Reload the active timeline and take the completed commits plus compaction instants.</li>
     *   <li>Select the instants to read; the last one supplies the end instant of the result.</li>
     *   <li>Derive the instant range: {@code (str, end]} when {@code str} is given;
     *       {@code [read start commit, end]} when a start commit is configured, or no range at all
     *       when that start commit is the "earliest" marker; otherwise {@code [end, end]}.</li>
     *   <li>Resolve partitions and files, either from the file index (no range, or
     *       {@link #isScanFiles(String, HoodieTableMetaClient)} says so) or from the commit metadata
     *       of the selected instants plus any archived metadata.</li>
     *   <li>Create one {@link MergeOnReadInputSplit} per merged file slice of each partition.</li>
     * </ol>
     *
     * <p>Every "nothing to read" situation yields {@link Result#EMPTY}; this method never returns
     * {@code null}.
     *
     * @param hoodieTableMetaClient meta client of the table; its active timeline is reloaded
     * @param configuration Hadoop configuration used when resolving the files written by the instants
     * @param str exclusive lower bound: only instants with a greater timestamp are read;
     *            {@code null} to derive the range from the read options instead
     * @return the input splits together with the end instant, or {@link Result#EMPTY}
     */
    public Result inputSplits(HoodieTableMetaClient hoodieTableMetaClient, org.apache.hadoop.conf.Configuration configuration, String str) {
        hoodieTableMetaClient.reloadActiveTimeline();
        final HoodieTimeline commitTimeline = hoodieTableMetaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
        if (commitTimeline.empty()) {
            LOG.warn("No splits found for the table under path " + this.path);
            return Result.EMPTY;
        }

        final List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, str);
        if (instants.isEmpty()) {
            LOG.info("No new instant found for the table under path " + this.path + ", skip reading");
            return Result.EMPTY;
        }
        // The last selected instant is the upper bound of this read.
        final String endInstant = instants.get(instants.size() - 1).getTimestamp();

        final InstantRange instantRange;
        if (str != null) {
            // An instant was already issued: read (str, endInstant].
            instantRange = InstantRange.getInstance(str, endInstant, InstantRange.RangeType.OPEN_CLOSE);
        } else if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
            final String startCommit = this.conf.getString(FlinkOptions.READ_START_COMMIT);
            // The "earliest" marker means no range restriction at all (null range).
            instantRange = startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
                    ? null
                    : InstantRange.getInstance(startCommit, endInstant, InstantRange.RangeType.CLOSE_CLOSE);
        } else {
            // No start commit configured: read the end instant only.
            instantRange = InstantRange.getInstance(endInstant, endInstant, InstantRange.RangeType.CLOSE_CLOSE);
        }

        final Set<String> readPartitions;
        final FileStatus[] fileStatuses;
        if (instantRange == null || isScanFiles(instantRange.getStartInstant(), hoodieTableMetaClient)) {
            // Resolve partitions and files through the file index instead of the commit metadata.
            final FileIndex fileIndex = getFileIndex();
            readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
            if (readPartitions.isEmpty()) {
                LOG.warn("No partitions found for reading in user provided path.");
                // Return the shared empty result, never null, so callers can always
                // inspect the result (e.g. isEmpty()) without a null check.
                return Result.EMPTY;
            }
            fileStatuses = instantRange == null
                    ? fileIndex.getFilesInPartitions()
                    : fileIndex.getFilesInPartitionsFilter(instantRange.getStartInstant());
        } else {
            // Resolve partitions and files from the commit metadata of the selected instants.
            final String tableName = this.conf.getString(FlinkOptions.TABLE_NAME);
            final List<HoodieCommitMetadata> activeMetadata = instants.stream()
                    .map(instant -> WriteProfiles.getCommitMetadata(tableName, this.path, instant, commitTimeline))
                    .collect(Collectors.toList());
            final List<HoodieCommitMetadata> archivedMetadata = getArchivedMetadata(hoodieTableMetaClient, instantRange, commitTimeline, tableName);
            final List<HoodieCommitMetadata> metadataList;
            if (archivedMetadata.isEmpty()) {
                metadataList = activeMetadata;
            } else {
                LOG.warn("\n--------------------------------------------------------------------------------\n---------- caution: the reader has fall behind too much from the writer,\n---------- tweak 'read.tasks' option to add parallelism of read tasks.\n--------------------------------------------------------------------------------");
                // Archived metadata goes first, followed by the metadata of the active timeline.
                metadataList = mergeList(archivedMetadata, activeMetadata);
            }
            readPartitions = getReadPartitions(metadataList);
            if (readPartitions.isEmpty()) {
                LOG.warn("No partitions found for reading in user provided path.");
                return Result.EMPTY;
            }
            fileStatuses = WriteProfiles.getWritePathsOfInstants(this.path, configuration, metadataList, hoodieTableMetaClient.getTableType());
        }

        if (fileStatuses.length == 0) {
            LOG.warn("No files found for reading in user provided path.");
            return Result.EMPTY;
        }

        final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(hoodieTableMetaClient, commitTimeline, fileStatuses);
        // Running split number, incremented once per created split.
        final AtomicInteger splitCounter = new AtomicInteger(0);
        final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
        final List<MergeOnReadInputSplit> splits = readPartitions.stream()
                .map(partitionPath -> fsView.getMergedFileSlicesForFlink(partitionPath, endInstant)
                        .map(fileSlice -> {
                            // Log file paths, ordered by the log file comparator.
                            final Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
                                    .sorted(HoodieLogFile.getLogFileComparator())
                                    .map(logFile -> logFile.getPath().toString())
                                    .collect(Collectors.toList()));
                            // A file slice may have no base file; the split then gets a null base path.
                            final String baseFilePath = fileSlice.getBaseFile().map(baseFile -> baseFile.getPath()).orElse(null);
                            return new MergeOnReadInputSplit(fileSlice.getFileId(), splitCounter.getAndAdd(1), baseFilePath, logPaths, endInstant,
                                    hoodieTableMetaClient.getBasePath(), this.maxCompactionMemoryInBytes, mergeType, instantRange);
                        })
                        .collect(Collectors.toList()))
                .flatMap(List::stream)
                .collect(Collectors.toList());
        return Result.instance(splits, endInstant);
    }

    /**
     * Decides whether files should be resolved through the file index rather than from commit
     * metadata.
     *
     * <p>Returns {@code true} only when all of the following hold: both the streaming-scan-files
     * and the streaming read options are enabled, the completed cleaner timeline contains an
     * instant whose timestamp is not before {@code str}, and the {@code earliestCommitToRetain}
     * recorded by the last completed clean is greater than {@code str}.
     * NOTE(review): presumably this means commits at or after {@code str} may already have been
     * cleaned - confirm against the cleaner semantics.
     *
     * @param str start instant of the range being read
     * @param hoodieTableMetaClient meta client used to access the cleaner timeline
     * @return whether files should be scanned
     * @throws HoodieException if the clean metadata cannot be deserialized
     */
    private boolean isScanFiles(String str, HoodieTableMetaClient hoodieTableMetaClient) {
        final HoodieTimeline completedCleans = hoodieTableMetaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants();

        final boolean scanFilesEnabled = this.conf.getBoolean(FlinkOptions.READ_AS_STREAMING_SCAN_FILES)
                && this.conf.getBoolean(FlinkOptions.READ_AS_STREAMING);
        if (!scanFilesEnabled) {
            return false;
        }
        if (completedCleans.empty()) {
            return false;
        }
        // No clean completed at or after the start instant.
        final boolean noCleanSinceStart = completedCleans.filter(cleanInstant -> cleanInstant.getTimestamp().compareTo(str) >= 0).empty();
        if (noCleanSinceStart) {
            return false;
        }

        final String earliestCommitToRetain;
        try {
            final String lastCleanTime = completedCleans.lastInstant().get().getTimestamp();
            final HoodieInstant lastClean = new HoodieInstant(false, HoodieTimeline.CLEAN_ACTION, lastCleanTime);
            earliestCommitToRetain = TimelineMetadataUtils
                    .deserializeHoodieCleanMetadata(completedCleans.getInstantDetails(lastClean).get())
                    .getEarliestCommitToRetain();
        } catch (IOException e) {
            throw new HoodieException(e);
        }

        if (earliestCommitToRetain.compareTo(str) <= 0) {
            return false;
        }
        LOG.info("Need Scan Files from {}, earliestCommitToRetain is {}.", str, earliestCommitToRetain);
        return true;
    }

    /**
     * Loads commit metadata from the archived timeline when the range starts before the given
     * timeline does.
     *
     * @param hoodieTableMetaClient meta client used to load the archived timeline
     * @param instantRange range whose start instant is checked against the timeline
     * @param hoodieTimeline timeline to compare the range start with
     * @param str table name passed on when loading the commit metadata
     * @return metadata of the completed archived commits (after the optional compaction skipping),
     *         or an empty list when the range start is not before the timeline start or no
     *         completed archived commit exists
     */
    private List<HoodieCommitMetadata> getArchivedMetadata(HoodieTableMetaClient hoodieTableMetaClient, InstantRange instantRange, HoodieTimeline hoodieTimeline, String str) {
        final String startInstant = instantRange.getStartInstant();
        if (!hoodieTimeline.isBeforeTimelineStarts(startInstant)) {
            return Collections.emptyList();
        }

        final HoodieArchivedTimeline archivedTimeline = hoodieTableMetaClient.getArchivedTimeline(startInstant);
        final HoodieTimeline archivedCommits = archivedTimeline.getCommitsTimeline().filterCompletedInstants();
        if (archivedCommits.empty()) {
            return Collections.emptyList();
        }

        return maySkipCompaction(archivedCommits.getInstants())
                .map(archivedInstant -> WriteProfiles.getCommitMetadata(str, this.path, archivedInstant, archivedTimeline))
                .collect(Collectors.toList());
    }

    /**
     * Selects the completed instants of the timeline that should be read.
     *
     * <p>When {@code str} is given, only instants with a timestamp strictly greater than it are
     * kept and the configured start/end commits are ignored. Otherwise the instants are bounded
     * (inclusively) by the configured read start commit, unless it is the "earliest" marker, and
     * by the configured read end commit. In both cases {@link #maySkipCompaction(Stream)} is applied.
     *
     * @param hoodieTimeline timeline to select from
     * @param str exclusive lower bound, or {@code null} to use the configured commits
     * @return the selected instants, in the order the timeline streams them
     */
    private List<HoodieInstant> filterInstantsWithRange(HoodieTimeline hoodieTimeline, String str) {
        final HoodieTimeline completedTimeline = hoodieTimeline.filterCompletedInstants();
        if (str != null) {
            final Stream<HoodieInstant> candidates = maySkipCompaction(completedTimeline.getInstants());
            return candidates
                    .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, str))
                    .collect(Collectors.toList());
        }

        Stream<HoodieInstant> selected = completedTimeline.getInstants();

        // Inclusive lower bound, skipped for the "earliest" marker.
        if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
            final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
            if (!startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)) {
                selected = selected.filter(instant ->
                        HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, startCommit));
            }
        }

        // Inclusive upper bound.
        if (this.conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent()) {
            final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
            selected = selected.filter(instant ->
                    HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN_OR_EQUALS, endCommit));
        }

        return maySkipCompaction(selected).collect(Collectors.toList());
    }

    /**
     * Drops the instants whose action equals {@code HoodieTimeline.COMMIT_ACTION} when compaction
     * skipping is enabled and the table is a MOR table; otherwise returns the stream untouched.
     * NOTE(review): presumably a commit action on a MOR table marks a compaction - confirm.
     *
     * @param stream instants to filter
     * @return the filtered stream, or the same stream when no filtering applies
     */
    private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant> stream) {
        final boolean dropCommitInstants = this.skipCompaction && OptionsResolver.isMorTable(this.conf);
        if (!dropCommitInstants) {
            return stream;
        }
        return stream.filter(instant -> !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
    }

    /**
     * Concatenates two lists into a new list without modifying either input.
     *
     * @param list elements that come first
     * @param list2 elements appended after those of {@code list}
     * @param <T> element type
     * @return a new mutable list holding the elements of {@code list} followed by those of {@code list2}
     */
    private static <T> List<T> mergeList(List<T> list, List<T> list2) {
        final List<T> merged = new ArrayList<>(list.size() + list2.size());
        merged.addAll(list);
        merged.addAll(list2);
        return merged;
    }
}
