package org.apache.flink.streaming.runtime.translators;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.representation.PrettySnippet;
import org.apache.flink.representation.SnippetType;
import org.apache.flink.runtime.jobgraph.OperationKindTag;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.runtime.operators.sink.BatchCommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.BatchGlobalCommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.CommittableTypeInformation;
import org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.StatelessSinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.StreamingCommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.StreamingGlobalCommitterOperatorFactory;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.util.graph.StreamGraphUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.class */
public class SinkTransformationTranslator<InputT, CommT, WriterStateT, GlobalCommT> implements TransformationTranslator<Object, SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT>> {
    protected static final Logger LOG = LoggerFactory.getLogger(SinkTransformationTranslator.class);
    private static final String PREVIOUS_SINK_STATE_NAME = "bucket-states";

    @Override // org.apache.flink.streaming.api.graph.TransformationTranslator
    public Collection<Integer> translateForBatch(SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation, TransformationTranslator.Context context) {
        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), sinkTransformation);
        try {
            internalTranslate(sinkTransformation, getParallelism(sinkTransformation, context), PREVIOUS_SINK_STATE_NAME, new BatchCommitterOperatorFactory(sinkTransformation.getSink()), 1, 1, new BatchGlobalCommitterOperatorFactory(sinkTransformation.getSink()), context);
            return Collections.emptyList();
        } catch (IOException e) {
            throw new FlinkRuntimeException("Could not add the Committer or GlobalCommitter to the stream graph.", e);
        }
    }

    @Override // org.apache.flink.streaming.api.graph.TransformationTranslator
    public Collection<Integer> translateForStreaming(SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation, TransformationTranslator.Context context) {
        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), sinkTransformation);
        int parallelism = getParallelism(sinkTransformation, context);
        try {
            internalTranslate(sinkTransformation, parallelism, PREVIOUS_SINK_STATE_NAME, new StreamingCommitterOperatorFactory(sinkTransformation.getSink()), parallelism, sinkTransformation.getMaxParallelism(), new StreamingGlobalCommitterOperatorFactory(sinkTransformation.getSink()), context);
            return Collections.emptyList();
        } catch (IOException e) {
            throw new FlinkRuntimeException("Could not add the Committer or GlobalCommitter to the stream graph.", e);
        }
    }

    private void internalTranslate(SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation, int i, @Nullable String str, OneInputStreamOperatorFactory<CommT, CommT> oneInputStreamOperatorFactory, int i2, int i3, OneInputStreamOperatorFactory<CommT, GlobalCommT> oneInputStreamOperatorFactory2, TransformationTranslator.Context context) throws IOException {
        StreamGraphUtils.validateTransformationUid(context.getStreamGraph(), sinkTransformation);
        int addWriter = addWriter(sinkTransformation, i, str, context);
        int addCommitter = addCommitter(addWriter, sinkTransformation, oneInputStreamOperatorFactory, i2, i3, context);
        addGlobalCommitter(addCommitter > 0 ? addCommitter : addWriter, sinkTransformation, oneInputStreamOperatorFactory2, context);
    }

    private int addWriter(SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation, int i, @Nullable String str, TransformationTranslator.Context context) {
        boolean isPresent = sinkTransformation.getSink().getWriterStateSerializer().isPresent();
        Preconditions.checkState(sinkTransformation.getInputs().size() == 1);
        Transformation<?> transformation = sinkTransformation.getInputs().get(0);
        TypeInformation<IN> outputType = transformation.getOutputType();
        AbstractStreamOperatorFactory statefulSinkWriterOperatorFactory = isPresent ? new StatefulSinkWriterOperatorFactory(sinkTransformation.getSink(), str) : new StatelessSinkWriterOperatorFactory(sinkTransformation.getSink());
        ChainingStrategy chainingStrategy = sinkTransformation.getChainingStrategy();
        if (chainingStrategy != null) {
            statefulSinkWriterOperatorFactory.setChainingStrategy(chainingStrategy);
        }
        return addOperatorToStreamGraph(statefulSinkWriterOperatorFactory, context.getStreamNodeIds(transformation), outputType, extractCommittableTypeInformation(sinkTransformation.getSink()), String.format("%s %s", "Sink Writer:", sinkTransformation.getName()), sinkTransformation.getUid(), i, sinkTransformation.getMaxParallelism(), sinkTransformation, context);
    }

    private int addCommitter(int i, SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation, OneInputStreamOperatorFactory<CommT, CommT> oneInputStreamOperatorFactory, int i2, int i3, TransformationTranslator.Context context) throws IOException {
        if (!sinkTransformation.getSink().createCommitter().isPresent()) {
            return -1;
        }
        CommittableTypeInformation<CommT> extractCommittableTypeInformation = extractCommittableTypeInformation(sinkTransformation.getSink());
        Preconditions.checkNotNull(extractCommittableTypeInformation);
        return addOperatorToStreamGraph(oneInputStreamOperatorFactory, Collections.singletonList(Integer.valueOf(i)), extractCommittableTypeInformation, extractCommittableTypeInformation, String.format("%s %s", "Sink Committer:", sinkTransformation.getName()), sinkTransformation.getUid() == null ? null : String.format("%s %s", "Sink Committer:", sinkTransformation.getUid()), i2, i3, sinkTransformation, context);
    }

    private void addGlobalCommitter(int i, SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation, OneInputStreamOperatorFactory<CommT, GlobalCommT> oneInputStreamOperatorFactory, TransformationTranslator.Context context) throws IOException {
        if (sinkTransformation.getSink().createGlobalCommitter().isPresent()) {
            addOperatorToStreamGraph(oneInputStreamOperatorFactory, Collections.singletonList(Integer.valueOf(i)), (TypeInformation) Preconditions.checkNotNull(extractCommittableTypeInformation(sinkTransformation.getSink())), null, String.format("%s %s", "Sink Global Committer:", sinkTransformation.getName()), sinkTransformation.getUid() == null ? null : String.format("%s %s", "Sink Global Committer:", sinkTransformation.getUid()), 1, 1, sinkTransformation, context);
        }
    }

    private int getParallelism(SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation, TransformationTranslator.Context context) {
        return sinkTransformation.getParallelism() != -1 ? sinkTransformation.getParallelism() : context.getStreamGraph().getExecutionConfig().getParallelism();
    }

    private <IN, OUT> int addOperatorToStreamGraph(StreamOperatorFactory<OUT> streamOperatorFactory, Collection<Integer> collection, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, String str, @Nullable String str2, int i, int i2, SinkTransformation<InputT, CommT, WriterStateT, GlobalCommT> sinkTransformation, TransformationTranslator.Context context) {
        StreamGraph streamGraph = context.getStreamGraph();
        String slotSharingGroup = context.getSlotSharingGroup();
        int newNodeId = Transformation.getNewNodeId();
        streamGraph.addOperator(Integer.valueOf(newNodeId), slotSharingGroup, sinkTransformation.getCoLocationGroupKey(), streamOperatorFactory, typeInformation, typeInformation2, str, streamOperatorFactory instanceof SimpleOperatorFactory ? ((SimpleOperatorFactory) streamOperatorFactory).getOperator().getOperationKindTags() : OperationKindTag.asSet(new OperationKindTag[]{OperationKindTag.SINK, OperationKindTag.ONE_INPUT}), (PrettySnippet) sinkTransformation.getPrettySnippet().orElse(PrettySnippet.of(SnippetType.SOURCE, sinkTransformation.getName())));
        streamGraph.setParallelism(Integer.valueOf(newNodeId), i);
        streamGraph.setMaxParallelism(newNodeId, i2);
        StreamGraphUtils.configureBufferTimeout(streamGraph, newNodeId, sinkTransformation, context.getDefaultBufferTimeout());
        if (str2 != null) {
            streamGraph.setTransformationUID(Integer.valueOf(newNodeId), str2);
        }
        Iterator<Integer> it = collection.iterator();
        while (it.hasNext()) {
            streamGraph.addEdge(Integer.valueOf(it.next().intValue()), Integer.valueOf(newNodeId), 0);
        }
        return newNodeId;
    }

    private CommittableTypeInformation<CommT> extractCommittableTypeInformation(Sink<InputT, CommT, WriterStateT, GlobalCommT> sink) {
        if (!sink.getCommittableSerializer().isPresent()) {
            return null;
        }
        Type parameterType = TypeExtractor.getParameterType(Sink.class, sink.getClass(), 1);
        LOG.debug("Extracted committable type [{}] from sink [{}].", parameterType.toString(), sink.getClass().getCanonicalName());
        return new CommittableTypeInformation<>(TypeExtractionUtils.typeToClass(parameterType), () -> {
            return (SimpleVersionedSerializer) sink.getCommittableSerializer().get();
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1937278072:
                if (implMethodName.equals("lambda$extractCommittableTypeInformation$937ef950$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case StreamStatus.ACTIVE_STATUS /* 0 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/connector/sink/Sink;)Lorg/apache/flink/core/io/SimpleVersionedSerializer;")) {
                    Sink sink = (Sink) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return (SimpleVersionedSerializer) sink.getCommittableSerializer().get();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
