package com.huawei.streaming.cql.executor.operatorinfocreater;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.huawei.streaming.api.Application;
import com.huawei.streaming.api.opereators.JoinFunctionOperator;
import com.huawei.streaming.api.opereators.Operator;
import com.huawei.streaming.api.opereators.OperatorTransition;
import com.huawei.streaming.api.opereators.UniDiRectionType;
import com.huawei.streaming.api.opereators.Window;
import com.huawei.streaming.api.streams.Schema;
import com.huawei.streaming.config.StreamingConfig;
import com.huawei.streaming.cql.exception.ExecutorException;
import com.huawei.streaming.cql.executor.operatorviewscreater.AggResultSetMergeViewCreator;
import com.huawei.streaming.cql.executor.operatorviewscreater.AggResultSetParameters;
import com.huawei.streaming.cql.executor.operatorviewscreater.FilterViewExpressionCreator;
import com.huawei.streaming.cql.executor.operatorviewscreater.WindowViewCreator;
import com.huawei.streaming.event.EventTypeMng;
import com.huawei.streaming.event.IEventType;
import com.huawei.streaming.exception.ErrorCode;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.expression.ExpressionOperator;
import com.huawei.streaming.expression.IExpression;
import com.huawei.streaming.expression.LogicExpression;
import com.huawei.streaming.expression.PropertyValueExpression;
import com.huawei.streaming.expression.RelationExpression;
import com.huawei.streaming.operator.AbsOperator;
import com.huawei.streaming.operator.FunctionOperator;
import com.huawei.streaming.operator.functionstream.JoinFunctionOp;
import com.huawei.streaming.operator.functionstream.SelfJoinFunctionOp;
import com.huawei.streaming.output.OutputType;
import com.huawei.streaming.process.agg.resultmerge.IAggResultSetMerge;
import com.huawei.streaming.process.join.CrossBiJoinComposer;
import com.huawei.streaming.process.join.FullOutBiJoinComposer;
import com.huawei.streaming.process.join.IJoinComposer;
import com.huawei.streaming.process.join.IndexedMultiPropertyEventCollection;
import com.huawei.streaming.process.join.InnerBiJoinComposer;
import com.huawei.streaming.process.join.JoinFilterProcessor;
import com.huawei.streaming.process.join.SideBiJoinComposer;
import com.huawei.streaming.process.join.SideJoinType;
import com.huawei.streaming.process.join.SimpleEventCollection;
import com.huawei.streaming.util.StreamingUtils;
import com.huawei.streaming.window.IWindow;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/cql/executor/operatorinfocreater/JoinInfoOperatorCreator.class */
/**
 * Builds the runtime join operator ({@link JoinFunctionOp} or {@link SelfJoinFunctionOp})
 * described by a {@link JoinFunctionOperator} and wraps it with
 * {@link OperatorInfoCreatorFactory#buildStreamOperator}.
 *
 * <p>The state of the operator currently being built is kept in instance fields that are
 * (re)assigned on every {@link #createInstance} call, so one instance should not be used
 * for concurrent {@code createInstance} calls.
 */
public class JoinInfoOperatorCreator implements OperatorInfoCreator {
    private static final Logger LOG = LoggerFactory.getLogger(JoinInfoOperatorCreator.class);

    /** Number of sides (left, right) of one equality join condition. */
    private static final int JOIN_EXPRESSION_ARR_LENGTH = 2;

    /** Position of the left side in a join condition pair and in the unidirectional index config. */
    private static final int LEFT_INDEX = 0;

    /** Position of the right side in a join condition pair and in the unidirectional index config. */
    private static final int RIGHT_INDEX = 1;

    private static final String DEFAULT_LEFT_SELFJOIN_STREAM_NAME = "left_self";
    private static final String DEFAULT_RIGHT_SELFJOIN_STREAM_NAME = "right_self";

    private JoinFunctionOperator joinOperator;
    private List<Schema> leftInputSchemas;
    private List<Schema> rightInputSchemas;
    private List<Schema> outputSchemas;
    private Map<String, String> applicationConfig;
    private OutputType outputType;
    private OperatorTransition leftTransitionIn = null;
    private OperatorTransition rightTransitionIn = null;
    private OperatorTransition transitionOut = null;
    private IEventType leftInputTupleEvent = null;
    private IEventType rightInputTupleEvent = null;

    /**
     * Creates the executable operator for the given join operator definition.
     *
     * @param application  application the operator belongs to; used to look up transitions and schemas
     * @param operator     operator definition; must be a {@link JoinFunctionOperator}
     * @param eventTypeMng registry used to resolve the event types of both input streams
     * @param map          application configuration, copied into the operator configuration
     * @return the operator produced by {@link OperatorInfoCreatorFactory#buildStreamOperator}
     * @throws StreamingException if the join definition is not supported or a view cannot be created
     */
    @Override
    public AbsOperator createInstance(Application application, Operator operator, EventTypeMng eventTypeMng, Map<String, String> map) throws StreamingException {
        prepare(application, operator, eventTypeMng, map);
        // The output type must be known before the composers are built: they read it through isOutputRStreaming().
        this.outputType = getOutputType(this.joinOperator.getLeftWindow(), this.joinOperator.getRightWindow(), this.joinOperator.getUniDirectionIndex());

        IWindow leftWindow = new WindowViewCreator().create(this.leftInputSchemas, this.joinOperator.getLeftWindow(), map);
        IWindow rightWindow = new WindowViewCreator().create(this.rightInputSchemas, this.joinOperator.getRightWindow(), map);

        JoinFilterProcessor filterProcessor = createJoinFilterProcessor();
        IExpression filterExpression = filterProcessor == null ? null : filterProcessor.getExpr();

        AggResultSetParameters mergeParameters = createResultSetMergeParmeters(eventTypeMng, leftWindow, rightWindow, filterExpression);
        IAggResultSetMerge resultSetMerge = new AggResultSetMergeViewCreator(mergeParameters).create();
        FunctionOperator joinFunction = createJoinOperator(leftWindow, rightWindow, filterProcessor, resultSetMerge);

        StreamingConfig operatorConfig = new StreamingConfig();
        operatorConfig.putAll(this.applicationConfig);
        setUniDirectionConfig(operatorConfig);
        setJoinConfig(operatorConfig);
        joinFunction.setConfig(operatorConfig);
        return OperatorInfoCreatorFactory.buildStreamOperator(operator, joinFunction);
    }

    /**
     * Instantiates {@link JoinFunctionOp} for a join of two different streams, or
     * {@link SelfJoinFunctionOp} when both sides name the same stream.
     *
     * @throws ExecutorException if a self join does not declare a unidirectional side, or the
     *                           join composer cannot be created
     */
    private FunctionOperator createJoinOperator(IWindow leftWindow, IWindow rightWindow, JoinFilterProcessor filterProcessor, IAggResultSetMerge resultSetMerge) throws ExecutorException {
        if (!isSelfJoin()) {
            return new JoinFunctionOp(leftWindow, rightWindow, createJoinComposer(), filterProcessor, resultSetMerge, this.outputType);
        }
        if (isUniDirectional()) {
            return new SelfJoinFunctionOp(leftWindow, rightWindow, createJoinComposer(), filterProcessor, resultSetMerge, this.outputType);
        }
        ExecutorException exception = new ExecutorException(ErrorCode.SEMANTICANALYZE_UNIDIRECTION_SELFJOIN, new String[0]);
        LOG.error("No Unidirection in selfJoin.", exception);
        throw exception;
    }

    /**
     * Resolves transitions, schemas and event types of the join operator and stores them in
     * the instance fields used by the other builder methods.
     */
    private void prepare(Application application, Operator operator, EventTypeMng eventTypeMng, Map<String, String> map) throws ExecutorException {
        this.applicationConfig = map;
        this.joinOperator = (JoinFunctionOperator) operator;

        this.leftTransitionIn = OperatorInfoCreatorFactory.getTransitionIn(application, operator, this.joinOperator.getLeftStreamName());
        this.rightTransitionIn = OperatorInfoCreatorFactory.getTransitionIn(application, operator, this.joinOperator.getRightStreamName());
        this.transitionOut = OperatorInfoCreatorFactory.getTransitionOut(application, operator);

        this.leftInputSchemas = OperatorInfoCreatorFactory.getSchemasByTransition(application, this.leftTransitionIn);
        this.rightInputSchemas = OperatorInfoCreatorFactory.getSchemasByTransition(application, this.rightTransitionIn);
        this.outputSchemas = OperatorInfoCreatorFactory.getSchemasByTransition(application, this.transitionOut);

        this.leftInputTupleEvent = eventTypeMng.getEventType(this.leftTransitionIn.getSchemaName());
        this.rightInputTupleEvent = eventTypeMng.getEventType(this.rightTransitionIn.getSchemaName());
    }

    /**
     * Collects everything {@link AggResultSetMergeViewCreator} needs: the renamed input
     * schemas, the windows of both sides, the output description and the filter expression.
     */
    private AggResultSetParameters createResultSetMergeParmeters(EventTypeMng eventTypeMng, IWindow leftWindow, IWindow rightWindow, IExpression filterExpression) throws ExecutorException {
        // NOTE(review): the keys come from the first schema of each side; if both report the same
        // stream name the right window replaces the left entry. Confirm this is intended for self joins.
        Map<String, IWindow> streamWindows = Maps.newHashMap();
        streamWindows.put(this.leftInputSchemas.get(0).getStreamName(), leftWindow);
        streamWindows.put(this.rightInputSchemas.get(0).getStreamName(), rightWindow);

        List<Window> operatorWindows = Lists.newArrayList();
        operatorWindows.add(this.joinOperator.getLeftWindow());
        operatorWindows.add(this.joinOperator.getRightWindow());

        AggResultSetParameters parameters = new AggResultSetParameters();
        parameters.setBasicAggOperator(this.joinOperator);
        parameters.setInputSchemas(joinedInputSchemas());
        parameters.setStreamschema(eventTypeMng);
        parameters.setOutputSchemas(this.outputSchemas);
        parameters.setTransitionOut(this.transitionOut);
        parameters.setStreamWindows(streamWindows);
        parameters.setExpressionBeforeAggregate(filterExpression);
        parameters.setSystemConfig(this.applicationConfig);
        parameters.setOperatorWindows(operatorWindows);
        return parameters;
    }

    /**
     * Returns copies of the left and right input schemas (left first), each renamed to the
     * stream name of the transition it arrives on.
     */
    private List<Schema> joinedInputSchemas() throws ExecutorException {
        List<Schema> schemas = Lists.newArrayList();
        schemas.addAll(cloneSchema(this.leftInputSchemas, this.leftTransitionIn.getStreamName()));
        schemas.addAll(cloneSchema(this.rightInputSchemas, this.rightTransitionIn.getStreamName()));
        return schemas;
    }

    /**
     * Writes the stream names and serialized schemas of both sides into the operator
     * configuration, for the join keys as well as the self-join keys.
     */
    private void setJoinConfig(StreamingConfig streamingConfig) {
        streamingConfig.put("operator.join.inner.left.input.stream.name", this.joinOperator.getLeftStreamName());
        streamingConfig.put("operator.join.inner.right.input.stream.name", this.joinOperator.getRightStreamName());
        streamingConfig.put("operator.selfjoin.inner.left.input.stream.name", DEFAULT_LEFT_SELFJOIN_STREAM_NAME);
        streamingConfig.put("operator.selfjoin.inner.right.input.stream.name", DEFAULT_RIGHT_SELFJOIN_STREAM_NAME);
        streamingConfig.put("operator.join.inner.left.schema", StreamingUtils.serializeSchema(this.leftInputTupleEvent));
        streamingConfig.put("operator.join.inner.right.schema", StreamingUtils.serializeSchema(this.rightInputTupleEvent));
        // The self-join schema is taken from the left side.
        streamingConfig.put("operator.selfjoin.inner.input.schema", StreamingUtils.serializeSchema(this.leftInputTupleEvent));
    }

    /**
     * Writes the unidirectional flags into the operator configuration. The index keys
     * (0 for the left stream, 1 for the right stream) are only written when a
     * unidirectional side is declared; a {@code null} or any other direction disables
     * the flags.
     */
    private void setUniDirectionConfig(StreamingConfig streamingConfig) {
        UniDiRectionType direction = this.joinOperator.getUniDirectionIndex();
        boolean uniDirectional = direction == UniDiRectionType.LEFT_STREAM || direction == UniDiRectionType.RIGHT_STREAM;

        streamingConfig.put("operator.join.inner.unidirectional", uniDirectional);
        streamingConfig.put("operator.selfjoin.inner.unidirectional", uniDirectional);
        if (uniDirectional) {
            int streamIndex = direction == UniDiRectionType.LEFT_STREAM ? LEFT_INDEX : RIGHT_INDEX;
            streamingConfig.put("operator.join.inner.unidirectional.index", streamIndex);
            streamingConfig.put("operator.selfjoin.inner.unidirectional.index", streamIndex);
        }
    }

    /**
     * Creates the processor for the filter-after-join expression.
     *
     * @return the processor, or {@code null} when the operator declares no such expression
     */
    private JoinFilterProcessor createJoinFilterProcessor() throws ExecutorException {
        if (this.joinOperator.getFilterAfterJoinExpression() == null) {
            return null;
        }
        IExpression filter = new FilterViewExpressionCreator().create(getInputStream(), this.joinOperator.getFilterAfterJoinExpression(), this.applicationConfig);
        return new JoinFilterProcessor(filter);
    }

    /**
     * Selects the composer implementation matching the operator's join type.
     *
     * @throws ExecutorException for any join type without a dedicated composer
     */
    private IJoinComposer createJoinComposer() throws ExecutorException {
        switch (this.joinOperator.getJoinType()) {
            case LEFT_OUTER_JOIN:
                return createSideBiJoinComposer(true);
            case RIGHT_OUTER_JOIN:
                return createSideBiJoinComposer(false);
            case FULL_OUTER_JOIN:
                return createFullOutBiJoinComposer();
            case INNER_JOIN:
                return createInnerBiJoinComposer();
            case CROSS_JOIN:
                return createCrossBiJoinComposer();
            default:
                ExecutorException exception = new ExecutorException(ErrorCode.SEMANTICANALYZE_JOIN_UNSPPORTTED_NATURAL_JOIN, new String[0]);
                LOG.error("Unsupport natural join.", exception);
                throw exception;
        }
    }

    /** Cross joins need no join condition, so both sides use plain event collections. */
    private IJoinComposer createCrossBiJoinComposer() {
        SimpleEventCollection leftEvents = new SimpleEventCollection(this.joinOperator.getLeftStreamName(), this.leftInputTupleEvent);
        SimpleEventCollection rightEvents = new SimpleEventCollection(this.joinOperator.getRightStreamName(), this.rightInputTupleEvent);
        return new CrossBiJoinComposer(leftEvents, rightEvents, isOutputRStreaming());
    }

    private IJoinComposer createInnerBiJoinComposer() throws ExecutorException {
        PropertyValueExpression[][] conditionsBySide = revertJoinConditions(getJoinConditions());
        return new InnerBiJoinComposer(createLeftIndexedEvents(conditionsBySide), createRightIndexedEvents(conditionsBySide), isOutputRStreaming());
    }

    private IJoinComposer createFullOutBiJoinComposer() throws ExecutorException {
        PropertyValueExpression[][] conditionsBySide = revertJoinConditions(getJoinConditions());
        return new FullOutBiJoinComposer(createLeftIndexedEvents(conditionsBySide), createRightIndexedEvents(conditionsBySide), isOutputRStreaming());
    }

    /**
     * @param z {@code true} for a left outer join, {@code false} for a right outer join
     */
    private IJoinComposer createSideBiJoinComposer(boolean z) throws ExecutorException {
        SideJoinType sideJoinType = z ? SideJoinType.LEFTJOIN : SideJoinType.RIGHTJOIN;
        PropertyValueExpression[][] conditionsBySide = revertJoinConditions(getJoinConditions());
        return new SideBiJoinComposer(createLeftIndexedEvents(conditionsBySide), createRightIndexedEvents(conditionsBySide), sideJoinType, isOutputRStreaming());
    }

    /** Builds the left event collection from the left-hand join properties. */
    private IndexedMultiPropertyEventCollection createLeftIndexedEvents(PropertyValueExpression[][] conditionsBySide) {
        return new IndexedMultiPropertyEventCollection(this.joinOperator.getLeftStreamName(), this.leftInputTupleEvent, conditionsBySide[LEFT_INDEX]);
    }

    /** Builds the right event collection from the right-hand join properties. */
    private IndexedMultiPropertyEventCollection createRightIndexedEvents(PropertyValueExpression[][] conditionsBySide) {
        return new IndexedMultiPropertyEventCollection(this.joinOperator.getRightStreamName(), this.rightInputTupleEvent, conditionsBySide[RIGHT_INDEX]);
    }

    /**
     * Transposes {@code [condition][side]} pairs into {@code [side][condition]} so that all
     * left properties and all right properties can be handed over as one array each.
     */
    private PropertyValueExpression[][] revertJoinConditions(PropertyValueExpression[][] conditionPairs) {
        PropertyValueExpression[][] conditionsBySide = new PropertyValueExpression[JOIN_EXPRESSION_ARR_LENGTH][conditionPairs.length];
        for (int i = 0; i < conditionPairs.length; i++) {
            conditionsBySide[LEFT_INDEX][i] = conditionPairs[i][LEFT_INDEX];
            conditionsBySide[RIGHT_INDEX][i] = conditionPairs[i][RIGHT_INDEX];
        }
        return conditionsBySide;
    }

    /**
     * Compiles the operator's join expression and splits it into (left property, right
     * property) pairs, one per equality condition.
     *
     * @throws ExecutorException if the expression contains anything but equality comparisons
     *                           between two properties, combined by logic expressions
     */
    private PropertyValueExpression[][] getJoinConditions() throws ExecutorException {
        IExpression joinExpression = new FilterViewExpressionCreator().create(joinedInputSchemas(), this.joinOperator.getJoinExpression(), this.applicationConfig);
        List<PropertyValueExpression[]> conditionPairs = Lists.newArrayList();
        parseJoinConditions(joinExpression, conditionPairs);
        resetIndexForSelfJoin(conditionPairs);
        return conditionPairs.toArray(new PropertyValueExpression[conditionPairs.size()][JOIN_EXPRESSION_ARR_LENGTH]);
    }

    /** For a self join, sets the stream index of every right-hand property to 1. */
    private void resetIndexForSelfJoin(List<PropertyValueExpression[]> conditionPairs) {
        if (!isSelfJoin()) {
            return;
        }
        for (PropertyValueExpression[] pair : conditionPairs) {
            pair[RIGHT_INDEX].setStreamIndex(RIGHT_INDEX);
        }
    }

    /**
     * Walks the expression tree and appends one pair per relation expression found.
     *
     * <p>NOTE(review): both operands of every {@link LogicExpression} are visited without
     * looking at its logical operator, so the conditions are collected the same way whatever
     * the operator is. Confirm that only conjunctions can reach this point.
     */
    private void parseJoinConditions(IExpression expression, List<PropertyValueExpression[]> conditionPairs) throws ExecutorException {
        if (expression instanceof RelationExpression) {
            conditionPairs.add(parseJoinLeftAndRightConditions((RelationExpression) expression));
            return;
        }
        if (expression instanceof LogicExpression) {
            LogicExpression logic = (LogicExpression) expression;
            parseJoinConditions(logic.getLeftExpr(), conditionPairs);
            parseJoinConditions(logic.getRightExpr(), conditionPairs);
            return;
        }
        // String.valueOf keeps a missing (null) expression on the ExecutorException path.
        throw unsupportedJoinCondition("Unsupport join condition, support logic expression.", String.valueOf(expression));
    }

    /**
     * Extracts the two property operands of an equality condition.
     *
     * @return a two element array: left property, right property
     * @throws ExecutorException if the operator is not EQUAL or an operand is not a property
     */
    private PropertyValueExpression[] parseJoinLeftAndRightConditions(RelationExpression relationExpression) throws ExecutorException {
        if (!ExpressionOperator.EQUAL.equals(relationExpression.getOp())) {
            throw unsupportedJoinCondition("Unsupport join condition, support equal.", "");
        }
        PropertyValueExpression[] pair = new PropertyValueExpression[JOIN_EXPRESSION_ARR_LENGTH];
        pair[LEFT_INDEX] = requirePropertyExpression(relationExpression.getLeftExpr(), "Unsupport join condition, left expression must be property expression.");
        pair[RIGHT_INDEX] = requirePropertyExpression(relationExpression.getRightExpr(), "Unsupport join condition, right expression must be property expression.");
        return pair;
    }

    /** Casts the operand to a property expression or fails with the given log message. */
    private static PropertyValueExpression requirePropertyExpression(Object operand, String logMessage) throws ExecutorException {
        if (operand instanceof PropertyValueExpression) {
            return (PropertyValueExpression) operand;
        }
        throw unsupportedJoinCondition(logMessage, "");
    }

    /** Creates and logs the exception used for every unsupported join condition. */
    private static ExecutorException unsupportedJoinCondition(String logMessage, String detail) {
        ExecutorException exception = new ExecutorException(ErrorCode.SEMANTICANALYZE_UNSPPORTED_JOIN_CONDITION, detail);
        LOG.error(logMessage, exception);
        return exception;
    }

    /**
     * Copies every schema of the list and assigns the given stream name to each copy; the
     * schemas passed in are left untouched.
     */
    private List<Schema> cloneSchema(List<Schema> list, String str) throws ExecutorException {
        List<Schema> renamedCopies = Lists.newArrayList();
        for (Schema schema : list) {
            Schema copy = schema.cloneSchema();
            copy.setStreamName(str);
            renamedCopies.add(copy);
        }
        return renamedCopies;
    }

    /** Returns the left and right input schemas (not copied, left first) in one list. */
    private List<Schema> getInputStream() {
        List<Schema> schemas = Lists.newArrayList();
        schemas.addAll(this.leftInputSchemas);
        schemas.addAll(this.rightInputSchemas);
        return schemas;
    }

    /** A self join is a join whose left and right stream names are equal. */
    private boolean isSelfJoin() {
        return this.joinOperator.getLeftStreamName().equals(this.joinOperator.getRightStreamName());
    }

    /**
     * Tells whether the operator declares a unidirectional side. A missing ({@code null})
     * direction counts as not unidirectional, matching {@link #setUniDirectionConfig}.
     */
    private boolean isUniDirectional() {
        UniDiRectionType direction = this.joinOperator.getUniDirectionIndex();
        return direction != null && direction != UniDiRectionType.NONE_STREAM;
    }

    /**
     * Derives the output type from the windows of both sides and the unidirectional side.
     *
     * <ul>
     * <li>neither window is an RStream window: {@link OutputType#I}</li>
     * <li>unidirectional join: {@link OutputType#R} if the window of the unidirectional side
     *     is an RStream window, otherwise {@link OutputType#I}</li>
     * <li>no unidirectional side (including {@code null}): {@link OutputType#R} only if both
     *     windows are RStream windows, otherwise the join is rejected</li>
     * </ul>
     *
     * @throws ExecutorException if exactly one window is an RStream window and no
     *                           unidirectional side is declared
     */
    private OutputType getOutputType(Window leftWindow, Window rightWindow, UniDiRectionType uniDirection) throws ExecutorException {
        if (!OutputTypeAnalyzer.isRStreamWindow(leftWindow) && !OutputTypeAnalyzer.isRStreamWindow(rightWindow)) {
            return OutputType.I;
        }
        // Reference comparisons instead of a switch: switching on a null enum value throws NullPointerException.
        if (uniDirection == UniDiRectionType.LEFT_STREAM) {
            return OutputTypeAnalyzer.isRStreamWindow(leftWindow) ? OutputType.R : OutputType.I;
        }
        if (uniDirection == UniDiRectionType.RIGHT_STREAM) {
            return OutputTypeAnalyzer.isRStreamWindow(rightWindow) ? OutputType.R : OutputType.I;
        }
        if (OutputTypeAnalyzer.isRStreamWindow(leftWindow) && OutputTypeAnalyzer.isRStreamWindow(rightWindow)) {
            return OutputType.R;
        }
        ExecutorException exception = new ExecutorException(ErrorCode.SEMANTICANALYZE_UNSPPORTED_WINDOW_JOIN, new String[0]);
        LOG.error("RStream Window is not allowed in this join.", exception);
        throw exception;
    }

    private boolean isOutputRStreaming() {
        return this.outputType == OutputType.R;
    }
}
