package com.huawei.streaming.cql.executor.operatorinfocreater;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.huawei.streaming.api.Application;
import com.huawei.streaming.api.opereators.CombineOperator;
import com.huawei.streaming.api.opereators.Operator;
import com.huawei.streaming.api.opereators.OperatorTransition;
import com.huawei.streaming.api.streams.Schema;
import com.huawei.streaming.config.StreamingConfig;
import com.huawei.streaming.cql.exception.ExecutorException;
import com.huawei.streaming.cql.exception.SemanticAnalyzerException;
import com.huawei.streaming.cql.executor.expressioncreater.PropertyValueExpressionCreator;
import com.huawei.streaming.cql.semanticanalyzer.SemanticAnalyzerFactory;
import com.huawei.streaming.cql.semanticanalyzer.analyzecontext.SelectClauseAnalyzeContext;
import com.huawei.streaming.cql.semanticanalyzer.analyzecontext.expressiondesc.ExpressionDescribe;
import com.huawei.streaming.cql.semanticanalyzer.analyzecontext.expressiondesc.PropertyValueExpressionDesc;
import com.huawei.streaming.cql.semanticanalyzer.parser.ParserFactory;
import com.huawei.streaming.event.EventTypeMng;
import com.huawei.streaming.event.IEventType;
import com.huawei.streaming.exception.ErrorCode;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.expression.IExpression;
import com.huawei.streaming.expression.PropertyValueExpression;
import com.huawei.streaming.operator.AbsOperator;
import com.huawei.streaming.operator.functionstream.CombineFunctionOp;
import com.huawei.streaming.util.StreamingUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/cql/executor/operatorinfocreater/CombineInfoCreator.class */
public class CombineInfoCreator implements OperatorInfoCreator {
    private static final Logger LOG = LoggerFactory.getLogger(CombineInfoCreator.class);
    private CombineOperator combineOperator;
    private List<OperatorTransition> transitionIn = null;
    private OperatorTransition transitionOut = null;
    private List<Schema> inputSchemas = Lists.newArrayList();
    private IEventType outputTupleEvent = null;
    private Map<String, String> applicationConfig;

    @Override // com.huawei.streaming.cql.executor.operatorinfocreater.OperatorInfoCreator
    public AbsOperator createInstance(Application application, Operator operator, EventTypeMng eventTypeMng, Map<String, String> map) throws StreamingException {
        prepare(application, operator, eventTypeMng, map);
        Iterator<OperatorTransition> it = this.transitionIn.iterator();
        while (it.hasNext()) {
            this.inputSchemas.addAll(OperatorInfoCreatorFactory.getSchemasByTransition(application, it.next()));
        }
        CombineFunctionOp combineFunctionOp = new CombineFunctionOp();
        StreamingConfig streamingConfig = new StreamingConfig();
        if (operator.getArgs() != null) {
            streamingConfig.putAll(operator.getArgs());
        }
        streamingConfig.putAll(this.applicationConfig);
        setInputStreams(streamingConfig);
        setCombineConditions(streamingConfig);
        setInputSchemas(eventTypeMng, streamingConfig);
        setCombineOutputs(streamingConfig);
        streamingConfig.put("streaming.inner.output.schema", StreamingUtils.serializeSchema(this.outputTupleEvent));
        streamingConfig.put("streaming.inner.output.stream.name", this.transitionOut.getStreamName());
        combineFunctionOp.setConfig(streamingConfig);
        return OperatorInfoCreatorFactory.buildStreamOperator(operator, combineFunctionOp);
    }

    private void prepare(Application application, Operator operator, EventTypeMng eventTypeMng, Map<String, String> map) throws ExecutorException {
        this.applicationConfig = map;
        this.combineOperator = (CombineOperator) operator;
        this.transitionIn = OperatorInfoCreatorFactory.getTransitionIn(application, operator);
        this.transitionOut = OperatorInfoCreatorFactory.getTransitionOut(application, operator);
        this.outputTupleEvent = eventTypeMng.getEventType(this.transitionOut.getSchemaName());
    }

    private void setCombineOutputs(StreamingConfig streamingConfig) throws ExecutorException {
        List<ExpressionDescribe> createCombineExpressions = createCombineExpressions(this.combineOperator.getOutputExpression());
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList newArrayList2 = Lists.newArrayList();
        for (int i = 0; i < createCombineExpressions.size(); i++) {
            if (!(createCombineExpressions.get(i) instanceof PropertyValueExpressionDesc)) {
                StreamingException executorException = new ExecutorException(ErrorCode.SEMANTICANALYZE_COMBINE_SIMPLE_EXPRESSION, new String[0]);
                LOG.error("Not property value expression in combie clause.", executorException);
                throw executorException;
            }
            PropertyValueExpressionDesc propertyValueExpressionDesc = (PropertyValueExpressionDesc) createCombineExpressions.get(i);
            String schemaId = propertyValueExpressionDesc.getSchemaId();
            PropertyValueExpression propertyValueExpression = (PropertyValueExpression) new PropertyValueExpressionCreator().createInstance(propertyValueExpressionDesc, null);
            if (newArrayList.size() == 0) {
                newArrayList.add(schemaId);
                newArrayList2.add(new ArrayList());
                newArrayList2.get(newArrayList2.size() - 1).add(propertyValueExpression);
            } else if (newArrayList.get(newArrayList.size() - 1).equals(schemaId)) {
                newArrayList2.get(newArrayList2.size() - 1).add(propertyValueExpression);
            } else {
                checkStreamNameRepeated(newArrayList, schemaId);
                newArrayList.add(schemaId);
                newArrayList2.add(new ArrayList());
                newArrayList2.get(newArrayList2.size() - 1).add(propertyValueExpression);
            }
        }
        streamingConfig.put("operator.combine.inputnames.and.expression", createOutputMaps(newArrayList, newArrayList2));
    }

    private Map<String, IExpression[]> createOutputMaps(List<String> list, List<List<PropertyValueExpression>> list2) throws ExecutorException {
        HashMap newHashMap = Maps.newHashMap();
        for (int i = 0; i < list.size(); i++) {
            String str = list.get(i);
            List<PropertyValueExpression> list3 = list2.get(i);
            newHashMap.put(getPlanStreamNameFromTransition(str), list3.toArray(new PropertyValueExpression[list3.size()]));
        }
        return newHashMap;
    }

    private void checkStreamNameRepeated(List<String> list, String str) throws ExecutorException {
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            if (str.equals(it.next())) {
                StreamingException executorException = new ExecutorException(ErrorCode.SEMANTICANALYZE_COMBINE_EXPRESSION_TOGETHER, new String[0]);
                LOG.error("Columns in same stream not together in combine select clause.", executorException);
                throw executorException;
            }
        }
    }

    private void setInputSchemas(EventTypeMng eventTypeMng, StreamingConfig streamingConfig) {
        HashMap newHashMap = Maps.newHashMap();
        for (OperatorTransition operatorTransition : this.transitionIn) {
            newHashMap.put(operatorTransition.getStreamName(), eventTypeMng.getEventType(operatorTransition.getSchemaName()));
        }
        streamingConfig.put("operator.combine.inputnames.and.schema", newHashMap);
    }

    private void setInputStreams(StreamingConfig streamingConfig) throws ExecutorException {
        String orderedStreams = this.combineOperator.getOrderedStreams();
        ArrayList newArrayList = Lists.newArrayList();
        for (String str : orderedStreams.split(",")) {
            newArrayList.add(getPlanStreamNameFromTransition(str));
        }
        streamingConfig.put("operator.combine.inputnames", newArrayList);
    }

    private void setCombineConditions(StreamingConfig streamingConfig) throws ExecutorException {
        HashMap newHashMap = Maps.newHashMap();
        Iterator<ExpressionDescribe> it = createCombineExpressions(this.combineOperator.getCombineProperties()).iterator();
        while (it.hasNext()) {
            PropertyValueExpressionDesc propertyValueExpressionDesc = (PropertyValueExpressionDesc) it.next();
            newHashMap.put(getPlanStreamNameFromTransition(propertyValueExpressionDesc.getSchemaId()), propertyValueExpressionDesc.getProperty());
        }
        streamingConfig.put("operator.combine.inputnames.and.key", newHashMap);
    }

    private String getPlanStreamNameFromTransition(String str) throws ExecutorException {
        for (OperatorTransition operatorTransition : this.transitionIn) {
            if (operatorTransition.getSchemaName().equals(str)) {
                return operatorTransition.getStreamName();
            }
        }
        StreamingException executorException = new ExecutorException(ErrorCode.SEMANTICANALYZE_NOFOUND_STREAM, str);
        LOG.error("Can't find stream from transition by stream name {}.", str, executorException);
        throw executorException;
    }

    private List<ExpressionDescribe> createCombineExpressions(String str) throws SemanticAnalyzerException {
        return ((SelectClauseAnalyzeContext) SemanticAnalyzerFactory.createAnalyzer(ParserFactory.createSelectClauseParser().parse(str), this.inputSchemas).analyze()).getExpdes();
    }
}
