package com.huawei.streaming.cql.executor;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.huawei.streaming.api.AnnotationUtils;
import com.huawei.streaming.api.opereators.InnerInputSourceOperator;
import com.huawei.streaming.api.opereators.InnerOutputSourceOperator;
import com.huawei.streaming.api.opereators.InputStreamOperator;
import com.huawei.streaming.api.opereators.Operator;
import com.huawei.streaming.api.opereators.OperatorTransition;
import com.huawei.streaming.api.opereators.OutputStreamOperator;
import com.huawei.streaming.api.opereators.serdes.UserDefinedSerDeAPI;
import com.huawei.streaming.api.streams.Schema;
import com.huawei.streaming.application.Application;
import com.huawei.streaming.application.ApplicationFactory;
import com.huawei.streaming.application.DistributeType;
import com.huawei.streaming.config.StreamingConfig;
import com.huawei.streaming.cql.CQLUtils;
import com.huawei.streaming.cql.exception.ApplicationBuildException;
import com.huawei.streaming.cql.exception.ExecutorException;
import com.huawei.streaming.cql.executor.operatorinfocreater.OperatorInfoCreatorFactory;
import com.huawei.streaming.cql.mapping.InputOutputOperatorMapping;
import com.huawei.streaming.cql.semanticanalyzer.BaseAnalyzer;
import com.huawei.streaming.event.Attribute;
import com.huawei.streaming.event.TupleEventType;
import com.huawei.streaming.exception.ErrorCode;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.operator.AbsOperator;
import com.huawei.streaming.operator.IRichOperator;
import com.huawei.streaming.operator.InputOperator;
import com.huawei.streaming.operator.OutputOperator;
import com.huawei.streaming.serde.StreamSerDe;
import com.huawei.streaming.util.StreamingUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/cql/executor/ExecutorPlanGenerator.class */
public class ExecutorPlanGenerator {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorPlanGenerator.class);
    private Application executorApp;
    private Map<String, String> systemConfig = Maps.newHashMap();
    private com.huawei.streaming.api.Application apiApplication = null;

    public Application generate(com.huawei.streaming.api.Application application) throws ExecutorException {
        LOG.info("start to generator executor application for app {}", application.getApplicationId());
        this.apiApplication = application;
        createEmptyApplication(application.getApplicationId());
        parseUserDefineds(application);
        parseSchemas();
        parseOperators();
        return this.executorApp;
    }

    private void parseUserDefineds(com.huawei.streaming.api.Application application) {
        if (application.getConfs() != null) {
            this.systemConfig.putAll(application.getConfs());
        }
    }

    private void parseOperators() throws ExecutorException {
        Map<String, AbsOperator> createOperatorInfos = createOperatorInfos(formatOperators());
        combineOperators(createOperatorInfos);
        Iterator<Map.Entry<String, AbsOperator>> it = createOperatorInfos.entrySet().iterator();
        while (it.hasNext()) {
            IRichOperator value = it.next().getValue();
            if (value instanceof InputOperator) {
                this.executorApp.addInputStream(value);
            } else if (value instanceof OutputOperator) {
                this.executorApp.addOutputStream(value);
            } else {
                this.executorApp.addFunctionStream(value);
            }
        }
    }

    private Map<String, Operator> formatOperators() throws ExecutorException {
        return addOperatorsToNewList(formatInputSourceOperators(), formatOutputSourceOperators());
    }

    private Map<String, Operator> addOperatorsToNewList(Map<String, Operator> map, Map<String, Operator> map2) {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.putAll(map);
        newHashMap.putAll(map2);
        for (Operator operator : this.apiApplication.getOperators()) {
            if (!newHashMap.containsKey(operator.getId())) {
                newHashMap.put(operator.getId(), operator);
            }
        }
        return newHashMap;
    }

    private Map<String, Operator> formatInputSourceOperators() throws ExecutorException {
        HashMap newHashMap = Maps.newHashMap();
        for (Operator operator : this.apiApplication.getOperators()) {
            if (operator instanceof InputStreamOperator) {
                newHashMap.put(operator.getId(), operator);
            } else if (operator instanceof InnerInputSourceOperator) {
                InputStreamOperator formatInnerInputStream = formatInnerInputStream(operator);
                newHashMap.put(formatInnerInputStream.getId(), formatInnerInputStream);
            }
        }
        return newHashMap;
    }

    private InputStreamOperator formatInnerInputStream(Operator operator) throws ApplicationBuildException {
        InputStreamOperator inputStreamOperator = new InputStreamOperator(operator.getId(), operator.getParallelNumber());
        inputStreamOperator.setArgs(operator.getArgs());
        Class<? extends StreamSerDe> streamSerDeAnnoationOverClass = AnnotationUtils.getStreamSerDeAnnoationOverClass(((InnerInputSourceOperator) operator).getDeserializer().getClass());
        String platformOperatorByAPI = InputOutputOperatorMapping.getPlatformOperatorByAPI(operator.getClass().getName());
        if (streamSerDeAnnoationOverClass != null) {
            inputStreamOperator.setDeserializerClassName(streamSerDeAnnoationOverClass.getName());
        } else {
            inputStreamOperator.setDeserializerClassName(((UserDefinedSerDeAPI) ((InnerInputSourceOperator) operator).getDeserializer()).getSerDeClazz().getName());
        }
        inputStreamOperator.setRecordReaderClassName(platformOperatorByAPI);
        if (inputStreamOperator.getArgs() == null) {
            inputStreamOperator.setArgs(new TreeMap());
        }
        inputStreamOperator.getArgs().putAll(this.systemConfig);
        TreeMap<String, String> annotationsToConfig = AnnotationUtils.getAnnotationsToConfig(operator);
        if (annotationsToConfig != null && !annotationsToConfig.isEmpty()) {
            inputStreamOperator.getArgs().putAll(annotationsToConfig);
        }
        TreeMap<String, String> annotationsToConfig2 = AnnotationUtils.getAnnotationsToConfig(((InnerInputSourceOperator) operator).getDeserializer());
        if (annotationsToConfig2 != null && !annotationsToConfig2.isEmpty()) {
            inputStreamOperator.getArgs().putAll(annotationsToConfig2);
        }
        return inputStreamOperator;
    }

    private Map<String, Operator> formatOutputSourceOperators() throws ApplicationBuildException {
        HashMap newHashMap = Maps.newHashMap();
        for (Operator operator : this.apiApplication.getOperators()) {
            if (operator instanceof OutputStreamOperator) {
                newHashMap.put(operator.getId(), operator);
            } else if (operator instanceof InnerOutputSourceOperator) {
                OutputStreamOperator formatInnerOutputStream = formatInnerOutputStream(operator);
                newHashMap.put(formatInnerOutputStream.getId(), formatInnerOutputStream);
            }
        }
        return newHashMap;
    }

    private OutputStreamOperator formatInnerOutputStream(Operator operator) throws ApplicationBuildException {
        OutputStreamOperator outputStreamOperator = new OutputStreamOperator(operator.getId(), operator.getParallelNumber());
        outputStreamOperator.setArgs(operator.getArgs());
        Class<? extends StreamSerDe> streamSerDeAnnoationOverClass = AnnotationUtils.getStreamSerDeAnnoationOverClass(((InnerOutputSourceOperator) operator).getSerializer().getClass());
        String platformOperatorByAPI = InputOutputOperatorMapping.getPlatformOperatorByAPI(operator.getClass().getName());
        if (streamSerDeAnnoationOverClass != null) {
            outputStreamOperator.setSerializerClassName(streamSerDeAnnoationOverClass.getName());
        } else {
            outputStreamOperator.setSerializerClassName(((UserDefinedSerDeAPI) ((InnerOutputSourceOperator) operator).getSerializer()).getSerDeClazz().getName());
        }
        outputStreamOperator.setRecordWriterClassName(platformOperatorByAPI);
        if (outputStreamOperator.getArgs() == null) {
            outputStreamOperator.setArgs(new TreeMap());
        }
        outputStreamOperator.getArgs().putAll(this.systemConfig);
        TreeMap<String, String> annotationsToConfig = AnnotationUtils.getAnnotationsToConfig(operator);
        if (annotationsToConfig != null && !annotationsToConfig.isEmpty()) {
            outputStreamOperator.getArgs().putAll(annotationsToConfig);
        }
        TreeMap<String, String> annotationsToConfig2 = AnnotationUtils.getAnnotationsToConfig(((InnerOutputSourceOperator) operator).getSerializer());
        if (annotationsToConfig2 != null && !annotationsToConfig2.isEmpty()) {
            outputStreamOperator.getArgs().putAll(annotationsToConfig2);
        }
        return outputStreamOperator;
    }

    private void combineOperators(Map<String, AbsOperator> map) throws ExecutorException {
        if (this.apiApplication.getOpTransition() == null) {
            return;
        }
        for (OperatorTransition operatorTransition : this.apiApplication.getOpTransition()) {
            String fromOperatorId = operatorTransition.getFromOperatorId();
            String toOperatorId = operatorTransition.getToOperatorId();
            String streamName = operatorTransition.getStreamName();
            DistributeType distributedType = operatorTransition.getDistributedType();
            String distributedFields = operatorTransition.getDistributedFields();
            String schemaName = operatorTransition.getSchemaName();
            if (!StringUtils.isEmpty(distributedFields)) {
                distributedFields = distributedFields.toLowerCase(Locale.US);
            }
            String removeStreamName = ExecutorUtils.removeStreamName(distributedFields);
            TupleEventType tupleEventType = (TupleEventType) this.executorApp.getEventType(schemaName);
            combineFromTransition(map, fromOperatorId, streamName, tupleEventType);
            combineToTransition(map, toOperatorId, streamName, distributedType, removeStreamName, tupleEventType);
        }
    }

    private void combineToTransition(Map<String, AbsOperator> map, String str, String str2, DistributeType distributeType, String str3, TupleEventType tupleEventType) throws ExecutorException {
        AbsOperator absOperator = map.get(str);
        if (StringUtils.isEmpty(str3)) {
            absOperator.setGroupInfo(str2, distributeType, (String[]) null);
        } else {
            absOperator.setGroupInfo(str2, distributeType, str3.split(","));
        }
        StreamingConfig config = absOperator.getConfig();
        StreamingConfig streamingConfig = config == null ? new StreamingConfig() : config;
        streamingConfig.put("streaming.inner.input.stream.name", str2);
        streamingConfig.put("streaming.inner.input.schema", StreamingUtils.serializeSchema(tupleEventType));
        try {
            absOperator.setConfig(streamingConfig);
        } catch (StreamingException e) {
            throw ExecutorException.wrapStreamingException(e);
        }
    }

    private void combineFromTransition(Map<String, AbsOperator> map, String str, String str2, TupleEventType tupleEventType) throws ExecutorException {
        AbsOperator absOperator = map.get(str);
        StreamingConfig config = absOperator.getConfig();
        StreamingConfig streamingConfig = config == null ? new StreamingConfig() : config;
        streamingConfig.put("streaming.inner.output.schema", StreamingUtils.serializeSchema(tupleEventType));
        streamingConfig.put("streaming.inner.output.stream.name", str2);
        try {
            absOperator.setConfig(streamingConfig);
        } catch (StreamingException e) {
            throw ExecutorException.wrapStreamingException(e);
        }
    }

    private Map<String, AbsOperator> createOperatorInfos(Map<String, Operator> map) throws ExecutorException {
        HashMap newHashMap = Maps.newHashMap();
        Iterator<Operator> it = map.values().iterator();
        while (it.hasNext()) {
            AbsOperator createOperatorInfo = createOperatorInfo(it.next());
            newHashMap.put(createOperatorInfo.getOperatorId(), createOperatorInfo);
        }
        return newHashMap;
    }

    private AbsOperator createOperatorInfo(Operator operator) throws ExecutorException {
        return OperatorInfoCreatorFactory.createOperatorInfo(this.apiApplication, operator, this.executorApp.getStreamSchema(), this.systemConfig);
    }

    private void parseSchemas() throws ExecutorException {
        BaseAnalyzer.setSchemaNameInAttributes(this.apiApplication.getSchemas());
        Iterator<Schema> it = this.apiApplication.getSchemas().iterator();
        while (it.hasNext()) {
            try {
                this.executorApp.addEventSchema(parseSchemaToIEvent(it.next()));
            } catch (StreamingException e) {
                throw ExecutorException.wrapStreamingException(e);
            }
        }
    }

    private TupleEventType parseSchemaToIEvent(Schema schema) throws ExecutorException {
        ArrayList newArrayList = Lists.newArrayList();
        for (int i = 0; i < schema.getCols().size(); i++) {
            newArrayList.add(new Attribute(getColumnDataType(schema, i), schema.getCols().get(i).getName()));
        }
        return new TupleEventType(schema.getId(), newArrayList);
    }

    private Class<?> getColumnDataType(Schema schema, int i) throws ExecutorException {
        try {
            return Class.forName(schema.getCols().get(i).getType(), true, CQLUtils.getClassLoader());
        } catch (ClassNotFoundException e) {
            StreamingException executorException = new ExecutorException(ErrorCode.SEMANTICANALYZE_UNSUPPORTED_DATATYPE, schema.getCols().get(i).getType());
            LOG.error("Unsupport data type.", executorException);
            throw executorException;
        }
    }

    private void createEmptyApplication(String str) throws ExecutorException {
        try {
            this.executorApp = ApplicationFactory.createApplication(getApplicationConfig(this.apiApplication.getConfs()), str);
        } catch (StreamingException e) {
            ExecutorException.wrapStreamingException(e);
        }
    }

    private StreamingConfig getApplicationConfig(Map<String, String> map) {
        StreamingConfig streamingConfig = new StreamingConfig();
        if (map != null) {
            streamingConfig.putAll(map);
        }
        return streamingConfig;
    }
}
