package com.huawei.streaming.operator.functionstream;

import com.huawei.streaming.event.IEventType;
import com.huawei.streaming.event.TupleEvent;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.expression.IExpression;
import com.huawei.streaming.operator.FunctionOperator;
import com.huawei.streaming.output.OutputStorm;
import com.huawei.streaming.output.OutputType;
import com.huawei.streaming.process.SelectSubProcess;
import com.huawei.streaming.processor.SimpleOutputProcessor;
import com.huawei.streaming.view.FilterView;
import com.huawei.streaming.view.FirstLevelStream;
import com.huawei.streaming.view.FunctorView;
import com.huawei.streaming.view.ProcessView;
import com.huawei.streaming.view.ViewImpl;

/* loaded from: input_file:com/huawei/streaming/operator/functionstream/FunctorOp.class */
/**
 * Function operator that pushes incoming tuples into a small view chain:
 * a {@link FirstLevelStream}, followed by either a {@link FilterView} (when a
 * filter expression was supplied) or a plain {@link FunctorView}, followed by
 * a {@link ProcessView} whose processor is a {@link SimpleOutputProcessor}
 * built around the select sub-process and an {@link OutputStorm}.
 */
public class FunctorOp extends FunctionOperator {
    private static final long serialVersionUID = -9115269739974301889L;

    /** Select sub-process handed to the output processor in {@link #initialize()}. */
    private SelectSubProcess selectorProcess;

    /** Optional filter expression; {@code null} selects the unfiltered {@link FunctorView}. */
    private IExpression filterExpression;

    /** Head of the view chain; receives every tuple passed to {@link #execute}. */
    private FirstLevelStream firststream = new FirstLevelStream();

    /** Output created in {@link #initialize()} and bound to this operator's emitter. */
    private OutputStorm outputStorm;

    /**
     * Creates the operator.
     *
     * @param selectSubProcess select sub-process later passed to the output processor
     * @param iEventType       accepted for signature compatibility; not used by this class
     * @param iExpression      filter expression, or {@code null} for no filter view
     */
    public FunctorOp(SelectSubProcess selectSubProcess, IEventType iEventType, IExpression iExpression) {
        this.selectorProcess = selectSubProcess;
        this.filterExpression = iExpression;
    }

    /**
     * Builds the view chain, binds the output to this operator's emitter and
     * starts the first-level stream.
     *
     * @throws StreamingException declared by the overridden operator contract
     */
    @Override
    public void initialize() throws StreamingException {
        // Pick the view that sits directly behind the first-level stream.
        ViewImpl headView;
        if (this.filterExpression == null) {
            headView = new FunctorView();
        } else {
            headView = new FilterView(this.filterExpression);
        }
        ProcessView outputView = new ProcessView();

        // The field is assigned before the emitter is looked up, as the output
        // must be recorded on the operator even if wiring fails afterwards.
        OutputStorm output = new OutputStorm();
        this.outputStorm = output;
        output.setEmit(getEmitter());

        SimpleOutputProcessor processor =
                new SimpleOutputProcessor(this.selectorProcess, null, output, OutputType.I);
        outputView.setProcessor(processor);

        // Link the chain back to front, then start it.
        headView.addView(outputView);
        this.firststream.addView(headView);
        this.firststream.start();
    }

    /**
     * Adds the given tuple to the first-level stream.
     *
     * @param str        not used by this implementation
     * @param tupleEvent tuple to add to the stream
     * @throws StreamingException declared by the overridden operator contract
     */
    @Override
    public void execute(String str, TupleEvent tupleEvent) throws StreamingException {
        this.firststream.add(tupleEvent);
    }

    /**
     * Stops the first-level stream.
     *
     * @throws StreamingException declared by the overridden operator contract
     */
    @Override
    public void destroy() throws StreamingException {
        this.firststream.stop();
    }
}
