package org.apache.hudi.sink.bootstrap.async;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.bootstrap.IndexRecord;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.util.StreamerUtil;

/* loaded from: input_file:org/apache/hudi/sink/bootstrap/async/AsyncParquetAvroReader.class */
/**
 * Task that reads the record keys of a single {@link FileSlice} and adds one
 * {@link IndexRecord} (wrapped in a {@link StreamRecord}) per key to a shared queue.
 *
 * <p>The base file of the slice, when present, is read first; the slice's log files
 * are read afterwards using the strategy configured under
 * {@link FlinkOptions#INDEX_BOOTSTRAP_LOAD_STRATEGY}.
 */
public class AsyncParquetAvroReader implements Callable<Boolean> {
    private final FileSlice fileSlice;
    private final Configuration hadoopConf;
    private final InternalSchema internalSchema;
    private final String latestCommitTime;
    private final String partitionPath;
    private final HoodieWriteConfig writeConfig;
    private final Schema schema;
    private final ConcurrentLinkedQueue<StreamRecord> queue;
    private final BaseFileUtils fileUtils;
    private final org.apache.flink.configuration.Configuration flinkConf;
    private final String basePath;
    private final Configuration parquetConf;

    /**
     * Creates a reader for one file slice.
     *
     * @param fileSlice        slice whose base file and log files are read
     * @param hadoopConf       Hadoop configuration used for the log scanners, the file system
     *                         lookup and the base file when {@code parquetConf} is {@code null}
     * @param internalSchema   internal schema handed to the merged log scanner
     * @param latestCommitTime latest instant time handed to the log scanners
     * @param partitionPath    partition path used to build keys from merged log records
     * @param writeConfig      write config supplying the log scanner settings
     * @param schema           reader schema for the log scanners
     * @param queue            destination queue that receives the generated records
     * @param baseFileFormat   base file format used to look up the {@link BaseFileUtils}
     * @param flinkConf        Flink configuration holding the bootstrap load strategy
     * @param basePath         table base path used by the un-merged log scanner
     * @param parquetConf      optional configuration; when non-null the base file keys are read
     *                         through {@link ParquetUtils} with this configuration
     */
    public AsyncParquetAvroReader(FileSlice fileSlice, Configuration hadoopConf, InternalSchema internalSchema, String latestCommitTime, String partitionPath, HoodieWriteConfig writeConfig, Schema schema, ConcurrentLinkedQueue<StreamRecord> queue, HoodieFileFormat baseFileFormat, org.apache.flink.configuration.Configuration flinkConf, String basePath, Configuration parquetConf) {
        this.fileSlice = fileSlice;
        this.hadoopConf = hadoopConf;
        this.internalSchema = internalSchema;
        this.latestCommitTime = latestCommitTime;
        this.partitionPath = partitionPath;
        this.writeConfig = writeConfig;
        this.schema = schema;
        this.queue = queue;
        this.fileUtils = BaseFileUtils.getInstance(baseFileFormat);
        this.flinkConf = flinkConf;
        this.basePath = basePath;
        this.parquetConf = parquetConf;
    }

    /**
     * Loads the keys of the base file, then the keys of the log files, into the queue.
     *
     * @return always {@code true} when the method completes normally
     * @throws IllegalStateException if the configured bootstrap strategy is neither the
     *                               merged nor the un-merged one (including when it is unset)
     * @throws HoodieException       if iterating the merged log records fails
     * @throws Exception             any other failure raised by the underlying readers
     */
    @Override // java.util.concurrent.Callable
    public Boolean call() throws Exception {
        loadBaseFileKeys();
        List<String> logPaths = validLogFilePaths();

        // The strategy is resolved only after the base file has been processed,
        // so an unknown strategy fails at the same point it always did.
        String strategy = this.flinkConf.getString(FlinkOptions.INDEX_BOOTSTRAP_LOAD_STRATEGY);
        // Constant-first comparison: a null strategy reaches the descriptive
        // IllegalStateException below instead of raising a bare NullPointerException.
        if (FlinkOptions.INDEX_BOOTSTRAP_STRATEGY_MERGED.equals(strategy)) {
            loadMergedLogKeys(logPaths);
        } else if (FlinkOptions.INDEX_BOOTSTRAP_STRATEGY_UNMERGED.equals(strategy)) {
            loadUnmergedLogKeys(logPaths);
        } else {
            throw new IllegalStateException(String.format("Unknown '%s' bootstrap strategy. Available = [%s, %s]", strategy, FlinkOptions.INDEX_BOOTSTRAP_STRATEGY_MERGED, FlinkOptions.INDEX_BOOTSTRAP_STRATEGY_UNMERGED));
        }
        return Boolean.TRUE;
    }

    /** Enqueues one record per key found in the slice's base file, if there is a valid one. */
    private void loadBaseFileKeys() {
        this.fileSlice.getBaseFile().ifPresent(baseFile -> {
            // Base files rejected by StreamerUtil.isValidFile are skipped.
            if (!StreamerUtil.isValidFile(baseFile.getFileStatus())) {
                return;
            }
            // try-with-resources closes the iterator exactly once, on success and on failure.
            // NOTE(review): the cast assumes fileUtils is a ParquetUtils whenever
            // parquetConf is supplied - confirm against the callers.
            try (ClosableIterator<HoodieKey> keys = this.parquetConf != null
                    ? ((ParquetUtils) this.fileUtils).getHoodieKeyIterator(this.parquetConf, baseFile.getFileStatus())
                    : this.fileUtils.getHoodieKeyIterator(this.hadoopConf, new Path(baseFile.getPath()))) {
                keys.forEachRemaining(this::enqueue);
            }
        });
    }

    /** Returns the paths of the slice's log files that pass StreamerUtil.isValidFile, in log-file comparator order. */
    private List<String> validLogFilePaths() {
        return this.fileSlice.getLogFiles()
                .sorted(HoodieLogFile.getLogFileComparator())
                .filter(logFile -> StreamerUtil.isValidFile(logFile.getFileStatus()))
                .map(logFile -> logFile.getPath().toString())
                .collect(Collectors.toList());
    }

    /**
     * Enqueues one record per record key held by a merged log scanner over {@code logPaths}.
     * Declared {@code throws Exception} because it runs under {@link #call()}, which already does.
     */
    private void loadMergedLogKeys(List<String> logPaths) throws Exception {
        HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, this.schema, this.internalSchema, this.latestCommitTime, this.writeConfig, this.hadoopConf);
        try {
            for (String recordKey : scanner.getRecords().keySet()) {
                enqueue(new HoodieKey(recordKey, this.partitionPath));
            }
        } catch (Exception e) {
            throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
        } finally {
            scanner.close();
        }
    }

    /**
     * Scans {@code logPaths} with an un-merged log scanner, enqueuing a record for every
     * record the scanner hands to its callback.
     * Declared {@code throws Exception} because it runs under {@link #call()}, which already does.
     */
    private void loadUnmergedLogKeys(List<String> logPaths) throws Exception {
        HoodieUnMergedLogRecordScanner.newBuilder()
                .withFileSystem(FSUtils.getFs(this.basePath, this.hadoopConf))
                .withBasePath(this.basePath)
                .withLogFilePaths(logPaths)
                .withReaderSchema(this.schema)
                .withLatestInstantTime(this.latestCommitTime)
                .withReadBlocksLazily(this.writeConfig.getCompactionLazyBlockReadEnabled().booleanValue())
                .withReverseReader(this.writeConfig.getCompactionReverseLogReadEnabled().booleanValue())
                .withBufferSize(this.writeConfig.getMaxDFSStreamBufferSize())
                .withLogRecordScannerCallback(logRecord -> enqueue(logRecord.getKey()))
                .build()
                .scan();
    }

    /** Wraps the index record generated for {@code key} in a stream record and adds it to the queue. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // the queue's element type is declared raw
    private void enqueue(HoodieKey key) {
        this.queue.add(new StreamRecord(new IndexRecord(BootstrapOperator.generateHoodieRecord(key, this.fileSlice))));
    }
}
