package com.huawei.streaming.operator.functionstream;

import com.huawei.streaming.config.StreamingConfig;
import com.huawei.streaming.datasource.DataSourceContainer;
import com.huawei.streaming.event.TupleEvent;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.exception.StreamingRuntimeException;
import com.huawei.streaming.expression.IExpression;
import com.huawei.streaming.output.OutputType;
import com.huawei.streaming.process.join.CrossBiJoinComposer;
import com.huawei.streaming.process.join.IJoinComposer;
import com.huawei.streaming.process.join.IJoinSetProcessor;
import com.huawei.streaming.process.join.JoinFilterProcessor;
import com.huawei.streaming.window.EventBasedWindow;
import com.huawei.streaming.window.IWindow;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/operator/functionstream/DataSourceFunctionOp.class */
/**
 * Join function operator whose right-hand stream is populated from a
 * {@link DataSourceContainer} instead of (or in addition to) incoming tuples.
 *
 * <p>For every tuple that arrives on the left stream, the configured CQL
 * expressions are evaluated against that tuple, the resulting values are handed
 * to the data source, and each row the data source returns is pushed into the
 * right stream as a new {@link TupleEvent}. A flag event is appended after the
 * rows, and only then is the left tuple itself added to the left stream.
 *
 * <p>Only a {@link CrossBiJoinComposer} is accepted as the join composer.
 */
public class DataSourceFunctionOp extends JoinFunctionOp {
    private static final long serialVersionUID = 1499564805532907613L;
    private static final Logger LOG = LoggerFactory.getLogger(DataSourceFunctionOp.class);

    /** Message logged and thrown when a non-cross join composer is supplied. */
    private static final String CROSS_JOIN_ONLY_MESSAGE =
            "Only cross join composer is allowed in datasource function operator.";

    /** Data source queried once per left-stream tuple. */
    private DataSourceContainer dataSource;

    /** Named expressions evaluated against each left tuple; may be {@code null}. */
    private Map<String, IExpression> cqlExpressions;

    /** The join composer, kept with its concrete type so the right stream can be read from it. */
    private CrossBiJoinComposer crossJoinComposer;

    /**
     * Creates the operator with the output type {@link OutputType#I}.
     *
     * @param iWindow window passed to the parent as its first window argument
     * @param dataSourceContainer data source queried for each left-stream tuple
     * @param map expressions, keyed by name, evaluated against each left tuple;
     *        may be {@code null}
     * @param iJoinComposer join composer; must be a {@link CrossBiJoinComposer}
     * @param joinFilterProcessor join filter forwarded to the parent operator
     * @param iJoinSetProcessor join set processor forwarded to the parent operator
     * @throws StreamingRuntimeException if {@code iJoinComposer} is not a
     *         {@link CrossBiJoinComposer}
     */
    public DataSourceFunctionOp(IWindow iWindow, DataSourceContainer dataSourceContainer, Map<String, IExpression> map, IJoinComposer iJoinComposer, JoinFilterProcessor joinFilterProcessor, IJoinSetProcessor iJoinSetProcessor) {
        this(iWindow, dataSourceContainer, map, iJoinComposer, joinFilterProcessor, iJoinSetProcessor, OutputType.I);
    }

    /**
     * Creates the operator with an explicit output type.
     *
     * <p>The parent is always given a fresh {@link EventBasedWindow} as its second
     * window argument (presumably the right-stream window - confirm against
     * {@code JoinFunctionOp}).
     *
     * @param iWindow window passed to the parent as its first window argument
     * @param dataSourceContainer data source queried for each left-stream tuple
     * @param map expressions, keyed by name, evaluated against each left tuple;
     *        may be {@code null}
     * @param iJoinComposer join composer; must be a {@link CrossBiJoinComposer}
     * @param joinFilterProcessor join filter forwarded to the parent operator
     * @param iJoinSetProcessor join set processor forwarded to the parent operator
     * @param outputType output type forwarded to the parent operator
     * @throws StreamingRuntimeException if {@code iJoinComposer} is not a
     *         {@link CrossBiJoinComposer}
     */
    public DataSourceFunctionOp(IWindow iWindow, DataSourceContainer dataSourceContainer, Map<String, IExpression> map, IJoinComposer iJoinComposer, JoinFilterProcessor joinFilterProcessor, IJoinSetProcessor iJoinSetProcessor, OutputType outputType) {
        super(iWindow, new EventBasedWindow(), iJoinComposer, joinFilterProcessor, iJoinSetProcessor, outputType);
        if (!(iJoinComposer instanceof CrossBiJoinComposer)) {
            LOG.error(CROSS_JOIN_ONLY_MESSAGE);
            throw new StreamingRuntimeException(CROSS_JOIN_ONLY_MESSAGE);
        }
        this.dataSource = dataSourceContainer;
        this.cqlExpressions = map;
        this.crossJoinComposer = (CrossBiJoinComposer) iJoinComposer;
    }

    /**
     * Applies the configuration through the parent and then takes the right
     * stream name from the cross join composer's right stream.
     *
     * @param streamingConfig configuration forwarded to the parent operator
     * @throws StreamingException if the parent rejects the configuration
     */
    @Override
    public void setConfig(StreamingConfig streamingConfig) throws StreamingException {
        super.setConfig(streamingConfig);
        // Overwrite whatever the parent configured with the composer's right stream name.
        this.rightStreamName = this.crossJoinComposer.getRightStream().getStreamName();
    }

    /**
     * Initializes the parent operator first, then the data source.
     *
     * @throws StreamingException if either initialization fails
     */
    @Override
    public void initialize() throws StreamingException {
        super.initialize();
        this.dataSource.initialize();
    }

    /**
     * Routes an incoming tuple to the left or right stream by stream name.
     *
     * <p>A left-stream tuple first triggers a data source lookup that fills the
     * right stream, and is then added to the left stream. A right-stream tuple is
     * added to the right stream directly. Any other stream name is logged at
     * warn level and the tuple is dropped.
     *
     * @param str name of the stream the tuple arrived on
     * @param tupleEvent the incoming tuple
     * @throws StreamingException if the data source lookup or a stream add fails
     */
    @Override
    public void execute(String str, TupleEvent tupleEvent) throws StreamingException {
        LOG.debug("Join Function enter.");
        if (str.equals(this.leftStreamName)) {
            // Order matters: the right stream must be populated before the left tuple is added.
            evaluateDataSource(tupleEvent);
            this.leftStream.add(tupleEvent);
        } else if (str.equals(this.rightStreamName)) {
            this.rightStream.add(tupleEvent);
        } else {
            LOG.warn("The tuple's streamName is invalid,streamName={}.", tupleEvent.getStreamName());
        }
    }

    /**
     * Destroys the parent operator and then the data source.
     *
     * <p>The data source is destroyed even when the parent's teardown throws, so
     * it is not left open by an unrelated failure. If both calls throw, the
     * exception from the data source is the one that propagates.
     *
     * @throws StreamingException if either teardown step fails
     */
    @Override
    public void destroy() throws StreamingException {
        try {
            super.destroy();
        } finally {
            this.dataSource.destroy();
        }
    }

    /**
     * Queries the data source for one left-stream tuple and feeds the result
     * rows into the right stream, followed by a flag event.
     *
     * @param tupleEvent left-stream tuple the CQL expressions are evaluated against
     * @throws StreamingException if expression evaluation, the data source query
     *         or a stream add fails
     */
    private void evaluateDataSource(TupleEvent tupleEvent) throws StreamingException {
        Map<String, Object> expressionValues = new HashMap<String, Object>();
        if (this.cqlExpressions != null) {
            for (Map.Entry<String, IExpression> entry : this.cqlExpressions.entrySet()) {
                expressionValues.put(entry.getKey(), entry.getValue().evaluate(tupleEvent));
            }
        }

        List<Object[]> rows = this.dataSource.evaluate(expressionValues);
        for (Object[] row : rows) {
            this.rightStream.add(new TupleEvent(this.rightStreamName, this.dataSource.getEventType(), row));
        }

        // Always appended, even when no rows came back.
        // NOTE(review): presumably marks the end of this lookup's batch - confirm against the window implementation.
        TupleEvent flagEvent = new TupleEvent();
        flagEvent.setFlagEvent();
        this.rightStream.add(flagEvent);
    }
}
