package org.apache.flink.runtime.operators;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
import org.apache.flink.runtime.metrics.groups.InternalOperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.operators.chaining.ChainedDriver;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.UserCodeClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/DataSourceTask.class */
/**
 * Invokable that reads records from an {@link InputFormat}, one input split at a time, and
 * forwards every non-null record to the task's output collector (and thereby to any chained
 * drivers).
 *
 * <p>Cancellation is cooperative: {@link #cancel()} only raises a volatile flag which the read
 * loops in {@link #invoke()} poll between splits and between records.
 *
 * @param <OT> type of the records produced by the input format
 */
public class DataSourceTask<OT> extends AbstractInvokable {
    private static final Logger LOG = LoggerFactory.getLogger(DataSourceTask.class);

    /** Record writers created by {@link #initOutputs}; cleared when {@link #invoke()} ends. */
    private List<RecordWriter<?>> eventualOutputs;

    /** Collector that receives every record read from the input format. */
    private Collector<OT> output;

    /** The input format that produces the records. */
    private InputFormat<OT, InputSplit> format;

    /** Factory for the serializer that creates the record instances handed to the format. */
    private TypeSerializerFactory<OT> serializerFactory;

    /** Task configuration wrapped around {@code getTaskConfiguration()}. */
    private TaskConfig config;

    /** Drivers chained behind this source; opened, closed or cancelled together with it. */
    private ArrayList<ChainedDriver<?, ?>> chainedTasks;

    /** Set by {@link #cancel()}; read by the loops in {@link #invoke()}. */
    private volatile boolean taskCanceled;

    /**
     * Creates the task for the given environment.
     *
     * @param environment the environment handed to {@link AbstractInvokable}
     */
    public DataSourceTask(Environment environment) {
        super(environment);
        this.taskCanceled = false;
    }

    /**
     * Runs the source: sets up input format and outputs, then reads all assigned input splits
     * and forwards their records until the splits are exhausted or the task is cancelled.
     *
     * <p>Failures while reading are unwrapped via {@link ExceptionInChainedStubException}; a
     * {@link CancelTaskException} is always forwarded, any other exception is reported through
     * {@code BatchTask.logAndThrowException} unless the task has been cancelled, in which case
     * it is dropped. The record writers are cleared and a rich input format is closed exactly
     * once, regardless of how reading ended.
     *
     * @throws RuntimeException if setting up the outputs fails
     * @throws Exception if reading, forwarding or closing fails
     */
    @Override
    public void invoke() throws Exception {
        initInputFormat();

        LOG.debug(getLogString("Start registering input and output"));
        // Only the output setup is reported as an initialization error; failures that happen
        // later must keep their own type and message.
        try {
            initOutputs(getEnvironment().getUserCodeClassLoader());
        } catch (Exception e) {
            throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " + e.getMessage(), e);
        }
        LOG.debug(getLogString("Finished registering input and output"));

        LOG.debug(getLogString("Starting data source operator"));
        final DistributedRuntimeUDFContext runtimeContext = createRuntimeContext();
        final Counter numRecordsOut = recordsOutCounter(runtimeContext);
        final Counter completedSplits = runtimeContext.getMetricGroup().counter("numSplitsProcessed");

        final RichInputFormat<OT, InputSplit> richAtStart = richFormatOrNull();
        if (richAtStart != null) {
            richAtStart.setRuntimeContext(runtimeContext);
            LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
            richAtStart.openInputFormat();
            LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
        }

        final boolean objectReuseEnabled = getExecutionConfig().isObjectReuseEnabled();
        // The trailing constant is kept from the original code so the log text is unchanged.
        LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ScopeFormat.SCOPE_SEPARATOR);
        final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();

        try {
            BatchTask.openChainedTasks(this.chainedTasks, this);

            final Iterator<InputSplit> splits = getInputSplits();
            while (!this.taskCanceled && splits.hasNext()) {
                final InputSplit split = splits.next();
                LOG.debug(getLogString("Opening input split " + split.toString()));

                final InputFormat<OT, InputSplit> splitFormat = this.format;
                splitFormat.open(split);
                LOG.debug(getLogString("Starting to read input from split " + split.toString()));

                try {
                    final Collector<OT> countingOutput = new CountingCollector<>(this.output, numRecordsOut);
                    forwardRecords(splitFormat, serializer, countingOutput, objectReuseEnabled);

                    if (LOG.isDebugEnabled() && !this.taskCanceled) {
                        LOG.debug(getLogString("Closing input split " + split.toString()));
                    }
                } finally {
                    // Closed here, and only once, so that a failing regular close still
                    // surfaces as a task failure.
                    splitFormat.close();
                }
                completedSplits.inc();
            }

            BatchTask.closeChainedTasks(this.chainedTasks, this);
            this.output.close();
        } catch (Exception e) {
            try {
                this.format.close();
            } catch (Throwable ignored) {
                // Best effort: a root cause is already in hand and must not be masked.
            }
            BatchTask.cancelChainedTasks(this.chainedTasks);

            final Exception unwrapped = ExceptionInChainedStubException.exceptionUnwrap(e);
            if (unwrapped instanceof CancelTaskException) {
                throw unwrapped;
            }
            if (!this.taskCanceled) {
                BatchTask.logAndThrowException(unwrapped, this);
            }
            // Otherwise the task was cancelled and the exception is dropped on purpose.
        } finally {
            BatchTask.clearWriters(this.eventualOutputs);
            final RichInputFormat<OT, InputSplit> richAtEnd = richFormatOrNull();
            if (richAtEnd != null) {
                richAtEnd.closeInputFormat();
                LOG.debug(getLogString("Rich Source detected. Closing the InputFormat."));
            }
        }

        if (this.taskCanceled) {
            LOG.debug(getLogString("Data source operator cancelled"));
        } else {
            LOG.debug(getLogString("Finished data source operator"));
        }
    }

    /**
     * Requests cancellation by raising the flag polled by {@link #invoke()}.
     *
     * @throws Exception declared by the overridden method; not thrown by this implementation
     *     unless logging fails
     */
    @Override
    public void cancel() throws Exception {
        this.taskCanceled = true;
        LOG.debug(getLogString("Cancelling data source operator"));
    }

    /**
     * Obtains the counter for emitted records from the operator's I/O metric group, falling
     * back to a detached {@link SimpleCounter} if the metric setup fails.
     *
     * @param runtimeContext the context whose metric group is used
     * @return the counter to increment for every emitted record
     */
    private Counter recordsOutCounter(DistributedRuntimeUDFContext runtimeContext) {
        try {
            final InternalOperatorIOMetricGroup ioMetrics =
                    ((InternalOperatorMetricGroup) runtimeContext.getMetricGroup()).getIOMetricGroup();
            ioMetrics.reuseInputMetricsForTask();
            if (this.config.getNumberOfChainedStubs() == 0) {
                ioMetrics.reuseOutputMetricsForTask();
            }
            return ioMetrics.getNumRecordsOutCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            return new SimpleCounter();
        }
    }

    /**
     * Reads the currently opened split to its end (or until cancellation) and forwards every
     * non-null record.
     *
     * @param splitFormat the format, already opened on a split
     * @param serializer creates the instances passed to {@code nextRecord}
     * @param target receives the records
     * @param objectReuseEnabled if {@code true} a single instance is created up front and
     *     passed to every {@code nextRecord} call; otherwise a new instance is created per call
     * @throws Exception if the format fails while reading
     */
    private void forwardRecords(
            InputFormat<OT, InputSplit> splitFormat,
            TypeSerializer<OT> serializer,
            Collector<OT> target,
            boolean objectReuseEnabled) throws Exception {
        if (objectReuseEnabled) {
            final OT reuse = serializer.createInstance();
            while (!this.taskCanceled && !splitFormat.reachedEnd()) {
                final OT produced = splitFormat.nextRecord(reuse);
                if (produced != null) {
                    target.collect(produced);
                }
            }
        } else {
            while (!this.taskCanceled && !splitFormat.reachedEnd()) {
                final OT produced = splitFormat.nextRecord(serializer.createInstance());
                if (produced != null) {
                    target.collect(produced);
                }
            }
        }
    }

    /**
     * Returns the input format viewed as a {@link RichInputFormat}, or {@code null} if the
     * format is unset or not a rich format.
     */
    private RichInputFormat<OT, InputSplit> richFormatOrNull() {
        if (this.format instanceof RichInputFormat) {
            return (RichInputFormat<OT, InputSplit>) this.format;
        }
        return null;
    }

    /**
     * Loads the task configuration, instantiates and configures the input format, and looks up
     * the output serializer factory.
     *
     * <p>{@code configure()} runs with the user-code class loader installed as the thread's
     * context class loader; the previous loader is restored afterwards in all cases.
     *
     * @throws RuntimeException if the format is not an {@link InputFormat} or if its
     *     {@code configure()} method fails
     */
    private void initInputFormat() {
        final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
        this.config = new TaskConfig(getTaskConfiguration());
        final InputOutputFormatContainer formatContainer =
                new InputOutputFormatContainer(this.config, userCodeClassLoader);

        final Pair<OperatorID, InputFormat<OT, InputSplit>> operatorIdAndFormat;
        try {
            operatorIdAndFormat = formatContainer.getUniqueInputFormat();
            this.format = operatorIdAndFormat.getValue();
            if (!InputFormat.class.isAssignableFrom(this.format.getClass())) {
                throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" + InputFormat.class.getName() + "' as is required.");
            }
        } catch (ClassCastException e) {
            throw new RuntimeException("The stub class is not a proper subclass of " + InputFormat.class.getName(), e);
        }

        final Thread currentThread = Thread.currentThread();
        final ClassLoader previousContextLoader = currentThread.getContextClassLoader();
        // Failures are caught separately here so they are reported as coming from user code.
        try {
            currentThread.setContextClassLoader(userCodeClassLoader);
            this.format.configure(formatContainer.getParameters(operatorIdAndFormat.getKey()));
        } catch (Throwable t) {
            throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t);
        } finally {
            currentThread.setContextClassLoader(previousContextLoader);
        }

        // Looked up outside the block above so a serializer problem is not blamed on configure().
        this.serializerFactory = this.config.getOutputSerializer(userCodeClassLoader);
    }

    /**
     * Creates the chained-task and writer lists and lets {@code BatchTask.initOutputs} build
     * the output collector.
     *
     * @param userCodeClassLoader class loader passed on to {@code BatchTask.initOutputs}
     * @throws Exception if the output setup fails
     */
    private void initOutputs(UserCodeClassLoader userCodeClassLoader) throws Exception {
        this.chainedTasks = new ArrayList<>();
        this.eventualOutputs = new ArrayList<>();
        this.output = BatchTask.initOutputs(this, userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs, getExecutionConfig(), getEnvironment().getAccumulatorRegistry().getUserMap());
    }

    /**
     * Builds a log line for this task, using the task name from the environment.
     *
     * @param message the message to embed
     * @return the formatted log string
     */
    private String getLogString(String message) {
        return getLogString(message, getEnvironment().getTaskInfo().getTaskName());
    }

    /**
     * Builds a log line via {@code BatchTask.constructLogString}.
     *
     * @param message the message to embed
     * @param taskName the task name to embed
     * @return the formatted log string
     */
    private String getLogString(String message, String taskName) {
        return BatchTask.constructLogString(message, taskName, this);
    }

    /**
     * Returns an iterator that fetches input splits from the environment's
     * {@link InputSplitProvider} on demand. A {@code null} split from the provider marks the
     * end; after that the provider is not asked again.
     *
     * @return an iterator over the splits assigned to this task; {@code remove()} is
     *     unsupported
     */
    private Iterator<InputSplit> getInputSplits() {
        final InputSplitProvider provider = getEnvironment().getInputSplitProvider();

        return new Iterator<InputSplit>() {
            /** Split fetched by {@code hasNext()} but not yet handed out. */
            private InputSplit nextSplit;

            /** Becomes true once the provider has returned {@code null}. */
            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (this.exhausted) {
                    return false;
                }
                if (this.nextSplit != null) {
                    return true;
                }

                final InputSplit fetched;
                try {
                    fetched = provider.getNextInputSplit(DataSourceTask.this.getUserCodeClassLoader());
                } catch (InputSplitProviderException e) {
                    throw new RuntimeException("Could not retrieve next input split.", e);
                }

                if (fetched == null) {
                    this.exhausted = true;
                    return false;
                }
                this.nextSplit = fetched;
                return true;
            }

            @Override
            public InputSplit next() {
                if (this.nextSplit == null && !hasNext()) {
                    throw new NoSuchElementException();
                }
                final InputSplit handedOut = this.nextSplit;
                this.nextSplit = null;
                return handedOut;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    /**
     * Creates the runtime context for the source. The operator metric group is registered
     * under a name derived from the task name (see {@link #operatorNameOf(String)}).
     *
     * @return a new runtime context built from the task's environment
     */
    public DistributedRuntimeUDFContext createRuntimeContext() {
        final Environment environment = getEnvironment();
        final String operatorName = operatorNameOf(environment.getTaskInfo().getTaskName());
        return new DistributedRuntimeUDFContext(
                environment.getTaskInfo(),
                environment.getUserCodeClassLoader(),
                getExecutionConfig(),
                environment.getDistributedCacheEntries(),
                environment.getAccumulatorRegistry().getUserMap(),
                environment.getMetricGroup().getOrAddOperator(operatorName),
                environment.getExternalResourceInfoProvider(),
                environment.getJobID());
    }

    /**
     * Derives the operator name from a task name: takes the text before the first
     * {@code "->"}, trims it, and, if it starts with {@code "CHAIN"}, drops the first six
     * characters.
     *
     * <p>Unlike {@code split("->")[0]} and an unguarded {@code substring(6)}, this does not
     * throw for a name consisting only of separators or for a name shorter than six characters.
     *
     * @param taskName the full task name
     * @return the name under which the operator metric group is registered
     */
    private static String operatorNameOf(String taskName) {
        final int separator = taskName.indexOf("->");
        final String head = (separator < 0 ? taskName : taskName.substring(0, separator)).trim();
        if (head.startsWith("CHAIN") && head.length() >= 6) {
            return head.substring(6);
        }
        return head;
    }
}
