package org.apache.flink.streaming.api.functions.source;

import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.class */
public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
    private static final long serialVersionUID = 1;
    private TypeInformation<OUT> typeInfo;
    private transient TypeSerializer<OUT> serializer;
    private InputFormat<OUT, InputSplit> format;
    private transient InputSplitProvider provider;
    private transient Iterator<InputSplit> splitIterator;
    private volatile boolean isRunning = true;

    /* JADX WARN: Multi-variable type inference failed */
    public InputFormatSourceFunction(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInformation) {
        this.format = inputFormat;
        this.typeInfo = typeInformation;
    }

    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void open(Configuration configuration) throws Exception {
        StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
        if (this.format instanceof RichInputFormat) {
            ((RichInputFormat) this.format).setRuntimeContext(streamingRuntimeContext);
        }
        this.format.configure(configuration);
        this.provider = streamingRuntimeContext.getInputSplitProvider();
        this.serializer = this.typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
        this.splitIterator = getInputSplits();
        this.isRunning = this.splitIterator.hasNext();
    }

    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void run(SourceFunction.SourceContext<OUT> sourceContext) throws Exception {
        try {
            Counter counter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
            if (this.isRunning && (this.format instanceof RichInputFormat)) {
                ((RichInputFormat) this.format).openInputFormat();
            }
            OUT mo2282createInstance = this.serializer.mo2282createInstance();
            while (this.isRunning) {
                this.format.open(this.splitIterator.next());
                while (this.isRunning && !this.format.reachedEnd()) {
                    mo2282createInstance = this.format.nextRecord(mo2282createInstance);
                    if (mo2282createInstance == null) {
                        break;
                    } else {
                        sourceContext.collect(mo2282createInstance);
                    }
                }
                this.format.close();
                counter.inc();
                if (this.isRunning) {
                    this.isRunning = this.splitIterator.hasNext();
                }
            }
        } finally {
            this.format.close();
            if (this.format instanceof RichInputFormat) {
                ((RichInputFormat) this.format).closeInputFormat();
            }
            this.isRunning = false;
        }
    }

    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void cancel() {
        this.isRunning = false;
    }

    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void close() throws Exception {
        this.format.close();
        if (this.format instanceof RichInputFormat) {
            ((RichInputFormat) this.format).closeInputFormat();
        }
    }

    public InputFormat<OUT, InputSplit> getFormat() {
        return this.format;
    }

    private Iterator<InputSplit> getInputSplits() {
        return new Iterator<InputSplit>() { // from class: org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.1
            private InputSplit nextSplit;
            private boolean exhausted;

            @Override // java.util.Iterator
            public boolean hasNext() {
                if (this.exhausted) {
                    return false;
                }
                if (this.nextSplit != null) {
                    return true;
                }
                try {
                    InputSplit nextInputSplit = InputFormatSourceFunction.this.provider.getNextInputSplit(InputFormatSourceFunction.this.getRuntimeContext().getUserCodeClassLoader());
                    if (nextInputSplit != null) {
                        this.nextSplit = nextInputSplit;
                        return true;
                    }
                    this.exhausted = true;
                    return false;
                } catch (InputSplitProviderException e) {
                    throw new RuntimeException("Could not retrieve next input split.", e);
                }
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public InputSplit next() {
                if (this.nextSplit == null && !hasNext()) {
                    throw new NoSuchElementException();
                }
                InputSplit inputSplit = this.nextSplit;
                this.nextSplit = null;
                return inputSplit;
            }

            @Override // java.util.Iterator
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
