package org.apache.hudi.source;

import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.adapter.AbstractStreamOperatorFactoryAdapter;
import org.apache.hudi.adapter.MailboxExecutorAdapter;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.source.StreamReadOperator;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;

/* loaded from: input_file:org/apache/hudi/source/KeyedStreamReadOperator.class */
/**
 * A {@link StreamReadOperator} that additionally tracks every received input split in keyed
 * {@link MapState}.
 *
 * <p>For each incoming split the current key is set to the split's file id and the split is
 * stored under its split number. A queued split is only read while its entry is still present
 * in the keyed state; once the input format reports the end of the split, the entry is removed.
 * Queued splits that have no entry are dropped from the queue without being read.
 */
public class KeyedStreamReadOperator extends StreamReadOperator {
    /** Splits that are still pending, stored per key (file id) and indexed by split number. */
    private transient MapState<Integer, MergeOnReadInputSplit> keyedState;

    /**
     * Factory that creates {@link KeyedStreamReadOperator} instances. Obtain one through
     * {@link KeyedStreamReadOperator#factory(MergeOnReadInputFormat, Configuration)}.
     */
    public static class KeyedOperatorFactory extends AbstractStreamOperatorFactoryAdapter<RowData> implements OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> {
        private final MergeOnReadInputFormat format;
        private final Configuration conf;

        private KeyedOperatorFactory(MergeOnReadInputFormat format, Configuration conf) {
            this.format = format;
            this.conf = conf;
        }

        /**
         * Creates the operator and wires it to the containing task, stream config and output
         * taken from {@code parameters}.
         *
         * @param parameters runtime parameters supplied by the framework
         * @return the newly created and set-up {@link KeyedStreamReadOperator}
         */
        @Override
        public <O extends StreamOperator<RowData>> O createStreamOperator(StreamOperatorParameters<RowData> parameters) {
            KeyedStreamReadOperator operator =
                new KeyedStreamReadOperator(this.format, this.processingTimeService, getMailboxExecutorAdapter(), this.conf);
            operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
            // The interface signature forces an unchecked cast to the caller-chosen type O.
            @SuppressWarnings("unchecked")
            O result = (O) operator;
            return result;
        }

        /**
         * Reports the class of the operator produced by {@link #createStreamOperator}.
         *
         * @param classLoader unused
         * @return {@code KeyedStreamReadOperator.class}
         */
        @Override
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            // Must match the concrete type instantiated in createStreamOperator.
            return KeyedStreamReadOperator.class;
        }
    }

    /**
     * Creates the operator; all arguments are forwarded unchanged to the
     * {@link StreamReadOperator} constructor.
     */
    public KeyedStreamReadOperator(MergeOnReadInputFormat mergeOnReadInputFormat, ProcessingTimeService processingTimeService, MailboxExecutorAdapter mailboxExecutorAdapter, Configuration configuration) {
        super(mergeOnReadInputFormat, processingTimeService, mailboxExecutorAdapter, configuration);
    }

    /** Delegates to the parent implementation without additional work. */
    @Override
    public void open() throws Exception {
        super.open();
    }

    /**
     * Registers the union list state {@code "splits"} and the keyed map state
     * {@code "mapState"}, then hands the context to {@code preSet} of the parent class.
     *
     * @param context state initialization context; must provide a keyed state store
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        this.inputSplitsState = context.getOperatorStateStore().getUnionListState(
            new ListStateDescriptor<MergeOnReadInputSplit>("splits", new JavaSerializer<MergeOnReadInputSplit>()));
        this.keyedState = context.getKeyedStateStore().getMapState(
            new MapStateDescriptor<Integer, MergeOnReadInputSplit>(
                "mapState", IntSerializer.INSTANCE, new JavaSerializer<MergeOnReadInputSplit>()));
        super.preSet(context);
    }

    /** Delegates to the parent implementation without additional work. */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
    }

    /**
     * Lets the parent handle the incoming split, then records the split in keyed state under
     * key = file id, map key = split number.
     *
     * @param streamRecord record wrapping the split to read
     */
    @Override
    public void processElement(StreamRecord<MergeOnReadInputSplit> streamRecord) throws Exception {
        super.processElement(streamRecord);
        MergeOnReadInputSplit split = streamRecord.getValue();
        setCurrentKey(split.getFileId());
        this.keyedState.put(split.getSplitNumber(), split);
    }

    /**
     * Handles the split at the head of the queue.
     *
     * <ol>
     *   <li>Rethrows a previously recorded asynchronous exception, if any.</li>
     *   <li>Returns immediately when the queue is empty.</li>
     *   <li>Drops the head split when the keyed state holds no entry for it.</li>
     *   <li>Otherwise calls {@code process(split)}; when that returns {@code true} nothing else
     *       is done here. When it returns {@code false}, one mini batch is consumed (rate
     *       limited when a rate limiter is configured), the next round is enqueued and the
     *       split state is reset to {@code IDLE}.</li>
     * </ol>
     */
    @Override
    protected void processSplits() throws Exception {
        if (this.asyncException != null) {
            throw this.asyncException;
        }
        MergeOnReadInputSplit head = this.splits.peek();
        if (head == null) {
            return;
        }
        setCurrentKey(head.getFileId());
        if (!this.keyedState.contains(head.getSplitNumber())) {
            // No keyed-state entry for this split: remove it from the queue without reading.
            // NOTE(review): unlike the read path below, this path neither resets
            // currentSplitState nor calls enqueueProcessSplits(); confirm against the parent
            // class that the remaining queued splits still get scheduled.
            this.splits.poll();
            onSplitQueueModified();
            return;
        }
        if (process(head)) {
            return;
        }
        this.timer.markStart();
        try {
            if (this.rateLimiter != null) {
                consumeAsMiniBatchWithLimit(head);
            } else {
                consumeAsMiniBatch(head);
            }
            this.timer.markEnd();
            enqueueProcessSplits();
        } finally {
            this.currentSplitState = StreamReadOperator.SplitState.IDLE;
        }
    }

    /**
     * Reads up to {@code miniBatchSize} records of the split, acquiring a permit from the rate
     * limiter before each emitted record.
     *
     * @param mergeOnReadInputSplit the split currently being read
     */
    @Override
    protected void consumeAsMiniBatchWithLimit(MergeOnReadInputSplit mergeOnReadInputSplit) throws Exception {
        consumeKeyedMiniBatch(mergeOnReadInputSplit, true);
    }

    /**
     * Reads up to {@code miniBatchSize} records of the split without rate limiting.
     *
     * @param mergeOnReadInputSplit the split currently being read
     */
    @Override
    protected void consumeAsMiniBatch(MergeOnReadInputSplit mergeOnReadInputSplit) throws Exception {
        consumeKeyedMiniBatch(mergeOnReadInputSplit, false);
    }

    /**
     * Returns a factory that produces {@link KeyedStreamReadOperator} instances.
     *
     * @param mergeOnReadInputFormat input format handed to every created operator
     * @param configuration configuration handed to every created operator
     */
    public static OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory(MergeOnReadInputFormat mergeOnReadInputFormat, Configuration configuration) {
        return new KeyedOperatorFactory(mergeOnReadInputFormat, configuration);
    }

    /**
     * Shared mini-batch loop: emits at most {@code miniBatchSize} records of {@code split} and
     * finishes the split as soon as the format reports its end.
     *
     * @param split the split currently being read
     * @param rateLimited whether to acquire a rate-limiter permit before each emitted record
     */
    private void consumeKeyedMiniBatch(MergeOnReadInputSplit split, boolean rateLimited) throws Exception {
        for (int i = 0; i < this.miniBatchSize; i++) {
            if (this.format.reachedEnd()) {
                releaseFinishedSplit(split);
                return;
            }
            if (rateLimited) {
                this.rateLimiter.acquire();
            }
            this.sourceContext.collect(this.format.nextRecord((RowData) null));
            split.consume();
            String latestCommit = split.getLatestCommit();
            if (!StringUtils.isNullOrEmpty(latestCommit)) {
                this.readCommitTime = Long.parseLong(latestCommit);
            }
        }
    }

    /**
     * Closes the format, removes the head of the split queue, bumps the consumed counter and
     * deletes the keyed-state entry of {@code split}.
     */
    private void releaseFinishedSplit(MergeOnReadInputSplit split) throws Exception {
        this.format.close();
        this.splits.poll();
        onSplitQueueModified();
        this.consumedRecords.inc();
        setCurrentKey(split.getFileId());
        this.keyedState.remove(split.getSplitNumber());
    }
}
