package org.apache.hudi.hadoop.realtime;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieColumnProjectionUtils;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.InputPathHandler;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@UseRecordReaderFromInputFormat
@UseFileSplitsFromInputFormat
/* loaded from: input_file:org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.class */
public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat implements Configurable {
    private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);

    public InputSplit[] getSplits(JobConf jobConf, int i) throws IOException {
        InputSplit[] realtimeSplits = HoodieRealtimeInputFormatUtils.getRealtimeSplits(jobConf, Arrays.stream(super.getSplits(jobConf, i)).map(inputSplit -> {
            return (FileSplit) inputSplit;
        }));
        InputSplit[] inputSplitArr = (InputSplit[]) new ArrayList().toArray(new InputSplit[0]);
        if (Boolean.parseBoolean(jobConf.get("hoodie.mor.incremental.log.read", HoodieRealtimeRecordReader.DEFAULT_REALTIME_SKIP_MERGE))) {
            inputSplitArr = getDeltaOnlySplits(jobConf);
        }
        InputSplit[] inputSplitArr2 = new InputSplit[realtimeSplits.length + inputSplitArr.length];
        System.arraycopy(realtimeSplits, 0, inputSplitArr2, 0, realtimeSplits.length);
        System.arraycopy(inputSplitArr, 0, inputSplitArr2, realtimeSplits.length, inputSplitArr.length);
        return inputSplitArr2;
    }

    private InputSplit[] getDeltaOnlySplits(JobConf jobConf) throws IOException {
        ArrayList arrayList = new ArrayList();
        List<String> incrementalTableNames = HoodieHiveUtils.getIncrementalTableNames(Job.getInstance(jobConf));
        Map<String, HoodieTableMetaClient> tableMetaClientMap = new InputPathHandler(this.conf, getInputPaths(jobConf), incrementalTableNames).getTableMetaClientMap();
        Iterator<String> it = incrementalTableNames.iterator();
        while (it.hasNext()) {
            HoodieTableMetaClient hoodieTableMetaClient = tableMetaClientMap.get(it.next());
            if (hoodieTableMetaClient != null) {
                String tableName = hoodieTableMetaClient.getTableConfig().getTableName();
                Job job = Job.getInstance(jobConf);
                Option<HoodieTimeline> filteredCommitsTimeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(job, hoodieTableMetaClient);
                if (!filteredCommitsTimeline.isPresent()) {
                    return (InputSplit[]) arrayList.toArray(new InputSplit[0]);
                }
                HoodieTimeline findInstantsAfter = ((HoodieTimeline) filteredCommitsTimeline.get()).findInstantsAfter(HoodieHiveUtils.readStartCommitTime(job, tableName), HoodieHiveUtils.readMaxCommits(job, tableName).intValue());
                Option of = Option.of(findInstantsAfter.getInstants().collect(Collectors.toList()));
                if (!of.isPresent()) {
                    return (InputSplit[]) arrayList.toArray(new InputSplit[0]);
                }
                HashMap<String, HashMap<String, FileStatus>> listAffectedFilesForCommits = HoodieInputFormatUtils.listAffectedFilesForCommits(new Path(hoodieTableMetaClient.getBasePath()), (List) of.get(), findInstantsAfter);
                ArrayList arrayList2 = new ArrayList();
                listAffectedFilesForCommits.forEach((str, hashMap) -> {
                    hashMap.forEach((str, fileStatus) -> {
                        arrayList2.add(fileStatus);
                    });
                });
                HoodieTableFileSystemView hoodieTableFileSystemView = new HoodieTableFileSystemView(hoodieTableMetaClient, findInstantsAfter, (FileStatus[]) arrayList2.toArray(new FileStatus[0]));
                List list = (List) listAffectedFilesForCommits.keySet().stream().flatMap(str2 -> {
                    return hoodieTableFileSystemView.getAllFileGroups(str2);
                }).collect(Collectors.toList());
                String timestamp = ((HoodieInstant) hoodieTableFileSystemView.getLastInstant().get()).getTimestamp();
                list.stream().forEach(hoodieFileGroup -> {
                    try {
                        List list2 = (List) hoodieFileGroup.getAllFileSlices().filter(fileSlice -> {
                            return fileSlice.getBaseFile().isPresent();
                        }).collect(Collectors.toList());
                        if (hoodieFileGroup.getLatestFileSlice().isPresent() && list2.isEmpty()) {
                            List list3 = (List) ((FileSlice) hoodieFileGroup.getLatestFileSlice().get()).getLogFiles().map(hoodieLogFile -> {
                                return hoodieLogFile.getPath().toString();
                            }).collect(Collectors.toList());
                            arrayList.add(new HoodieRealtimeFileSplit(new FileSplit(new Path((String) list3.get(0)), 0L, 2147483647L, new String[]{""}), hoodieTableMetaClient.getBasePath(), list3, timestamp));
                            LOG.info("add DeltaOnlySplits: " + ((InputSplit) arrayList.get(arrayList.size() - 1)).toString());
                        }
                    } catch (IOException e) {
                        throw new HoodieException("Error obtaining data file/log file grouping ", e);
                    }
                });
            }
        }
        LOG.info("Returning a total DeltaOnlySplits of " + arrayList.size());
        return (InputSplit[]) arrayList.toArray(new InputSplit[0]);
    }

    @Override // org.apache.hudi.hadoop.HoodieParquetInputFormat
    public FileStatus[] listStatus(JobConf jobConf) throws IOException {
        return super.listStatus(jobConf);
    }

    @Override // org.apache.hudi.hadoop.HoodieParquetInputFormat
    protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline hoodieDefaultTimeline) {
        return hoodieDefaultTimeline;
    }

    void addProjectionToJobConf(RealtimeSplit realtimeSplit, JobConf jobConf) {
        if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null || !(realtimeSplit.getDeltaLogPaths().isEmpty() || HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf))) {
            synchronized (jobConf) {
                LOG.info("Before adding Hoodie columns, Projections :" + jobConf.get(HoodieColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + ", Ids :" + jobConf.get(HoodieColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
                if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null || (!realtimeSplit.getDeltaLogPaths().isEmpty() && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf))) {
                    HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf);
                    if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
                        HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf);
                    }
                    this.conf = jobConf;
                    this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED);
                }
            }
        }
    }

    @Override // org.apache.hudi.hadoop.HoodieParquetInputFormat
    public RecordReader<NullWritable, ArrayWritable> getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
        ValidationUtils.checkArgument(inputSplit instanceof RealtimeSplit, "HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + inputSplit);
        RealtimeSplit realtimeSplit = (RealtimeSplit) inputSplit;
        addProjectionToJobConf(realtimeSplit, jobConf);
        LOG.info("Creating record reader with readCols :" + jobConf.get(HoodieColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + ", Ids :" + jobConf.get(HoodieColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
        return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, super.getRecordReader(inputSplit, jobConf, reporter));
    }

    @Override // org.apache.hudi.hadoop.HoodieParquetInputFormat
    public Configuration getConf() {
        return this.conf;
    }
}
