package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.util.CorruptConfigurationException;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.rescale.RuntimeRescaleStorage;
import org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
import org.apache.flink.util.ClassLoaderUtil;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.concurrent.FutureUtils;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamConfig.class */
public class StreamConfig implements Serializable {
    private static final long serialVersionUID = 1;
    public static final String SERIALIZED_UDF = "serializedUDF";
    public static final String SERIALIZED_UDF_CLASS = "serializedUdfClass";
    public static final String BROADCAST_JOIN = "broadcastJoin";
    private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
    private static final String NUMBER_OF_NETWORK_INPUTS = "numberOfNetworkInputs";
    private static final String CHAINED_OUTPUTS = "chainedOutputs";
    private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
    private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
    private static final String CHAIN_INDEX = "chainIndex";
    private static final String VERTEX_NAME = "vertexID";
    private static final String ITERATION_ID = "iterationId";
    private static final String INPUTS = "inputs";
    private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
    private static final String TYPE_SERIALIZER_SIDEOUT_PREFIX = "typeSerializer_sideout_";
    private static final String ITERATON_WAIT = "iterationWait";
    private static final String OP_NONCHAINED_OUTPUTS = "opNonChainedOutputs";
    private static final String VERTEX_NONCHAINED_OUTPUTS = "vertexNonChainedOutputs";
    private static final String IN_STREAM_EDGES = "inStreamEdges";
    private static final String OPERATOR_NAME = "operatorName";
    private static final String OPERATOR_ID = "operatorID";
    private static final String CHAIN_END = "chainEnd";
    private static final String GRAPH_CONTAINING_LOOPS = "graphContainingLoops";
    private static final String CHECKPOINTING_ENABLED = "checkpointing";
    private static final String CHECKPOINT_MODE = "checkpointMode";
    private static final String SAVEPOINT_DIR = "savepointdir";
    private static final String CHECKPOINT_STORAGE = "checkpointstorage";
    public static final String RUNTIME_RESCALING_ENABLED = "runtimerescalingenabled";
    public static final String RUNTIME_RESCALE_DIR = "runtimerescaledir";
    public static final String RUNTIME_RESCALE_STORAGE = "runtimerescalestorage";
    private static final String STATE_BACKEND = "statebackend";
    private static final String ENABLE_CHANGE_LOG_STATE_BACKEND = "enablechangelog";
    private static final String TIMER_SERVICE_PROVIDER = "timerservice";
    private static final String STATE_PARTITIONER = "statePartitioner";
    private static final String STATE_KEY_SERIALIZER = "statekeyser";
    private static final String TIME_CHARACTERISTIC = "timechar";
    private static final String MANAGED_MEMORY_FRACTION_PREFIX = "managedMemFraction.";
    private static final ConfigOption<Boolean> STATE_BACKEND_USE_MANAGED_MEMORY = ConfigOptions.key("statebackend.useManagedMemory").booleanType().noDefaultValue().withDescription("If state backend is specified, whether it uses managed memory.");
    private static final CheckpointingMode DEFAULT_CHECKPOINTING_MODE = CheckpointingMode.EXACTLY_ONCE;
    private static final double DEFAULT_MANAGED_MEMORY_FRACTION = 0.0d;
    private final Configuration config;
    private final transient Map<String, Object> toBeSerializedConfigObjects = new HashMap();
    private final transient Map<Integer, CompletableFuture<StreamConfig>> chainedTaskFutures = new HashMap();
    private final transient CompletableFuture<StreamConfig> serializationFuture = new CompletableFuture<>();
    private final Set<String> removedKeys = new HashSet();

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamConfig$InputConfig.class */
    public interface InputConfig extends Serializable {
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamConfig$InputRequirement.class */
    public enum InputRequirement {
        SORTED,
        PASS_THROUGH
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamConfig$NetworkInputConfig.class */
    public static class NetworkInputConfig implements InputConfig {
        private final TypeSerializer<?> typeSerializer;
        private final InputRequirement inputRequirement;
        private int inputGateIndex;

        public NetworkInputConfig(TypeSerializer<?> typeSerializer, int i) {
            this(typeSerializer, i, InputRequirement.PASS_THROUGH);
        }

        public NetworkInputConfig(TypeSerializer<?> typeSerializer, int i, InputRequirement inputRequirement) {
            this.typeSerializer = typeSerializer;
            this.inputGateIndex = i;
            this.inputRequirement = inputRequirement;
        }

        public TypeSerializer<?> getTypeSerializer() {
            return this.typeSerializer;
        }

        public int getInputGateIndex() {
            return this.inputGateIndex;
        }

        public InputRequirement getInputRequirement() {
            return this.inputRequirement;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamConfig$SourceInputConfig.class */
    public static class SourceInputConfig implements InputConfig {
        private final StreamEdge inputEdge;

        public SourceInputConfig(StreamEdge streamEdge) {
            this.inputEdge = streamEdge;
        }

        public StreamEdge getInputEdge() {
            return this.inputEdge;
        }

        public String toString() {
            return this.inputEdge.toString();
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj instanceof SourceInputConfig) {
                return Objects.equals(((SourceInputConfig) obj).inputEdge, this.inputEdge);
            }
            return false;
        }

        public int hashCode() {
            return this.inputEdge.hashCode();
        }
    }

    public StreamConfig(Configuration configuration) {
        this.config = configuration;
    }

    public Configuration getConfiguration() {
        return this.config;
    }

    public CompletableFuture<StreamConfig> getSerializationFuture() {
        return this.serializationFuture;
    }

    public CompletableFuture<StreamConfig> triggerSerializationAndReturnFuture(Executor executor) {
        FutureUtils.combineAll(this.chainedTaskFutures.values()).thenAcceptAsync(collection -> {
            try {
                serializeAllConfigs();
                InstantiationUtil.writeObjectToConfig(collection.stream().collect(Collectors.toMap((v0) -> {
                    return v0.getVertexID();
                }, Function.identity())), this.config, CHAINED_TASK_CONFIG);
                this.serializationFuture.complete(this);
            } catch (Throwable th) {
                this.serializationFuture.completeExceptionally(th);
            }
        }, executor);
        return this.serializationFuture;
    }

    public void serializeAllConfigs() {
        this.toBeSerializedConfigObjects.forEach((str, obj) -> {
            try {
                InstantiationUtil.writeObjectToConfig(obj, this.config, str);
            } catch (IOException e) {
                throw new StreamTaskException(String.format("Could not serialize object for key %s.", str), e);
            }
        });
    }

    @VisibleForTesting
    public void setAndSerializeTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> map) {
        try {
            InstantiationUtil.writeObjectToConfig(map, this.config, CHAINED_TASK_CONFIG);
        } catch (IOException e) {
            throw new StreamTaskException("Could not serialize object for key chained task config.", e);
        }
    }

    public void setVertexID(Integer num) {
        this.config.setInteger(VERTEX_NAME, num.intValue());
    }

    public Integer getVertexID() {
        return Integer.valueOf(this.config.getInteger(VERTEX_NAME, -1));
    }

    public void setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase managedMemoryUseCase, double d) {
        ConfigOption<Double> managedMemoryFractionConfigOption = getManagedMemoryFractionConfigOption(managedMemoryUseCase);
        Preconditions.checkArgument(d >= DEFAULT_MANAGED_MEMORY_FRACTION && d <= 1.0d, String.format("%s should be in range [0.0, 1.0], but was: %s", managedMemoryFractionConfigOption.key(), Double.valueOf(d)));
        this.config.set(managedMemoryFractionConfigOption, Double.valueOf(d));
    }

    public double getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase managedMemoryUseCase, Configuration configuration, Configuration configuration2, ClassLoader classLoader) {
        return ManagedMemoryUtils.convertToFractionOfSlot(managedMemoryUseCase, ((Double) this.config.get(getManagedMemoryFractionConfigOption(managedMemoryUseCase))).doubleValue(), getAllManagedMemoryUseCases(), configuration, configuration2, this.config.getOptional(STATE_BACKEND_USE_MANAGED_MEMORY), classLoader);
    }

    private static ConfigOption<Double> getManagedMemoryFractionConfigOption(ManagedMemoryUseCase managedMemoryUseCase) {
        return ConfigOptions.key("managedMemFraction." + Preconditions.checkNotNull(managedMemoryUseCase)).doubleType().defaultValue(Double.valueOf(DEFAULT_MANAGED_MEMORY_FRACTION));
    }

    private Set<ManagedMemoryUseCase> getAllManagedMemoryUseCases() {
        return (Set) this.config.keySet().stream().filter(str -> {
            return str.startsWith(MANAGED_MEMORY_FRACTION_PREFIX);
        }).map(str2 -> {
            return ManagedMemoryUseCase.valueOf(str2.replaceFirst(MANAGED_MEMORY_FRACTION_PREFIX, ""));
        }).collect(Collectors.toSet());
    }

    public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
        this.config.setInteger(TIME_CHARACTERISTIC, timeCharacteristic.ordinal());
    }

    public TimeCharacteristic getTimeCharacteristic() {
        int integer = this.config.getInteger(TIME_CHARACTERISTIC, -1);
        if (integer >= 0) {
            return TimeCharacteristic.values()[integer];
        }
        throw new CorruptConfigurationException("time characteristic is not set");
    }

    public void setTypeSerializerOut(TypeSerializer<?> typeSerializer) {
        setTypeSerializer(TYPE_SERIALIZER_OUT_1, typeSerializer);
    }

    public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader classLoader) {
        try {
            return (TypeSerializer) InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, classLoader);
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate serializer.", e);
        }
    }

    public void setTypeSerializerSideOut(OutputTag<?> outputTag, TypeSerializer<?> typeSerializer) {
        setTypeSerializer("typeSerializer_sideout_" + outputTag.getId(), typeSerializer);
    }

    private void setTypeSerializer(String str, TypeSerializer<?> typeSerializer) {
        this.toBeSerializedConfigObjects.put(str, typeSerializer);
    }

    public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader classLoader) {
        Preconditions.checkNotNull(outputTag, "Side output id must not be null.");
        try {
            return (TypeSerializer) InstantiationUtil.readObjectFromConfig(this.config, "typeSerializer_sideout_" + outputTag.getId(), classLoader);
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate serializer.", e);
        }
    }

    public void setupNetworkInputs(TypeSerializer<?>... typeSerializerArr) {
        InputConfig[] inputConfigArr = new InputConfig[typeSerializerArr.length];
        for (int i = 0; i < typeSerializerArr.length; i++) {
            inputConfigArr[i] = new NetworkInputConfig(typeSerializerArr[i], i, InputRequirement.PASS_THROUGH);
        }
        setInputs(inputConfigArr);
    }

    public void setInputs(InputConfig... inputConfigArr) {
        this.toBeSerializedConfigObjects.put(INPUTS, inputConfigArr);
    }

    public InputConfig[] getInputs(ClassLoader classLoader) {
        try {
            InputConfig[] inputConfigArr = (InputConfig[]) InstantiationUtil.readObjectFromConfig(this.config, INPUTS, classLoader);
            return inputConfigArr == null ? new InputConfig[0] : inputConfigArr;
        } catch (Exception e) {
            throw new StreamTaskException("Could not deserialize inputs", e);
        }
    }

    @Deprecated
    public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader classLoader) {
        return getTypeSerializerIn(0, classLoader);
    }

    @Deprecated
    public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader classLoader) {
        return getTypeSerializerIn(1, classLoader);
    }

    public <T> TypeSerializer<T> getTypeSerializerIn(int i, ClassLoader classLoader) {
        InputConfig[] inputs = getInputs(classLoader);
        Preconditions.checkState(i < inputs.length);
        Preconditions.checkState(inputs[i] instanceof NetworkInputConfig, "Input [%s] was assumed to be network input", new Object[]{Integer.valueOf(i)});
        return (TypeSerializer<T>) ((NetworkInputConfig) inputs[i]).typeSerializer;
    }

    @VisibleForTesting
    public void setStreamOperator(StreamOperator<?> streamOperator) {
        setStreamOperatorFactory(SimpleOperatorFactory.of(streamOperator));
    }

    public void setStreamOperatorFactory(StreamOperatorFactory<?> streamOperatorFactory) {
        if (streamOperatorFactory != null) {
            this.toBeSerializedConfigObjects.put(SERIALIZED_UDF, streamOperatorFactory);
            this.toBeSerializedConfigObjects.put(SERIALIZED_UDF_CLASS, streamOperatorFactory.getClass());
        }
    }

    @VisibleForTesting
    public <T extends StreamOperator<?>> T getStreamOperator(ClassLoader classLoader) {
        return (T) ((SimpleOperatorFactory) getStreamOperatorFactory(classLoader)).getOperator();
    }

    public <T extends StreamOperatorFactory<?>> T getStreamOperatorFactory(ClassLoader classLoader) {
        try {
            Preconditions.checkState(!this.removedKeys.contains(SERIALIZED_UDF), String.format("%s has been removed.", SERIALIZED_UDF));
            return (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZED_UDF, classLoader);
        } catch (ClassNotFoundException e) {
            throw new StreamTaskException("Cannot load user class: " + e.getMessage() + "\nClassLoader info: " + ClassLoaderUtil.getUserCodeClassLoaderInfo(classLoader) + (ClassLoaderUtil.validateClassLoadable(e, classLoader) ? "\nClass was actually found in classloader - deserialization issue." : "\nClass not resolvable through given classloader."), e);
        } catch (Exception e2) {
            throw new StreamTaskException("Cannot instantiate user function.", e2);
        }
    }

    public <T extends StreamOperatorFactory<?>> Class<T> getStreamOperatorFactoryClass(ClassLoader classLoader) {
        try {
            return (Class) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZED_UDF_CLASS, classLoader);
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate serialized udf class.", e);
        }
    }

    public void setIterationId(String str) {
        this.config.setString(ITERATION_ID, str);
    }

    public String getIterationId() {
        return this.config.getString(ITERATION_ID, "");
    }

    public void setIterationWaitTime(long j) {
        this.config.setLong(ITERATON_WAIT, j);
    }

    public long getIterationWaitTime() {
        return this.config.getLong(ITERATON_WAIT, 0L);
    }

    public void setNumberOfNetworkInputs(int i) {
        this.config.setInteger(NUMBER_OF_NETWORK_INPUTS, i);
    }

    public int getNumberOfNetworkInputs() {
        return this.config.getInteger(NUMBER_OF_NETWORK_INPUTS, 0);
    }

    public void setNumberOfOutputs(int i) {
        this.config.setInteger(NUMBER_OF_OUTPUTS, i);
    }

    public int getNumberOfOutputs() {
        return this.config.getInteger(NUMBER_OF_OUTPUTS, 0);
    }

    public void setOperatorNonChainedOutputs(List<NonChainedOutput> list) {
        this.toBeSerializedConfigObjects.put(OP_NONCHAINED_OUTPUTS, list);
    }

    public List<NonChainedOutput> getOperatorNonChainedOutputs(ClassLoader classLoader) {
        try {
            List<NonChainedOutput> list = (List) InstantiationUtil.readObjectFromConfig(this.config, OP_NONCHAINED_OUTPUTS, classLoader);
            return list == null ? new ArrayList() : list;
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate non chained outputs.", e);
        }
    }

    public void setChainedOutputs(List<StreamEdge> list) {
        this.toBeSerializedConfigObjects.put(CHAINED_OUTPUTS, list);
    }

    public List<StreamEdge> getChainedOutputs(ClassLoader classLoader) {
        try {
            List<StreamEdge> list = (List) InstantiationUtil.readObjectFromConfig(this.config, CHAINED_OUTPUTS, classLoader);
            return list == null ? new ArrayList() : list;
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate chained outputs.", e);
        }
    }

    public void setInPhysicalEdges(List<StreamEdge> list) {
        this.toBeSerializedConfigObjects.put(IN_STREAM_EDGES, list);
    }

    public List<StreamEdge> getInPhysicalEdges(ClassLoader classLoader) {
        try {
            List<StreamEdge> list = (List) InstantiationUtil.readObjectFromConfig(this.config, IN_STREAM_EDGES, classLoader);
            return list == null ? new ArrayList() : list;
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate inputs.", e);
        }
    }

    public void setBroadcastJoin(boolean z) {
        this.config.setBoolean(BROADCAST_JOIN, z);
    }

    public void setCheckpointingEnabled(boolean z) {
        this.config.setBoolean(CHECKPOINTING_ENABLED, z);
    }

    public boolean isCheckpointingEnabled() {
        return this.config.getBoolean(CHECKPOINTING_ENABLED, false);
    }

    public void setCheckpointMode(CheckpointingMode checkpointingMode) {
        this.config.setInteger(CHECKPOINT_MODE, checkpointingMode.ordinal());
    }

    public CheckpointingMode getCheckpointMode() {
        int integer = this.config.getInteger(CHECKPOINT_MODE, -1);
        return integer >= 0 ? CheckpointingMode.values()[integer] : DEFAULT_CHECKPOINTING_MODE;
    }

    public void setUnalignedCheckpointsEnabled(boolean z) {
        this.config.set(CheckpointingOptions.ENABLE_UNALIGNED, Boolean.valueOf(z));
    }

    public boolean isUnalignedCheckpointsEnabled() {
        return ((Boolean) this.config.get(CheckpointingOptions.ENABLE_UNALIGNED, false)).booleanValue();
    }

    public void setUnalignedCheckpointsSplittableTimersEnabled(boolean z) {
        this.config.setBoolean(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, z);
    }

    public boolean isUnalignedCheckpointsSplittableTimersEnabled() {
        return ((Boolean) this.config.get(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS)).booleanValue();
    }

    public boolean isExactlyOnceCheckpointMode() {
        return getCheckpointMode() == CheckpointingMode.EXACTLY_ONCE;
    }

    public Duration getAlignedCheckpointTimeout() {
        return (Duration) this.config.get(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT);
    }

    public void setAlignedCheckpointTimeout(Duration duration) {
        this.config.set(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, duration);
    }

    public void setMaxConcurrentCheckpoints(int i) {
        this.config.set(CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, Integer.valueOf(i));
    }

    public int getMaxConcurrentCheckpoints() {
        return ((Integer) this.config.get(CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, (Integer) CheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.defaultValue())).intValue();
    }

    public int getMaxSubtasksPerChannelStateFile() {
        return ((Integer) this.config.get(CheckpointingOptions.UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE)).intValue();
    }

    public void setMaxSubtasksPerChannelStateFile(int i) {
        this.config.set(CheckpointingOptions.UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE, Integer.valueOf(i));
    }

    public void setVertexNonChainedOutputs(List<NonChainedOutput> list) {
        this.toBeSerializedConfigObjects.put(VERTEX_NONCHAINED_OUTPUTS, list);
    }

    public List<NonChainedOutput> getVertexNonChainedOutputs(ClassLoader classLoader) {
        try {
            List<NonChainedOutput> list = (List) InstantiationUtil.readObjectFromConfig(this.config, VERTEX_NONCHAINED_OUTPUTS, classLoader);
            return list == null ? new ArrayList() : list;
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate outputs in order.", e);
        }
    }

    public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> map) {
        if (map != null) {
            map.forEach((num, streamConfig) -> {
                this.chainedTaskFutures.put(num, streamConfig.getSerializationFuture());
            });
        }
    }

    public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader classLoader) {
        try {
            Preconditions.checkState(!this.removedKeys.contains(CHAINED_TASK_CONFIG), String.format("%s has been removed.", CHAINED_TASK_CONFIG));
            Map<Integer, StreamConfig> map = (Map) InstantiationUtil.readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, classLoader);
            return map == null ? new HashMap() : map;
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate configuration.", e);
        }
    }

    public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigsWithSelf(ClassLoader classLoader) {
        Map<Integer, StreamConfig> transitiveChainedTaskConfigs = getTransitiveChainedTaskConfigs(classLoader);
        transitiveChainedTaskConfigs.put(getVertexID(), this);
        return transitiveChainedTaskConfigs;
    }

    public void setOperatorID(OperatorID operatorID) {
        this.config.setBytes(OPERATOR_ID, operatorID.getBytes());
    }

    public OperatorID getOperatorID() {
        return new OperatorID((byte[]) Preconditions.checkNotNull(this.config.getBytes(OPERATOR_ID, (byte[]) null)));
    }

    public void setOperatorName(String str) {
        this.config.setString(OPERATOR_NAME, str);
    }

    public String getOperatorName() {
        return this.config.getString(OPERATOR_NAME, (String) null);
    }

    public void setChainIndex(int i) {
        this.config.setInteger(CHAIN_INDEX, i);
    }

    public int getChainIndex() {
        return this.config.getInteger(CHAIN_INDEX, 0);
    }

    public void setStateBackend(StateBackend stateBackend) {
        if (stateBackend != null) {
            this.toBeSerializedConfigObjects.put(STATE_BACKEND, stateBackend);
            setStateBackendUsesManagedMemory(stateBackend.useManagedMemory());
        }
    }

    public void setChangelogStateBackendEnabled(TernaryBoolean ternaryBoolean) {
        this.toBeSerializedConfigObjects.put(ENABLE_CHANGE_LOG_STATE_BACKEND, ternaryBoolean);
    }

    public void setStateBackendUsesManagedMemory(boolean z) {
        this.config.set(STATE_BACKEND_USE_MANAGED_MEMORY, Boolean.valueOf(z));
    }

    public StateBackend getStateBackend(ClassLoader classLoader) {
        try {
            return (StateBackend) InstantiationUtil.readObjectFromConfig(this.config, STATE_BACKEND, classLoader);
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate statehandle provider.", e);
        }
    }

    public TernaryBoolean isChangelogStateBackendEnabled(ClassLoader classLoader) {
        try {
            return (TernaryBoolean) InstantiationUtil.readObjectFromConfig(this.config, ENABLE_CHANGE_LOG_STATE_BACKEND, classLoader);
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate change log state backend enable flag.", e);
        }
    }

    public void setCheckpointStorage(CheckpointStorage checkpointStorage) {
        if (checkpointStorage != null) {
            this.toBeSerializedConfigObjects.put(CHECKPOINT_STORAGE, checkpointStorage);
        }
    }

    public CheckpointStorage getCheckpointStorage(ClassLoader classLoader) {
        try {
            return (CheckpointStorage) InstantiationUtil.readObjectFromConfig(this.config, CHECKPOINT_STORAGE, classLoader);
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate checkpoint storage.", e);
        }
    }

    public void setRuntimeRescalingEnabled(boolean z) {
        this.config.setBoolean(RUNTIME_RESCALING_ENABLED, z);
    }

    public boolean isRuntimeRescalingEnabled() {
        return this.config.getBoolean(RUNTIME_RESCALING_ENABLED, false);
    }

    public void setRuntimeRescaleDir(Path path) {
        if (path != null) {
            this.toBeSerializedConfigObjects.put(RUNTIME_RESCALE_DIR, path);
        }
    }

    public Path getRuntimeRescaleDir(ClassLoader classLoader) {
        try {
            return (Path) InstantiationUtil.readObjectFromConfig(this.config, RUNTIME_RESCALE_DIR, classLoader);
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate runtime rescale directory.", e);
        }
    }

    public void setRuntimeRescaleStorage(RuntimeRescaleStorage runtimeRescaleStorage) {
        if (runtimeRescaleStorage != null) {
            this.toBeSerializedConfigObjects.put(RUNTIME_RESCALE_STORAGE, runtimeRescaleStorage);
        }
    }

    public RuntimeRescaleStorage getRuntimeRescaleStorage(ClassLoader classLoader) {
        try {
            return (RuntimeRescaleStorage) InstantiationUtil.readObjectFromConfig(this.config, RUNTIME_RESCALE_STORAGE, classLoader);
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate runtime rescale storage.", e);
        }
    }

    public void setTimerServiceProvider(InternalTimeServiceManager.Provider provider) {
        if (provider != null) {
            this.toBeSerializedConfigObjects.put(TIMER_SERVICE_PROVIDER, provider);
        }
    }

    public InternalTimeServiceManager.Provider getTimerServiceProvider(ClassLoader classLoader) {
        try {
            return (InternalTimeServiceManager.Provider) InstantiationUtil.readObjectFromConfig(this.config, TIMER_SERVICE_PROVIDER, classLoader);
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate timer service provider.", e);
        }
    }

    public void setStatePartitioner(int i, KeySelector<?, ?> keySelector) {
        this.toBeSerializedConfigObjects.put("statePartitioner" + i, keySelector);
    }

    public <IN, K extends Serializable> KeySelector<IN, K> getStatePartitioner(int i, ClassLoader classLoader) {
        try {
            return (KeySelector) InstantiationUtil.readObjectFromConfig(this.config, "statePartitioner" + i, classLoader);
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate state partitioner.", e);
        }
    }

    public void setStateKeySerializer(TypeSerializer<?> typeSerializer) {
        this.toBeSerializedConfigObjects.put(STATE_KEY_SERIALIZER, typeSerializer);
    }

    public <K> TypeSerializer<K> getStateKeySerializer(ClassLoader classLoader) {
        try {
            return (TypeSerializer) InstantiationUtil.readObjectFromConfig(this.config, STATE_KEY_SERIALIZER, classLoader);
        } catch (Exception e) {
            throw new StreamTaskException("Could not instantiate state key serializer from task config.", e);
        }
    }

    public void setChainStart() {
        this.config.setBoolean(IS_CHAINED_VERTEX, true);
    }

    public boolean isChainStart() {
        return this.config.getBoolean(IS_CHAINED_VERTEX, false);
    }

    public void setChainEnd() {
        this.config.setBoolean(CHAIN_END, true);
    }

    public boolean isChainEnd() {
        return this.config.getBoolean(CHAIN_END, false);
    }

    public String toString() {
        ClassLoader classLoader = getClass().getClassLoader();
        StringBuilder sb = new StringBuilder();
        sb.append("\n=======================");
        sb.append("Stream Config");
        sb.append("=======================");
        sb.append("\nNumber of non-chained inputs: ").append(getNumberOfNetworkInputs());
        sb.append("\nNumber of non-chained outputs: ").append(getNumberOfOutputs());
        sb.append("\nOutput names: ").append(getOperatorNonChainedOutputs(classLoader));
        sb.append("\nPartitioning:");
        for (NonChainedOutput nonChainedOutput : getOperatorNonChainedOutputs(classLoader)) {
            sb.append("\n\t").append(nonChainedOutput.getDataSetId().toString()).append(": ").append(nonChainedOutput.getPartitioner());
        }
        sb.append("\nChained subtasks: ").append(getChainedOutputs(classLoader));
        try {
            sb.append("\nOperator: ").append(getStreamOperatorFactoryClass(classLoader).getSimpleName());
        } catch (Exception e) {
            sb.append("\nOperator: Missing");
        }
        sb.append("\nState Monitoring: ").append(isCheckpointingEnabled());
        if (isChainStart() && getChainedOutputs(classLoader).size() > 0) {
            sb.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
            sb.append(getTransitiveChainedTaskConfigs(classLoader));
        }
        return sb.toString();
    }

    public void setGraphContainingLoops(boolean z) {
        this.config.setBoolean(GRAPH_CONTAINING_LOOPS, z);
    }

    public boolean isGraphContainingLoops() {
        return this.config.getBoolean(GRAPH_CONTAINING_LOOPS, false);
    }

    public void clearInitialConfigs() {
        this.removedKeys.add(SERIALIZED_UDF);
        this.config.removeKey(SERIALIZED_UDF);
        this.removedKeys.add(CHAINED_TASK_CONFIG);
        this.config.removeKey(CHAINED_TASK_CONFIG);
    }

    public static boolean requiresSorting(InputConfig inputConfig) {
        return (inputConfig instanceof NetworkInputConfig) && ((NetworkInputConfig) inputConfig).getInputRequirement() == InputRequirement.SORTED;
    }
}
