package org.apache.hudi.sink.clustering.update.strategy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.hudi.table.action.cluster.util.ConsistentHashingUpdateStrategyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/clustering/update/strategy/FlinkConsistentBucketUpdateStrategy.class */
public class FlinkConsistentBucketUpdateStrategy<T extends HoodieRecordPayload> extends UpdateStrategy<T, List<Pair<List<HoodieRecord>, String>>> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkConsistentBucketUpdateStrategy.class);
    private boolean initialized;
    private List<String> indexKeyFields;
    private Map<String, Pair<String, ConsistentBucketIdentifier>> partitionToIdentifier;
    private String lastRefreshInstant;

    public FlinkConsistentBucketUpdateStrategy(HoodieFlinkWriteClient hoodieFlinkWriteClient, List<String> list) {
        super(hoodieFlinkWriteClient.getEngineContext(), hoodieFlinkWriteClient.getHoodieTable(), Collections.emptySet());
        this.initialized = false;
        this.lastRefreshInstant = "00000000000000";
        this.indexKeyFields = list;
        this.partitionToIdentifier = new HashMap();
    }

    public void initialize(HoodieFlinkWriteClient hoodieFlinkWriteClient) {
        if (this.initialized) {
            return;
        }
        HoodieFlinkTable hoodieTable = hoodieFlinkWriteClient.getHoodieTable();
        List<HoodieInstant> pendingClusteringInstantTimes = ClusteringUtils.getPendingClusteringInstantTimes(hoodieTable.getMetaClient());
        if (!pendingClusteringInstantTimes.isEmpty()) {
            HoodieInstant hoodieInstant = pendingClusteringInstantTimes.get(pendingClusteringInstantTimes.size() - 1);
            if (hoodieInstant.getTimestamp().compareTo(this.lastRefreshInstant) > 0) {
                LOG.info("Found new pending replacement commit. Last pending replacement commit is {}.", hoodieInstant);
                this.table = hoodieTable;
                this.fileGroupsInPendingClustering = (Set) hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map((v0) -> {
                    return v0.getKey();
                }).collect(Collectors.toSet());
                this.lastRefreshInstant = hoodieInstant.getTimestamp();
                this.partitionToIdentifier.clear();
            }
        }
        this.initialized = true;
    }

    public void reset() {
        this.initialized = false;
    }

    @Override // org.apache.hudi.table.action.cluster.strategy.UpdateStrategy
    public Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>> handleUpdate(List<Pair<List<HoodieRecord>, String>> list) {
        ValidationUtils.checkArgument(this.initialized, "Strategy has not been initialized");
        ValidationUtils.checkArgument(list.size() == 1);
        Pair<List<HoodieRecord>, String> pair = list.get(0);
        HoodieRecord hoodieRecord = pair.getLeft().get(0);
        HoodieFileGroupId hoodieFileGroupId = new HoodieFileGroupId(hoodieRecord.getPartitionPath(), hoodieRecord.getCurrentLocation().getFileId());
        return (this.fileGroupsInPendingClustering.isEmpty() || !this.fileGroupsInPendingClustering.contains(hoodieFileGroupId)) ? Pair.of(list, Collections.singleton(hoodieFileGroupId)) : doHandleUpdate(hoodieFileGroupId, pair);
    }

    private Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>> doHandleUpdate(HoodieFileGroupId hoodieFileGroupId, Pair<List<HoodieRecord>, String> pair) {
        Pair<String, ConsistentBucketIdentifier> bucketIdentifierOfPartition = getBucketIdentifierOfPartition(hoodieFileGroupId.getPartitionPath());
        String left = bucketIdentifierOfPartition.getLeft();
        ConsistentBucketIdentifier right = bucketIdentifierOfPartition.getRight();
        Map map = (Map) pair.getLeft().stream().map((v0) -> {
            return v0.newInstance();
        }).collect(Collectors.groupingBy(hoodieRecord -> {
            return right.getBucket(hoodieRecord.getKey(), this.indexKeyFields).getFileIdPrefix();
        }));
        ArrayList arrayList = new ArrayList();
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        for (Map.Entry entry : map.entrySet()) {
            String createNewFileId = FSUtils.createNewFileId((String) entry.getKey(), 0);
            patchFileIdToRecords((List) entry.getValue(), createNewFileId);
            arrayList.add(Pair.of((List) entry.getValue(), left));
            linkedHashSet.add(new HoodieFileGroupId(hoodieFileGroupId.getPartitionPath(), createNewFileId));
        }
        LOG.info("Apply duplicate update for FileGroup {}, routing records to: {}.", hoodieFileGroupId, String.join(",", map.keySet()));
        arrayList.add(pair);
        linkedHashSet.add(hoodieFileGroupId);
        return Pair.of(arrayList, linkedHashSet);
    }

    private Pair<String, ConsistentBucketIdentifier> getBucketIdentifierOfPartition(String str) {
        return this.partitionToIdentifier.computeIfAbsent(str, str2 -> {
            return ConsistentHashingUpdateStrategyUtils.constructPartitionToIdentifier(Collections.singleton(str2), this.table).get(str2);
        });
    }

    private void patchFileIdToRecords(List<HoodieRecord> list, String str) {
        HoodieRecord hoodieRecord = list.get(0);
        HoodieAvroRecord hoodieAvroRecord = new HoodieAvroRecord(hoodieRecord.getKey(), (HoodieRecordPayload) hoodieRecord.getData(), hoodieRecord.getOperation());
        hoodieAvroRecord.setCurrentLocation(new HoodieRecordLocation("U", str));
        list.set(0, hoodieAvroRecord);
    }
}
