package com.huawei.streaming.cql.executor;

import com.google.common.collect.Lists;
import com.huawei.streaming.api.Application;
import com.huawei.streaming.api.UserFunction;
import com.huawei.streaming.config.StreamingConfig;
import com.huawei.streaming.cql.DriverContext;
import com.huawei.streaming.cql.exception.ExecutorException;
import com.huawei.streaming.cql.executor.executorplanvalidater.ExecutorPlanChecker;
import com.huawei.streaming.cql.executor.mergeuserdefinds.JarFilter;
import com.huawei.streaming.cql.executor.mergeuserdefinds.Merger;
import com.huawei.streaming.cql.executor.pyhsicplanvalidater.PhysicPlanChecker;
import com.huawei.streaming.cql.hooks.ExecutorHook;
import com.huawei.streaming.exception.ErrorCode;
import com.huawei.streaming.exception.StreamingException;
import com.huawei.streaming.exception.StreamingRuntimeException;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.tools.ant.BuildException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/streaming/cql/executor/PhysicalPlanExecutor.class */
public class PhysicalPlanExecutor implements ExecutorHook {
    private static final Logger LOG = LoggerFactory.getLogger(PhysicalPlanExecutor.class);
    private static final long ONE_KB = 1024;
    private static final long ONE_MB = 1048576;
    private ExecutorPlanGenerator generator;
    private ExecutorPlanChecker executorChecker;
    private List<ExecutorHook> executorHooks;
    private DriverContext driverContext = null;
    private String userPackagedJar = null;
    private StreamingConfig config;

    public PhysicalPlanExecutor() {
        this.generator = null;
        this.executorChecker = null;
        this.executorHooks = null;
        this.config = null;
        this.executorChecker = new ExecutorPlanChecker();
        this.generator = new ExecutorPlanGenerator();
        this.executorHooks = Lists.newArrayList();
        this.config = new StreamingConfig();
    }

    public void execute(String str) throws ExecutorException {
        execute(PhysicalPlanLoader.load(str).getApploication());
    }

    public void execute(Application application) throws ExecutorException {
        LOG.info("start to execute application {}", application.getApplicationId());
        boolean z = true;
        if (DriverContext.getFunctions().get() == null) {
            this.driverContext = new DriverContext();
            z = false;
        }
        try {
            try {
                try {
                    parseUserDefineds(application, z);
                    submit(generatorPlan(application));
                    if (this.userPackagedJar != null) {
                        LOG.info("delete user packed jar after submit");
                        FileUtils.deleteQuietly(new File(this.userPackagedJar));
                    }
                    if (this.driverContext != null) {
                        this.driverContext.clean();
                    }
                } catch (StreamingException e) {
                    throw ExecutorException.wrapStreamingException(e);
                }
            } catch (StreamingRuntimeException e2) {
                throw ExecutorException.wrapStreamingRunTimeException(e2);
            }
        } catch (Throwable th) {
            if (this.userPackagedJar != null) {
                LOG.info("delete user packed jar after submit");
                FileUtils.deleteQuietly(new File(this.userPackagedJar));
            }
            if (this.driverContext != null) {
                this.driverContext.clean();
            }
            throw th;
        }
    }

    @Override // com.huawei.streaming.cql.hooks.ExecutorHook
    public void preExecute(Application application) {
        for (int i = 0; i < this.executorHooks.size(); i++) {
            this.executorHooks.get(i).preExecute(application);
        }
    }

    @Override // com.huawei.streaming.cql.hooks.ExecutorHook
    public void preSubmit(com.huawei.streaming.application.Application application) {
        for (int i = 0; i < this.executorHooks.size(); i++) {
            this.executorHooks.get(i).preSubmit(application);
        }
    }

    private void submit(com.huawei.streaming.application.Application application) throws ExecutorException {
        LOG.info("start to submit application {}", application.getAppName());
        if (this.userPackagedJar != null) {
            application.setUserPackagedJar(this.userPackagedJar);
        }
        try {
            application.launch();
        } catch (StreamingException e) {
            throw ExecutorException.wrapStreamingException(e);
        }
    }

    private com.huawei.streaming.application.Application generatorPlan(Application application) throws ExecutorException {
        preExecute(application);
        new PhysicPlanChecker().check(application);
        com.huawei.streaming.application.Application generate = this.generator.generate(application);
        preSubmit(generate);
        this.executorChecker.check(generate);
        return generate;
    }

    private void parseUserDefineds(Application application, boolean z) throws StreamingException {
        String[] userFiles = application.getUserFiles();
        if (!z) {
            addJars(userFiles);
            registerFunctions(application.getUserFunctions());
        }
        packageJar(application);
        checkUserJarSize();
    }

    private void checkUserJarSize() throws StreamingException {
        try {
            File canonicalFile = new File(this.userPackagedJar).getCanonicalFile();
            if (!canonicalFile.getPath().startsWith(new File(this.config.getStringValue("streaming.template.directory")).getCanonicalPath())) {
                LOG.error("Invalid user jar path, not in config template path.");
                throw new StreamingException(ErrorCode.UNKNOWN_SERVER_COMMON_ERROR, new String[0]);
            }
            if (!canonicalFile.exists()) {
                LOG.error("Can't found submitted jar.");
                throw new ExecutorException(ErrorCode.UNKNOWN_SERVER_COMMON_ERROR, new String[0]);
            }
            if (!canonicalFile.isFile()) {
                LOG.error("Submitted jar is not a jar file.");
                throw new ExecutorException(ErrorCode.UNKNOWN_SERVER_COMMON_ERROR, new String[0]);
            }
            if (!JarFilter.isJarFile(this.userPackagedJar)) {
                LOG.error("Submitted jar is not a jar file.");
                throw new ExecutorException(ErrorCode.UNKNOWN_SERVER_COMMON_ERROR, new String[0]);
            }
            int intValue = this.config.getIntValue("streaming.userfile.maxsize");
            if (canonicalFile.length() > intValue * ONE_MB) {
                StreamingException executorException = new ExecutorException(ErrorCode.SEMANTICANALYZE_USERFILE_OVER_MAXSIZE, String.valueOf(intValue));
                LOG.error("Submitted jar size than max size.");
                throw executorException;
            }
        } catch (IOException e) {
            LOG.error("Failed to get canonical pathname for io error.");
            throw new ExecutorException(ErrorCode.UNKNOWN_SERVER_COMMON_ERROR, new String[0]);
        } catch (SecurityException e2) {
            StreamingException streamingException = new StreamingException(ErrorCode.UNKNOWN_SERVER_COMMON_ERROR, new String[0]);
            LOG.error("Failed to get canonical pathname for cannot be accessed.", streamingException);
            throw streamingException;
        }
    }

    private void packageJar(Application application) throws StreamingException {
        String str = application.getApplicationId() + ".";
        String replace = UUID.randomUUID().toString().replace("-", "");
        try {
            String canonicalPath = new File(this.config.getStringValue("streaming.template.directory")).getCanonicalPath();
            this.userPackagedJar = canonicalPath + File.separator + str + replace + ".jar";
            try {
                new Merger().merge(application, canonicalPath, this.userPackagedJar);
            } catch (IOException e) {
                StreamingException executorException = new ExecutorException(ErrorCode.UNKNOWN_SERVER_COMMON_ERROR, new String[0]);
                LOG.error("Failed to merge all user files to one jar for io error.", executorException);
                throw executorException;
            } catch (BuildException e2) {
                LOG.error("Failed to merge all user files to one jar", e2);
                throw new ExecutorException((Throwable) e2, ErrorCode.UNKNOWN_SERVER_COMMON_ERROR, new String[0]);
            }
        } catch (IOException e3) {
            StreamingException streamingException = new StreamingException(ErrorCode.UNKNOWN_SERVER_COMMON_ERROR, new String[0]);
            LOG.error("Failed to get canonical pathname for io error.", streamingException);
            throw streamingException;
        } catch (SecurityException e4) {
            StreamingException streamingException2 = new StreamingException(ErrorCode.UNKNOWN_SERVER_COMMON_ERROR, new String[0]);
            LOG.error("Failed to get canonical pathname for cannot be accessed.", streamingException2);
            throw streamingException2;
        }
    }

    private void registerFunctions(List<UserFunction> list) {
        if (list == null) {
            return;
        }
        Iterator<UserFunction> it = list.iterator();
        while (it.hasNext()) {
            this.driverContext.addUserDefoundFunctions(it.next());
        }
    }

    private void addJars(String[] strArr) throws ExecutorException {
        if (strArr == null) {
            return;
        }
        for (String str : strArr) {
            if (JarFilter.isJarFile(str)) {
                this.driverContext.addJar(str);
            }
        }
    }
}
