package com.huawei.streaming.cql.executor.operatorinfocreater;

import com.huawei.streaming.api.Application;
import com.huawei.streaming.api.opereators.InputStreamOperator;
import com.huawei.streaming.api.opereators.Operator;
import com.huawei.streaming.config.StreamingConfig;
import com.huawei.streaming.cql.CQLUtils;
import com.huawei.streaming.cql.exception.ExecutorException;
import com.huawei.streaming.event.EventTypeMng;
import com.huawei.streaming.exception.ErrorCode;
import com.huawei.streaming.exception.StormRuntimeException;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.operator.AbsOperator;
import com.huawei.streaming.operator.IInputStreamOperator;
import com.huawei.streaming.operator.InputOperator;
import com.huawei.streaming.serde.StreamSerDe;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/cql/executor/operatorinfocreater/InputInfoCreator.class */
public class InputInfoCreator implements OperatorInfoCreator {
    private static final Logger LOG = LoggerFactory.getLogger(InputInfoCreator.class);

    @Override // com.huawei.streaming.cql.executor.operatorinfocreater.OperatorInfoCreator
    public AbsOperator createInstance(Application application, Operator operator, EventTypeMng eventTypeMng, Map<String, String> map) throws StreamingException {
        if (!(operator instanceof InputStreamOperator)) {
            LOG.error("CreateInstance unsupport inputStream operator,the type is not InputStreamOperator");
            throw new StormRuntimeException("CreateInstance unsupport inputStream operator,the type is not InputStreamOperator");
        }
        InputStreamOperator inputStreamOperator = (InputStreamOperator) operator;
        StreamingConfig streamingConfig = new StreamingConfig();
        streamingConfig.putAll(map);
        if (operator.getArgs() != null) {
            streamingConfig.putAll(operator.getArgs());
        }
        StreamSerDe createDeserClassInstance = createDeserClassInstance(inputStreamOperator, streamingConfig);
        IInputStreamOperator createSourceReaderInstance = createSourceReaderInstance(inputStreamOperator.getRecordReaderClassName());
        createSourceReaderInstance.setConfig(streamingConfig);
        InputOperator inputOperator = new InputOperator();
        inputOperator.setSerDe(createDeserClassInstance);
        inputOperator.setInputStreamOperator(createSourceReaderInstance);
        return OperatorInfoCreatorFactory.buildStreamOperator(operator, inputOperator);
    }

    private StreamSerDe createDeserClassInstance(InputStreamOperator inputStreamOperator, StreamingConfig streamingConfig) throws ExecutorException {
        String deserializerClassName = inputStreamOperator.getDeserializerClassName();
        if (deserializerClassName == null) {
            return null;
        }
        try {
            StreamSerDe streamSerDe = (StreamSerDe) Class.forName(deserializerClassName, true, CQLUtils.getClassLoader()).newInstance();
            try {
                streamSerDe.setConfig(streamingConfig);
                return streamSerDe;
            } catch (StreamingException e) {
                throw ExecutorException.wrapStreamingException(e);
            }
        } catch (ReflectiveOperationException e2) {
            StreamingException executorException = new ExecutorException(ErrorCode.SEMANTICANALYZE_UNKOWN_CLASS, deserializerClassName);
            LOG.error("Failed to create Deser instance.", executorException);
            throw executorException;
        }
    }

    private IInputStreamOperator createSourceReaderInstance(String str) throws ExecutorException {
        try {
            Object newInstance = Class.forName(str, true, CQLUtils.getClassLoader()).newInstance();
            if (newInstance instanceof IInputStreamOperator) {
                return (IInputStreamOperator) newInstance;
            }
            StreamingException executorException = new ExecutorException(ErrorCode.SEMANTICANALYZE_UNMATCH_OPERATOR, str);
            LOG.error("The '{}' operator type does not match.", str);
            throw executorException;
        } catch (Exception e) {
            StreamingException executorException2 = new ExecutorException(ErrorCode.SEMANTICANALYZE_UNKOWN_CLASS, str);
            LOG.error("Can't find source reader class.'", executorException2);
            throw executorException2;
        }
    }
}
