package org.apache.flink.api.java;

import com.esotericsoftware.kryo.Serializer;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.io.IteratorInputFormat;
import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
import org.apache.flink.api.java.io.PrimitiveInputFormat;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.io.TextValueInputFormat;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.api.java.utils.PlanGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RedeploymentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.NumberSequenceIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SplittableIterator;
import org.apache.flink.util.WrappingRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Public
/* loaded from: input_file:org/apache/flink/api/java/ExecutionEnvironment.class */
public class ExecutionEnvironment {
    protected static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
    private static ExecutionEnvironmentFactory contextEnvironmentFactory = null;
    private static final ThreadLocal<ExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory = new ThreadLocal<>();
    private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
    private final List<DataSink<?>> sinks;
    private final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile;
    private final ExecutionConfig config;
    protected JobExecutionResult lastJobExecutionResult;
    private boolean wasExecuted;
    private final PipelineExecutorServiceLoader executorServiceLoader;
    private final Configuration configuration;
    private final ClassLoader userClassloader;
    private final List<JobListener> jobListeners;

    @PublicEvolving
    public ExecutionEnvironment(Configuration configuration) {
        this(configuration, null);
    }

    @PublicEvolving
    public ExecutionEnvironment(Configuration configuration, ClassLoader classLoader) {
        this(new DefaultExecutorServiceLoader(), configuration, classLoader);
    }

    @PublicEvolving
    public ExecutionEnvironment(PipelineExecutorServiceLoader pipelineExecutorServiceLoader, Configuration configuration, ClassLoader classLoader) {
        this.sinks = new ArrayList();
        this.cacheFile = new ArrayList();
        this.config = new ExecutionConfig();
        this.wasExecuted = false;
        this.jobListeners = new ArrayList();
        this.executorServiceLoader = (PipelineExecutorServiceLoader) Preconditions.checkNotNull(pipelineExecutorServiceLoader);
        this.configuration = new Configuration((Configuration) Preconditions.checkNotNull(configuration));
        this.userClassloader = classLoader == null ? getClass().getClassLoader() : classLoader;
        configure(this.configuration, this.userClassloader);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ExecutionEnvironment() {
        this(new Configuration());
    }

    @Internal
    public ClassLoader getUserCodeClassLoader() {
        return this.userClassloader;
    }

    @Internal
    public PipelineExecutorServiceLoader getExecutorServiceLoader() {
        return this.executorServiceLoader;
    }

    @Internal
    public Configuration getConfiguration() {
        return this.configuration;
    }

    public ExecutionConfig getConfig() {
        return this.config;
    }

    protected List<JobListener> getJobListeners() {
        return this.jobListeners;
    }

    public int getParallelism() {
        return this.config.getParallelism();
    }

    public void setParallelism(int i) {
        this.config.setParallelism(i);
    }

    public RedeploymentOptions.ManagementStrategy getRedeploymentManagementStrategy() {
        return this.config.getRedeploymentManagementStrategy();
    }

    public void setRedeploymentManagementStrategy(RedeploymentOptions.ManagementStrategy managementStrategy) {
        this.config.setRedeploymentManagementStrategy(managementStrategy);
    }

    @PublicEvolving
    public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
        this.config.setRestartStrategy(restartStrategyConfiguration);
    }

    @PublicEvolving
    public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
        return this.config.getRestartStrategy();
    }

    @PublicEvolving
    @Deprecated
    public void setNumberOfExecutionRetries(int i) {
        this.config.setNumberOfExecutionRetries(i);
    }

    @PublicEvolving
    @Deprecated
    public int getNumberOfExecutionRetries() {
        return this.config.getNumberOfExecutionRetries();
    }

    public JobExecutionResult getLastJobExecutionResult() {
        return this.lastJobExecutionResult;
    }

    public <T extends Serializer<?> & Serializable> void addDefaultKryoSerializer(Class<?> cls, T t) {
        this.config.addDefaultKryoSerializer(cls, t);
    }

    public void addDefaultKryoSerializer(Class<?> cls, Class<? extends Serializer<?>> cls2) {
        this.config.addDefaultKryoSerializer(cls, cls2);
    }

    public <T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer(Class<?> cls, T t) {
        this.config.registerTypeWithKryoSerializer(cls, t);
    }

    public void registerTypeWithKryoSerializer(Class<?> cls, Class<? extends Serializer<?>> cls2) {
        this.config.registerTypeWithKryoSerializer(cls, cls2);
    }

    public void registerType(Class<?> cls) {
        if (cls == null) {
            throw new NullPointerException("Cannot register null type class.");
        }
        if (TypeExtractor.createTypeInfo(cls) instanceof PojoTypeInfo) {
            this.config.registerPojoType(cls);
        } else {
            this.config.registerKryoType(cls);
        }
    }

    @PublicEvolving
    public void configure(ReadableConfig readableConfig, ClassLoader classLoader) {
        readableConfig.getOptional(DeploymentOptions.JOB_LISTENERS).ifPresent(list -> {
            registerCustomListeners(classLoader, list);
        });
        readableConfig.getOptional(PipelineOptions.CACHED_FILES).ifPresent(list2 -> {
            this.cacheFile.clear();
            this.cacheFile.addAll(DistributedCache.parseCachedFilesFromString(list2));
        });
        readableConfig.getOptional(PipelineOptions.NAME).ifPresent(str -> {
            getConfiguration().set(PipelineOptions.NAME, str);
        });
        this.config.configure(readableConfig, classLoader);
    }

    private void registerCustomListeners(ClassLoader classLoader, List<String> list) {
        for (String str : list) {
            try {
                this.jobListeners.add((JobListener) InstantiationUtil.instantiate(str, JobListener.class, classLoader));
            } catch (FlinkException e) {
                throw new WrappingRuntimeException("Could not load JobListener : " + str, e);
            }
        }
    }

    public DataSource<String> readTextFile(String str) {
        Preconditions.checkNotNull(str, "The file path may not be null.");
        return new DataSource<>(this, new TextInputFormat(new Path(str)), BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
    }

    public DataSource<String> readTextFile(String str, String str2) {
        Preconditions.checkNotNull(str, "The file path may not be null.");
        TextInputFormat textInputFormat = new TextInputFormat(new Path(str));
        textInputFormat.setCharsetName(str2);
        return new DataSource<>(this, textInputFormat, BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
    }

    public DataSource<StringValue> readTextFileWithValue(String str) {
        Preconditions.checkNotNull(str, "The file path may not be null.");
        return new DataSource<>(this, new TextValueInputFormat(new Path(str)), new ValueTypeInfo(StringValue.class), Utils.getCallLocationName());
    }

    public DataSource<StringValue> readTextFileWithValue(String str, String str2, boolean z) {
        Preconditions.checkNotNull(str, "The file path may not be null.");
        TextValueInputFormat textValueInputFormat = new TextValueInputFormat(new Path(str));
        textValueInputFormat.setCharsetName(str2);
        textValueInputFormat.setSkipInvalidLines(z);
        return new DataSource<>(this, textValueInputFormat, new ValueTypeInfo(StringValue.class), Utils.getCallLocationName());
    }

    public <X> DataSource<X> readFileOfPrimitives(String str, Class<X> cls) {
        Preconditions.checkNotNull(str, "The file path may not be null.");
        return new DataSource<>(this, new PrimitiveInputFormat(new Path(str), cls), TypeExtractor.getForClass(cls), Utils.getCallLocationName());
    }

    public <X> DataSource<X> readFileOfPrimitives(String str, String str2, Class<X> cls) {
        Preconditions.checkNotNull(str, "The file path may not be null.");
        return new DataSource<>(this, new PrimitiveInputFormat(new Path(str), str2, cls), TypeExtractor.getForClass(cls), Utils.getCallLocationName());
    }

    public CsvReader readCsvFile(String str) {
        return new CsvReader(str, this);
    }

    public <X> DataSource<X> readFile(FileInputFormat<X> fileInputFormat, String str) {
        if (fileInputFormat == null) {
            throw new IllegalArgumentException("InputFormat must not be null.");
        }
        if (str == null) {
            throw new IllegalArgumentException("The file path must not be null.");
        }
        fileInputFormat.setFilePath(new Path(str));
        try {
            return createInput(fileInputFormat, TypeExtractor.getInputFormatTypes(fileInputFormat));
        } catch (Exception e) {
            throw new InvalidProgramException("The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
        }
    }

    public <X> DataSource<X> createInput(InputFormat<X, ?> inputFormat) {
        if (inputFormat == null) {
            throw new IllegalArgumentException("InputFormat must not be null.");
        }
        try {
            return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
        } catch (Exception e) {
            throw new InvalidProgramException("The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.", e);
        }
    }

    public <X> DataSource<X> createInput(InputFormat<X, ?> inputFormat, TypeInformation<X> typeInformation) {
        if (inputFormat == null) {
            throw new IllegalArgumentException("InputFormat must not be null.");
        }
        if (typeInformation == null) {
            throw new IllegalArgumentException("Produced type information must not be null.");
        }
        return new DataSource<>(this, inputFormat, typeInformation, Utils.getCallLocationName());
    }

    public <X> DataSource<X> fromCollection(Collection<X> collection) {
        if (collection == null) {
            throw new IllegalArgumentException("The data must not be null.");
        }
        if (collection.size() == 0) {
            throw new IllegalArgumentException("The size of the collection must not be empty.");
        }
        TypeInformation forObject = TypeExtractor.getForObject(collection.iterator().next());
        CollectionInputFormat.checkCollection(collection, forObject.getTypeClass());
        return new DataSource<>(this, new CollectionInputFormat(collection, forObject.createSerializer(this.config)), forObject, Utils.getCallLocationName());
    }

    public <X> DataSource<X> fromCollection(Collection<X> collection, TypeInformation<X> typeInformation) {
        return fromCollection(collection, typeInformation, Utils.getCallLocationName());
    }

    private <X> DataSource<X> fromCollection(Collection<X> collection, TypeInformation<X> typeInformation, String str) {
        CollectionInputFormat.checkCollection(collection, typeInformation.getTypeClass());
        return new DataSource<>(this, new CollectionInputFormat(collection, typeInformation.createSerializer(this.config)), typeInformation, str);
    }

    public <X> DataSource<X> fromCollection(Iterator<X> it, Class<X> cls) {
        return fromCollection(it, TypeExtractor.getForClass(cls));
    }

    public <X> DataSource<X> fromCollection(Iterator<X> it, TypeInformation<X> typeInformation) {
        return new DataSource<>(this, new IteratorInputFormat(it), typeInformation, Utils.getCallLocationName());
    }

    @SafeVarargs
    public final <X> DataSource<X> fromElements(X... xArr) {
        if (xArr == null) {
            throw new IllegalArgumentException("The data must not be null.");
        }
        if (xArr.length == 0) {
            throw new IllegalArgumentException("The number of elements must not be zero.");
        }
        try {
            return fromCollection(Arrays.asList(xArr), TypeExtractor.getForObject(xArr[0]), Utils.getCallLocationName());
        } catch (Exception e) {
            throw new RuntimeException("Could not create TypeInformation for type " + xArr[0].getClass().getName() + "; please specify the TypeInformation manually via ExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
        }
    }

    @SafeVarargs
    public final <X> DataSource<X> fromElements(Class<X> cls, X... xArr) {
        if (xArr == null) {
            throw new IllegalArgumentException("The data must not be null.");
        }
        if (xArr.length == 0) {
            throw new IllegalArgumentException("The number of elements must not be zero.");
        }
        try {
            return fromCollection(Arrays.asList(xArr), TypeExtractor.getForClass(cls), Utils.getCallLocationName());
        } catch (Exception e) {
            throw new RuntimeException("Could not create TypeInformation for type " + cls.getName() + "; please specify the TypeInformation manually via ExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
        }
    }

    public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> splittableIterator, Class<X> cls) {
        return fromParallelCollection(splittableIterator, TypeExtractor.getForClass(cls));
    }

    public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> splittableIterator, TypeInformation<X> typeInformation) {
        return fromParallelCollection(splittableIterator, typeInformation, Utils.getCallLocationName());
    }

    private <X> DataSource<X> fromParallelCollection(SplittableIterator<X> splittableIterator, TypeInformation<X> typeInformation, String str) {
        return new DataSource<>(this, new ParallelIteratorInputFormat(splittableIterator), typeInformation, str);
    }

    public DataSource<Long> generateSequence(long j, long j2) {
        return fromParallelCollection(new NumberSequenceIterator(j, j2), BasicTypeInfo.LONG_TYPE_INFO, Utils.getCallLocationName());
    }

    public JobExecutionResult execute() throws Exception {
        return execute(getJobName());
    }

    public JobExecutionResult execute(String str) throws Exception {
        JobClient executeAsync = executeAsync(str);
        try {
            if (this.configuration.getBoolean(DeploymentOptions.ATTACHED)) {
                this.lastJobExecutionResult = (JobExecutionResult) executeAsync.getJobExecutionResult().get();
            } else {
                this.lastJobExecutionResult = new DetachedJobExecutionResult(executeAsync.getJobID());
            }
            this.jobListeners.forEach(jobListener -> {
                jobListener.onJobExecuted(this.lastJobExecutionResult, (Throwable) null);
            });
        } catch (Throwable th) {
            Throwable stripExecutionException = ExceptionUtils.stripExecutionException(th);
            this.jobListeners.forEach(jobListener2 -> {
                jobListener2.onJobExecuted((JobExecutionResult) null, stripExecutionException);
            });
            ExceptionUtils.rethrowException(stripExecutionException);
        }
        return this.lastJobExecutionResult;
    }

    @PublicEvolving
    public void registerJobListener(JobListener jobListener) {
        Preconditions.checkNotNull(jobListener, "JobListener cannot be null");
        this.jobListeners.add(jobListener);
    }

    @PublicEvolving
    public void clearJobListeners() {
        this.jobListeners.clear();
    }

    @PublicEvolving
    public final JobClient executeAsync() throws Exception {
        return executeAsync(getJobName());
    }

    @PublicEvolving
    public JobClient executeAsync(String str) throws Exception {
        Preconditions.checkNotNull(this.configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
        Plan createProgramPlan = createProgramPlan(str);
        PipelineExecutorFactory executorFactory = this.executorServiceLoader.getExecutorFactory(this.configuration);
        Preconditions.checkNotNull(executorFactory, "Cannot find compatible factory for specified execution.target (=%s)", new Object[]{this.configuration.get(DeploymentOptions.TARGET)});
        try {
            JobClient jobClient = (JobClient) executorFactory.getExecutor(this.configuration).execute(createProgramPlan, this.configuration, this.userClassloader).get();
            this.jobListeners.forEach(jobListener -> {
                jobListener.onJobSubmitted(jobClient, (Throwable) null);
            });
            return jobClient;
        } catch (Throwable th) {
            this.jobListeners.forEach(jobListener2 -> {
                jobListener2.onJobSubmitted((JobClient) null, th);
            });
            ExceptionUtils.rethrow(th);
            return null;
        }
    }

    public String getExecutionPlan() throws Exception {
        return ExecutionPlanUtil.getExecutionPlanAsJSON(createProgramPlan(getJobName(), false));
    }

    public void registerCachedFile(String str, String str2) {
        registerCachedFile(str, str2, false);
    }

    public void registerCachedFile(String str, String str2, boolean z) {
        this.cacheFile.add(new Tuple2<>(str2, new DistributedCache.DistributedCacheEntry(str, Boolean.valueOf(z))));
    }

    @Internal
    public Plan createProgramPlan() {
        return createProgramPlan(getJobName());
    }

    @Internal
    public Plan createProgramPlan(String str) {
        return createProgramPlan(str, true);
    }

    @Internal
    public Plan createProgramPlan(String str, boolean z) {
        Preconditions.checkNotNull(str);
        if (this.sinks.isEmpty()) {
            if (this.wasExecuted) {
                throw new RuntimeException("No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.");
            }
            throw new RuntimeException("No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.");
        }
        Plan generate = new PlanGenerator(this.sinks, this.config, getParallelism(), this.cacheFile, str).generate();
        if (z) {
            this.sinks.clear();
            this.wasExecuted = true;
        }
        return generate;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Internal
    public void registerDataSink(DataSink<?> dataSink) {
        this.sinks.add(dataSink);
    }

    private String getJobName() {
        return this.configuration.getString(PipelineOptions.NAME, "Flink Java Job at " + Calendar.getInstance().getTime());
    }

    public static ExecutionEnvironment getExecutionEnvironment() {
        return (ExecutionEnvironment) Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory).map((v0) -> {
            return v0.createExecutionEnvironment();
        }).orElseGet(ExecutionEnvironment::createLocalEnvironment);
    }

    @PublicEvolving
    public static CollectionEnvironment createCollectionsEnvironment() {
        CollectionEnvironment collectionEnvironment = new CollectionEnvironment();
        collectionEnvironment.setParallelism(1);
        return collectionEnvironment;
    }

    public static LocalEnvironment createLocalEnvironment() {
        return createLocalEnvironment(defaultLocalDop);
    }

    public static LocalEnvironment createLocalEnvironment(int i) {
        return createLocalEnvironment(new Configuration(), i);
    }

    public static LocalEnvironment createLocalEnvironment(Configuration configuration) {
        return createLocalEnvironment(configuration, -1);
    }

    @PublicEvolving
    public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration configuration) {
        Preconditions.checkNotNull(configuration, "conf");
        if (!configuration.contains(RestOptions.PORT)) {
            configuration.setInteger(RestOptions.PORT, ((Integer) RestOptions.PORT.defaultValue()).intValue());
        }
        return createLocalEnvironment(configuration, -1);
    }

    private static LocalEnvironment createLocalEnvironment(Configuration configuration, int i) {
        LocalEnvironment localEnvironment = new LocalEnvironment(configuration);
        if (i > 0) {
            localEnvironment.setParallelism(i);
        }
        return localEnvironment;
    }

    public static ExecutionEnvironment createRemoteEnvironment(String str, int i, String... strArr) {
        return new RemoteEnvironment(str, i, strArr);
    }

    public static ExecutionEnvironment createRemoteEnvironment(String str, int i, Configuration configuration, String... strArr) {
        return new RemoteEnvironment(str, i, configuration, strArr, null);
    }

    public static ExecutionEnvironment createRemoteEnvironment(String str, int i, int i2, String... strArr) {
        RemoteEnvironment remoteEnvironment = new RemoteEnvironment(str, i, strArr);
        remoteEnvironment.setParallelism(i2);
        return remoteEnvironment;
    }

    public static int getDefaultLocalParallelism() {
        return defaultLocalDop;
    }

    public static void setDefaultLocalParallelism(int i) {
        defaultLocalDop = i;
    }

    protected static void initializeContextEnvironment(ExecutionEnvironmentFactory executionEnvironmentFactory) {
        contextEnvironmentFactory = (ExecutionEnvironmentFactory) Preconditions.checkNotNull(executionEnvironmentFactory);
        threadLocalContextEnvironmentFactory.set(executionEnvironmentFactory);
    }

    protected static void resetContextEnvironment() {
        contextEnvironmentFactory = null;
        threadLocalContextEnvironmentFactory.remove();
    }

    @Internal
    public static boolean areExplicitEnvironmentsAllowed() {
        return contextEnvironmentFactory == null && threadLocalContextEnvironmentFactory.get() == null;
    }
}
