package com.huawei.streaming.cql.executor.operatorinfocreater;

import com.huawei.streaming.api.Application;
import com.huawei.streaming.api.opereators.FunctionStreamOperator;
import com.huawei.streaming.api.opereators.Operator;
import com.huawei.streaming.api.opereators.OperatorTransition;
import com.huawei.streaming.api.streams.Column;
import com.huawei.streaming.api.streams.Schema;
import com.huawei.streaming.config.StreamingConfig;
import com.huawei.streaming.cql.CQLUtils;
import com.huawei.streaming.cql.exception.ExecutorException;
import com.huawei.streaming.event.EventTypeMng;
import com.huawei.streaming.exception.ErrorCode;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.operator.AbsOperator;
import com.huawei.streaming.operator.IFunctionStreamOperator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/cql/executor/operatorinfocreater/FunctionStreamInfoCreator.class */
public class FunctionStreamInfoCreator implements OperatorInfoCreator {
    private static final Logger LOG = LoggerFactory.getLogger(FunctionStreamInfoCreator.class);
    private FunctionStreamOperator streamOperator;
    private OperatorTransition transitionIn = null;
    private OperatorTransition transitionOut = null;
    private List<Schema> inputSchemas;
    private List<Schema> outputSchemas;

    @Override // com.huawei.streaming.cql.executor.operatorinfocreater.OperatorInfoCreator
    public AbsOperator createInstance(Application application, Operator operator, EventTypeMng eventTypeMng, Map<String, String> map) throws StreamingException {
        prepare(application, operator);
        checkSchema();
        IFunctionStreamOperator createOperatorInstance = createOperatorInstance();
        setConfig(operator, map, createOperatorInstance);
        com.huawei.streaming.operator.FunctionStreamOperator functionStreamOperator = new com.huawei.streaming.operator.FunctionStreamOperator();
        functionStreamOperator.setInputStreamOperator(createOperatorInstance);
        return OperatorInfoCreatorFactory.buildStreamOperator(operator, functionStreamOperator);
    }

    private void checkSchema() throws StreamingException {
        validateSchemaSize();
        validateColumnType();
    }

    private void validateColumnType() throws StreamingException {
        if (!checkSchemaColumnType(this.inputSchemas.get(0), this.streamOperator.getInputSchema())) {
            StreamingException streamingException = new StreamingException(ErrorCode.SEMANTICANALYZE_INVALID_INPUTSCHEMA, new String[0]);
            LOG.error("Invalid input schema in user defined operator.");
            throw streamingException;
        }
        if (checkSchemaColumnType(this.outputSchemas.get(0), this.streamOperator.getOutputSchema())) {
            return;
        }
        StreamingException streamingException2 = new StreamingException(ErrorCode.SEMANTICANALYZE_INVALID_OUTPUTSCHEMA, new String[0]);
        LOG.error("Invalid output schema in user defined operator.");
        throw streamingException2;
    }

    private void validateSchemaSize() throws StreamingException {
        if (this.inputSchemas.size() != 1) {
            StreamingException streamingException = new StreamingException(ErrorCode.SEMANTICANALYZE_OVER_INPUTSCHEMA, new String[0]);
            LOG.error("Only one input schema is allowed in user defined operator.");
            throw streamingException;
        }
        if (this.outputSchemas.size() != 1) {
            StreamingException streamingException2 = new StreamingException(ErrorCode.SEMANTICANALYZE_OVER_OUTPUTSCHEMA, new String[0]);
            LOG.error("Only one output schema is allowed in user defined operator.");
            throw streamingException2;
        }
    }

    private void prepare(Application application, Operator operator) throws ExecutorException {
        this.streamOperator = (FunctionStreamOperator) operator;
        this.transitionIn = OperatorInfoCreatorFactory.getTransitionIn(application, operator, null);
        this.transitionOut = OperatorInfoCreatorFactory.getTransitionOut(application, operator);
        this.inputSchemas = OperatorInfoCreatorFactory.getSchemasByTransition(application, this.transitionIn);
        this.outputSchemas = OperatorInfoCreatorFactory.getSchemasByTransition(application, this.transitionOut);
    }

    private void setConfig(Operator operator, Map<String, String> map, IFunctionStreamOperator iFunctionStreamOperator) throws StreamingException {
        StreamingConfig streamingConfig = new StreamingConfig();
        streamingConfig.putAll(map);
        if (operator.getArgs() != null) {
            streamingConfig.putAll(operator.getArgs());
        }
        iFunctionStreamOperator.setConfig(streamingConfig);
    }

    private IFunctionStreamOperator createOperatorInstance() throws ExecutorException {
        String operatorClass = this.streamOperator.getOperatorClass();
        try {
            return (IFunctionStreamOperator) Class.forName(operatorClass, true, CQLUtils.getClassLoader()).newInstance();
        } catch (ReflectiveOperationException e) {
            StreamingException executorException = new ExecutorException(ErrorCode.SEMANTICANALYZE_UNKOWN_CLASS, operatorClass);
            LOG.error("Failed to create operator instance.", executorException);
            throw executorException;
        }
    }

    private boolean checkSchemaColumnType(Schema schema, Schema schema2) {
        if (schema == null || schema2 == null) {
            return false;
        }
        List<Column> cols = schema.getCols();
        List<Column> cols2 = schema2.getCols();
        if (cols.size() != cols2.size()) {
            return false;
        }
        for (int i = 0; i < cols.size(); i++) {
            if (!cols.get(i).getType().equals(cols2.get(i).getType())) {
                return false;
            }
        }
        return true;
    }
}
