package org.apache.hudi.utilities.streamer;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieConversionUtils;
import org.apache.hudi.HoodieWriterUtils;
import org.apache.hudi.async.AsyncClusteringService;
import org.apache.hudi.async.AsyncCompactService;
import org.apache.hudi.async.SparkAsyncClusteringService;
import org.apache.hudi.async.SparkAsyncCompactService;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.OperationConverter;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.com.beust.jcommander.Parameter;
import org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringUpdateException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.index.HoodieIndexManager;
import org.apache.hudi.org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
import org.apache.hudi.utilities.ingestion.HoodieIngestionException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.ingestion.HoodieIngestionService;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.JsonDFSSource;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/utilities/streamer/HoodieStreamer.class */
public class HoodieStreamer implements Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(HoodieStreamer.class);
    // Default sensitive config keys: the comma-separated default value of
    // HoodieWriteConfig.SENSITIVE_CONFIG_KEYS_FILTER, split into a fixed-size list.
    private static final List<String> DEFAULT_SENSITIVE_CONFIG_KEYS = Arrays.asList(HoodieWriteConfig.SENSITIVE_CONFIG_KEYS_FILTER.defaultValue().split(","));
    // Replacement text for sensitive values.
    // NOTE(review): presumably substituted for secrets when configs are printed; the usage is outside this view, confirm.
    private static final String SENSITIVE_VALUES_MASKED = "SENSITIVE_INFO_MASKED";
    // Key names for the streamer checkpoint and the checkpoint-reset marker. The legacy "deltastreamer." prefix
    // is kept as-is (NOTE(review): presumably for compatibility with previously written commit metadata, confirm).
    public static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
    public static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";
    // Parsed CLI configuration; transient, so it is excluded from Java serialization of this object.
    protected final transient Config cfg;
    private final TypedProperties properties;
    // Optional ingestion service; transient.
    protected transient Option<HoodieIngestionService> ingestionService;
    private final Option<BootstrapExecutor> bootstrapExecutor;
    // Spark pool name used for delta sync; StreamSyncService#startService sets the same literal value.
    public static final String STREAMSYNC_POOL_NAME = "hoodiedeltasync";

    /* loaded from: input_file:org/apache/hudi/utilities/streamer/HoodieStreamer$Config.class */
    /**
     * Command-line configuration for {@link HoodieStreamer}, populated by JCommander from {@code @Parameter}
     * annotated fields.
     *
     * <p>{@link #equals(Object)} and {@link #hashCode()} cover the same subset of fields; options such as the
     * bootstrap, retry and clustering-retry flags are intentionally not part of that subset.
     */
    public static class Config implements Serializable {
        /** Default properties file: a test resource resolved relative to the JVM working directory. */
        public static final String DEFAULT_DFS_SOURCE_PROPERTIES = "file://" + System.getProperty("user.dir") + "/src/test/resources/streamer-config/dfs-source.properties";

        @Parameter(names = {"--target-base-path"}, description = "base path for the target hoodie table. (Will be created if did not exist first time around. If exists, expected to be a hoodie table)", required = true)
        public String targetBasePath;

        @Parameter(names = {"--target-table"}, description = "name of the target table", required = true)
        public String targetTableName;

        // Parsed with HoodieTableType.valueOf, so only the enum constant names are accepted.
        @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
        public String tableType;

        @Parameter(names = {"--base-file-format"}, description = "File format for the base files. PARQUET (or) HFILE", required = false)
        public String baseFileFormat = "PARQUET";

        @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, referto individual classes, for supported properties. Properties in this file can be overridden by \"--hoodie-conf\"")
        public String propsFilePath = DEFAULT_DFS_SOURCE_PROPERTIES;

        @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file (using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated", splitter = IdentitySplitter.class)
        public List<String> configs = new ArrayList<>();

        @Parameter(names = {"--source-class"}, description = "Subclass of org.apache.hudi.utilities.sources to read data. Built-in options: org.apache.hudi.utilities.sources.{JsonDFSSource (default), AvroDFSSource, JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}")
        public String sourceClassName = JsonDFSSource.class.getName();

        @Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how to break ties between records with same key in input data. Default: 'ts' holding unix timestamp of record")
        public String sourceOrderingField = "ts";

        @Parameter(names = {"--payload-class"}, description = "subclass of HoodieRecordPayload, that works off a GenericRecord. Implement your own, if you want to do something other than overwriting existing value")
        public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName();

        @Parameter(names = {"--schemaprovider-class"}, description = "subclass of org.apache.hudi.utilities.schema.SchemaProvider to attach schemas to input & target table data, built in options: org.apache.hudi.utilities.schema.FilebasedSchemaProvider.Source (See org.apache.hudi.utilities.sources.Source) implementation can implement their own SchemaProvider. For Sources that return Dataset<Row>, the schema is obtained implicitly. However, this CLI option allows overriding the schemaprovider returned by Source.")
        public String schemaProviderClassName = null;

        @Parameter(names = {"--transformer-class"}, description = "A subclass or a list of subclasses of org.apache.hudi.utilities.transform.Transformer. Allows transforming raw source Dataset to a target Dataset (conforming to target schema) before writing. Default : Not set. E:g - org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which allows a SQL query templated to be passed as a transformation function). Pass a comma-separated list of subclass names to chain the transformations. If there are two or more transformers using the same config keys and expect different values for those keys, then transformer can include an identifier. E:g - tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer. Here the identifier tr1 can be used along with property key like `hoodie.streamer.transformer.sql.tr1` to identify properties related to the transformer. So effective value for `hoodie.streamer.transformer.sql` is determined by key `hoodie.streamer.transformer.sql.tr1` for this transformer. If identifier is used, it should be specified for all the transformers. Further the order in which transformer is applied is determined by the occurrence of transformer irrespective of the identifier used for the transformer. For example: In the configured value below tr2:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer,tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer , tr2 is applied before tr1 based on order of occurrence.")
        public List<String> transformerClassNames = null;

        @Parameter(names = {"--source-limit"}, description = "Maximum amount of data to read from source. Default: No limit, e.g: DFS-Source => max bytes to read, Kafka-Source => max events to read")
        public long sourceLimit = Long.MAX_VALUE;

        @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT, BULK_INSERT, INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE, DELETE_PARTITION", converter = OperationConverter.class)
        public WriteOperationType operation = WriteOperationType.UPSERT;

        @Parameter(names = {"--filter-dupes"}, description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
        public Boolean filterDupes = false;

        @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
        public Boolean enableHiveSync = false;

        @Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
        public Boolean enableMetaSync = false;

        @Parameter(names = {"--force-empty-sync"}, description = "Force syncing meta even on empty commit")
        public Boolean forceEmptyMetaSync = false;

        @Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
        public String syncClientToolClassNames = HiveSyncTool.class.getName();

        @Parameter(names = {"--max-pending-compactions"}, description = "Maximum number of outstanding inflight/requested compactions. Delta Sync will not happen unlessoutstanding compactions is less than this number")
        public Integer maxPendingCompactions = 5;

        @Parameter(names = {"--max-pending-clustering"}, description = "Maximum number of outstanding inflight/requested clustering. Delta Sync will not happen unlessoutstanding clustering is less than this number")
        public Integer maxPendingClustering = 5;

        @Parameter(names = {"--continuous"}, description = "Hudi Streamer runs in continuous mode running source-fetch -> Transform -> Hudi Write in loop")
        public Boolean continuousMode = false;

        @Parameter(names = {"--min-sync-interval-seconds"}, description = "the min sync interval of each sync in continuous mode")
        public Integer minSyncIntervalSeconds = 0;

        @Parameter(names = {"--spark-master"}, description = "spark master to use, if not defined inherits from your environment taking into account Spark Configuration priority rules (e.g. not using spark-submit command).")
        public String sparkMaster = "";

        @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written")
        public Boolean commitOnErrors = false;

        @Parameter(names = {"--delta-sync-scheduling-weight"}, description = "Scheduling weight for delta sync as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer deltaSyncSchedulingWeight = 1;

        @Parameter(names = {"--compact-scheduling-weight"}, description = "Scheduling weight for compaction as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer compactSchedulingWeight = 1;

        @Parameter(names = {"--delta-sync-scheduling-minshare"}, description = "Minshare for delta sync as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer deltaSyncSchedulingMinShare = 0;

        @Parameter(names = {"--compact-scheduling-minshare"}, description = "Minshare for compaction as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer compactSchedulingMinShare = 0;

        @Parameter(names = {"--disable-compaction"}, description = "Compaction is enabled for MoR table by default. This flag disables it ")
        public Boolean forceDisableCompaction = false;

        @Parameter(names = {"--checkpoint"}, description = "Resume Hudi Streamer from this checkpoint.")
        public String checkpoint = null;

        @Parameter(names = {"--initial-checkpoint-provider"}, description = "subclass of org.apache.hudi.utilities.checkpointing.InitialCheckpointProvider. Generate check point for Hudi Streamer for the first run. This field will override the checkpoint of last commit using the checkpoint field. Use this field only when switching source, for example, from DFS source to Kafka Source.")
        public String initialCheckpointProvider = null;

        @Parameter(names = {"--run-bootstrap"}, description = "Run bootstrap if bootstrap index is not found")
        public Boolean runBootstrap = false;

        @Parameter(names = {"--bootstrap-overwrite"}, description = "Overwrite existing target table, default false")
        public Boolean bootstrapOverwrite = false;

        @Parameter(names = {"--bootstrap-index-class"}, description = "subclass of BootstrapIndex")
        public String bootstrapIndexClass = HFileBootstrapIndex.class.getName();

        @Parameter(names = {"--retry-on-source-failures"}, description = "Retry on any source failures")
        public Boolean retryOnSourceFailures = false;

        @Parameter(names = {"--retry-interval-seconds"}, description = "the retry interval for source failures if --retry-on-source-failures is enabled")
        public Integer retryIntervalSecs = 30;

        @Parameter(names = {"--max-retry-count"}, description = "the max retry count if --retry-on-source-failures is enabled")
        public Integer maxRetryCount = 3;

        @Parameter(names = {"--allow-commit-on-no-checkpoint-change"}, description = "allow commits even if checkpoint has not changed before and after fetch datafrom source. This might be useful in sources like SqlSource where there is not checkpoint. And is not recommended to enable in continuous mode.")
        public Boolean allowCommitOnNoCheckpointChange = false;

        @Parameter(names = {"--help", "-h"}, help = true)
        public Boolean help = false;

        @Parameter(names = {"--retry-last-pending-inline-clustering", "-rc"}, description = "Retry last pending inline clustering plan before writing to sink.")
        public Boolean retryLastPendingInlineClusteringJob = false;

        @Parameter(names = {"--retry-last-pending-inline-compaction"}, description = "Retry last pending inline compaction plan before writing to sink.")
        public Boolean retryLastPendingInlineCompactionJob = false;

        @Parameter(names = {"--cluster-scheduling-weight"}, description = "Scheduling weight for clustering as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer clusterSchedulingWeight = 1;

        @Parameter(names = {"--cluster-scheduling-minshare"}, description = "Minshare for clustering as defined in https://spark.apache.org/docs/latest/job-scheduling.html")
        public Integer clusterSchedulingMinShare = 0;

        @Parameter(names = {"--post-write-termination-strategy-class"}, description = "Post writer termination strategy class to gracefully shutdown deltastreamer in continuous mode")
        public String postWriteTerminationStrategyClass = "";

        @Parameter(names = {"--ingestion-metrics-class"}, description = "Ingestion metrics class for reporting metrics during ingestion lifecycles.")
        public String ingestionMetricsClass = HoodieStreamerMetrics.class.getCanonicalName();

        @Parameter(names = {"--config-hot-update-strategy-class"}, description = "Configuration hot update in continuous mode")
        public String configHotUpdateStrategyClass = "";

        @Parameter(names = {"--ignore-checkpoint"}, description = "Set this config with a unique value, recommend using a timestamp value or UUID. Setting this config indicates that the subsequent sync should ignore the last committed checkpoint for the source. The config value is stored in the commit history, so setting the config with same values would not have any affect. This config can be used in scenarios like kafka topic change, where we would want to start ingesting from the latest or earliest offset after switching the topic (in this case we would want to ignore the previously committed checkpoint, and rely on other configs to pick the starting offsets).")
        public String ignoreCheckpoint = null;

        /**
         * Reports whether compaction should run asynchronously.
         *
         * @return {@code true} only in continuous mode, when compaction is not force-disabled and the table is not
         *     COPY_ON_WRITE. The table type is parsed only if the first two conditions hold.
         */
        public boolean isAsyncCompactionEnabled() {
            return this.continuousMode
                && !this.forceDisableCompaction
                && HoodieTableType.valueOf(this.tableType) != HoodieTableType.COPY_ON_WRITE;
        }

        /**
         * Reports whether compaction should run inline with the write.
         *
         * @return {@code true} only outside continuous mode, when compaction is not force-disabled and the table is
         *     not COPY_ON_WRITE. The table type is parsed only if the first two conditions hold.
         */
        public boolean isInlineCompactionEnabled() {
            return !this.continuousMode
                && !this.forceDisableCompaction
                && HoodieTableType.valueOf(this.tableType) != HoodieTableType.COPY_ON_WRITE;
        }

        /**
         * Resolves the streamer properties for {@code config}.
         *
         * @param configuration Hadoop configuration used to read the properties file
         * @param config        CLI configuration supplying the props path and {@code --hoodie-conf} overrides
         * @return properties built from the overrides alone when no props path is set (null or empty), otherwise
         *     the props file merged with the overrides
         */
        public static TypedProperties getProps(Configuration configuration, Config config) {
            // Treat a null path the same as an empty one instead of failing with a NullPointerException.
            if (StringUtils.isNullOrEmpty(config.propsFilePath)) {
                return UtilHelpers.buildProperties(config.configs);
            }
            return UtilHelpers.readConfig(configuration, new Path(config.propsFilePath), config.configs).getProps();
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Config config = (Config) obj;
            // Must stay in sync with the field list used by hashCode().
            return this.sourceLimit == config.sourceLimit
                && Objects.equals(this.targetBasePath, config.targetBasePath)
                && Objects.equals(this.targetTableName, config.targetTableName)
                && Objects.equals(this.tableType, config.tableType)
                && Objects.equals(this.baseFileFormat, config.baseFileFormat)
                && Objects.equals(this.propsFilePath, config.propsFilePath)
                && Objects.equals(this.configs, config.configs)
                && Objects.equals(this.sourceClassName, config.sourceClassName)
                && Objects.equals(this.sourceOrderingField, config.sourceOrderingField)
                && Objects.equals(this.payloadClassName, config.payloadClassName)
                && Objects.equals(this.schemaProviderClassName, config.schemaProviderClassName)
                && Objects.equals(this.transformerClassNames, config.transformerClassNames)
                && this.operation == config.operation
                && Objects.equals(this.filterDupes, config.filterDupes)
                && Objects.equals(this.enableHiveSync, config.enableHiveSync)
                && Objects.equals(this.enableMetaSync, config.enableMetaSync)
                && Objects.equals(this.forceEmptyMetaSync, config.forceEmptyMetaSync)
                && Objects.equals(this.syncClientToolClassNames, config.syncClientToolClassNames)
                && Objects.equals(this.maxPendingCompactions, config.maxPendingCompactions)
                && Objects.equals(this.maxPendingClustering, config.maxPendingClustering)
                && Objects.equals(this.continuousMode, config.continuousMode)
                && Objects.equals(this.minSyncIntervalSeconds, config.minSyncIntervalSeconds)
                && Objects.equals(this.sparkMaster, config.sparkMaster)
                && Objects.equals(this.commitOnErrors, config.commitOnErrors)
                && Objects.equals(this.deltaSyncSchedulingWeight, config.deltaSyncSchedulingWeight)
                && Objects.equals(this.compactSchedulingWeight, config.compactSchedulingWeight)
                && Objects.equals(this.clusterSchedulingWeight, config.clusterSchedulingWeight)
                && Objects.equals(this.deltaSyncSchedulingMinShare, config.deltaSyncSchedulingMinShare)
                && Objects.equals(this.compactSchedulingMinShare, config.compactSchedulingMinShare)
                && Objects.equals(this.clusterSchedulingMinShare, config.clusterSchedulingMinShare)
                && Objects.equals(this.forceDisableCompaction, config.forceDisableCompaction)
                && Objects.equals(this.checkpoint, config.checkpoint)
                && Objects.equals(this.initialCheckpointProvider, config.initialCheckpointProvider)
                && Objects.equals(this.ingestionMetricsClass, config.ingestionMetricsClass)
                && Objects.equals(this.help, config.help);
        }

        @Override
        public int hashCode() {
            // Must stay in sync with the field list compared by equals(Object).
            return Objects.hash(this.targetBasePath, this.targetTableName, this.tableType, this.baseFileFormat,
                this.propsFilePath, this.configs, this.sourceClassName, this.sourceOrderingField,
                this.payloadClassName, this.schemaProviderClassName, this.transformerClassNames,
                Long.valueOf(this.sourceLimit), this.operation, this.filterDupes, this.enableHiveSync,
                this.enableMetaSync, this.forceEmptyMetaSync, this.syncClientToolClassNames,
                this.maxPendingCompactions, this.maxPendingClustering, this.continuousMode,
                this.minSyncIntervalSeconds, this.sparkMaster, this.commitOnErrors,
                this.deltaSyncSchedulingWeight, this.compactSchedulingWeight, this.clusterSchedulingWeight,
                this.deltaSyncSchedulingMinShare, this.compactSchedulingMinShare, this.clusterSchedulingMinShare,
                this.forceDisableCompaction, this.checkpoint, this.initialCheckpointProvider,
                this.ingestionMetricsClass, this.help);
        }

        // NOTE(review): this includes the raw --hoodie-conf values (configs); confirm it is never logged when those
        // may carry secrets.
        @Override
        public String toString() {
            return "Config{targetBasePath='" + this.targetBasePath
                + "', targetTableName='" + this.targetTableName
                + "', tableType='" + this.tableType
                + "', baseFileFormat='" + this.baseFileFormat
                + "', propsFilePath='" + this.propsFilePath
                + "', configs=" + this.configs
                + ", sourceClassName='" + this.sourceClassName
                + "', sourceOrderingField='" + this.sourceOrderingField
                + "', payloadClassName='" + this.payloadClassName
                + "', schemaProviderClassName='" + this.schemaProviderClassName
                + "', transformerClassNames=" + this.transformerClassNames
                + ", sourceLimit=" + this.sourceLimit
                + ", operation=" + this.operation
                + ", filterDupes=" + this.filterDupes
                + ", enableHiveSync=" + this.enableHiveSync
                + ", enableMetaSync=" + this.enableMetaSync
                + ", forceEmptyMetaSync=" + this.forceEmptyMetaSync
                + ", syncClientToolClassNames=" + this.syncClientToolClassNames
                + ", maxPendingCompactions=" + this.maxPendingCompactions
                + ", maxPendingClustering=" + this.maxPendingClustering
                + ", continuousMode=" + this.continuousMode
                + ", minSyncIntervalSeconds=" + this.minSyncIntervalSeconds
                + ", sparkMaster='" + this.sparkMaster
                + "', commitOnErrors=" + this.commitOnErrors
                + ", deltaSyncSchedulingWeight=" + this.deltaSyncSchedulingWeight
                + ", compactSchedulingWeight=" + this.compactSchedulingWeight
                + ", clusterSchedulingWeight=" + this.clusterSchedulingWeight
                + ", deltaSyncSchedulingMinShare=" + this.deltaSyncSchedulingMinShare
                + ", compactSchedulingMinShare=" + this.compactSchedulingMinShare
                + ", clusterSchedulingMinShare=" + this.clusterSchedulingMinShare
                + ", forceDisableCompaction=" + this.forceDisableCompaction
                + ", checkpoint='" + this.checkpoint
                + "', initialCheckpointProvider='" + this.initialCheckpointProvider
                + "', ingestionMetricsClass='" + this.ingestionMetricsClass
                + "', help=" + this.help
                + '}';
        }
    }

    /* loaded from: input_file:org/apache/hudi/utilities/streamer/HoodieStreamer$StreamSyncService.class */
    public static class StreamSyncService extends HoodieIngestionService {
        private static final long serialVersionUID = 1;
        // Streamer configuration; cfg.baseFileFormat may be overwritten while the constructor runs.
        private final Config cfg;
        // Schema provider built in the constructor and reused when StreamSync is rebuilt.
        private transient SchemaProvider schemaProvider;
        private transient SparkSession sparkSession;
        private final transient HoodieSparkEngineContext hoodieSparkContext;
        // Storage wrapper around the FileSystem passed to the constructor.
        private transient HoodieStorage storage;
        // Hadoop Configuration passed to the constructor; reused when StreamSync is rebuilt.
        private transient Configuration hiveConf;
        // Current properties; replaced when a configuration hot-update strategy returns new properties.
        TypedProperties props;
        // Async table services; both start out empty in the constructor.
        private Option<AsyncCompactService> asyncCompactService;
        private Option<AsyncClusteringService> asyncClusteringService;
        // Taken from the existing table's metadata when available, otherwise parsed from cfg.tableType.
        private HoodieTableType tableType;
        // Active sync; closed and recreated by reInitDeltaSync().
        private transient StreamSync streamSync;
        // Present only when the corresponding strategy class is configured in cfg.
        private final Option<PostWriteTerminationStrategy> postWriteTerminationStrategy;
        private final Option<ConfigurationHotUpdateStrategy> configurationHotUpdateStrategyOpt;

        /**
         * Creates the sync service. If the target table already exists, the CLI arguments are validated against it.
         *
         * <p>For an existing table, the table type must match {@code config.tableType}. The base file format must
         * match {@code config.baseFileFormat} unless that is null, and {@code config.baseFileFormat} is then set to
         * the table's format. The supplied properties are also validated against the persisted table config.
         *
         * @param config                   streamer configuration (may be mutated, see above)
         * @param hoodieSparkEngineContext engine context used for the Spark session, schema provider and sync
         * @param fileSystem               file system hosting {@code config.targetBasePath}
         * @param configuration            Hadoop configuration forwarded to {@link StreamSync}
         * @param option                   resolved properties; {@code get()} is called on it unconditionally
         * @param option2                  optional source profile supplier forwarded to the stream context
         * @throws IOException if an I/O error occurs during initialization
         */
        public StreamSyncService(Config config, HoodieSparkEngineContext hoodieSparkEngineContext, FileSystem fileSystem, Configuration configuration, Option<TypedProperties> option, Option<SourceProfileSupplier> option2) throws IOException {
            super(HoodieIngestionService.HoodieIngestionConfig.newBuilder().isContinuous(config.continuousMode.booleanValue()).withMinSyncInternalSeconds(config.minSyncIntervalSeconds.intValue()).build());
            this.cfg = config;
            this.hoodieSparkContext = hoodieSparkEngineContext;
            this.storage = new HoodieHadoopStorage(fileSystem);
            this.hiveConf = configuration;
            this.sparkSession = SparkSession.builder().config(hoodieSparkEngineContext.getConf()).getOrCreate();
            this.asyncCompactService = Option.empty();
            this.asyncClusteringService = Option.empty();
            this.postWriteTerminationStrategy = StringUtils.isNullOrEmpty(config.postWriteTerminationStrategyClass)
                ? Option.empty()
                : TerminationStrategyUtils.createPostWriteTerminationStrategy(option.get(), config.postWriteTerminationStrategyClass);
            this.configurationHotUpdateStrategyOpt = StringUtils.isNullOrEmpty(config.configHotUpdateStrategyClass)
                ? Option.empty()
                : ConfigurationHotUpdateStrategyUtils.createConfigurationHotUpdateStrategy(config.configHotUpdateStrategyClass, config, option.get());
            if (this.storage.exists(new StoragePath(config.targetBasePath))) {
                try {
                    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                        .setConf(this.storage.getConf().newInstance())
                        .setBasePath(config.targetBasePath)
                        .setLoadActiveTimelineOnLoad(false)
                        .build();
                    this.tableType = metaClient.getTableType();
                    HoodieConfig indexPropertiesFromFile = HoodieIndexManager.getIndexPropertiesFromFile(this.storage, config.targetBasePath);
                    // Reject a CLI table type that disagrees with the persisted table.
                    ValidationUtils.checkArgument(this.tableType.equals(HoodieTableType.valueOf(config.tableType)), "Hoodie table is of type " + this.tableType + " but passed in CLI argument is " + config.tableType);
                    String hoodieFileFormat = metaClient.getTableConfig().getBaseFileFormat().toString();
                    ValidationUtils.checkArgument(hoodieFileFormat.equals(config.baseFileFormat) || config.baseFileFormat == null, String.format("Hoodie table's base file format is of type %s but passed in CLI argument is %s", hoodieFileFormat, config.baseFileFormat));
                    // this.cfg and config are the same object (assigned above); adopt the table's actual format.
                    this.cfg.baseFileFormat = hoodieFileFormat;
                    // Copy every supplied property so validateTableConfig actually checks it against the table
                    // config. Previously the map was left empty, which made this validation a no-op.
                    Map<String, String> propsToValidate = new HashMap<>();
                    option.get().forEach((key, value) -> propsToValidate.put(key.toString(), value.toString()));
                    HoodieWriterUtils.validateTableConfig(this.sparkSession, HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate), metaClient.getTableConfig(), indexPropertiesFromFile);
                } catch (HoodieIOException e) {
                    HoodieStreamer.LOG.warn("Full exception msg " + e.getLocalizedMessage() + ",  msg " + e.getMessage());
                    // Only a missing/unreadable hoodie.properties is treated as "table not initialized yet";
                    // any other I/O failure (including one without a message) is propagated unchanged.
                    String message = e.getMessage();
                    boolean missingTableProperties = message != null
                        && message.contains("Could not load Hoodie properties")
                        && message.contains(HoodieTableConfig.HOODIE_PROPERTIES_FILE);
                    if (!missingTableProperties) {
                        throw e;
                    }
                    initializeTableTypeAndBaseFileFormat();
                }
            } else {
                initializeTableTypeAndBaseFileFormat();
            }
            ValidationUtils.checkArgument(!(config.filterDupes.booleanValue() && config.operation == WriteOperationType.UPSERT), "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
            this.props = option.get();
            HoodieStreamer.LOG.info(HoodieStreamer.toSortedTruncatedString(this.props));
            this.schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(UtilHelpers.createSchemaProvider(config.schemaProviderClassName, this.props, hoodieSparkEngineContext.jsc()), this.props, hoodieSparkEngineContext.jsc(), config.transformerClassNames);
            this.streamSync = new StreamSync(config, this.sparkSession, this.props, hoodieSparkEngineContext, fileSystem, configuration, (Function<SparkRDDWriteClient, Boolean>) this::onInitializingWriteClient, new DefaultStreamContext(this.schemaProvider, option2));
        }

        /**
         * Convenience constructor that supplies neither properties nor a source profile supplier.
         *
         * <p>NOTE(review): the six-argument constructor calls {@code option.get()} unconditionally, so delegating
         * with an empty properties Option looks like it fails during construction. Confirm the intended usage.
         */
        public StreamSyncService(Config config, HoodieSparkEngineContext hoodieSparkEngineContext, FileSystem fileSystem, Configuration configuration) throws IOException {
            this(config, hoodieSparkEngineContext, fileSystem, configuration,
                Option.<TypedProperties>empty(),
                Option.<SourceProfileSupplier>empty());
        }

        /**
         * Convenience constructor taking explicit properties but no source profile supplier.
         *
         * @param option resolved properties, passed through unchanged
         */
        public StreamSyncService(Config config, HoodieSparkEngineContext hoodieSparkEngineContext, FileSystem fileSystem, Configuration configuration, Option<TypedProperties> option) throws IOException {
            this(config, hoodieSparkEngineContext, fileSystem, configuration,
                option,
                Option.<SourceProfileSupplier>empty());
        }

        /**
         * Initializes table settings from the CLI configuration alone.
         *
         * <p>This is used when the target base path does not exist yet, or when its hoodie.properties could not be
         * loaded. The table type is parsed from {@code cfg.tableType}, and the base file format defaults to
         * PARQUET when unset.
         */
        private void initializeTableTypeAndBaseFileFormat() {
            final String requestedFormat = this.cfg.baseFileFormat;
            this.tableType = HoodieTableType.valueOf(this.cfg.tableType);
            this.cfg.baseFileFormat = (requestedFormat != null) ? requestedFormat : "PARQUET";
        }

        /**
         * Closes the current {@link StreamSync} (if any) and builds a new one from the current {@code props}.
         *
         * <p>NOTE(review): the rebuilt stream context always gets an empty source profile supplier, even if one was
         * passed to the constructor. Confirm this is intended.
         *
         * @throws IOException if closing the previous sync or building the new one fails
         */
        private void reInitDeltaSync() throws IOException {
            StreamSync previousSync = this.streamSync;
            if (previousSync != null) {
                previousSync.close();
            }
            FileSystem fileSystem = (FileSystem) this.storage.getFileSystem();
            DefaultStreamContext streamContext = new DefaultStreamContext(this.schemaProvider, Option.empty());
            this.streamSync = new StreamSync(this.cfg, this.sparkSession, this.props, this.hoodieSparkContext, fileSystem, this.hiveConf, (Function<SparkRDDWriteClient, Boolean>) this::onInitializingWriteClient, streamContext);
        }

        /**
         * Starts the continuous ingestion loop on a dedicated single-thread executor.
         *
         * <p>Until {@code isShutdownRequested()} returns {@code true}, each round:
         * <ol>
         *   <li>publishes a heartbeat timestamp to the streamer metrics;</li>
         *   <li>applies hot-updated properties when a hot-update strategy is configured and returns new
         *       properties, re-initialising {@link StreamSync};</li>
         *   <li>runs {@link StreamSync#syncOnce()} and enqueues any newly scheduled compaction instant on the
         *       async compaction service, waiting for its backlog to reduce to {@code cfg.maxPendingCompactions};</li>
         *   <li>does the same for a scheduled clustering instant when async clustering is enabled
         *       ({@code cfg.maxPendingClustering});</li>
         *   <li>either shuts down (when {@link #requestShutdownIfNeeded} says so) or sleeps until the next round.</li>
         * </ol>
         * A {@link HoodieUpsertException} is delegated to {@link #handleUpsertException}. Any other failure
         * is logged and rethrown as a {@link HoodieException}, which ends the loop. The async table services
         * and the executor are torn down exactly once, after the loop ends for any reason.
         *
         * @return the future tracking the ingestion loop, paired with the executor that runs it
         */
        @Override // org.apache.hudi.utilities.ingestion.HoodieIngestionService, org.apache.hudi.async.HoodieAsyncService
        protected Pair<CompletableFuture, ExecutorService> startService() {
            ExecutorService executor = Executors.newFixedThreadPool(1);
            return Pair.of(CompletableFuture.supplyAsync(() -> {
                boolean error = false;
                if (this.cfg.isAsyncCompactionEnabled()) {
                    HoodieStreamer.LOG.info("Setting Spark Pool name for delta-sync to hoodiedeltasync");
                    this.hoodieSparkContext.setProperty(EngineProperty.DELTASYNC_POOL_NAME, "hoodiedeltasync");
                }
                HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(this.props);
                // Teardown must wrap the WHOLE loop. Placing the finally inside the loop would stop the async
                // table services and shutdownNow() this very executor after the first ingestion round.
                try {
                    while (!isShutdownRequested()) {
                        try {
                            long start = System.currentTimeMillis();
                            // Heartbeat metric marking this ingestion job as alive.
                            this.streamSync.getMetrics().updateStreamerHeartbeatTimestamp(start);
                            if (this.configurationHotUpdateStrategyOpt.isPresent()) {
                                Option<TypedProperties> newProps = this.configurationHotUpdateStrategyOpt.get().updateProperties(this.props);
                                if (newProps.isPresent()) {
                                    // Re-create StreamSync only when the properties actually changed.
                                    this.props = newProps.get();
                                    HoodieStreamer.LOG.info("Re-init delta sync with new config properties:");
                                    HoodieStreamer.LOG.info(HoodieStreamer.toSortedTruncatedString(this.props));
                                    reInitDeltaSync();
                                }
                            }
                            // Left: instant of a newly scheduled compaction (if any); right: write statuses of this round.
                            Option<Pair<Option<String>, JavaRDD<WriteStatus>>> syncResult = Option.ofNullable(this.streamSync.syncOnce());
                            if (syncResult.isPresent() && syncResult.get().getLeft().isPresent()) {
                                HoodieStreamer.LOG.info("Enqueuing new pending compaction instant (" + syncResult.get().getLeft() + ")");
                                this.asyncCompactService.get().enqueuePendingAsyncServiceInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, syncResult.get().getLeft().get()));
                                this.asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(this.cfg.maxPendingCompactions.intValue());
                                if (this.asyncCompactService.get().hasError()) {
                                    throw new HoodieException("Async compaction failed.  Shutting down Delta Sync...");
                                }
                            }
                            if (clusteringConfig.isAsyncClusteringEnabled()) {
                                Option<String> clusteringInstantOpt = this.streamSync.getClusteringInstantOpt();
                                if (clusteringInstantOpt.isPresent()) {
                                    HoodieStreamer.LOG.info("Scheduled async clustering for instant: " + clusteringInstantOpt.get());
                                    this.asyncClusteringService.get().enqueuePendingAsyncServiceInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstantOpt.get()));
                                    this.asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(this.cfg.maxPendingClustering.intValue());
                                    if (this.asyncClusteringService.get().hasError()) {
                                        throw new HoodieException("Async clustering failed.  Shutting down Delta Sync...");
                                    }
                                }
                            }
                            Option<HoodieData<WriteStatus>> lastWriteStatuses = Option.ofNullable(
                                syncResult.isPresent() ? HoodieJavaRDD.of(syncResult.get().getRight()) : null);
                            if (requestShutdownIfNeeded(lastWriteStatuses)) {
                                HoodieStreamer.LOG.warn("Closing and shutting down ingestion service");
                                // NOTE(review): the flag is also raised on this termination path; it is only
                                // passed to shutdownAsyncServices, which logs it.
                                error = true;
                                onIngestionCompletes(false);
                                shutdown(true);
                            } else {
                                sleepBeforeNextIngestion(start);
                            }
                        } catch (HoodieUpsertException e) {
                            handleUpsertException(e);
                        } catch (Exception e) {
                            HoodieStreamer.LOG.error("Shutting down delta-sync due to exception", e);
                            error = true;
                            throw new HoodieException(e.getMessage(), e);
                        }
                    }
                } finally {
                    shutdownAsyncServices(error);
                    executor.shutdownNow();
                }
                return true;
            }, executor), executor);
        }

        /**
         * Handles a {@link HoodieUpsertException} raised during an ingestion round.
         *
         * <p>If the failure is caused by a {@link HoodieClusteringUpdateException} (the write conflicts with
         * a pending clustering operation), this logs a warning and sleeps for 60 seconds so that the next
         * round can retry. Any other upsert failure is rethrown unchanged.
         *
         * @param hoodieUpsertException the failure raised by the write
         * @throws HoodieUpsertException if the cause is not a clustering conflict
         * @throws HoodieException if interrupted while waiting; the thread's interrupt status is restored
         *         first so that callers and executors can still observe the interruption
         */
        private void handleUpsertException(HoodieUpsertException hoodieUpsertException) {
            if (!(hoodieUpsertException.getCause() instanceof HoodieClusteringUpdateException)) {
                throw hoodieUpsertException;
            }
            HoodieStreamer.LOG.warn("Write rejected due to conflicts with pending clustering operation. Going to retry after 1 min with the hope that clustering will complete by then.", hoodieUpsertException);
            try {
                Thread.sleep(60_000L);
            } catch (InterruptedException e) {
                // Restore the interrupt status swallowed by catching InterruptedException.
                Thread.currentThread().interrupt();
                throw new HoodieException("Deltastreamer interrupted while waiting for next round ", e);
            }
        }

        private void shutdownAsyncServices(boolean z) {
            HoodieStreamer.LOG.info("Delta Sync shutdown. Error ?" + z);
            if (this.asyncCompactService.isPresent()) {
                HoodieStreamer.LOG.warn("Gracefully shutting down compactor");
                this.asyncCompactService.get().shutdown(false);
            }
            if (this.asyncClusteringService.isPresent()) {
                HoodieStreamer.LOG.warn("Gracefully shutting down clustering service");
                this.asyncClusteringService.get().shutdown(false);
            }
        }

        /**
         * Runs a single ingestion round and then closes this service.
         *
         * <p>{@link #close()} runs exactly once, whether the round succeeds or fails. The decompiled
         * {@code catch (Throwable)} form invoked it a second time when {@code close()} itself threw.
         *
         * @throws HoodieIngestionException wrapping an {@link IOException} raised by the sync round
         */
        @Override // org.apache.hudi.utilities.ingestion.HoodieIngestionService
        public void ingestOnce() {
            try {
                this.streamSync.syncOnce();
            } catch (IOException e) {
                throw new HoodieIngestionException(String.format("Ingestion via %s failed with exception.", getClass()), e);
            } finally {
                close();
            }
        }

        /**
         * Asks the configured post-write termination strategy whether ingestion should stop.
         *
         * @param option write statuses of the last round, if any; unwrapped to a {@code JavaRDD} for the strategy
         * @return {@code false} when no termination strategy is configured, otherwise the strategy's verdict
         */
        @Override // org.apache.hudi.utilities.ingestion.HoodieIngestionService
        protected boolean requestShutdownIfNeeded(Option<HoodieData<WriteStatus>> option) {
            if (!this.postWriteTerminationStrategy.isPresent()) {
                return false;
            }
            return this.postWriteTerminationStrategy.get().shouldShutdown(
                Option.ofNullable(option.isPresent() ? HoodieJavaRDD.getJavaRDD(option.get()) : null));
        }

        /**
         * Callback handed to {@link StreamSync} (see {@code reInitDeltaSync}). It is presumably invoked
         * whenever StreamSync creates a write client — TODO confirm.
         *
         * <p>For each enabled async table service (compaction via {@code cfg.isAsyncCompactionEnabled()},
         * clustering via {@link HoodieClusteringConfig}):
         * <ul>
         *   <li>If the service already exists, it is switched to the new write client.</li>
         *   <li>Otherwise the service is created and seeded with every pending instant found on the target
         *       table's timeline. It is then started, and this call waits
         *       ({@code waitTillPendingAsyncServiceInstantsReducesTo}) for the backlog to reduce to
         *       {@code cfg.maxPendingCompactions} / {@code cfg.maxPendingClustering}.</li>
         * </ul>
         *
         * @param sparkRDDWriteClient the newly created write client
         * @return always {@code true}
         * @throws HoodieException if a service reports an error after draining its backlog, or if the wait
         *         is interrupted (the thread's interrupt status is restored in that case)
         */
        protected Boolean onInitializingWriteClient(SparkRDDWriteClient sparkRDDWriteClient) {
            if (this.cfg.isAsyncCompactionEnabled()) {
                if (this.asyncCompactService.isPresent()) {
                    this.asyncCompactService.get().updateWriteClient(sparkRDDWriteClient);
                } else {
                    this.asyncCompactService = Option.ofNullable(new SparkAsyncCompactService(this.hoodieSparkContext, sparkRDDWriteClient));
                    // Seed the new service with compactions already pending on the table.
                    CompactionUtils.getPendingCompactionInstantTimes(HoodieTableMetaClient.builder().setConf(HadoopFSUtils.getStorageConfWithCopy(this.hoodieSparkContext.hadoopConfiguration())).setBasePath(this.cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build()).forEach(hoodieInstant -> {
                        this.asyncCompactService.get().enqueuePendingAsyncServiceInstant(hoodieInstant);
                    });
                    this.asyncCompactService.get().start(bool -> {
                        return true;
                    });
                    try {
                        this.asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(this.cfg.maxPendingCompactions.intValue());
                        if (this.asyncCompactService.get().hasError()) {
                            throw new HoodieException("Async compaction failed during write client initialization.");
                        }
                    } catch (InterruptedException e) {
                        // Restore the interrupt status before converting to an unchecked exception.
                        Thread.currentThread().interrupt();
                        throw new HoodieException(e);
                    }
                }
            }
            if (HoodieClusteringConfig.from(this.props).isAsyncClusteringEnabled()) {
                if (this.asyncClusteringService.isPresent()) {
                    this.asyncClusteringService.get().updateWriteClient(sparkRDDWriteClient);
                } else {
                    this.asyncClusteringService = Option.ofNullable(new SparkAsyncClusteringService(this.hoodieSparkContext, sparkRDDWriteClient));
                    // Seed the new service with clustering operations already pending on the table.
                    List<HoodieInstant> pendingClusteringInstantTimes = ClusteringUtils.getPendingClusteringInstantTimes(HoodieTableMetaClient.builder().setConf(HadoopFSUtils.getStorageConfWithCopy(this.hoodieSparkContext.hadoopConfiguration())).setBasePath(this.cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build());
                    HoodieStreamer.LOG.info(String.format("Found %d pending clustering instants ", Integer.valueOf(pendingClusteringInstantTimes.size())));
                    pendingClusteringInstantTimes.forEach(hoodieInstant2 -> {
                        this.asyncClusteringService.get().enqueuePendingAsyncServiceInstant(hoodieInstant2);
                    });
                    this.asyncClusteringService.get().start(bool2 -> {
                        return true;
                    });
                    try {
                        this.asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(this.cfg.maxPendingClustering.intValue());
                        if (this.asyncClusteringService.get().hasError()) {
                            throw new HoodieException("Async clustering failed during write client initialization.");
                        }
                    } catch (InterruptedException e2) {
                        // Restore the interrupt status before converting to an unchecked exception.
                        Thread.currentThread().interrupt();
                        throw new HoodieException(e2);
                    }
                }
            }
            return true;
        }

        /**
         * Logs that ingestion finished and then calls {@link #close()}.
         *
         * @param hasError whether ingestion ended with an error (used only in the log line)
         * @return always {@code true}
         */
        @Override // org.apache.hudi.utilities.ingestion.HoodieIngestionService
        protected boolean onIngestionCompletes(boolean hasError) {
            final String summary = "Ingestion completed. Has error: " + hasError;
            HoodieStreamer.LOG.info(summary);
            close();
            return true;
        }

        /**
         * Exposes the metrics of the current {@link StreamSync}.
         *
         * @return the metrics wrapped in an {@code Option}, empty if StreamSync returned {@code null}
         */
        @Override // org.apache.hudi.utilities.ingestion.HoodieIngestionService
        public Option<HoodieIngestionMetrics> getMetrics() {
            final HoodieIngestionMetrics metrics = this.streamSync.getMetrics();
            return Option.ofNullable(metrics);
        }

        /** Closes the current {@link StreamSync}; a no-op when none has been created. */
        @Override // org.apache.hudi.utilities.ingestion.HoodieIngestionService
        public void close() {
            final StreamSync current = this.streamSync;
            if (current == null) {
                return;
            }
            current.close();
        }

        /** @return the schema provider held by this service */
        public SchemaProvider getSchemaProvider() {
            return schemaProvider;
        }

        /** @return the Spark session held by this service */
        public SparkSession getSparkSession() {
            return sparkSession;
        }

        /** @return the current properties; these may be replaced by a configuration hot update */
        public TypedProperties getProps() {
            return props;
        }

        /** @return the Hudi Spark engine context (exposed for tests) */
        @VisibleForTesting
        public HoodieSparkEngineContext getHoodieSparkContext() {
            return hoodieSparkContext;
        }

        /** @return the current {@link StreamSync}; replaced on re-initialisation (exposed for tests) */
        @VisibleForTesting
        public StreamSync getStreamSync() {
            return streamSync;
        }
    }

    /**
     * Creates a streamer with no property override. The file system is resolved for
     * {@code config.targetBasePath} from the Spark context's Hadoop configuration.
     */
    public HoodieStreamer(Config config, JavaSparkContext jssc) throws IOException {
        this(config, jssc, Option.empty());
    }

    /**
     * Creates a streamer with an optional property override. The file system is resolved for
     * {@code config.targetBasePath} from the Spark context's Hadoop configuration.
     */
    public HoodieStreamer(Config config, JavaSparkContext jssc, Option<TypedProperties> propsOverride) throws IOException {
        this(config, jssc, HadoopFSUtils.getFs(config.targetBasePath, jssc.hadoopConfiguration()), jssc.hadoopConfiguration(), propsOverride);
    }

    /** Creates a streamer on an explicit file system and configuration, with no overrides. */
    public HoodieStreamer(Config config, JavaSparkContext jssc, FileSystem fs, Configuration conf) throws IOException {
        this(config, jssc, fs, conf, Option.empty(), Option.empty());
    }

    /** Creates a streamer on an explicit file system and configuration, without a source profile supplier. */
    public HoodieStreamer(Config config, JavaSparkContext jssc, FileSystem fs, Configuration conf, Option<TypedProperties> propsOverride) throws IOException {
        this(config, jssc, fs, conf, propsOverride, Option.empty());
    }

    public HoodieStreamer(Config config, JavaSparkContext javaSparkContext, FileSystem fileSystem, Configuration configuration, Option<TypedProperties> option, Option<SourceProfileSupplier> option2) throws IOException {
        this.properties = combineProperties(config, option, javaSparkContext.hadoopConfiguration());
        if (config.initialCheckpointProvider != null && config.checkpoint == null) {
            InitialCheckPointProvider createInitialCheckpointProvider = UtilHelpers.createInitialCheckpointProvider(config.initialCheckpointProvider, this.properties);
            createInitialCheckpointProvider.init(configuration);
            config.checkpoint = createInitialCheckpointProvider.getCheckpoint();
        }
        this.cfg = config;
        this.bootstrapExecutor = Option.ofNullable(config.runBootstrap.booleanValue() ? new BootstrapExecutor(config, javaSparkContext, fileSystem, configuration, this.properties) : null);
        this.ingestionService = Option.ofNullable(config.runBootstrap.booleanValue() ? null : new StreamSyncService(config, new HoodieSparkEngineContext(javaSparkContext), fileSystem, configuration, Option.ofNullable(this.properties), option2));
    }

    /**
     * Resolves the properties the streamer runs with.
     *
     * <p>Precedence for the base set:
     * <ol>
     *   <li>the explicit override, if present;</li>
     *   <li>otherwise, when {@code propsFilePath} is the default path, the CLI {@code configs} alone;</li>
     *   <li>otherwise, the props file read through {@code configuration}, merged with the CLI {@code configs}.</li>
     * </ol>
     * After that, {@code RECONCILE_SCHEMA} receives its default value, {@code AUTO_ADJUST_LOCK_CONFIGS} is
     * forced to {@code "true"}, and for MERGE_ON_READ tables the table type is set explicitly.
     */
    private static TypedProperties combineProperties(Config config, Option<TypedProperties> option, Configuration configuration) {
        final TypedProperties baseProps;
        if (option.isPresent()) {
            baseProps = option.get();
        } else if (config.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) {
            baseProps = UtilHelpers.getConfig(config.configs).getProps();
        } else {
            baseProps = UtilHelpers.readConfig(configuration, new Path(config.propsFilePath), config.configs).getProps();
        }
        final HoodieConfig merged = new HoodieConfig();
        merged.setAll(baseProps);
        merged.setDefaultValue(DataSourceWriteOptions.RECONCILE_SCHEMA());
        merged.setValue(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "true");
        final boolean mergeOnRead = config.tableType.equals(HoodieTableType.MERGE_ON_READ.name());
        if (mergeOnRead) {
            merged.setValue(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        }
        return merged.getProps(true);
    }

    /**
     * Requests a non-forced shutdown ({@code shutdown(false)}) of the ingestion service, if one exists,
     * and then closes it. Does nothing in bootstrap mode, where there is no ingestion service.
     */
    public void shutdownGracefully() {
        if (!this.ingestionService.isPresent()) {
            return;
        }
        final HoodieIngestionService service = this.ingestionService.get();
        LOG.info("Shutting down DeltaStreamer");
        service.shutdown(false);
        LOG.info("Async service shutdown complete. Closing DeltaSync ");
        service.close();
    }

    /**
     * Runs the bootstrap executor when one was configured. Otherwise starts the ingestion service.
     *
     * @throws Exception propagated from the bootstrap execution
     */
    public void sync() throws Exception {
        if (this.bootstrapExecutor.isPresent()) {
            final BootstrapExecutor executor = this.bootstrapExecutor.get();
            LOG.info("Performing bootstrap. Source=" + executor.getBootstrapConfig().getBootstrapSourceBasePath());
            executor.execute();
            return;
        }
        this.ingestionService.ifPresent(HoodieIngestionService::startIngestion);
    }

    /** @return the {@link Config} assigned in the constructor (possibly with a resolved initial checkpoint) */
    public Config getConfig() {
        final Config current = this.cfg;
        return current;
    }

    /**
     * Renders properties as a key-sorted, one-entry-per-line listing for logging.
     *
     * <p>Unless DEBUG logging is enabled, any value longer than 255 characters is cut to its first 128
     * characters followed by {@code "[...]"}. If a key contains any fragment listed under
     * {@code HoodieWriteConfig.SENSITIVE_CONFIG_KEYS_FILTER} (comma-separated, defaulting to
     * {@code DEFAULT_SENSITIVE_CONFIG_KEYS}), its value is replaced by {@code SENSITIVE_VALUES_MASKED}.
     *
     * @param typedProperties the properties to render
     * @return a multi-line string headed by {@code "Creating Hudi Streamer with configs:"}
     */
    public static String toSortedTruncatedString(TypedProperties typedProperties) {
        List<String> sensitiveKeyFragments = typedProperties.getStringList(HoodieWriteConfig.SENSITIVE_CONFIG_KEYS_FILTER.key(), ",", DEFAULT_SENSITIVE_CONFIG_KEYS);
        List<String> sortedKeys = new ArrayList<>(typedProperties.size());
        for (Object key : typedProperties.keySet()) {
            sortedKeys.add(key.toString());
        }
        Collections.sort(sortedKeys);
        StringBuilder sb = new StringBuilder("Creating Hudi Streamer with configs:\n");
        for (String key : sortedKeys) {
            String value = Option.ofNullable(typedProperties.get(key)).orElse("").toString();
            // Truncate very long values unless DEBUG logging asks for full detail.
            if (value.length() > 255 && !LOG.isDebugEnabled()) {
                value = value.substring(0, 128) + "[...]";
            }
            // Mask credentials and other sensitive values: the key matches if it contains any configured fragment.
            if (sensitiveKeyFragments.stream().anyMatch(key::contains)) {
                value = SENSITIVE_VALUES_MASKED;
            }
            sb.append(key).append(": ").append(value).append("\n");
        }
        return sb.toString();
    }

    /**
     * Parses command-line arguments into a {@link Config}.
     *
     * <p>Prints usage and terminates the JVM with exit code 1 when {@code help} is set or no
     * arguments were supplied.
     *
     * @param strArr raw command-line arguments
     * @return the parsed configuration
     */
    public static final Config getConfig(String[] strArr) {
        final Config parsed = new Config();
        final JCommander commander = new JCommander(parsed, null, strArr);
        final boolean helpRequested = parsed.help.booleanValue();
        if (!helpRequested && strArr.length != 0) {
            return parsed;
        }
        commander.usage();
        System.exit(1);
        return parsed;
    }

    /**
     * Command-line entry point.
     *
     * <p>Steps:
     * <ol>
     *   <li>Parses the arguments.</li>
     *   <li>Builds a Spark context named {@code "streamer-" + targetTableName}, using
     *       {@code config.sparkMaster} when it is non-empty.</li>
     *   <li>Runs {@link #sync()}.</li>
     *   <li>Stops the Spark context exactly once, whether or not sync succeeds. The decompiled
     *       {@code catch (Throwable)} form stopped it twice when {@code stop()} itself threw.</li>
     * </ol>
     *
     * @param strArr command-line arguments
     * @throws Exception propagated from {@link #sync()}
     */
    public static void main(String[] strArr) throws Exception {
        Config config = getConfig(strArr);
        Map<String, String> sparkSchedulingConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(config);
        String appName = "streamer-" + config.targetTableName;
        JavaSparkContext jssc = StringUtils.isNullOrEmpty(config.sparkMaster)
            ? UtilHelpers.buildSparkContext(appName, sparkSchedulingConfigs)
            : UtilHelpers.buildSparkContext(appName, config.sparkMaster, sparkSchedulingConfigs);
        if (config.enableHiveSync.booleanValue()) {
            LOG.warn("--enable-hive-sync will be deprecated in a future release; please use --enable-sync instead for Hive syncing");
        }
        try {
            new HoodieStreamer(config, jssc).sync();
        } finally {
            jssc.stop();
        }
    }

    /**
     * Returns the ingestion service (exposed for tests).
     *
     * <p>NOTE(review): the service is absent when the streamer was built for bootstrap (see the primary
     * constructor). {@code Option.get()} presumably throws in that case — confirm.
     */
    @VisibleForTesting
    public HoodieIngestionService getIngestionService() {
        final HoodieIngestionService service = this.ingestionService.get();
        return service;
    }
}
