package org.apache.flink.datastream.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.FromDataSource;
import org.apache.flink.api.connector.dsv2.Source;
import org.apache.flink.api.connector.dsv2.WrappedSource;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.datastream.api.ExecutionEnvironment;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
import org.apache.flink.datastream.impl.stream.NonKeyedPartitionStreamImpl;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.runtime.translators.DataStreamV2SinkTransformationTranslator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.class */
public class ExecutionEnvironmentImpl implements ExecutionEnvironment {
    private final List<Transformation<?>> transformations = new ArrayList();
    private final ExecutionConfig executionConfig;
    private final CheckpointConfig checkpointCfg;
    private final Configuration configuration;
    private final ClassLoader userClassloader;
    private final PipelineExecutorServiceLoader executorServiceLoader;
    private static ExecutionEnvironmentFactory contextEnvironmentFactory = null;

    public static ExecutionEnvironment newInstance() {
        if (contextEnvironmentFactory != null) {
            return contextEnvironmentFactory.createExecutionEnvironment(new Configuration());
        }
        Configuration configuration = new Configuration();
        configuration.set(DeploymentOptions.TARGET, "local");
        configuration.set(DeploymentOptions.ATTACHED, true);
        return new ExecutionEnvironmentImpl(new DefaultExecutorServiceLoader(), configuration, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ExecutionEnvironmentImpl(PipelineExecutorServiceLoader pipelineExecutorServiceLoader, Configuration configuration, ClassLoader classLoader) {
        this.executorServiceLoader = (PipelineExecutorServiceLoader) Preconditions.checkNotNull(pipelineExecutorServiceLoader);
        this.configuration = configuration;
        this.executionConfig = new ExecutionConfig(this.configuration);
        this.checkpointCfg = new CheckpointConfig(this.configuration);
        this.userClassloader = classLoader == null ? getClass().getClassLoader() : classLoader;
        configure(configuration, this.userClassloader);
    }

    public void execute(String str) throws Exception {
        StreamGraph streamGraph = getStreamGraph();
        if (str != null) {
            streamGraph.setJobName(str);
        }
        execute(streamGraph);
    }

    public RuntimeExecutionMode getExecutionMode() {
        return (RuntimeExecutionMode) this.configuration.get(ExecutionOptions.RUNTIME_MODE);
    }

    public ExecutionEnvironment setExecutionMode(RuntimeExecutionMode runtimeExecutionMode) {
        Preconditions.checkNotNull(runtimeExecutionMode);
        this.configuration.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode);
        return this;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void initializeContextEnvironment(ExecutionEnvironmentFactory executionEnvironmentFactory) {
        contextEnvironmentFactory = executionEnvironmentFactory;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void resetContextEnvironment() {
        contextEnvironmentFactory = null;
    }

    public <OUT> NonKeyedPartitionStream<OUT> fromSource(Source<OUT> source, String str) {
        if (source instanceof WrappedSource) {
            org.apache.flink.api.connector.source.Source wrappedSource = ((WrappedSource) source).getWrappedSource();
            return new NonKeyedPartitionStreamImpl(this, new SourceTransformation(str, wrappedSource, WatermarkStrategy.noWatermarks(), getSourceTypeInfo(wrappedSource, str), getParallelism(), false));
        }
        if (!(source instanceof FromDataSource)) {
            throw new UnsupportedOperationException("Unsupported type of sink, you could use DataStreamV2SourceUtils to wrap a FLIP-27 based source.");
        }
        Collection data = ((FromDataSource) source).getData();
        TypeInformation extractTypeInfoFromCollection = extractTypeInfoFromCollection(data);
        return fromSource(new WrappedSource(new DataGeneratorSource(new FromElementsGeneratorFunction(extractTypeInfoFromCollection, this.executionConfig, data), data.size(), extractTypeInfoFromCollection)), "Collection Source");
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    public int getParallelism() {
        return this.executionConfig.getParallelism();
    }

    public List<Transformation<?>> getTransformations() {
        return this.transformations;
    }

    public void setParallelism(int i) {
        this.executionConfig.setParallelism(i);
    }

    public CheckpointConfig getCheckpointCfg() {
        return this.checkpointCfg;
    }

    private static <OUT> TypeInformation<OUT> extractTypeInfoFromCollection(Collection<OUT> collection) {
        Preconditions.checkNotNull(collection, "Collection must not be null");
        if (collection.isEmpty()) {
            throw new IllegalArgumentException("Collection must not be empty");
        }
        OUT next = collection.iterator().next();
        if (next == null) {
            throw new IllegalArgumentException("Collection must not contain null elements");
        }
        try {
            return TypeExtractor.getForObject(next);
        } catch (Exception e) {
            throw new RuntimeException("Could not create TypeInformation for type " + next.getClass() + "; please specify the TypeInformation manually via the version of the method that explicitly accepts it as an argument.", e);
        }
    }

    private static <OUT, T extends TypeInformation<OUT>> T getSourceTypeInfo(org.apache.flink.api.connector.source.Source<OUT, ?, ?> source, String str) {
        MissingTypeInfo missingTypeInfo = null;
        if (source instanceof ResultTypeQueryable) {
            missingTypeInfo = ((ResultTypeQueryable) source).getProducedType();
        }
        if (missingTypeInfo == null) {
            try {
                missingTypeInfo = TypeExtractor.createTypeInfo(org.apache.flink.api.connector.source.Source.class, source.getClass(), 0, (TypeInformation) null, (TypeInformation) null);
            } catch (InvalidTypesException e) {
                missingTypeInfo = new MissingTypeInfo(str, e);
            }
        }
        return missingTypeInfo;
    }

    public void addOperator(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
    }

    private void execute(StreamGraph streamGraph) throws Exception {
        JobClient executeAsync = executeAsync(streamGraph);
        try {
            if (((Boolean) this.configuration.get(DeploymentOptions.ATTACHED)).booleanValue()) {
                executeAsync.getJobExecutionResult().get();
            }
        } catch (Throwable th) {
            ExceptionUtils.rethrowException(ExceptionUtils.stripExecutionException(th));
        }
    }

    private JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        Preconditions.checkNotNull(streamGraph, "StreamGraph cannot be null.");
        try {
            return (JobClient) getPipelineExecutor().execute(streamGraph, this.configuration, getClass().getClassLoader()).get();
        } catch (ExecutionException e) {
            throw new FlinkException(String.format("Failed to execute job '%s'.", streamGraph.getJobName()), ExceptionUtils.stripExecutionException(e));
        }
    }

    public StreamGraph getStreamGraph() {
        StreamGraph generate = getStreamGraphGenerator(this.transformations).generate();
        this.transformations.clear();
        return generate;
    }

    private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> list) {
        if (list.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        return new StreamGraphGenerator(new ArrayList(list), this.executionConfig, this.checkpointCfg, this.configuration).setTimeCharacteristic(StreamGraphGenerator.DEFAULT_TIME_CHARACTERISTIC);
    }

    private PipelineExecutor getPipelineExecutor() throws Exception {
        Preconditions.checkNotNull((String) this.configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
        PipelineExecutorFactory executorFactory = this.executorServiceLoader.getExecutorFactory(this.configuration);
        Preconditions.checkNotNull(executorFactory, "Cannot find compatible factory for specified execution.target (=%s)", new Object[]{this.configuration.get(DeploymentOptions.TARGET)});
        return executorFactory.getExecutor(this.configuration);
    }

    private void configure(ReadableConfig readableConfig, ClassLoader classLoader) {
        this.configuration.addAll(Configuration.fromMap(readableConfig.toMap()));
        this.executionConfig.configure(readableConfig, classLoader);
        this.checkpointCfg.configure(readableConfig);
    }

    static {
        try {
            DataStreamV2SinkTransformationTranslator.registerSinkTransformationTranslator();
        } catch (Exception e) {
            throw new RuntimeException("Can not register process function transformation translator.", e);
        }
    }
}
