package org.apache.hadoop.hive.ql.exec.spark;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.AbstractMapOperator;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.ScriptOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef;
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.intercept.rules.RuleException;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.OperationMetrics;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.task.AppManager;
import org.apache.hadoop.hive.task.AppManagerImpl;
import org.apache.hadoop.hive.task.HiveRegistry;
import org.apache.hadoop.util.StringUtils;
import org.apache.spark.SparkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hive/ql/exec/spark/SparkTask.class */
public class SparkTask extends Task<SparkWork> {
    private static final String CLASS_NAME = SparkTask.class.getName();
    private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
    private static final SessionState.LogHelper console = new SessionState.LogHelper(LOG);
    private PerfLogger perfLogger;
    private static final long serialVersionUID = 1;
    private transient int sparkJobID;
    private transient String sparkJobHandleId;
    private transient SparkStatistics sparkStatistics;
    private transient long submitTime;
    private transient long startTime;
    private transient long finishTime;
    private transient int succeededTaskCount;
    private transient int totalTaskCount;
    private transient int failedTaskCount;
    private transient List<Integer> stageIds;
    private transient SparkJobRef jobRef = null;
    private transient boolean isShutdown = false;
    private transient boolean jobKilled = false;
    private static final transient String hoodieCombineFormat = "org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat";

    @Override // org.apache.hadoop.hive.ql.exec.Task
    public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext, CompilationOpContext compilationOpContext) {
        super.initialize(queryState, queryPlan, driverContext, compilationOpContext);
    }

    @Override // org.apache.hadoop.hive.ql.exec.Task
    public int execute(DriverContext driverContext) {
        int errorCode;
        SparkSessionManagerImpl sparkSessionManagerImpl;
        SparkSession sparkSession;
        AppManager appManager = (AppManager) HiveRegistry.getObject("AppManager");
        if (appManager == null) {
            HiveRegistry.register("AppManager", new AppManagerImpl(new HiveConf()));
            appManager = (AppManager) HiveRegistry.getObject("AppManager");
        }
        this.perfLogger = SessionState.getPerfLogger();
        SparkSessionManager sparkSessionManager = null;
        try {
            try {
                if (this.conf.getVar(HiveConf.ConfVars.HIVEINPUTFORMAT).equals("org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat")) {
                    this.conf.setBoolVar(HiveConf.ConfVars.HUDI_HIVE_REALTIME, true);
                }
                printConfigInfo();
                sparkSessionManagerImpl = SparkSessionManagerImpl.getInstance();
                sparkSession = SparkUtilities.getSparkSession(this.conf, sparkSessionManagerImpl);
                SparkWork work = getWork();
                work.setRequiredCounterPrefix(getOperatorCounters());
                this.perfLogger.PerfLogBegin(CLASS_NAME, "SparkSubmitJob");
                this.submitTime = this.perfLogger.getStartTime("SparkSubmitJob").longValue();
                this.jobRef = sparkSession.submit(driverContext, work);
                this.perfLogger.PerfLogEnd(CLASS_NAME, "SparkSubmitJob");
                if (this.jobRef instanceof RemoteSparkJobRef) {
                    ((RemoteSparkJobRef) this.jobRef).sqlRuleHelper = getSqlRuleHelper();
                }
            } catch (Exception e) {
                String str = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'";
                console.printError(str, MetaDataFormatUtils.LINE_DELIM + StringUtils.stringifyException(e));
                LOG.error(str, e);
                setException(e);
                errorCode = e instanceof HiveException ? ((HiveException) e).getCanonicalErrorMsg().getErrorCode() : 1;
                this.startTime = this.perfLogger.getEndTime("SparkSubmitToRunning").longValue();
                if (this.startTime < this.submitTime) {
                    this.startTime = this.submitTime;
                }
                this.finishTime = this.perfLogger.getEndTime("SparkRunJob").longValue();
                Utilities.clearWork(this.conf);
                if (0 != 0 && 0 != 0) {
                    errorCode = close(errorCode);
                    try {
                        sparkSessionManager.returnSession(null);
                    } catch (HiveException e2) {
                        LOG.error("Failed to return the session to SessionManager", e2);
                    }
                }
                if (this.jobID != null) {
                    appManager.remove(this.jobID);
                }
            }
            if (driverContext.isShutdown()) {
                LOG.warn("Killing Spark job");
                killJob();
                throw new HiveException("Operation is cancelled.");
            }
            this.sparkJobHandleId = this.jobRef.getJobId();
            addToHistory(HiveHistory.Keys.SPARK_JOB_HANDLE_ID, this.jobRef.getJobId());
            LOG.debug("Starting Spark job with job handle id " + this.sparkJobHandleId);
            this.jobID = this.jobRef.getSparkJobStatus().getAppID();
            if (this.jobID != null) {
                appManager.record(this.jobID);
            }
            errorCode = this.jobRef.monitorJob();
            this.sparkJobID = this.jobRef.getSparkJobStatus().getJobId();
            addToHistory(HiveHistory.Keys.SPARK_JOB_ID, Integer.toString(this.sparkJobID));
            SparkJobStatus sparkJobStatus = this.jobRef.getSparkJobStatus();
            getSparkJobInfo(sparkJobStatus, errorCode);
            recordSparkTask(appManager, sparkJobStatus);
            if (errorCode == 0) {
                this.sparkStatistics = sparkJobStatus.getSparkStatistics();
                if (this.sparkStatistics != null) {
                    printExcessiveGCWarning();
                    LOG.debug(sparkStatisticsToString(this.sparkStatistics, this.sparkJobID));
                }
                LOG.info("Successfully completed Spark job[" + this.sparkJobID + "] with application ID " + this.jobID + " and task ID " + getId());
            } else if (errorCode == 2) {
                LOG.debug("Failed to submit Spark job with job handle id " + this.sparkJobHandleId);
                LOG.info("Failed to submit Spark job for application id " + (Strings.isNullOrEmpty(this.jobID) ? "UNKNOWN" : this.jobID));
                killJob();
            } else if (errorCode == 4) {
                LOG.info("The spark job or one stage of it has too many tasks. Cancelling Spark job " + this.sparkJobID + " with application ID " + this.jobID);
                killJob();
            }
            if (this.jobID == null) {
                this.jobID = sparkJobStatus.getAppID();
            }
            sparkJobStatus.cleanup();
            this.startTime = this.perfLogger.getEndTime("SparkSubmitToRunning").longValue();
            if (this.startTime < this.submitTime) {
                this.startTime = this.submitTime;
            }
            this.finishTime = this.perfLogger.getEndTime("SparkRunJob").longValue();
            Utilities.clearWork(this.conf);
            if (sparkSession != null && sparkSessionManagerImpl != null) {
                errorCode = close(errorCode);
                try {
                    sparkSessionManagerImpl.returnSession(sparkSession);
                } catch (HiveException e3) {
                    LOG.error("Failed to return the session to SessionManager", e3);
                }
            }
            if (this.jobID != null) {
                appManager.remove(this.jobID);
            }
            return errorCode;
        } catch (Throwable th) {
            this.startTime = this.perfLogger.getEndTime("SparkSubmitToRunning").longValue();
            if (this.startTime < this.submitTime) {
                this.startTime = this.submitTime;
            }
            this.finishTime = this.perfLogger.getEndTime("SparkRunJob").longValue();
            Utilities.clearWork(this.conf);
            if (0 != 0 && 0 != 0) {
                close(0);
                try {
                    sparkSessionManager.returnSession(null);
                } catch (HiveException e4) {
                    LOG.error("Failed to return the session to SessionManager", e4);
                }
            }
            if (this.jobID != null) {
                appManager.remove(this.jobID);
            }
            throw th;
        }
    }

    private void recordSparkTask(AppManager appManager, SparkJobStatus sparkJobStatus) {
        int i = 0;
        int i2 = 0;
        SparkStatisticGroup sparkStatisticGroup = null;
        if (sparkJobStatus != null && sparkJobStatus.getSparkStatistics() != null) {
            sparkStatisticGroup = sparkJobStatus.getSparkStatistics().getStatisticGroup(SparkStatisticsNames.SPARK_GROUP_NAME);
        }
        if (sparkStatisticGroup != null) {
            i = Integer.parseInt(sparkStatisticGroup.getSparkStatistic(SparkStatisticsNames.MAP_TASKS).getValue());
            i2 = Integer.parseInt(sparkStatisticGroup.getSparkStatistic(SparkStatisticsNames.INPUT_FILES).getValue());
        } else {
            LOG.warn("SparkTask counters is null");
        }
        SessionState sessionState = SessionState.get();
        String str = "anonymous";
        if (sessionState != null && sessionState.getAuthenticator() != null) {
            String userName = sessionState.getAuthenticator().getUserName();
            str = userName != null ? userName : str;
        }
        String str2 = System.currentTimeMillis() + "";
        String str3 = str;
        if (getQueryState() != null) {
            str3 = getQueryState().getQueryString() != null ? getQueryState().getQueryString() : str;
            str2 = getQueryState().getQueryId() != null ? getQueryState().getQueryId() : str2;
        }
        appManager.recordHqlSplit(str3 + str2, i2);
        appManager.recordHql(str2, str, str3, i);
        LOG.info("Total input files : {}, map tasks : {}", Integer.valueOf(i2), Integer.valueOf(i));
    }

    private void printExcessiveGCWarning() {
        SparkStatisticGroup statisticGroup = this.sparkStatistics.getStatisticGroup(SparkStatisticsNames.SPARK_GROUP_NAME);
        if (statisticGroup != null) {
            long parseLong = Long.parseLong(statisticGroup.getSparkStatistic(SparkStatisticsNames.TASK_DURATION_TIME).getValue());
            long parseLong2 = Long.parseLong(statisticGroup.getSparkStatistic(SparkStatisticsNames.JVM_GC_TIME).getValue());
            if (parseLong2 > parseLong * 0.1d) {
                console.printInfo(String.format("WARNING: Spark Job[%s] Spent %s%% (%s ms / %s ms) of task time in GC", Integer.valueOf(this.sparkJobID), Long.valueOf(Math.round((parseLong2 / parseLong) * 100.0d)), Long.valueOf(parseLong2), Long.valueOf(parseLong)));
            }
        }
    }

    private void addToHistory(HiveHistory.Keys keys, String str) {
        if (SessionState.get() != null) {
            SessionState.get().getHiveHistory().setQueryProperty(this.queryState.getQueryId(), keys, str);
        }
    }

    @VisibleForTesting
    static String sparkStatisticsToString(SparkStatistics sparkStatistics, int i) {
        StringBuilder sb = new StringBuilder();
        sb.append("\n\n");
        sb.append(String.format("=====Spark Job[%d] Statistics=====", Integer.valueOf(i)));
        sb.append("\n\n");
        Iterator<SparkStatisticGroup> statisticGroups = sparkStatistics.getStatisticGroups();
        while (statisticGroups.hasNext()) {
            SparkStatisticGroup next = statisticGroups.next();
            sb.append(next.getGroupName()).append(MetaDataFormatUtils.LINE_DELIM);
            Iterator<SparkStatistic> statistics = next.getStatistics();
            while (statistics.hasNext()) {
                SparkStatistic next2 = statistics.next();
                sb.append(MetaDataFormatUtils.FIELD_DELIM).append(next2.getName()).append(": ").append(next2.getValue()).append(MetaDataFormatUtils.LINE_DELIM);
                if (FileSinkOperator.Counter.RECORDS_OUT.toString().equalsIgnoreCase(next2.getName()) && OperationMetrics.getCurrentOperationMetrics() != null) {
                    OperationMetrics.getCurrentOperationMetrics().setAffectRowCount(Long.valueOf(next2.getValue()).longValue());
                }
            }
        }
        return sb.toString();
    }

    private int close(int i) {
        try {
            Iterator<BaseWork> it = ((SparkWork) this.work).getAllWork().iterator();
            while (it.hasNext()) {
                Iterator<Operator<?>> it2 = it.next().getAllOperators().iterator();
                while (it2.hasNext()) {
                    it2.next().jobClose(this.conf, i == 0);
                }
            }
        } catch (Exception e) {
            if (i == 0) {
                i = 3;
                console.printError("Job Commit failed with exception '" + Utilities.getNameMessage(e) + "'", MetaDataFormatUtils.LINE_DELIM + StringUtils.stringifyException(e));
                setException(e);
            }
        }
        return i;
    }

    @Override // org.apache.hadoop.hive.ql.exec.Task
    public void updateTaskMetrics(Metrics metrics) {
        metrics.incrementCounter("hive_spark_tasks");
    }

    @Override // org.apache.hadoop.hive.ql.exec.Task
    public boolean isMapRedTask() {
        return true;
    }

    @Override // org.apache.hadoop.hive.ql.exec.Task
    public StageType getType() {
        return StageType.MAPRED;
    }

    @Override // org.apache.hadoop.hive.ql.lib.Node
    public String getName() {
        return SparkStatisticsNames.SPARK_GROUP_NAME;
    }

    @Override // org.apache.hadoop.hive.ql.exec.Task
    public Collection<MapWork> getMapWork() {
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<BaseWork> it = getWork().getRoots().iterator();
        while (it.hasNext()) {
            newArrayList.add((MapWork) it.next());
        }
        return newArrayList;
    }

    @Override // org.apache.hadoop.hive.ql.exec.Task
    public Operator<? extends OperatorDesc> getReducer(MapWork mapWork) {
        List<BaseWork> children = getWork().getChildren(mapWork);
        if (children.size() == 1 && (children.get(0) instanceof ReduceWork)) {
            return ((ReduceWork) children.get(0)).getReducer();
        }
        return null;
    }

    public int getSparkJobID() {
        return this.sparkJobID;
    }

    public SparkStatistics getSparkStatistics() {
        return this.sparkStatistics;
    }

    public int getSucceededTaskCount() {
        return this.succeededTaskCount;
    }

    public int getTotalTaskCount() {
        return this.totalTaskCount;
    }

    public int getFailedTaskCount() {
        return this.failedTaskCount;
    }

    public List<Integer> getStageIds() {
        return this.stageIds;
    }

    public long getStartTime() {
        return this.startTime;
    }

    public long getSubmitTime() {
        return this.submitTime;
    }

    public long getFinishTime() {
        return this.finishTime;
    }

    public boolean isTaskShutdown() {
        return this.isShutdown;
    }

    @Override // org.apache.hadoop.hive.ql.exec.Task
    public void shutdown() {
        super.shutdown();
        killJob();
        this.isShutdown = true;
    }

    private void killJob() {
        LOG.debug("Killing Spark job with job handle id " + this.sparkJobHandleId);
        boolean z = false;
        if (this.jobRef != null && !this.jobKilled) {
            synchronized (this) {
                if (!this.jobKilled) {
                    this.jobKilled = true;
                    z = true;
                }
            }
        }
        if (z) {
            try {
                this.jobRef.cancelJob();
            } catch (Exception e) {
                LOG.warn("Failed to kill Spark job", e);
            }
        }
    }

    private void printConfigInfo() throws IOException {
        console.printInfo("In order to change the average load for a reducer (in bytes):");
        console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname + "=<number>");
        console.printInfo("In order to limit the maximum number of reducers:");
        console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname + "=<number>");
        console.printInfo("In order to set a constant number of reducers:");
        console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
    }

    private Map<String, List<String>> getOperatorCounters() {
        String var = HiveConf.getVar(this.conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
        HashMap hashMap = new HashMap();
        LinkedList linkedList = new LinkedList();
        hashMap.put(var, linkedList);
        linkedList.add(Operator.HIVE_COUNTER_CREATED_FILES);
        for (AbstractMapOperator.Counter counter : AbstractMapOperator.Counter.values()) {
            linkedList.add(counter.toString());
        }
        Iterator<BaseWork> it = getWork().getAllWork().iterator();
        while (it.hasNext()) {
            for (Operator<?> operator : it.next().getAllOperators()) {
                if (operator instanceof FileSinkOperator) {
                    for (FileSinkOperator.Counter counter2 : FileSinkOperator.Counter.values()) {
                        linkedList.add(((FileSinkOperator) operator).getCounterName(counter2));
                    }
                } else if (operator instanceof ReduceSinkOperator) {
                    String str = this.conf.get(Operator.CONTEXT_NAME_KEY, "");
                    for (Operator.Counter counter3 : Operator.Counter.values()) {
                        linkedList.add(Utilities.getVertexCounterName(counter3.name(), str));
                    }
                } else if (operator instanceof ScriptOperator) {
                    for (ScriptOperator.Counter counter4 : ScriptOperator.Counter.values()) {
                        linkedList.add(counter4.toString());
                    }
                } else if (operator instanceof JoinOperator) {
                    for (JoinOperator.SkewkeyTableCounter skewkeyTableCounter : JoinOperator.SkewkeyTableCounter.values()) {
                        linkedList.add(skewkeyTableCounter.toString());
                    }
                }
            }
        }
        return hashMap;
    }

    private void getSparkJobInfo(SparkJobStatus sparkJobStatus, int i) {
        Throwable error;
        try {
            this.stageIds = new ArrayList();
            int[] stageIds = sparkJobStatus.getStageIds();
            if (stageIds != null) {
                for (int i2 : stageIds) {
                    this.stageIds.add(Integer.valueOf(i2));
                }
            }
            Map<String, SparkStageProgress> sparkStageProgress = sparkJobStatus.getSparkStageProgress();
            int i3 = 0;
            int i4 = 0;
            int i5 = 0;
            Iterator<String> it = sparkStageProgress.keySet().iterator();
            while (it.hasNext()) {
                SparkStageProgress sparkStageProgress2 = sparkStageProgress.get(it.next());
                int succeededTaskCount = sparkStageProgress2.getSucceededTaskCount();
                i3 += sparkStageProgress2.getTotalTaskCount();
                i4 += succeededTaskCount;
                i5 += sparkStageProgress2.getFailedTaskCount();
            }
            this.succeededTaskCount = i4;
            this.totalTaskCount = i3;
            this.failedTaskCount = i5;
            if (i != 0 && (error = sparkJobStatus.getError()) != null) {
                if ((error instanceof InterruptedException) || (error instanceof RuleException) || ((error instanceof HiveException) && (error.getCause() instanceof InterruptedException))) {
                    LOG.info("Killing Spark job since query was interrupted");
                    killJob();
                }
                setException(isOOMError(error) ? new HiveException(error, ErrorMsg.SPARK_RUNTIME_OOM) : new HiveException(error, ErrorMsg.SPARK_JOB_RUNTIME_ERROR));
            }
        } catch (Exception e) {
            LOG.error("Failed to get Spark job information", e);
        }
    }

    private boolean isOOMError(Throwable th) {
        while (th != null) {
            if (th instanceof OutOfMemoryError) {
                return true;
            }
            if (th instanceof SparkException) {
                return Throwables.getStackTraceAsString(th).contains("Container killed by YARN for exceeding memory limits");
            }
            th = th.getCause();
        }
        return false;
    }
}
