package org.apache.hadoop.hive.ql.exec.spark.session;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.hive.spark.client.SparkClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.class */
/**
 * Process-wide singleton {@link SparkSessionManager}.
 *
 * <p>The manager tracks every session it creates in {@code createdSessions} so they can be
 * closed on {@link #shutdown()}, and optionally keeps a pool of "default" sessions per queue
 * name (see {@link #startPool(HiveConf)}). A JVM shutdown hook registered by this class calls
 * {@link #shutdown()} on the singleton if it was ever created.
 */
public class SparkSessionManagerImpl implements SparkSessionManager {
    private static final Logger LOG = LoggerFactory.getLogger(SparkSessionManagerImpl.class);

    /** Configuration key holding the queue name a session is bound to. */
    private static final String SPARK_YARN_QUEUE = "spark.yarn.queue";

    /** Queue name used when {@link #SPARK_YARN_QUEUE} is not set. */
    private static final String DEFAULT_QUEUE = "default";

    /** Lazily created singleton; volatile because the shutdown hook reads it without locking. */
    private static volatile SparkSessionManagerImpl instance;

    /** Every session created by this manager that has not yet been closed through it. */
    private final Set<SparkSession> createdSessions =
            Collections.synchronizedSet(new HashSet<SparkSession>());

    /** Pool of default sessions, keyed by queue name. */
    private final Map<String, BlockingQueue<SparkSession>> sessionPool =
            new ConcurrentHashMap<String, BlockingQueue<SparkSession>>();

    /** True once {@link SparkClientFactory} has been initialized by {@link #setup(HiveConf)}. */
    private volatile boolean inited = false;

    /** True once {@link #startPool(HiveConf)} has finished building the pool. */
    private volatile boolean initedPool = false;

    /**
     * Returns the singleton instance, creating it on first use.
     *
     * @return the shared manager instance
     * @throws HiveException declared for interface compatibility; not thrown by this method
     */
    public static synchronized SparkSessionManagerImpl getInstance() throws HiveException {
        if (instance == null) {
            instance = new SparkSessionManagerImpl();
        }
        return instance;
    }

    private SparkSessionManagerImpl() {
    }

    /**
     * Initializes {@link SparkClientFactory} once; later calls return immediately.
     *
     * @param hiveConf configuration used to build the Spark configuration
     * @throws HiveException if the client factory fails to initialize with an {@link IOException}
     */
    @Override // org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager
    public void setup(HiveConf hiveConf) throws HiveException {
        if (this.inited) {
            return;
        }
        synchronized (this) {
            // Re-check under the lock so only one thread performs the initialization.
            if (!this.inited) {
                LOG.info("Setting up the session manager.");
                try {
                    SparkClientFactory.initialize(HiveSparkClientFactory.initiateSparkConf(hiveConf, null));
                    this.inited = true;
                } catch (IOException e) {
                    throw new HiveException("Error initializing SparkClientFactory", e);
                }
            }
        }
    }

    /**
     * Returns a session for the caller.
     *
     * <p>When the pool has been started and default sessions are enabled in {@code hiveConf},
     * the session comes from {@link #getSparkSessionPoolSession(SparkSession, HiveConf)} and
     * its queue name (if not blank) is written back to {@code hiveConf}. Otherwise the existing
     * session is reused when non-null, or a brand-new session is created and tracked.
     *
     * @param sparkSession the caller's current session, may be {@code null}
     * @param hiveConf     configuration used for setup and for opening the session
     * @param z            when {@code true}, a session that is not open is opened before return
     * @return the session to use
     * @throws HiveException if setup or opening the session fails
     */
    @Override // org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager
    public SparkSession getSession(SparkSession sparkSession, HiveConf hiveConf, boolean z) throws HiveException {
        setup(hiveConf);
        if (this.initedPool && hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_SPARK_INITIALIZE_DEFAULT_SESSIONS)) {
            SparkSession pooledSession = getSparkSessionPoolSession(sparkSession, hiveConf);
            // Null check comes first: the original dereferenced the session before testing it.
            if (pooledSession != null) {
                String queueName = pooledSession.getQueueName();
                if (!StringUtils.isBlank(queueName)) {
                    hiveConf.set(SPARK_YARN_QUEUE, queueName);
                }
                if (!pooledSession.isOpen() && z) {
                    pooledSession.open(hiveConf);
                }
                return pooledSession;
            }
        }
        if (sparkSession != null) {
            if (!sparkSession.isOpen() && z) {
                sparkSession.open(hiveConf);
            }
            return sparkSession;
        }
        SparkSessionImpl newSession = new SparkSessionImpl();
        if (z) {
            newSession.open(hiveConf);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("New session (%s) is created.", newSession.getSessionId()));
        }
        this.createdSessions.add(newSession);
        return newSession;
    }

    /**
     * Picks the session to use when the default-session pool is enabled.
     *
     * <p>If the Spark configuration was flagged as updated (on {@code hiveConf} or on the
     * current {@link SessionState}'s configuration), the existing session is closed and both
     * flags are reset. Otherwise the existing session is returned as-is when its queue matches
     * the requested one, and closed when it does not. A new, non-pooled session is created
     * when authentication is not {@code NONE} or doAs is enabled, and also when no pooled
     * session becomes available within the configured timeout.
     *
     * @param sparkSession the caller's current session, may be {@code null}
     * @param hiveConf     configuration carrying the requested queue and pool settings
     * @return an existing, pooled, or newly created session; never {@code null}
     * @throws HiveException if closing the existing session fails
     */
    public SparkSession getSparkSessionPoolSession(SparkSession sparkSession, HiveConf hiveConf) throws HiveException {
        // SessionState is looked up defensively: a missing state is treated as "no update flag".
        SessionState sessionState = SessionState.get();
        HiveConf sessionConf = sessionState == null ? null : sessionState.getConf();
        boolean sessionConfUpdated = sessionConf != null && sessionConf.getSparkConfigUpdated();

        if (sessionConfUpdated || hiveConf.getSparkConfigUpdated()) {
            closeExistingSparkSession(sparkSession);
            hiveConf.setSparkConfigUpdated(false);
            if (sessionConf != null) {
                sessionConf.setSparkConfigUpdated(false);
            }
        } else {
            if (canWorkWithSameSession(sparkSession, hiveConf)) {
                LOG.info("Session id {} is already open", sparkSession.getSessionId());
                return sparkSession;
            }
            closeExistingSparkSession(sparkSession);
        }

        String queueName = hiveConf.get(SPARK_YARN_QUEUE, DEFAULT_QUEUE);
        String authentication = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION);
        boolean doAsEnabled = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
        // Constant-first equals so a null authentication value cannot throw.
        if (!"NONE".equals(authentication) || doAsEnabled) {
            return getNewSparkSession(queueName);
        }

        long timeoutSeconds = hiveConf.getTimeVar(
                HiveConf.ConfVars.HIVE_SERVER2_SPARK_DEFAULT_SESSION_TIMEOUT, TimeUnit.SECONDS);
        int queueLength = 0;
        SparkSession pooledSession = null;
        BlockingQueue<SparkSession> queue = this.sessionPool.get(queueName);
        if (queue != null) {
            queueLength = queue.size();
            try {
                pooledSession = queue.poll(timeoutSeconds, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers up the stack can observe it.
                Thread.currentThread().interrupt();
                LOG.error("interrupted before being take of BlockingQueue ", e);
            }
        }
        LOG.info("QueueName: {}, headSparkSession: {}, defaultQueuePool: {}, blockingQueueLength: {}",
                new Object[]{queueName, pooledSession, this.sessionPool, queueLength});
        if (pooledSession == null) {
            return getNewSparkSession(queueName);
        }
        LOG.info("Choosing a session from the defaultQueuePool: {}", pooledSession);
        return pooledSession;
    }

    /**
     * Closes the given session. A non-default session is closed and untracked; a default
     * session is closed and then handed back to its pool.
     *
     * <p>A default session is closed <em>before</em> it is offered back to the pool, so that
     * another thread cannot take it from the pool and then have it closed underneath it.
     *
     * @param sparkSession the session to close, may be {@code null} (no-op)
     * @throws HiveException if closing or returning the session fails
     */
    public void closeExistingSparkSession(SparkSession sparkSession) throws HiveException {
        if (sparkSession == null) {
            return;
        }
        if (!sparkSession.isDefault()) {
            closeSessionIfNotDefault(sparkSession);
            return;
        }
        try {
            sparkSession.close();
        } finally {
            // Always give the default session back so the pool does not shrink.
            closeSessionIfNotDefault(sparkSession);
        }
    }

    /**
     * Creates a new, unopened, tracked session bound to the given queue.
     *
     * @param str the queue name to assign to the session
     * @return the newly created session
     */
    public SparkSession getNewSparkSession(String str) {
        SparkSessionImpl newSession = new SparkSessionImpl();
        newSession.setQueueName(str);
        this.createdSessions.add(newSession);
        LOG.info("Created a new session for queue: {}, session id: {}", str, newSession.getSessionId());
        return newSession;
    }

    /**
     * Tells whether the given session is bound to the queue requested in {@code hiveConf}.
     *
     * @param sparkSession the current session, may be {@code null}
     * @param hiveConf     configuration carrying the requested queue, may be {@code null}
     * @return {@code true} if both arguments are non-null and the queue names are equal
     */
    public boolean canWorkWithSameSession(SparkSession sparkSession, HiveConf hiveConf) {
        if (sparkSession == null || hiveConf == null) {
            return false;
        }
        String currentQueue = sparkSession.getQueueName();
        String requestedQueue = hiveConf.get(SPARK_YARN_QUEUE, DEFAULT_QUEUE);
        LOG.info("Current queue name is {}, incoming queue name is {}", currentQueue, requestedQueue);
        return requestedQueue.equals(currentQueue);
    }

    /** No-op in this implementation. */
    @Override // org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager
    public void returnSession(SparkSession sparkSession) {
    }

    /**
     * Hands a session back. Non-default sessions go through {@link #returnSession}; default
     * sessions are offered back to the pool of their queue, waiting at most 100 ms for space.
     *
     * @param sparkSession the session to return, may be {@code null} (no-op)
     * @throws HiveException declared by the interface; not thrown by this method
     */
    @Override // org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager
    public void returnSessionIfNotDefault(SparkSession sparkSession) throws HiveException {
        if (sparkSession == null) {
            return;
        }
        if (!sparkSession.isDefault()) {
            returnSession(sparkSession);
            return;
        }
        String queueName = sparkSession.getQueueName();
        // Single lookup: a second get() could return null if shutdown() cleared the pool meanwhile.
        BlockingQueue<SparkSession> queue = queueName == null ? null : this.sessionPool.get(queueName);
        if (queue == null) {
            return;
        }
        try {
            if (queue.offer(sparkSession, 100L, TimeUnit.MILLISECONDS)) {
                LOG.info("The session {} belongs to the pool. Put it back in", sparkSession.getSessionId());
            } else {
                LOG.warn("The session {} could not be put back in the pool for queue {}: the pool is full",
                        sparkSession.getSessionId(), queueName);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
            LOG.error("Interrupted before being put of BlockingQueue.", e);
        }
    }

    /**
     * Closes the session and stops tracking it.
     *
     * @param sparkSession the session to close, may be {@code null} (no-op)
     * @throws HiveException declared by the interface
     */
    @Override // org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager
    public void closeSession(SparkSession sparkSession) throws HiveException {
        if (sparkSession == null) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Closing session (%s).", sparkSession.getSessionId()));
        }
        sparkSession.close();
        this.createdSessions.remove(sparkSession);
    }

    /**
     * Closes a non-default session; a default session is returned to its pool instead.
     *
     * @param sparkSession the session to close or return, may be {@code null} (no-op)
     * @throws HiveException if closing or returning the session fails
     */
    @Override // org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager
    public void closeSessionIfNotDefault(SparkSession sparkSession) throws HiveException {
        if (sparkSession == null) {
            return;
        }
        if (sparkSession.isDefault()) {
            LOG.info("Default session can continue to reuse");
            returnSessionIfNotDefault(sparkSession);
        } else {
            closeSession(sparkSession);
        }
    }

    /**
     * Closes every tracked session, empties the pool, resets the initialization flags and
     * stops {@link SparkClientFactory}. A failure to close one session is logged and does not
     * prevent the remaining sessions or the client factory from being shut down.
     */
    @Override // org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager
    public void shutdown() {
        LOG.info("Closing the session manager.");
        // Iteration over a synchronizedSet must be guarded manually by the set's own lock.
        synchronized (this.createdSessions) {
            for (SparkSession session : this.createdSessions) {
                try {
                    session.close();
                } catch (RuntimeException e) {
                    LOG.warn("Failed to close a session while shutting down the session manager.", e);
                }
            }
            this.createdSessions.clear();
        }
        this.sessionPool.clear();
        this.initedPool = false;
        this.inited = false;
        SparkClientFactory.stop();
    }

    /**
     * Starts the default-session pool on the singleton when default sessions are enabled in
     * {@code hiveConf}; does nothing otherwise.
     *
     * @param hiveConf configuration carrying the pool settings
     * @throws HiveException        declared by {@link #getInstance()}
     * @throws InterruptedException propagated from {@link #startPool(HiveConf)}
     */
    public void initAndStartSparkSessionPoolManager(HiveConf hiveConf) throws HiveException, InterruptedException {
        if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_SPARK_INITIALIZE_DEFAULT_SESSIONS)) {
            SparkSessionManagerImpl manager = getInstance();
            LOG.info("Initializing spark session pool manager");
            manager.startPool(hiveConf);
            LOG.info("Spark session pool manager initialized.");
        }
    }

    /**
     * Builds the pool: for every non-blank queue name in the comma-separated default-queues
     * setting, creates the configured number of unopened default sessions, each with its own
     * copy of {@code hiveConf} pointing at that queue.
     *
     * <p>The pool is only marked as initialized once all queues have been populated, so a
     * concurrent {@link #getSession} does not observe a half-built pool.
     *
     * @param hiveConf configuration carrying the queue list and sessions-per-queue count
     * @throws InterruptedException     if interrupted while filling a queue
     * @throws IllegalArgumentException if a queue is configured and the sessions-per-queue
     *                                  count is less than 1 (thrown by {@link ArrayBlockingQueue})
     */
    public void startPool(HiveConf hiveConf) throws InterruptedException {
        String queueList = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_SERVER2_SPARK_DEFAULT_QUEUES);
        int sessionsPerQueue = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_SPARK_SESSIONS_PER_DEFAULT_QUEUE);
        String[] queueNames = queueList == null ? new String[0] : queueList.split(",");
        LOG.info("start spark session pool");
        for (String rawName : queueNames) {
            if (StringUtils.isBlank(rawName)) {
                continue;
            }
            // Trim so that "a, b" yields queue "b" rather than " b", which no lookup would match.
            String queueName = rawName.trim();
            BlockingQueue<SparkSession> queue = new ArrayBlockingQueue<SparkSession>(sessionsPerQueue);
            for (int i = 0; i < sessionsPerQueue; i++) {
                SparkSessionImpl session = new SparkSessionImpl();
                session.setQueueName(queueName);
                session.setDefault(true);
                HiveConf sessionConf = new HiveConf(hiveConf);
                sessionConf.set(SPARK_YARN_QUEUE, session.getQueueName());
                session.setConf(sessionConf);
                queue.put(session);
                LOG.info("Created new spark session for queue: {} with session id: {}",
                        queueName, session.getSessionId());
                this.createdSessions.add(session);
            }
            this.sessionPool.put(queueName, queue);
        }
        this.initedPool = true;
    }

    static {
        ShutdownHookManager.addShutdownHook(new Runnable() { // from class: org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    SparkSessionManagerImpl manager = SparkSessionManagerImpl.instance;
                    if (manager != null) {
                        manager.shutdown();
                    }
                } catch (Exception e) {
                    // Best effort during JVM shutdown: report the failure but never rethrow.
                    LOG.warn("Error while shutting down the Spark session manager.", e);
                }
            }
        });
    }
}
