package com.huawei.streaming.operator.functionstream;

import com.huawei.streaming.event.IEventType;
import com.huawei.streaming.event.TupleEvent;
import com.huawei.streaming.exception.ErrorCode;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.expression.IExpression;
import com.huawei.streaming.operator.FunctionOperator;
import com.huawei.streaming.output.OutputStorm;
import com.huawei.streaming.output.OutputType;
import com.huawei.streaming.process.SelectSubProcess;
import com.huawei.streaming.processor.SimpleOutputProcessor;
import com.huawei.streaming.view.FilterView;
import com.huawei.streaming.view.FirstLevelStream;
import com.huawei.streaming.view.FunctorView;
import com.huawei.streaming.view.IView;
import com.huawei.streaming.view.ProcessView;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/operator/functionstream/SplitOp.class */
/**
 * Function operator that feeds every incoming tuple into one processing
 * pipeline per entry of the filter map.
 *
 * <p>For each key in the filter map, {@link #initialize()} builds a
 * {@link FirstLevelStream} whose view chain is: a {@link FilterView} (or a
 * pass-through {@link FunctorView} when the entry has no filter expression)
 * followed by a {@link ProcessView} that outputs through the emitter returned
 * by {@code getEmitter(key)}. The keys presumably identify the output streams
 * (NOTE(review): confirm against the code that builds these maps).
 */
public class SplitOp extends FunctionOperator {
    private static final long serialVersionUID = 8165331175152418203L;
    private static final Logger LOG = LoggerFactory.getLogger(SplitOp.class);

    /** Output schemas, exposed unchanged through {@link #getOutputSchemaMap()}. */
    private final Map<String, IEventType> outputSchemas;

    /** Select sub-process per key; looked up with the same keys as {@link #filterMap}. */
    private final Map<String, SelectSubProcess> selectorMap;

    /** Filter expression per key; a {@code null} value means "no filter" for that key. */
    private final Map<String, IExpression> filterMap;

    /** One started stream per filter entry, populated by {@link #initialize()}. */
    private final List<FirstLevelStream> firstStreamList = new ArrayList<FirstLevelStream>();

    /**
     * Creates the operator. The arguments are stored as given and are only
     * validated when {@link #initialize()} is called.
     *
     * @param map  select sub-processes, keyed like {@code map2}
     * @param map2 filter expressions by key; values may be {@code null}
     * @param map3 output schemas returned by {@link #getOutputSchemaMap()}
     */
    public SplitOp(Map<String, SelectSubProcess> map, Map<String, IExpression> map2, Map<String, IEventType> map3) {
        this.selectorMap = map;
        this.filterMap = map2;
        this.outputSchemas = map3;
    }

    /**
     * Builds and starts one {@link FirstLevelStream} for every filter entry.
     *
     * @throws StreamingException if the filter map is {@code null} or empty,
     *         or if the selector map is {@code null}
     */
    @Override
    public void initialize() throws StreamingException {
        // selectorMap is dereferenced for every entry below, so reject a null
        // map here rather than failing with a NullPointerException mid-loop
        // after some streams have already been started.
        if (this.filterMap == null || this.filterMap.isEmpty() || this.selectorMap == null) {
            LOG.error("Arguments in '{}' operator is null.", getClass().getName());
            throw new StreamingException(ErrorCode.UNKNOWN_SERVER_COMMON_ERROR, new String[0]);
        }
        for (Map.Entry<String, IExpression> entry : this.filterMap.entrySet()) {
            String key = entry.getKey();
            IExpression filter = entry.getValue();

            // No filter expression: use a plain FunctorView instead of a FilterView.
            IView headView = null == filter ? new FunctorView() : new FilterView(filter);

            OutputStorm outputStorm = new OutputStorm();
            outputStorm.setEmit(getEmitter(key));

            ProcessView processView = new ProcessView();
            processView.setProcessor(
                new SimpleOutputProcessor(this.selectorMap.get(key), null, outputStorm, OutputType.I));
            headView.addView(processView);

            FirstLevelStream firstLevelStream = new FirstLevelStream();
            firstLevelStream.addView(headView);
            firstLevelStream.start();
            this.firstStreamList.add(firstLevelStream);
        }
    }

    /**
     * Adds the tuple to every stream created by {@link #initialize()}.
     *
     * @param str        not used by this operator
     * @param tupleEvent the event handed to each stream
     * @throws StreamingException declared by the overridden method
     */
    @Override
    public void execute(String str, TupleEvent tupleEvent) throws StreamingException {
        for (FirstLevelStream stream : this.firstStreamList) {
            stream.add(tupleEvent);
        }
    }

    /**
     * Stops every stream created by {@link #initialize()}.
     *
     * @throws StreamingException declared by the overridden method
     */
    @Override
    public void destroy() throws StreamingException {
        for (FirstLevelStream stream : this.firstStreamList) {
            stream.stop();
        }
    }

    /**
     * Returns the output schema map exactly as passed to the constructor
     * (no copy is made).
     *
     * @return the output schemas; may be {@code null} if {@code null} was supplied
     */
    public Map<String, IEventType> getOutputSchemaMap() {
        return this.outputSchemas;
    }
}
