package org.apache.hudi.sink.bootstrap;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
import org.apache.hudi.sink.bootstrap.aggregate.IndexAlignment;
import org.apache.hudi.sink.bootstrap.async.AsyncParquetAvroReader;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.util.FlinkTables;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/bootstrap/BootstrapOperator.class */
/**
 * Stream operator that loads the record keys already stored in a Hudi table and emits them
 * downstream as {@link IndexRecord}s while its state is being initialized, and afterwards forwards
 * every input element unchanged.
 *
 * <p>The load runs inside {@link #initializeState(StateInitializationContext)}: every partition
 * whose path matches the configured regex is scanned, and each subtask only reads the file slices
 * assigned to it by {@link #shouldLoadFile(String, int, int, int)}. Once its own share is emitted,
 * the subtask blocks until the global aggregate reports that the bootstrap is complete.
 *
 * @param <I> type of the incoming elements
 * @param <O> type of the emitted records
 */
public class BootstrapOperator<I, O extends HoodieRecord<?>> extends AbstractStreamOperator<O> implements OneInputStreamOperator<I, O> {
    private static final Logger LOG = LoggerFactory.getLogger(BootstrapOperator.class);

    /** Pause between two consecutive reports to the global aggregate manager. */
    private static final long AGGREGATE_RETRY_INTERVAL_SECONDS = 5L;

    /** Pause before re-polling the record queue while asynchronous loaders are still running. */
    private static final long ASYNC_QUEUE_POLL_INTERVAL_MILLIS = 2000L;

    protected HoodieTable<?, ?, ?, ?> hoodieTable;
    private CkpMetadata ckpMetadata;
    protected final Configuration conf;
    protected transient org.apache.hadoop.conf.Configuration hadoopConf;
    protected transient HoodieWriteConfig writeConfig;
    private transient GlobalAggregateManager aggregateManager;
    /** Operator state holding the instant time recorded by the latest snapshot. */
    private transient ListState<String> instantState;
    /** Only partitions whose path fully matches this pattern are loaded. */
    private final Pattern pattern;
    /** Instant restored from state; when set, only commits after it are considered. */
    private String lastInstantTime;
    private transient boolean alignmentIndex;
    /** Number of index records emitted by this subtask. */
    private transient AtomicLong loadIndexCount;
    /** Set once {@link #waitForBootstrapReady(int)} has returned; guards {@link #processElement}. */
    private transient boolean bootstrapReady = false;

    /**
     * Creates the operator.
     *
     * @param configuration job configuration; the partition regex is compiled from
     *                      {@link FlinkOptions#INDEX_PARTITION_REGEX}
     */
    public BootstrapOperator(Configuration configuration) {
        this.conf = configuration;
        this.pattern = Pattern.compile(configuration.getString(FlinkOptions.INDEX_PARTITION_REGEX));
    }

    /**
     * Stores the last pending instant reported by the checkpoint metadata in the operator state.
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        this.lastInstantTime = this.ckpMetadata.lastPendingInstant();
        this.instantState.update(Collections.singletonList(this.lastInstantTime));
    }

    /**
     * Restores the last instant (if any), creates the table handles and then loads the existing
     * index records before returning.
     *
     * <p>When the state is restored, index alignment is switched off in the configuration before
     * the alignment flag is read.
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        this.instantState = stateInitializationContext.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("instantStateDescriptor", Types.STRING));
        if (stateInitializationContext.isRestored()) {
            Iterator<String> restoredInstants = this.instantState.get().iterator();
            if (restoredInstants.hasNext()) {
                this.lastInstantTime = restoredInstants.next();
            }
            this.conf.setBoolean(FlinkOptions.INDEX_ALIGNMENT_ENABLE, false);
        }
        this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
        this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
        this.hoodieTable = FlinkTables.createTable(this.writeConfig, this.hadoopConf, getRuntimeContext());
        this.ckpMetadata = CkpMetadata.getInstance(this.hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath());
        this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
        this.alignmentIndex = this.conf.getBoolean(FlinkOptions.INDEX_ALIGNMENT_ENABLE);
        this.loadIndexCount = new AtomicLong();
        preLoadIndexRecords();
    }

    /**
     * Loads the index records of every partition matching the configured regex, either
     * synchronously or through a thread pool depending on
     * {@link FlinkOptions#INDEX_BOOTSTRAP_ASYNC}, then waits for the global bootstrap to finish.
     *
     * @throws IllegalStateException if asynchronous loading is enabled with fewer than 2 threads
     *                               and at least one partition has to be loaded
     */
    protected void preLoadIndexRecords() throws Exception {
        String basePath = this.hoodieTable.getMetaClient().getBasePath();
        int taskId = getRuntimeContext().getIndexOfThisSubtask();
        LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskId);

        // The loading mode does not change between partitions, so read it once.
        boolean asyncLoad = this.conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ASYNC);
        for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, this.hadoopConf), basePath)) {
            if (!this.pattern.matcher(partitionPath).matches()) {
                continue;
            }
            if (asyncLoad) {
                int threads = this.conf.getInteger(FlinkOptions.INDEX_BOOTSTRAP_PARALLEL_THREADS);
                if (threads < 2) {
                    throw new IllegalStateException(FlinkOptions.INDEX_BOOTSTRAP_PARALLEL_THREADS.key() + " must be greater than 1");
                }
                loadRecordsAsync(partitionPath, threads);
            } else {
                loadRecords(partitionPath);
            }
        }

        LOG.info("Finish sending index records, taskId = {}.", taskId);
        waitForBootstrapReady(taskId);
    }

    /**
     * Blocks until the global aggregate reports that the bootstrap is complete, then marks this
     * operator as ready to process elements.
     *
     * <p>With index alignment enabled the subtask reports its loaded record count until the
     * aggregate answers {@code true}; otherwise it reports its task id until the aggregate returns
     * the operator parallelism. Failed reports are logged and retried after a pause.
     *
     * @param i index of this subtask
     * @throws HoodieException if the thread is interrupted while pausing between two attempts
     */
    private void waitForBootstrapReady(int i) {
        final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        if (this.alignmentIndex) {
            Tuple4<String, Integer, Integer, Long> report =
                    new Tuple4<>(getRuntimeContext().getTaskName(), parallelism, i, this.loadIndexCount.get());
            boolean aligned = false;
            while (!aligned) {
                try {
                    Object answer = this.aggregateManager.updateGlobalAggregate(IndexAlignment.NAME, report, new IndexAlignment());
                    aligned = Boolean.TRUE.equals(answer);
                    LOG.info("Reported index volume, taskId = {}, load index = {}.", i, this.loadIndexCount.get());
                } catch (Exception e) {
                    LOG.warn("Update index volume error", e);
                }
                if (!aligned) {
                    // Pause after failures too, so a broken aggregate endpoint is not hammered.
                    pauseBeforeRetry();
                }
            }
        } else {
            int readyTasks = 1;
            while (parallelism != readyTasks) {
                try {
                    readyTasks = (Integer) this.aggregateManager.updateGlobalAggregate(BootstrapAggFunction.NAME, Integer.valueOf(i), new BootstrapAggFunction());
                    LOG.info("Waiting for other bootstrap tasks to complete, taskId = {}.", i);
                } catch (Exception e) {
                    LOG.warn("Update global task bootstrap summary error", e);
                }
                if (parallelism != readyTasks) {
                    pauseBeforeRetry();
                }
            }
        }
        this.bootstrapReady = true;
    }

    /**
     * Sleeps for the aggregate retry interval.
     *
     * @throws HoodieException if the thread is interrupted; the interrupt flag is restored first
     *                         so the waiting loop cannot keep an interrupted task alive
     */
    private static void pauseBeforeRetry() {
        try {
            TimeUnit.SECONDS.sleep(AGGREGATE_RETRY_INTERVAL_SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new HoodieException("Interrupted while waiting for the bootstrap tasks to become ready", e);
        }
    }

    /**
     * Forwards the element downstream unchanged.
     *
     * @throws HoodieException if the index bootstrap has not finished yet
     */
    @Override
    @SuppressWarnings("unchecked")
    public void processElement(StreamRecord<I> streamRecord) throws Exception {
        if (!this.bootstrapReady) {
            throw new HoodieException("The index is being loaded and cannot process " + streamRecord);
        }
        this.output.collect((StreamRecord<O>) streamRecord);
    }

    /**
     * Synchronously loads the record keys of one partition and emits them as index records.
     *
     * <p>For each file slice assigned to this subtask the keys of the base file are emitted first,
     * followed by the keys found in the log files, read with either the merged or the unmerged
     * scanner depending on {@link FlinkOptions#INDEX_BOOTSTRAP_LOAD_STRATEGY}. Nothing is loaded
     * when the (optionally restricted) commits timeline has no completed instant.
     *
     * @param str partition path to load
     * @throws IllegalStateException if the unmerged strategy is combined with the global index, or
     *                               if the configured strategy is unknown
     * @throws HoodieException       if reading the merged log records fails
     */
    public void loadRecords(String str) throws Exception {
        final long startMillis = System.currentTimeMillis();
        final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
        final int taskId = getRuntimeContext().getIndexOfThisSubtask();

        HoodieTimeline commitsTimeline = this.hoodieTable.getMetaClient().getCommitsTimeline();
        if (!StringUtils.isNullOrEmpty(this.lastInstantTime)) {
            commitsTimeline = commitsTimeline.findInstantsAfter(this.lastInstantTime);
        }
        Option<HoodieInstant> lastInstant = commitsTimeline.filterCompletedInstants().lastInstant();

        if (lastInstant.isPresent()) {
            final String latestCommitTime = lastInstant.get().getTimestamp();
            BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
            TableSchemaResolver schemaResolver = new TableSchemaResolver(this.hoodieTable.getMetaClient());
            Schema tableAvroSchema = schemaResolver.getTableAvroSchema();
            List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
                    .getLatestMergedFileSlicesBeforeOrOn(str, latestCommitTime)
                    .collect(Collectors.toList());
            // Non-null only for Parquet base files; selects the Parquet-specific key iterator below.
            org.apache.hadoop.conf.Configuration parquetReadConf =
                    baseFileUtils instanceof ParquetUtils ? ((ParquetUtils) baseFileUtils).setAvroSchema(this.hadoopConf) : null;
            String strategy = this.conf.getString(FlinkOptions.INDEX_BOOTSTRAP_LOAD_STRATEGY);
            if (this.conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED) && FlinkOptions.INDEX_BOOTSTRAP_STRATEGY_UNMERGED.equals(strategy)) {
                throw new IllegalStateException("unmerged strategy doesn't supported with global index.");
            }

            // Resolved on first use and shared by all slices instead of once per slice.
            InternalSchema internalSchema = null;
            for (FileSlice fileSlice : fileSlices) {
                if (!shouldLoadFile(fileSlice.getFileId(), maxParallelism, parallelism, taskId)) {
                    continue;
                }
                AtomicLong sliceCount = new AtomicLong();
                LOG.info("Load records from {}.", fileSlice);

                fileSlice.getBaseFile().ifPresent(baseFile -> {
                    if (!StreamerUtil.isValidFile(baseFile.getFileStatus())) {
                        return;
                    }
                    ClosableIterator<HoodieKey> keys = parquetReadConf != null
                            ? ((ParquetUtils) baseFileUtils).getHoodieKeyIterator(parquetReadConf, baseFile.getFileStatus())
                            : baseFileUtils.getHoodieKeyIterator(this.hadoopConf, new Path(baseFile.getPath()));
                    try {
                        keys.forEachRemaining(key -> emitIndexRecord(key, fileSlice, sliceCount));
                    } finally {
                        keys.close();
                    }
                });

                List<String> logPaths = fileSlice.getLogFiles()
                        .sorted(HoodieLogFile.getLogFileComparator())
                        .filter(logFile -> StreamerUtil.isValidFile(logFile.getFileStatus()))
                        .map(logFile -> logFile.getPath().toString())
                        .collect(Collectors.toList());
                if (internalSchema == null) {
                    internalSchema = schemaResolver.getTableInternalSchemaFromCommitMetadata().orElse(InternalSchema.getEmptyInternalSchema());
                }

                if (FlinkOptions.INDEX_BOOTSTRAP_STRATEGY_MERGED.equals(strategy)) {
                    HoodieMergedLogRecordScanner logScanner =
                            FormatUtils.logScanner(logPaths, tableAvroSchema, internalSchema, latestCommitTime, this.writeConfig, this.hadoopConf);
                    try {
                        for (String recordKey : logScanner.getRecords().keySet()) {
                            emitIndexRecord(new HoodieKey(recordKey, str), fileSlice, sliceCount);
                        }
                    } catch (Exception e) {
                        throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
                    } finally {
                        logScanner.close();
                    }
                } else if (FlinkOptions.INDEX_BOOTSTRAP_STRATEGY_UNMERGED.equals(strategy)) {
                    unMergedLogScanner(logPaths, tableAvroSchema, latestCommitTime, fileSlice, sliceCount).scan();
                } else {
                    throw new IllegalStateException(String.format("Unknown '%s' bootstrap strategy. Available = [%s, %s]", strategy, FlinkOptions.INDEX_BOOTSTRAP_STRATEGY_MERGED, FlinkOptions.INDEX_BOOTSTRAP_STRATEGY_UNMERGED));
                }
                LOG.info("Load {} records from {}.", sliceCount.get(), fileSlice);
            }
        }

        LOG.info("Task [{}:{}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.",
                getClass().getSimpleName(), taskId, str, System.currentTimeMillis() - startMillis);
    }

    /**
     * Counts one loaded key, both globally and for the current file slice, and emits the matching
     * index record downstream.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void emitIndexRecord(HoodieKey hoodieKey, FileSlice fileSlice, AtomicLong sliceCount) {
        this.loadIndexCount.incrementAndGet();
        sliceCount.incrementAndGet();
        this.output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
    }

    /**
     * Builds an unmerged log scanner whose callback emits an index record for every scanned record.
     *
     * @param list       paths of the log files to scan
     * @param schema     reader schema
     * @param str        latest instant time to read up to
     * @param fileSlice  file slice the log files belong to
     * @param atomicLong per-slice counter incremented for every emitted record
     */
    private HoodieUnMergedLogRecordScanner unMergedLogScanner(List<String> list, Schema schema, String str, FileSlice fileSlice, AtomicLong atomicLong) {
        String basePath = this.hoodieTable.getConfig().getBasePath();
        return HoodieUnMergedLogRecordScanner.newBuilder()
                .withFileSystem(FSUtils.getFs(basePath, this.hadoopConf))
                .withBasePath(basePath)
                .withLogFilePaths(list)
                .withReaderSchema(schema)
                .withLatestInstantTime(str)
                .withReadBlocksLazily(this.writeConfig.getCompactionLazyBlockReadEnabled())
                .withReverseReader(this.writeConfig.getCompactionReverseLogReadEnabled())
                .withBufferSize(this.writeConfig.getMaxDFSStreamBufferSize())
                .withLogRecordScannerCallback(hoodieRecord -> emitIndexRecord(hoodieRecord.getKey(), fileSlice, atomicLong))
                .build();
    }

    /**
     * Loads the record keys of one partition with a pool of reader tasks.
     *
     * <p>One {@link AsyncParquetAvroReader} is submitted per file slice assigned to this subtask;
     * the readers hand their records over through a shared queue which is drained and emitted on
     * the calling thread. Nothing is loaded when the (optionally restricted) commits timeline has
     * no completed instant.
     *
     * @param str partition path to load
     * @param i   number of threads in the reader pool
     * @throws IllegalStateException if the unmerged strategy is combined with the global index
     * @throws HoodieException       if any reader task failed
     */
    // The queue stays raw because the element type expected by AsyncParquetAvroReader is not visible here.
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected void loadRecordsAsync(String str, int i) throws Exception {
        final long startMillis = System.currentTimeMillis();
        final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
        final int taskId = getRuntimeContext().getIndexOfThisSubtask();

        HoodieTimeline commitsTimeline = this.hoodieTable.getMetaClient().getCommitsTimeline();
        if (!StringUtils.isNullOrEmpty(this.lastInstantTime)) {
            commitsTimeline = commitsTimeline.findInstantsAfter(this.lastInstantTime);
        }
        Option<HoodieInstant> lastInstant = commitsTimeline.filterCompletedInstants().lastInstant();

        if (lastInstant.isPresent()) {
            final String latestCommitTime = lastInstant.get().getTimestamp();
            BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
            TableSchemaResolver schemaResolver = new TableSchemaResolver(this.hoodieTable.getMetaClient());
            Schema tableAvroSchema = schemaResolver.getTableAvroSchema();
            InternalSchema internalSchema = schemaResolver.getTableInternalSchemaFromCommitMetadata().orElse(InternalSchema.getEmptyInternalSchema());
            List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
                    .getLatestMergedFileSlicesBeforeOrOn(str, latestCommitTime)
                    .collect(Collectors.toList());
            org.apache.hadoop.conf.Configuration parquetReadConf =
                    baseFileUtils instanceof ParquetUtils ? ((ParquetUtils) baseFileUtils).setAvroSchema(this.hadoopConf) : null;
            String strategy = this.conf.getString(FlinkOptions.INDEX_BOOTSTRAP_LOAD_STRATEGY);
            if (this.conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED) && FlinkOptions.INDEX_BOOTSTRAP_STRATEGY_UNMERGED.equals(strategy)) {
                throw new IllegalStateException("unmerged strategy doesn't supported with global index.");
            }

            ExecutorService loaderPool = Executors.newFixedThreadPool(i);
            try {
                ConcurrentLinkedQueue pendingRecords = new ConcurrentLinkedQueue();
                List<Future<?>> loaders = new ArrayList<>();
                for (FileSlice fileSlice : fileSlices) {
                    if (shouldLoadFile(fileSlice.getFileId(), maxParallelism, parallelism, taskId)) {
                        LOG.info("Load records from {}.", fileSlice);
                        loaders.add(loaderPool.submit(new AsyncParquetAvroReader(fileSlice, this.hadoopConf, internalSchema, latestCommitTime, str, this.writeConfig, tableAvroSchema, pendingRecords, this.hoodieTable.getBaseFileFormat(), this.conf, this.hoodieTable.getConfig().getBasePath(), parquetReadConf)));
                    }
                }

                boolean loadersFinished = false;
                while (true) {
                    StreamRecord nextRecord = (StreamRecord) pendingRecords.poll();
                    if (nextRecord != null) {
                        this.loadIndexCount.incrementAndGet();
                        this.output.collect(nextRecord);
                    } else if (loadersFinished) {
                        // The queue was empty after every loader was seen finished: nothing can follow.
                        break;
                    } else if (loaders.stream().allMatch(Future::isDone)) {
                        // A loader may have enqueued records between the poll above and this check,
                        // so go round once more instead of leaving immediately.
                        loadersFinished = true;
                    } else {
                        Thread.sleep(ASYNC_QUEUE_POLL_INTERVAL_MILLIS);
                    }
                }
                rethrowLoaderFailure(loaders, str);
            } finally {
                // Equivalent to shutdown() once all loaders are done; on failure it also stops the rest.
                loaderPool.shutdownNow();
            }
        }

        LOG.info("Task [{}:{}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.",
                getClass().getSimpleName(), taskId, str, System.currentTimeMillis() - startMillis);
    }

    /**
     * Surfaces the first failure of the given (already completed) loader tasks so that a failed
     * reader cannot leave a silently incomplete index.
     *
     * @throws HoodieException wrapping the cause of the first failed task
     */
    private static void rethrowLoaderFailure(List<Future<?>> loaders, String partitionPath) throws InterruptedException {
        for (Future<?> loader : loaders) {
            try {
                loader.get();
            } catch (ExecutionException e) {
                throw new HoodieException("Error when asynchronously loading index records under partition " + partitionPath, e.getCause());
            }
        }
    }

    /**
     * Creates a sealed, payload-less record for the given key, located in the given file slice.
     *
     * @param hoodieKey key of the record; its partition path is used for the location
     * @param fileSlice file slice providing the base instant time and file id of the location
     */
    @SuppressWarnings("rawtypes")
    public static HoodieRecord<?> generateHoodieRecord(HoodieKey hoodieKey, FileSlice fileSlice) {
        HoodieAvroRecord record = new HoodieAvroRecord(hoodieKey, null);
        record.setCurrentLocation(new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId()));
        record.seal();
        return record;
    }

    /**
     * Creates a sealed, payload-less record from the individual key and location parts.
     *
     * @param str  first argument of the {@link HoodieKey} (record key)
     * @param str2 partition path, used for both the key and the location
     * @param str3 instant time of the location
     * @param str4 file id of the location
     */
    @SuppressWarnings("rawtypes")
    public static HoodieRecord<?> generateHoodieRecord(String str, String str2, String str3, String str4) {
        HoodieAvroRecord record = new HoodieAvroRecord(new HoodieKey(str, str2), null);
        record.setCurrentLocation(new HoodieRecordGlobalLocation(str2, str3, str4));
        record.seal();
        return record;
    }

    /**
     * Tells whether the file with the given id belongs to the given subtask, using Flink's
     * key-to-operator assignment on the file id.
     *
     * @param str file id
     * @param i   maximum parallelism
     * @param i2  parallelism
     * @param i3  subtask index to test
     */
    protected boolean shouldLoadFile(String str, int i, int i2, int i3) {
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(str, i, i2) == i3;
    }

    /** Returns whether the instant state currently holds at least one element. */
    @VisibleForTesting
    public boolean isAlreadyBootstrap() throws Exception {
        return this.instantState.get().iterator().hasNext();
    }
}
