package com.huawei.streaming.operator;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.huawei.streaming.config.StreamingConfig;
import com.huawei.streaming.event.IEventType;
import com.huawei.streaming.event.TupleEvent;
import com.huawei.streaming.event.TupleEventType;
import com.huawei.streaming.exception.ErrorCode;
import com.huawei.streaming.exception.StreamSerDeException;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.serde.StreamSerDe;
import com.huawei.streaming.util.StreamingUtils;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/operator/OutputOperator.class */
/**
 * Terminal operator that hands every incoming tuple event to a pluggable
 * {@link IOutputStreamOperator}.
 *
 * <p>The operator has no output stream or output schema of its own: the
 * corresponding setters are no-ops and the getters return {@code null}.
 * A {@link StreamSerDe} and an {@link IOutputStreamOperator} must be supplied
 * through {@link #setSerDe(StreamSerDe)} and
 * {@link #setOutputStreamOperator(IOutputStreamOperator)} before
 * {@link #initialize()} is called.
 */
public final class OutputOperator extends AbsOperator {
    private static final long serialVersionUID = 6316041458965215046L;
    private static final Logger LOG = LoggerFactory.getLogger(OutputOperator.class);

    /** Serializer/deserializer passed on to the output stream operator during initialization. */
    private StreamSerDe serde;

    /** Names of the registered input streams; never {@code null}. */
    private List<String> inputStreams = Lists.newArrayList();

    /** Input schemas keyed by stream name; never {@code null}. */
    private Map<String, IEventType> inputSchemas = Maps.newHashMap();

    /** Delegate that receives the events; {@code null} until it has been set. */
    private IOutputStreamOperator outputStream;

    /**
     * Sets the delegate that will receive the events.
     *
     * @param iOutputStreamOperator the output stream operator to delegate to
     */
    public void setOutputStreamOperator(IOutputStreamOperator iOutputStreamOperator) {
        this.outputStream = iOutputStreamOperator;
    }

    /**
     * Returns the delegate that receives the events.
     *
     * @return the output stream operator, or {@code null} if none has been set
     */
    public IOutputStreamOperator getOutputStreamOperator() {
        return this.outputStream;
    }

    /**
     * Applies the configuration to this operator.
     *
     * <p>After delegating to the superclass, the schema stored under
     * {@link StreamingConfig#STREAMING_INNER_INPUT_SCHEMA} is deserialized and
     * registered, together with the stream name stored under
     * {@link StreamingConfig#STREAMING_INNER_INPUT_STREAM_NAME}. When the schema
     * key is present and a serde has already been set, the schema is also
     * pushed to the serde.
     *
     * @param streamingConfig the configuration to read from
     * @throws StreamingException if the superclass or the schema deserialization fails
     */
    @Override
    public void setConfig(StreamingConfig streamingConfig) throws StreamingException {
        super.setConfig(streamingConfig);
        TupleEventType schema = StreamingUtils.deSerializeSchema(
                (String) streamingConfig.get(StreamingConfig.STREAMING_INNER_INPUT_SCHEMA));
        // Read the stream name once; it is used both as list entry and as schema key.
        String inputStreamName = (String) streamingConfig.get(StreamingConfig.STREAMING_INNER_INPUT_STREAM_NAME);
        addInputStream(inputStreamName);
        addInputSchema(inputStreamName, schema);
        if (streamingConfig.containsKey(StreamingConfig.STREAMING_INNER_INPUT_SCHEMA) && this.serde != null) {
            this.serde.setSchema(schema);
        }
    }

    /**
     * Initializes the serde, hands it to the output stream operator and then
     * initializes that operator.
     *
     * @throws StreamingException if no serde has been set, or if the serde or
     *         the output stream operator fails to initialize
     * @throws IllegalStateException if no output stream operator has been set
     */
    @Override
    public final void initialize() throws StreamingException {
        // The serde is validated first so that a missing serde is still reported
        // as a StreamingException, exactly as before.
        initializeSerDe();
        IOutputStreamOperator target = requireOutputStream();
        target.setSerDe(getSerDe());
        target.initialize();
    }

    /**
     * Returns the serde used by this operator.
     *
     * @return the serde, or {@code null} if none has been set
     */
    public StreamSerDe getSerDe() {
        return this.serde;
    }

    /**
     * Sets the serde used by this operator.
     *
     * @param streamSerDe the serde to use
     */
    public void setSerDe(StreamSerDe streamSerDe) {
        this.serde = streamSerDe;
    }

    /**
     * Intentionally does nothing: this operator keeps no output stream name.
     *
     * @param str ignored
     */
    @Override
    public final void setOutputStream(String str) {
    }

    /**
     * Always returns {@code null}: this operator keeps no output stream name.
     *
     * @return {@code null}
     */
    @Override
    public final String getOutputStream() {
        return null;
    }

    /**
     * Intentionally does nothing: this operator keeps no output schema.
     *
     * @param iEventType ignored
     */
    @Override
    public final void setOutputSchema(IEventType iEventType) {
    }

    /**
     * Always returns {@code null}: this operator keeps no output schema.
     *
     * @return {@code null}
     */
    @Override
    public final IEventType getOutputSchema() {
        return null;
    }

    /**
     * Replaces the list of input stream names.
     *
     * <p>A non-null list is stored by reference. A {@code null} argument resets
     * the operator to an empty list so that later calls to
     * {@link #addInputStream(String)} cannot fail on a missing list.
     *
     * @param list the new input stream names, may be {@code null}
     */
    @Override
    public void setInputStream(List<String> list) {
        if (list == null) {
            this.inputStreams = Lists.newArrayList();
        } else {
            this.inputStreams = list;
        }
    }

    /**
     * Returns the registered input stream names.
     *
     * @return the internal list of input stream names
     */
    @Override
    public List<String> getInputStream() {
        return this.inputStreams;
    }

    /**
     * Replaces the map of input schemas.
     *
     * <p>A non-null map is stored by reference. A {@code null} argument resets
     * the operator to an empty map so that later calls to
     * {@link #addInputSchema(String, IEventType)} cannot fail on a missing map.
     *
     * @param map the new input schemas keyed by stream name, may be {@code null}
     */
    @Override
    public void setInputSchema(Map<String, IEventType> map) {
        if (map == null) {
            this.inputSchemas = Maps.newHashMap();
        } else {
            this.inputSchemas = map;
        }
    }

    /**
     * Returns the registered input schemas.
     *
     * @return the internal map of input schemas keyed by stream name
     */
    @Override
    public Map<String, IEventType> getInputSchema() {
        return this.inputSchemas;
    }

    /**
     * Forwards one event to the output stream operator.
     *
     * @param str the first argument passed through to the delegate unchanged
     * @param tupleEvent the event to forward
     * @throws StreamingException if the delegate fails
     * @throws IllegalStateException if no output stream operator has been set
     */
    @Override
    public void execute(String str, TupleEvent tupleEvent) throws StreamingException {
        requireOutputStream().execute(str, tupleEvent);
    }

    /**
     * Destroys the output stream operator.
     *
     * <p>If no output stream operator has been set there is nothing to
     * release, so the call returns without doing anything.
     *
     * @throws StreamingException if the delegate fails to shut down
     */
    @Override
    public void destroy() throws StreamingException {
        if (this.outputStream == null) {
            // Never wired up: tearing down must not fail with a NullPointerException.
            return;
        }
        this.outputStream.destroy();
    }

    /**
     * Registers an input stream name unless it is empty or already registered.
     *
     * @param str the stream name to register
     */
    public void addInputStream(String str) {
        if (StringUtils.isEmpty(str) || this.inputStreams.contains(str)) {
            return;
        }
        this.inputStreams.add(str);
    }

    /**
     * Registers the schema for a stream; a {@code null} schema is ignored.
     *
     * @param str the stream name used as key
     * @param iEventType the schema to register, may be {@code null}
     */
    public void addInputSchema(String str, IEventType iEventType) {
        if (iEventType != null) {
            this.inputSchemas.put(str, iEventType);
        }
    }

    /**
     * Returns the output stream operator, failing with a descriptive message
     * when it has not been set.
     */
    private IOutputStreamOperator requireOutputStream() {
        if (this.outputStream == null) {
            throw new IllegalStateException(
                    "No output stream operator has been set; call setOutputStreamOperator first.");
        }
        return this.outputStream;
    }

    /**
     * Verifies that a serde is present and initializes it.
     *
     * @throws StreamingException with {@code SEMANTICANALYZE_UNKNOWN_SERDE} if
     *         no serde has been set, or wrapping the serde's own failure
     */
    private void initializeSerDe() throws StreamingException {
        StreamSerDe current = getSerDe();
        if (current == null) {
            StreamingException streamingException = new StreamingException(ErrorCode.SEMANTICANALYZE_UNKNOWN_SERDE, new String[0]);
            LOG.error(ErrorCode.SEMANTICANALYZE_UNKNOWN_SERDE.getFullMessage(new String[0]));
            throw streamingException;
        }
        try {
            current.initialize();
        } catch (StreamSerDeException e) {
            throw StreamingException.wrapException(e);
        }
    }
}
