package org.apache.hudi.utilities;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;

/* loaded from: input_file:org/apache/hudi/utilities/HoodieClusteringJob.class */
/**
 * Command-line utility that either schedules a clustering plan for a Hudi table or executes a
 * clustering plan identified by an instant time, depending on {@link Config#runSchedule}.
 *
 * <p>Client properties come from the file given by {@code --props} (when set) combined with any
 * {@code --hoodie-conf} overrides; otherwise they are built from the overrides alone.
 */
public class HoodieClusteringJob {
    private static final Logger LOG = LogManager.getLogger(HoodieClusteringJob.class);
    private final Config cfg;
    // Assigned at the start of cluster(int); not read anywhere else in this class.
    private transient FileSystem fs;
    private final TypedProperties props;
    private final JavaSparkContext jsc;

    /** Command-line options for the clustering job; populated by JCommander. */
    public static class Config implements Serializable {

        @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
        public String basePath = null;

        @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
        public String tableName = null;

        @Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time, only need when cluster. And schedule clustering can generate it.", required = false)
        public String clusteringInstantTime = null;

        @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false)
        public int parallelism = 1;

        @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
        public String sparkMaster = null;

        @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
        public String sparkMemory = null;

        @Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false)
        public int retry = 0;

        @Parameter(names = {"--schedule", "-sc"}, description = "Schedule clustering")
        public Boolean runSchedule = false;

        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;

        @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for hoodie client for clustering")
        public String propsFilePath = null;

        @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter = IdentitySplitter.class)
        public List<String> configs = new ArrayList<>();
    }

    /**
     * Creates a clustering job bound to the given Spark context and options.
     *
     * @param javaSparkContext Spark context used for all clustering work
     * @param config parsed command-line options
     */
    public HoodieClusteringJob(JavaSparkContext javaSparkContext, Config config) {
        this.cfg = config;
        this.jsc = javaSparkContext;
        if (config.propsFilePath == null) {
            this.props = UtilHelpers.buildProperties(config.configs);
        } else {
            this.props = readConfigFromFileSystem(javaSparkContext, config);
        }
    }

    /**
     * Loads the properties file named by {@code config.propsFilePath}, passing
     * {@code config.configs} along to the reader.
     */
    private TypedProperties readConfigFromFileSystem(JavaSparkContext javaSparkContext, Config config) {
        // NOTE(review): the file system is resolved from basePath rather than from the props path;
        // a props file living on a different file system may not be readable this way - confirm.
        FileSystem propsFs = FSUtils.getFs(config.basePath, javaSparkContext.hadoopConfiguration());
        return UtilHelpers.readConfig(propsFs, new Path(config.propsFilePath), config.configs).getConfig();
    }

    /**
     * Entry point: parses arguments, runs the job once (with retries) and logs the outcome.
     *
     * <p>Prints usage and exits with status 1 when help is requested, when no arguments are given,
     * or when execution (not scheduling) is requested without an instant time.
     *
     * @param args command-line arguments, see {@link Config}
     */
    public static void main(String[] args) {
        Config config = new Config();
        JCommander commander = new JCommander(config, null, args);
        boolean missingInstant = !config.runSchedule && config.clusteringInstantTime == null;
        if (config.help || args.length == 0 || missingInstant) {
            commander.usage();
            System.exit(1);
        }
        JavaSparkContext sparkContext =
                UtilHelpers.buildSparkContext("clustering-" + config.tableName, config.sparkMaster, config.sparkMemory);
        try {
            int result = new HoodieClusteringJob(sparkContext, config).cluster(config.retry);
            String summary = String.format("Clustering with basePath: %s, tableName: %s, runSchedule: %s",
                    config.basePath, config.tableName, config.runSchedule);
            if (result == -1) {
                LOG.error(summary + " failed");
            } else {
                LOG.info(summary + " success");
            }
        } finally {
            // Stop the context even when job construction or clustering throws.
            sparkContext.stop();
        }
    }

    /**
     * Runs scheduling or execution of clustering, according to {@code cfg.runSchedule}.
     *
     * @param retry retry count handed to {@code UtilHelpers.retry}
     * @return the value produced by {@code UtilHelpers.retry}; when scheduling, the attempt yields
     *     0 if an instant was scheduled and -1 otherwise
     */
    public int cluster(int retry) {
        this.fs = FSUtils.getFs(this.cfg.basePath, this.jsc.hadoopConfiguration());
        return UtilHelpers.retry(retry, () -> {
            if (this.cfg.runSchedule) {
                LOG.info("Do schedule");
                Option<String> scheduledInstant = doSchedule(this.jsc);
                if (!scheduledInstant.isPresent()) {
                    return -1;
                }
                LOG.info("The schedule instant time is " + scheduledInstant.get());
                return 0;
            }
            LOG.info("Do cluster");
            return doCluster(this.jsc);
        }, "Cluster failed");
    }

    /**
     * Resolves the table's Avro schema as a string.
     *
     * @throws HoodieException if the table has no completed commits
     */
    private String getSchemaFromLatestInstant() throws Exception {
        HoodieTableMetaClient metaClient =
                new HoodieTableMetaClient(this.jsc.hadoopConfiguration(), this.cfg.basePath, true);
        TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
        int completedCommits = metaClient.getActiveTimeline()
                .getCommitsTimeline()
                .filterCompletedInstants()
                .countInstants();
        if (completedCommits == 0) {
            throw new HoodieException("Cannot run clustering without any completed commits");
        }
        return schemaResolver.getTableAvroSchema(false).toString();
    }

    /**
     * Executes clustering for {@code cfg.clusteringInstantTime} and returns the result of
     * {@code UtilHelpers.handleErrors} on the produced write statuses.
     */
    private int doCluster(JavaSparkContext javaSparkContext) throws Exception {
        String schema = getSchemaFromLatestInstant();
        // NOTE(review): the client created here is never closed; confirm whether it holds
        // resources that should be released after the call.
        return UtilHelpers.handleErrors(
                javaSparkContext,
                this.cfg.clusteringInstantTime,
                UtilHelpers.createHoodieClient(
                                javaSparkContext, this.cfg.basePath, schema, this.cfg.parallelism, Option.empty(), this.props)
                        .cluster(this.cfg.clusteringInstantTime, true)
                        .getWriteStatuses());
    }

    /**
     * Schedules clustering using this job's Spark context.
     *
     * @return the scheduled instant time, or an empty option when nothing was scheduled
     */
    public Option<String> doSchedule() throws Exception {
        return doSchedule(this.jsc);
    }

    /** Schedules clustering through a client created on the given Spark context. */
    private Option<String> doSchedule(JavaSparkContext javaSparkContext) throws Exception {
        String schema = getSchemaFromLatestInstant();
        // NOTE(review): as in doCluster, the created client is not closed - confirm whether it should be.
        return UtilHelpers.createHoodieClient(
                        javaSparkContext, this.cfg.basePath, schema, this.cfg.parallelism, Option.empty(), this.props)
                .scheduleClustering(Option.empty());
    }
}
