package org.apache.proxy.service;

import io.netty.util.concurrent.MultithreadEventExecutorGroup;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.proxy.service.client.Job;
import org.apache.proxy.service.client.JobContext;
import org.apache.proxy.service.client.SparkClient;
import org.apache.proxy.service.client.SparkClientFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv$;
import org.apache.thrift.transport.TTransportException;

/* loaded from: input_file:org/apache/proxy/service/SparkClientManager.class */
public class SparkClientManager {
    public static final Log LOG = LogFactory.getLog(ThriftCLIProxyService.class.getName());
    private SparkConf sparkConf;
    private HiveConf hiveConf;
    private MultithreadEventExecutorGroup eventExecutor;
    ThriftServerIdleMonitor thriftServerIdleMonitor;
    private long healthCheckInterval;
    private int healthCheckMaxRetry;
    private long healthCheckWaitTime;
    private boolean isStarted = false;
    private Map<ThriftServiceId, ThriftServerInstance> services = new ConcurrentHashMap();
    private Map<ThriftServiceId, ScheduledFuture<?>> futures = new ConcurrentHashMap();
    private List<SparkClientEvent> listeners = new CopyOnWriteArrayList();

    /* loaded from: input_file:org/apache/proxy/service/SparkClientManager$SparkClientEvent.class */
    public interface SparkClientEvent {
        void close(String str, String str2, ThriftServiceId thriftServiceId);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/proxy/service/SparkClientManager$ThriftServerIdleMonitor.class */
    public class ThriftServerIdleMonitor implements Runnable {
        private long cleanTime;
        private Map<ThriftServiceId, Long> idleTime = new ConcurrentHashMap();

        public ThriftServerIdleMonitor() {
            this.cleanTime = SparkClientManager.this.sparkConf.getTimeAsMs("spark.thriftserver.proxy.sessionWaitTime", "180s");
            if (this.cleanTime < 0) {
                SparkClientManager.LOG.warn("invalid session wait time : " + this.cleanTime + "( < 0 )");
                SparkClientManager.LOG.warn("set session wait time : 180000");
                this.cleanTime = 180000L;
            }
        }

        public void addIdleThriftService(ThriftServiceId thriftServiceId, long j) {
            this.idleTime.put(thriftServiceId, Long.valueOf(j));
        }

        public void removeIdleThriftService(ThriftServiceId thriftServiceId) {
            this.idleTime.remove(thriftServiceId);
        }

        @Override // java.lang.Runnable
        public void run() {
            if (SparkClientManager.this.isStarted) {
                for (ThriftServiceId thriftServiceId : (ThriftServiceId[]) this.idleTime.keySet().toArray(new ThriftServiceId[this.idleTime.size()])) {
                    if (!SparkClientManager.this.isStarted) {
                        return;
                    }
                    long currentTimeMillis = System.currentTimeMillis();
                    Long l = this.idleTime.get(thriftServiceId);
                    if (SparkClientManager.LOG.isDebugEnabled()) {
                        SparkClientManager.LOG.debug("Begin to do session monitor now. client : " + thriftServiceId + ". Begin time : " + l + ", current time : " + currentTimeMillis);
                    }
                    if (l != null && currentTimeMillis - this.cleanTime >= l.longValue()) {
                        SparkClientManager.LOG.debug("stopping client, current time : " + currentTimeMillis + ", begin time : " + l + ", clean time : " + this.cleanTime);
                        if (this.idleTime.remove(thriftServiceId) != null) {
                            SparkClientManager.this.stopClient(thriftServiceId);
                        }
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/proxy/service/SparkClientManager$ThriftServerMonitor.class */
    public class ThriftServerMonitor implements Runnable {
        private String queueName;
        private SparkClient sparkClient;
        private ThriftServiceId serviceId;
        private boolean stopped = false;

        public ThriftServerMonitor(ThriftServerInstance thriftServerInstance) {
            this.queueName = thriftServerInstance.getQueueName();
            this.sparkClient = thriftServerInstance.getSparkClient();
            this.serviceId = thriftServerInstance.getServiceId();
        }

        private boolean healthCheck() {
            boolean z = false;
            int i = 0;
            while (!z && i < SparkClientManager.this.healthCheckMaxRetry) {
                try {
                    this.sparkClient.run(new Job<Boolean>() { // from class: org.apache.proxy.service.SparkClientManager.ThriftServerMonitor.1
                        /* JADX WARN: Can't rename method to resolve collision */
                        @Override // org.apache.proxy.service.client.Job
                        public Boolean call(JobContext jobContext) throws Exception {
                            jobContext.sparkSession().sql("healthcheck");
                            return true;
                        }
                    }).get(SparkClientManager.this.healthCheckWaitTime, TimeUnit.MILLISECONDS);
                    z = true;
                } catch (Exception e) {
                    try {
                        Thread.sleep(SparkClientManager.this.healthCheckInterval);
                    } catch (InterruptedException e2) {
                        SparkClientManager.LOG.warn("health check retry is interrupted, retry times : " + i);
                    }
                    i++;
                    SparkClientManager.LOG.warn("Health check failed.Begin to retry : " + i);
                }
            }
            return z;
        }

        @Override // java.lang.Runnable
        public void run() {
            SparkClientManager.LOG.debug("Begin to do health check now.");
            if (this.stopped || !healthCheck()) {
                SparkClientManager.LOG.debug("stopping client, current time : " + System.currentTimeMillis());
                this.stopped = true;
                SparkClientManager.this.stopClient(this.serviceId);
            }
        }
    }

    public synchronized void start() {
        if (this.isStarted) {
            return;
        }
        this.isStarted = true;
    }

    public synchronized void stop() {
        if (this.isStarted) {
            this.isStarted = false;
        }
    }

    public SparkClientManager(MultithreadEventExecutorGroup multithreadEventExecutorGroup, SparkConf sparkConf, HiveConf hiveConf) {
        this.eventExecutor = multithreadEventExecutorGroup;
        this.sparkConf = sparkConf;
        this.hiveConf = hiveConf;
        this.healthCheckInterval = sparkConf.getTimeAsMs("spark.thriftserver.proxy.healthcheck.period", "60s");
        if (this.healthCheckInterval <= 0) {
            LOG.warn("invalid health check period : " + this.healthCheckInterval + "( <= 0 )");
            LOG.warn("set health check period : 60000");
            this.healthCheckInterval = 60000L;
        }
        this.healthCheckMaxRetry = sparkConf.getInt("spark.thriftserver.proxy.healthcheck.recheckTimes", 3);
        if (this.healthCheckMaxRetry < 0) {
            LOG.warn("invalid health check recheck times : " + this.healthCheckMaxRetry + "( < 0 )");
            LOG.warn("set health check recheck times : 3");
            this.healthCheckMaxRetry = 3;
        }
        this.healthCheckWaitTime = sparkConf.getTimeAsMs("spark.thriftserver.proxy.healthcheck.waitTime", "10s");
        if (this.healthCheckWaitTime < 0) {
            LOG.warn("invalid health check wait time : " + this.healthCheckWaitTime + "( < 0 )");
            LOG.warn("set wait time : 10000");
            this.healthCheckWaitTime = 10000L;
        }
        this.thriftServerIdleMonitor = new ThriftServerIdleMonitor();
        this.eventExecutor.scheduleWithFixedDelay(this.thriftServerIdleMonitor, 10L, 10L, TimeUnit.SECONDS);
    }

    public boolean isActiveThriftService(ThriftServiceId thriftServiceId) {
        ThriftServerInstance thriftServerInstance = this.services.get(thriftServiceId);
        return thriftServerInstance != null && thriftServerInstance.getSparkClient().isActive();
    }

    public String getAppId(ThriftServiceId thriftServiceId) {
        return this.services.get(thriftServiceId).getAppId();
    }

    public void addListener(SparkClientEvent sparkClientEvent) {
        this.listeners.add(sparkClientEvent);
    }

    public void removeListener(SparkClientEvent sparkClientEvent) {
        this.listeners.remove(sparkClientEvent);
    }

    private SparkClient createSparkClient(String str, String str2) throws TTransportException {
        try {
            Map<String, String> initiateSparkConf = HiveSparkClientFactory.initiateSparkConf(this.hiveConf);
            initiateSparkConf.put("spark.yarn.queue", str2);
            SparkClientFactory.initialize(initiateSparkConf);
            initiateSparkConf.put("queue.user", str);
            return SparkClientFactory.createClient(initiateSparkConf, this.hiveConf);
        } catch (Throwable th) {
            throw new TTransportException(1, "Could not create a new Spark client: " + th.getMessage(), th);
        }
    }

    public ThriftServiceId createThriftServerInstance(String str, String str2) throws TTransportException {
        SparkClient createSparkClient = createSparkClient(str, str2);
        try {
            String[] split = ((String) createSparkClient.run(new Job<String>() { // from class: org.apache.proxy.service.SparkClientManager.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.proxy.service.client.Job
                public String call(JobContext jobContext) throws Exception {
                    String str3 = SparkEnv$.MODULE$.get().conf().get("spark.thriftServer.appid");
                    String str4 = SparkEnv$.MODULE$.get().conf().get("spark.thriftServer.serverInstanceURI");
                    if (str4 == null || str3 == null || str3.isEmpty() || str4.isEmpty()) {
                        throw new IllegalArgumentException("Could not get appid or uri.");
                    }
                    return str3 + "\n" + str4;
                }
            }).get(60L, TimeUnit.SECONDS)).split("\n");
            String str3 = split[0];
            String[] split2 = split[1].split(":");
            String str4 = split2[0];
            if (str4.equals("0.0.0.0")) {
                str4 = "unknown-ip-addr";
            }
            ThriftServiceId thriftServiceId = new ThriftServiceId(str4, Integer.parseInt(split2[1]));
            ThriftServerInstance thriftServerInstance = new ThriftServerInstance(str, str2, thriftServiceId, createSparkClient, str3);
            startThriftServerMonitor(thriftServerInstance);
            this.services.put(thriftServiceId, thriftServerInstance);
            LOG.debug("Creating a new Spark client: " + thriftServiceId);
            return thriftServiceId;
        } catch (Throwable th) {
            if (createSparkClient != null) {
                createSparkClient.stop();
            }
            throw new TTransportException(1, "Could not create a new Spark client: " + th.getMessage(), th);
        }
    }

    private void startThriftServerMonitor(ThriftServerInstance thriftServerInstance) {
        this.futures.put(thriftServerInstance.getServiceId(), this.eventExecutor.scheduleWithFixedDelay(new ThriftServerMonitor(thriftServerInstance), this.healthCheckInterval, this.healthCheckInterval, TimeUnit.MILLISECONDS));
    }

    private void stopThriftServerMonitor(final ThriftServiceId thriftServiceId) {
        this.eventExecutor.execute(new Runnable() { // from class: org.apache.proxy.service.SparkClientManager.2
            @Override // java.lang.Runnable
            public void run() {
                ScheduledFuture scheduledFuture = (ScheduledFuture) SparkClientManager.this.futures.get(thriftServiceId);
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(false);
                }
            }
        });
    }

    public void stopClient(ThriftServiceId thriftServiceId) {
        ThriftServerInstance remove = this.services.remove(thriftServiceId);
        if (remove != null) {
            LOG.info("Stopping the ThriftService: " + thriftServiceId);
            stopThriftServerMonitor(thriftServiceId);
            Iterator<SparkClientEvent> it = this.listeners.iterator();
            while (it.hasNext()) {
                it.next().close(remove.getUserName(), remove.getQueueName(), remove.getServiceId());
            }
            removeIdleThriftService(remove.getServiceId());
            remove.getSparkClient().stop();
        }
    }

    public void addIdleThriftService(ThriftServiceId thriftServiceId, long j) {
        this.thriftServerIdleMonitor.addIdleThriftService(thriftServiceId, j);
    }

    public void removeIdleThriftService(ThriftServiceId thriftServiceId) {
        this.thriftServerIdleMonitor.removeIdleThriftService(thriftServiceId);
    }
}
