package org.apache.hudi.sink.compact;

import java.io.IOException;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/compact/CompactFunction.class */
public class CompactFunction extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(CompactFunction.class);
    private final Configuration conf;
    private transient HoodieFlinkWriteClient<?> writeClient;
    private final boolean asyncCompaction;
    private int taskID;
    private transient NonThrownExecutor executor;

    public CompactFunction(Configuration configuration) {
        this.conf = configuration;
        this.asyncCompaction = StreamerUtil.needsAsyncCompaction(configuration);
    }

    public void open(Configuration configuration) throws Exception {
        this.taskID = getRuntimeContext().getIndexOfThisSubtask();
        this.writeClient = FlinkWriteClients.createWriteClient(this.conf, getRuntimeContext());
        if (this.asyncCompaction) {
            this.executor = NonThrownExecutor.builder(LOG).build();
        }
    }

    public void processElement(CompactionPlanEvent compactionPlanEvent, ProcessFunction<CompactionPlanEvent, CompactionCommitEvent>.Context context, Collector<CompactionCommitEvent> collector) throws Exception {
        String compactionInstantTime = compactionPlanEvent.getCompactionInstantTime();
        CompactionOperation operation = compactionPlanEvent.getOperation();
        if (this.asyncCompaction) {
            this.executor.execute(() -> {
                doCompaction(compactionInstantTime, operation, collector, reloadWriteConfig());
            }, (str, th) -> {
                collector.collect(new CompactionCommitEvent(compactionInstantTime, operation.getFileId(), this.taskID));
            }, "Execute compaction for instant %s from task %d", compactionInstantTime, Integer.valueOf(this.taskID));
        } else {
            LOG.info("Execute compaction for instant {} from task {}", compactionInstantTime, Integer.valueOf(this.taskID));
            doCompaction(compactionInstantTime, operation, collector, this.writeClient.getConfig());
        }
    }

    private void doCompaction(String str, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector, HoodieWriteConfig hoodieWriteConfig) throws IOException {
        HoodieFlinkCopyOnWriteTable hoodieFlinkCopyOnWriteTable = new HoodieFlinkCopyOnWriteTable(hoodieWriteConfig, this.writeClient.getEngineContext(), this.writeClient.getHoodieTable().getMetaClient());
        HoodieWriteConfig hoodieWriteConfig2 = hoodieWriteConfig;
        boolean isPresent = new TableSchemaResolver(hoodieFlinkCopyOnWriteTable.getMetaClient()).getTableInternalSchemaFromCommitMetadata().isPresent();
        Pair<Option<String>, Option<String>> of = Pair.of(Option.empty(), Option.empty());
        if (isPresent) {
            of = InternalSchemaCache.getInternalSchemaAndAvroSchemaForClusteringAndCompaction(hoodieFlinkCopyOnWriteTable.getMetaClient(), str);
        }
        if (isPresent && of.getLeft().isPresent() && of.getRight().isPresent()) {
            hoodieWriteConfig2 = HoodieWriteConfig.newBuilder().withProperties(hoodieWriteConfig.getProps()).build();
            hoodieWriteConfig2.setSchema(of.getRight().get());
            hoodieWriteConfig2.setInternalSchemaString(of.getLeft().get());
        }
        collector.collect(new CompactionCommitEvent(str, compactionOperation.getFileId(), new HoodieFlinkMergeOnReadTableCompactor().compact(hoodieFlinkCopyOnWriteTable, this.writeClient.getHoodieTable().getMetaClient(), hoodieWriteConfig2, compactionOperation, str, this.writeClient.getHoodieTable().getTaskContextSupplier()), this.taskID));
    }

    private HoodieWriteConfig reloadWriteConfig() throws Exception {
        HoodieWriteConfig config = this.writeClient.getConfig();
        CompactionUtil.setAvroSchema(config, this.writeClient.getHoodieTable().getMetaClient());
        return config;
    }

    @VisibleForTesting
    public void setExecutor(NonThrownExecutor nonThrownExecutor) {
        this.executor = nonThrownExecutor;
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
        processElement((CompactionPlanEvent) obj, (ProcessFunction<CompactionPlanEvent, CompactionCommitEvent>.Context) context, (Collector<CompactionCommitEvent>) collector);
    }
}
