package org.apache.hudi.io;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Write handle that merges a set of incoming records into one existing base file, producing a new
 * base file for the same file group at {@link #instantTime}.
 *
 * <p>Incoming records are buffered by record key in {@link #keyToNewRecords}. Records read from the
 * old base file are passed to {@link #write(HoodieRecord)}: when an incoming record with the same key
 * is buffered, the two are combined through the record merger; otherwise the old record is copied
 * unchanged. Buffered records that never matched an old record are written as inserts by
 * {@link #close()}.
 *
 * <p>Not thread-safe.
 */
@NotThreadSafe
public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeHandle.class);

    /** Incoming records keyed by record key; set to {@code null} once the handle is closed. */
    protected Map<String, HoodieRecord<T>> keyToNewRecords;
    /** Keys of incoming records already consumed by {@link #write(HoodieRecord)}. */
    protected Set<String> writtenRecordKeys;
    protected HoodieFileWriter fileWriter;
    /** Whether merged/inserted records are written with their existing meta fields. */
    protected boolean preserveMetadata = false;
    protected StoragePath newFilePath;
    protected StoragePath oldFilePath;
    protected long recordsWritten = 0L;
    protected long recordsDeleted = 0L;
    protected long updatedRecordsWritten = 0L;
    protected long insertRecordsWritten = 0L;
    /** Present only when meta fields are not populated (validated in the constructors). */
    protected Option<BaseKeyGenerator> keyGeneratorOpt;
    private HoodieBaseFile baseFileToMerge;
    protected boolean useWriterSchemaForCompaction;
    /** Current (pre-merge) locations of incoming records that carried one. */
    protected final Set<HoodieRecordLocation> recordLocations = new HashSet<>();
    private TableFileSystemView fsViewSnapshot;
    protected Option<String[]> partitionFields = Option.empty();
    protected Object[] partitionValues = new Object[0];

    /** Maps the file id requested by the caller to the file id actually written by this handle. */
    @FunctionalInterface
    interface FileIdProvider {
        String get(String fileId);
    }

    /**
     * Creates a handle that merges {@code recordItr} into the latest base file of {@code fileId},
     * as resolved by {@link #getLatestBaseFile(HoodieTable, String, String)}.
     */
    public HoodieMergeHandle(HoodieWriteConfig writeConfig, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                             Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
                             TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt) {
        this(writeConfig, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier,
            getLatestBaseFile(hoodieTable, partitionPath, fileId), keyGeneratorOpt);
    }

    /**
     * Creates a handle that buffers {@code recordItr} (possibly spilling to disk) and merges it into
     * {@code baseFile}.
     *
     * @throws IllegalArgumentException if a key generator is supplied while meta fields are populated,
     *                                  or missing while they are not
     */
    public HoodieMergeHandle(HoodieWriteConfig writeConfig, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                             Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
                             TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile,
                             Option<BaseKeyGenerator> keyGeneratorOpt) {
        super(writeConfig, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
        String targetFileId = getFileIdProvider().get(fileId);
        init(targetFileId, recordItr);
        init(targetFileId, partitionPath, baseFile);
        validateAndSetAndKeyGenProps(keyGeneratorOpt, writeConfig.populateMetaFields());
    }

    /**
     * Creates a handle over an already materialized map of incoming records and merges it into
     * {@code baseFile}. This path sets {@link #useWriterSchemaForCompaction} and preserves record
     * metadata unless the target is the metadata table or the table type is MERGE_ON_WRITE.
     */
    public HoodieMergeHandle(HoodieWriteConfig writeConfig, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                             Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
                             HoodieBaseFile baseFile, TaskContextSupplier taskContextSupplier,
                             Option<BaseKeyGenerator> keyGeneratorOpt) {
        super(writeConfig, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
        String targetFileId = getFileIdProvider().get(fileId);
        this.keyToNewRecords = keyToNewRecords;
        updateRecordLocations(targetFileId, keyToNewRecords);
        this.useWriterSchemaForCompaction = true;
        this.preserveMetadata = !HoodieTableMetadata.isMetadataTable(writeConfig.getBasePath())
            && !HoodieTableType.MERGE_ON_WRITE.equals(writeConfig.getTableType());
        init(targetFileId, this.partitionPath, baseFile);
        validateAndSetAndKeyGenProps(keyGeneratorOpt, writeConfig.populateMetaFields());
    }

    /** Remembers each record's current location (if any), then points it at {@code fileId}. */
    private void updateRecordLocations(String fileId, Map<String, HoodieRecord<T>> records) {
        records.values().forEach(record -> {
            if (record.getCurrentLocation() != null) {
                this.recordLocations.add(record.getCurrentLocation());
            }
            updateNewRecordLocation(fileId, record);
        });
    }

    /** A key generator must be given exactly when meta fields are not populated. */
    private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator> keyGeneratorOpt, boolean populateMetaFields) {
        ValidationUtils.checkArgument(populateMetaFields == !keyGeneratorOpt.isPresent(),
            "A key generator must be provided if and only if meta fields are not populated");
        this.keyGeneratorOpt = keyGeneratorOpt;
    }

    /**
     * Resolves the base file to merge into.
     *
     * <p>Returns the latest base file of {@code fileId}. For MERGE_ON_WRITE tables without such a file,
     * falls back to the first latest base file of the partition (NOTE(review): presumably paired with a
     * {@link FileIdProvider} remapping the file id — confirm).
     *
     * @throws NoSuchElementException if no suitable base file exists
     */
    public static HoodieBaseFile getLatestBaseFile(HoodieTable<?, ?, ?, ?> hoodieTable, String partitionPath, String fileId) {
        Option<HoodieBaseFile> latestBaseFile = hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
        if (latestBaseFile.isPresent()) {
            return latestBaseFile.get();
        }
        if (hoodieTable.getConfig().getTableType() == HoodieTableType.MERGE_ON_WRITE) {
            return hoodieTable.getBaseFileOnlyView().getLatestBaseFiles(partitionPath)
                .findFirst()
                .orElseThrow(() -> new NoSuchElementException(String.format(
                    "No base file found in partition path %s to merge FileID %s into.", partitionPath, fileId)));
        }
        throw new NoSuchElementException(String.format("FileID %s of partition path %s does not exist.", fileId, partitionPath));
    }

    /**
     * Prepares the write: records the base file, saves partition metadata, computes old/new paths,
     * fills the write stat, creates the marker (unless hive write style is used) and opens the writer.
     *
     * @throws HoodieUpsertException if any of the I/O steps fail
     */
    protected void init(String fileId, String partitionPath, HoodieBaseFile baseFileToMerge) {
        LOG.info("partitionPath:{}, fileId to be merged:{}", partitionPath, fileId);
        this.baseFileToMerge = baseFileToMerge;
        this.writtenRecordKeys = new HashSet<>();
        this.fsViewSnapshot = this.hoodieTable.getFileSystemView();
        this.writeStatus.setStat(new HoodieWriteStat());
        try {
            String latestValidFilePath = baseFileToMerge.getFileName();
            this.writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
            new HoodiePartitionMetadata(this.storage, this.instantTime, new StoragePath(this.config.getBasePath()),
                FSUtils.constructAbsolutePath(this.config.getBasePath(), partitionPath),
                this.hoodieTable.getPartitionMetafileFormat(), this.hoodieTable.isTtlEnabled()).trySave();

            String newFileName = FSUtils.makeBaseFileName(this.instantTime, this.writeToken, fileId,
                this.hoodieTable.getBaseFileExtension());
            makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName);
            LOG.info("Merging new data into oldPath {}, as newPath {}", this.oldFilePath, this.newFilePath);

            this.writeStatus.setFileId(fileId);
            this.writeStatus.setPartitionPath(partitionPath);
            this.writeStatus.getStat().setPartitionPath(partitionPath);
            this.writeStatus.getStat().setFileId(fileId);
            setWriteStatusPath();

            if (!this.config.getUseHiveWriteStyle()) {
                createMarkerFile(partitionPath, newFileName);
            }
            this.fileWriter = HoodieFileWriterFactory.getFileWriter(this.instantTime, this.newFilePath,
                this.hoodieTable.getStorage(), this.config, this.writeSchemaWithMetaFields,
                this.taskContextSupplier, this.recordMerger.getRecordType());
        } catch (IOException e) {
            LOG.error("Error in update task at commit " + this.instantTime, e);
            this.writeStatus.setGlobalError(e);
            throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId
                + " on commit " + this.instantTime + " on path " + this.hoodieTable.getMetaClient().getBasePath(), e);
        }
    }

    /** Records the new file path (relative to the table base path) on the write stat. */
    protected void setWriteStatusPath() {
        this.writeStatus.getStat().setPath(new StoragePath(this.config.getBasePath()), this.newFilePath);
    }

    /**
     * Computes {@link #oldFilePath} and {@link #newFilePath} inside {@code partitionPath}; with hive
     * write style the new file is placed under a {@code .tmp} sub-directory.
     */
    protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) {
        this.oldFilePath = makeNewFilePath(partitionPath, oldFileName);
        this.newFilePath = makeNewFilePath(partitionPath, newFileName);
        if (this.config.getUseHiveWriteStyle()) {
            this.newFilePath = new StoragePath(this.newFilePath.getParent().toUri() + "/.tmp", this.newFilePath.getName());
        }
    }

    /**
     * Creates the spillable map that buffers incoming records, bounded by the per-partition merge
     * memory budget.
     *
     * @throws HoodieIOException if the map cannot be created
     */
    protected void initializeIncomingRecordsMap() {
        try {
            long maxMemoryPerPartitionMerge = IOUtils.getMaxMemoryPerPartitionMerge(this.taskContextSupplier, this.config);
            LOG.info("MaxMemoryPerPartitionMerge => {}", maxMemoryPerPartitionMerge);
            this.keyToNewRecords = new ExternalSpillableMap(Long.valueOf(maxMemoryPerPartitionMerge),
                this.config.getSpillableMapBasePath(), new DefaultSizeEstimator(),
                new HoodieRecordSizeEstimator(this.writeSchema), this.config.getCommonConfig().getSpillableDiskMapType(),
                this.config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
        } catch (IOException e) {
            throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", e);
        }
    }

    /** Whether incoming records get their new location set to this handle's file; overridable. */
    public boolean needsUpdateLocation() {
        return true;
    }

    /**
     * Buffers every record from {@code newRecordsItr}, pointing each at {@code fileId} and
     * remembering its current location when it has one.
     */
    protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
        initializeIncomingRecordsMap();
        while (newRecordsItr.hasNext()) {
            HoodieRecord<T> record = newRecordsItr.next();
            updateNewRecordLocation(fileId, record);
            this.keyToNewRecords.put(record.getRecordKey(), record);
            if (record.getCurrentLocation() != null) {
                this.recordLocations.add(record.getCurrentLocation());
            }
        }
        // Spill statistics only exist on ExternalSpillableMap; a subclass may supply another Map.
        if (this.keyToNewRecords instanceof ExternalSpillableMap) {
            ExternalSpillableMap<?, ?> spillableMap = (ExternalSpillableMap<?, ?>) this.keyToNewRecords;
            LOG.info("Number of entries in MemoryBasedMap => " + spillableMap.getInMemoryMapNumEntries()
                + ", Total size in bytes of MemoryBasedMap => " + spillableMap.getCurrentInMemoryMapSize()
                + ", Number of entries in BitCaskDiskMap => " + spillableMap.getDiskBasedMapNumEntries()
                + ", Size of file spilled to disk => " + spillableMap.getSizeOfFileOnDiskInBytes());
        }
    }

    /** Sets the record's new location to (instantTime, fileId) when {@link #needsUpdateLocation()}. */
    private void updateNewRecordLocation(String fileId, HoodieRecord<T> record) {
        if (needsUpdateLocation()) {
            record.unseal();
            record.setNewLocation(new HoodieRecordLocation(this.instantTime, fileId));
            record.seal();
        }
    }

    /**
     * Writes the result of merging {@code newRecord} with {@code oldRecord}.
     *
     * <p>When a merged record exists and its payload is not the very same object as the old record's
     * payload (identity check, i.e. the merger produced/chose new data), the incoming record's operation
     * decides whether it is a delete. {@link #updatedRecordsWritten} is incremented whenever a merged
     * record exists, before the write outcome is known.
     *
     * @return whether the record was handled successfully (written or deleted)
     */
    public boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord,
                                     Option<HoodieRecord> combineRecordOpt, Schema writerSchema) throws IOException {
        boolean isDelete = false;
        if (combineRecordOpt.isPresent()) {
            if (oldRecord.getData() != combineRecordOpt.get().getData()) {
                isDelete = HoodieOperation.isDelete(newRecord.getOperation());
            }
            this.updatedRecordsWritten++;
        }
        return writeRecord(newRecord, combineRecordOpt, writerSchema, this.config.getPayloadConfig().getProps(), isDelete);
    }

    /** Writes an unmatched incoming record as an insert unless it should be ignored. */
    public void writeInsertRecord(HoodieRecord<T> newRecord) throws IOException {
        Schema schema = this.preserveMetadata ? this.writeSchemaWithMetaFields : this.writeSchema;
        if (!newRecord.shouldIgnore(schema, this.config.getProps())) {
            writeInsertRecord(newRecord, schema, this.config.getProps());
        }
    }

    /** Writes {@code newRecord} as an insert, counting it in {@link #insertRecordsWritten} on success. */
    protected void writeInsertRecord(HoodieRecord<T> newRecord, Schema schema, Properties props) throws IOException {
        boolean isDelete = HoodieOperation.isDelete(newRecord.getOperation());
        if (writeRecord(newRecord, Option.of(newRecord), schema, props, isDelete)) {
            this.insertRecordsWritten++;
        }
    }

    /** Writes {@code combineRecord} on behalf of {@code newRecord} without forcing a delete. */
    public boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> combineRecord, Schema schema,
                               Properties props) throws IOException {
        return writeRecord(newRecord, combineRecord, schema, props, false);
    }

    /**
     * Writes or deletes one record and updates the write status.
     *
     * <p>A record whose partition path differs from this handle's is marked failed. An absent or
     * delete {@code combineRecord}, or {@code isDelete}, counts as a delete and clears the record's new
     * location; otherwise the combined record is written. Write errors are recorded on the write
     * status rather than thrown.
     *
     * @return {@code true} if the record was written or deleted, {@code false} if it was marked failed
     */
    private boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> combineRecord, Schema schema,
                                Properties props, boolean isDelete) throws IOException {
        Option<Map<String, String>> recordMetadata = newRecord.getMetadata();
        if (!this.partitionPath.equals(newRecord.getPartitionPath())) {
            HoodieUpsertException failure = new HoodieUpsertException("mismatched partition path, record partition: "
                + newRecord.getPartitionPath() + " but trying to insert into partition: " + this.partitionPath);
            this.writeStatus.markFailure((HoodieRecord) newRecord, (Throwable) failure, recordMetadata);
            return false;
        }
        try {
            if (!combineRecord.isPresent() || combineRecord.get().isDelete(schema, this.config.getProps()) || isDelete) {
                this.recordsDeleted++;
                newRecord.unseal();
                newRecord.clearNewLocation();
                newRecord.seal();
            } else {
                writeToFile(newRecord.getKey(), combineRecord.get(), schema, props, this.preserveMetadata);
                this.recordsWritten++;
            }
            this.writeStatus.markSuccess(newRecord, recordMetadata);
            // Release the payload now that it has been handled.
            newRecord.deflate();
            return true;
        } catch (Exception e) {
            LOG.error("Error writing record  " + newRecord, e);
            this.writeStatus.markFailure(newRecord, e, recordMetadata);
            return false;
        }
    }

    /**
     * Processes one record of the old base file.
     *
     * <p>If an incoming record with the same key is buffered, the two are merged and the key is marked
     * as consumed; the old record is still copied when the merged result should be ignored or the
     * update write was not successful. Without a buffered match the old record is copied as-is.
     *
     * @throws HoodieUpsertException if merging or copying fails
     */
    public void write(HoodieRecord<T> oldRecord) {
        Schema oldSchema = this.writeSchemaWithMetaFields;
        Schema newSchema = this.preserveMetadata ? this.writeSchemaWithMetaFields : this.writeSchema;
        boolean copyOldRecord = true;
        String key = oldRecord.getRecordKey(oldSchema, this.keyGeneratorOpt);
        TypedProperties props = this.config.getPayloadConfig().getProps();
        if (this.keyToNewRecords.containsKey(key)) {
            // Work on a copy so the buffered record itself is not modified by the merge.
            HoodieRecord<T> newRecord = this.keyToNewRecords.get(key).newInstance();
            try {
                Option<Pair<HoodieRecord, Schema>> mergeResult =
                    this.recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
                Schema combineRecordSchema = mergeResult.map(Pair::getRight).orElse(null);
                Option<HoodieRecord> combinedRecord = mergeResult.map(Pair::getLeft);
                if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(combineRecordSchema, props)) {
                    // Ignored merge result: keep the old record.
                    copyOldRecord = true;
                } else if (writeUpdateRecord(newRecord, oldRecord, combinedRecord, combineRecordSchema)) {
                    copyOldRecord = false;
                }
                this.writtenRecordKeys.add(key);
            } catch (Exception e) {
                throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {"
                    + this.keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}", e);
            }
        }
        if (copyOldRecord) {
            try {
                writeToFile(new HoodieKey(key, this.partitionPath), oldRecord, oldSchema, props, true);
                this.recordsWritten++;
            } catch (IOException | RuntimeException e) {
                String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
                    key, getOldFilePath(), this.newFilePath, this.writeSchemaWithMetaFields.toString(true));
                LOG.debug("Old record is {}", oldRecord);
                throw new HoodieUpsertException(errMsg, e);
            }
        }
    }

    /**
     * Writes {@code record} to the new file after prepending meta fields (the file name meta field is
     * always set to the new file's name).
     *
     * @param shouldPreserveRecordMetadata {@code true} writes via {@code write(recordKey, ...)};
     *                                     {@code false} via {@code writeWithMetadata(key, ...)}
     */
    public void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema schema, Properties props,
                            boolean shouldPreserveRecordMetadata) throws IOException {
        MetadataValues metadataValues = new MetadataValues().setFileName(this.newFilePath.getName());
        HoodieRecord populatedRecord = record.prependMetaFields(schema, this.writeSchemaWithMetaFields, metadataValues, props);
        if (shouldPreserveRecordMetadata) {
            this.fileWriter.write(key.getRecordKey(), populatedRecord, this.writeSchemaWithMetaFields);
        } else {
            this.fileWriter.writeWithMetadata(key, populatedRecord, this.writeSchemaWithMetaFields);
        }
    }

    /** Writes every buffered incoming record whose key was not consumed by {@link #write(HoodieRecord)}. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected void writeIncomingRecords() throws IOException {
        Iterator<HoodieRecord<T>> pendingRecords;
        if (this.keyToNewRecords instanceof ExternalSpillableMap) {
            pendingRecords = ((ExternalSpillableMap) this.keyToNewRecords)
                .iterator(key -> !this.writtenRecordKeys.contains(key));
        } else {
            pendingRecords = this.keyToNewRecords.entrySet().stream()
                .filter(entry -> !this.writtenRecordKeys.contains(entry.getKey()))
                .map(Map.Entry::getValue)
                .iterator();
        }
        while (pendingRecords.hasNext()) {
            writeInsertRecord(pendingRecords.next());
        }
    }

    /**
     * Flushes remaining inserts, closes the writer and the record buffer, fills in the write stat and
     * optionally validates the record count against the old file.
     *
     * @return a singleton list with this handle's write status, or an empty list if already closed
     * @throws HoodieUpsertException on I/O failure
     */
    @Override
    public List<WriteStatus> close() {
        try {
            if (isClosed()) {
                return Collections.emptyList();
            }
            markClosed();
            writeIncomingRecords();

            if (this.keyToNewRecords instanceof ExternalSpillableMap) {
                ((ExternalSpillableMap<?, ?>) this.keyToNewRecords).close();
            }
            this.keyToNewRecords = null;
            this.writtenRecordKeys = null;

            this.fileWriter.close();
            this.fileWriter = null;

            long fileSizeInBytes = HadoopFSUtils.getFileSize(this.fs, new Path(this.newFilePath.toUri()));
            HoodieWriteStat stat = this.writeStatus.getStat();
            stat.setTotalWriteBytes(fileSizeInBytes);
            stat.setFileSizeInBytes(fileSizeInBytes);
            stat.setNumWrites(this.recordsWritten);
            stat.setNumDeletes(this.recordsDeleted);
            stat.setNumUpdateWrites(this.updatedRecordsWritten);
            stat.setNumInserts(this.insertRecordsWritten);
            stat.setTotalWriteErrors(this.writeStatus.getTotalErrorRecords());

            HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
            runtimeStats.setTotalUpsertTime(this.timer.endTimer());
            stat.setRuntimeStats(runtimeStats);

            performMergeDataValidationCheck(this.writeStatus);

            LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.",
                stat.getPartitionPath(), stat.getFileId(), Long.valueOf(runtimeStats.getTotalUpsertTime())));
            return Collections.singletonList(this.writeStatus);
        } catch (IOException e) {
            throw new HoodieUpsertException("Failed to close UpdateHandle", e);
        }
    }

    /**
     * When merge data validation is enabled, fails if fewer records were written plus deleted than the
     * old base file contained.
     *
     * @throws HoodieCorruptedDataException if the record count decreased
     * @throws HoodieUpsertException        if the old base file cannot be read
     */
    public void performMergeDataValidationCheck(WriteStatus writeStatus) {
        if (!this.config.isMergeDataValidationCheckEnabled()) {
            return;
        }
        long oldNumRecords;
        // try-with-resources: the reader is closed even if getTotalRecords() throws.
        try (HoodieFileReader reader = HoodieIOFactory.getIOFactory(this.hoodieTable.getStorage())
            .getReaderFactory(this.recordMerger.getRecordType())
            .getFileReader(this.config, this.oldFilePath)) {
            oldNumRecords = reader.getTotalRecords();
        } catch (IOException e) {
            throw new HoodieUpsertException("Failed to check for merge data validation", e);
        }
        HoodieWriteStat stat = writeStatus.getStat();
        if (stat.getNumWrites() + stat.getNumDeletes() < oldNumRecords) {
            throw new HoodieCorruptedDataException(String.format("Record write count decreased for file: %s, Partition Path: %s (%s:%d + %d < %s:%d)",
                writeStatus.getFileId(), writeStatus.getPartitionPath(), this.instantTime,
                Long.valueOf(stat.getNumWrites()), Long.valueOf(stat.getNumDeletes()),
                this.baseFileToMerge.getCommitTime(), Long.valueOf(oldNumRecords)));
        }
    }

    /** Returns the write statuses wrapped in a single-element iterator. */
    public Iterator<List<WriteStatus>> getWriteStatusesAsIterator() {
        List<WriteStatus> statuses = getWriteStatuses();
        if (getPartitionPath() == null) {
            LOG.info("Upsert Handle has partition path as null {}, {}", getOldFilePath(), statuses);
        }
        return Collections.singletonList(statuses).iterator();
    }

    public StoragePath getOldFilePath() {
        return this.oldFilePath;
    }

    @Override
    public IOType getIOType() {
        return IOType.MERGE;
    }

    /** Returns the base file this handle merges into. */
    public HoodieBaseFile baseFileForMerge() {
        return this.baseFileToMerge;
    }

    public void setPartitionFields(Option<String[]> partitionFields) {
        this.partitionFields = partitionFields;
    }

    public Option<String[]> getPartitionFields() {
        return this.partitionFields;
    }

    public void setPartitionValues(Object[] partitionValues) {
        this.partitionValues = partitionValues;
    }

    public Object[] getPartitionValues() {
        return this.partitionValues;
    }

    /** Opens a record iterator over {@code reader} using {@code schema}. */
    public ClosableIterator<HoodieRecord> getRecordIterator(HoodieFileReader reader, Schema schema) throws IOException {
        return reader.getRecordIterator(schema);
    }

    /** Returns the incoming-record buffer ({@code null} after {@link #close()}). */
    public Map<String, HoodieRecord<T>> getKeyToNewRecords() {
        return this.keyToNewRecords;
    }

    public Set<HoodieRecordLocation> getRecordLocations() {
        return this.recordLocations;
    }

    /** Identity mapping by default; subclasses may redirect writes to a different file id. */
    protected FileIdProvider getFileIdProvider() {
        return fileId -> fileId;
    }

    /** Returns {@code true} if {@code instant} sorts strictly before this handle's instant time. */
    public boolean filterFutureInstants(String instant) {
        return this.instantTime.compareTo(instant) > 0;
    }

    /** Returns the file-system view captured when the handle was initialised. */
    public TableFileSystemView getFsViewSnapshot() {
        return this.fsViewSnapshot;
    }
}
