package org.apache.flink.streaming.connectors.kafka.shuffle;

import java.util.Properties;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.PropertiesUtil;

@Experimental
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.class */
public class FlinkKafkaShuffle {
    static final String PRODUCER_PARALLELISM = "producer parallelism";
    static final String PARTITION_NUMBER = "partition number";

    public static <T, K> KeyedStream<T, K> persistentKeyBy(DataStream<T> dataStream, String str, int i, int i2, Properties properties, KeySelector<T, K> keySelector) {
        Properties flatten = PropertiesUtil.flatten(properties);
        flatten.setProperty(PRODUCER_PARALLELISM, String.valueOf(i));
        flatten.setProperty(PARTITION_NUMBER, String.valueOf(i2));
        StreamExecutionEnvironment executionEnvironment = dataStream.getExecutionEnvironment();
        writeKeyBy(dataStream, str, flatten, keySelector);
        return readKeyBy(str, executionEnvironment, dataStream.getType(), flatten, keySelector);
    }

    public static <T> KeyedStream<T, Tuple> persistentKeyBy(DataStream<T> dataStream, String str, int i, int i2, Properties properties, int... iArr) {
        return persistentKeyBy(dataStream, str, i, i2, properties, keySelector(dataStream, iArr));
    }

    public static <T, K> void writeKeyBy(DataStream<T> dataStream, String str, Properties properties, KeySelector<T, K> keySelector) {
        StreamExecutionEnvironment executionEnvironment = dataStream.getExecutionEnvironment();
        FlinkKafkaShuffleProducer flinkKafkaShuffleProducer = new FlinkKafkaShuffleProducer(str, dataStream.getType().createSerializer(executionEnvironment.getConfig()), properties, (KeySelector) executionEnvironment.clean(keySelector), FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5);
        Preconditions.checkArgument(properties.getProperty(PRODUCER_PARALLELISM) != null, "Missing producer parallelism for Kafka Shuffle");
        addKafkaShuffle(dataStream, flinkKafkaShuffleProducer, PropertiesUtil.getInt(properties, PRODUCER_PARALLELISM, Integer.MIN_VALUE));
    }

    public static <T> void writeKeyBy(DataStream<T> dataStream, String str, Properties properties, int... iArr) {
        writeKeyBy(dataStream, str, properties, keySelector(dataStream, iArr));
    }

    public static <T, K> KeyedStream<T, K> readKeyBy(String str, StreamExecutionEnvironment streamExecutionEnvironment, TypeInformation<T> typeInformation, Properties properties, KeySelector<T, K> keySelector) {
        TypeSerializer<T> createSerializer = typeInformation.createSerializer(streamExecutionEnvironment.getConfig());
        FlinkKafkaShuffleConsumer flinkKafkaShuffleConsumer = new FlinkKafkaShuffleConsumer(str, new TypeInformationSerializationSchema(typeInformation, createSerializer), createSerializer, properties);
        Preconditions.checkArgument(properties.getProperty(PARTITION_NUMBER) != null, "Missing partition number for Kafka Shuffle");
        return DataStreamUtils.reinterpretAsKeyedStream(streamExecutionEnvironment.addSource(flinkKafkaShuffleConsumer).setParallelism(PropertiesUtil.getInt(properties, PARTITION_NUMBER, Integer.MIN_VALUE)), keySelector);
    }

    private static <T, K> void addKafkaShuffle(DataStream<T> dataStream, FlinkKafkaShuffleProducer<T, K> flinkKafkaShuffleProducer, int i) {
        dataStream.getTransformation().getOutputType();
        LegacySinkTransformation legacySinkTransformation = new LegacySinkTransformation(dataStream.getTransformation(), "kafka_shuffle", new StreamKafkaShuffleSink(flinkKafkaShuffleProducer), dataStream.getExecutionEnvironment().getParallelism());
        dataStream.getExecutionEnvironment().addOperator(legacySinkTransformation);
        legacySinkTransformation.setParallelism(i);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v11, types: [org.apache.flink.api.java.functions.KeySelector] */
    private static <T> KeySelector<T, Tuple> keySelector(DataStream<T> dataStream, int... iArr) {
        return ((dataStream.getType() instanceof BasicArrayTypeInfo) || (dataStream.getType() instanceof PrimitiveArrayTypeInfo)) ? KeySelectorUtil.getSelectorForArray(iArr, dataStream.getType()) : KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys(iArr, dataStream.getType()), dataStream.getType(), dataStream.getExecutionEnvironment().getConfig());
    }
}
