package org.apache.hudi.sink.partitioner;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.BaseAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieDuplicateKeyException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.sink.bootstrap.IndexRecord;
import org.apache.hudi.sink.bootstrap.aggregate.IndexAlignment;
import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.util.FlinkWriteClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/partitioner/BucketAssignFunction.class */
/**
 * Keyed process function that tags each incoming record with the location (instant marker and
 * file id) it should be written to.
 *
 * <p>Two kinds of elements are handled by {@link #processElement}:
 * <ul>
 *   <li>{@link IndexRecord}s, whose location is stored into the keyed {@code indexState};</li>
 *   <li>every other element, which is treated as a {@link HoodieRecord}, assigned a location via
 *       the {@link BucketAssigner} (or via the stored index state for changing-record
 *       operations) and then emitted.</li>
 * </ul>
 *
 * <p>When {@code FlinkOptions.INDEX_ALIGNMENT_ENABLE} is set and the state was not restored, a
 * single-threaded scheduler periodically publishes the number of loaded index records through
 * the {@link GlobalAggregateManager}. The scheduler is stopped when the first non-index record
 * arrives or, at the latest, when the function is closed.
 *
 * @param <K> type of the key
 * @param <I> type of the input elements
 * @param <O> type of the emitted records
 */
public class BucketAssignFunction<K, I, O extends HoodieRecord<?>> extends KeyedProcessFunction<K, I, O> implements CheckpointedFunction, CheckpointListener {
    private static final Logger LOG = LoggerFactory.getLogger(BucketAssignFunction.class);

    /** Keyed state holding the last known global location of the current record key. */
    private ValueState<HoodieRecordGlobalLocation> indexState;
    /** Assigns file ids for inserts and tracks updates; created in {@link #open}. */
    private BucketAssigner bucketAssigner;
    private final Configuration conf;
    /** Whether the configured write operation is one that changes existing records. */
    private final boolean isChangingRecords;
    private PayloadCreation payloadCreation;
    /** True when the global index is enabled and changelog mode is disabled. */
    private final boolean globalIndex;
    private transient GlobalAggregateManager aggregateManager;
    private transient boolean alignmentIndex;
    /** Number of index records seen so far; only created when index alignment is active. */
    private transient AtomicLong loadIndexCount;
    private transient ScheduledExecutorService executor;
    private transient ScheduledFuture<?> scheduledFuture;
    /** Partition path to file ids handed out since the last snapshot; used for logging only. */
    private transient Map<String, Set<String>> assignFileIdMap = new HashMap<>();

    /**
     * Creates the function and derives the operation-dependent flags from the configuration.
     *
     * @param configuration the Flink job configuration carrying the Hudi options
     */
    public BucketAssignFunction(Configuration configuration) {
        this.conf = configuration;
        this.isChangingRecords = WriteOperationType.isChangingRecords(WriteOperationType.fromValue(configuration.getString(FlinkOptions.OPERATION)));
        this.globalIndex = configuration.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED) && !configuration.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
    }

    /**
     * Builds the {@link BucketAssigner} for this subtask and the payload factory.
     *
     * @param configuration the configuration passed in by the runtime
     * @throws Exception if the parent {@code open} or any of the factories fail
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, true);
        HoodieFlinkEngineContext engineContext = new HoodieFlinkEngineContext(
                HadoopFSUtils.getStorageConfWithCopy(HadoopConfigurations.getHadoopConf(this.conf)),
                new FlinkTaskContextSupplier(getRuntimeContext()));
        this.bucketAssigner = BucketAssigners.create(
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getMaxNumberOfParallelSubtasks(),
                getRuntimeContext().getNumberOfParallelSubtasks(),
                ignoreSmallFiles(),
                HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)),
                engineContext,
                writeConfig);
        this.payloadCreation = PayloadCreation.instance(this.conf);
    }

    /** Returns true when the configured write operation is an overwrite operation. */
    private boolean ignoreSmallFiles() {
        return WriteOperationType.isOverwrite(WriteOperationType.fromValue(this.conf.getString(FlinkOptions.OPERATION)));
    }

    /**
     * Logs the file ids handed out since the previous snapshot, then clears that bookkeeping and
     * resets the bucket assigner.
     *
     * @param functionSnapshotContext the snapshot context; may be {@code null}, in which case
     *                                nothing is logged
     */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
        if (functionSnapshotContext != null) {
            LOG.info("current checkpoint {} write data to fileId {}", functionSnapshotContext.getCheckpointId(), this.assignFileIdMap);
        }
        this.assignFileIdMap.clear();
        this.bucketAssigner.reset();
    }

    /**
     * Registers the keyed index state (with an optional TTL) and, when index alignment is
     * enabled and the state is not restored, starts the periodic index-load reporter.
     *
     * @param functionInitializationContext the initialization context supplied by the runtime
     * @throws Exception if the state cannot be obtained
     */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =
                new ValueStateDescriptor<>("indexState", TypeInformation.of(HoodieRecordGlobalLocation.class));
        // The option is multiplied out as days -> milliseconds.
        double ttlMillis = this.conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24.0d * 60.0d * 60.0d * 1000.0d;
        if (ttlMillis > 0.0d) {
            StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.milliseconds((long) ttlMillis)).build();
            indexStateDesc.enableTimeToLive(ttlConfig);
            LOG.info("index state ttl is {}", ttlConfig);
        }
        this.indexState = functionInitializationContext.getKeyedStateStore().getState(indexStateDesc);

        // Alignment is switched off whenever the state was restored.
        if (functionInitializationContext.isRestored()) {
            this.conf.setBoolean(FlinkOptions.INDEX_ALIGNMENT_ENABLE, false);
        }
        this.alignmentIndex = this.conf.getBoolean(FlinkOptions.INDEX_ALIGNMENT_ENABLE);
        if (this.alignmentIndex) {
            startIndexLoadReporter();
        } else {
            LOG.info("skip index alignment.");
        }
        this.assignFileIdMap = new HashMap<>();
    }

    /**
     * Starts a single-threaded scheduler that publishes, every 500 ms, the task name,
     * parallelism, subtask index and loaded index-record count as a global aggregate.
     */
    private void startIndexLoadReporter() {
        this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
        this.loadIndexCount = new AtomicLong();
        this.executor = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("bucketAssign-agg"));
        this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
            try {
                this.aggregateManager.updateGlobalAggregate(
                        IndexAlignment.NAME + this.conf.getString(FlinkOptions.TABLE_NAME),
                        new Tuple4<>(
                                getRuntimeContext().getTaskName(),
                                getRuntimeContext().getNumberOfParallelSubtasks(),
                                getRuntimeContext().getIndexOfThisSubtask(),
                                this.loadIndexCount.get()),
                        new IndexAlignment());
            } catch (IOException e) {
                LOG.warn("Update global task bucketAssign summary error", e);
            }
        }, 0L, 500L, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the index-load reporter if it is still running. Safe to call repeatedly: once the
     * reporter has been stopped the references are cleared and later calls are no-ops.
     */
    private void waitLoadIndex() {
        if (this.scheduledFuture != null) {
            LOG.info("shutdown the reporting thread.");
            this.scheduledFuture.cancel(false);
            this.executor.shutdown();
            this.scheduledFuture = null;
            this.executor = null;
        }
    }

    /**
     * Routes an element either into the index state (for {@link IndexRecord}s) or through
     * location assignment (for everything else), then verifies key ownership when index
     * alignment is active.
     *
     * @param i         the incoming element
     * @param context   the keyed-process context (unused)
     * @param collector the collector receiving tagged records
     * @throws HoodieDuplicateKeyException if index alignment is active and the element's record
     *                                     key does not map to this subtask
     * @throws Exception                   if state access or payload creation fails
     */
    @Override
    public void processElement(I i, KeyedProcessFunction<K, I, O>.Context context, Collector<O> collector) throws Exception {
        if (i instanceof IndexRecord) {
            IndexRecord<?> indexRecord = (IndexRecord<?>) i;
            HoodieRecordGlobalLocation stored = this.indexState.value();
            // Store the location when none exists yet, or (for MOW tables) when the incoming
            // location carries a later instant time than the stored one.
            boolean shouldStore = stored == null
                    || (OptionsResolver.isMowTable(this.conf)
                    && HoodieTimeline.compareTimestamps(stored.getInstantTime(), HoodieTimeline.LESSER_THAN, indexRecord.getCurrentLocation().getInstantTime()));
            if (shouldStore) {
                this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
            }
            if (this.loadIndexCount != null) {
                this.loadIndexCount.incrementAndGet();
            }
            LOG.debug("IndexRecord {}", i);
        } else {
            // The first data record ends the index-loading phase, so stop reporting.
            waitLoadIndex();
            LOG.debug("HoodieRecord {}", i);
            processRecord((HoodieRecord<?>) i, collector);
        }
        verifyKeyOwnership(i);
    }

    /**
     * When index alignment is active, checks that the element's record key is assigned to this
     * subtask by Flink's key-group assignment and fails otherwise.
     */
    private void verifyKeyOwnership(I element) {
        if (!this.alignmentIndex) {
            return;
        }
        int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        int owner = KeyGroupRangeAssignment.assignKeyToParallelOperator(((HoodieRecord<?>) element).getRecordKey(), maxParallelism, parallelism);
        if (owner == subtaskIndex) {
            return;
        }
        LOG.warn("The bucket assign {}#{}#{} task should not process the {}.", subtaskIndex, parallelism, maxParallelism, element);
        LOG.warn("the {} task has {} fileIds.", subtaskIndex, this.assignFileIdMap);
        throw new HoodieDuplicateKeyException(element.toString());
    }

    /**
     * Determines the write location of a data record, updates the index state for
     * changing-record operations, and emits the record. If the record moved to a different
     * partition and the global index is enabled, a delete record for the old partition is
     * emitted first.
     */
    @SuppressWarnings("unchecked") // emitted records are HoodieRecords; O is bounded by HoodieRecord<?>
    private void processRecord(HoodieRecord<?> hoodieRecord, Collector<O> collector) throws Exception {
        HoodieKey key = hoodieRecord.getKey();
        String recordKey = key.getRecordKey();
        String partitionPath = key.getPartitionPath();

        HoodieRecordLocation location;
        if (this.isChangingRecords) {
            HoodieRecordGlobalLocation oldLocation = this.indexState.value();
            if (oldLocation == null) {
                location = getNewRecordLocation(partitionPath);
            } else if (Objects.equals(oldLocation.getPartitionPath(), partitionPath)) {
                // Same partition: reuse the known file group as an update.
                location = oldLocation.toLocal("U");
                this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
            } else {
                if (this.globalIndex) {
                    HoodieRecord<?> deleteRecord = new HoodieAvroRecord<>(
                            new HoodieKey(recordKey, oldLocation.getPartitionPath()),
                            this.payloadCreation.createDeletePayload((BaseAvroPayload) hoodieRecord.getData()));
                    deleteRecord.unseal();
                    deleteRecord.setCurrentLocation(oldLocation.toLocal("U"));
                    deleteRecord.seal();
                    collector.collect((O) deleteRecord);
                }
                location = getNewRecordLocation(partitionPath);
            }
            updateIndexState(partitionPath, location);
        } else {
            location = getNewRecordLocation(partitionPath);
        }

        hoodieRecord.unseal();
        hoodieRecord.setCurrentLocation(location);
        hoodieRecord.seal();
        collector.collect((O) hoodieRecord);
    }

    /**
     * Asks the bucket assigner for an insert bucket in the given partition and converts it to a
     * record location marked {@code "I"} or {@code "U"} according to the bucket type. The file
     * id is also remembered for the per-checkpoint log output.
     */
    private HoodieRecordLocation getNewRecordLocation(String str) {
        BucketInfo bucketInfo = this.bucketAssigner.addInsert(str);
        String fileId = bucketInfo.getFileIdPrefix();
        HoodieRecordLocation location;
        switch (bucketInfo.getBucketType()) {
            case INSERT:
                location = new HoodieRecordLocation("I", fileId);
                break;
            case UPDATE:
                location = new HoodieRecordLocation("U", fileId);
                break;
            default:
                throw new AssertionError();
        }
        this.assignFileIdMap.computeIfAbsent(str, partition -> new HashSet<>()).add(fileId);
        return location;
    }

    /** Stores the given location, qualified with its partition path, into the index state. */
    private void updateIndexState(String str, HoodieRecordLocation hoodieRecordLocation) throws Exception {
        this.indexState.update(HoodieRecordGlobalLocation.fromLocal(str, hoodieRecordLocation));
    }

    /**
     * Forwards the completed checkpoint id to the bucket assigner.
     *
     * @param j id of the completed checkpoint
     */
    @Override
    public void notifyCheckpointComplete(long j) {
        this.bucketAssigner.reload(j);
    }

    /**
     * Releases the resources held by this function: stops the index-load reporter if it is
     * still running and closes the bucket assigner if one was created.
     *
     * @throws Exception if closing the bucket assigner fails
     */
    @Override
    public void close() throws Exception {
        // The reporter is normally stopped by the first data record; stop it here as well so a
        // task that closes earlier does not leave the scheduler thread running.
        waitLoadIndex();
        // open() may have failed before the assigner was created.
        if (this.bucketAssigner != null) {
            this.bucketAssigner.close();
        }
    }
}
