package org.apache.flink.streaming.util;

import java.util.Arrays;
import java.util.Iterator;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/* loaded from: input_file:org/apache/flink/streaming/util/FiniteTestSource.class */
/**
 * A bounded test source that emits a fixed collection of elements twice.
 *
 * <p>Each emission round iterates over all configured elements while holding the checkpoint
 * lock and then blocks until {@value #CHECKPOINTS_PER_ROUND} further checkpoint completions
 * have been counted via {@link #notifyCheckpointComplete(long)}, or until {@link #cancel()}
 * has been called.
 *
 * <p>The completed-checkpoint counter is {@code transient}, so it is not part of the
 * serialized form and starts at zero on a deserialized instance.
 *
 * <p>NOTE(review): the counter is read under the checkpoint lock but incremented without
 * explicit synchronization in this class; presumably the runtime invokes
 * {@link #notifyCheckpointComplete(long)} while holding the checkpoint lock - confirm.
 *
 * @param <T> type of the emitted elements
 */
public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
    private static final long serialVersionUID = 1L;

    /**
     * Timeout, in milliseconds, of each timed wait on the checkpoint lock. Nothing in this
     * class calls {@code notify} on the lock, so the waiting loop relies on this timeout to
     * re-check its exit condition. Kept separate from {@code serialVersionUID} on purpose:
     * the two values are unrelated and must be free to change independently.
     */
    private static final long CHECKPOINT_POLL_INTERVAL_MS = 1L;

    /** Number of additional checkpoint completions awaited after each emission round. */
    private static final int CHECKPOINTS_PER_ROUND = 2;

    /** Elements emitted, in iteration order, in every round. */
    private final Iterable<T> elements;

    /** Cleared by {@link #cancel()} to release a waiting {@link #run} loop. */
    private volatile boolean running;

    /** Count of checkpoint-complete notifications received by this instance. */
    private transient int numCheckpointsComplete;

    /**
     * Creates a source over the given elements.
     *
     * @param tArr elements to emit in each round
     */
    @SafeVarargs
    public FiniteTestSource(T... tArr) {
        this(Arrays.asList(tArr));
    }

    /**
     * Creates a source over the given elements.
     *
     * @param iterable elements to emit in each round; iterated once per round
     */
    public FiniteTestSource(Iterable<T> iterable) {
        this.running = true;
        this.elements = iterable;
    }

    /**
     * Emits all elements, waits for checkpoints, then repeats the same sequence once more.
     *
     * @param sourceContext context used to emit elements and to obtain the checkpoint lock
     * @throws Exception if the thread is interrupted while waiting, or emission fails
     */
    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        // First round: emit everything, then wait for the checkpoints.
        emitElementsAndWaitForCheckpoints(sourceContext, CHECKPOINTS_PER_ROUND);
        // Second round: identical to the first.
        emitElementsAndWaitForCheckpoints(sourceContext, CHECKPOINTS_PER_ROUND);
    }

    /**
     * Emits every element under the checkpoint lock and then waits until the given number of
     * additional checkpoint completions has been counted or the source has been cancelled.
     *
     * @param sourceContext context used to emit elements and to obtain the checkpoint lock
     * @param checkpointsToWaitFor number of completions to await, relative to the count
     *     observed when emission starts
     * @throws InterruptedException if the thread is interrupted while waiting on the lock
     */
    private void emitElementsAndWaitForCheckpoints(
            SourceFunction.SourceContext<T> sourceContext, int checkpointsToWaitFor)
            throws InterruptedException {
        final Object checkpointLock = sourceContext.getCheckpointLock();
        final int checkpointToAwait;

        synchronized (checkpointLock) {
            // The target is computed inside the same critical section as the emission, so
            // the counter value it is based on is the one seen when emission began.
            checkpointToAwait = this.numCheckpointsComplete + checkpointsToWaitFor;
            for (T element : this.elements) {
                sourceContext.collect(element);
            }
        }

        synchronized (checkpointLock) {
            // Timed wait: releases the lock while sleeping and periodically re-checks both
            // the cancellation flag and the checkpoint counter.
            while (this.running && this.numCheckpointsComplete < checkpointToAwait) {
                checkpointLock.wait(CHECKPOINT_POLL_INTERVAL_MS);
            }
        }
    }

    /** Requests termination; a waiting round observes the flag on its next poll. */
    public void cancel() {
        this.running = false;
    }

    /**
     * Records one more completed checkpoint.
     *
     * @param j identifier of the completed checkpoint; not used by this source
     * @throws Exception never thrown by this implementation
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        this.numCheckpointsComplete++;
    }
}
