package org.apache.hudi.table.action.commit;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.JavaLazyInsertIterable;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieConcatHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.org.apache.hadoop.hbase.util.Strings;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.class */
public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload> extends BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, HoodieWriteMetadata> {
    private static final Logger LOG = LogManager.getLogger(BaseJavaCommitActionExecutor.class);

    public BaseJavaCommitActionExecutor(HoodieEngineContext hoodieEngineContext, HoodieWriteConfig hoodieWriteConfig, HoodieTable hoodieTable, String str, WriteOperationType writeOperationType) {
        super(hoodieEngineContext, hoodieWriteConfig, hoodieTable, str, writeOperationType, Option.empty());
    }

    public BaseJavaCommitActionExecutor(HoodieEngineContext hoodieEngineContext, HoodieWriteConfig hoodieWriteConfig, HoodieTable hoodieTable, String str, WriteOperationType writeOperationType, Option option) {
        super(hoodieEngineContext, hoodieWriteConfig, hoodieTable, str, writeOperationType, option);
    }

    @Override // org.apache.hudi.table.action.commit.BaseCommitActionExecutor
    public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> list) {
        HoodieWriteMetadata<List<WriteStatus>> hoodieWriteMetadata = new HoodieWriteMetadata<>();
        WorkloadProfile workloadProfile = null;
        if (isWorkloadProfileNeeded()) {
            workloadProfile = new WorkloadProfile(buildProfile(list), this.table.getIndex().canIndexLogFiles());
            LOG.info("Input workload profile :" + workloadProfile);
        }
        Partitioner partitioner = getPartitioner(workloadProfile);
        try {
            saveWorkloadProfileMetadataToInflight(workloadProfile, this.instantTime);
        } catch (Exception e) {
            HoodieTableMetaClient metaClient = this.table.getMetaClient();
            try {
                if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), this.instantTime).getFileName()))) {
                    throw new HoodieCommitException("Failed to commit " + this.instantTime + " unable to save inflight metadata ", e);
                }
            } catch (IOException e2) {
                LOG.error("Check file exists failed");
                throw new HoodieCommitException("Failed to commit " + this.instantTime + " unable to save inflight metadata ", e2);
            }
        }
        Map<Integer, List<HoodieRecord<T>>> partition = partition(list, partitioner);
        LinkedList linkedList = new LinkedList();
        partition.forEach((num, list2) -> {
            if (WriteOperationType.isChangingRecords(this.operationType)) {
                Iterator<List<WriteStatus>> handleUpsertPartition = handleUpsertPartition(this.instantTime, num, list2.iterator(), partitioner);
                linkedList.getClass();
                handleUpsertPartition.forEachRemaining((v1) -> {
                    r1.addAll(v1);
                });
            } else {
                Iterator<List<WriteStatus>> handleInsertPartition = handleInsertPartition(this.instantTime, num, list2.iterator(), partitioner);
                linkedList.getClass();
                handleInsertPartition.forEachRemaining((v1) -> {
                    r1.addAll(v1);
                });
            }
        });
        updateIndex(linkedList, hoodieWriteMetadata);
        updateIndexAndCommitIfNeeded(linkedList, hoodieWriteMetadata);
        return hoodieWriteMetadata;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<WriteStatus> updateIndex(List<WriteStatus> list, HoodieWriteMetadata<List<WriteStatus>> hoodieWriteMetadata) {
        Instant now = Instant.now();
        List<WriteStatus> list2 = HoodieList.getList(this.table.getIndex().updateLocation(HoodieList.of(list), this.context, (HoodieTable) this.table));
        hoodieWriteMetadata.setIndexUpdateDuration(Duration.between(now, Instant.now()));
        hoodieWriteMetadata.setWriteStatuses(list2);
        return list2;
    }

    @Override // org.apache.hudi.table.action.commit.BaseCommitActionExecutor
    protected String getCommitActionType() {
        return this.table.getMetaClient().getCommitActionType();
    }

    private Partitioner getPartitioner(WorkloadProfile workloadProfile) {
        return WriteOperationType.isChangingRecords(this.operationType) ? getUpsertPartitioner(workloadProfile) : getInsertPartitioner(workloadProfile);
    }

    private Map<Integer, List<HoodieRecord<T>>> partition(List<HoodieRecord<T>> list, Partitioner partitioner) {
        Map map = (Map) list.stream().map(hoodieRecord -> {
            return Pair.of(Pair.of(hoodieRecord.getKey(), Option.ofNullable(hoodieRecord.getCurrentLocation())), hoodieRecord);
        }).collect(Collectors.groupingBy(pair -> {
            return Integer.valueOf(partitioner.getPartition(pair.getLeft()));
        }));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        map.forEach((num, list2) -> {
        });
        return linkedHashMap;
    }

    protected Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(List<HoodieRecord<T>> list) {
        HashMap hashMap = new HashMap();
        WorkloadStat workloadStat = new WorkloadStat();
        for (Map.Entry entry : ((Map) list.stream().map(hoodieRecord -> {
            return Pair.of(Pair.of(hoodieRecord.getPartitionPath(), Option.ofNullable(hoodieRecord.getCurrentLocation())), hoodieRecord);
        }).collect(Collectors.groupingBy((v0) -> {
            return v0.getLeft();
        }, Collectors.counting()))).entrySet()) {
            String str = (String) ((Pair) entry.getKey()).getLeft();
            Long l = (Long) entry.getValue();
            Option option = (Option) ((Pair) entry.getKey()).getRight();
            if (!hashMap.containsKey(str)) {
                hashMap.put(str, new WorkloadStat());
            }
            if (option.isPresent()) {
                ((WorkloadStat) hashMap.get(str)).addUpdates((HoodieRecordLocation) option.get(), l.longValue());
                workloadStat.addUpdates((HoodieRecordLocation) option.get(), l.longValue());
            } else {
                ((WorkloadStat) hashMap.get(str)).addInserts(l.longValue());
                workloadStat.addInserts(l.longValue());
            }
        }
        return Pair.of(hashMap, workloadStat);
    }

    @Override // org.apache.hudi.table.action.commit.BaseCommitActionExecutor
    protected void commit(Option<Map<String, String>> option, HoodieWriteMetadata<List<WriteStatus>> hoodieWriteMetadata) {
        commit(option, hoodieWriteMetadata, (List) hoodieWriteMetadata.getWriteStatuses().stream().map((v0) -> {
            return v0.getStat();
        }).collect(Collectors.toList()));
    }

    @Override // org.apache.hudi.table.action.commit.BaseCommitActionExecutor
    protected void setCommitMetadata(HoodieWriteMetadata<List<WriteStatus>> hoodieWriteMetadata) {
        hoodieWriteMetadata.setCommitMetadata(Option.of(CommitUtils.buildMetadata((List) hoodieWriteMetadata.getWriteStatuses().stream().map((v0) -> {
            return v0.getStat();
        }).collect(Collectors.toList()), hoodieWriteMetadata.getPartitionToReplaceFileIds(), this.extraMetadata, this.operationType, getSchemaToStoreInCommit(), getCommitActionType())));
    }

    protected void commit(Option<Map<String, String>> option, HoodieWriteMetadata<List<WriteStatus>> hoodieWriteMetadata, List<HoodieWriteStat> list) {
        String commitActionType = getCommitActionType();
        LOG.info("Committing " + this.instantTime + ", action Type " + commitActionType);
        hoodieWriteMetadata.setCommitted(true);
        hoodieWriteMetadata.setWriteStats(list);
        finalizeWrite(this.instantTime, list, hoodieWriteMetadata);
        try {
            LOG.info("Committing " + this.instantTime + ", action Type " + getCommitActionType());
            HoodieActiveTimeline activeTimeline = this.table.getActiveTimeline();
            HoodieCommitMetadata hoodieCommitMetadata = hoodieWriteMetadata.getCommitMetadata().get();
            writeTableMetadata(hoodieCommitMetadata, commitActionType);
            activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), this.instantTime), Option.of(hoodieCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
            LOG.info("Committed " + this.instantTime);
            hoodieWriteMetadata.setCommitMetadata(Option.of(hoodieCommitMetadata));
        } catch (IOException e) {
            throw new HoodieCommitException("Failed to complete commit " + this.config.getBasePath() + " at time " + this.instantTime, e);
        }
    }

    protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<List<WriteStatus>> hoodieWriteMetadata) {
        return Collections.emptyMap();
    }

    @Override // org.apache.hudi.table.action.commit.BaseCommitActionExecutor
    protected boolean isWorkloadProfileNeeded() {
        return true;
    }

    protected Iterator<List<WriteStatus>> handleUpsertPartition(String str, Integer num, Iterator it, Partitioner partitioner) {
        BucketInfo bucketInfo = ((JavaUpsertPartitioner) partitioner).getBucketInfo(num.intValue());
        BucketType bucketType = bucketInfo.bucketType;
        try {
            if (bucketType.equals(BucketType.INSERT)) {
                return handleInsert(bucketInfo.fileIdPrefix, it);
            }
            if (bucketType.equals(BucketType.UPDATE)) {
                return handleUpdate(bucketInfo.partitionPath, bucketInfo.fileIdPrefix, it);
            }
            throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + num);
        } catch (Throwable th) {
            String str2 = "Error upserting bucketType " + bucketType + " for partition :" + num;
            LOG.error(str2, th);
            throw new HoodieUpsertException(str2, th);
        }
    }

    protected Iterator<List<WriteStatus>> handleInsertPartition(String str, Integer num, Iterator it, Partitioner partitioner) {
        return handleUpsertPartition(str, num, it, partitioner);
    }

    @Override // org.apache.hudi.table.action.commit.BaseCommitActionExecutor
    public Iterator<List<WriteStatus>> handleUpdate(String str, String str2, Iterator<HoodieRecord<T>> it) throws IOException {
        if (it.hasNext()) {
            return handleUpdateInternal(getUpdateHandle(str, str2, it), str2);
        }
        LOG.info("Empty partition with fileId => " + str2);
        return Collections.singletonList(Collections.EMPTY_LIST).iterator();
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> hoodieMergeHandle, String str) throws IOException {
        if (hoodieMergeHandle.getOldFilePath() == null) {
            throw new HoodieUpsertException("Error in finding the old file path at commit " + this.instantTime + " for fileId: " + str);
        }
        JavaMergeHelper.newInstance().runMerge(this.table, hoodieMergeHandle);
        List<WriteStatus> writeStatuses = hoodieMergeHandle.writeStatuses();
        if (hoodieMergeHandle.getPartitionPath() == null) {
            LOG.info("Upsert Handle has partition path as null " + hoodieMergeHandle.getOldFilePath() + Strings.DEFAULT_KEYVALUE_SEPARATOR + writeStatuses);
        }
        return Collections.singletonList(writeStatuses).iterator();
    }

    protected HoodieMergeHandle getUpdateHandle(String str, String str2, Iterator<HoodieRecord<T>> it) {
        return this.table.requireSortedRecords() ? new HoodieSortedMergeHandle(this.config, this.instantTime, this.table, it, str, str2, this.taskContextSupplier, Option.empty()) : (WriteOperationType.isChangingRecords(this.operationType) || !this.config.allowDuplicateInserts()) ? new HoodieMergeHandle(this.config, this.instantTime, this.table, it, str, str2, this.taskContextSupplier, Option.empty()) : new HoodieConcatHandle(this.config, this.instantTime, this.table, it, str, str2, this.taskContextSupplier, (Option<BaseKeyGenerator>) Option.empty());
    }

    protected HoodieMergeHandle getUpdateHandle(String str, String str2, Map<String, HoodieRecord<T>> map, HoodieBaseFile hoodieBaseFile) {
        return new HoodieMergeHandle(this.config, this.instantTime, this.table, map, str, str2, hoodieBaseFile, this.taskContextSupplier, (Option<BaseKeyGenerator>) Option.empty());
    }

    @Override // org.apache.hudi.table.action.commit.BaseCommitActionExecutor
    public Iterator<List<WriteStatus>> handleInsert(String str, Iterator<HoodieRecord<T>> it) {
        if (it.hasNext()) {
            return new JavaLazyInsertIterable(it, true, this.config, this.instantTime, this.table, str, this.taskContextSupplier, new CreateHandleFactory());
        }
        LOG.info("Empty partition");
        return Collections.singletonList(Collections.EMPTY_LIST).iterator();
    }

    public Partitioner getUpsertPartitioner(WorkloadProfile workloadProfile) {
        if (workloadProfile == null) {
            throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
        }
        return new JavaUpsertPartitioner(workloadProfile, this.context, this.table, this.config);
    }

    public Partitioner getInsertPartitioner(WorkloadProfile workloadProfile) {
        return getUpsertPartitioner(workloadProfile);
    }

    public void updateIndexAndCommitIfNeeded(List<WriteStatus> list, HoodieWriteMetadata hoodieWriteMetadata) {
        Instant now = Instant.now();
        List list2 = HoodieList.getList(this.table.getIndex().updateLocation(HoodieList.of(list), this.context, (HoodieTable) this.table));
        hoodieWriteMetadata.setIndexUpdateDuration(Duration.between(now, Instant.now()));
        hoodieWriteMetadata.setWriteStatuses(list2);
        hoodieWriteMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(hoodieWriteMetadata));
        commitOnAutoCommit(hoodieWriteMetadata);
    }
}
