package com.huawei.hudi.rowdata;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieDuplicateKeyException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/hudi/rowdata/RowDataStreamWriteFunction.class */
/**
 * Stream write function that buffers records into per-file-group {@link DataBucket}s and flushes
 * buckets in the background on a dedicated thread pool.
 *
 * <p>Only {@code MERGE_ON_READ} tables written with the {@code upsert} operation are accepted. A bucket
 * is flushed in the background when {@code DataBucket.add} returns {@code true}. Otherwise the largest
 * bucket (by {@code getTotalSize()}) is flushed when the {@link TotalSizeChecker} reports that the total
 * buffered size is exceeded. On checkpoint ({@link #snapshotState()}) and end of input, every remaining
 * bucket is flushed. Every background flush of the current instant is awaited, and a last-batch
 * {@link WriteMetadataEvent} is sent to the coordinator.
 *
 * <p>The bucket maps are plain {@link LinkedHashMap}s that are only mutated from the caller's thread.
 * The executor threads only close buckets and send events.
 */
abstract class RowDataStreamWriteFunction extends AbstractStreamWriteFunction<Tuple3<StringData, StringData, RowData>> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(RowDataStreamWriteFunction.class);

    /** Incoming row type extended with the Hudi metadata fields. */
    private final RowType rowType;

    /** Open buckets keyed by bucket id, in creation order. */
    private transient Map<String, DataBucket> buckets;

    /** Whether each record's file id must map to this subtask (index alignment check). */
    private transient boolean alignmentIndex;

    /** Decides when the total buffered size requires flushing the largest bucket. */
    private transient TotalSizeChecker totalSizeChecker;

    /** Whether delete-record filtering applies (non-changelog MERGE_ON_READ tables only). */
    private boolean supportFilter;

    /** True until the one-time "has delete record" warning has been logged. */
    private boolean printWarnLog;

    /** Whether DELETE records are dropped instead of being written. */
    private boolean filterDeleteRecord;

    /**
     * Latest background flush per bucket id within the current instant. Cleared after every full flush
     * so that futures (and their write statuses) never leak into the next instant.
     */
    @NotNull
    private transient Map<String, CompletableFuture<List<WriteStatus>>> bucketWriteTasks;

    @NotNull
    private transient BucketStats bucketStats;

    /** Pool running the background bucket flushes. */
    @NotNull
    private transient ExecutorService writeTaskExecutor;

    /**
     * Creates the write function and validates the configuration.
     *
     * @param configuration the write configuration
     * @param rowType row type of the incoming records; the Hudi metadata fields are added on top of it
     *     (presumably it does not already contain them - confirm with callers)
     * @throws IllegalArgumentException if the table type or write operation is not supported
     */
    public RowDataStreamWriteFunction(Configuration configuration, RowType rowType) {
        super(configuration);
        this.supportFilter = false;
        this.printWarnLog = true;
        this.filterDeleteRecord = false;
        validateConfig(configuration);
        this.rowType = BulkInsertWriterHelper.addMetadataFields(rowType, false);
    }

    /**
     * Checks that the configuration is supported by this writer.
     *
     * <p>An enabled {@code PRE_COMBINE} and a non-default log data block max size are only reported as
     * warnings, because this writer ignores them.
     *
     * @throws IllegalArgumentException if the table is not MERGE_ON_READ or the operation is not upsert
     */
    private void validateConfig(Configuration configuration) {
        if (configuration.getBoolean(FlinkOptions.PRE_COMBINE)) {
            LOG.warn("{} doesn't support for this writer", FlinkOptions.PRE_COMBINE.key());
        }
        String logBlockMaxSize = HoodieStorageConfig.newBuilder()
                .withProps(configuration.toMap())
                .build()
                .getString(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE);
        if (!Objects.equals(logBlockMaxSize, HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE.defaultValue())) {
            LOG.warn("{} property is ignored, use {} instead",
                    HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE.key(), FlinkOptions.WRITE_BATCH_SIZE.key());
        }
        ValidationUtils.checkArgument(
                configuration.get(FlinkOptions.TABLE_TYPE).equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ),
                "Only " + FlinkOptions.TABLE_TYPE_MERGE_ON_READ + " table is supported for fast sort write");
        ValidationUtils.checkArgument(
                configuration.getString(FlinkOptions.OPERATION).equals(WriteOperationType.UPSERT.value()),
                "Only " + WriteOperationType.UPSERT + " is supported for fast sort write");
    }

    /**
     * Initializes the per-task state: bucket maps, size checker, background flush pool and the
     * delete-record filtering flags.
     */
    public void open(Configuration configuration) throws IOException {
        this.totalSizeChecker = new TotalSizeChecker(this.config);
        this.buckets = new LinkedHashMap<>();
        this.bucketWriteTasks = new LinkedHashMap<>();
        this.bucketStats = new BucketStats();
        int writeThreads = (Integer) this.config.get(FlinkOptions.LSM_STYLE_WRITE_THREADS);
        this.writeTaskExecutor = Executors.newFixedThreadPool(writeThreads);
        this.supportFilter = !this.config.getBoolean(FlinkOptions.CHANGELOG_ENABLED)
                && this.config.get(FlinkOptions.TABLE_TYPE).equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        if (this.supportFilter) {
            this.filterDeleteRecord = this.config.getBoolean(FlinkOptions.FILTER_DELETE_RECORD);
            LOG.warn("filter delete record is {}", this.filterDeleteRecord);
        }
    }

    /** Flushes everything buffered for the current instant (not the last input). */
    @Override // org.apache.hudi.sink.common.AbstractStreamWriteFunction
    public void snapshotState() {
        flushRemaining(false);
    }

    /** Restores base-class state and decides whether the index alignment check is active. */
    @Override // org.apache.hudi.sink.common.AbstractStreamWriteFunction
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        super.initializeState(functionInitializationContext);
        this.alignmentIndex = this.config.getBoolean(FlinkOptions.INDEX_ALIGNMENT_ENABLE)
                && OptionsResolver.isFlinkInMemoryIndex(this.config);
    }

    /**
     * Stops the background flush pool and closes the write client.
     *
     * <p>{@code shutdownNow} is used so that flushes still running when the task is cancelled or has
     * failed do not keep writing after close. After a normal {@link #endInput()}, every flush has
     * already been awaited, so nothing is interrupted. The pool is stopped first so that background
     * flushes do not start using the client after it is closed.
     */
    public void close() {
        if (this.writeTaskExecutor != null) {
            this.writeTaskExecutor.shutdownNow();
        }
        if (this.writeClient != null) {
            this.writeClient.close();
        }
    }

    /** Flushes all remaining data as the final batch and stops the background flush pool. */
    @Override // org.apache.hudi.sink.common.AbstractStreamWriteFunction, org.apache.hudi.sink.common.AbstractWriteFunction
    public void endInput() {
        super.endInput();
        flushRemaining(true);
        this.writeTaskExecutor.shutdown();
        this.writeClient.cleanHandles();
        this.writeStatuses.clear();
    }

    /**
     * Builds the bucket key from the record's partition path and one of its location file ids.
     *
     * @param useNewFileId use the location's new file id instead of its current file id
     */
    private String getBucketID(HoodieRecord<?> hoodieRecord, boolean useNewFileId) {
        String fileId = useNewFileId
                ? hoodieRecord.getCurrentLocation().getNewFileId()
                : hoodieRecord.getCurrentLocation().getFileId();
        return StreamerUtil.generateBucketKey(hoodieRecord.getPartitionPath(), fileId);
    }

    /**
     * Returns whether the record must be dropped. This is true only for DELETE records when delete
     * filtering is enabled.
     *
     * <p>On filter-capable tables, the first DELETE record seen also triggers a one-time warning.
     */
    public boolean filterDeleteRecord(HoodieRecord<?> hoodieRecord) {
        boolean isDelete = HoodieOperation.DELETE.equals(hoodieRecord.getOperation());
        if (this.supportFilter && this.printWarnLog && isDelete) {
            LOG.warn("MOR {} has delete record!!!", getRuntimeContext().getTaskName());
            this.printWarnLog = false;
        }
        return this.filterDeleteRecord && isDelete;
    }

    /**
     * Adds a record to its bucket (creating the bucket on first use) and triggers flushes.
     *
     * <p>The record's own bucket is flushed in the background when {@code DataBucket.add} returns
     * {@code true}. Otherwise, the largest bucket is flushed when the total-size check fires.
     *
     * @throws HoodieDuplicateKeyException if index alignment is enabled and the record's file id is
     *     assigned to a different subtask
     */
    public void bufferRecord(HoodieFlinkRecord hoodieFlinkRecord) throws IOException {
        if (filterDeleteRecord(hoodieFlinkRecord)) {
            return;
        }
        String bucketId = getBucketID(hoodieFlinkRecord, OptionsResolver.isMowTable(this.config));
        if (this.alignmentIndex) {
            int subtask = getRuntimeContext().getIndexOfThisSubtask();
            int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
            int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
            int owner = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    hoodieFlinkRecord.getCurrentLocation().getFileId(), maxParallelism, parallelism);
            if (owner != subtask) {
                LOG.warn("The stream write {}#{}#{} task should not process the {}.",
                        subtask, parallelism, maxParallelism, hoodieFlinkRecord);
                throw new HoodieDuplicateKeyException(hoodieFlinkRecord.toString());
            }
        }
        DataBucket bucket = this.buckets.computeIfAbsent(bucketId, id -> createBucket(id, hoodieFlinkRecord));
        boolean bucketFull = bucket.add(hoodieFlinkRecord);
        // Evaluated unconditionally, as before, in case the checker keeps internal state.
        boolean totalSizeExceeded = this.totalSizeChecker.check(this.buckets.values());
        if (bucketFull) {
            flushBucket(bucket);
        } else if (totalSizeExceeded) {
            flushMaxSizeBucket();
        }
    }

    /** Creates a new bucket for the record's file group, bound to the current write instant. */
    private DataBucket createBucket(String bucketId, HoodieFlinkRecord hoodieFlinkRecord) {
        String instant = instantToWrite(true);
        DataBucket dataBucket = new DataBucket(
                bucketId,
                Double.valueOf(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)),
                this.writeClient.getConfig(),
                instant,
                this.writeClient.initTable(WriteOperationType.UPSERT, Option.ofNullable(instant)),
                hoodieFlinkRecord.getPartitionPath(),
                hoodieFlinkRecord.getCurrentLocation().getFileId(),
                this.rowType);
        this.bucketStats.created(dataBucket);
        return dataBucket;
    }

    /**
     * Detaches the bucket from the buffer and closes it in the background.
     *
     * <p>A previous background flush of the same bucket id is awaited first, and its statuses are
     * harvested. The new flush sends its own non-final {@link WriteMetadataEvent} when it completes.
     */
    private void flushBucket(DataBucket dataBucket) {
        String instant = instantToWrite(true);
        if (instant == null) {
            LOG.info("No inflight instant when flushing data, skip.");
            return;
        }
        // NOTE(review): this checks that *some* bucket has data, not this one, although the message
        // talks about "Data bucket to flush" - confirm whether !dataBucket.isEmpty() was intended.
        ValidationUtils.checkState(hasData(), "Data bucket to flush has no buffering records");
        String bucketId = dataBucket.getBucketId();
        List<WriteStatus> previousStatuses = this.bucketWriteTasks
                .getOrDefault(bucketId, CompletableFuture.completedFuture(Collections.emptyList()))
                .join();
        this.bucketStats.flushed(dataBucket);
        this.writeStatuses.addAll(previousStatuses);
        updateGlobalPartitionMetadata(previousStatuses);
        this.buckets.remove(bucketId);
        CompletableFuture<List<WriteStatus>> flush = CompletableFuture.supplyAsync(() -> {
            List<WriteStatus> statuses = dataBucket.close();
            this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.builder()
                    .taskID(this.taskID)
                    .instantTime(instant)
                    .writeStatus(statuses)
                    .lastBatch(false)
                    .endInput(false)
                    .build());
            return statuses;
        }, this.writeTaskExecutor);
        this.bucketWriteTasks.put(bucketId, flush);
    }

    /** Flushes the bucket with the largest total size, if any bucket exists. */
    private void flushMaxSizeBucket() {
        this.buckets.values().stream()
                .max(Comparator.comparingLong(bucket -> bucket.getTotalSize()))
                .ifPresent(this::flushBucket);
    }

    /**
     * Flushes every bucket, waits for all background flushes and sends the last-batch event.
     *
     * @param endInput whether this is the final flush at end of input
     * @throws HoodieException if there is no inflight instant to write to
     */
    private void flushRemaining(boolean endInput) {
        this.currentInstant = instantToWrite(hasData());
        if (this.currentInstant == null) {
            throw new HoodieException("No inflight instant when flushing data!");
        }
        List<WriteStatus> flushedStatuses = flushAllBuckets();
        this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.builder()
                .taskID(this.taskID)
                .instantTime(this.currentInstant)
                .writeStatus(flushedStatuses)
                .lastBatch(true)
                .endInput(endInput)
                .build());
        this.buckets.clear();
        // Every task in the map has completed inside flushAllBuckets(). Drop them so that their statuses
        // are not merged again into the next instant's flushes.
        this.bucketWriteTasks.clear();
        this.writeClient.cleanHandles();
        this.writeStatuses.addAll(flushedStatuses);
        this.confirming = true;
    }

    /**
     * Closes every non-empty bucket and waits for all background flushes of the current instant.
     *
     * @return the statuses of the final flushes, including the statuses of each bucket's previous
     *     background flush that they were chained onto. Empty if no bucket holds data.
     */
    private List<WriteStatus> flushAllBuckets() {
        LOG.info(this.bucketStats.getSummaryAndClear(this.buckets.values()));
        HoodieTimer timer = HoodieTimer.start();
        List<DataBucket> nonEmptyBuckets = this.buckets.values().stream()
                .filter(bucket -> !bucket.isEmpty())
                .collect(Collectors.toList());
        // Background flushes that are not chained below: their bucket was never re-created, or was
        // re-created but is empty. They must still finish before the last batch is reported.
        Map<String, CompletableFuture<List<WriteStatus>>> detachedTasks = new LinkedHashMap<>(this.bucketWriteTasks);
        nonEmptyBuckets.forEach(bucket -> detachedTasks.remove(bucket.getBucketId()));
        // Start every final flush before joining any of them so they run concurrently.
        List<CompletableFuture<List<WriteStatus>>> finalFlushes = nonEmptyBuckets.stream()
                .map(this::concatAsyncFlush)
                .collect(Collectors.toList());
        awaitDetachedWriteTasks(detachedTasks);
        if (finalFlushes.isEmpty()) {
            LOG.info("No data to write in subtask [{}] for instant [{}]", this.taskID, this.currentInstant);
            return Collections.emptyList();
        }
        List<WriteStatus> statuses = finalFlushes.stream()
                .map(CompletableFuture::join)
                .flatMap(List::stream)
                .collect(Collectors.toList());
        updateGlobalPartitionMetadata(statuses);
        LOG.info("{} flushed all buckets in {}ms", Thread.currentThread().getName(), timer.endTimer());
        return statuses;
    }

    /**
     * Waits for background flushes that no final flush is chained onto. Their statuses are harvested
     * the same way {@link #flushBucket} harvests a completed previous flush. Each of these flushes
     * already sent its own event to the coordinator.
     */
    private void awaitDetachedWriteTasks(Map<String, CompletableFuture<List<WriteStatus>>> detachedTasks) {
        if (detachedTasks.isEmpty()) {
            return;
        }
        List<WriteStatus> statuses = detachedTasks.values().stream()
                .map(CompletableFuture::join)
                .flatMap(List::stream)
                .collect(Collectors.toList());
        this.writeStatuses.addAll(statuses);
        updateGlobalPartitionMetadata(statuses);
    }

    /**
     * Removes the bucket from the buffer and closes it on the flush pool after the bucket id's
     * previous background flush (if any) completes.
     *
     * @return a future with this bucket's statuses followed by the previous flush's statuses
     */
    CompletableFuture<List<WriteStatus>> concatAsyncFlush(DataBucket dataBucket) {
        this.buckets.remove(dataBucket.getBucketId());
        CompletableFuture<List<WriteStatus>> previous = this.bucketWriteTasks
                .getOrDefault(dataBucket.getBucketId(), CompletableFuture.completedFuture(Collections.emptyList()));
        return previous.thenApplyAsync(previousStatuses -> {
            List<WriteStatus> statuses = dataBucket.close();
            statuses.addAll(previousStatuses);
            return statuses;
        }, this.writeTaskExecutor);
    }

    /** Returns whether at least one buffered bucket holds records. */
    private boolean hasData() {
        return this.buckets.values().stream().anyMatch(bucket -> !bucket.isEmpty());
    }
}
