package com.huawei.streaming.operator.functionstream;

import com.huawei.streaming.event.TupleEvent;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.operator.FunctionOperator;
import com.huawei.streaming.output.OutputStorm;
import com.huawei.streaming.output.OutputType;
import com.huawei.streaming.process.agg.resultmerge.IAggResultSetMerge;
import com.huawei.streaming.process.agg.resultmerge.IResultSetMerge;
import com.huawei.streaming.processor.AggregateProcessor;
import com.huawei.streaming.view.FilterView;
import com.huawei.streaming.view.FirstLevelStream;
import com.huawei.streaming.view.MergeView;
import com.huawei.streaming.view.ProcessView;
import com.huawei.streaming.window.IWindow;
import com.huawei.streaming.window.group.IGroupWindow;

/* loaded from: input_file:com/huawei/streaming/operator/functionstream/AggFunctionOp.class */
public class AggFunctionOp extends FunctionOperator {
    private static final long serialVersionUID = 6844442291453119711L;
    private FirstLevelStream firstStream;
    private IWindow window;
    private IResultSetMerge resultMerge;
    private FilterView filterView;
    private OutputStorm outputStorm;
    private OutputType outType;

    public AggFunctionOp(IWindow iWindow, FilterView filterView, IAggResultSetMerge iAggResultSetMerge) {
        this.firstStream = new FirstLevelStream();
        this.outType = OutputType.I;
        if (iAggResultSetMerge == null) {
            throw new IllegalArgumentException("Aggregate Result Process is Null.");
        }
        this.window = iWindow;
        this.filterView = filterView;
        this.resultMerge = iAggResultSetMerge;
    }

    public AggFunctionOp(IWindow iWindow, FilterView filterView, IAggResultSetMerge iAggResultSetMerge, OutputType outputType) {
        this(iWindow, filterView, iAggResultSetMerge);
        if (outputType != null) {
            this.outType = outputType;
        }
    }

    @Override // com.huawei.streaming.operator.AbsOperator
    public void initialize() throws StreamingException {
        this.outputStorm = new OutputStorm(this.outType);
        this.outputStorm.setEmit(getEmitter());
        AggregateProcessor aggregateProcessor = new AggregateProcessor(this.resultMerge, this.outputStorm, this.outType);
        ProcessView processView = new ProcessView();
        processView.setProcessor(aggregateProcessor);
        if (this.window != null) {
            if (this.window instanceof IGroupWindow) {
                MergeView mergeView = new MergeView();
                mergeView.addView(processView);
                if (null != this.filterView) {
                    this.window.addView(this.filterView);
                    this.filterView.addView(mergeView);
                } else {
                    this.window.addView(mergeView);
                }
            } else if (null != this.filterView) {
                this.window.addView(this.filterView);
                this.filterView.addView(processView);
            } else {
                this.window.addView(processView);
            }
            this.firstStream.addView(this.window);
        } else if (null != this.filterView) {
            this.firstStream.addView(this.filterView);
            this.filterView.addView(processView);
        } else {
            this.firstStream.addView(processView);
        }
        this.firstStream.start();
    }

    @Override // com.huawei.streaming.operator.IOperator
    public void execute(String str, TupleEvent tupleEvent) throws StreamingException {
        this.firstStream.add(tupleEvent);
    }

    @Override // com.huawei.streaming.operator.IOperator
    public void destroy() throws StreamingException {
        this.firstStream.stop();
    }
}
