package org.apache.hudi.repair;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.repair.RepairUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

/* loaded from: input_file:org/apache/hudi/repair/HoodieRepairTool.class */
/**
 * Spark-based repair tool for a Hudi table.
 *
 * <p>In {@link Mode#REPAIR} mode it finds base and log files that
 * {@link RepairUtils#findInstantFilesToRemove} flags as dangling (not belonging to any commit). It
 * copies them into a timestamped backup directory under {@code .hoodie/.cleanbackup/}, writes a
 * {@code _SUCCESS} marker there, and then deletes them from the table. {@link Mode#DRY_RUN} only
 * reports those files. {@link Mode#UNDO} copies the files of a given backup directory back into the
 * table. {@link Mode#QUERY} lists the backup directories that carry the {@code _SUCCESS} marker.
 *
 * <p>Spark closures in this class capture only local variables or static state, because this class
 * does not implement {@link java.io.Serializable}.
 */
public class HoodieRepairTool {
    private static final Logger LOG = LogManager.getLogger(HoodieRepairTool.class);
    private static final String BACKUP_DIR_PREFIX = "hoodie_repair_backup_";
    /** Location of all repair backups, relative to the table base path. */
    private static final String BACKUP_ROOT_RELATIVE_PATH = "/.hoodie/.cleanbackup/";
    /** Marker file written into a backup directory once every dangling file has been copied. */
    private static final String BACKUP_SUCCESS_MARKER = "_SUCCESS";

    private final HoodieEngineContext context;
    private final HoodieTableMetaClient metaClient;
    private final HoodieTableMetadata tableMetadata;
    private final JavaSparkContext jsc;
    private final String tableBasePath;
    private final Mode mode;
    private final String startingInstantTime;
    private final String endingInstantTime;
    /** Backup directory: supplied by the caller in UNDO mode, otherwise a fresh timestamped directory. */
    private final String backupPathStr;
    private final int parallelism = 2;
    private final boolean assumeDatePartitioning = false;

    /** Running modes supported by {@link #run()}. */
    public enum Mode {
        /** Back up, then delete, the dangling base and log files. */
        REPAIR,
        /** Only report the dangling files; nothing is copied or deleted. */
        DRY_RUN,
        /** Copy the files of an existing backup directory back into the table. */
        UNDO,
        /** List the completed backup directories of the table. */
        QUERY
    }

    /**
     * Creates a repair tool for one table.
     *
     * @param javaSparkContext    Spark context used for the distributed file operations
     * @param tableBasePath       base path of the Hudi table
     * @param mode                running mode
     * @param startingInstantTime inclusive lower bound of the instants to check, or {@code null}
     * @param endingInstantTime   inclusive upper bound of the instants to check, or {@code null}
     * @param backupPath          backup directory to restore from; only used in {@link Mode#UNDO}
     */
    public HoodieRepairTool(JavaSparkContext javaSparkContext, String tableBasePath, Mode mode,
                            String startingInstantTime, String endingInstantTime, String backupPath) {
        this.jsc = javaSparkContext;
        this.tableBasePath = tableBasePath;
        this.mode = mode;
        this.startingInstantTime = startingInstantTime;
        this.endingInstantTime = endingInstantTime;
        if (mode == Mode.UNDO) {
            this.backupPathStr = backupPath;
        } else {
            this.backupPathStr = tableBasePath + BACKUP_ROOT_RELATIVE_PATH + BACKUP_DIR_PREFIX
                + HoodieActiveTimeline.getDateFormat().format(new Date());
        }
        this.context = new HoodieSparkEngineContext(javaSparkContext);
        this.metaClient = HoodieTableMetaClient.builder()
            .setConf(javaSparkContext.hadoopConfiguration())
            .setBasePath(tableBasePath)
            .setLoadActiveTimelineOnLoad(true)
            .build();
        this.tableMetadata = new FileSystemBackedTableMetadata(
            this.context, this.context.getHadoopConf(), tableBasePath, assumeDatePartitioning);
    }

    /**
     * Runs the tool in the configured mode.
     *
     * @return per-mode results. In REPAIR mode: (relative file path, "true"/"false" deleted). In
     *         DRY_RUN mode: (relative file path, "false"). In UNDO mode: (relative file path,
     *         "true"/"false" copied). In QUERY mode: (backup directory, "").
     * @throws HoodieIOException wrapping any IOException raised while running
     */
    public List<Tuple2<String, String>> run() {
        Option<String> startingInstant = Option.ofNullable(startingInstantTime);
        Option<String> endingInstant = Option.ofNullable(endingInstantTime);
        if (startingInstant.isPresent() && endingInstant.isPresent()) {
            LOG.info(String.format("Start repairing completed instants between %s and %s (inclusive)",
                startingInstant.get(), endingInstant.get()));
        } else if (startingInstant.isPresent()) {
            LOG.info(String.format("Start repairing completed instants from %s (inclusive)", startingInstant.get()));
        } else if (endingInstant.isPresent()) {
            LOG.info(String.format("Start repairing completed instants till %s (inclusive)", endingInstant.get()));
        } else {
            LOG.info("Start repairing all completed instants");
        }
        try {
            switch (mode) {
                case REPAIR:
                    LOG.info(" ****** The repair tool is in REPAIR mode, dangling data and logs files not belonging to any commit are going to be DELETED from the table ******");
                    return doRepair(startingInstant, endingInstant, false);
                case DRY_RUN:
                    LOG.info(" ****** The repair tool is in DRY_RUN mode, only LOOKING FOR dangling data and log files from the table ******");
                    return doRepair(startingInstant, endingInstant, true);
                case UNDO:
                    return undoRepair();
                case QUERY:
                    return queryBackupPaths();
                default:
                    LOG.info("Unsupported running mode, quit the job directly");
                    return Collections.emptyList();
            }
        } catch (IOException e) {
            throw new HoodieIOException("Unable to repair table in " + tableBasePath, e);
        }
    }

    /**
     * Lists the backup directories under the table's backup root that contain a success marker.
     *
     * @return (backup directory path, "") for each completed backup; empty if the root does not exist
     */
    List<Tuple2<String, String>> queryBackupPaths() throws IOException {
        List<Tuple2<String, String>> backupPaths = new ArrayList<>();
        HoodieWrapperFileSystem fs = metaClient.getFs();
        Path backupRoot = new Path(metaClient.getBasePath() + BACKUP_ROOT_RELATIVE_PATH);
        if (!fs.exists(backupRoot)) {
            return backupPaths;
        }
        PathFilter backupDirFilter = candidate -> candidate.getName().startsWith(BACKUP_DIR_PREFIX);
        for (FileStatus status : fs.listStatus(backupRoot, backupDirFilter)) {
            // Backups without the marker were interrupted before every file was copied.
            if (fs.exists(new Path(status.getPath(), BACKUP_SUCCESS_MARKER))) {
                backupPaths.add(new Tuple2<>(status.getPath().toString(), ""));
            }
        }
        return backupPaths;
    }

    /**
     * Copies files between two base paths in parallel. Files that already exist at the destination are
     * not overwritten and are reported as not copied.
     *
     * @param javaSparkContext Spark context
     * @param conf             serializable Hadoop configuration shipped to the executors
     * @param relativeFilePaths file paths relative to both base paths
     * @param sourceBasePath   base path the files are read from
     * @param destBasePath     base path the files are written to
     * @return lazily evaluated (relative path, "true"/"false" copied) pairs
     */
    static JavaRDD<Tuple2<String, String>> copyFiles(JavaSparkContext javaSparkContext, SerializableConfiguration conf,
                                                      List<String> relativeFilePaths, String sourceBasePath,
                                                      String destBasePath) {
        return javaSparkContext.parallelize(relativeFilePaths).mapPartitions(iterator -> {
            List<Tuple2<String, String>> results = new ArrayList<>();
            FileSystem fs = FSUtils.getFs(destBasePath, conf.get());
            iterator.forEachRemaining(relativePath -> {
                boolean copied = false;
                Path sourcePath = new Path(sourceBasePath, relativePath);
                Path destPath = new Path(destBasePath, relativePath);
                try {
                    if (!fs.exists(destPath)) {
                        FileIOUtils.copy(fs, sourcePath, destPath);
                        copied = true;
                    }
                } catch (IOException e) {
                    LOG.error(String.format("Copying file fails: source [%s], destination [%s]", sourcePath, destPath), e);
                } finally {
                    // Exactly one result per input file, even when an unchecked exception propagates.
                    results.add(new Tuple2<>(relativePath, Boolean.toString(copied)));
                }
            });
            return results.iterator();
        }, true);
    }

    /**
     * Lists the data files found at the given directory depth below a base path.
     *
     * @param hoodieEngineContext engine context used for the parallel listing
     * @param basePathStr         directory to list from
     * @param expectedLevel       directory depth passed to {@link FSUtils#getFileStatusAtLevel}
     * @param parallelism         listing parallelism
     * @return full path strings of files accepted by {@link FSUtils#isDataFile}
     */
    static List<String> listFilesFromBasePath(HoodieEngineContext hoodieEngineContext, String basePathStr,
                                              int expectedLevel, int parallelism) {
        FileSystem fs = FSUtils.getFs(basePathStr, hoodieEngineContext.getHadoopConf().get());
        return FSUtils.getFileStatusAtLevel(hoodieEngineContext, fs, new Path(basePathStr), expectedLevel, parallelism)
            .stream()
            .filter(fileStatus -> fileStatus.isFile() && FSUtils.isDataFile(fileStatus.getPath()))
            .map(fileStatus -> fileStatus.getPath().toString())
            .collect(Collectors.toList());
    }

    /**
     * Deletes files (non-recursively) under a base path in parallel.
     *
     * @param javaSparkContext  Spark context
     * @param conf              serializable Hadoop configuration shipped to the executors
     * @param basePath          base path the relative paths are resolved against
     * @param relativeFilePaths file paths relative to {@code basePath}
     * @return (relative path, "true"/"false" deleted) for every input file
     */
    static List<Tuple2<String, String>> deleteFiles(JavaSparkContext javaSparkContext, SerializableConfiguration conf,
                                                    String basePath, List<String> relativeFilePaths) {
        return javaSparkContext.parallelize(relativeFilePaths).mapPartitions(iterator -> {
            FileSystem fs = FSUtils.getFs(basePath, conf.get());
            List<Tuple2<String, String>> results = new ArrayList<>();
            iterator.forEachRemaining(relativePath -> {
                boolean deleted = false;
                try {
                    deleted = fs.delete(new Path(basePath, relativePath), false);
                } catch (IOException e) {
                    LOG.warn("Failed to delete file " + relativePath, e);
                } finally {
                    results.add(new Tuple2<>(relativePath, Boolean.toString(deleted)));
                }
            });
            return results.iterator();
        }, true).collect();
    }

    /**
     * Finds the dangling files of the instants in range. Outside a dry run, it backs them up and deletes them.
     *
     * @param startingInstant inclusive lower bound on instant time, if present
     * @param endingInstant   inclusive upper bound on instant time, if present
     * @param isDryRun        when true, only report the files without copying or deleting anything
     * @return in a dry run, (file, "false") per dangling file; otherwise the delete results
     * @throws IOException if backing up the dangling files fails (nothing is deleted in that case)
     */
    List<Tuple2<String, String>> doRepair(Option<String> startingInstant, Option<String> endingInstant,
                                          boolean isDryRun) throws IOException {
        // Instant time -> files tagged with that instant. The file paths are later resolved
        // against tableBasePath by copyFiles/deleteFiles.
        Map<String, List<String>> instantToFiles = RepairUtils.tagInstantsOfBaseAndLogFiles(
            metaClient.getBasePath(), getBaseAndLogFilePathsFromFileSystem(tableMetadata, tableBasePath));
        List<String> instantsToCheck = instantToFiles.keySet().stream()
            .filter(instant -> (!startingInstant.isPresent() || instant.compareTo(startingInstant.get()) >= 0)
                && (!endingInstant.isPresent() || instant.compareTo(endingInstant.get()) <= 0))
            .collect(Collectors.toList());

        HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
        HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
        archivedTimeline.loadCompletedInstantDetailsInMemory(instantsToCheck);
        // The Spark closure captures only locals, never `this`.
        List<ImmutablePair<String, List<String>>> instantsWithDanglingFiles = jsc.parallelize(instantsToCheck)
            .map(instant -> new ImmutablePair<>(instant, RepairUtils.findInstantFilesToRemove(
                instant, instantToFiles.get(instant), activeTimeline, archivedTimeline)))
            .collect()
            .stream()
            .filter(pair -> !pair.getValue().isEmpty())
            .collect(Collectors.toList());
        printRepairInfo(instantsToCheck, instantsWithDanglingFiles);

        List<String> danglingFiles = instantsWithDanglingFiles.stream()
            .flatMap(pair -> pair.getValue().stream())
            .collect(Collectors.toList());
        if (isDryRun) {
            return danglingFiles.stream()
                .map(file -> new Tuple2<>(file, "false"))
                .collect(Collectors.toList());
        }
        if (danglingFiles.isEmpty()) {
            LOG.info(String.format("Table repair on %s is successful", tableBasePath));
            return Collections.emptyList();
        }
        if (!backupFiles(danglingFiles)) {
            LOG.error("Error backing up dangling files. Exiting...");
            throw new IOException("Error backing up dangling files. Exiting...");
        }
        // Mark the backup complete before deleting anything; queryBackupPaths() only reports marked backups.
        FSUtils.getFs(backupPathStr, context.getHadoopConf().get())
            .create(new Path(backupPathStr, BACKUP_SUCCESS_MARKER), true)
            .close();

        List<Tuple2<String, String>> deleteResults =
            deleteFiles(jsc, context.getHadoopConf(), tableBasePath, danglingFiles);
        List<String> notDeleted = deleteResults.stream()
            .filter(result -> !Boolean.parseBoolean(result._2()))
            .map(result -> result._1())
            .collect(Collectors.toList());
        // Report success only once the deletions have actually happened.
        if (notDeleted.isEmpty()) {
            LOG.info(String.format("Table repair on %s is successful", tableBasePath));
        } else {
            LOG.warn(String.format("Table repair on %s could not delete %d dangling file(s): %s (backup kept in %s)",
                tableBasePath, notDeleted.size(), notDeleted, backupPathStr));
        }
        return deleteResults;
    }

    /**
     * Copies every data file of the configured backup directory back into the table.
     *
     * @return (relative path, "true"/"false" copied) per backed-up file; empty when the table has no partition
     * @throws IOException if the backup path is null or does not exist
     */
    List<Tuple2<String, String>> undoRepair() throws IOException {
        HoodieWrapperFileSystem fs = metaClient.getFs();
        if (backupPathStr == null) {
            LOG.error("Backup path is null");
            throw new IOException("backup path is null");
        }
        Path backupPath = new Path(backupPathStr);
        if (!fs.exists(backupPath)) {
            LOG.error("Cannot find backup path: " + backupPath);
            throw new IOException("Cannot find backup path: " + backupPath);
        }
        List<String> allPartitionPaths = tableMetadata.getAllPartitionPaths();
        if (allPartitionPaths.isEmpty()) {
            LOG.error("Cannot get one partition path since there is no partition available");
            return Collections.emptyList();
        }
        // The backup mirrors the table layout, so the depth of one partition path tells where files live.
        int expectedLevel = getExpectedLevelBasedOnPartitionPath(allPartitionPaths.get(0));
        List<String> relativeFilePaths = listFilesFromBasePath(context, backupPathStr, expectedLevel, parallelism)
            .stream()
            .map(file -> FSUtils.getRelativePartitionPath(backupPath, new Path(file)))
            .collect(Collectors.toList());
        return restoreFiles(relativeFilePaths);
    }

    /**
     * Returns the number of "/"-separated segments of a partition path, or 0 for a null or empty path.
     */
    int getExpectedLevelBasedOnPartitionPath(String partitionPath) {
        if (StringUtils.isNullOrEmpty(partitionPath)) {
            return 0;
        }
        return partitionPath.split("/").length;
    }

    /**
     * Copies the given table files into the backup directory.
     *
     * @param relativeFilePaths file paths relative to the table base path
     * @return true if every file was copied; failures are logged individually
     */
    boolean backupFiles(List<String> relativeFilePaths) {
        List<Tuple2<String, String>> failedBackups =
            copyFiles(jsc, context.getHadoopConf(), relativeFilePaths, tableBasePath, backupPathStr)
                .filter(result -> !Boolean.parseBoolean(result._2()))
                .collect();
        for (Tuple2<String, String> failed : failedBackups) {
            LOG.error("Backup file failed: " + failed._1());
        }
        return failedBackups.isEmpty();
    }

    /**
     * Copies the given files from the backup directory back into the table.
     *
     * @param relativeFilePaths file paths relative to the backup directory
     * @return (relative path, "true"/"false" copied) per file
     */
    List<Tuple2<String, String>> restoreFiles(List<String> relativeFilePaths) {
        return copyFiles(jsc, context.getHadoopConf(), relativeFilePaths, backupPathStr, tableBasePath).collect();
    }

    /** Logs the checked instants and, for each instant to repair, the files that will be removed. */
    private static void printRepairInfo(List<String> instantsToCheck,
                                        List<ImmutablePair<String, List<String>>> instantsWithDanglingFiles) {
        int numInstantsToRepair = instantsWithDanglingFiles.size();
        LOG.warn("Number of instants verified based on the base and log files: " + instantsToCheck.size());
        LOG.warn("Instant timestamps: " + instantsToCheck);
        LOG.warn("Number of instants to repair: " + numInstantsToRepair);
        if (numInstantsToRepair > 0) {
            instantsWithDanglingFiles.forEach(pair -> LOG.warn("   ** Removing files: " + pair.getValue()));
        }
    }

    /** Lists every file in every partition known to {@code hoodieTableMetadata}. */
    private static List<Path> getBaseAndLogFilePathsFromFileSystem(HoodieTableMetadata hoodieTableMetadata,
                                                                  String basePath) throws IOException {
        List<String> partitionPaths = hoodieTableMetadata.getAllPartitionPaths().stream()
            .map(partitionPath -> FSUtils.getPartitionPath(basePath, partitionPath).toString())
            .collect(Collectors.toList());
        return hoodieTableMetadata.getAllFilesInPartitions(partitionPaths).values().stream()
            .flatMap(statuses -> Arrays.stream(statuses))
            .map(FileStatus::getPath)
            .collect(Collectors.toList());
    }
}
