package org.apache.flink.table.planner.plan.rules.physical.stream;

import java.util.List;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.functions.python.PythonFunctionKind;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregate;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalPythonOverAggregate;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
import org.apache.flink.table.planner.plan.utils.PythonUtil;

/* loaded from: input_file:org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonOverAggregateRule.class */
public class StreamPhysicalPythonOverAggregateRule extends ConverterRule {
    public static final StreamPhysicalPythonOverAggregateRule INSTANCE = new StreamPhysicalPythonOverAggregateRule(ConverterRule.Config.INSTANCE.withConversion(FlinkLogicalOverAggregate.class, FlinkConventions.LOGICAL(), FlinkConventions.STREAM_PHYSICAL(), "StreamPhysicalPythonOverAggregateRule").withRuleFactory(StreamPhysicalPythonOverAggregateRule::new));

    private StreamPhysicalPythonOverAggregateRule(ConverterRule.Config config) {
        super(config);
    }

    @Override // org.apache.calcite.plan.RelOptRule
    public boolean matches(RelOptRuleCall relOptRuleCall) {
        FlinkLogicalOverAggregate flinkLogicalOverAggregate = (FlinkLogicalOverAggregate) relOptRuleCall.rel(0);
        List<AggregateCall> aggregateCalls = flinkLogicalOverAggregate.groups.get(0).getAggregateCalls(flinkLogicalOverAggregate);
        boolean anyMatch = aggregateCalls.stream().anyMatch(aggregateCall -> {
            return PythonUtil.isPythonAggregate(aggregateCall, PythonFunctionKind.GENERAL);
        });
        boolean anyMatch2 = aggregateCalls.stream().anyMatch(aggregateCall2 -> {
            return PythonUtil.isPythonAggregate(aggregateCall2, PythonFunctionKind.PANDAS);
        });
        boolean anyMatch3 = aggregateCalls.stream().anyMatch(aggregateCall3 -> {
            return !PythonUtil.isPythonAggregate(aggregateCall3, null);
        });
        if (!anyMatch2 && !anyMatch) {
            return false;
        }
        if (anyMatch) {
            throw new TableException("Non-Pandas Python UDAFs are not supported in stream mode currently.");
        }
        if (anyMatch3) {
            throw new TableException("Python UDAF and Java/Scala UDAF cannot be used together.");
        }
        return true;
    }

    @Override // org.apache.calcite.rel.convert.ConverterRule
    public RelNode convert(RelNode relNode) {
        FlinkLogicalOverAggregate flinkLogicalOverAggregate = (FlinkLogicalOverAggregate) relNode;
        if (flinkLogicalOverAggregate.groups.size() > 1) {
            throw new TableException("Over Agg: Unsupported use of OVER windows. All aggregates must be computed on the same window. please re-check the over window statement.");
        }
        ImmutableBitSet immutableBitSet = flinkLogicalOverAggregate.groups.get(0).keys;
        FlinkRelDistribution hash = !immutableBitSet.isEmpty() ? FlinkRelDistribution.hash(immutableBitSet.asList(), true, true) : FlinkRelDistribution.SINGLETON();
        RelNode input = flinkLogicalOverAggregate.getInput();
        return new StreamPhysicalPythonOverAggregate(relNode.getCluster(), relNode.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL()), RelOptRule.convert(input, input.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL()).replace(hash)), relNode.getRowType(), flinkLogicalOverAggregate);
    }
}
