package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExchange;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@ExecNodeMetadata(name = "stream-exec-exchange", version = 1, producedTransformations = {StreamExecExchange.EXCHANGE_TRANSFORMATION}, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15)
/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.class */
public class StreamExecExchange extends CommonExecExchange implements StreamExecNode<RowData> {
    public static final String EXCHANGE_TRANSFORMATION = "exchange";

    public StreamExecExchange(ReadableConfig readableConfig, InputProperty inputProperty, RowType rowType, String str) {
        this(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(StreamExecExchange.class), ExecNodeContext.newPersistedConfig(StreamExecExchange.class, readableConfig), Collections.singletonList(inputProperty), rowType, str);
    }

    @JsonCreator
    public StreamExecExchange(@JsonProperty("id") int i, @JsonProperty("type") ExecNodeContext execNodeContext, @JsonProperty("configuration") ReadableConfig readableConfig, @JsonProperty("inputProperties") List<InputProperty> list, @JsonProperty("outputType") RowType rowType, @JsonProperty("description") String str) {
        super(i, execNodeContext, readableConfig, list, rowType, str);
        Preconditions.checkArgument(list.size() == 1);
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase, org.apache.flink.table.planner.plan.nodes.exec.ExecNode
    public String getDescription() {
        InputProperty.RequiredDistribution requiredDistribution = getInputProperties().get(0).getRequiredDistribution();
        StringBuilder sb = new StringBuilder();
        String lowerCase = requiredDistribution.getType().name().toLowerCase();
        if (lowerCase.equals("singleton")) {
            lowerCase = "single";
        } else if ((requiredDistribution instanceof InputProperty.KeepInputAsIsDistribution) && ((InputProperty.KeepInputAsIsDistribution) requiredDistribution).isStrict()) {
            lowerCase = "forward";
        }
        sb.append("distribution=[").append(lowerCase);
        if (requiredDistribution instanceof InputProperty.HashDistribution) {
            sb.append(getHashDistributionDescription((InputProperty.HashDistribution) requiredDistribution));
        } else if ((requiredDistribution instanceof InputProperty.KeepInputAsIsDistribution) && !((InputProperty.KeepInputAsIsDistribution) requiredDistribution).isStrict()) {
            sb.append("[hash").append(getHashDistributionDescription((InputProperty.HashDistribution) ((InputProperty.KeepInputAsIsDistribution) requiredDistribution).getInputDistribution())).append("]");
        }
        sb.append("]");
        return String.format("Exchange(%s)", sb);
    }

    private String getHashDistributionDescription(InputProperty.HashDistribution hashDistribution) {
        RowType outputType = getInputEdges().get(0).getOutputType();
        return (String) Arrays.stream((String[]) Arrays.stream(hashDistribution.getKeys()).mapToObj(i -> {
            return (String) outputType.getFieldNames().get(i);
        }).toArray(i2 -> {
            return new String[i2];
        })).collect(Collectors.joining(", ", "[", "]"));
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        GlobalPartitioner forwardForConsecutiveHashPartitioner;
        int parallelism;
        Transformation<?> translateToPlan = getInputEdges().get(0).translateToPlan(plannerBase);
        InputProperty inputProperty = getInputProperties().get(0);
        InputProperty.DistributionType type = inputProperty.getRequiredDistribution().getType();
        InputProperty.RequiredDistribution requiredDistribution = inputProperty.getRequiredDistribution();
        InternalTypeInfo outputType = translateToPlan.getOutputType();
        switch (type) {
            case SINGLETON:
                forwardForConsecutiveHashPartitioner = new GlobalPartitioner();
                parallelism = 1;
                break;
            case HASH:
                forwardForConsecutiveHashPartitioner = createHashPartitioner(((InputProperty.HashDistribution) requiredDistribution).getKeys(), outputType);
                parallelism = -1;
                break;
            case KEEP_INPUT_AS_IS:
                if (((InputProperty.KeepInputAsIsDistribution) requiredDistribution).isStrict()) {
                    forwardForConsecutiveHashPartitioner = new ForwardPartitioner(true);
                } else {
                    InputProperty.RequiredDistribution inputDistribution = ((InputProperty.KeepInputAsIsDistribution) requiredDistribution).getInputDistribution();
                    Preconditions.checkArgument(inputDistribution instanceof InputProperty.HashDistribution, "Only HashDistribution is supported now");
                    forwardForConsecutiveHashPartitioner = new ForwardForConsecutiveHashPartitioner(createHashPartitioner(((InputProperty.HashDistribution) inputDistribution).getKeys(), outputType));
                }
                parallelism = translateToPlan.getParallelism();
                break;
            default:
                throw new TableException(String.format("%s is not supported now!", type));
        }
        PartitionTransformation partitionTransformation = new PartitionTransformation(translateToPlan, forwardForConsecutiveHashPartitioner);
        createTransformationMeta(EXCHANGE_TRANSFORMATION, execNodeConfig).fill(partitionTransformation);
        partitionTransformation.setParallelism(parallelism);
        partitionTransformation.setOutputType(InternalTypeInfo.of(getOutputType()));
        return partitionTransformation;
    }

    public static StreamPartitioner<RowData> createHashPartitioner(int[] iArr, InternalTypeInfo<RowData> internalTypeInfo) {
        return new KeyGroupStreamPartitioner(KeySelectorUtil.getRowDataSelector(iArr, internalTypeInfo), 128);
    }
}
