package com.huawei.streaming.cql.executor.operatorinfocreater;

import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.huawei.streaming.api.AnnotationUtils;
import com.huawei.streaming.api.Application;
import com.huawei.streaming.api.opereators.BaseDataSourceOperator;
import com.huawei.streaming.api.opereators.DataSourceOperator;
import com.huawei.streaming.api.opereators.Operator;
import com.huawei.streaming.api.opereators.OperatorTransition;
import com.huawei.streaming.api.opereators.RDBDataSourceOperator;
import com.huawei.streaming.api.streams.Schema;
import com.huawei.streaming.config.StreamingConfig;
import com.huawei.streaming.cql.CQLUtils;
import com.huawei.streaming.cql.exception.ApplicationBuildException;
import com.huawei.streaming.cql.exception.ExecutorException;
import com.huawei.streaming.cql.exception.ParseException;
import com.huawei.streaming.cql.executor.operatorviewscreater.AggResultSetMergeViewCreator;
import com.huawei.streaming.cql.executor.operatorviewscreater.AggResultSetParameters;
import com.huawei.streaming.cql.executor.operatorviewscreater.FilterViewExpressionCreator;
import com.huawei.streaming.cql.executor.operatorviewscreater.WindowViewCreator;
import com.huawei.streaming.cql.semanticanalyzer.parser.IParser;
import com.huawei.streaming.cql.semanticanalyzer.parser.ParserFactory;
import com.huawei.streaming.cql.semanticanalyzer.parser.context.DataSourceBodyContext;
import com.huawei.streaming.cql.semanticanalyzer.parser.context.ExpressionContext;
import com.huawei.streaming.datasource.DataSourceContainer;
import com.huawei.streaming.datasource.IDataSource;
import com.huawei.streaming.datasource.PreStatementRDBDataSource;
import com.huawei.streaming.event.Attribute;
import com.huawei.streaming.event.EventTypeMng;
import com.huawei.streaming.event.IEventType;
import com.huawei.streaming.event.TupleEventType;
import com.huawei.streaming.exception.ErrorCode;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.expression.IExpression;
import com.huawei.streaming.operator.AbsOperator;
import com.huawei.streaming.operator.functionstream.DataSourceFunctionOp;
import com.huawei.streaming.process.agg.resultmerge.IAggResultSetMerge;
import com.huawei.streaming.process.join.CrossBiJoinComposer;
import com.huawei.streaming.process.join.IJoinComposer;
import com.huawei.streaming.process.join.JoinFilterProcessor;
import com.huawei.streaming.process.join.SimpleEventCollection;
import com.huawei.streaming.util.StreamingUtils;
import com.huawei.streaming.window.IWindow;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/cql/executor/operatorinfocreater/DataSourceInfoOperatorCreator.class */
public class DataSourceInfoOperatorCreator implements OperatorInfoCreator {
    private static final Logger LOG = LoggerFactory.getLogger(DataSourceInfoOperatorCreator.class);
    private BaseDataSourceOperator dataSourceOperator;
    private List<Schema> leftInputSchemas;
    private List<Schema> rightInputSchemas;
    private List<Schema> outputSchemas;
    private Map<String, String> applicationConfig;
    private OperatorTransition leftTransitionIn = null;
    private OperatorTransition transitionOut = null;
    private IEventType leftInputTupleEvent = null;
    private IEventType rightInputTupleEvent = null;

    @Override // com.huawei.streaming.cql.executor.operatorinfocreater.OperatorInfoCreator
    public AbsOperator createInstance(Application application, Operator operator, EventTypeMng eventTypeMng, Map<String, String> map) throws StreamingException {
        prepare(application, operator, eventTypeMng, map);
        IWindow create = new WindowViewCreator().create(this.leftInputSchemas, this.dataSourceOperator.getLeftWindow(), map);
        JoinFilterProcessor createJoinFilterProcessor = createJoinFilterProcessor();
        IExpression iExpression = null;
        if (createJoinFilterProcessor != null) {
            iExpression = createJoinFilterProcessor.getExpr();
        }
        IAggResultSetMerge create2 = new AggResultSetMergeViewCreator(createResultSetMergeParmeters(eventTypeMng, create, iExpression)).create();
        DataSourceContainer createDataSourceInstance = createDataSourceInstance();
        DataSourceFunctionOp dataSourceFunctionOp = new DataSourceFunctionOp(create, createDataSourceInstance, createDataSourceCQLExpressions(createDataSourceInstance), createJoinComposer(), createJoinFilterProcessor, create2, OutputTypeAnalyzer.createOutputType(this.dataSourceOperator.getLeftWindow()));
        StreamingConfig streamingConfig = new StreamingConfig();
        streamingConfig.putAll(this.applicationConfig);
        setUniDirectionConfig(streamingConfig);
        setJoinConfig(streamingConfig);
        dataSourceFunctionOp.setConfig(streamingConfig);
        return OperatorInfoCreatorFactory.buildStreamOperator(operator, dataSourceFunctionOp);
    }

    private Map<String, IExpression> createDataSourceCQLExpressions(DataSourceContainer dataSourceContainer) throws ExecutorException {
        HashMap newHashMap = Maps.newHashMap();
        for (String str : dataSourceContainer.getCQLQueryArguments()) {
            newHashMap.put(str, new FilterViewExpressionCreator().create(this.leftInputSchemas, str, this.applicationConfig));
        }
        return newHashMap;
    }

    private DataSourceContainer createDataSourceInstance() throws ExecutorException {
        if (!(this.dataSourceOperator instanceof RDBDataSourceOperator)) {
            return convertCommonDataSource((DataSourceOperator) this.dataSourceOperator);
        }
        DataSourceOperator createRDBDataSource = createRDBDataSource(this.dataSourceOperator);
        setDataSourceConfig(createRDBDataSource);
        createRDBDataSource.setDataSourceClassName(PreStatementRDBDataSource.class.getName());
        return convertCommonDataSource(createRDBDataSource);
    }

    private void setDataSourceConfig(DataSourceOperator dataSourceOperator) throws ApplicationBuildException {
        dataSourceOperator.setDataSourceConfig(AnnotationUtils.getAnnotationsToConfig(this.dataSourceOperator));
    }

    private DataSourceOperator createRDBDataSource(BaseDataSourceOperator baseDataSourceOperator) {
        DataSourceOperator dataSourceOperator = new DataSourceOperator(baseDataSourceOperator.getId(), baseDataSourceOperator.getParallelNumber());
        dataSourceOperator.setName(baseDataSourceOperator.getName());
        dataSourceOperator.setArgs(baseDataSourceOperator.getArgs());
        dataSourceOperator.setLeftStreamName(baseDataSourceOperator.getLeftStreamName());
        dataSourceOperator.setLeftWindow(baseDataSourceOperator.getLeftWindow());
        dataSourceOperator.setFilterAfterJoinExpression(baseDataSourceOperator.getFilterAfterJoinExpression());
        dataSourceOperator.setQueryArguments(baseDataSourceOperator.getQueryArguments());
        dataSourceOperator.setDataSourceSchema(baseDataSourceOperator.getDataSourceSchema());
        dataSourceOperator.setFilterAfterAggregate(baseDataSourceOperator.getFilterAfterAggregate());
        dataSourceOperator.setGroupbyExpression(baseDataSourceOperator.getGroupbyExpression());
        baseDataSourceOperator.setOrderBy(baseDataSourceOperator.getOrderBy());
        baseDataSourceOperator.setLimit(baseDataSourceOperator.getLimit());
        dataSourceOperator.setOutputExpression(baseDataSourceOperator.getOutputExpression());
        return dataSourceOperator;
    }

    private DataSourceContainer convertCommonDataSource(DataSourceOperator dataSourceOperator) throws ExecutorException {
        DataSourceContainer dataSourceContainer = new DataSourceContainer();
        dataSourceContainer.setSchema(this.rightInputTupleEvent);
        dataSourceContainer.setDataSource(createIDataSource(dataSourceOperator));
        try {
            dataSourceContainer.setQueryArguments(dataSourceQueryArguments());
            return dataSourceContainer;
        } catch (StreamingException e) {
            throw ExecutorException.wrapStreamingException(e);
        }
    }

    private IDataSource createIDataSource(DataSourceOperator dataSourceOperator) throws ExecutorException {
        IDataSource createIDataSourceInstance = createIDataSourceInstance(dataSourceOperator);
        StreamingConfig streamingConfig = new StreamingConfig();
        streamingConfig.putAll(this.applicationConfig);
        if (dataSourceOperator.getDataSourceConfig() != null) {
            streamingConfig.putAll(dataSourceOperator.getDataSourceConfig());
        }
        try {
            createIDataSourceInstance.setConfig(streamingConfig);
            createIDataSourceInstance.setSchema(this.rightInputTupleEvent);
            return createIDataSourceInstance;
        } catch (StreamingException e) {
            throw ExecutorException.wrapStreamingException(e);
        }
    }

    private IDataSource createIDataSourceInstance(DataSourceOperator dataSourceOperator) throws ApplicationBuildException {
        String dataSourceClassName = dataSourceOperator.getDataSourceClassName();
        try {
            return (IDataSource) Class.forName(dataSourceClassName, true, CQLUtils.getClassLoader()).newInstance();
        } catch (ReflectiveOperationException e) {
            StreamingException applicationBuildException = new ApplicationBuildException(ErrorCode.SEMANTICANALYZE_DATASOURCE_UNKNOWN, dataSourceClassName);
            LOG.error("Unknown dataSource, reflective operation error.", applicationBuildException);
            throw applicationBuildException;
        }
    }

    private String[] dataSourceQueryArguments() throws ExecutorException {
        String join = Joiner.on(", ").join(this.dataSourceOperator.getQueryArguments());
        IParser createDataSourceArgumentsParser = ParserFactory.createDataSourceArgumentsParser();
        try {
            if (Strings.isNullOrEmpty(join)) {
                return new String[]{""};
            }
            List<ExpressionContext> queryarguments = ((DataSourceBodyContext) createDataSourceArgumentsParser.parse(join)).getQueryarguments();
            String[] strArr = new String[queryarguments.size()];
            for (int i = 0; i < strArr.length; i++) {
                strArr[i] = queryarguments.get(i).toString();
            }
            return strArr;
        } catch (ParseException e) {
            StreamingException applicationBuildException = new ApplicationBuildException(ErrorCode.SEMANTICANALYZE_PARSE_ERROR, join);
            LOG.error("Data source semantic analyze error.", applicationBuildException);
            throw applicationBuildException;
        }
    }

    private void prepare(Application application, Operator operator, EventTypeMng eventTypeMng, Map<String, String> map) throws ExecutorException {
        this.applicationConfig = map;
        this.dataSourceOperator = (BaseDataSourceOperator) operator;
        this.leftTransitionIn = OperatorInfoCreatorFactory.getTransitionIn(application, operator, this.dataSourceOperator.getLeftStreamName());
        this.transitionOut = OperatorInfoCreatorFactory.getTransitionOut(application, operator);
        this.leftInputSchemas = OperatorInfoCreatorFactory.getSchemasByTransition(application, this.leftTransitionIn);
        this.rightInputSchemas = Lists.newArrayList(new Schema[]{this.dataSourceOperator.getDataSourceSchema()});
        this.outputSchemas = OperatorInfoCreatorFactory.getSchemasByTransition(application, this.transitionOut);
        this.leftInputTupleEvent = eventTypeMng.getEventType(this.leftTransitionIn.getSchemaName());
        this.rightInputTupleEvent = parseSchemaToIEvent(this.dataSourceOperator.getDataSourceSchema());
    }

    private TupleEventType parseSchemaToIEvent(Schema schema) throws ExecutorException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < schema.getCols().size(); i++) {
            String type = schema.getCols().get(i).getType();
            try {
                arrayList.add(new Attribute(Class.forName(type, true, CQLUtils.getClassLoader()), schema.getCols().get(i).getName()));
            } catch (ClassNotFoundException e) {
                StreamingException applicationBuildException = new ApplicationBuildException(ErrorCode.SEMANTICANALYZE_UNSUPPORTED_DATATYPE, type);
                LOG.error("Unsupport data type.", applicationBuildException);
                throw applicationBuildException;
            }
        }
        return new TupleEventType(schema.getId(), arrayList);
    }

    private AggResultSetParameters createResultSetMergeParmeters(EventTypeMng eventTypeMng, IWindow iWindow, IExpression iExpression) throws ExecutorException {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(cloneSchema(this.leftInputSchemas, this.leftTransitionIn.getStreamName()));
        arrayList.addAll(cloneSchema(this.rightInputSchemas, this.rightInputTupleEvent.getEventTypeName()));
        HashMap hashMap = new HashMap();
        hashMap.put(this.leftInputSchemas.get(0).getStreamName(), iWindow);
        ArrayList newArrayList = Lists.newArrayList();
        newArrayList.add(this.dataSourceOperator.getLeftWindow());
        AggResultSetParameters aggResultSetParameters = new AggResultSetParameters();
        aggResultSetParameters.setBasicAggOperator(this.dataSourceOperator);
        aggResultSetParameters.setInputSchemas(arrayList);
        aggResultSetParameters.setStreamschema(eventTypeMng);
        aggResultSetParameters.setOutputSchemas(this.outputSchemas);
        aggResultSetParameters.setTransitionOut(this.transitionOut);
        aggResultSetParameters.setStreamWindows(hashMap);
        aggResultSetParameters.setExpressionBeforeAggregate(iExpression);
        aggResultSetParameters.setSystemConfig(this.applicationConfig);
        aggResultSetParameters.setOperatorWindows(newArrayList);
        return aggResultSetParameters;
    }

    private List<Schema> cloneSchema(List<Schema> list, String str) throws ExecutorException {
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < list.size(); i++) {
            Schema cloneSchema = list.get(i).cloneSchema();
            cloneSchema.setStreamName(str);
            newArrayList.add(cloneSchema);
        }
        return newArrayList;
    }

    private void setJoinConfig(StreamingConfig streamingConfig) {
        streamingConfig.put("operator.join.inner.left.input.stream.name", this.dataSourceOperator.getLeftStreamName());
        streamingConfig.put("operator.join.inner.left.schema", StreamingUtils.serializeSchema(this.leftInputTupleEvent));
    }

    private void setUniDirectionConfig(StreamingConfig streamingConfig) {
        streamingConfig.put("operator.join.inner.unidirectional", true);
        streamingConfig.put("operator.join.inner.unidirectional.index", 0);
        streamingConfig.put("operator.selfjoin.inner.unidirectional", true);
        streamingConfig.put("operator.selfjoin.inner.unidirectional.index", 0);
    }

    private JoinFilterProcessor createJoinFilterProcessor() throws ExecutorException {
        if (this.dataSourceOperator.getFilterAfterJoinExpression() == null) {
            return null;
        }
        return new JoinFilterProcessor(new FilterViewExpressionCreator().create(getInputStream(), this.dataSourceOperator.getFilterAfterJoinExpression(), this.applicationConfig));
    }

    private List<Schema> getInputStream() {
        ArrayList newArrayList = Lists.newArrayList();
        newArrayList.addAll(this.leftInputSchemas);
        newArrayList.addAll(this.rightInputSchemas);
        return newArrayList;
    }

    private IJoinComposer createJoinComposer() throws ExecutorException {
        return createCrossBiJoinComposer();
    }

    private IJoinComposer createCrossBiJoinComposer() {
        return new CrossBiJoinComposer(new SimpleEventCollection(this.dataSourceOperator.getLeftStreamName(), this.leftInputTupleEvent), new SimpleEventCollection(this.rightInputTupleEvent.getEventTypeName(), this.rightInputTupleEvent), false);
    }
}
