package org.apache.hudi.spark3.internal;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.deletionvector.DeletionVectorWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.spark.sql.MowUpsertExec;
import org.apache.spark.sql.MowUpsertExec$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.sketch.BloomFilter;
import org.apache.spark.util.sketch.IncompatibleMergeException;
import scala.collection.JavaConverters;

/* loaded from: input_file:org/apache/hudi/spark3/internal/HoodieMOWDataSourceInternalBatchWrite.class */
public class HoodieMOWDataSourceInternalBatchWrite extends HoodieDataSourceInternalBatchWrite {
    private final SparkSession spark;
    private final HoodieWriteConfig writeConfig;
    private final StructType structType;
    private final MowUpsertExec mowUpsertExec;
    private BloomFilter filter;
    private boolean forceDisableBloomFilter;

    public HoodieMOWDataSourceInternalBatchWrite(String str, HoodieWriteConfig hoodieWriteConfig, StructType structType, SparkSession sparkSession, StorageConfiguration<?> storageConfiguration, Map<String, String> map, boolean z, boolean z2) {
        super(str, hoodieWriteConfig, structType, sparkSession, storageConfiguration, map, z, z2);
        this.filter = null;
        this.forceDisableBloomFilter = false;
        this.writeConfig = hoodieWriteConfig;
        this.spark = sparkSession;
        this.structType = structType;
        this.mowUpsertExec = MowUpsertExec.createMowUpsertExec(this.spark, structType, hoodieWriteConfig, false, str, storageConfiguration);
    }

    @Override // org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite
    public void commit(WriterCommitMessage[] writerCommitMessageArr) {
        List<WriteStatus> list = (List) Arrays.stream(writerCommitMessageArr).map(writerCommitMessage -> {
            return (HoodieWriterCommitMessage) writerCommitMessage;
        }).flatMap(hoodieWriterCommitMessage -> {
            return hoodieWriterCommitMessage.getWriteStatuses().stream();
        }).collect(Collectors.toList());
        Map<String, List<DeletionVectorWriteStat>> produceDVs = produceDVs(list);
        if (!produceDVs.isEmpty()) {
            HashSet hashSet = new HashSet();
            list.forEach(writeStatus -> {
                HoodieWriteStat stat = writeStatus.getStat();
                List list2 = (List) produceDVs.get(stat.getPartitionPath());
                if (list2 == null || hashSet.contains(stat.getPartitionPath())) {
                    return;
                }
                list2.forEach(deletionVectorWriteStat -> {
                    stat.addDV(deletionVectorWriteStat.dvFileName(), deletionVectorWriteStat.dvSize());
                });
                hashSet.add(stat.getPartitionPath());
            });
        }
        super.commit(writerCommitMessageArr);
    }

    @Override // org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite
    public void onDataWriterCommit(WriterCommitMessage writerCommitMessage) {
        ((HoodieWriterCommitMessage) writerCommitMessage).getWriteStatuses().stream().forEach(writeStatus -> {
            if (writeStatus.getStat().getBloomFilterBytes() != null) {
                if (this.filter == null) {
                    this.filter = MowUpsertExec$.MODULE$.deserializeBloomFilter(writeStatus.getStat().getBloomFilterBytes());
                } else {
                    try {
                        this.filter.mergeInPlace(MowUpsertExec$.MODULE$.deserializeBloomFilter(writeStatus.getStat().getBloomFilterBytes()));
                    } catch (IncompatibleMergeException e) {
                        this.forceDisableBloomFilter = true;
                    }
                }
                writeStatus.getStat().setBloomFilterBytes((byte[]) null);
            }
        });
        super.onDataWriterCommit(writerCommitMessage);
    }

    private Map<String, List<DeletionVectorWriteStat>> produceDVs(List<WriteStatus> list) {
        if (this.dataSourceInternalWriterHelper.getHoodieTable().getMetaClient().getActiveTimeline().getWriteTimeline().filterCompletedInstants().getInstants().isEmpty()) {
            return Collections.EMPTY_MAP;
        }
        return this.mowUpsertExec.persistDeleteVector(JavaConverters.asScalaBuffer(list).toSeq(), JavaConverters.asScalaBuffer((List) this.dataSourceInternalWriterHelper.getHoodieTable().getMetaClient().getTableConfig().getPartitionFields().map((v0) -> {
            return Arrays.asList(v0);
        }).orElse(Collections.emptyList())), this.forceDisableBloomFilter ? null : this.filter);
    }
}
