package com.huawei.streaming.operator.functionstream;

import com.huawei.streaming.config.StreamingConfig;
import com.huawei.streaming.event.TupleEvent;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.operator.FunctionOperator;
import com.huawei.streaming.output.OutputStorm;
import com.huawei.streaming.output.OutputType;
import com.huawei.streaming.process.agg.resultmerge.IAggResultSetMerge;
import com.huawei.streaming.process.join.IJoinComposer;
import com.huawei.streaming.process.join.IJoinSetProcessor;
import com.huawei.streaming.process.join.JoinFilterProcessor;
import com.huawei.streaming.processor.JoinProcessor;
import com.huawei.streaming.util.StreamingUtils;
import com.huawei.streaming.view.FirstLevelStream;
import com.huawei.streaming.view.JoinProcessView;
import com.huawei.streaming.window.IWindow;
import com.huawei.streaming.window.LengthSlideWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/operator/functionstream/SelfJoinFunctionOp.class */
public class SelfJoinFunctionOp extends FunctionOperator {
    private static final long serialVersionUID = 8825931094864211974L;
    private static final Logger LOG = LoggerFactory.getLogger(SelfJoinFunctionOp.class);
    private IWindow leftWindow;
    private IWindow rightWindow;
    private IJoinComposer joinComposer;
    private JoinFilterProcessor filterProcessor;
    private IJoinSetProcessor setProcessor;
    private FirstLevelStream leftStream;
    private FirstLevelStream rightStream;
    private String leftStreamName;
    private String rightStreamName;
    private boolean unidirectional;
    private int uniStreamIndex;
    private OutputStorm outputStorm;
    private OutputType outType;

    public SelfJoinFunctionOp(IWindow iWindow, IWindow iWindow2, IJoinComposer iJoinComposer, JoinFilterProcessor joinFilterProcessor, IAggResultSetMerge iAggResultSetMerge) {
        this.leftStream = new FirstLevelStream();
        this.rightStream = new FirstLevelStream();
        this.unidirectional = true;
        this.uniStreamIndex = 0;
        this.outType = OutputType.I;
        if (iWindow == null) {
            this.leftWindow = new LengthSlideWindow(1);
        } else {
            this.leftWindow = iWindow;
        }
        if (iWindow2 == null) {
            this.rightWindow = new LengthSlideWindow(1);
        } else {
            this.rightWindow = iWindow2;
        }
        this.joinComposer = iJoinComposer;
        this.filterProcessor = joinFilterProcessor;
        this.setProcessor = iAggResultSetMerge;
    }

    public SelfJoinFunctionOp(IWindow iWindow, IWindow iWindow2, IJoinComposer iJoinComposer, JoinFilterProcessor joinFilterProcessor, IAggResultSetMerge iAggResultSetMerge, OutputType outputType) {
        this(iWindow, iWindow2, iJoinComposer, joinFilterProcessor, iAggResultSetMerge);
        if (outputType != null) {
            this.outType = outputType;
        }
    }

    @Override // com.huawei.streaming.operator.FunctionOperator, com.huawei.streaming.operator.AbsOperator, com.huawei.streaming.operator.Configurable
    public void setConfig(StreamingConfig streamingConfig) throws StreamingException {
        super.setConfig(streamingConfig);
        this.leftStreamName = (String) streamingConfig.get(StreamingConfig.OPERATOR_SELFJOIN_INNER_LEFT_INPUT_STREAM_NAME);
        this.rightStreamName = (String) streamingConfig.get(StreamingConfig.OPERATOR_SELFJOIN_INNER_RIGHT_INPUT_STREAM_NAME);
        if (streamingConfig.containsKey(StreamingConfig.OPERATOR_SELFJOIN_INNER_UNIDIRECTIONAL)) {
            this.unidirectional = ((Boolean) streamingConfig.get(StreamingConfig.OPERATOR_SELFJOIN_INNER_UNIDIRECTIONAL)).booleanValue();
            if (streamingConfig.containsKey(StreamingConfig.OPERATOR_SELFJOIN_INNER_UNIDIRECTIONAL_INDEX)) {
                this.uniStreamIndex = ((Integer) streamingConfig.get(StreamingConfig.OPERATOR_SELFJOIN_INNER_UNIDIRECTIONAL_INDEX)).intValue();
            }
        }
        addInputSchema(this.leftStreamName, StreamingUtils.deSerializeSchema((String) streamingConfig.get(StreamingConfig.OPERATOR_SELFJOIN_INNER_INPUT_SCHEMA)));
        addInputSchema(this.rightStreamName, StreamingUtils.deSerializeSchema((String) streamingConfig.get(StreamingConfig.OPERATOR_SELFJOIN_INNER_INPUT_SCHEMA)));
    }

    @Override // com.huawei.streaming.operator.AbsOperator
    public void initialize() throws StreamingException {
        JoinProcessView joinProcessView = new JoinProcessView();
        this.outputStorm = new OutputStorm(this.outType);
        this.outputStorm.setEmit(getEmitter());
        JoinProcessor joinProcessor = new JoinProcessor(this.joinComposer, new String[]{this.leftStreamName, this.rightStreamName}, this.filterProcessor, this.setProcessor, this.outputStorm, this.outType);
        joinProcessor.setSelfJoin(true);
        if (this.unidirectional) {
            joinProcessor.setUnidirectional(Boolean.valueOf(this.unidirectional));
            joinProcessor.setUniStreamIndex(this.uniStreamIndex);
        }
        joinProcessView.setProcessor(joinProcessor);
        this.leftWindow.addView(joinProcessView);
        this.rightWindow.addView(joinProcessView);
        this.leftStream.addView(this.leftWindow);
        this.rightStream.addView(this.rightWindow);
        this.leftStream.start();
        this.rightStream.start();
    }

    @Override // com.huawei.streaming.operator.IOperator
    public void execute(String str, TupleEvent tupleEvent) throws StreamingException {
        LOG.debug("Join Function enter.");
        if (!this.unidirectional) {
            LOG.debug("SelfJoin operator support unidirection only.");
            return;
        }
        if (this.uniStreamIndex == 0) {
            tupleEvent.setStreamName(this.rightStreamName);
            this.rightStream.add(tupleEvent);
            tupleEvent.setStreamName(this.leftStreamName);
            this.leftStream.add(tupleEvent);
            return;
        }
        tupleEvent.setStreamName(this.leftStreamName);
        this.leftStream.add(tupleEvent);
        tupleEvent.setStreamName(this.rightStreamName);
        this.rightStream.add(tupleEvent);
    }

    @Override // com.huawei.streaming.operator.IOperator
    public void destroy() throws StreamingException {
        this.leftStream.stop();
        this.rightStream.stop();
    }
}
