package org.apache.hadoop.hdfs.server.federation.router;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.class */
public class MountTableRefresherService extends AbstractService {
    private static final String ROUTER_CONNECT_ERROR_MSG = "Router {} connection failed. Mount table cache will not refresh.";
    private static final Logger LOG = LoggerFactory.getLogger(MountTableRefresherService.class);
    private final Router router;
    private MountTableStore mountTableStore;
    private String localAdminAddress;
    private long cacheUpdateTimeout;
    private LoadingCache<String, RouterClient> routerClientsCache;
    private ScheduledExecutorService clientCacheCleanerScheduler;

    public MountTableRefresherService(Router router) {
        super(MountTableRefresherService.class.getSimpleName());
        this.router = router;
    }

    protected void serviceInit(Configuration configuration) throws Exception {
        super.serviceInit(configuration);
        this.mountTableStore = getMountTableStore();
        this.mountTableStore.setRefreshService(this);
        this.localAdminAddress = StateStoreUtils.getHostPortString(this.router.getAdminServerAddress());
        this.cacheUpdateTimeout = configuration.getTimeDuration(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT, RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
        long timeDuration = configuration.getTimeDuration(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME, RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME_DEFAULT, TimeUnit.MILLISECONDS);
        this.routerClientsCache = CacheBuilder.newBuilder().expireAfterWrite(timeDuration, TimeUnit.MILLISECONDS).removalListener(getClientRemover()).build(getClientCreator());
        initClientCacheCleaner(timeDuration);
    }

    private void initClientCacheCleaner(long j) {
        this.clientCacheCleanerScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("MountTableRefresh_ClientsCacheCleaner").setDaemon(true).build());
        this.clientCacheCleanerScheduler.scheduleWithFixedDelay(() -> {
            this.routerClientsCache.cleanUp();
        }, j, j, TimeUnit.MILLISECONDS);
    }

    private RemovalListener<String, RouterClient> getClientRemover() {
        return new RemovalListener<String, RouterClient>() { // from class: org.apache.hadoop.hdfs.server.federation.router.MountTableRefresherService.1
            public void onRemoval(RemovalNotification<String, RouterClient> removalNotification) {
                MountTableRefresherService.this.closeRouterClient((RouterClient) removalNotification.getValue());
            }
        };
    }

    @VisibleForTesting
    protected void closeRouterClient(RouterClient routerClient) {
        try {
            routerClient.close();
        } catch (IOException e) {
            LOG.error("Error while closing RouterClient", e);
        }
    }

    private CacheLoader<String, RouterClient> getClientCreator() {
        return new CacheLoader<String, RouterClient>() { // from class: org.apache.hadoop.hdfs.server.federation.router.MountTableRefresherService.2
            public RouterClient load(String str) throws IOException {
                return MountTableRefresherService.this.createRouterClient(NetUtils.createSocketAddr(str), MountTableRefresherService.this.getConfig());
            }
        };
    }

    @VisibleForTesting
    protected RouterClient createRouterClient(InetSocketAddress inetSocketAddress, Configuration configuration) throws IOException {
        return (RouterClient) SecurityUtil.doAsLoginUser(() -> {
            if (UserGroupInformation.isSecurityEnabled()) {
                UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
            }
            return new RouterClient(inetSocketAddress, configuration);
        });
    }

    protected void serviceStart() throws Exception {
        super.serviceStart();
    }

    protected void serviceStop() throws Exception {
        super.serviceStop();
        this.clientCacheCleanerScheduler.shutdown();
        this.routerClientsCache.invalidateAll();
    }

    private MountTableStore getMountTableStore() throws IOException {
        MountTableStore mountTableStore = (MountTableStore) this.router.getStateStore().getRegisteredRecordStore(MountTableStore.class);
        if (mountTableStore == null) {
            throw new IOException("Mount table state store is not available.");
        }
        return mountTableStore;
    }

    public void refresh() throws StateStoreUnavailableException {
        RouterStore routerStateManager = this.router.getRouterStateManager();
        try {
            routerStateManager.loadCache(true);
        } catch (IOException e) {
            LOG.warn("RouterStore load cache failed,", e);
        }
        List<RouterState> cachedRecords = routerStateManager.getCachedRecords();
        ArrayList arrayList = new ArrayList();
        for (RouterState routerState : cachedRecords) {
            String adminAddress = routerState.getAdminAddress();
            if (adminAddress != null && adminAddress.length() != 0) {
                if (routerState.getStatus() != RouterServiceState.RUNNING) {
                    LOG.info("Router {} is not running. Mount table cache will not refresh.", routerState.getAddress());
                    removeFromCache(adminAddress);
                } else if (isLocalAdmin(adminAddress)) {
                    arrayList.add(getLocalRefresher(adminAddress));
                } else {
                    try {
                        arrayList.add(new MountTableRefresherThread(((RouterClient) this.routerClientsCache.get(adminAddress)).getMountTableManager(), adminAddress));
                    } catch (ExecutionException e2) {
                        LOG.warn(ROUTER_CONNECT_ERROR_MSG, adminAddress, e2);
                    }
                }
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        invokeRefresh(arrayList);
    }

    @VisibleForTesting
    protected MountTableRefresherThread getLocalRefresher(String str) {
        return new MountTableRefresherThread(this.router.getAdminServer(), str);
    }

    private void removeFromCache(String str) {
        this.routerClientsCache.invalidate(str);
    }

    private void invokeRefresh(List<MountTableRefresherThread> list) {
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        for (MountTableRefresherThread mountTableRefresherThread : list) {
            mountTableRefresherThread.setCountDownLatch(countDownLatch);
            mountTableRefresherThread.start();
        }
        try {
            if (!countDownLatch.await(this.cacheUpdateTimeout, TimeUnit.MILLISECONDS)) {
                LOG.warn("Not all router admins updated their cache");
            }
        } catch (InterruptedException e) {
            LOG.error("Mount table cache refresher was interrupted.", e);
        }
        logResult(list);
    }

    private boolean isLocalAdmin(String str) {
        return str.contentEquals(this.localAdminAddress);
    }

    private void logResult(List<MountTableRefresherThread> list) {
        int i = 0;
        int i2 = 0;
        for (MountTableRefresherThread mountTableRefresherThread : list) {
            if (mountTableRefresherThread.isSuccess()) {
                i++;
            } else {
                i2++;
                removeFromCache(mountTableRefresherThread.getAdminAddress());
            }
        }
        LOG.info("Mount table entries cache refresh succesCount={},failureCount={}", Integer.valueOf(i), Integer.valueOf(i2));
    }
}
