package com.huawei.hetu.sqlengine.highavailability;

import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.huawei.hetu.highavailability.CoordinatorDescriptor;
import com.huawei.hetu.highavailability.CoordinatorRegistry;
import com.huawei.hetu.highavailability.ElectionListener;
import com.huawei.hetu.highavailability.HighAvailability;
import com.huawei.hetu.highavailability.HighAvailabilityConfig;
import com.huawei.hetu.highavailability.HighAvailabilityUtils;
import com.huawei.hetu.highavailability.LeaderElection;
import com.huawei.hetu.highavailability.ServiceRegistry;
import com.huawei.hetu.highavailability.TheListener;
import io.airlift.concurrent.Threads;
import io.airlift.discovery.client.ServiceDescriptor;
import io.airlift.discovery.client.ServiceState;
import io.airlift.http.server.HttpServerConfig;
import io.airlift.http.server.HttpServerInfo;
import io.airlift.log.Logger;
import io.airlift.node.NodeInfo;
import io.airlift.units.Duration;
import io.prestosql.server.InternalCommunicationConfig;
import java.net.URI;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;

/* loaded from: input_file:com/huawei/hetu/sqlengine/highavailability/HighAvailabilityService.class */
/**
 * Registers this node with the high availability backend, takes part in leader election and
 * periodically propagates leadership changes to the registered {@link ElectionListener}s.
 *
 * <p>When high availability is disabled in the configuration the service is inert:
 * {@link #start()}, {@link #stop()} and {@link #releaseLeader()} do nothing besides logging.
 */
public class HighAvailabilityService {
    private static final Logger log = Logger.get(HighAvailabilityService.class);
    private Duration updateInterval;
    private ScheduledFuture<?> scheduledFuture;
    private HighAvailability highAvailability;
    private LeaderElection leaderElection;
    private CoordinatorRegistry coordinatorRegistry;
    private ServiceRegistry serviceRegistry;
    private NodeInfo nodeInfo;
    private URI coordinatorUri;
    private URI serviceUri;
    private boolean isHighAvailabilityEnabled;
    // Leadership state that was last published successfully; only touched by the polling task.
    private boolean preIsLeadership;
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(Threads.daemonThreadsNamed("high-availability-%s"));
    private final String serviceType = "discovery";
    private Set<ElectionListener> listeners = ImmutableSet.of();

    /**
     * Builds the service from the injected configuration.
     *
     * <p>If high availability is disabled nothing else is initialised. Otherwise the coordinator
     * and service URIs are derived from the HTTP server settings and the high availability
     * components are created. A failure while creating those components is logged and leaves the
     * service unable to start; {@link #start()} reports that state explicitly.
     *
     * @param highAvailabilityConfig high availability settings
     * @param httpServerConfig HTTP server settings used to pick the URI scheme
     * @param httpServerInfo source of the HTTP and HTTPS URIs of this node
     * @param internalCommunicationConfig decides whether the service URI must use HTTPS
     * @param nodeInfo identity of this node
     * @throws NullPointerException if any argument is null
     */
    @Inject
    public HighAvailabilityService(HighAvailabilityConfig highAvailabilityConfig, HttpServerConfig httpServerConfig, HttpServerInfo httpServerInfo, InternalCommunicationConfig internalCommunicationConfig, NodeInfo nodeInfo) {
        Objects.requireNonNull(highAvailabilityConfig, "highAvailabilityConfig is null");
        Objects.requireNonNull(httpServerConfig, "httpServerConfig is null");
        Objects.requireNonNull(httpServerInfo, "httpServerInfo is null");
        Objects.requireNonNull(internalCommunicationConfig, "internalCommunicationConfig is null");
        Objects.requireNonNull(nodeInfo, "nodeInfo is null");
        if (!HighAvailabilityUtils.isHighAvailabilityEnabled(highAvailabilityConfig)) {
            log.info("The high availability is not enabled.");
            return;
        }
        this.isHighAvailabilityEnabled = true;
        this.nodeInfo = nodeInfo;
        if (httpServerConfig.isHttpsEnabled()) {
            this.coordinatorUri = httpServerInfo.getHttpsUri();
            // The service URI only uses HTTPS when internal communication requires it.
            if (internalCommunicationConfig.isHttpsRequired()) {
                this.serviceUri = httpServerInfo.getHttpsUri();
            } else {
                this.serviceUri = httpServerInfo.getHttpUri();
            }
        } else if (httpServerConfig.isHttpEnabled()) {
            this.coordinatorUri = httpServerInfo.getHttpUri();
            this.serviceUri = httpServerInfo.getHttpUri();
        } else {
            log.error("Http server disable the http and https, so the URIs of coordinator and discovery server are invalid.");
        }
        try {
            this.highAvailability = HighAvailabilityUtils.createHighAvailability(highAvailabilityConfig);
            this.leaderElection = this.highAvailability.getLeaderElection();
            this.coordinatorRegistry = this.highAvailability.getCoordinatorRegistry();
            this.serviceRegistry = this.highAvailability.getServiceRegistry();
        } catch (Exception e) {
            // Report through the logger instead of stderr so the failure is visible in the server log.
            log.error(e, "Failed to create the high availability components.");
        }
        this.updateInterval = highAvailabilityConfig.getZooKeeperUpdateInterval();
        log.info("Construct high availability handler finished.");
    }

    /**
     * Replaces the election listeners with an immutable copy of the given set.
     *
     * @param set listeners to notify when the leadership of this node changes
     */
    @Inject(optional = true)
    public void setListeners(@TheListener Set<ElectionListener> set) {
        this.listeners = ImmutableSet.copyOf(set);
    }

    /**
     * @return true if high availability was enabled in the configuration passed to the constructor
     */
    public boolean isHighAvailabilityEnabled() {
        return this.isHighAvailabilityEnabled;
    }

    /**
     * @return the service registry, or null if it was not created
     */
    public ServiceRegistry getServiceRegistry() {
        return this.serviceRegistry;
    }

    /** Registers this node as coordinator and as service, tagging both with the given leader flag. */
    private void registerToHighAvailability(boolean z) throws Exception {
        String nodeId = this.nodeInfo.getNodeId();
        this.coordinatorRegistry.registerCoordinator(nodeId, new CoordinatorDescriptor(nodeId, this.coordinatorUri, z));
        ServiceDescriptor descriptor = ServiceDescriptor.serviceDescriptor(this.serviceType)
                .setId(UUID.randomUUID())
                .setNodeInfo(this.nodeInfo)
                .setLocation(this.nodeInfo.getLocation())
                .setState(ServiceState.RUNNING)
                .addProperty(this.serviceUri.getScheme(), this.serviceUri.toString().toLowerCase(Locale.ENGLISH))
                .addProperty("isLeader", String.valueOf(z))
                .build();
        this.serviceRegistry.registerService(nodeId, descriptor);
    }

    /** Removes the service and coordinator entries of this node, skipping registries that were never created. */
    private void unregisterFromHighAvailability() throws Exception {
        if (this.serviceRegistry != null) {
            this.serviceRegistry.unregisterService(this.nodeInfo.getNodeId());
        }
        if (this.coordinatorRegistry != null) {
            this.coordinatorRegistry.unregisterCoordinator(this.nodeInfo.getNodeId());
        }
    }

    /**
     * Registers this node as a non-leader, starts the leader election and schedules the periodic
     * leadership check. Does nothing when high availability is disabled or the check is already
     * scheduled.
     *
     * @throws IllegalStateException if the high availability components are not available
     * @throws Exception if registration or starting the election fails
     */
    public void start() throws Exception {
        log.info("Start high availability handler.");
        if (!this.isHighAvailabilityEnabled) {
            log.info("The high availability is not enabled, no need to start.");
            return;
        }
        if (this.scheduledFuture != null) {
            return;
        }
        if (this.leaderElection == null || this.coordinatorRegistry == null || this.serviceRegistry == null) {
            // Fail with a clear message instead of a NullPointerException further down.
            throw new IllegalStateException("High availability components are not available, cannot start.");
        }
        registerToHighAvailability(false);
        this.leaderElection.start();
        long intervalMillis = this.updateInterval.toMillis();
        this.scheduledFuture = this.executorService.scheduleAtFixedRate(this::publishLeadershipChange, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Polling task: when the leadership differs from the last published state, notifies the
     * listeners, re-registers this node with the new flag and records the new state.
     */
    private void publishLeadershipChange() {
        // Everything runs inside the try: an exception escaping a fixed-rate task would
        // suppress all of its subsequent executions.
        try {
            // Sample once so listeners, registry and the recorded state all see the same value.
            boolean leader = isLeadership();
            if (leader == this.preIsLeadership) {
                return;
            }
            for (ElectionListener electionListener : this.listeners) {
                electionListener.roleChange(leader);
            }
            registerToHighAvailability(leader);
            this.preIsLeadership = leader;
        } catch (Throwable th) {
            log.error(th, "Unexpected exception from leadership update");
        }
    }

    /**
     * Releases leadership, shuts down the polling executor and closes the high availability
     * backend. The backend is closed even if releasing leadership fails, and nothing is closed
     * when the backend was never created.
     *
     * @throws Exception if releasing leadership or closing the backend fails
     */
    @PreDestroy
    public void stop() throws Exception {
        try {
            releaseLeader();
        } finally {
            this.executorService.shutdownNow();
            if (this.highAvailability != null) {
                this.highAvailability.close();
            }
        }
        log.info("Stop high availability handler.");
    }

    /**
     * @return true if a leader election is available and reports this node as leader
     */
    public boolean isLeadership() {
        // Read the field once: releaseLeader() may null it concurrently.
        LeaderElection election = this.leaderElection;
        return election != null && election.isLeader();
    }

    /**
     * Cancels the periodic leadership check, stops the leader election and unregisters this node.
     * Does nothing if the high availability backend was never created.
     *
     * @throws Exception if stopping the election or unregistering fails
     */
    public void releaseLeader() throws Exception {
        if (this.highAvailability == null) {
            return;
        }
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(true);
            this.scheduledFuture = null;
        }
        if (this.leaderElection != null) {
            this.leaderElection.stop();
            this.leaderElection = null;
        }
        unregisterFromHighAvailability();
        log.info("This Node release the leader.");
    }
}
