package org.apache.flink.streaming.api.functions.source.datagen;

import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.NanoTimerGauge;
import org.apache.flink.runtime.resourceestimator.metrics.SourcePartitionRecorder;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A parallel source function that emits the records produced by a {@link DataGenerator}.
 *
 * <p>Emission is throttled: each subtask emits roughly {@code rowsPerSecond / parallelism} records
 * and then waits until the next one-second boundary. If a total number of rows is configured, it is
 * split across the subtasks in {@link #open(Configuration)} and each subtask stops once its share
 * has been emitted.
 *
 * <p>The per-subtask emitted-row counter is transient and is not part of the snapshot taken in
 * {@link #snapshotState(FunctionSnapshotContext)}; only the generator's own state is snapshotted.
 *
 * @param <T> type of the records produced by the generator
 */
@Experimental
@Deprecated
public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implements CheckpointedFunction {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(DataGeneratorSource.class);

    /** Length of one throttling period in milliseconds. */
    private static final long THROTTLE_PERIOD_MILLIS = 1000L;

    private final DataGenerator<T> generator;

    /** Target emission rate summed over all parallel subtasks. */
    private final long rowsPerSecond;

    /** Total number of rows to emit over all subtasks, or {@code null} for no limit. */
    @Nullable
    private final Long numberOfRows;

    /** Rows emitted so far by this subtask. */
    private transient long outputSoFar;

    /** This subtask's share of {@link #numberOfRows}; only meaningful when a limit is set. */
    private transient long toOutput;

    /** Cleared by {@link #cancel()} to stop the emit loop. */
    volatile transient boolean isRunning;

    // Runtime-only helpers created in initializeState(); they must not travel with the
    // serialized function, hence transient.
    private transient NanoTimerGauge inputWaitTimerGauge;
    private transient SourcePartitionRecorder partitionRecorder;

    /**
     * Creates a source with no row limit and a rate of {@link Long#MAX_VALUE} rows per second.
     *
     * @param dataGenerator generator that produces the records
     */
    public DataGeneratorSource(DataGenerator<T> dataGenerator) {
        this(dataGenerator, Long.MAX_VALUE, null);
    }

    /**
     * Creates a source with an explicit rate and an optional row limit.
     *
     * @param dataGenerator generator that produces the records
     * @param rowsPerSecond target emission rate summed over all parallel subtasks
     * @param numberOfRows total number of rows to emit over all subtasks, or {@code null} for no
     *     limit
     */
    public DataGeneratorSource(DataGenerator<T> dataGenerator, long rowsPerSecond, @Nullable Long numberOfRows) {
        this.generator = dataGenerator;
        this.rowsPerSecond = rowsPerSecond;
        this.numberOfRows = numberOfRows;
    }

    /**
     * Computes this subtask's share of the configured row limit, if any.
     *
     * <p>The limit is divided evenly; the first {@code numberOfRows % parallelism} subtasks emit
     * one extra row so that the shares sum to the limit.
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        if (this.numberOfRows != null) {
            final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
            final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
            final long totalRows = this.numberOfRows;
            // Keep the share as a long: narrowing it to int would wrap around (possibly to a
            // negative quota) once a subtask's share exceeds Integer.MAX_VALUE.
            final long baseShare = totalRows / parallelism;
            this.toOutput = (totalRows % parallelism > subtaskIndex) ? baseShare + 1 : baseShare;
        }
    }

    /**
     * Opens the generator, registers the blocking-time metrics and the partition recorder, and
     * marks the source as running.
     */
    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.generator.open("DataGenerator", functionInitializationContext, getRuntimeContext());

        final OperatorMetricGroup metricGroup = getRuntimeContext().getMetricGroup();
        final SimpleCounter blockingTimeCounter = new SimpleCounter();
        this.inputWaitTimerGauge = new NanoTimerGauge(blockingTimeCounter);
        metricGroup.gauge("sourceInputBlockingTimePerSecond", this.inputWaitTimerGauge);
        metricGroup.counter("sourceInputBlockingTime", blockingTimeCounter);
        this.partitionRecorder =
                new SourcePartitionRecorder(getRuntimeContext().getIndexOfThisSubtask(), metricGroup);

        this.isRunning = true;
    }

    /** Delegates the snapshot to the generator. */
    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.generator.snapshotState(functionSnapshotContext);
    }

    /**
     * Emits records in one-second batches until cancelled, the generator is exhausted, or this
     * subtask's row quota is reached.
     *
     * @param sourceContext context used to emit records and to obtain the checkpoint lock
     */
    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        // Divide in floating point. An integral division would truncate to 0 whenever
        // rowsPerSecond < parallelism, and the emit loop below would then never run.
        final double taskRowsPerSecond =
                (double) this.rowsPerSecond / getRuntimeContext().getNumberOfParallelSubtasks();
        long nextBatchTime = System.currentTimeMillis();

        while (this.isRunning) {
            // long index: an int would wrap around for per-subtask rates above Integer.MAX_VALUE.
            for (long emittedInBatch = 0; emittedInBatch < taskRowsPerSecond; emittedInBatch++) {
                if (!this.isRunning || !this.generator.hasNext()) {
                    return;
                }
                if (this.numberOfRows != null && this.outputSoFar >= this.toOutput) {
                    return;
                }
                this.partitionRecorder.inc(1L);
                // Emit while holding the checkpoint lock.
                synchronized (sourceContext.getCheckpointLock()) {
                    this.outputSoFar++;
                    sourceContext.collect(this.generator.next());
                }
            }

            nextBatchTime += THROTTLE_PERIOD_MILLIS;
            this.inputWaitTimerGauge.markStart();
            try {
                // Re-check the clock after every sleep in case it returned early.
                long remainingMillis = nextBatchTime - System.currentTimeMillis();
                while (remainingMillis > 0) {
                    Thread.sleep(remainingMillis);
                    remainingMillis = nextBatchTime - System.currentTimeMillis();
                }
            } finally {
                // Always pair markStart() with markEnd(), even if the sleep is interrupted.
                this.inputWaitTimerGauge.markEnd();
            }
        }
    }

    /** Closes the function and logs how many rows this subtask emitted. */
    @Override
    public void close() throws Exception {
        super.close();
        LOG.info("generated {} rows", this.outputSoFar);
    }

    /** Requests the emit loop in {@link #run(SourceFunction.SourceContext)} to stop. */
    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void cancel() {
        this.isRunning = false;
    }
}
