package org.apache.hudi.sink.clustering;

import com.beust.jcommander.JCommander;
import java.util.List;
import java.util.ResourceBundle;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.client.deployment.application.ApplicationExecutionException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.class */
public class HoodieFlinkClusteringJob {
    /** Shared SLF4J logger; also used by the nested {@link AsyncClusteringService}. */
    protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkClusteringJob.class);
    /**
     * Substring searched for in the message of an {@code ApplicationExecutionException}; when present,
     * the failure is logged as "clustering is not performed" instead of being rethrown.
     */
    private static final String NO_EXECUTE_KEYWORD = "no execute";
    /** Service that performs the clustering rounds; driven by {@link #start(boolean)}. */
    private final AsyncClusteringService clusteringScheduleService;

    /* loaded from: input_file:org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob$AsyncClusteringService.class */
    public static class AsyncClusteringService extends HoodieAsyncTableService {
        /** Serialization version identifier. */
        private static final long serialVersionUID = 1;
        /** Clustering job options as passed to the constructor. */
        private final FlinkClusteringConfig cfg;
        /** Flink configuration; the constructor fills in table name, table type and key/partition fields. */
        private final Configuration conf;
        /** Meta client created from {@link #conf} in the constructor. */
        private final HoodieTableMetaClient metaClient;
        /** Write client used to schedule clustering; closed by {@code shutdownAsyncService}. */
        private final HoodieFlinkWriteClient<?> writeClient;
        /** Table handle obtained from {@link #writeClient}. */
        private final HoodieFlinkTable<?> table;
        /** Fixed pool with a single thread on which the loop built by {@code startService} runs. */
        private final ExecutorService executor = Executors.newFixedThreadPool(1);

        public AsyncClusteringService(FlinkClusteringConfig flinkClusteringConfig, Configuration configuration) throws Exception {
            this.cfg = flinkClusteringConfig;
            this.conf = configuration;
            this.metaClient = StreamerUtil.createMetaClient(configuration);
            configuration.setString(FlinkOptions.TABLE_NAME, this.metaClient.getTableConfig().getTableName());
            configuration.setString(FlinkOptions.TABLE_TYPE, this.metaClient.getTableConfig().getTableType().name());
            configuration.setString(FlinkOptions.RECORD_KEY_FIELD, this.metaClient.getTableConfig().getRecordKeyFieldProp());
            configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, this.metaClient.getTableConfig().getPartitionFieldProp());
            CompactionUtil.setAvroSchema(configuration, this.metaClient);
            CompactionUtil.inferMetadataConf(configuration, this.metaClient);
            this.writeClient = FlinkWriteClients.createWriteClientV2(configuration);
            this.writeConfig = this.writeClient.getConfig();
            this.table = this.writeClient.getHoodieTable();
        }

        /**
         * Starts the clustering loop on the single-threaded {@link #executor}.
         *
         * <p>The loop calls {@link #cluster()} and then sleeps for
         * {@code cfg.minClusteringIntervalSeconds} seconds, until {@code isShutdownRequested()}
         * returns {@code true} or a round fails. An {@code ApplicationExecutionException} whose
         * message contains {@code NO_EXECUTE_KEYWORD} is only logged and the loop continues; any
         * other exception is wrapped in a {@link HoodieException} and ends the loop.
         *
         * <p>{@link #shutdownAsyncService(boolean)} is invoked exactly once, after the loop has
         * ended, with a flag telling whether it ended because of an error.
         *
         * @return the future of the loop paired with the executor it runs on
         */
        protected Pair<CompletableFuture, ExecutorService> startService() {
            return Pair.of(CompletableFuture.supplyAsync(() -> {
                boolean error = false;
                try {
                    while (!isShutdownRequested()) {
                        try {
                            cluster();
                            // Long arithmetic so a large interval cannot overflow int.
                            Thread.sleep(this.cfg.minClusteringIntervalSeconds.intValue() * 1000L);
                        } catch (ApplicationExecutionException e) {
                            // Must be caught before Exception, otherwise this branch is unreachable.
                            String message = e.getMessage();
                            if (message == null || !message.contains(HoodieFlinkClusteringJob.NO_EXECUTE_KEYWORD)) {
                                error = true;
                                throw new HoodieException(message, e);
                            }
                            LOG.info("Clustering is not performed.");
                        } catch (Exception e) {
                            LOG.error("Shutting down clustering service due to exception", e);
                            error = true;
                            throw new HoodieException(e.getMessage(), e);
                        }
                    }
                } finally {
                    // Release resources only once the loop is over, not after every round.
                    shutdownAsyncService(error);
                }
                return true;
            }, this.executor), this.executor);
        }

        /**
         * Runs one clustering round.
         *
         * <ol>
         *   <li>Reloads the active timeline and, when {@code cfg.schedule} is set, validates and
         *       schedules a clustering plan (at {@code cfg.clusteringInstantTime} if given, else at
         *       a newly created instant time). Returns if nothing was scheduled.</li>
         *   <li>Picks a pending clustering instant, see
         *       {@link #selectClusteringInstant(List)}. Returns if none is pending.</li>
         *   <li>Rolls back the instant first if its inflight replace-commit is already on the
         *       timeline.</li>
         *   <li>Loads the clustering plan; returns if it is absent or has no input groups.</li>
         *   <li>Transitions the instant from requested to inflight, then builds and executes a Flink
         *       pipeline: plan source, rebalance, clustering operator, commit sink.</li>
         * </ol>
         *
         * @throws Exception if scheduling, timeline handling or the Flink job execution fails
         */
        public void cluster() throws Exception {
            this.table.getMetaClient().reloadActiveTimeline();

            if (this.cfg.schedule.booleanValue()) {
                ClusteringUtil.validateClusteringScheduling(this.conf);
                String newInstantTime = this.cfg.clusteringInstantTime != null
                        ? this.cfg.clusteringInstantTime
                        : HoodieActiveTimeline.createNewInstantTime();
                LOG.info("Creating a clustering plan for instant [{}]", newInstantTime);
                if (!this.writeClient.scheduleClusteringAtInstant(newInstantTime, Option.empty())) {
                    LOG.info("No clustering plan for this job");
                    return;
                }
                this.table.getMetaClient().reloadActiveTimeline();
            }

            List<HoodieInstant> pendingInstants = ClusteringUtils.getPendingClusteringInstantTimes(this.table.getMetaClient());
            if (pendingInstants.isEmpty()) {
                LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
                return;
            }

            HoodieInstant clusteringInstant = selectClusteringInstant(pendingInstants);
            String instantTime = clusteringInstant.getTimestamp();

            // A leftover inflight replace-commit for this instant is rolled back before retrying.
            HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(instantTime);
            if (this.table.getMetaClient().getActiveTimeline().containsInstant(inflightInstant)) {
                LOG.info("Rollback inflight clustering instant: [{}]", clusteringInstant);
                this.table.rollbackInflightClustering(inflightInstant, commitToRollback ->
                        this.writeClient.getTableServiceClient().getPendingRollbackInfo(this.table.getMetaClient(), commitToRollback, false));
                this.table.getMetaClient().reloadActiveTimeline();
            }

            Option<Pair<HoodieInstant, HoodieClusteringPlan>> planOption =
                    ClusteringUtils.getClusteringPlan(this.table.getMetaClient(), clusteringInstant);
            if (!planOption.isPresent()) {
                LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option");
                return;
            }
            HoodieClusteringPlan plan = planOption.get().getRight();
            if (plan == null || plan.getInputGroups() == null || plan.getInputGroups().isEmpty()) {
                LOG.info("No clustering plan for instant {}", instantTime);
                return;
            }

            HoodieInstant requestedInstant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
            int inputGroupCount = plan.getInputGroups().size();
            // -1 means "not configured": use one task per input group; otherwise cap at the group count.
            int configuredTasks = this.conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
            int parallelism = configuredTasks == -1 ? inputGroupCount : Math.min(configuredTasks, inputGroupCount);

            this.table.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant, Option.empty());

            RowType rowType = (RowType) AvroSchemaConverter
                    .convertToDataType(StreamerUtil.getTableAvroSchema(this.table.getMetaClient(), false))
                    .getLogicalType();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            this.conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, env.getCheckpointConfig().getCheckpointTimeout());

            SingleOutputStreamOperator<ClusteringCommitEvent> clusteringStream = env
                    .addSource(new ClusteringPlanSourceFunction(instantTime, plan, this.conf))
                    .name("clustering_source")
                    .uid("uid_clustering_source")
                    .rebalance()
                    .transform("clustering_task", TypeInformation.of(ClusteringCommitEvent.class), new ClusteringOperator(this.conf, rowType))
                    .setParallelism(parallelism);

            if (OptionsResolver.sortClusteringEnabled(this.conf)) {
                // MB -> bytes in long arithmetic; int would overflow from 2048 MB upwards.
                ExecNodeUtil.setManagedMemoryWeight(
                        clusteringStream.getTransformation(),
                        this.conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
            }

            clusteringStream.addSink(new ClusteringCommitSink(this.conf))
                    .name("clustering_commit")
                    .uid("uid_clustering_commit")
                    .setParallelism(1);

            env.execute("flink_hudi_clustering_" + instantTime);
        }

        /**
         * Chooses the pending clustering instant to execute: the one matching
         * {@code cfg.clusteringInstantTime} when that is set, otherwise the last or first element of
         * the list depending on {@code CompactionUtil.isLIFO(cfg.clusteringSeq)}.
         *
         * @param pendingInstants non-empty list of pending clustering instants
         * @return the instant to cluster
         * @throws HoodieException if an explicit instant time is configured but not pending
         */
        private HoodieInstant selectClusteringInstant(List<HoodieInstant> pendingInstants) {
            String requestedTime = this.cfg.clusteringInstantTime;
            if (requestedTime != null) {
                return pendingInstants.stream()
                        .filter(instant -> instant.getTimestamp().equals(requestedTime))
                        .findFirst()
                        .orElseThrow(() -> new HoodieException("Clustering instant [" + requestedTime + "] not found"));
            }
            return CompactionUtil.isLIFO(this.cfg.clusteringSeq)
                    ? pendingInstants.get(pendingInstants.size() - 1)
                    : pendingInstants.get(0);
        }

        /**
         * Logs the shutdown, asks the executor to shut down (without waiting for running work) and
         * closes the write client.
         *
         * @param z whether the shutdown is caused by an error; only used in the log line
         */
        public void shutdownAsyncService(boolean z) {
            final String notice = "Gracefully shutting down clustering job. Error ?" + z;
            LOG.info(notice);
            executor.shutdown();
            writeClient.close();
        }

        /** Test hook: shuts the service down as a regular (non-error) shutdown. */
        @VisibleForTesting
        public void shutDown() {
            final boolean causedByError = false;
            this.shutdownAsyncService(causedByError);
        }
    }

    /**
     * Creates a job around an already constructed clustering service.
     *
     * @param asyncClusteringService service that {@link #start(boolean)} will drive
     */
    public HoodieFlinkClusteringJob(AsyncClusteringService asyncClusteringService) {
        clusteringScheduleService = asyncClusteringService;
    }

    /**
     * Command-line entry point: parses the arguments, builds the clustering service and starts the
     * job in service mode or single-round mode according to {@code serviceMode}.
     *
     * @param strArr command-line arguments
     * @throws Exception if setup or clustering fails
     */
    public static void main(String[] strArr) throws Exception {
        final FlinkClusteringConfig jobConfig = getFlinkClusteringConfig(strArr);
        final Configuration flinkConf = FlinkClusteringConfig.toFlinkConfig(jobConfig);
        final AsyncClusteringService service = new AsyncClusteringService(jobConfig, flinkConf);
        final HoodieFlinkClusteringJob job = new HoodieFlinkClusteringJob(service);
        job.start(jobConfig.serviceMode.booleanValue());
    }

    /**
     * Runs the clustering job.
     *
     * <p>In service mode the clustering service is started and this method blocks until it has
     * shut down; a failure while waiting is rethrown wrapped in a {@link HoodieException}.
     *
     * <p>Otherwise exactly one clustering round is executed on the calling thread. An
     * {@code ApplicationExecutionException} whose message contains {@code NO_EXECUTE_KEYWORD} is
     * treated as "nothing to do" and only logged; every other exception is logged and rethrown.
     *
     * <p>In both modes "Shut down hoodie flink clustering" is logged on the way out.
     *
     * @param z {@code true} to run as a continuous service, {@code false} for a single round
     * @throws Exception if clustering fails
     */
    public void start(boolean z) throws Exception {
        if (z) {
            this.clusteringScheduleService.start(null);
            try {
                this.clusteringScheduleService.waitForShutdown();
            } catch (Exception e) {
                throw new HoodieException(e.getMessage(), e);
            } finally {
                LOG.info("Shut down hoodie flink clustering");
            }
            return;
        }

        LOG.info("Hoodie Flink Clustering running only single round");
        try {
            this.clusteringScheduleService.cluster();
        } catch (ApplicationExecutionException e) {
            // Must precede the generic Exception handler, otherwise this branch is unreachable.
            String message = e.getMessage();
            if (message == null || !message.contains(NO_EXECUTE_KEYWORD)) {
                LOG.error("Got error trying to perform clustering. Shutting down", e);
                throw e;
            }
            LOG.info("Clustering is not performed");
        } catch (Exception e) {
            LOG.error("Got error running clustering once. Shutting down", e);
            throw e;
        } finally {
            LOG.info("Shut down hoodie flink clustering");
        }
    }

    /**
     * Parses the command-line arguments into a {@code FlinkClusteringConfig} using JCommander.
     *
     * <p>If the help flag is set or no arguments were given, the usage text is printed and the JVM
     * exits with status 1.
     *
     * @param strArr command-line arguments
     * @return the populated configuration
     */
    public static FlinkClusteringConfig getFlinkClusteringConfig(String[] strArr) {
        final FlinkClusteringConfig parsed = new FlinkClusteringConfig();
        final ResourceBundle noBundle = null;
        final JCommander commander = new JCommander(parsed, noBundle, strArr);

        final boolean showUsage = parsed.help.booleanValue() || strArr.length == 0;
        if (!showUsage) {
            return parsed;
        }
        commander.usage();
        System.exit(1);
        return parsed;
    }
}
