package org.apache.hudi.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.metrics.NanoTimerGauge;
import org.apache.flink.runtime.metrics.SettableGauge;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.hudi.adapter.AbstractStreamOperatorAdapter;
import org.apache.hudi.adapter.AbstractStreamOperatorFactoryAdapter;
import org.apache.hudi.adapter.MailboxExecutorAdapter;
import org.apache.hudi.adapter.RateLimiterAdapter;
import org.apache.hudi.adapter.Utils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/source/StreamReadOperator.class */
public class StreamReadOperator extends AbstractStreamOperatorAdapter<RowData> implements OneInputStreamOperator<MergeOnReadInputSplit, RowData> {
    protected static final Logger LOG = LoggerFactory.getLogger(StreamReadOperator.class);
    protected final MailboxExecutorAdapter executor;
    protected final Configuration conf;
    protected final boolean asyncOpenLog;
    protected MergeOnReadInputFormat format;
    protected transient SourceFunction.SourceContext<RowData> sourceContext;
    protected transient ListState<MergeOnReadInputSplit> inputSplitsState;
    protected transient Queue<MergeOnReadInputSplit> splits;
    protected volatile transient SplitState currentSplitState;
    protected final long totalLimit;
    protected final long subtaskLimit;
    protected transient Counter consumedRecords;
    private transient Counter timerCounter;
    protected transient NanoTimerGauge timer;
    private transient SettableGauge bufferSize;
    protected long logCount = 1;
    protected transient IOException asyncException = null;
    protected int miniBatchSize = 2048;
    protected transient long readCommitTime = 0;
    protected transient RateLimiterAdapter rateLimiter = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/source/StreamReadOperator$OperatorFactory.class */
    public static class OperatorFactory extends AbstractStreamOperatorFactoryAdapter<RowData> implements OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> {
        private final MergeOnReadInputFormat format;
        private final Configuration conf;

        private OperatorFactory(MergeOnReadInputFormat mergeOnReadInputFormat, Configuration configuration) {
            this.format = mergeOnReadInputFormat;
            this.conf = configuration;
        }

        public <O extends StreamOperator<RowData>> O createStreamOperator(StreamOperatorParameters<RowData> streamOperatorParameters) {
            StreamReadOperator streamReadOperator = new StreamReadOperator(this.format, this.processingTimeService, getMailboxExecutorAdapter(), this.conf);
            streamReadOperator.setup(streamOperatorParameters.getContainingTask(), streamOperatorParameters.getStreamConfig(), streamOperatorParameters.getOutput());
            return streamReadOperator;
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return StreamReadOperator.class;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hudi/source/StreamReadOperator$SplitState.class */
    public enum SplitState {
        IDLE,
        RUNNING
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamReadOperator(MergeOnReadInputFormat mergeOnReadInputFormat, ProcessingTimeService processingTimeService, MailboxExecutorAdapter mailboxExecutorAdapter, Configuration configuration) {
        this.format = (MergeOnReadInputFormat) Preconditions.checkNotNull(mergeOnReadInputFormat, "The InputFormat should not be null.");
        this.processingTimeService = processingTimeService;
        this.executor = (MailboxExecutorAdapter) Preconditions.checkNotNull(mailboxExecutorAdapter, "The mailboxExecutor should not be null.");
        this.conf = configuration;
        this.asyncOpenLog = this.conf.getBoolean(FlinkOptions.ASYNC_OPEN_LOG);
        this.totalLimit = this.conf.getLong(FlinkOptions.READ_RATE_LIMIT);
        this.subtaskLimit = this.conf.getLong(FlinkOptions.SUBTASK_READ_RATE_LIMIT);
    }

    public void open() throws Exception {
        super.open();
        if (this.subtaskLimit > 0) {
            this.rateLimiter = RateLimiterAdapter.create(this.subtaskLimit);
            this.miniBatchSize = Math.min(2048, (int) this.subtaskLimit);
        } else if (this.totalLimit > 0) {
            this.rateLimiter = RateLimiterAdapter.create(this.totalLimit / getRuntimeContext().getNumberOfParallelSubtasks());
            this.miniBatchSize = Math.min(2048, (int) (this.totalLimit / getRuntimeContext().getNumberOfParallelSubtasks()));
        }
        getRuntimeContext().getMetricGroup().gauge("ReadCommitTime", new Gauge<Long>() { // from class: org.apache.hudi.source.StreamReadOperator.1
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Long m47getValue() {
                return Long.valueOf(StreamReadOperator.this.readCommitTime);
            }
        });
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.inputSplitsState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("splits", new JavaSerializer()));
        preSet(stateInitializationContext);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void preSet(StateInitializationContext stateInitializationContext) throws Exception {
        this.consumedRecords = getRuntimeContext().getMetricGroup().counter("consumedBufferRecords");
        this.timerCounter = getRuntimeContext().getMetricGroup().counter("bufferConsumptionTimeNs");
        this.bufferSize = new SettableGauge();
        this.timer = new NanoTimerGauge(this.timerCounter);
        getRuntimeContext().getMetricGroup().gauge("bufferConsumptionTimeNsPerSecond", this.timer);
        getRuntimeContext().getMetricGroup().gauge("currentBufferSize", this.bufferSize);
        this.currentSplitState = SplitState.IDLE;
        this.splits = new LinkedBlockingDeque();
        if (stateInitializationContext.isRestored()) {
            LOG.info("Restoring state for operator {} (task ID: {}).", getClass().getSimpleName(), Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
            Iterator it = ((Iterable) this.inputSplitsState.get()).iterator();
            while (it.hasNext()) {
                this.splits.add((MergeOnReadInputSplit) it.next());
            }
        }
        onSplitQueueModified();
        this.sourceContext = Utils.getSourceContext(getOperatorConfig().getTimeCharacteristic(), getProcessingTimeService(), getContainingTask(), this.output, getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
        enqueueProcessSplits();
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.inputSplitsState.clear();
        this.inputSplitsState.addAll(new ArrayList(this.splits));
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void processElement(StreamRecord<MergeOnReadInputSplit> streamRecord) throws Exception {
        this.splits.add(streamRecord.getValue());
        onSplitQueueModified();
        enqueueProcessSplits();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void enqueueProcessSplits() {
        if (this.currentSplitState != SplitState.IDLE || this.splits.isEmpty()) {
            return;
        }
        this.currentSplitState = SplitState.RUNNING;
        this.executor.execute(this::processSplits, "process input split");
    }

    protected void processSplits() throws Exception {
        if (this.asyncException != null) {
            throw this.asyncException;
        }
        MergeOnReadInputSplit peek = this.splits.peek();
        if (peek == null) {
            return;
        }
        if (process(peek)) {
            return;
        }
        this.timer.markStart();
        try {
            if (this.rateLimiter != null) {
                consumeAsMiniBatchWithLimit(peek);
            } else {
                consumeAsMiniBatch(peek);
            }
            this.timer.markEnd();
            enqueueProcessSplits();
        } finally {
            this.currentSplitState = SplitState.IDLE;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean process(MergeOnReadInputSplit mergeOnReadInputSplit) throws InterruptedException, IOException {
        if (!this.format.isClosed()) {
            if (this.format.isOpened()) {
                return false;
            }
            this.logCount++;
            TimeUnit.MICROSECONDS.sleep(100L);
            this.currentSplitState = SplitState.IDLE;
            if (this.logCount % 600 == 0) {
                LOG.info("Format is opening {}.", mergeOnReadInputSplit);
                this.logCount = 1L;
            }
            enqueueProcessSplits();
            return true;
        }
        LOG.info("Processing input split : {}", mergeOnReadInputSplit);
        if (!this.asyncOpenLog) {
            LOG.info("Begin open split : {}", mergeOnReadInputSplit);
            this.timer.markStart();
            this.format.open(mergeOnReadInputSplit);
            this.timer.markEnd();
            LOG.info("Finish open split : {}", mergeOnReadInputSplit);
            return false;
        }
        try {
            this.format.setOpening();
            Thread thread = new Thread(() -> {
                try {
                    try {
                        LOG.info("Begin async open split : {}", mergeOnReadInputSplit);
                        this.format.open(mergeOnReadInputSplit);
                        LOG.info("Finish async open split : {}", mergeOnReadInputSplit);
                    } catch (IOException e) {
                        this.asyncException = e;
                        LOG.info("Finish async open split : {}", mergeOnReadInputSplit);
                    }
                } catch (Throwable th) {
                    LOG.info("Finish async open split : {}", mergeOnReadInputSplit);
                    throw th;
                }
            }, getRuntimeContext().getTaskNameWithSubtasks() + " - " + this.conf.getString(FlinkOptions.TABLE_NAME));
            thread.setUncaughtExceptionHandler((thread2, th) -> {
                this.asyncException = new IOException(th);
            });
            thread.start();
            TimeUnit.MICROSECONDS.sleep(100L);
            enqueueProcessSplits();
            return true;
        } finally {
            this.currentSplitState = SplitState.IDLE;
        }
    }

    protected void consumeAsMiniBatchWithLimit(MergeOnReadInputSplit mergeOnReadInputSplit) throws Exception {
        for (int i = 0; i < this.miniBatchSize; i++) {
            if (this.format.reachedEnd()) {
                this.format.close();
                this.splits.poll();
                return;
            }
            this.rateLimiter.acquire();
            this.sourceContext.collect(this.format.nextRecord((RowData) null));
            mergeOnReadInputSplit.consume();
            if (!StringUtils.isNullOrEmpty(mergeOnReadInputSplit.getLatestCommit())) {
                this.readCommitTime = Long.valueOf(mergeOnReadInputSplit.getLatestCommit()).longValue();
            }
        }
    }

    protected void consumeAsMiniBatch(MergeOnReadInputSplit mergeOnReadInputSplit) throws Exception {
        for (int i = 0; i < this.miniBatchSize; i++) {
            if (this.format.reachedEnd()) {
                this.format.close();
                this.splits.poll();
                return;
            } else {
                this.sourceContext.collect(this.format.nextRecord((RowData) null));
                mergeOnReadInputSplit.consume();
                if (!StringUtils.isNullOrEmpty(mergeOnReadInputSplit.getLatestCommit())) {
                    this.readCommitTime = Long.valueOf(mergeOnReadInputSplit.getLatestCommit()).longValue();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onSplitQueueModified() {
        this.bufferSize.setValue(this.splits.size());
    }

    public void processWatermark(Watermark watermark) {
    }

    public void close() throws Exception {
        super.close();
        if (this.format != null) {
            this.format.close();
            this.format.closeInputFormat();
            this.format = null;
        }
        this.sourceContext = null;
        this.timer = null;
        this.timerCounter = null;
        this.bufferSize = null;
        this.consumedRecords = null;
    }

    public void finish() throws Exception {
        super.finish();
        this.output.close();
        if (this.sourceContext != null) {
            this.sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
            this.sourceContext.close();
            this.sourceContext = null;
        }
    }

    public static OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory(MergeOnReadInputFormat mergeOnReadInputFormat, Configuration configuration) {
        return new OperatorFactory(mergeOnReadInputFormat, configuration);
    }
}
