package org.apache.flink.streaming.api.graph;

import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.operators.util.SlotSharingGroupUtils;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RedeploymentOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.InternalBacklogAwareTimerServiceManagerImpl;
import org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionCheckpointStorage;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend;
import org.apache.flink.streaming.api.transformations.BroadcastStateTransformation;
import org.apache.flink.streaming.api.transformations.CacheTransformation;
import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
import org.apache.flink.streaming.api.transformations.KeyedBroadcastStateTransformation;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.ReduceTransformation;
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.TimestampsAndWatermarksTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.streaming.api.transformations.WithBoundedness;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.translators.BroadcastStateTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.KeyedBroadcastStateTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.LegacySinkTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.MultiInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.PartitionTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.ReduceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SideOutputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SinkTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SourceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.TimestampsAndWatermarksTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.TwoInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.UnionTransformationTranslator;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TernaryBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraphGenerator.class */
public class StreamGraphGenerator {
    // ---- Public defaults -------------------------------------------------------------------
    public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 128;
    // Fallback job names, used when PipelineOptions.NAME is not set in the configuration.
    public static final String DEFAULT_STREAMING_JOB_NAME = "Flink Streaming Job";
    public static final String DEFAULT_BATCH_JOB_NAME = "Flink Batch Job";
    public static final String DEFAULT_SLOT_SHARING_GROUP = "default";

    // ---- Immutable inputs captured by the constructor --------------------------------------
    // Transformations that generate() walks, in list order.
    private final List<Transformation<?>> transformations;
    private final ExecutionConfig executionConfig;
    // Copy of the caller's CheckpointConfig (made in the constructor).
    private final CheckpointConfig checkpointConfig;
    // Read from the ExecutionConfig in the constructor and pushed onto the graph in generate().
    private final RedeploymentOptions.ManagementStrategy managementStrategy;
    private final ReadableConfig configuration;
    // Per-group resource profiles; filled by setSlotSharingGroupResource(...) and by transform(...).
    private final Map<String, ResourceProfile> slotSharingGroupResources;

    // ---- Settings adjustable through the fluent setters ------------------------------------
    private Path savepointDir;
    private StateBackend stateBackend;
    private TernaryBoolean changelogStateBackendEnabled;
    private CheckpointStorage checkpointStorage;
    private boolean chaining;
    private Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts;
    private TimeCharacteristic timeCharacteristic;
    private SavepointRestoreSettings savepointRestoreSettings;
    private long defaultBufferTimeout;

    // ---- State of the generate() call ------------------------------------------------------
    // Recomputed at the start of every generate() call.
    private boolean shouldExecuteInBatchMode;
    // Transformation class -> translator lookup; populated outside the part of the file shown here.
    private static final Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> translatorMap;
    // Decremented by getNewIterationNodeId(); its initial value is set outside the part of the file shown here.
    protected static Integer iterationIdCounter;
    // Graph under construction; assigned at the start of generate() and reset to null before it returns.
    private StreamGraph streamGraph;
    // Identity-keyed cache of already translated transformations; only non-null while generate() is running.
    private Map<Transformation<?>, Collection<Integer>> alreadyTransformed;
    private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
    public static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.api.graph.StreamGraphGenerator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraphGenerator$1.class */
    // Decompiler rendering of the compiler-generated lookup table for a switch over
    // BatchShuffleMode: the array is indexed by the enum ordinal and holds the dense
    // case labels 1..4. Slots that are never assigned keep the int default of 0.
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$api$common$BatchShuffleMode = new int[BatchShuffleMode.values().length];

        static {
            // Each constant is registered in its own try block so that a constant that is
            // missing at run time (NoSuchFieldError) does not prevent the others from being mapped.
            try {
                $SwitchMap$org$apache$flink$api$common$BatchShuffleMode[BatchShuffleMode.ALL_EXCHANGES_PIPELINED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Constant not present: its slot stays 0.
            }
            try {
                $SwitchMap$org$apache$flink$api$common$BatchShuffleMode[BatchShuffleMode.ALL_EXCHANGES_BLOCKING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Constant not present: its slot stays 0.
            }
            try {
                $SwitchMap$org$apache$flink$api$common$BatchShuffleMode[BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Constant not present: its slot stays 0.
            }
            try {
                $SwitchMap$org$apache$flink$api$common$BatchShuffleMode[BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // Constant not present: its slot stays 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamGraphGenerator$ContextImpl.class */
    /**
     * {@link TransformationTranslator.Context} implementation that exposes the state of a
     * {@link StreamGraphGenerator} to a translator: the graph being built, the node ids of
     * already translated transformations, the slot sharing group and the generator configuration.
     */
    public static class ContextImpl implements TransformationTranslator.Context {
        private final StreamGraphGenerator streamGraphGenerator;
        private final StreamGraph streamGraph;
        private final String slotSharingGroup;
        private final ReadableConfig config;

        /**
         * Creates a context; every argument must be non-null.
         *
         * @param streamGraphGenerator generator whose translation cache and defaults are exposed
         * @param streamGraph graph that translators add nodes and edges to
         * @param str name of the slot sharing group reported by {@link #getSlotSharingGroup()}
         * @param readableConfig configuration reported by {@link #getGraphGeneratorConfig()}
         * @throws NullPointerException if any argument is {@code null}
         */
        public ContextImpl(StreamGraphGenerator streamGraphGenerator, StreamGraph streamGraph, String str, ReadableConfig readableConfig) {
            this.streamGraphGenerator = Preconditions.checkNotNull(streamGraphGenerator);
            this.streamGraph = Preconditions.checkNotNull(streamGraph);
            this.slotSharingGroup = Preconditions.checkNotNull(str);
            this.config = Preconditions.checkNotNull(readableConfig);
        }

        @Override // org.apache.flink.streaming.api.graph.TransformationTranslator.Context
        public StreamGraph getStreamGraph() {
            return streamGraph;
        }

        /**
         * Looks up the node ids recorded for an already translated transformation.
         *
         * @throws IllegalStateException if the transformation has no entry in the generator's cache
         */
        @Override // org.apache.flink.streaming.api.graph.TransformationTranslator.Context
        public Collection<Integer> getStreamNodeIds(Transformation<?> transformation) {
            Preconditions.checkNotNull(transformation);
            final Collection<Integer> nodeIds = streamGraphGenerator.alreadyTransformed.get(transformation);
            Preconditions.checkState(nodeIds != null, "Parent transformation \"" + transformation + "\" has not been transformed.");
            return nodeIds;
        }

        @Override // org.apache.flink.streaming.api.graph.TransformationTranslator.Context
        public String getSlotSharingGroup() {
            return slotSharingGroup;
        }

        /** Returns the generator's current default buffer timeout (read at call time, not cached). */
        @Override // org.apache.flink.streaming.api.graph.TransformationTranslator.Context
        public long getDefaultBufferTimeout() {
            return streamGraphGenerator.defaultBufferTimeout;
        }

        @Override // org.apache.flink.streaming.api.graph.TransformationTranslator.Context
        public ReadableConfig getGraphGeneratorConfig() {
            return config;
        }

        /** Delegates to the generator so a translator can translate another transformation on demand. */
        @Override // org.apache.flink.streaming.api.graph.TransformationTranslator.Context
        public Collection<Integer> transform(Transformation<?> transformation) {
            return streamGraphGenerator.transform(transformation);
        }
    }

    /**
     * Decrements the shared static iteration counter and returns its new value.
     *
     * <p>The counter is a plain static field with no synchronization in this method.
     *
     * @return the counter value after the decrement
     */
    public static int getNewIterationNodeId() {
        // The former dead local copy of the counter and the manual box/unbox calls are gone;
        // the decrement on the boxed field compiles to the same unbox / subtract / re-box sequence.
        iterationIdCounter--;
        return iterationIdCounter;
    }

    /**
     * Convenience constructor: delegates to the four-argument constructor, passing a newly
     * created {@link Configuration} as the generator configuration.
     */
    public StreamGraphGenerator(List<Transformation<?>> list, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig) {
        this(list, executionConfig, checkpointConfig, new Configuration());
    }

    /**
     * Creates a generator for the given transformations.
     *
     * <p>Chaining starts enabled, user artifacts start empty, the time characteristic starts at
     * {@link #DEFAULT_TIME_CHARACTERISTIC} and the default buffer timeout starts at the default
     * value of {@code ExecutionOptions.BUFFER_TIMEOUT}. The checkpoint configuration is copied,
     * and the checkpoint storage is taken from that copy. Savepoint restore settings are read
     * from {@code readableConfig}.
     *
     * @param list transformations to translate; must not be {@code null}
     * @param executionConfig execution configuration; must not be {@code null}
     * @param checkpointConfig checkpoint configuration to copy
     * @param readableConfig generator configuration; must not be {@code null}
     * @throws NullPointerException if a required argument is {@code null}
     */
    public StreamGraphGenerator(List<Transformation<?>> list, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig, ReadableConfig readableConfig) {
        // Validate the required arguments first, so that a null configuration is rejected by the
        // explicit check rather than failing somewhere inside fromConfiguration(...).
        this.transformations = Preconditions.checkNotNull(list);
        this.executionConfig = Preconditions.checkNotNull(executionConfig);
        this.configuration = Preconditions.checkNotNull(readableConfig);

        // Defaults for the settings that the fluent setters may override later.
        this.slotSharingGroupResources = new HashMap<>();
        this.chaining = true;
        this.userArtifacts = Collections.emptyList();
        this.timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
        this.defaultBufferTimeout = ExecutionOptions.BUFFER_TIMEOUT.defaultValue().toMillis();

        // Values derived from the arguments.
        this.checkpointConfig = new CheckpointConfig(checkpointConfig);
        this.checkpointStorage = this.checkpointConfig.getCheckpointStorage();
        this.savepointRestoreSettings = SavepointRestoreSettings.fromConfiguration(readableConfig);
        this.managementStrategy = executionConfig.getRedeploymentManagementStrategy();
    }

    /**
     * Sets the savepoint directory that is handed to the stream graph when it is configured for
     * streaming execution.
     *
     * @return this generator, for call chaining
     */
    public StreamGraphGenerator setSavepointDir(Path path) {
        this.savepointDir = path;
        return this;
    }

    /**
     * Sets the state backend that is handed to the generated stream graph (in batch mode only
     * when the dedicated batch state backend is not used).
     *
     * @return this generator, for call chaining
     */
    public StreamGraphGenerator setStateBackend(StateBackend stateBackend) {
        this.stateBackend = stateBackend;
        return this;
    }

    /**
     * Sets the changelog-state-backend flag that is forwarded to the generated stream graph.
     *
     * @return this generator, for call chaining
     */
    public StreamGraphGenerator setChangelogStateBackendEnabled(TernaryBoolean ternaryBoolean) {
        this.changelogStateBackendEnabled = ternaryBoolean;
        return this;
    }

    /**
     * Sets the chaining flag that is forwarded to the generated stream graph (initially {@code true}).
     *
     * @return this generator, for call chaining
     */
    public StreamGraphGenerator setChaining(boolean z) {
        this.chaining = z;
        return this;
    }

    /**
     * Sets the user artifacts that are forwarded to the generated stream graph.
     *
     * @param collection artifacts to register; must not be {@code null}
     * @return this generator, for call chaining
     * @throws NullPointerException if {@code collection} is {@code null}
     */
    public StreamGraphGenerator setUserArtifacts(Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> collection) {
        Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> checked = Preconditions.checkNotNull(collection);
        this.userArtifacts = checked;
        return this;
    }

    /**
     * Sets the time characteristic that is forwarded to the generated stream graph
     * (initially {@link #DEFAULT_TIME_CHARACTERISTIC}).
     *
     * @return this generator, for call chaining
     */
    public StreamGraphGenerator setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
        this.timeCharacteristic = timeCharacteristic;
        return this;
    }

    /**
     * Sets the buffer timeout used for transformations that do not define their own
     * (non-negative) timeout. Note that configuring the graph for batch execution overwrites
     * this value with {@code -1}.
     *
     * @return this generator, for call chaining
     */
    public StreamGraphGenerator setDefaultBufferTimeout(long j) {
        this.defaultBufferTimeout = j;
        return this;
    }

    /**
     * Registers resource profiles for slot sharing groups. Entries whose profile equals
     * {@link ResourceProfile#UNKNOWN} are skipped; every other entry overwrites any profile
     * previously stored under the same group name.
     *
     * @param map group name to resource profile
     * @return this generator, for call chaining
     */
    public StreamGraphGenerator setSlotSharingGroupResource(Map<String, ResourceProfile> map) {
        map.forEach((groupName, profile) -> {
            if (!profile.equals(ResourceProfile.UNKNOWN)) {
                this.slotSharingGroupResources.put(groupName, profile);
            }
        });
        return this;
    }

    /**
     * Replaces the savepoint restore settings that were read from the configuration in the
     * constructor. Unlike the other setters, this one does not return the generator.
     */
    public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) {
        this.savepointRestoreSettings = savepointRestoreSettings;
    }

    /**
     * Builds a {@link StreamGraph} from the transformations given to the constructor.
     *
     * <p>Steps: create and configure the graph, translate every transformation, attach the slot
     * sharing group resources, apply the fine-grained-resource exchange mode rule, and finally
     * switch off unaligned-checkpoint support on all input edges of any node that has at least
     * one pointwise or broadcast input edge. The per-run state (graph and translation cache) is
     * released before returning.
     *
     * @return the generated stream graph
     */
    public StreamGraph generate() {
        this.streamGraph = new StreamGraph(this.executionConfig, this.checkpointConfig, this.savepointRestoreSettings);
        this.shouldExecuteInBatchMode = shouldExecuteInBatchMode();
        configureStreamGraph(this.streamGraph);
        this.streamGraph.setRedeploymentManagementStrategy(this.managementStrategy);

        // Identity-based cache: two distinct transformation objects are never merged.
        this.alreadyTransformed = new IdentityHashMap<>();
        for (Transformation<?> root : this.transformations) {
            transform(root);
        }

        this.streamGraph.setSlotSharingGroupResource(this.slotSharingGroupResources);
        setFineGrainedGlobalStreamExchangeMode(this.streamGraph);

        for (StreamNode node : this.streamGraph.getStreamNodes()) {
            boolean hasRestrictedInput = node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing);
            if (!hasRestrictedInput) {
                continue;
            }
            // One offending edge disables the feature for every input of the node.
            for (StreamEdge inEdge : node.getInEdges()) {
                inEdge.setSupportsUnalignedCheckpoints(false);
            }
        }

        final StreamGraph result = this.streamGraph;
        this.alreadyTransformed.clear();
        this.alreadyTransformed = null;
        this.streamGraph = null;
        return result;
    }

    /** Returns {@code true} when the edge's partitioner is pointwise or broadcast. */
    private boolean shouldDisableUnalignedCheckpointing(StreamEdge streamEdge) {
        final StreamPartitioner<?> edgePartitioner = streamEdge.getPartitioner();
        if (edgePartitioner.isPointwise()) {
            return true;
        }
        return edgePartitioner.isBroadcast();
    }

    /**
     * Marks the graph as dynamic when the job runs in batch mode and the scheduler type from the
     * execution config is {@code AdaptiveBatch} (which is also the value assumed when no
     * scheduler type is set).
     */
    private void setDynamic(StreamGraph streamGraph) {
        boolean dynamic = false;
        // The scheduler type is only consulted in batch mode.
        if (this.shouldExecuteInBatchMode) {
            JobManagerOptions.SchedulerType schedulerType =
                    this.executionConfig.getSchedulerType().orElse(JobManagerOptions.SchedulerType.AdaptiveBatch);
            dynamic = schedulerType == JobManagerOptions.SchedulerType.AdaptiveBatch;
        }
        streamGraph.setDynamic(dynamic);
    }

    /**
     * Applies the mode-independent settings to the graph and then hands over to the batch or the
     * streaming specific configuration. In batch mode the generator's default buffer timeout is
     * additionally set to {@code -1}.
     *
     * @param streamGraph graph to configure; must not be {@code null}
     */
    private void configureStreamGraph(StreamGraph streamGraph) {
        Preconditions.checkNotNull(streamGraph);

        // Settings held by the generator itself.
        streamGraph.setChaining(this.chaining);
        streamGraph.setUserArtifacts(this.userArtifacts);
        streamGraph.setTimeCharacteristic(this.timeCharacteristic);

        // Settings read from the generator configuration.
        streamGraph.setVertexDescriptionMode(this.configuration.get(PipelineOptions.VERTEX_DESCRIPTION_MODE));
        streamGraph.setVertexNameIncludeIndexPrefix(this.configuration.get(PipelineOptions.VERTEX_NAME_INCLUDE_INDEX_PREFIX));
        streamGraph.setAutoParallelismEnabled(this.configuration.get(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED));
        streamGraph.setEnableCheckpointsAfterTasksFinish(this.configuration.get(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));

        setDynamic(streamGraph);

        if (this.shouldExecuteInBatchMode) {
            configureStreamGraphBatch(streamGraph);
            setDefaultBufferTimeout(-1L);
        } else {
            configureStreamGraphStreaming(streamGraph);
        }
    }

    /**
     * Configures the graph for BATCH execution: job type and name, checkpointing switched off,
     * state backend / timer service selection, and the exchange mode derived from the configured
     * batch shuffle mode.
     */
    private void configureStreamGraphBatch(StreamGraph streamGraph) {
        streamGraph.setJobType(JobType.BATCH);
        streamGraph.setJobName(deriveJobName(DEFAULT_BATCH_JOB_NAME));
        // Checkpointing is turned off on the generator's own copy of the checkpoint config.
        if (this.checkpointConfig.isCheckpointingEnabled()) {
            LOG.info("Disabled Checkpointing. Checkpointing is not supported and not needed when executing jobs in BATCH mode.");
            this.checkpointConfig.disableCheckpointing();
        }
        setBatchStateBackendAndTimerService(streamGraph);
        streamGraph.setGlobalStreamExchangeMode(deriveGlobalStreamExchangeModeBatch());
        streamGraph.setAllVerticesInSameSlotSharingGroupByDefault(false);
    }

    /**
     * Configures the graph for STREAMING execution: job type and name, state backend, changelog
     * flag, checkpoint storage, savepoint directory and the streaming exchange mode.
     *
     * <p>When {@code CHECKPOINTING_INTERVAL_DURING_BACKLOG} is configured as exactly
     * {@link Duration#ZERO}, the backlog-aware timer service is installed as the graph's timer
     * service provider.
     */
    private void configureStreamGraphStreaming(StreamGraph streamGraph) {
        streamGraph.setJobType(JobType.STREAMING);
        // Use the named constant (same value as the former inline literal) so the fallback name
        // has a single definition, matching how the batch configuration refers to its default.
        streamGraph.setJobName(deriveJobName(DEFAULT_STREAMING_JOB_NAME));
        streamGraph.setStateBackend(this.stateBackend);
        streamGraph.setChangelogStateBackendEnabled(this.changelogStateBackendEnabled);
        streamGraph.setCheckpointStorage(this.checkpointStorage);
        streamGraph.setSavepointDirectory(this.savepointDir);
        streamGraph.setGlobalStreamExchangeMode(deriveGlobalStreamExchangeModeStreaming());
        if (Duration.ZERO.equals(this.configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG))) {
            streamGraph.setTimerServiceProvider(InternalBacklogAwareTimerServiceManagerImpl::create);
        }
    }

    /**
     * Returns the job name configured under {@code PipelineOptions.NAME}, or {@code str} when no
     * name is configured.
     */
    private String deriveJobName(String str) {
        return this.configuration
                .getOptional(PipelineOptions.NAME)
                .orElse(str);
    }

    /**
     * Maps the configured {@code ExecutionOptions.BATCH_SHUFFLE_MODE} to the corresponding
     * {@link GlobalStreamExchangeMode}.
     *
     * @return the exchange mode for batch execution
     * @throws IllegalArgumentException if the configured shuffle mode has no mapping
     */
    private GlobalStreamExchangeMode deriveGlobalStreamExchangeModeBatch() {
        final BatchShuffleMode shuffleMode = this.configuration.get(ExecutionOptions.BATCH_SHUFFLE_MODE);
        // Switch on the enum itself instead of a hand-maintained ordinal table, so that no case
        // label depends on an unrelated numeric constant that merely shares the value.
        switch (shuffleMode) {
            case ALL_EXCHANGES_PIPELINED:
                return GlobalStreamExchangeMode.ALL_EDGES_PIPELINED;
            case ALL_EXCHANGES_BLOCKING:
                return GlobalStreamExchangeMode.ALL_EDGES_BLOCKING;
            case ALL_EXCHANGES_HYBRID_FULL:
                return GlobalStreamExchangeMode.ALL_EDGES_HYBRID_FULL;
            case ALL_EXCHANGES_HYBRID_SELECTIVE:
                return GlobalStreamExchangeMode.ALL_EDGES_HYBRID_SELECTIVE;
            default:
                throw new IllegalArgumentException(String.format("Unsupported shuffle mode '%s' in BATCH runtime mode.", shuffleMode));
        }
    }

    /**
     * Chooses the exchange mode for streaming execution: the approximate pipelined mode when
     * approximate local recovery is enabled (after checking its compatibility), the plain
     * pipelined mode otherwise.
     */
    private GlobalStreamExchangeMode deriveGlobalStreamExchangeModeStreaming() {
        if (this.checkpointConfig.isApproximateLocalRecoveryEnabled()) {
            checkApproximateLocalRecoveryCompatibility();
            return GlobalStreamExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE;
        }
        return GlobalStreamExchangeMode.ALL_EDGES_PIPELINED;
    }

    /**
     * Verifies that unaligned checkpoints are not enabled.
     *
     * @throws IllegalStateException if unaligned checkpoints are enabled
     */
    private void checkApproximateLocalRecoveryCompatibility() {
        final boolean unalignedEnabled = this.checkpointConfig.isUnalignedCheckpointsEnabled();
        Preconditions.checkState(!unalignedEnabled, "Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");
    }

    /**
     * Selects state backend, checkpoint storage and timer service for batch execution.
     *
     * <p>With {@code USE_BATCH_STATE_BACKEND} enabled, the batch-specific backend, checkpoint
     * storage and timer service are installed and the changelog backend is forced off; otherwise
     * the generator's own state backend and changelog flag are passed through.
     *
     * @throws IllegalStateException if the batch state backend is requested while
     *     {@code SORT_INPUTS} is disabled
     */
    private void setBatchStateBackendAndTimerService(StreamGraph streamGraph) {
        final boolean useBatchBackend = this.configuration.get(ExecutionOptions.USE_BATCH_STATE_BACKEND);
        // SORT_INPUTS is only consulted when the batch state backend is requested.
        final boolean sortedInputsSatisfied = !useBatchBackend || this.configuration.get(ExecutionOptions.SORT_INPUTS);
        Preconditions.checkState(sortedInputsSatisfied, "Batch state backend requires the sorted inputs to be enabled!");

        if (useBatchBackend) {
            LOG.debug("Using BATCH execution state backend and timer service.");
            streamGraph.setStateBackend(new BatchExecutionStateBackend());
            streamGraph.setChangelogStateBackendEnabled(TernaryBoolean.FALSE);
            streamGraph.setCheckpointStorage(new BatchExecutionCheckpointStorage());
            streamGraph.setTimerServiceProvider(BatchExecutionInternalTimeServiceManager::create);
        } else {
            streamGraph.setStateBackend(this.stateBackend);
            streamGraph.setChangelogStateBackendEnabled(this.changelogStateBackendEnabled);
        }
    }

    /**
     * For batch jobs whose graph uses fine-grained resources, forces the exchange mode to
     * {@code ALL_EDGES_BLOCKING}. Does nothing for any other job.
     *
     * @throws IllegalConfigurationException if such a job is generated while
     *     {@code ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING} is not {@code true}
     */
    private void setFineGrainedGlobalStreamExchangeMode(StreamGraph streamGraph) {
        if (!this.shouldExecuteInBatchMode || !streamGraph.hasFineGrainedResource()) {
            return;
        }
        final boolean allBlockingConfigured = this.configuration.get(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING);
        if (!allBlockingConfigured) {
            throw new IllegalConfigurationException("At the moment, fine-grained resource management requires batch workloads to be executed with types of all edges being BLOCKING. To do that, you need to configure '" + ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING.key() + "' to 'true'. Notice that this may affect the performance. See FLINK-20865 for more details.");
        }
        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
    }

    /**
     * Decides whether the job runs in batch mode.
     *
     * <p>With {@code AUTOMATIC} runtime mode the job is a batch job exactly when no unbounded
     * source exists; otherwise the configured mode decides.
     *
     * @return {@code true} for batch execution
     * @throws IllegalStateException if the mode is {@code BATCH} but an unbounded source exists
     * @throws NullPointerException if no runtime mode could be read from the configuration
     */
    private boolean shouldExecuteInBatchMode() {
        final RuntimeExecutionMode configuredMode = this.configuration.get(ExecutionOptions.RUNTIME_MODE);
        final boolean hasUnboundedSource = existsUnboundedSource();

        final boolean combinationAllowed = configuredMode != RuntimeExecutionMode.BATCH || !hasUnboundedSource;
        Preconditions.checkState(combinationAllowed, "Detected an UNBOUNDED source with the '" + ExecutionOptions.RUNTIME_MODE.key() + "' set to 'BATCH'. This combination is not allowed, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "' to STREAMING or AUTOMATIC");

        if (Preconditions.checkNotNull(configuredMode) == RuntimeExecutionMode.AUTOMATIC) {
            return !hasUnboundedSource;
        }
        return configuredMode == RuntimeExecutionMode.BATCH;
    }

    /**
     * Returns {@code true} if any registered transformation, or any of its transitive
     * predecessors, is an unbounded source. Stops at the first match.
     */
    private boolean existsUnboundedSource() {
        for (Transformation<?> candidate : this.transformations) {
            if (isUnboundedSource(candidate)) {
                return true;
            }
            for (Transformation<?> predecessor : candidate.getTransitivePredecessors()) {
                if (isUnboundedSource(predecessor)) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Returns {@code true} when the transformation implements {@link WithBoundedness} and
     * reports anything other than {@link Boundedness#BOUNDED}.
     *
     * @throws NullPointerException if {@code transformation} is {@code null}
     */
    private boolean isUnboundedSource(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation);
        if (!(transformation instanceof WithBoundedness)) {
            return false;
        }
        final Boundedness boundedness = ((WithBoundedness) transformation).getBoundedness();
        return boundedness != Boundedness.BOUNDED;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Translates one transformation into stream graph nodes, reusing the cached result when the
     * same transformation object has been translated before.
     *
     * <p>Before translating, the method fills in the max parallelism from the execution config
     * when the transformation has none, and records the resource profile of the transformation's
     * slot sharing group.
     *
     * @param transformation transformation to translate
     * @return ids of the stream nodes that represent the transformation
     * @throws IllegalArgumentException if the slot sharing group was already registered with a
     *     different resource profile
     */
    public Collection<Integer> transform(Transformation<?> transformation) {
        if (this.alreadyTransformed.containsKey(transformation)) {
            return this.alreadyTransformed.get(transformation);
        }
        // Parameterized logging: toString() is only invoked when debug logging is enabled.
        LOG.debug("Transforming {}", transformation);

        if (transformation.getMaxParallelism() <= 0) {
            // No max parallelism on the transformation: fall back to the job-wide value, if any.
            final int globalMaxParallelism = this.executionConfig.getMaxParallelism();
            if (globalMaxParallelism > 0) {
                transformation.setMaxParallelism(globalMaxParallelism);
            }
        }

        transformation.getSlotSharingGroup().ifPresent(slotSharingGroup -> {
            final ResourceSpec resourceSpec = SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);
            if (resourceSpec.equals(ResourceSpec.UNKNOWN)) {
                return;
            }
            // Computed once; both the "first registration" and the "compare" path need it.
            final ResourceProfile requested = ResourceProfile.fromResourceSpec(resourceSpec, MemorySize.ZERO);
            this.slotSharingGroupResources.compute(slotSharingGroup.getName(), (groupName, existing) -> {
                if (existing == null) {
                    return requested;
                }
                if (requested.equals(existing)) {
                    return existing;
                }
                throw new IllegalArgumentException("The slot sharing group " + slotSharingGroup.getName() + " has been configured with two different resource spec.");
            });
        });

        // Result intentionally discarded. NOTE(review): presumably invoked so that missing
        // type information fails here rather than later — confirm against Transformation.
        transformation.getOutputType();

        // The map stores translators under a wildcard type; translate(...) needs the concrete
        // parameterization, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        final TransformationTranslator<?, Transformation<?>> translator =
                (TransformationTranslator<?, Transformation<?>>) translatorMap.get(transformation.getClass());

        final Collection<Integer> nodeIds = translator != null ? translate(translator, transformation) : legacyTransform(transformation);

        // The translation itself may already have registered the transformation (the feedback
        // translations do so before following their feedback edges); never overwrite that entry.
        if (!this.alreadyTransformed.containsKey(transformation)) {
            this.alreadyTransformed.put(transformation, nodeIds);
        }
        return nodeIds;
    }

    /**
     * Translates the transformation types handled directly by this class (feedback and
     * co-feedback) and then applies the per-transformation settings to the graph: buffer timeout,
     * UID, user-provided hash, resources and managed-memory weights.
     *
     * @param transformation transformation to translate
     * @return ids of the stream nodes that represent the transformation
     * @throws IllegalStateException if the transformation type is unknown, or if auto-generated
     *     UIDs are disabled and a physical transformation has neither a UID nor a user hash
     */
    private Collection<Integer> legacyTransform(Transformation<?> transformation) {
        final Collection<Integer> nodeIds;
        if (transformation instanceof FeedbackTransformation) {
            nodeIds = transformFeedback((FeedbackTransformation<?>) transformation);
        } else if (transformation instanceof CoFeedbackTransformation) {
            nodeIds = transformCoFeedback((CoFeedbackTransformation<?>) transformation);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transformation);
        }

        final Integer transformationId = transformation.getId();

        // A negative timeout on the transformation means "use the generator default".
        final long bufferTimeout = transformation.getBufferTimeout() >= 0
                ? transformation.getBufferTimeout()
                : this.defaultBufferTimeout;
        this.streamGraph.setBufferTimeout(transformationId, bufferTimeout);

        final String uid = transformation.getUid();
        if (uid != null) {
            this.streamGraph.setTransformationUID(transformationId, uid);
        }
        final String userHash = transformation.getUserProvidedNodeHash();
        if (userHash != null) {
            this.streamGraph.setTransformationUserHash(transformationId, userHash);
        }

        final boolean autoUidsDisabled = !this.streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled();
        if (autoUidsDisabled && transformation instanceof PhysicalTransformation && userHash == null && uid == null) {
            throw new IllegalStateException("Auto generated UIDs have been disabled but no UID or hash has been assigned to operator " + transformation.getName());
        }

        // Resources are only forwarded when both the minimum and the preferred spec are present.
        if (transformation.getMinResources() != null && transformation.getPreferredResources() != null) {
            this.streamGraph.setResources(transformation.getId(), transformation.getMinResources(), transformation.getPreferredResources());
        }

        this.streamGraph.setManagedMemoryUseCaseWeights(
                transformation.getId(),
                transformation.getManagedMemoryOperatorScopeUseCaseWeights(),
                transformation.getManagedMemorySlotScopeUseCases());
        return nodeIds;
    }

    /**
     * Translates a {@link FeedbackTransformation}: translates its single input, creates the
     * iteration source/sink node pair, connects every feedback edge to the iteration sink and
     * assigns both iteration nodes to a common slot sharing group.
     *
     * <p>The result (input node ids plus the iteration source id) is registered in the
     * translation cache <em>before</em> the feedback edges are followed, so that a feedback edge
     * leading back to this transformation finds the cached entry.
     *
     * @return node ids of the input followed by the id of the iteration source
     * @throws UnsupportedOperationException in batch execution mode
     * @throws IllegalStateException if there are no feedback edges or not exactly one input
     */
    private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> feedbackTransformation) {
        if (this.shouldExecuteInBatchMode) {
            throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
        }
        if (feedbackTransformation.getFeedbackEdges().isEmpty()) {
            throw new IllegalStateException("Iteration " + feedbackTransformation + " does not have any feedback edges.");
        }

        final List<Transformation<?>> inputs = feedbackTransformation.getInputs();
        Preconditions.checkState(inputs.size() == 1);
        final List<Integer> resultIds = new ArrayList<>(transform(inputs.get(0)));

        // Translating the input may already have translated this transformation as well.
        if (this.alreadyTransformed.containsKey(feedbackTransformation)) {
            return this.alreadyTransformed.get(feedbackTransformation);
        }

        final Tuple2<StreamNode, StreamNode> iterationNodes = this.streamGraph.createIterationSourceAndSink(
                feedbackTransformation.getId(),
                getNewIterationNodeId(),
                getNewIterationNodeId(),
                feedbackTransformation.getWaitTime().longValue(),
                feedbackTransformation.getParallelism(),
                feedbackTransformation.getMaxParallelism(),
                feedbackTransformation.getMinResources(),
                feedbackTransformation.getPreferredResources());
        final StreamNode iterationSource = iterationNodes.f0;
        final StreamNode iterationSink = iterationNodes.f1;

        // The source only has an output serializer, the sink only a first-input serializer.
        this.streamGraph.setSerializers(Integer.valueOf(iterationSource.getId()), null, null, feedbackTransformation.getOutputType().createSerializer(this.executionConfig));
        this.streamGraph.setSerializers(Integer.valueOf(iterationSink.getId()), feedbackTransformation.getOutputType().createSerializer(this.executionConfig), null, null);

        resultIds.add(Integer.valueOf(iterationSource.getId()));
        this.alreadyTransformed.put(feedbackTransformation, resultIds);

        final List<Integer> feedbackNodeIds = new ArrayList<>();
        for (Transformation<T> feedbackEdge : feedbackTransformation.getFeedbackEdges()) {
            final Collection<Integer> edgeNodeIds = transform(feedbackEdge);
            feedbackNodeIds.addAll(edgeNodeIds);
            for (Integer edgeNodeId : edgeNodeIds) {
                this.streamGraph.addEdge(edgeNodeId, Integer.valueOf(iterationSink.getId()), 0);
            }
        }

        String sharingGroup = determineSlotSharingGroup(null, feedbackNodeIds);
        if (sharingGroup == null) {
            // No group could be derived from the feedback nodes: use a group private to this iteration.
            sharingGroup = "SlotSharingGroup-" + feedbackTransformation.getId();
        }
        iterationSink.setSlotSharingGroup(sharingGroup);
        iterationSource.setSlotSharingGroup(sharingGroup);
        return resultIds;
    }

    /**
     * Translates a {@link CoFeedbackTransformation}: creates the iteration source/sink node pair,
     * connects every feedback edge to the iteration sink and assigns both iteration nodes to the
     * slot sharing group derived from the feedback nodes.
     *
     * <p>The iteration source id is registered in the translation cache <em>before</em> the
     * feedback edges are followed. Unlike {@code transformFeedback}, no fallback group name is
     * substituted when the derived slot sharing group is {@code null}.
     *
     * @return a singleton collection holding the id of the iteration source
     * @throws UnsupportedOperationException in batch execution mode
     */
    private <F> Collection<Integer> transformCoFeedback(CoFeedbackTransformation<F> coFeedbackTransformation) {
        if (this.shouldExecuteInBatchMode) {
            throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
        }

        final Tuple2<StreamNode, StreamNode> iterationNodes = this.streamGraph.createIterationSourceAndSink(
                coFeedbackTransformation.getId(),
                getNewIterationNodeId(),
                getNewIterationNodeId(),
                coFeedbackTransformation.getWaitTime().longValue(),
                coFeedbackTransformation.getParallelism(),
                coFeedbackTransformation.getMaxParallelism(),
                coFeedbackTransformation.getMinResources(),
                coFeedbackTransformation.getPreferredResources());
        final StreamNode iterationSource = iterationNodes.f0;
        final StreamNode iterationSink = iterationNodes.f1;

        // The source only has an output serializer, the sink only a first-input serializer.
        this.streamGraph.setSerializers(Integer.valueOf(iterationSource.getId()), null, null, coFeedbackTransformation.getOutputType().createSerializer(this.executionConfig));
        this.streamGraph.setSerializers(Integer.valueOf(iterationSink.getId()), coFeedbackTransformation.getOutputType().createSerializer(this.executionConfig), null, null);

        this.alreadyTransformed.put(coFeedbackTransformation, Collections.singleton(Integer.valueOf(iterationSource.getId())));

        final List<Integer> feedbackNodeIds = new ArrayList<>();
        for (Transformation<F> feedbackEdge : coFeedbackTransformation.getFeedbackEdges()) {
            final Collection<Integer> edgeNodeIds = transform(feedbackEdge);
            feedbackNodeIds.addAll(edgeNodeIds);
            for (Integer edgeNodeId : edgeNodeIds) {
                this.streamGraph.addEdge(edgeNodeId, Integer.valueOf(iterationSink.getId()), 0);
            }
        }

        final String sharingGroup = determineSlotSharingGroup(null, feedbackNodeIds);
        iterationSink.setSlotSharingGroup(sharingGroup);
        iterationSource.setSlotSharingGroup(sharingGroup);
        return Collections.singleton(Integer.valueOf(iterationSource.getId()));
    }

    /**
     * Runs the given translator on a transformation, after first transforming all of the
     * transformation's inputs.
     *
     * <p>If transforming the inputs already caused this transformation to be registered in
     * {@code alreadyTransformed}, the registered ids are returned and the translator is not
     * invoked.
     *
     * @param transformationTranslator translator to apply; must not be null
     * @param transformation transformation to translate; must not be null
     * @return the node ids produced by the batch or streaming translation, depending on
     *     {@code shouldExecuteInBatchMode}
     */
    private Collection<Integer> translate(TransformationTranslator<?, Transformation<?>> transformationTranslator, Transformation<?> transformation) {
        Preconditions.checkNotNull(transformationTranslator);
        Preconditions.checkNotNull(transformation);

        // Inputs are transformed first; this may transform the current node as a side effect.
        List<Collection<Integer>> inputIdsPerParent = getParentInputIds(transformation.getInputs());
        if (this.alreadyTransformed.containsKey(transformation)) {
            return this.alreadyTransformed.get(transformation);
        }

        // An explicitly configured group name wins; otherwise it is derived from the inputs.
        String requestedGroup =
                transformation.getSlotSharingGroup().map(SlotSharingGroup::getName).orElse(null);
        List<Integer> flattenedInputIds =
                inputIdsPerParent.stream().flatMap(Collection::stream).collect(Collectors.toList());
        String slotSharingGroup = determineSlotSharingGroup(requestedGroup, flattenedInputIds);

        ContextImpl context = new ContextImpl(this, this.streamGraph, slotSharingGroup, this.configuration);
        if (this.shouldExecuteInBatchMode) {
            return transformationTranslator.translateForBatch(transformation, context);
        }
        return transformationTranslator.translateForStreaming(transformation, context);
    }

    /**
     * Transforms each parent transformation and gathers the resulting node ids.
     *
     * @param collection the parent transformations, or {@code null} if there are none
     * @return a new list with one id collection per parent, in iteration order; empty when
     *     {@code collection} is {@code null}
     */
    private List<Collection<Integer>> getParentInputIds(@Nullable Collection<Transformation<?>> collection) {
        List<Collection<Integer>> idsPerParent = new ArrayList<>();
        if (collection != null) {
            for (Transformation<?> parent : collection) {
                idsPerParent.add(transform(parent));
            }
        }
        return idsPerParent;
    }

    /**
     * Picks the slot sharing group for a node.
     *
     * <p>An explicitly specified group always wins. Otherwise the groups of the input nodes
     * are looked up on the stream graph: if the inputs agree on one group that group is used,
     * and if they disagree, or no group can be derived, {@code DEFAULT_SLOT_SHARING_GROUP} is
     * returned.
     *
     * @param str the explicitly specified group name, or {@code null} if none was specified
     * @param collection ids of the input nodes
     * @return the slot sharing group name to use
     */
    private String determineSlotSharingGroup(String str, Collection<Integer> collection) {
        if (str != null) {
            return str;
        }

        String commonGroup = null;
        for (int inputId : collection) {
            String candidate = this.streamGraph.getSlotSharingGroup(inputId);
            if (commonGroup == null) {
                // Nothing adopted yet: take whatever this input reports (possibly null).
                commonGroup = candidate;
            } else if (!commonGroup.equals(candidate)) {
                // Inputs disagree, so fall back to the default group.
                return DEFAULT_SLOT_SHARING_GROUP;
            }
        }

        if (commonGroup == null) {
            return DEFAULT_SLOT_SHARING_GROUP;
        }
        return commonGroup;
    }

    /**
     * Resolves a {@link SerializedLambda} back to one of the two {@code create} method
     * references this method knows about.
     *
     * <p>The lambda is matched on its implementation method name ({@code "create"}), its
     * implementation method kind, the functional interface class/method/signature, and the
     * implementing class and signature. If every field matches one of the two candidates,
     * the corresponding method reference is returned.
     *
     * <p>NOTE(review): this looks like decompiler output of a compiler-generated hook
     * (see the {@code synthetic} marker). As written, {@code boolean z = -1}, switching on a
     * boolean, and the case label below are not valid Java source. Confirm whether this
     * method should be present in hand-maintained source at all.
     *
     * @param serializedLambda the serialized form of the lambda to resolve
     * @return the matching method reference
     * @throws IllegalArgumentException if the serialized lambda matches neither candidate
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // -1 is the "no match" marker; it is replaced only when the name matches below.
        boolean z = -1;
        // First dispatch on the name's hash code, then confirm with equals() to rule out collisions.
        switch (implMethodName.hashCode()) {
            case -1352294148: // "create".hashCode()
                if (implMethodName.equals("create")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            // NOTE(review): the label value is 0 per the inline comment; the WatermarkStatus
            // constant name is presumably a decompiler mis-resolution of the literal 0 - confirm.
            case WatermarkStatus.ACTIVE_STATUS /* 0 */:
                // Candidate 1: implementation lives in InternalBacklogAwareTimerServiceManagerImpl.
                // Method kind 6 corresponds to MethodHandleInfo.REF_invokeStatic.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/streaming/api/operators/InternalTimeServiceManager$Provider") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/runtime/state/CheckpointableKeyedStateBackend;Ljava/lang/ClassLoader;Lorg/apache/flink/streaming/api/operators/KeyContext;Lorg/apache/flink/streaming/runtime/tasks/ProcessingTimeService;Ljava/lang/Iterable;)Lorg/apache/flink/streaming/api/operators/InternalTimeServiceManager;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/InternalBacklogAwareTimerServiceManagerImpl") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/runtime/state/CheckpointableKeyedStateBackend;Ljava/lang/ClassLoader;Lorg/apache/flink/streaming/api/operators/KeyContext;Lorg/apache/flink/streaming/runtime/tasks/ProcessingTimeService;Ljava/lang/Iterable;)Lorg/apache/flink/streaming/api/operators/InternalBacklogAwareTimerServiceManagerImpl;")) {
                    return InternalBacklogAwareTimerServiceManagerImpl::create;
                }
                // Candidate 2: implementation lives in BatchExecutionInternalTimeServiceManager.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/streaming/api/operators/InternalTimeServiceManager$Provider") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/runtime/state/CheckpointableKeyedStateBackend;Ljava/lang/ClassLoader;Lorg/apache/flink/streaming/api/operators/KeyContext;Lorg/apache/flink/streaming/runtime/tasks/ProcessingTimeService;Ljava/lang/Iterable;)Lorg/apache/flink/streaming/api/operators/InternalTimeServiceManager;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/runtime/state/CheckpointableKeyedStateBackend;Ljava/lang/ClassLoader;Lorg/apache/flink/streaming/api/operators/KeyContext;Lorg/apache/flink/streaming/runtime/tasks/ProcessingTimeService;Ljava/lang/Iterable;)Lorg/apache/flink/streaming/api/operators/InternalTimeServiceManager;")) {
                    return BatchExecutionInternalTimeServiceManager::create;
                }
                break;
        }
        // Nothing matched: refuse to deserialize an unknown lambda.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    // Builds the read-only registry that maps each supported transformation class to the
    // translator instance handling it, and resets the iteration id counter.
    static {
        // Raw map kept on purpose: the declared type of translatorMap is outside this block.
        Map translators = new HashMap();

        // Operators with one, two or many inputs. Keyed and non-keyed multiple-input
        // transformations each get their own MultiInputTransformationTranslator instance.
        translators.put(OneInputTransformation.class, new OneInputTransformationTranslator());
        translators.put(TwoInputTransformation.class, new TwoInputTransformationTranslator());
        translators.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator());
        translators.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator());

        // Sources and sinks, including the legacy variants.
        translators.put(SourceTransformation.class, new SourceTransformationTranslator());
        translators.put(SinkTransformation.class, new SinkTransformationTranslator());
        translators.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator());
        translators.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator());

        // Remaining transformation kinds.
        translators.put(UnionTransformation.class, new UnionTransformationTranslator());
        translators.put(PartitionTransformation.class, new PartitionTransformationTranslator());
        translators.put(SideOutputTransformation.class, new SideOutputTransformationTranslator());
        translators.put(ReduceTransformation.class, new ReduceTransformationTranslator());
        translators.put(TimestampsAndWatermarksTransformation.class, new TimestampsAndWatermarksTransformationTranslator());
        translators.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator());
        translators.put(KeyedBroadcastStateTransformation.class, new KeyedBroadcastStateTransformationTranslator());
        translators.put(CacheTransformation.class, new CacheTransformationTranslator());

        // Publish an unmodifiable view so the registry cannot be altered through translatorMap.
        translatorMap = Collections.unmodifiableMap(translators);
        iterationIdCounter = 0;
    }
}
