package org.apache.hudi.io;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
import org.apache.hudi.org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@NotThreadSafe
/* loaded from: input_file:org/apache/hudi/io/HoodieMergeHandle.class */
public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
    private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class);
    protected Map<String, HoodieRecord<T>> keyToNewRecords;
    protected Set<String> writtenRecordKeys;
    protected HoodieFileWriter<IndexedRecord> fileWriter;
    private boolean preserveMetadata;
    protected Path newFilePath;
    protected Path oldFilePath;
    protected long recordsWritten;
    protected long recordsDeleted;
    protected long updatedRecordsWritten;
    protected long insertRecordsWritten;
    protected boolean useWriterSchemaForCompaction;
    protected Option<BaseKeyGenerator> keyGeneratorOpt;
    private HoodieBaseFile baseFileToMerge;

    public HoodieMergeHandle(HoodieWriteConfig hoodieWriteConfig, String str, HoodieTable<T, I, K, O> hoodieTable, Iterator<HoodieRecord<T>> it, String str2, String str3, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> option) {
        this(hoodieWriteConfig, str, hoodieTable, it, str2, str3, taskContextSupplier, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(str2, str3).get(), option);
    }

    public HoodieMergeHandle(HoodieWriteConfig hoodieWriteConfig, String str, HoodieTable<T, I, K, O> hoodieTable, Iterator<HoodieRecord<T>> it, String str2, String str3, TaskContextSupplier taskContextSupplier, HoodieBaseFile hoodieBaseFile, Option<BaseKeyGenerator> option) {
        super(hoodieWriteConfig, str, str2, str3, hoodieTable, taskContextSupplier);
        this.preserveMetadata = false;
        this.recordsWritten = 0L;
        this.recordsDeleted = 0L;
        this.updatedRecordsWritten = 0L;
        this.insertRecordsWritten = 0L;
        init(str3, it);
        init(str3, str2, hoodieBaseFile);
        validateAndSetAndKeyGenProps(option, hoodieWriteConfig.populateMetaFields());
    }

    public HoodieMergeHandle(HoodieWriteConfig hoodieWriteConfig, String str, HoodieTable<T, I, K, O> hoodieTable, Map<String, HoodieRecord<T>> map, String str2, String str3, HoodieBaseFile hoodieBaseFile, TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> option) {
        super(hoodieWriteConfig, str, str2, str3, hoodieTable, taskContextSupplier);
        this.preserveMetadata = false;
        this.recordsWritten = 0L;
        this.recordsDeleted = 0L;
        this.updatedRecordsWritten = 0L;
        this.insertRecordsWritten = 0L;
        this.keyToNewRecords = map;
        this.useWriterSchemaForCompaction = true;
        this.preserveMetadata = hoodieWriteConfig.isPreserveHoodieCommitMetadataForCompaction();
        init(str3, this.partitionPath, hoodieBaseFile);
        validateAndSetAndKeyGenProps(option, hoodieWriteConfig.populateMetaFields());
    }

    private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator> option, boolean z) {
        ValidationUtils.checkArgument(z == (!option.isPresent()));
        this.keyGeneratorOpt = option;
    }

    @Override // org.apache.hudi.io.HoodieWriteHandle
    public Schema getWriterSchemaWithMetaFields() {
        return this.writeSchemaWithMetaFields;
    }

    @Override // org.apache.hudi.io.HoodieWriteHandle
    public void write(HoodieRecord hoodieRecord, Option<IndexedRecord> option) {
    }

    public Schema getWriterSchema() {
        return this.writeSchema;
    }

    private void init(String str, String str2, HoodieBaseFile hoodieBaseFile) {
        LOG.info("partitionPath:" + str2 + ", fileId to be merged:" + str);
        this.baseFileToMerge = hoodieBaseFile;
        this.writtenRecordKeys = new HashSet();
        this.writeStatus.setStat(new HoodieWriteStat());
        try {
            String fileName = hoodieBaseFile.getFileName();
            this.writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(fileName));
            new HoodiePartitionMetadata(this.fs, this.instantTime, new Path(this.config.getBasePath()), FSUtils.getPartitionPath(this.config.getBasePath(), str2), this.hoodieTable.getPartitionMetafileFormat()).trySave(getPartitionId());
            String makeDataFileName = FSUtils.makeDataFileName(this.instantTime, this.writeToken, str, this.hoodieTable.getBaseFileExtension());
            makeOldAndNewFilePaths(str2, fileName, makeDataFileName);
            LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", this.oldFilePath.toString(), this.newFilePath.toString()));
            this.writeStatus.setFileId(str);
            this.writeStatus.setPartitionPath(str2);
            this.writeStatus.getStat().setPartitionPath(str2);
            this.writeStatus.getStat().setFileId(str);
            setWriteStatusPath();
            createMarkerFile(str2, makeDataFileName);
            this.fileWriter = createNewFileWriter(this.instantTime, this.newFilePath, this.hoodieTable, this.config, this.writeSchemaWithMetaFields, this.taskContextSupplier);
        } catch (IOException e) {
            LOG.error("Error in update task at commit " + this.instantTime, e);
            this.writeStatus.setGlobalError(e);
            throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + str + " on commit " + this.instantTime + " on path " + this.hoodieTable.getMetaClient().getBasePath(), e);
        }
    }

    protected void setWriteStatusPath() {
        this.writeStatus.getStat().setPath(new Path(this.config.getBasePath()), this.newFilePath);
    }

    protected void makeOldAndNewFilePaths(String str, String str2, String str3) {
        this.oldFilePath = makeNewFilePath(str, str2);
        this.newFilePath = makeNewFilePath(str, str3);
    }

    protected void initializeIncomingRecordsMap() {
        try {
            long maxMemoryPerPartitionMerge = IOUtils.getMaxMemoryPerPartitionMerge(this.taskContextSupplier, this.config);
            LOG.info("MaxMemoryPerPartitionMerge => " + maxMemoryPerPartitionMerge);
            this.keyToNewRecords = new ExternalSpillableMap(Long.valueOf(maxMemoryPerPartitionMerge), this.config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(this.tableSchema), this.config.getCommonConfig().getSpillableDiskMapType(), this.config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
        } catch (IOException e) {
            throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean needsUpdateLocation() {
        return true;
    }

    protected void init(String str, Iterator<HoodieRecord<T>> it) {
        initializeIncomingRecordsMap();
        while (it.hasNext()) {
            HoodieRecord<T> next = it.next();
            if (needsUpdateLocation()) {
                next.unseal();
                next.setNewLocation(new HoodieRecordLocation(this.instantTime, str));
                next.seal();
            }
            this.keyToNewRecords.put(next.getRecordKey(), next);
        }
        LOG.info("Number of entries in MemoryBasedMap => " + ((ExternalSpillableMap) this.keyToNewRecords).getInMemoryMapNumEntries() + ", Total size in bytes of MemoryBasedMap => " + ((ExternalSpillableMap) this.keyToNewRecords).getCurrentInMemoryMapSize() + ", Number of entries in BitCaskDiskMap => " + ((ExternalSpillableMap) this.keyToNewRecords).getDiskBasedMapNumEntries() + ", Size of file spilled to disk => " + ((ExternalSpillableMap) this.keyToNewRecords).getSizeOfFileOnDiskInBytes());
    }

    private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord genericRecord, Option<IndexedRecord> option) {
        boolean z = false;
        if (option.isPresent()) {
            this.updatedRecordsWritten++;
            if (genericRecord != ((GenericRecord) option.get())) {
                z = HoodieOperation.isDelete(hoodieRecord.getOperation());
            }
        }
        return writeRecord(hoodieRecord, option, z);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
        Option<IndexedRecord> insertValue = hoodieRecord.getData().getInsertValue(this.useWriterSchemaForCompaction ? this.tableSchemaWithMetaFields : this.tableSchema, this.config.getProps());
        if (!(insertValue.isPresent() && insertValue.get().equals(IGNORE_RECORD)) && writeRecord(hoodieRecord, insertValue, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
            this.insertRecordsWritten++;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> option) {
        return writeRecord(hoodieRecord, option, false);
    }

    protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> option, boolean z) {
        Option<Map<String, String>> metadata = hoodieRecord.getData().getMetadata();
        if (!this.partitionPath.equals(hoodieRecord.getPartitionPath())) {
            this.writeStatus.markFailure(hoodieRecord, new HoodieUpsertException("mismatched partition path, record partition: " + hoodieRecord.getPartitionPath() + " but trying to insert into partition: " + this.partitionPath), metadata);
            return false;
        }
        try {
            if (!option.isPresent() || z) {
                this.recordsDeleted++;
            } else {
                writeToFile(hoodieRecord.getKey(), (GenericRecord) option.get(), this.preserveMetadata && this.useWriterSchemaForCompaction);
                this.recordsWritten++;
            }
            this.writeStatus.markSuccess(hoodieRecord, metadata);
            hoodieRecord.deflate();
            return true;
        } catch (Exception e) {
            LOG.error("Error writing record  " + hoodieRecord, e);
            this.writeStatus.markFailure(hoodieRecord, e, metadata);
            return false;
        }
    }

    public void write(GenericRecord genericRecord) {
        String recordKeyFromGenericRecord = KeyGenUtils.getRecordKeyFromGenericRecord(genericRecord, this.keyGeneratorOpt);
        boolean z = true;
        if (this.keyToNewRecords.containsKey(recordKeyFromGenericRecord)) {
            HoodieRecord<T> newInstance = this.keyToNewRecords.get(recordKeyFromGenericRecord).newInstance();
            try {
                Option<IndexedRecord> combineAndGetUpdateValue = newInstance.getData().combineAndGetUpdateValue(genericRecord, this.useWriterSchemaForCompaction ? this.tableSchemaWithMetaFields : this.tableSchema, this.config.getPayloadConfig().getProps());
                if (combineAndGetUpdateValue.isPresent() && combineAndGetUpdateValue.get().equals(IGNORE_RECORD)) {
                    z = true;
                } else if (writeUpdateRecord(newInstance, genericRecord, combineAndGetUpdateValue)) {
                    z = false;
                }
                this.writtenRecordKeys.add(recordKeyFromGenericRecord);
            } catch (Exception e) {
                throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {" + this.keyToNewRecords.get(recordKeyFromGenericRecord) + "}, old value {" + genericRecord + "}", e);
            }
        }
        if (z) {
            try {
                writeToFile(new HoodieKey(recordKeyFromGenericRecord, this.partitionPath), genericRecord, true);
                this.recordsWritten++;
            } catch (IOException | RuntimeException e2) {
                String format = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s", recordKeyFromGenericRecord, getOldFilePath(), this.newFilePath, this.writeSchemaWithMetaFields.toString(true));
                LOG.debug("Old record is " + genericRecord);
                throw new HoodieUpsertException(format, e2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void writeToFile(HoodieKey hoodieKey, GenericRecord genericRecord, boolean z) throws IOException {
        if (z) {
            this.fileWriter.writeAvro(hoodieKey.getRecordKey(), rewriteRecordWithMetadata(genericRecord, this.newFilePath.getName()));
        } else {
            this.fileWriter.writeAvroWithMetadata(hoodieKey, rewriteRecord(genericRecord));
        }
    }

    protected void writeIncomingRecords() throws IOException {
        Iterator<HoodieRecord<T>> it = this.keyToNewRecords instanceof ExternalSpillableMap ? ((ExternalSpillableMap) this.keyToNewRecords).iterator() : this.keyToNewRecords.values().iterator();
        while (it.hasNext()) {
            HoodieRecord<T> next = it.next();
            if (!this.writtenRecordKeys.contains(next.getRecordKey())) {
                writeInsertRecord(next);
            }
        }
    }

    @Override // org.apache.hudi.io.HoodieWriteHandle
    public List<WriteStatus> close() {
        try {
            writeIncomingRecords();
            if (this.keyToNewRecords instanceof ExternalSpillableMap) {
                ((ExternalSpillableMap) this.keyToNewRecords).close();
            } else {
                this.keyToNewRecords.clear();
            }
            this.writtenRecordKeys.clear();
            if (this.fileWriter != null) {
                this.fileWriter.close();
                this.fileWriter = null;
            }
            long fileSize = FSUtils.getFileSize(this.fs, this.newFilePath);
            HoodieWriteStat stat = this.writeStatus.getStat();
            stat.setTotalWriteBytes(fileSize);
            stat.setFileSizeInBytes(fileSize);
            stat.setNumWrites(this.recordsWritten);
            stat.setNumDeletes(this.recordsDeleted);
            stat.setNumUpdateWrites(this.updatedRecordsWritten);
            stat.setNumInserts(this.insertRecordsWritten);
            stat.setTotalWriteErrors(this.writeStatus.getTotalErrorRecords());
            HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
            runtimeStats.setTotalUpsertTime(this.timer.endTimer());
            stat.setRuntimeStats(runtimeStats);
            performMergeDataValidationCheck(this.writeStatus);
            LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), stat.getFileId(), Long.valueOf(runtimeStats.getTotalUpsertTime())));
            return Collections.singletonList(this.writeStatus);
        } catch (IOException e) {
            throw new HoodieUpsertException("Failed to close UpdateHandle", e);
        }
    }

    public void performMergeDataValidationCheck(WriteStatus writeStatus) {
        if (this.config.isMergeDataValidationCheckEnabled()) {
            try {
                long totalRecords = HoodieFileReaderFactory.getFileReader(this.hoodieTable.getHadoopConf(), this.oldFilePath).getTotalRecords();
                if (writeStatus.getStat().getNumWrites() + writeStatus.getStat().getNumDeletes() < totalRecords) {
                    throw new HoodieCorruptedDataException(String.format("Record write count decreased for file: %s, Partition Path: %s (%s:%d + %d < %s:%d)", writeStatus.getFileId(), writeStatus.getPartitionPath(), this.instantTime, Long.valueOf(writeStatus.getStat().getNumWrites()), Long.valueOf(writeStatus.getStat().getNumDeletes()), FSUtils.getCommitTime(this.oldFilePath.toString()), Long.valueOf(totalRecords)));
                }
            } catch (IOException e) {
                throw new HoodieUpsertException("Failed to check for merge data validation", e);
            }
        }
    }

    public Path getOldFilePath() {
        return this.oldFilePath;
    }

    @Override // org.apache.hudi.io.HoodieWriteHandle
    public IOType getIOType() {
        return IOType.MERGE;
    }

    public HoodieBaseFile baseFileForMerge() {
        return this.baseFileToMerge;
    }
}
