package org.apache.hudi.sink.bucket;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.sink.StreamWriteFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/bucket/BucketStreamWriteFunction.class */
/**
 * Stream write function that assigns every incoming record a bucket-based
 * {@link HoodieRecordLocation} before handing it to the buffering logic of
 * {@link StreamWriteFunction}.
 *
 * <p>For each partition the function lazily loads the existing file groups from the file system
 * view and keeps a {@code bucket id -> file id} mapping for the buckets this subtask is
 * responsible for (see {@link #isBucketToLoad(int, String)}). Buckets without a known file id get
 * a new file id prefix on first use.
 *
 * @param <I> type of the input elements; elements are cast to {@link HoodieRecord} when processed
 */
public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
    private static final Logger LOG = LoggerFactory.getLogger(BucketStreamWriteFunction.class);

    /** Location marker used for records of buckets created since the last snapshot (presumably "insert"). */
    private static final String INSERT_MARKER = "I";

    /** Location marker used for records of buckets that already have a file id (presumably "update"). */
    private static final String UPDATE_MARKER = "U";

    /** Number of parallel subtasks of this operator, read from the runtime context in {@link #open}. */
    private int parallelism;

    /** Configured number of buckets ({@link FlinkOptions#BUCKET_INDEX_NUM_BUCKETS}). */
    private int bucketNum;

    /** Configured index key field(s) ({@link FlinkOptions#INDEX_KEY_FIELD}) used to compute the bucket id. */
    private String indexKeyFields;

    /** Partition path -> (bucket id -> file id) for the buckets handled by this subtask. */
    private Map<String, Map<Integer, String>> bucketIndex;

    /**
     * Keys (partition + bucket id) of the buckets whose file id was newly created by this function
     * since the last call to {@link #snapshotStateInternal}.
     */
    private Set<String> incBucketIndex;

    /**
     * Creates the function.
     *
     * @param configuration the Flink configuration passed on to the parent write function
     */
    public BucketStreamWriteFunction(Configuration configuration) {
        super(configuration);
    }

    /**
     * Opens the parent function, then reads the bucket settings and subtask coordinates and
     * creates empty index structures.
     *
     * @param configuration the configuration forwarded to the parent {@code open}
     * @throws IOException if the parent {@code open} fails
     */
    @Override
    public void open(Configuration configuration) throws IOException {
        super.open(configuration);
        this.bucketNum = this.config.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
        this.indexKeyFields = this.config.getString(FlinkOptions.INDEX_KEY_FIELD);
        this.taskID = getRuntimeContext().getIndexOfThisSubtask();
        this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        this.bucketIndex = new HashMap<>();
        this.incBucketIndex = new HashSet<>();
    }

    /**
     * Delegates state initialization to the parent function; no additional state is registered here.
     *
     * @param functionInitializationContext the initialization context
     * @throws Exception if the parent initialization fails
     */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        super.initializeState(functionInitializationContext);
    }

    /**
     * Runs the parent snapshot logic and then forgets which buckets were newly created, so that
     * subsequent records for those buckets are tagged with the update marker.
     *
     * @param functionSnapshotContext the snapshot context
     * @throws IOException if the parent snapshot fails
     */
    @Override
    public void snapshotStateInternal(FunctionSnapshotContext functionSnapshotContext) throws IOException {
        super.snapshotStateInternal(functionSnapshotContext);
        this.incBucketIndex.clear();
    }

    /**
     * Tags the record with its bucket location and buffers it.
     *
     * <p>The location's file id is looked up (or created) per partition and bucket id. Records of a
     * bucket created since the last snapshot get the {@code "I"} marker, records of a bucket with
     * an already known file id get the {@code "U"} marker.
     *
     * @param i         the input element; must be a {@link HoodieRecord}
     * @param context   the process function context (unused)
     * @param collector the output collector (unused)
     * @throws Exception if buffering the record fails
     */
    @Override
    public void processElement(I i, ProcessFunction<I, Object>.Context context, Collector<Object> collector) throws Exception {
        this.timer.markStart();
        HoodieRecord<?> hoodieRecord = (HoodieRecord<?>) i;
        if (filterDeleteRecord(hoodieRecord)) {
            // NOTE(review): the timer started above is not marked as ended on this path — confirm this is intended.
            return;
        }
        HoodieKey key = hoodieRecord.getKey();
        String partitionPath = key.getPartitionPath();

        bootstrapIndexIfNeed(partitionPath);
        Map<Integer, String> bucketToFileId = this.bucketIndex.computeIfAbsent(partitionPath, p -> new HashMap<>());
        int bucketId = BucketIdentifier.getBucketId(key, this.indexKeyFields, this.bucketNum);
        String partitionBucketKey = partitionBucketKey(partitionPath, bucketId);

        HoodieRecordLocation location;
        if (this.incBucketIndex.contains(partitionBucketKey)) {
            // Bucket was created since the last snapshot: keep using the insert marker.
            location = new HoodieRecordLocation(INSERT_MARKER, bucketToFileId.get(bucketId));
        } else if (bucketToFileId.containsKey(bucketId)) {
            location = new HoodieRecordLocation(UPDATE_MARKER, bucketToFileId.get(bucketId));
        } else {
            // First record for this bucket: allocate a file id and remember it as newly created.
            String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketId);
            location = new HoodieRecordLocation(INSERT_MARKER, newFileId);
            bucketToFileId.put(bucketId, newFileId);
            this.incBucketIndex.add(partitionBucketKey);
        }

        hoodieRecord.unseal();
        hoodieRecord.setCurrentLocation(location);
        hoodieRecord.seal();
        bufferRecord(hoodieRecord);
        this.timer.markEnd();
    }

    /**
     * Decides whether this subtask should load the given bucket of the given partition.
     *
     * <p>The bucket id is offset by the non-negative hash of the partition path (modulo the
     * parallelism) and the result is reduced via {@code BucketIdentifier.mod} by the parallelism;
     * the bucket belongs to this subtask when that value equals the subtask index.
     *
     * @param i   the bucket id
     * @param str the partition path
     * @return {@code true} if the bucket maps to this subtask
     */
    public boolean isBucketToLoad(int i, String str) {
        int partitionOffset = (str.hashCode() & Integer.MAX_VALUE) % this.parallelism;
        return BucketIdentifier.mod(partitionOffset + i, this.parallelism) == this.taskID;
    }

    /**
     * Builds the key under which a (partition, bucket) pair is tracked in {@link #incBucketIndex}.
     *
     * <p>A delimiter separates the two parts: plain concatenation would map distinct pairs such
     * as ("p1", 12) and ("p11", 2) to the same key.
     */
    private static String partitionBucketKey(String partitionPath, int bucketId) {
        return partitionPath + "_" + bucketId;
    }

    /**
     * Loads the {@code bucket id -> file id} mapping of a partition from the file system view,
     * unless the partition has already been loaded.
     *
     * @param partition the partition path to load
     * @throws RuntimeException if two file groups of the partition map to the same bucket id
     */
    private void bootstrapIndexIfNeed(String partition) {
        if (this.bucketIndex.containsKey(partition)) {
            return;
        }
        LOG.info("Loading Hoodie Table {}, with path {}",
                this.metaClient.getTableConfig().getTableName(), this.metaClient.getBasePath() + "/" + partition);

        Map<Integer, String> bucketToFileId = new HashMap<>();
        this.writeClient.getHoodieTable().getFileSystemView().getAllFileGroups(partition).forEach(fileGroup -> {
            String fileId = fileGroup.getFileGroupId().getFileId();
            int bucketId = BucketIdentifier.bucketIdFromFileId(fileId);
            if (!isBucketToLoad(bucketId, partition)) {
                // Bucket is not assigned to this subtask.
                return;
            }
            LOG.info("Should load this partition bucket {} with fileID {}", bucketId, fileId);
            if (bucketToFileId.containsKey(bucketId)) {
                throw new RuntimeException(String.format("Duplicate fileID %s from bucket %s of partition %s found during the BucketStreamWriteFunction index bootstrap.", fileId, bucketId, partition));
            }
            LOG.info("Adding fileID {} to the bucket {} of partition {}.", fileId, bucketId, partition);
            bucketToFileId.put(bucketId, fileId);
        });
        this.bucketIndex.put(partition, bucketToFileId);
    }
}
