package org.apache.hudi.table.action.deltacommit;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieDumpRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.index.bucket.HoodieBucketIndex;
import org.apache.hudi.io.BucketAppendHandle;
import org.apache.hudi.io.HoodieAppendHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.HoodieWriteHelper;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaRDD$;
import org.apache.spark.rdd.SimpleBucketWriterRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

/* loaded from: input_file:org/apache/hudi/table/action/deltacommit/SparkBucketUpsertDeltaCommitActionExecutor.class */
public class SparkBucketUpsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseSparkDeltaCommitActionExecutor<T> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkBucketUpsertDeltaCommitActionExecutor.class);
    private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
    private final int bucketNum;
    private final String indexKeyField;
    private final int partitionNum;
    private final boolean withSort;
    private final int coalesceNum;
    private Map<String, Map<Integer, FileSlice>> bucketFiles;

    public SparkBucketUpsertDeltaCommitActionExecutor(HoodieSparkEngineContext hoodieSparkEngineContext, HoodieWriteConfig hoodieWriteConfig, HoodieTable hoodieTable, String str, HoodieData<HoodieRecord<T>> hoodieData) {
        super(hoodieSparkEngineContext, hoodieWriteConfig, hoodieTable, str, WriteOperationType.UPSERT);
        this.bucketFiles = new HashMap();
        this.inputRecordsRDD = ((HoodieJavaRDD) hoodieData).get();
        this.allowCombineOptimization = true;
        this.bucketNum = hoodieWriteConfig.getBucketIndexNumBuckets();
        this.indexKeyField = hoodieWriteConfig.getBucketIndexHashField();
        this.withSort = hoodieWriteConfig.getBucketWriteSupportSort();
        this.coalesceNum = hoodieWriteConfig.getSmallBucketWriteCoalesceNum();
        this.partitionNum = Math.max(this.bucketNum, Integer.valueOf(hoodieWriteConfig.getProps(true).getString("hoodie.upsert.shuffle.parallelism", "200")).intValue());
        setFileSlices(hoodieTable.getLatestBucketFiles());
    }

    @Override // org.apache.hudi.table.action.BaseActionExecutor
    public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
        return HoodieWriteHelper.newInstance().write(this.instantTime, HoodieJavaRDD.of(this.inputRecordsRDD), this.context, this.table, false, this.config.getUpsertShuffleParallelism(), this, WriteOperationType.INSERT_PREPPED);
    }

    private void setFileSlices(List<FileSlice> list) {
        list.stream().forEach(fileSlice -> {
            String partitionPath = fileSlice.getPartitionPath();
            int bucketIdFromFileId = BucketIdentifier.bucketIdFromFileId(fileSlice.getFileId());
            Map<Integer, FileSlice> hashMap = this.bucketFiles.containsKey(partitionPath) ? this.bucketFiles.get(partitionPath) : new HashMap<>();
            hashMap.put(Integer.valueOf(bucketIdFromFileId), fileSlice);
            this.bucketFiles.put(partitionPath, hashMap);
        });
    }

    @Override // org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor, org.apache.hudi.table.action.commit.BaseCommitActionExecutor
    public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> hoodieData) {
        HoodieData<WriteStatus> internalWriteWithSort;
        LOG.info("start execute bucket write");
        this.context.setJobStatus(getClass().getSimpleName(), "Doing partition and writing data");
        if (this.inputRecordsRDD.getNumPartitions() < this.coalesceNum) {
            internalWriteWithSort = writeWithOutShuffle();
        } else {
            internalWriteWithSort = this.withSort ? internalWriteWithSort() : internalWrite();
        }
        this.table.getActiveTimeline().transitionRequestedToInflight(HoodieTimeline.DELTA_COMMIT_ACTION, this.instantTime);
        HoodieWriteMetadata<HoodieData<WriteStatus>> hoodieWriteMetadata = new HoodieWriteMetadata<>();
        updateIndexAndCommitIfNeeded(internalWriteWithSort, hoodieWriteMetadata);
        LOG.info("bucket write end");
        return hoodieWriteMetadata;
    }

    private HoodieData<WriteStatus> internalWriteWithSort() {
        return HoodieJavaRDD.of(this.inputRecordsRDD.mapToPair(hoodieRecord -> {
            int bucketId = BucketIdentifier.getBucketId(hoodieRecord.getRecordKey(), this.indexKeyField, this.bucketNum);
            String partitionPath = hoodieRecord.getPartitionPath();
            return new Tuple2(new Tuple2(partitionPath, Integer.valueOf(StringUtils.isNullOrEmpty(partitionPath) ? bucketId : BucketIdentifier.mod(bucketId + ((partitionPath.hashCode() & Integer.MAX_VALUE) % this.partitionNum), this.partitionNum))), hoodieRecord);
        }).repartitionAndSortWithinPartitions(new Partitioner() { // from class: org.apache.hudi.table.action.deltacommit.SparkBucketUpsertDeltaCommitActionExecutor.1
            public int numPartitions() {
                return SparkBucketUpsertDeltaCommitActionExecutor.this.partitionNum;
            }

            public int getPartition(Object obj) {
                return ((Integer) ((Tuple2) obj)._2).intValue();
            }
        }, (Comparator) ((Serializable) (tuple2, tuple22) -> {
            int compareTo = ((String) tuple2._1).compareTo((String) tuple22._1);
            return compareTo == 0 ? ((Integer) tuple2._2).compareTo((Integer) tuple22._2) : compareTo;
        })).mapPartitions(it -> {
            HoodieRecord taggedRecord;
            HashMap hashMap = new HashMap();
            SparkBucketAppendHelper sparkBucketAppendHelper = new SparkBucketAppendHelper(this.table, this.instantTime, this.taskContextSupplier, this.config);
            BucketAppendHandle bucketAppendHandle = null;
            ArrayList arrayList = new ArrayList();
            String str = "";
            while (it.hasNext()) {
                HoodieRecord hoodieRecord2 = (HoodieRecord) ((Tuple2) it.next())._2;
                int bucketId = BucketIdentifier.getBucketId(hoodieRecord2, this.indexKeyField, this.bucketNum);
                String partitionPath = hoodieRecord2.getPartitionPath();
                if (!hashMap.containsKey(partitionPath)) {
                    hashMap.put(partitionPath, HoodieBucketIndex.loadPartitionBucketFileSliceMapping(this.table, partitionPath));
                }
                FileSlice fileSlice = null;
                if (((Map) hashMap.get(partitionPath)).containsKey(Integer.valueOf(bucketId))) {
                    Pair pair = (Pair) ((Map) hashMap.get(partitionPath)).get(Integer.valueOf(bucketId));
                    taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord2, Option.of(new HoodieRecordLocation((String) pair.getRight(), (String) ((Pair) pair.getLeft()).getRight())));
                    fileSlice = (FileSlice) ((Pair) pair.getLeft()).getLeft();
                } else {
                    String newBucketFileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId);
                    HashMap hashMap2 = new HashMap();
                    hashMap2.put(Integer.valueOf(bucketId), Pair.of(Pair.of(new FileSlice(partitionPath, this.instantTime, newBucketFileIdPrefix), newBucketFileIdPrefix), this.instantTime));
                    hashMap.put(partitionPath, hashMap2);
                    taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord2, Option.of(new HoodieRecordLocation(this.instantTime, newBucketFileIdPrefix)));
                }
                String fileId = taggedRecord.getCurrentLocation().getFileId();
                if (bucketAppendHandle == null || str.isEmpty() || !str.equals(partitionPath + fileId)) {
                    if (bucketAppendHandle != null) {
                        bucketAppendHandle.close().forEach(obj -> {
                            arrayList.add((WriteStatus) obj);
                        });
                    }
                    bucketAppendHandle = sparkBucketAppendHelper.getBucketAppendCreateHandle(partitionPath, bucketId, fileId, Option.ofNullable(fileSlice));
                    str = partitionPath + fileId;
                    if (bucketAppendHandle.isClosed()) {
                        throw new HoodieAppendException(String.format("try to reopen closed file handle for partitionPath: %s, fileID: %s", partitionPath, fileId));
                    }
                }
                bucketAppendHandle.writeRecord(taggedRecord);
            }
            sparkBucketAppendHelper.close().forEachRemaining(obj2 -> {
                arrayList.add((WriteStatus) obj2);
            });
            return arrayList.iterator();
        }));
    }

    private HoodieData<WriteStatus> writeWithOutShuffle() {
        ClassTag apply = ClassTag$.MODULE$.apply(HoodieRecord.class);
        return HoodieJavaRDD.of(JavaRDD$.MODULE$.fromRDD(new SimpleBucketWriterRDD(JavaRDD.toRDD(this.inputRecordsRDD), 1, this.bucketNum, this.table, this.instantTime, this.indexKeyField, this.config, this.bucketFiles, apply), apply).mapPartitions(it -> {
            ArrayList arrayList = new ArrayList();
            while (it.hasNext()) {
                arrayList.addAll(((HoodieDumpRecord) it.next()).getWriteStatuses());
            }
            return arrayList.iterator();
        }));
    }

    private HoodieData<WriteStatus> internalWrite() {
        return HoodieJavaRDD.of(this.inputRecordsRDD.mapToPair(hoodieRecord -> {
            int bucketId = BucketIdentifier.getBucketId(hoodieRecord.getRecordKey(), this.indexKeyField, this.bucketNum);
            String partitionPath = hoodieRecord.getPartitionPath();
            return new Tuple2(Integer.valueOf(StringUtils.isNullOrEmpty(partitionPath) ? bucketId : BucketIdentifier.mod(bucketId + ((partitionPath.hashCode() & Integer.MAX_VALUE) % this.partitionNum), this.partitionNum)), hoodieRecord);
        }).partitionBy(getPartitioner()).map((v0) -> {
            return v0._2();
        }).mapPartitions(it -> {
            HoodieRecord taggedRecord;
            HashMap hashMap = new HashMap();
            SparkBucketAppendHelper sparkBucketAppendHelper = new SparkBucketAppendHelper(this.table, this.instantTime, this.taskContextSupplier, this.config);
            while (it.hasNext()) {
                HoodieRecord hoodieRecord2 = (HoodieRecord) it.next();
                int bucketId = BucketIdentifier.getBucketId(hoodieRecord2, this.indexKeyField, this.bucketNum);
                String partitionPath = hoodieRecord2.getPartitionPath();
                FileSlice fileSlice = null;
                if (this.bucketFiles.containsKey(partitionPath) && this.bucketFiles.get(partitionPath).size() == this.bucketNum) {
                    fileSlice = this.bucketFiles.get(partitionPath).get(Integer.valueOf(bucketId));
                    taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord2, Option.of(new HoodieRecordLocation(fileSlice.getBaseInstantTime(), fileSlice.getFileId())));
                } else {
                    if (!hashMap.containsKey(partitionPath)) {
                        hashMap.put(partitionPath, HoodieBucketIndex.loadAllPartitionBucketFileSliceMapping(this.table, partitionPath));
                    }
                    if (((Map) hashMap.get(partitionPath)).containsKey(Integer.valueOf(bucketId))) {
                        Pair pair = (Pair) ((Map) hashMap.get(partitionPath)).get(Integer.valueOf(bucketId));
                        taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord2, Option.of(new HoodieRecordLocation((String) pair.getRight(), (String) ((Pair) pair.getLeft()).getRight())));
                        fileSlice = (FileSlice) ((Pair) pair.getLeft()).getLeft();
                    } else {
                        String newBucketFileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId);
                        HashMap hashMap2 = new HashMap();
                        hashMap2.put(Integer.valueOf(bucketId), Pair.of(Pair.of(new FileSlice(partitionPath, this.instantTime, newBucketFileIdPrefix), newBucketFileIdPrefix), this.instantTime));
                        hashMap.put(partitionPath, hashMap2);
                        taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord2, Option.of(new HoodieRecordLocation(this.instantTime, newBucketFileIdPrefix)));
                    }
                }
                sparkBucketAppendHelper.getBucketAppendCreateHandle(partitionPath, bucketId, taggedRecord.getCurrentLocation().getFileId(), Option.ofNullable(fileSlice)).writeRecord(taggedRecord);
            }
            return sparkBucketAppendHelper.close();
        }));
    }

    private Partitioner getPartitioner() {
        return new Partitioner() { // from class: org.apache.hudi.table.action.deltacommit.SparkBucketUpsertDeltaCommitActionExecutor.2
            public int numPartitions() {
                return SparkBucketUpsertDeltaCommitActionExecutor.this.partitionNum;
            }

            public int getPartition(Object obj) {
                return ((Integer) obj).intValue();
            }
        };
    }

    @Override // org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor, org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor
    public Iterator<List<WriteStatus>> handleUpdate(String str, String str2, Iterator<HoodieRecord<T>> it, FileSlice fileSlice) throws IOException {
        LOG.info("Merging updates for commit " + this.instantTime + " for file " + str2);
        HoodieAppendHandle hoodieAppendHandle = new HoodieAppendHandle(this.config, this.instantTime, this.table, str, str2, it, this.taskContextSupplier, Option.ofNullable(fileSlice));
        hoodieAppendHandle.doAppend();
        return Collections.singletonList(hoodieAppendHandle.close()).iterator();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -813569037:
                if (implMethodName.equals("lambda$internalWriteWithSort$7115966f$1")) {
                    z = true;
                    break;
                }
                break;
            case -637499758:
                if (implMethodName.equals("lambda$internalWriteWithSort$831e9bf8$1")) {
                    z = false;
                    break;
                }
                break;
            case -66423095:
                if (implMethodName.equals("lambda$writeWithOutShuffle$7115966f$1")) {
                    z = 2;
                    break;
                }
                break;
            case 2995:
                if (implMethodName.equals("_2")) {
                    z = 3;
                    break;
                }
                break;
            case 348174006:
                if (implMethodName.equals("lambda$internalWriteWithSort$836f8ec9$1")) {
                    z = 4;
                    break;
                }
                break;
            case 795775255:
                if (implMethodName.equals("lambda$internalWrite$7115966f$1")) {
                    z = 6;
                    break;
                }
                break;
            case 1957518298:
                if (implMethodName.equals("lambda$internalWrite$836f8ec9$1")) {
                    z = 5;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/Comparator") && serializedLambda.getFunctionalInterfaceMethodName().equals("compare") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)I") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/deltacommit/SparkBucketUpsertDeltaCommitActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Lscala/Tuple2;Lscala/Tuple2;)I")) {
                    return (tuple2, tuple22) -> {
                        int compareTo = ((String) tuple2._1).compareTo((String) tuple22._1);
                        return compareTo == 0 ? ((Integer) tuple2._2).compareTo((Integer) tuple22._2) : compareTo;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/deltacommit/SparkBucketUpsertDeltaCommitActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/Iterator;")) {
                    SparkBucketUpsertDeltaCommitActionExecutor sparkBucketUpsertDeltaCommitActionExecutor = (SparkBucketUpsertDeltaCommitActionExecutor) serializedLambda.getCapturedArg(0);
                    return it -> {
                        HoodieRecord taggedRecord;
                        HashMap hashMap = new HashMap();
                        SparkBucketAppendHelper sparkBucketAppendHelper = new SparkBucketAppendHelper(this.table, this.instantTime, this.taskContextSupplier, this.config);
                        BucketAppendHandle bucketAppendHandle = null;
                        List arrayList = new ArrayList();
                        String str = "";
                        while (it.hasNext()) {
                            HoodieRecord hoodieRecord2 = (HoodieRecord) ((Tuple2) it.next())._2;
                            int bucketId = BucketIdentifier.getBucketId(hoodieRecord2, this.indexKeyField, this.bucketNum);
                            String partitionPath = hoodieRecord2.getPartitionPath();
                            if (!hashMap.containsKey(partitionPath)) {
                                hashMap.put(partitionPath, HoodieBucketIndex.loadPartitionBucketFileSliceMapping(this.table, partitionPath));
                            }
                            FileSlice fileSlice = null;
                            if (((Map) hashMap.get(partitionPath)).containsKey(Integer.valueOf(bucketId))) {
                                Pair pair = (Pair) ((Map) hashMap.get(partitionPath)).get(Integer.valueOf(bucketId));
                                taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord2, Option.of(new HoodieRecordLocation((String) pair.getRight(), (String) ((Pair) pair.getLeft()).getRight())));
                                fileSlice = (FileSlice) ((Pair) pair.getLeft()).getLeft();
                            } else {
                                String newBucketFileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId);
                                HashMap hashMap2 = new HashMap();
                                hashMap2.put(Integer.valueOf(bucketId), Pair.of(Pair.of(new FileSlice(partitionPath, this.instantTime, newBucketFileIdPrefix), newBucketFileIdPrefix), this.instantTime));
                                hashMap.put(partitionPath, hashMap2);
                                taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord2, Option.of(new HoodieRecordLocation(this.instantTime, newBucketFileIdPrefix)));
                            }
                            String fileId = taggedRecord.getCurrentLocation().getFileId();
                            if (bucketAppendHandle == null || str.isEmpty() || !str.equals(partitionPath + fileId)) {
                                if (bucketAppendHandle != null) {
                                    bucketAppendHandle.close().forEach(obj -> {
                                        arrayList.add((WriteStatus) obj);
                                    });
                                }
                                bucketAppendHandle = sparkBucketAppendHelper.getBucketAppendCreateHandle(partitionPath, bucketId, fileId, Option.ofNullable(fileSlice));
                                str = partitionPath + fileId;
                                if (bucketAppendHandle.isClosed()) {
                                    throw new HoodieAppendException(String.format("try to reopen closed file handle for partitionPath: %s, fileID: %s", partitionPath, fileId));
                                }
                            }
                            bucketAppendHandle.writeRecord(taggedRecord);
                        }
                        sparkBucketAppendHelper.close().forEachRemaining(obj2 -> {
                            arrayList.add((WriteStatus) obj2);
                        });
                        return arrayList.iterator();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/deltacommit/SparkBucketUpsertDeltaCommitActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/Iterator;")) {
                    return it2 -> {
                        ArrayList arrayList = new ArrayList();
                        while (it2.hasNext()) {
                            arrayList.addAll(((HoodieDumpRecord) it2.next()).getWriteStatuses());
                        }
                        return arrayList.iterator();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("scala/Tuple2") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0._2();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Lscala/Tuple2;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/deltacommit/SparkBucketUpsertDeltaCommitActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/common/model/HoodieRecord;)Lscala/Tuple2;")) {
                    SparkBucketUpsertDeltaCommitActionExecutor sparkBucketUpsertDeltaCommitActionExecutor2 = (SparkBucketUpsertDeltaCommitActionExecutor) serializedLambda.getCapturedArg(0);
                    return hoodieRecord -> {
                        int bucketId = BucketIdentifier.getBucketId(hoodieRecord.getRecordKey(), this.indexKeyField, this.bucketNum);
                        String partitionPath = hoodieRecord.getPartitionPath();
                        return new Tuple2(new Tuple2(partitionPath, Integer.valueOf(StringUtils.isNullOrEmpty(partitionPath) ? bucketId : BucketIdentifier.mod(bucketId + ((partitionPath.hashCode() & Integer.MAX_VALUE) % this.partitionNum), this.partitionNum))), hoodieRecord);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Lscala/Tuple2;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/deltacommit/SparkBucketUpsertDeltaCommitActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/common/model/HoodieRecord;)Lscala/Tuple2;")) {
                    SparkBucketUpsertDeltaCommitActionExecutor sparkBucketUpsertDeltaCommitActionExecutor3 = (SparkBucketUpsertDeltaCommitActionExecutor) serializedLambda.getCapturedArg(0);
                    return hoodieRecord2 -> {
                        int bucketId = BucketIdentifier.getBucketId(hoodieRecord2.getRecordKey(), this.indexKeyField, this.bucketNum);
                        String partitionPath = hoodieRecord2.getPartitionPath();
                        return new Tuple2(Integer.valueOf(StringUtils.isNullOrEmpty(partitionPath) ? bucketId : BucketIdentifier.mod(bucketId + ((partitionPath.hashCode() & Integer.MAX_VALUE) % this.partitionNum), this.partitionNum)), hoodieRecord2);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("org/apache/hudi/table/action/deltacommit/SparkBucketUpsertDeltaCommitActionExecutor") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Iterator;)Ljava/util/Iterator;")) {
                    SparkBucketUpsertDeltaCommitActionExecutor sparkBucketUpsertDeltaCommitActionExecutor4 = (SparkBucketUpsertDeltaCommitActionExecutor) serializedLambda.getCapturedArg(0);
                    return it3 -> {
                        HoodieRecord taggedRecord;
                        HashMap hashMap = new HashMap();
                        SparkBucketAppendHelper sparkBucketAppendHelper = new SparkBucketAppendHelper(this.table, this.instantTime, this.taskContextSupplier, this.config);
                        while (it3.hasNext()) {
                            HoodieRecord hoodieRecord22 = (HoodieRecord) it3.next();
                            int bucketId = BucketIdentifier.getBucketId(hoodieRecord22, this.indexKeyField, this.bucketNum);
                            String partitionPath = hoodieRecord22.getPartitionPath();
                            FileSlice fileSlice = null;
                            if (this.bucketFiles.containsKey(partitionPath) && this.bucketFiles.get(partitionPath).size() == this.bucketNum) {
                                fileSlice = this.bucketFiles.get(partitionPath).get(Integer.valueOf(bucketId));
                                taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord22, Option.of(new HoodieRecordLocation(fileSlice.getBaseInstantTime(), fileSlice.getFileId())));
                            } else {
                                if (!hashMap.containsKey(partitionPath)) {
                                    hashMap.put(partitionPath, HoodieBucketIndex.loadAllPartitionBucketFileSliceMapping(this.table, partitionPath));
                                }
                                if (((Map) hashMap.get(partitionPath)).containsKey(Integer.valueOf(bucketId))) {
                                    Pair pair = (Pair) ((Map) hashMap.get(partitionPath)).get(Integer.valueOf(bucketId));
                                    taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord22, Option.of(new HoodieRecordLocation((String) pair.getRight(), (String) ((Pair) pair.getLeft()).getRight())));
                                    fileSlice = (FileSlice) ((Pair) pair.getLeft()).getLeft();
                                } else {
                                    String newBucketFileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId);
                                    HashMap hashMap2 = new HashMap();
                                    hashMap2.put(Integer.valueOf(bucketId), Pair.of(Pair.of(new FileSlice(partitionPath, this.instantTime, newBucketFileIdPrefix), newBucketFileIdPrefix), this.instantTime));
                                    hashMap.put(partitionPath, hashMap2);
                                    taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord22, Option.of(new HoodieRecordLocation(this.instantTime, newBucketFileIdPrefix)));
                                }
                            }
                            sparkBucketAppendHelper.getBucketAppendCreateHandle(partitionPath, bucketId, taggedRecord.getCurrentLocation().getFileId(), Option.ofNullable(fileSlice)).writeRecord(taggedRecord);
                        }
                        return sparkBucketAppendHelper.close();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
