package org.apache.hudi.utilities;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.com.beust.jcommander.IValueValidator;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.com.beust.jcommander.ParameterException;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.utilities.exception.HoodieSnapshotExporterException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import scala.Tuple2;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/apache/hudi/utilities/HoodieSnapshotExporter.class */
public class HoodieSnapshotExporter {
    private static final Logger LOG = LogManager.getLogger(HoodieSnapshotExporter.class);

    /* loaded from: input_file:org/apache/hudi/utilities/HoodieSnapshotExporter$Config.class */
    /**
     * Command-line options of the snapshot exporter.
     *
     * <p>Fields are populated by JCommander through their {@code @Parameter} annotations. The class
     * is {@link Serializable} because instances are captured by the lambdas that this exporter
     * hands to the engine context.
     */
    public static class Config implements Serializable {

        /** Base path of the Hudi dataset to snapshot (required). */
        @Parameter(
            names = {"--source-base-path"},
            description = "Base path for the source Hudi dataset to be snapshotted",
            required = true)
        public String sourceBasePath;

        /** Base path under which the exported files are written (required). */
        @Parameter(
            names = {"--target-output-path"},
            description = "Base path for the target output files (snapshots)",
            required = true)
        public String targetOutputPath;

        /** Export format (required); checked by {@link OutputFormatValidator}. */
        @Parameter(
            names = {"--output-format"},
            description = "Output format for the exported dataset; accept these values: json|parquet|hudi",
            required = true,
            validateValueWith = {OutputFormatValidator.class})
        public String outputFormat;

        /** Optional column used to repartition the non-Hudi output; unset ({@code null}) by default. */
        @Parameter(
            names = {"--output-partition-field"},
            description = "A field to be used by Spark repartitioning")
        public String outputPartitionField;

        /** Optional class name of a custom {@link Partitioner}; unset ({@code null}) by default. */
        @Parameter(
            names = {"--output-partitioner"},
            description = "A class to facilitate custom repartitioning")
        public String outputPartitioner;
    }

    /* loaded from: input_file:org/apache/hudi/utilities/HoodieSnapshotExporter$OutputFormatValidator.class */
    /**
     * JCommander value validator that accepts only the output formats listed in {@link #FORMATS}.
     */
    public static class OutputFormatValidator implements IValueValidator<String> {
        /** Format name that selects the Hudi-to-Hudi export path. */
        public static final String HUDI = "hudi";
        /** Every accepted value of the output-format option. */
        public static final List<String> FORMATS = CollectionUtils.createImmutableList("json", "parquet", HUDI);

        /**
         * Rejects a missing or unsupported output format.
         *
         * @param str unused by this validator (presumably the option name supplied by JCommander)
         * @param str2 the value to validate
         * @throws ParameterException if {@code str2} is {@code null} or not contained in {@link #FORMATS}
         */
        @Override // org.apache.hudi.com.beust.jcommander.IValueValidator
        public void validate(String str, String str2) {
            final boolean supported = str2 != null && FORMATS.contains(str2);
            if (supported) {
                return;
            }
            throw new ParameterException(
                String.format("Invalid output format: value:%s: supported formats:%s", str2, FORMATS));
        }
    }

    /**
     * Strategy that turns the dataset read from the source table into a configured
     * {@link DataFrameWriter}. In this file it is used by {@code exportAsNonHudi}, which either
     * builds a default implementation or loads a user-supplied class by name.
     */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/hudi/utilities/HoodieSnapshotExporter$Partitioner.class */
    public interface Partitioner {
        /**
         * Prepares the writer used to persist {@code dataset}.
         *
         * @param dataset rows read from the source table's base files
         * @return the writer on which the caller subsequently sets format, save mode and target path
         */
        DataFrameWriter<Row> partition(Dataset<Row> dataset);
    }

    /**
     * Exports the latest completed snapshot of the source table to the target output path.
     *
     * <p>Steps: refuse to run if the target path already exists, resolve the latest completed
     * write instant, list the source partitions, delegate to the Hudi or non-Hudi export depending
     * on {@code config.outputFormat}, and finally make sure a {@code _SUCCESS} marker exists under
     * the target path.
     *
     * @param javaSparkContext Spark context used for file-system resolution and the export jobs
     * @param config exporter options
     * @throws IOException if a file-system operation fails
     * @throws HoodieSnapshotExporterException if the target path already exists, the source table
     *     has no completed commit, or no partition is found
     */
    public void export(JavaSparkContext javaSparkContext, Config config) throws IOException {
        FileSystem sourceFs = FSUtils.getFs(config.sourceBasePath, javaSparkContext.hadoopConfiguration());
        // The target may live on a different file system than the source (e.g. HDFS -> object store),
        // so every operation on the target path must go through a file system resolved from it.
        FileSystem outputFs = FSUtils.getFs(config.targetOutputPath, javaSparkContext.hadoopConfiguration());
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(javaSparkContext);

        if (outputPathExists(outputFs, config)) {
            throw new HoodieSnapshotExporterException("The target output path already exists.");
        }

        String latestCommitTimestamp = getLatestCommitTimestamp(sourceFs, config)
            .orElseThrow(() -> new HoodieSnapshotExporterException("No commits present. Nothing to snapshot."));
        LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommitTimestamp));

        List<String> partitions = getPartitions(engineContext, config);
        if (partitions.isEmpty()) {
            throw new HoodieSnapshotExporterException("The source dataset has 0 partition to snapshot.");
        }
        LOG.info(String.format("The job needs to export %d partitions.", partitions.size()));

        if (config.outputFormat.equals(OutputFormatValidator.HUDI)) {
            exportAsHudi(javaSparkContext, config, partitions, latestCommitTimestamp);
        } else {
            exportAsNonHudi(javaSparkContext, config, partitions, latestCommitTimestamp);
        }
        createSuccessTag(outputFs, config);
    }

    /**
     * Tells whether the configured target output path is already present.
     *
     * @param fileSystem file system on which the target path is probed
     * @param config exporter options; only {@code targetOutputPath} is read
     * @return {@code true} if the path exists on {@code fileSystem}
     * @throws IOException if the existence check fails
     */
    private boolean outputPathExists(FileSystem fileSystem, Config config) throws IOException {
        final Path targetPath = new Path(config.targetOutputPath);
        return fileSystem.exists(targetPath);
    }

    /**
     * Looks up the timestamp of the last completed instant on the source table's write timeline.
     *
     * @param fileSystem file system whose Hadoop configuration is used to build the meta client
     * @param config exporter options; only {@code sourceBasePath} is read
     * @return the timestamp of the last completed write instant, or an empty option if there is none
     */
    private Option<String> getLatestCommitTimestamp(FileSystem fileSystem, Config config) {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(fileSystem.getConf())
            .setBasePath(config.sourceBasePath)
            .build();
        Option<HoodieInstant> latestCompleted = metaClient.getActiveTimeline()
            .getWriteTimeline()
            .filterCompletedInstants()
            .lastInstant();
        if (!latestCompleted.isPresent()) {
            return Option.empty();
        }
        return Option.of(latestCompleted.get().getTimestamp());
    }

    /**
     * Lists the partition paths of the source table via {@code FSUtils.getAllPartitionPaths}.
     *
     * @param hoodieEngineContext engine context handed to the listing utility
     * @param config exporter options; only {@code sourceBasePath} is read
     * @return the partition paths reported for the source table
     */
    private List<String> getPartitions(HoodieEngineContext hoodieEngineContext, Config config) {
        final String sourceBasePath = config.sourceBasePath;
        // NOTE(review): the three boolean flags are passed positionally; confirm their meaning
        // against the FSUtils.getAllPartitionPaths signature of the Hudi version in use.
        final List<String> partitionPaths =
            FSUtils.getAllPartitionPaths(hoodieEngineContext, sourceBasePath, true, false, false);
        return partitionPaths;
    }

    /**
     * Creates an empty {@code _SUCCESS} file under the target output path unless one is already there.
     *
     * @param fileSystem file system on which the marker is checked and created
     * @param config exporter options; only {@code targetOutputPath} is read
     * @throws IOException if the existence check or the file creation fails
     */
    private void createSuccessTag(FileSystem fileSystem, Config config) throws IOException {
        final Path successTag = new Path(config.targetOutputPath + "/_SUCCESS");
        if (!fileSystem.exists(successTag)) {
            LOG.info(String.format("Creating _SUCCESS under target output path: %s", config.targetOutputPath));
            fileSystem.createNewFile(successTag);
        }
    }

    /**
     * Exports the snapshot in a non-Hudi format (the value of {@code config.outputFormat}).
     *
     * <p>The latest base files at or before {@code str} are collected per partition with a Spark
     * job, read back as parquet, passed through the {@link Partitioner} and saved to
     * {@code config.targetOutputPath} with {@link SaveMode#Overwrite}.
     *
     * @param javaSparkContext Spark context used to list the base files and to read/write the data
     * @param config exporter options
     * @param list partition paths of the source table
     * @param str instant time; only base files at or before it are exported
     */
    private void exportAsNonHudi(JavaSparkContext javaSparkContext, Config config, List<String> list, String str) {
        final Partitioner partitioner;
        if (StringUtils.isNullOrEmpty(config.outputPartitioner)) {
            // Default behaviour: strip the Hudi meta columns and optionally repartition by one field.
            partitioner = source -> {
                Dataset<Row> withoutMetaColumns = source.drop(
                    JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
                if (StringUtils.isNullOrEmpty(config.outputPartitionField)) {
                    return withoutMetaColumns.write();
                }
                return withoutMetaColumns
                    .repartition(new Column(config.outputPartitionField))
                    .write()
                    .partitionBy(config.outputPartitionField);
            };
        } else {
            // A custom partitioner was requested by class name.
            partitioner = (Partitioner) ReflectionUtils.loadClass(config.outputPartitioner);
        }

        new HoodieSparkEngineContext(javaSparkContext)
            .setJobStatus(getClass().getSimpleName(), "Exporting as non-HUDI dataset");
        final TableFileSystemView.BaseFileOnlyView fsView = getBaseFileOnlyView(javaSparkContext, config);

        partitioner
            .partition(new SQLContext(javaSparkContext).read().parquet(
                JavaConversions.asScalaIterator(
                    javaSparkContext.parallelize(list, list.size())
                        // One task per partition path; each yields the paths of its latest base files.
                        .flatMap(partitionPath -> fsView
                            .getLatestBaseFilesBeforeOrOn(partitionPath, str)
                            .map(baseFile -> baseFile.getPath())
                            .iterator())
                        .toLocalIterator())
                    .toSeq()))
            .format(config.outputFormat)
            .mode(SaveMode.Overwrite)
            .save(config.targetOutputPath);
    }

    /**
     * Exports the snapshot as a Hudi table by copying files from the source to the target path.
     *
     * <p>Two phases:
     * <ol>
     *   <li>For each partition, the latest base files at or before {@code str} plus the partition
     *       metadata file (when present) are listed and then copied in parallel into the matching
     *       partition directory under the target path.</li>
     *   <li>On the driver, {@code hoodie.properties} and every timeline file in the source meta
     *       folder whose instant time is at or before {@code str} is copied into the target meta
     *       folder.</li>
     * </ol>
     *
     * <p>Source paths are always accessed through a file system resolved from
     * {@code config.sourceBasePath} and target paths through one resolved from
     * {@code config.targetOutputPath}, so that source and target may live on different file systems.
     *
     * @param javaSparkContext Spark context used to run the listing and copy jobs
     * @param config exporter options
     * @param list partition paths of the source table
     * @param str instant time; only files at or before it are exported
     * @throws IOException if listing or copying the meta folder files fails
     */
    private void exportAsHudi(JavaSparkContext javaSparkContext, Config config, List<String> list, String str) throws IOException {
        final TableFileSystemView.BaseFileOnlyView baseFileOnlyView = getBaseFileOnlyView(javaSparkContext, config);
        final HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(javaSparkContext);
        final SerializableConfiguration hadoopConf = engineContext.getHadoopConf();
        engineContext.setJobStatus(getClass().getSimpleName(), "Exporting as HUDI dataset");

        // Phase 1a: collect (partition, source file path) pairs, one task per partition.
        List<Tuple2<String, String>> filesToCopy = engineContext.flatMap(list, partition -> {
            List<Tuple2<String, String>> partitionFiles = new ArrayList<>();
            baseFileOnlyView.getLatestBaseFilesBeforeOrOn(partition, str)
                .forEach(baseFile -> partitionFiles.add(new Tuple2<>(partition, baseFile.getPath())));
            // The partition metadata file is copied along with the data files when it exists.
            Path partitionMetaFile = new Path(
                FSUtils.getPartitionPath(config.sourceBasePath, partition),
                HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
            if (FSUtils.getFs(config.sourceBasePath, hadoopConf.newCopy()).exists(partitionMetaFile)) {
                partitionFiles.add(new Tuple2<>(partition, partitionMetaFile.toString()));
            }
            return partitionFiles.stream();
        }, list.size());

        // Phase 1b: copy every collected file. Skipped when there is nothing to copy, which also
        // avoids asking the engine for a parallelism of zero.
        if (!filesToCopy.isEmpty()) {
            engineContext.foreach(filesToCopy, file -> {
                String partition = file._1();
                Path sourceFilePath = new Path(file._2());
                Path targetPartitionPath = FSUtils.getPartitionPath(config.targetOutputPath, partition);
                // File systems are resolved inside the task from a fresh copy of the configuration.
                FileSystem executorSourceFs = FSUtils.getFs(config.sourceBasePath, hadoopConf.newCopy());
                FileSystem executorOutputFs = FSUtils.getFs(config.targetOutputPath, hadoopConf.newCopy());
                if (!executorOutputFs.exists(targetPartitionPath)) {
                    executorOutputFs.mkdirs(targetPartitionPath);
                }
                FileUtil.copy(
                    executorSourceFs,
                    sourceFilePath,
                    executorOutputFs,
                    new Path(targetPartitionPath, sourceFilePath.getName()),
                    false,
                    executorOutputFs.getConf());
            }, filesToCopy.size());
        }

        // Phase 2: copy the table properties and the timeline files up to the snapshot instant.
        LOG.info(String.format("Copying .commit files which are no-late-than %s.", str));
        final FileSystem sourceFs = FSUtils.getFs(config.sourceBasePath, javaSparkContext.hadoopConfiguration());
        final FileSystem outputFs = FSUtils.getFs(config.targetOutputPath, javaSparkContext.hadoopConfiguration());
        FileStatus[] metaFiles = sourceFs.listStatus(
            new Path(config.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME),
            path -> path.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)
                || HoodieTimeline.compareTimestamps(
                    FSUtils.getCommitFromCommitFile(path.getName()), HoodieTimeline.LESSER_THAN_OR_EQUALS, str));
        for (FileStatus metaFile : metaFiles) {
            Path targetFilePath = new Path(
                config.targetOutputPath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + metaFile.getPath().getName());
            if (!outputFs.exists(targetFilePath.getParent())) {
                outputFs.mkdirs(targetFilePath.getParent());
            }
            if (outputFs.exists(targetFilePath)) {
                // Reported but not fatal: the copy below still runs, exactly as before this change.
                LOG.error(String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
            }
            FileUtil.copy(sourceFs, metaFile.getPath(), outputFs, targetFilePath, false, outputFs.getConf());
        }
    }

    /**
     * Builds a file-system view over the source table restricted to completed write instants.
     *
     * @param javaSparkContext Spark context supplying the Hadoop configuration
     * @param config exporter options; only {@code sourceBasePath} is read
     * @return a base-file view backed by a freshly built meta client for the source table
     */
    private TableFileSystemView.BaseFileOnlyView getBaseFileOnlyView(JavaSparkContext javaSparkContext, Config config) {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(FSUtils.getFs(config.sourceBasePath, javaSparkContext.hadoopConfiguration()).getConf())
            .setBasePath(config.sourceBasePath)
            .build();
        HoodieTimeline completedWrites = metaClient.getActiveTimeline()
            .getWriteTimeline()
            .filterCompletedInstants();
        return new HoodieTableFileSystemView(metaClient, completedWrites);
    }

    /**
     * Command-line entry point: parses the arguments into a {@link Config}, starts a Spark context
     * configured with the Kryo serializer, runs the export and always stops the context afterwards.
     *
     * @param strArr command-line arguments
     * @throws IOException if the export fails with an I/O error
     */
    public static void main(String[] strArr) throws IOException {
        final Config config = new Config();
        new JCommander(config, null, strArr);

        final SparkConf sparkConf = new SparkConf()
            .setAppName("Hoodie-snapshot-exporter")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        final JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        LOG.info("Initializing spark job.");

        try {
            new HoodieSnapshotExporter().export(javaSparkContext, config);
        } finally {
            // Runs on both the success and the failure path; any failure still propagates.
            javaSparkContext.stop();
        }
    }

    /**
     * Re-creates one of this class's serializable lambdas from its {@link SerializedLambda} form.
     *
     * <p>The method dispatches on the implementation method name, verifies that the functional
     * interface, its method and the implementation signature match what is expected, and returns a
     * lambda rebuilt from the captured arguments. Anything that does not match falls through to an
     * {@link IllegalArgumentException}.
     *
     * <p>NOTE(review): this method is marked synthetic and looks like decompiler output of the hook
     * the Java compiler generates for serializable lambdas. As rendered it is not valid Java source
     * ({@code boolean z = -1}, {@code z = 2}, two {@code case true} labels), and the compiler would
     * presumably generate its own copy from the lambdas in {@code exportAsHudi} and
     * {@code exportAsNonHudi} — confirm, and consider dropping it from hand-maintained sources.
     *
     * @param serializedLambda serialized form of the lambda to restore
     * @return the restored lambda
     * @throws IllegalArgumentException if the serialized form matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Stage 1: map the implementation method name to a selector (hash first, then equals).
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1831599588:
                if (implMethodName.equals("lambda$exportAsNonHudi$a863234f$1")) {
                    z = true;
                    break;
                }
                break;
            case -157810015:
                if (implMethodName.equals("lambda$exportAsHudi$70bfaf25$1")) {
                    z = 2;
                    break;
                }
                break;
            case 123641358:
                if (implMethodName.equals("lambda$exportAsHudi$c86d6d34$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Stage 2: validate the lambda's shape for the selected case and rebuild it.
        switch (z) {
            case false:
                // SerializableFunction: lists (partition, file path) pairs for one partition.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/hudi/common/function/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/hudi/utilities/HoodieSnapshotExporter") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/common/table/view/TableFileSystemView$BaseFileOnlyView;Ljava/lang/String;Lorg/apache/hudi/utilities/HoodieSnapshotExporter$Config;Lorg/apache/hudi/common/config/SerializableConfiguration;Ljava/lang/String;)Ljava/util/stream/Stream;")) {
                    TableFileSystemView.BaseFileOnlyView baseFileOnlyView = (TableFileSystemView.BaseFileOnlyView) serializedLambda.getCapturedArg(0);
                    String str = (String) serializedLambda.getCapturedArg(1);
                    Config config = (Config) serializedLambda.getCapturedArg(2);
                    SerializableConfiguration serializableConfiguration = (SerializableConfiguration) serializedLambda.getCapturedArg(3);
                    return str2 -> {
                        List arrayList = new ArrayList();
                        baseFileOnlyView.getLatestBaseFilesBeforeOrOn(str2, str).forEach(hoodieBaseFile -> {
                            arrayList.add(new Tuple2(str2, hoodieBaseFile.getPath()));
                        });
                        Path path = new Path(FSUtils.getPartitionPath(config.sourceBasePath, str2), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
                        if (FSUtils.getFs(config.sourceBasePath, serializableConfiguration.newCopy()).exists(path)) {
                            arrayList.add(new Tuple2(str2, path.toString()));
                        }
                        return arrayList.stream();
                    };
                }
                break;
            case true:
                // Spark FlatMapFunction: yields the latest base file paths of one partition.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/hudi/utilities/HoodieSnapshotExporter") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/common/table/view/TableFileSystemView$BaseFileOnlyView;Ljava/lang/String;Ljava/lang/String;)Ljava/util/Iterator;")) {
                    TableFileSystemView.BaseFileOnlyView baseFileOnlyView2 = (TableFileSystemView.BaseFileOnlyView) serializedLambda.getCapturedArg(0);
                    String str3 = (String) serializedLambda.getCapturedArg(1);
                    return str22 -> {
                        return baseFileOnlyView2.getLatestBaseFilesBeforeOrOn(str22, str3).map((v0) -> {
                            return v0.getPath();
                        }).iterator();
                    };
                }
                break;
            // NOTE(review): second `case true` label; presumably the selector value 2 assigned above.
            case true:
                // SerializableConsumer: copies one (partition, file path) pair into the target.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/hudi/common/function/SerializableConsumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/apache/hudi/utilities/HoodieSnapshotExporter") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/utilities/HoodieSnapshotExporter$Config;Lorg/apache/hudi/common/config/SerializableConfiguration;Lscala/Tuple2;)V")) {
                    Config config2 = (Config) serializedLambda.getCapturedArg(0);
                    SerializableConfiguration serializableConfiguration2 = (SerializableConfiguration) serializedLambda.getCapturedArg(1);
                    return tuple2 -> {
                        String str32 = (String) tuple2._1();
                        Path path = new Path((String) tuple2._2());
                        Path partitionPath = FSUtils.getPartitionPath(config2.targetOutputPath, str32);
                        FileSystem fs = FSUtils.getFs(config2.targetOutputPath, serializableConfiguration2.newCopy());
                        if (!fs.exists(partitionPath)) {
                            fs.mkdirs(partitionPath);
                        }
                        FileUtil.copy(fs, path, fs, new Path(partitionPath, path.getName()), false, fs.getConf());
                    };
                }
                break;
        }
        // No known lambda matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
