package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.cep.EventComparator;
import org.apache.flink.cep.dynamic.DynamicCEPOptions;
import org.apache.flink.cep.dynamic.operator.DynamicCepOperatorFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PatternProcessorDiscovererTableFactory;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchDynamicSpec;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/**
 * Stream exec node that places a {@link DynamicCepOperatorFactory}-based operator behind its
 * single input.
 *
 * <p>During translation the node:
 *
 * <ol>
 *   <li>validates the order keys of the {@link MatchDynamicSpec} and hands the input to
 *       {@code translateOrder} (presumably the step that may add the
 *       {@value #TIMESTAMP_INSERTER_TRANSFORMATION} transformation; confirm against
 *       {@code StreamExecMatchMixin});
 *   <li>discovers a {@link PatternProcessorDiscovererTableFactory} by the identifier stored under
 *       {@link DynamicCEPOptions#DISCOVERER_ID_OPTION} in the spec's options;
 *   <li>validates the remaining options against that factory;
 *   <li>creates the {@value #MATCH_DYNAMIC_TRANSFORMATION} transformation, keyed by the partition
 *       fields of the spec.
 * </ol>
 */
@ExecNodeMetadata(
        name = "stream-exec-dynamic-match",
        version = 1,
        producedTransformations = {
            StreamExecDynamicMatch.TIMESTAMP_INSERTER_TRANSFORMATION,
            StreamExecDynamicMatch.MATCH_DYNAMIC_TRANSFORMATION
        },
        minPlanVersion = FlinkVersion.v1_15,
        minStateVersion = FlinkVersion.v1_15)
public class StreamExecDynamicMatch extends ExecNodeBase<RowData>
        implements StreamExecNode<RowData>,
                MultipleTransformationTranslator<RowData>,
                StreamExecMatchMixin {

    /** Name of the transformation created by {@link #createTimestampInserterTransformationMeta}. */
    public static final String TIMESTAMP_INSERTER_TRANSFORMATION = "timestamp-inserter";

    /** Name of the transformation that hosts the dynamic CEP operator. */
    public static final String MATCH_DYNAMIC_TRANSFORMATION = "match-dynamic";

    /** JSON property under which the {@link MatchDynamicSpec} is (de)serialized. */
    public static final String FIELD_NAME_MATCH_SPEC = "matchSpec";

    @JsonProperty(FIELD_NAME_MATCH_SPEC)
    private final MatchDynamicSpec matchSpec;

    /**
     * Creates a node with a fresh id, a fresh context and a persisted configuration derived from
     * {@code readableConfig}.
     *
     * @param readableConfig configuration from which the persisted node configuration is built
     * @param matchDynamicSpec specification of the dynamic match; must not be {@code null}
     * @param inputProperty property of the single input
     * @param rowType output type of this node
     * @param str description of this node
     */
    public StreamExecDynamicMatch(
            ReadableConfig readableConfig,
            MatchDynamicSpec matchDynamicSpec,
            InputProperty inputProperty,
            RowType rowType,
            String str) {
        this(
                ExecNodeContext.newNodeId(),
                ExecNodeContext.newContext(StreamExecDynamicMatch.class),
                ExecNodeContext.newPersistedConfig(StreamExecDynamicMatch.class, readableConfig),
                matchDynamicSpec,
                Collections.singletonList(inputProperty),
                rowType,
                str);
    }

    /**
     * Constructor used for JSON deserialization.
     *
     * @param i node id
     * @param execNodeContext node context ("type" property)
     * @param readableConfig persisted node configuration
     * @param matchDynamicSpec specification of the dynamic match; must not be {@code null}
     * @param list input properties; must contain exactly one element
     * @param rowType output type of this node
     * @param str description of this node
     * @throws IllegalArgumentException if {@code list} does not contain exactly one element
     * @throws NullPointerException if {@code matchDynamicSpec} is {@code null}
     */
    @JsonCreator
    public StreamExecDynamicMatch(
            @JsonProperty("id") int i,
            @JsonProperty("type") ExecNodeContext execNodeContext,
            @JsonProperty("configuration") ReadableConfig readableConfig,
            @JsonProperty(FIELD_NAME_MATCH_SPEC) MatchDynamicSpec matchDynamicSpec,
            @JsonProperty("inputProperties") List<InputProperty> list,
            @JsonProperty("outputType") RowType rowType,
            @JsonProperty("description") String str) {
        super(i, execNodeContext, readableConfig, list, rowType, str);
        Preconditions.checkArgument(list.size() == 1);
        this.matchSpec = Preconditions.checkNotNull(matchDynamicSpec);
    }

    /**
     * Translates this node into the transformation hosting the dynamic CEP operator.
     *
     * @param plannerBase planner providing the execution environment and class loader
     * @param execNodeConfig configuration of this node
     * @return the {@value #MATCH_DYNAMIC_TRANSFORMATION} transformation
     * @throws TableException if no discoverer factory identifier is configured in the match spec
     *     options
     */
    // The edge API hands back Transformation<?> and the generic signature of the operator factory
    // is not visible from this class, so a few unchecked/raw conversions are unavoidable here.
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected Transformation<RowData> translateToPlanInternal(
            PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        final ExecEdge inputEdge = getInputEdges().get(0);
        final Transformation<RowData> inputTransform =
                (Transformation<RowData>) inputEdge.translateToPlan(plannerBase);
        final RowType inputRowType = (RowType) inputEdge.getOutputType();

        final SortSpec orderKeys = matchSpec.getOrderKeys();
        checkOrderKeys(orderKeys, inputRowType);
        final Transformation<RowData> orderedInput =
                translateOrder(
                        orderKeys,
                        inputTransform,
                        inputRowType,
                        execNodeConfig,
                        () -> inputsContainSingleton());

        // Serializer and key selector are derived from the type of the *original* input
        // transformation, not from the one returned by translateOrder.
        final InternalTypeInfo<RowData> inputTypeInfo =
                (InternalTypeInfo<RowData>) inputTransform.getOutputType();
        final TypeSerializer<RowData> inputSerializer =
                inputTypeInfo.createSerializer(plannerBase.getExecEnv().getConfig());

        final ClassLoader classLoader = plannerBase.getFlinkContext().getClassLoader();
        final PatternProcessorDiscovererTableFactory discovererFactory =
                FactoryUtil.discoverFactory(
                        classLoader,
                        PatternProcessorDiscovererTableFactory.class,
                        requireDiscovererId());

        final DynamicCEPOptions<RowData> cepOptions =
                buildCepOptions(
                        createEventComparator(orderKeys, execNodeConfig, classLoader, inputRowType),
                        inputRowType);
        validateDiscovererFactoryOptions(discovererFactory, cepOptions.getAdditionalOptions());

        final OneInputTransformation<RowData, RowData> matchTransform =
                ExecNodeUtil.createOneInputTransformation(
                        (Transformation) orderedInput,
                        createTransformationMeta(MATCH_DYNAMIC_TRANSFORMATION, execNodeConfig),
                        (StreamOperatorFactory)
                                new DynamicCepOperatorFactory(
                                        discovererFactory, inputSerializer, cepOptions),
                        (TypeInformation) InternalTypeInfo.of(getOutputType()),
                        orderedInput.getParallelism(),
                        false);

        final RowDataKeySelector keySelector =
                KeySelectorUtil.getRowDataSelector(
                        classLoader, matchSpec.getPartition().getFieldIndices(), inputTypeInfo);
        matchTransform.setStateKeySelector(keySelector);
        matchTransform.setStateKeyType(keySelector.getProducedType());

        if (inputsContainSingleton()) {
            matchTransform.setParallelism(1);
            matchTransform.setMaxParallelism(1);
        }
        return matchTransform;
    }

    /**
     * Reads the discoverer factory identifier from the match spec options.
     *
     * @return the value stored under {@link DynamicCEPOptions#DISCOVERER_ID_OPTION}
     * @throws TableException if the options map is {@code null} or holds no value for that key
     */
    private String requireDiscovererId() {
        final String optionKey = DynamicCEPOptions.DISCOVERER_ID_OPTION.key();
        return Optional.ofNullable(matchSpec.getOptions())
                .map(options -> options.get(optionKey))
                .orElseThrow(
                        () ->
                                new TableException(
                                        "PatternProcessorDiscovererFactory id is required. Provide it via "
                                                + optionKey
                                                + " option."));
    }

    /**
     * Builds the {@link DynamicCEPOptions} handed to the operator factory.
     *
     * <p>The time type is {@code PROCESSING_TIME} when the first order key of the match spec is a
     * proctime attribute of {@code rowType}, and {@code EVENT_TIME} otherwise. The three behaviour
     * options and the discoverer id are read from / stripped off the spec's options; whatever is
     * left is passed on as "additional options".
     *
     * @param eventComparator comparator to install in the options
     * @param rowType row type against which the first order key is resolved
     * @return the assembled options
     */
    // The generic signature of DynamicCEPOptions.Builder is not visible from this class.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private DynamicCEPOptions<RowData> buildCepOptions(
            EventComparator<RowData> eventComparator, RowType rowType) {
        final int firstOrderKeyIndex = matchSpec.getOrderKeys().getFieldSpec(0).getFieldIndex();
        final DynamicCEPOptions.PatternTimeType timeType;
        if (TypeCheckUtils.isProcTime(rowType.getTypeAt(firstOrderKeyIndex))) {
            timeType = DynamicCEPOptions.PatternTimeType.PROCESSING_TIME;
        } else {
            timeType = DynamicCEPOptions.PatternTimeType.EVENT_TIME;
        }

        final Configuration options = Configuration.fromMap(matchSpec.getOptions());

        // The behaviour options must be read before they are removed below.
        final DynamicCEPOptions.Builder builder =
                DynamicCEPOptions.builder()
                        .withCoordinatorFailBehaviour(
                                options.get(DynamicCEPOptions.COORDINATOR_FAIL_BEHAVIOUR_OPTION))
                        .withEmptyProcessorsBehaviour(
                                options.get(DynamicCEPOptions.EMPTY_PROCESSORS_BEHAVIOUR_OPTION))
                        .withOperatorStartupBehaviour(
                                options.get(DynamicCEPOptions.OPERATOR_STARTUP_BEHAVIOUR_OPTION))
                        .withComparator(eventComparator)
                        .withTimeType(timeType);

        // Strip everything this node consumed itself so that only options meant for the
        // discoverer factory remain.
        options.removeConfig(DynamicCEPOptions.COORDINATOR_FAIL_BEHAVIOUR_OPTION);
        options.removeConfig(DynamicCEPOptions.EMPTY_PROCESSORS_BEHAVIOUR_OPTION);
        options.removeConfig(DynamicCEPOptions.OPERATOR_STARTUP_BEHAVIOUR_OPTION);
        options.removeConfig(DynamicCEPOptions.DISCOVERER_ID_OPTION);

        return builder.withAdditionalOptions(options).build();
    }

    /**
     * Validates {@code configuration} against the options declared by the discoverer factory.
     *
     * <p>Runs {@link FactoryUtil#validateFactoryOptions} first and then
     * {@link FactoryUtil#validateUnconsumedKeys} with the union of the factory's required and
     * optional option keys as the set of consumed keys.
     *
     * @param patternProcessorDiscovererTableFactory factory whose declared options are checked
     * @param configuration options remaining for the factory
     */
    private void validateDiscovererFactoryOptions(
            PatternProcessorDiscovererTableFactory patternProcessorDiscovererTableFactory,
            Configuration configuration) {
        FactoryUtil.validateFactoryOptions(patternProcessorDiscovererTableFactory, configuration);

        final Set<String> declaredKeys =
                Stream.concat(
                                patternProcessorDiscovererTableFactory.requiredOptions().stream(),
                                patternProcessorDiscovererTableFactory.optionalOptions().stream())
                        .map(option -> option.key())
                        .collect(Collectors.toSet());

        FactoryUtil.validateUnconsumedKeys(
                patternProcessorDiscovererTableFactory.factoryIdentifier(),
                configuration.keySet(),
                declaredKeys);
    }

    /**
     * Creates the metadata for the {@value #TIMESTAMP_INSERTER_TRANSFORMATION} transformation.
     *
     * @param i index of the rowtime field, used only in the generated description
     * @param execNodeConfig configuration of this node
     * @return metadata named {@value #TIMESTAMP_INSERTER_TRANSFORMATION}
     */
    @Override
    public TransformationMetadata createTimestampInserterTransformationMeta(
            int i, ExecNodeConfig execNodeConfig) {
        return createTransformationMeta(
                TIMESTAMP_INSERTER_TRANSFORMATION,
                String.format("StreamRecordTimestampInserter(rowtime field: %s)", i),
                "StreamRecordTimestampInserter",
                execNodeConfig);
    }
}
