package org.apache.hudi.table.format;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/table/format/FormatUtils.class */
public class FormatUtils {
    /** Logger used by the static helpers of this class; its value is assigned in the static initializer. */
    private static final Logger LOG;
    /**
     * Assertion switch exposed by the decompiler (marked synthetic). The static initializer sets it
     * to the negation of {@code FormatUtils.class.desiredAssertionStatus()}; assertion-style checks
     * in this class are executed only while this flag is {@code false}.
     */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/hudi/table/format/FormatUtils$BoundedMemoryRecords.class */
    /**
     * Exposes the log records of a {@link MergeOnReadInputSplit} as an iterator that is fed by a
     * {@link BoundedInMemoryExecutor}.
     *
     * <p>The records are produced by a {@link HoodieUnMergedLogRecordScanner} whose callback inserts
     * every scanned record into the executor's queue. The constructor calls
     * {@code startProducingAsync()}, so callers should invoke {@link #close()} once they are done
     * with the iterator to shut the executor down.
     */
    public static class BoundedMemoryRecords {
        private final BoundedInMemoryExecutor<HoodieRecord<?>, HoodieRecord<?>, ?> executor;
        private final Iterator<HoodieRecord<?>> iterator;

        /**
         * Builds the log scanner for the split and starts producing records.
         *
         * @param mergeOnReadInputSplit split supplying table path, log paths, latest commit,
         *                              instant range and log index
         * @param schema                reader schema handed to the scanner
         * @param internalSchema        internal schema handed to the scanner
         * @param configuration         Hadoop configuration used to obtain the storage
         * @param configuration2        Flink configuration supplying memory, buffer and merger options
         */
        public BoundedMemoryRecords(MergeOnReadInputSplit mergeOnReadInputSplit, Schema schema, InternalSchema internalSchema, Configuration configuration, org.apache.flink.configuration.Configuration configuration2) {
            // Merger implementation names are configured as a comma separated list; trim and de-duplicate them.
            List<String> mergerImpls = Arrays.stream(configuration2.getString(FlinkOptions.RECORD_MERGER_IMPLS).split(","))
                    .map(String::trim)
                    .distinct()
                    .collect(Collectors.toList());

            HoodieUnMergedLogRecordScanner.Builder scannerBuilder = HoodieUnMergedLogRecordScanner.newBuilder()
                    .withStorage(HoodieStorageUtils.getStorage(mergeOnReadInputSplit.getTablePath(), HadoopFSUtils.getStorageConf(configuration)))
                    .withBasePath(mergeOnReadInputSplit.getTablePath())
                    .withLogFilePaths(mergeOnReadInputSplit.getLogPaths().get())
                    .withReaderSchema(schema)
                    .withInternalSchema(internalSchema)
                    .withLatestInstantTime(mergeOnReadInputSplit.getLatestCommit())
                    .withReverseReader(false)
                    // Falls back to 1048576 bytes when the buffer size option is not set.
                    .withBufferSize(configuration2.getInteger("hoodie.memory.dfs.buffer.max.size", 1048576))
                    .withInstantRange(mergeOnReadInputSplit.getInstantRange())
                    .withRecordMerger(HoodieRecordUtils.createRecordMerger(mergeOnReadInputSplit.getTablePath(), EngineType.FLINK, mergerImpls, configuration2.getString(FlinkOptions.RECORD_MERGER_STRATEGY)))
                    .withLogIndex(mergeOnReadInputSplit.getLogIndex());

            this.executor = new BoundedInMemoryExecutor<>(
                    StreamerUtil.getMaxCompactionMemoryInBytes(configuration2),
                    getParallelProducers(scannerBuilder),
                    Option.empty(),
                    Function.identity(),
                    new DefaultSizeEstimator<>(),
                    Functions.noop());
            this.iterator = this.executor.getRecordIterator();
            this.executor.startProducingAsync();
        }

        /** Returns the iterator obtained from the executor when this object was constructed. */
        public Iterator<HoodieRecord<?>> getRecordsIterator() {
            return this.iterator;
        }

        /**
         * Creates the single producer that runs the log scan and pushes each scanned record into
         * the executor queue.
         *
         * @param builder scanner builder that still needs its record callback
         * @return a list containing exactly one producer
         */
        private List<HoodieProducer<HoodieRecord<?>>> getParallelProducers(HoodieUnMergedLogRecordScanner.Builder builder) {
            List<HoodieProducer<HoodieRecord<?>>> producers = new ArrayList<>();
            producers.add(new FunctionBasedQueueProducer<>(queue -> {
                // The bound method reference null-checks the queue before the scan starts.
                builder.withLogRecordScannerCallback(queue::insertRecord).build().scan();
                return null;
            }));
            return producers;
        }

        /** Shuts the executor down immediately via {@code shutdownNow()}. */
        public void close() {
            this.executor.shutdownNow();
        }
    }

    /** Hidden constructor: this class only offers static helpers. */
    private FormatUtils() {
        // intentionally empty
    }

    /**
     * Sets the row kind of {@code rowData} from the operation value stored at position {@code i}
     * of {@code indexedRecord}.
     *
     * @param rowData       row whose kind is updated
     * @param indexedRecord record holding the operation value
     * @param i             position of the operation value, or {@code -1} to leave the row untouched
     */
    public static void setRowKind(RowData rowData, IndexedRecord indexedRecord, int i) {
        if (i != -1) {
            final RowKind resolvedKind = getRowKind(indexedRecord, i);
            rowData.setRowKind(resolvedKind);
        }
    }

    /**
     * Translates the operation value at position {@code i} of the record into a Flink {@link RowKind}.
     *
     * @param indexedRecord record holding the operation value
     * @param i             position of the operation value
     * @return {@link RowKind#INSERT} when the value is {@code null}, otherwise the kind matching the
     *         parsed {@link HoodieOperation}
     * @throws AssertionError if the parsed operation matches none of the four known kinds
     */
    private static RowKind getRowKind(IndexedRecord indexedRecord, int i) {
        final Object rawOperation = indexedRecord.get(i);
        if (rawOperation == null) {
            return RowKind.INSERT;
        }
        final HoodieOperation operation = HoodieOperation.fromName(rawOperation.toString());
        final RowKind kind;
        if (HoodieOperation.isInsert(operation)) {
            kind = RowKind.INSERT;
        } else if (HoodieOperation.isUpdateBefore(operation)) {
            kind = RowKind.UPDATE_BEFORE;
        } else if (HoodieOperation.isUpdateAfter(operation)) {
            kind = RowKind.UPDATE_AFTER;
        } else if (HoodieOperation.isDelete(operation)) {
            kind = RowKind.DELETE;
        } else {
            throw new AssertionError();
        }
        return kind;
    }

    /**
     * Variant of the row-kind lookup that tolerates a missing operation position.
     *
     * @param indexedRecord record holding the operation value
     * @param i             position of the operation value, or {@code -1} when there is none
     * @return {@link RowKind#INSERT} when {@code i} is {@code -1}, otherwise the kind resolved from
     *         the record
     */
    public static RowKind getRowKindSafely(IndexedRecord indexedRecord, int i) {
        if (i == -1) {
            return RowKind.INSERT;
        }
        return getRowKind(indexedRecord, i);
    }

    /**
     * Builds a new Avro record for {@code schema} by copying values out of {@code indexedRecord}.
     *
     * <p>The n-th field of {@code schema} receives the value found at position {@code iArr[n]} of
     * {@code indexedRecord}; a position of {@code -1} yields {@code null} for that field.
     *
     * @param indexedRecord        source record
     * @param schema               schema whose fields are populated, in declaration order
     * @param iArr                 source position for each schema field; expected to have one entry
     *                             per field
     * @param genericRecordBuilder builder that receives the values and produces the result
     * @return the record produced by {@code genericRecordBuilder.build()}
     * @throws AssertionError if assertions are enabled and the field count differs from
     *                        {@code iArr.length}
     */
    public static GenericRecord buildAvroRecordBySchema(IndexedRecord indexedRecord, Schema schema, int[] iArr, GenericRecordBuilder genericRecordBuilder) {
        List<Schema.Field> fields = schema.getFields();
        // Kept in its expanded form: a real `assert` would clash with the explicitly declared flag.
        if (!$assertionsDisabled && fields.size() != iArr.length) {
            throw new AssertionError();
        }
        for (int fieldIdx = 0; fieldIdx < fields.size(); fieldIdx++) {
            genericRecordBuilder.set(fields.get(fieldIdx), getVal(indexedRecord, iArr[fieldIdx]));
        }
        return genericRecordBuilder.build();
    }

    /**
     * Reads the value at position {@code i} of the record.
     *
     * @param indexedRecord record to read from
     * @param i             position to read, or {@code -1} for "no position"
     * @return {@code null} when {@code i} is {@code -1}, otherwise {@code indexedRecord.get(i)}
     */
    private static Object getVal(IndexedRecord indexedRecord, int i) {
        return i == -1 ? null : indexedRecord.get(i);
    }

    /**
     * Creates an {@link ExternalSpillableMap} configured from the given write config.
     *
     * @param hoodieWriteConfig supplies the spill base path, disk map type and compression flag
     * @param j                 value passed as the first constructor argument of the map
     * @return the newly created map
     * @throws HoodieIOException wrapping any {@link IOException} raised while creating the map
     */
    public static ExternalSpillableMap<String, byte[]> spillableMap(HoodieWriteConfig hoodieWriteConfig, long j) {
        try {
            final Long maxInMemoryBytes = Long.valueOf(j);
            final String spillBasePath = hoodieWriteConfig.getSpillableMapBasePath();
            return new ExternalSpillableMap<>(
                    maxInMemoryBytes,
                    spillBasePath,
                    new DefaultSizeEstimator(),
                    new DefaultSizeEstimator(),
                    hoodieWriteConfig.getCommonConfig().getSpillableDiskMapType(),
                    hoodieWriteConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
        } catch (IOException ioe) {
            throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + hoodieWriteConfig.getSpillableMapBasePath(), ioe);
        }
    }

    /**
     * Builds a {@link HoodieMergedLogRecordScanner} for the log files of a merge-on-read split.
     *
     * @param mergeOnReadInputSplit split supplying table path, log paths, latest commit, memory
     *                              budget, instant range and log index
     * @param schema                reader schema
     * @param internalSchema        internal schema
     * @param configuration         Flink configuration; used to derive the Hudi client config and
     *                              the changelog flag
     * @param configuration2        Hadoop configuration used to obtain the storage
     * @return the scanner returned by the builder
     */
    public static HoodieMergedLogRecordScanner logScanner(MergeOnReadInputSplit mergeOnReadInputSplit, Schema schema, InternalSchema internalSchema, org.apache.flink.configuration.Configuration configuration, Configuration configuration2) {
        final HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(configuration);
        return HoodieMergedLogRecordScanner.newBuilder()
                .withStorage(HoodieStorageUtils.getStorage(mergeOnReadInputSplit.getTablePath(), HadoopFSUtils.getStorageConf(configuration2)))
                .withBasePath(mergeOnReadInputSplit.getTablePath())
                .withLogFilePaths(mergeOnReadInputSplit.getLogPaths().get())
                .withReaderSchema(schema)
                .withInternalSchema(internalSchema)
                .withLatestInstantTime(mergeOnReadInputSplit.getLatestCommit())
                .withReverseReader(false)
                .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
                .withMaxMemorySizeInBytes(Long.valueOf(mergeOnReadInputSplit.getMaxCompactionMemoryInBytes()))
                .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
                .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
                .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
                .withInstantRange(mergeOnReadInputSplit.getInstantRange())
                .withOperationField(configuration.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
                .withRecordMerger(writeConfig.getRecordMerger())
                .withLogIndex(mergeOnReadInputSplit.getLogIndex())
                .build();
    }

    /**
     * Builds a {@link HoodieMergedLogRecordScanner} for an explicit list of log file paths.
     *
     * @param list              log file paths to scan
     * @param schema            reader schema
     * @param internalSchema    internal schema
     * @param str               latest instant time handed to the scanner
     * @param hoodieWriteConfig supplies base path, buffer size, memory budget, spill settings and
     *                          the record merger
     * @param configuration     Hadoop configuration used to obtain the storage
     * @return the scanner returned by the builder
     */
    public static HoodieMergedLogRecordScanner logScanner(List<String> list, Schema schema, InternalSchema internalSchema, String str, HoodieWriteConfig hoodieWriteConfig, Configuration configuration) {
        final String tableBasePath = hoodieWriteConfig.getBasePath();
        return HoodieMergedLogRecordScanner.newBuilder()
                .withStorage(HoodieStorageUtils.getStorage(tableBasePath, HadoopFSUtils.getStorageConf(configuration)))
                .withBasePath(tableBasePath)
                .withLogFilePaths(list)
                .withReaderSchema(schema)
                .withLatestInstantTime(str)
                .withReverseReader(false)
                .withBufferSize(hoodieWriteConfig.getMaxDFSStreamBufferSize())
                .withMaxMemorySizeInBytes(hoodieWriteConfig.getMaxMemoryPerPartitionMerge())
                .withSpillableMapBasePath(hoodieWriteConfig.getSpillableMapBasePath())
                .withDiskMapType(hoodieWriteConfig.getCommonConfig().getSpillableDiskMapType())
                .withBitCaskDiskMapCompressionEnabled(hoodieWriteConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
                .withRecordMerger(hoodieWriteConfig.getRecordMerger())
                .withInternalSchema(internalSchema)
                .build();
    }

    /**
     * Converts a string to a {@link Boolean} using a locale-independent, case-insensitive match.
     *
     * @param str text to interpret; must not be {@code null}
     * @return {@link Boolean#TRUE} when the lower-cased text equals {@code "true"}, otherwise
     *         {@link Boolean#FALSE}
     */
    private static Boolean string2Boolean(String str) {
        final String normalized = str.toLowerCase(Locale.ROOT);
        return "true".equals(normalized) ? Boolean.TRUE : Boolean.FALSE;
    }

    /**
     * Collects the commit metadata of the instants on the table's commits-and-compaction timeline.
     *
     * <p>The active timeline is reloaded first. Only the instants that remain after
     * {@code filterCompletedAndCompactionInstants().filterCompletedInstants()} are resolved.
     *
     * @param hoodieTableMetaClient meta client of the table; its active timeline is reloaded
     * @param configuration         Flink configuration supplying the table name
     * @param path                  table path, used for metadata lookup and log messages
     * @return one metadata entry per remaining instant, or an empty list when the timeline or the
     *         filtered timeline is empty
     */
    public static List<HoodieCommitMetadata> getAllCommitsMetadata(HoodieTableMetaClient hoodieTableMetaClient, org.apache.flink.configuration.Configuration configuration, Path path) {
        hoodieTableMetaClient.reloadActiveTimeline();
        HoodieTimeline commitsAndCompactionTimeline = hoodieTableMetaClient.getCommitsAndCompactionTimeline();
        if (commitsAndCompactionTimeline.empty()) {
            LOG.warn("No splits found for the table under path {}", path);
            return Collections.emptyList();
        }
        // Keep the timeline typed so the instants stream carries its element type into the lambda.
        HoodieTimeline completedTimeline = commitsAndCompactionTimeline.filterCompletedAndCompactionInstants().filterCompletedInstants();
        if (completedTimeline.empty()) {
            LOG.warn("No instants found for the table under path {}", path);
            return Collections.emptyList();
        }
        String tableName = configuration.getString(FlinkOptions.TABLE_NAME);
        return completedTimeline.getInstants().stream()
                .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitsAndCompactionTimeline))
                .collect(Collectors.toList());
    }

    // Static initializer: assigns the two blank static final fields declared at the top of the class.
    static {
        // Store the negated assertion status; checks guarded by this flag run only when it is false.
        $assertionsDisabled = !FormatUtils.class.desiredAssertionStatus();
        // Logger named after this class.
        LOG = LoggerFactory.getLogger(FormatUtils.class);
    }
}
