package org.apache.hudi.hadoop.realtime;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieColumnProjectionUtils;
import org.apache.hudi.hadoop.HoodieEmptyRecordReader;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.PathWithLogFileStatus;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Input format that reads a Hudi table through the realtime path on top of {@link HoodieParquetInputFormat}.
 *
 * <p>Splits produced by the parent format are converted with
 * {@link HoodieRealtimeInputFormatUtils#getRealtimeSplits}. Every split is read through a
 * {@link HoodieRealtimeRecordReader}. That reader wraps either the parent's parquet record reader or, for
 * log-only splits, a {@link HoodieEmptyRecordReader}.
 */
@UseRecordReaderFromInputFormat
@UseFileSplitsFromInputFormat
public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat implements Configurable {
    private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);

    /**
     * Computes the parent's file splits and converts them into realtime splits.
     *
     * @param jobConf   the job configuration
     * @param numSplits split-count hint forwarded to the parent implementation
     * @return the splits produced by {@link HoodieRealtimeInputFormatUtils#getRealtimeSplits}
     * @throws IOException if the parent fails to list or split the input
     */
    @Override
    public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
        return HoodieRealtimeInputFormatUtils.getRealtimeSplits(jobConf,
            Arrays.stream(super.getSplits(jobConf, numSplits)).map(FileSplit.class::cast));
    }

    /**
     * Lists the file statuses to read for an incremental query.
     *
     * <p>Steps:
     * <ol>
     *   <li>Take the commits after the configured start commit time, bounded by the configured max commits.</li>
     *   <li>Look up the file groups those commits touched in a file-system view built from their files.</li>
     *   <li>Keep only partitions that appear in {@code inputPaths}.</li>
     *   <li>Emit at most one {@link RealtimeFileStatus} per file group; see
     *       {@link #addIncrementalFileStatus}.</li>
     * </ol>
     *
     * <p>Side effect: when at least one partition qualifies, the input paths of {@code jobConf} are
     * overwritten with those partitions.
     *
     * @param jobConf    the job configuration; its input paths may be replaced
     * @param metaClient meta client of the table being queried
     * @param inputPaths the paths requested by the query
     * @return the statuses to read; empty when there is nothing to read
     * @throws IOException if listing files fails
     * @throws HoodieException if a base file is missing from the partition listing, or if building a
     *         status fails with an {@link IOException}
     */
    @Override // org.apache.hudi.hadoop.HoodieParquetInputFormat
    protected List<FileStatus> listStatusForIncrementalMode(JobConf jobConf, HoodieTableMetaClient metaClient, List<Path> inputPaths) throws IOException {
        List<FileStatus> result = new ArrayList<>();
        String tableName = metaClient.getTableConfig().getTableName();
        Job job = Job.getInstance(jobConf);
        Option<HoodieTimeline> filteredCommitsTimeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(job, metaClient);
        if (!filteredCommitsTimeline.isPresent()) {
            return result;
        }
        String startCommitTime = HoodieHiveUtils.readStartCommitTime(job, tableName);
        int maxCommits = HoodieHiveUtils.readMaxCommits(job, tableName).intValue();
        HoodieTimeline commitsToReturn = filteredCommitsTimeline.get().findInstantsAfter(startCommitTime, maxCommits);
        List<HoodieInstant> commitsToCheck = commitsToReturn.getInstants().collect(Collectors.toList());
        // With no new commits there is nothing to read. This check also guarantees that the view's
        // last instant (read further below) exists.
        if (commitsToCheck.isEmpty()) {
            return result;
        }

        String basePath = metaClient.getBasePath();
        Map<String, HashMap<String, FileStatus>> partitionsWithFileStatus =
            HoodieInputFormatUtils.listAffectedFilesForCommits(new Path(basePath), commitsToCheck, commitsToReturn);
        List<FileStatus> affectedFileStatuses = new ArrayList<>();
        for (HashMap<String, FileStatus> filesInPartition : partitionsWithFileStatus.values()) {
            affectedFileStatuses.addAll(filesInPartition.values());
        }
        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitsToReturn,
            affectedFileStatuses.toArray(new FileStatus[0]));

        // Keep only the partitions the query asked for; an empty partition path refers to the base path itself.
        List<String> affectedPartitions = partitionsWithFileStatus.keySet().stream()
            .filter(partition -> partition.isEmpty()
                ? inputPaths.contains(new Path(basePath))
                : inputPaths.contains(new Path(basePath, partition)))
            .collect(Collectors.toList());
        if (affectedPartitions.isEmpty()) {
            return result;
        }
        List<HoodieFileGroup> fileGroups = affectedPartitions.stream()
            .flatMap(partition -> fsView.getAllFileGroups(partition))
            .collect(Collectors.toList());

        // Narrow the job to the affected partitions, then index what is listed there by URI string.
        setInputPaths(jobConf, affectedPartitions.stream()
            .map(partition -> partition.isEmpty() ? basePath : new Path(basePath, partition).toUri().toString())
            .collect(Collectors.joining(",")));
        Map<String, FileStatus> candidateFileStatuses = new HashMap<>();
        for (FileStatus status : getStatus(jobConf)) {
            candidateFileStatuses.put(status.getPath().toUri().toString(), status);
        }

        String maxCommitTime = fsView.getLastInstant().get().getTimestamp();
        for (HoodieFileGroup fileGroup : fileGroups) {
            try {
                addIncrementalFileStatus(fileGroup, candidateFileStatuses, maxCommitTime, basePath, result);
            } catch (IOException e) {
                throw new HoodieException("Error obtaining data file/log file grouping ", e);
            }
        }
        return result;
    }

    /**
     * Builds the incremental status for one file group, if it has anything to read, and appends it to {@code result}.
     *
     * <ul>
     *   <li>If any slice has a base file, the status takes the base file of the first such slice (in
     *       {@code getAllFileSlices()} order). It uses that file's entry in {@code candidateFileStatuses}
     *       together with the log files of the group's latest slice.</li>
     *   <li>Otherwise, if the latest slice has log files, the status takes the first log file and is
     *       marked log-only.</li>
     * </ul>
     *
     * @throws HoodieException if the chosen base file was not listed in {@code candidateFileStatuses}
     * @throws IOException     if building the {@link RealtimeFileStatus} fails
     */
    private static void addIncrementalFileStatus(HoodieFileGroup fileGroup, Map<String, FileStatus> candidateFileStatuses,
                                                 String maxCommitTime, String basePath, List<FileStatus> result) throws IOException {
        List<FileSlice> slicesWithBaseFile = fileGroup.getAllFileSlices()
            .filter(slice -> slice.getBaseFile().isPresent())
            .collect(Collectors.toList());
        if (!slicesWithBaseFile.isEmpty()) {
            String baseFileUri = slicesWithBaseFile.get(0).getBaseFile().get().getFileStatus().getPath().toUri().toString();
            FileStatus baseFileStatus = candidateFileStatuses.get(baseFileUri);
            if (baseFileStatus == null) {
                throw new HoodieException("Error obtaining fileStatus for file: " + baseFileUri);
            }
            RealtimeFileStatus realtimeFileStatus = new RealtimeFileStatus(baseFileStatus);
            realtimeFileStatus.setMaxCommitTime(maxCommitTime);
            realtimeFileStatus.setIsIncrementalFileStatus(true);
            realtimeFileStatus.setBasePath(basePath);
            realtimeFileStatus.setBaseFilePath(baseFileUri);
            realtimeFileStatus.setDeltaLogPaths(fileGroup.getLatestFileSlice().get().getLogFiles()
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList()));
            result.add(realtimeFileStatus);
            return;
        }

        // No base file anywhere in the group: fall back to the log files of the latest slice.
        Option<FileSlice> latestSlice = fileGroup.getLatestFileSlice();
        if (!latestSlice.isPresent()) {
            return;
        }
        List<FileStatus> logFileStatuses = latestSlice.get().getLogFiles()
            .map(logFile -> logFile.getFileStatus())
            .collect(Collectors.toList());
        if (logFileStatuses.isEmpty()) {
            return;
        }
        RealtimeFileStatus logOnlyStatus = new RealtimeFileStatus(logFileStatuses.get(0));
        logOnlyStatus.setOnlyLogFile(true);
        logOnlyStatus.setIsIncrementalFileStatus(true);
        logOnlyStatus.setDeltaLogPaths(logFileStatuses.stream()
            .map(status -> status.getPath().toString())
            .collect(Collectors.toList()));
        logOnlyStatus.setMaxCommitTime(maxCommitTime);
        logOnlyStatus.setBasePath(basePath);
        result.add(logOnlyStatus);
    }

    /**
     * Returns {@code true}: this format asks the parent to include log files in snapshot listings.
     */
    @Override // org.apache.hudi.hadoop.HoodieParquetInputFormat
    protected boolean includeLogFilesForSnapShotView() {
        return true;
    }

    /**
     * A path is splittable only if it is a {@link PathWithLogFileStatus} that reports itself as splittable.
     * Any other path is never split.
     */
    @Override // org.apache.hudi.hadoop.HoodieParquetInputFormat
    protected boolean isSplitable(FileSystem fileSystem, Path path) {
        return path instanceof PathWithLogFileStatus && ((PathWithLogFileStatus) path).isSplitable();
    }

    /**
     * Builds a split for {@code path}.
     *
     * <p>A {@link PathWithLogFileStatus} builds its own split. Note that {@code inMemoryHosts} is not
     * forwarded in that case. Every other path uses the parent implementation.
     */
    @Override // org.apache.hudi.hadoop.HoodieParquetInputFormat
    public FileSplit makeSplit(Path path, long start, long length, String[] hosts, String[] inMemoryHosts) {
        if (path instanceof PathWithLogFileStatus) {
            return ((PathWithLogFileStatus) path).buildSplit(path, start, length, hosts);
        }
        return super.makeSplit(path, start, length, hosts, inMemoryHosts);
    }

    /**
     * Builds a split for {@code path}.
     *
     * <p>A {@link PathWithLogFileStatus} builds its own split; every other path uses the parent implementation.
     */
    @Override // org.apache.hudi.hadoop.HoodieParquetInputFormat
    public FileSplit makeSplit(Path path, long start, long length, String[] hosts) {
        if (path instanceof PathWithLogFileStatus) {
            return ((PathWithLogFileStatus) path).buildSplit(path, start, length, hosts);
        }
        return super.makeSplit(path, start, length, hosts);
    }

    /**
     * Delegates to the parent listing unchanged.
     */
    @Override // org.apache.hudi.hadoop.HoodieParquetInputFormat
    public FileStatus[] listStatus(JobConf jobConf) throws IOException {
        return super.listStatus(jobConf);
    }

    /**
     * Returns the timeline unfiltered; the parent's instant filtering is disabled for this format.
     */
    @Override // org.apache.hudi.hadoop.HoodieParquetInputFormat
    protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline hoodieDefaultTimeline) {
        return hoodieDefaultTimeline;
    }

    /**
     * Adds the projection fields the split needs to {@code jobConf}, at most once per configuration.
     *
     * <p>The condition is checked once without a lock and again while holding the {@code jobConf} monitor.
     * NOTE(review): this presumably guards a JobConf shared across concurrent readers; confirm with callers.
     * When an update is needed:
     * <ul>
     *   <li>existing projection column ids are cleaned;</li>
     *   <li>Hudi's required fields are added if the split has delta log files;</li>
     *   <li>{@code jobConf} is stored as this format's configuration and marked with
     *       {@link HoodieInputFormatUtils#HOODIE_READ_COLUMNS_PROP}.</li>
     * </ul>
     */
    void addProjectionToJobConf(RealtimeSplit realtimeSplit, JobConf jobConf) {
        if (!needsProjectionUpdate(realtimeSplit, jobConf)) {
            return;
        }
        synchronized (jobConf) {
            LOG.info("Before adding Hoodie columns, Projections :" + jobConf.get(HoodieColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + ", Ids :" + jobConf.get(HoodieColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
            if (needsProjectionUpdate(realtimeSplit, jobConf)) {
                HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf);
                if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
                    HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, realtimeSplit.getHoodieVirtualKeyInfo());
                }
                this.conf = jobConf;
                this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
            }
        }
    }

    /**
     * Returns whether the projection still has to be applied.
     *
     * <p>That is the case when the read-columns marker is unset, or when the split has delta logs and the
     * required projection fields are missing from {@code jobConf}.
     */
    private static boolean needsProjectionUpdate(RealtimeSplit realtimeSplit, JobConf jobConf) {
        return jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null
            || (!realtimeSplit.getDeltaLogPaths().isEmpty()
                && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf, realtimeSplit.getHoodieVirtualKeyInfo()));
    }

    /**
     * Creates a {@link HoodieRealtimeRecordReader} for a {@link RealtimeSplit}.
     *
     * <p>Log-only splits wrap a {@link HoodieEmptyRecordReader}; other splits wrap the parent's record reader.
     *
     * @throws IllegalArgumentException (via {@link ValidationUtils#checkArgument}) if the split is not a
     *         {@link RealtimeSplit}
     */
    @Override // org.apache.hudi.hadoop.HoodieParquetInputFormat
    public RecordReader<NullWritable, ArrayWritable> getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
        ValidationUtils.checkArgument(inputSplit instanceof RealtimeSplit, "HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + inputSplit);
        RealtimeSplit realtimeSplit = (RealtimeSplit) inputSplit;
        addProjectionToJobConf(realtimeSplit, jobConf);
        LOG.info("Creating record reader with readCols :" + jobConf.get(HoodieColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + ", Ids :" + jobConf.get(HoodieColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
        if (realtimeSplit.getIsLogOnlySplit()) {
            return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader());
        }
        return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, super.getRecordReader(inputSplit, jobConf, reporter));
    }

    /**
     * Returns the configuration last stored by {@link #addProjectionToJobConf}, or whatever the parent set.
     */
    @Override // org.apache.hudi.hadoop.HoodieParquetInputFormat
    public Configuration getConf() {
        return this.conf;
    }
}
