package org.apache.hudi.sink.compact;

import java.io.IOException;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Collector;
import org.apache.hudi.adapter.MaskingOutputAdapter;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.utils.MOWUtils;
import org.apache.hudi.common.model.CompactionOperationGroup;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.metrics.FlinkCompactionMetrics;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnWriteTableCompactor;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/compact/CompactOperator.class */
/**
 * Stream operator that runs the compaction work described by incoming
 * {@link CompactionPlanEvent}s and emits one {@link CompactionCommitEvent} per
 * compaction operation of the processed operation group.
 *
 * <p>When {@link OptionsResolver#needsAsyncCompaction(Configuration)} is true for the
 * supplied configuration, the work is submitted to a {@link NonThrownExecutor};
 * otherwise it runs inline inside {@link #processElement(StreamRecord)}.
 */
public class CompactOperator extends TableStreamOperator<CompactionCommitEvent> implements OneInputStreamOperator<CompactionPlanEvent, CompactionCommitEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(CompactOperator.class);

    /** Job configuration used to build the write client and to pick the compactor flavour. */
    private final Configuration conf;

    /** Whether compaction is delegated to {@link #executor} instead of running inline. */
    private final boolean asyncCompaction;

    /** Write client created in {@link #open()} and released in {@link #close()}. */
    private transient HoodieFlinkWriteClient<?> writeClient;

    /** Index of this subtask, captured in {@link #open()} and attached to every emitted event. */
    private int taskID;

    /** Executor for asynchronous compaction; only created in {@link #open()} when async mode is on. */
    private transient NonThrownExecutor executor;

    /** Collector wrapping the operator output. */
    private transient StreamRecordCollector<CompactionCommitEvent> collector;

    /** Metrics notified at the start and end of every compaction run. */
    private transient FlinkCompactionMetrics compactionMetrics;

    /**
     * Creates the operator.
     *
     * @param configuration job configuration; also decides whether compaction runs asynchronously
     */
    public CompactOperator(Configuration configuration) {
        this.conf = configuration;
        this.asyncCompaction = OptionsResolver.needsAsyncCompaction(configuration);
    }

    /**
     * Delegates to the parent setup, wrapping {@code output} in a {@link MaskingOutputAdapter}.
     */
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<CompactionCommitEvent>> output) {
        super.setup(streamTask, streamConfig, new MaskingOutputAdapter<>(output));
    }

    /**
     * Initializes the runtime state: subtask index, write client, the optional async
     * executor, the output collector and the compaction metrics.
     */
    public void open() throws Exception {
        this.taskID = getRuntimeContext().getIndexOfThisSubtask();
        this.writeClient = FlinkWriteClients.createWriteClient(this.conf, getRuntimeContext());
        if (this.asyncCompaction) {
            this.executor = NonThrownExecutor.builder(LOG).build();
        }
        this.collector = new StreamRecordCollector<>(this.output);
        registerMetrics();
    }

    /**
     * Executes the compaction described by the received plan event.
     *
     * <p>In async mode the compaction is submitted to the executor with a freshly reloaded
     * write config; the executor's exception hook emits one {@link CompactionCommitEvent}
     * without write statuses for every operation of the group. In sync mode the compaction
     * runs inline with the write client's current config.
     *
     * @param streamRecord record carrying the {@link CompactionPlanEvent}
     */
    public void processElement(StreamRecord<CompactionPlanEvent> streamRecord) throws Exception {
        final CompactionPlanEvent event = streamRecord.getValue();
        final String instantTime = event.getCompactionInstantTime();

        if (!this.asyncCompaction) {
            LOG.info("Execute compaction for instant {} from task {}", instantTime, this.taskID);
            doCompaction(instantTime, event.getOperationGroup(), this.collector, this.writeClient.getConfig());
            return;
        }

        this.executor.execute(
            () -> doCompaction(instantTime, event.getOperationGroup(), this.collector, reloadWriteConfig()),
            (errMsg, t) -> event.getOperationGroup().getCompactionOperations().forEach(
                operation -> this.collector.collect(new CompactionCommitEvent(instantTime, operation.getFileId(), this.taskID))),
            "Execute compaction for instant %s from task %d", instantTime, this.taskID);
    }

    /**
     * Runs the compaction of one operation group and emits a commit event, carrying the
     * resulting write statuses, for each operation in the group.
     *
     * @param instantTime    compaction instant being executed
     * @param operationGroup operations to compact
     * @param out            sink for the resulting commit events
     * @param writeConfig    base write config for this run
     * @throws IOException if the underlying compaction fails with an I/O error
     */
    // Raw types are kept for the compactor, the table and the write statuses: their generic
    // signatures are declared outside this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void doCompaction(String instantTime, CompactionOperationGroup operationGroup, Collector<CompactionCommitEvent> out, HoodieWriteConfig writeConfig) throws IOException {
        this.compactionMetrics.startCompaction();

        HoodieFlinkMergeOnWriteTableCompactor compactor = OptionsResolver.isMowTable(this.conf) ? new HoodieFlinkMergeOnWriteTableCompactor() : new HoodieFlinkMergeOnReadTableCompactor();
        HoodieTableMetaClient metaClient = this.writeClient.getHoodieTable().getMetaClient();
        String maxInstantTime = compactor.getMaxInstantTime(metaClient);
        HoodieFlinkCopyOnWriteTable table = new HoodieFlinkCopyOnWriteTable(writeConfig, this.writeClient.getEngineContext(), metaClient);

        HoodieWriteConfig compactionConfig = resolveCompactionConfig(table.getMetaClient(), instantTime, writeConfig);

        List writeStatuses = compactor.compact(table, metaClient, compactionConfig, operationGroup, instantTime, maxInstantTime, this.writeClient.getHoodieTable().getTaskContextSupplier());
        this.compactionMetrics.endCompaction();

        MOWUtils.mergeDV(table, writeStatuses, instantTime);
        operationGroup.getCompactionOperations().forEach(
            operation -> out.collect(new CompactionCommitEvent(instantTime, operation.getFileId(), writeStatuses, this.taskID)));
    }

    /**
     * Picks the write config to compact with.
     *
     * <p>Returns {@code baseConfig} itself unless the table has an internal schema in its
     * commit metadata and both schema strings can be resolved for {@code instantTime}; in
     * that case a copy of the config, built from the same properties, is returned with the
     * resolved schema and internal schema string set on it.
     */
    private static HoodieWriteConfig resolveCompactionConfig(HoodieTableMetaClient tableMetaClient, String instantTime, HoodieWriteConfig baseConfig) throws IOException {
        boolean hasInternalSchema = new TableSchemaResolver(tableMetaClient).getTableInternalSchemaFromCommitMetadata().isPresent();
        if (!hasInternalSchema) {
            return baseConfig;
        }

        Pair<Option<String>, Option<String>> schemas = InternalSchemaCache.getInternalSchemaAndAvroSchemaForClusteringAndCompaction(tableMetaClient, instantTime);
        Option<String> internalSchema = schemas.getLeft();
        Option<String> schema = schemas.getRight();
        if (!internalSchema.isPresent() || !schema.isPresent()) {
            return baseConfig;
        }

        // Work on a copy so the caller's config is not modified by the schema overrides.
        HoodieWriteConfig evolvedConfig = HoodieWriteConfig.newBuilder().withProperties(baseConfig.getProps()).build();
        evolvedConfig.setSchema(schema.get());
        evolvedConfig.setInternalSchemaString(internalSchema.get());
        return evolvedConfig;
    }

    /**
     * Returns the write client's config after {@link CompactionUtil#setAvroSchema} has been
     * applied to it with the current table meta client. The returned object is the write
     * client's own config instance, not a copy.
     */
    private HoodieWriteConfig reloadWriteConfig() throws Exception {
        HoodieWriteConfig writeConfig = this.writeClient.getConfig();
        CompactionUtil.setAvroSchema(writeConfig, this.writeClient.getHoodieTable().getMetaClient());
        return writeConfig;
    }

    /** Replaces the async executor; intended for tests only. */
    @VisibleForTesting
    public void setExecutor(NonThrownExecutor nonThrownExecutor) {
        this.executor = nonThrownExecutor;
    }

    /**
     * Releases the executor and the write client.
     *
     * <p>The write client is closed in a {@code finally} block so that it is still released
     * when closing the executor throws. If both close calls throw, the write client's
     * exception is the one that propagates.
     */
    public void close() throws Exception {
        try {
            if (this.executor != null) {
                this.executor.close();
            }
        } finally {
            if (this.writeClient != null) {
                this.writeClient.close();
                this.writeClient = null;
            }
        }
    }

    /** Creates the compaction metrics on this operator's metric group and registers them. */
    private void registerMetrics() {
        this.compactionMetrics = new FlinkCompactionMetrics(getRuntimeContext().getMetricGroup());
        this.compactionMetrics.registerMetrics();
    }
}
