package org.apache.hudi.sink.compact;

import com.beust.jcommander.JCommander;
import java.util.ResourceBundle;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/compact/HoodieFlinkCompactor.class */
public class HoodieFlinkCompactor {
    protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);
    private final AsyncCompactionService compactionScheduleService;

    /* loaded from: input_file:org/apache/hudi/sink/compact/HoodieFlinkCompactor$AsyncCompactionService.class */
    public static class AsyncCompactionService extends HoodieAsyncTableService {
        private static final long serialVersionUID = 1;
        private final FlinkCompactionConfig cfg;
        private final Configuration conf;
        private final HoodieTableMetaClient metaClient;
        private final HoodieFlinkWriteClient<?> writeClient;
        private final HoodieFlinkTable<?> table;
        private final StreamExecutionEnvironment env;
        private final ExecutorService executor = Executors.newFixedThreadPool(1);

        public AsyncCompactionService(FlinkCompactionConfig flinkCompactionConfig, Configuration configuration, StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
            this.cfg = flinkCompactionConfig;
            this.conf = configuration;
            this.env = streamExecutionEnvironment;
            this.metaClient = StreamerUtil.createMetaClient(configuration);
            configuration.setString(FlinkOptions.TABLE_NAME, this.metaClient.getTableConfig().getTableName());
            CompactionUtil.setAvroSchema(configuration, this.metaClient);
            CompactionUtil.inferChangelogMode(configuration, this.metaClient);
            this.writeClient = StreamerUtil.createWriteClient(configuration);
            this.writeConfig = this.writeClient.getConfig();
            this.table = this.writeClient.getHoodieTable();
        }

        protected Pair<CompletableFuture, ExecutorService> startService() {
            return Pair.of(CompletableFuture.supplyAsync(() -> {
                boolean z = false;
                while (!isShutdownRequested()) {
                    try {
                        try {
                            compact();
                            Thread.sleep(this.cfg.minCompactionIntervalSeconds.intValue() * 1000);
                        } catch (Exception e) {
                            LOG.error("Shutting down compaction service due to exception", e);
                            z = true;
                            throw new HoodieException(e.getMessage(), e);
                        }
                    } catch (Throwable th) {
                        shutdownAsyncService(z);
                        throw th;
                    }
                }
                shutdownAsyncService(false);
                return true;
            }, this.executor), this.executor);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void compact() throws Exception {
            this.table.getMetaClient().reloadActiveTimeline();
            if (this.cfg.schedule.booleanValue()) {
                Option<String> compactionInstantTime = CompactionUtil.getCompactionInstantTime(this.metaClient);
                if (compactionInstantTime.isPresent()) {
                    if (!this.writeClient.scheduleCompactionAtInstant((String) compactionInstantTime.get(), Option.empty())) {
                        LOG.info("No compaction plan for this job ");
                        return;
                    }
                    this.table.getMetaClient().reloadActiveTimeline();
                }
            }
            HoodieTimeline filterPendingCompactionTimeline = this.table.getActiveTimeline().filterPendingCompactionTimeline();
            Option lastInstant = CompactionUtil.isLIFO(this.cfg.compactionSeq) ? filterPendingCompactionTimeline.lastInstant() : filterPendingCompactionTimeline.firstInstant();
            if (!lastInstant.isPresent()) {
                LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
                return;
            }
            String timestamp = ((HoodieInstant) lastInstant.get()).getTimestamp();
            HoodieInstant compactionInflightInstant = HoodieTimeline.getCompactionInflightInstant(timestamp);
            if (filterPendingCompactionTimeline.containsInstant(compactionInflightInstant)) {
                LOG.info("Rollback inflight compaction instant: [" + timestamp + "]");
                this.table.rollbackInflightCompaction(compactionInflightInstant);
                this.table.getMetaClient().reloadActiveTimeline();
            }
            HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(this.table.getMetaClient(), timestamp);
            if (compactionPlan == null || compactionPlan.getOperations() == null || compactionPlan.getOperations().isEmpty()) {
                LOG.info("No compaction plan for instant " + timestamp);
                return;
            }
            HoodieInstant compactionRequestedInstant = HoodieTimeline.getCompactionRequestedInstant(timestamp);
            if (!this.table.getActiveTimeline().filterPendingCompactionTimeline().containsInstant(compactionRequestedInstant)) {
                LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\nClean the compaction plan in auxiliary path and cancels the compaction");
                CompactionUtil.cleanInstant(this.table.getMetaClient(), compactionRequestedInstant);
                return;
            }
            int size = this.conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1 ? compactionPlan.getOperations().size() : this.conf.getInteger(FlinkOptions.COMPACTION_TASKS);
            LOG.info("Start to compaction for instant " + timestamp);
            this.table.getActiveTimeline().transitionCompactionRequestedToInflight(compactionRequestedInstant);
            this.table.getMetaClient().reloadActiveTimeline();
            this.env.addSource(new CompactionPlanSourceFunction(compactionPlan, timestamp)).name("compaction_source").uid("uid_compaction_source").rebalance().transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new ProcessOperator(new CompactFunction(this.conf))).setParallelism(size).addSink(new CompactionCommitSink(this.conf)).name("clean_commits").uid("uid_clean_commits").setParallelism(1);
            this.env.execute("flink_hudi_compaction_" + timestamp);
        }

        public void shutdownAsyncService(boolean z) {
            LOG.info("Gracefully shutting down compactor. Error ?" + z);
            this.executor.shutdown();
            this.writeClient.close();
        }

        @VisibleForTesting
        public void shutDown() {
            shutdownAsyncService(false);
        }
    }

    public HoodieFlinkCompactor(AsyncCompactionService asyncCompactionService) {
        this.compactionScheduleService = asyncCompactionService;
    }

    public static void main(String[] strArr) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkCompactionConfig flinkCompactionConfig = getFlinkCompactionConfig(strArr);
        new HoodieFlinkCompactor(new AsyncCompactionService(flinkCompactionConfig, FlinkCompactionConfig.toFlinkConfig(flinkCompactionConfig), executionEnvironment)).start(flinkCompactionConfig.serviceMode.booleanValue());
    }

    /* JADX WARN: Finally extract failed */
    public void start(boolean z) throws Exception {
        if (z) {
            this.compactionScheduleService.start(null);
            try {
                try {
                    this.compactionScheduleService.waitForShutdown();
                    LOG.info("Shut down hoodie flink compactor");
                    return;
                } catch (Exception e) {
                    throw new HoodieException(e.getMessage(), e);
                }
            } catch (Throwable th) {
                throw th;
            }
        }
        LOG.info("Hoodie Flink Compactor running only single round");
        try {
            try {
                this.compactionScheduleService.compact();
                LOG.info("Shut down hoodie flink compactor");
            } catch (Exception e2) {
                LOG.error("Got error running delta sync once. Shutting down", e2);
                throw e2;
            }
        } finally {
            LOG.info("Shut down hoodie flink compactor");
        }
    }

    public static FlinkCompactionConfig getFlinkCompactionConfig(String[] strArr) {
        FlinkCompactionConfig flinkCompactionConfig = new FlinkCompactionConfig();
        JCommander jCommander = new JCommander(flinkCompactionConfig, (ResourceBundle) null, strArr);
        if (flinkCompactionConfig.help.booleanValue() || strArr.length == 0) {
            jCommander.usage();
            System.exit(1);
        }
        return flinkCompactionConfig;
    }
}
