package org.apache.flume.node;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.huawei.flume.om.jmx.FlumeJmxServer;
import com.huawei.flume.pluginmanager.PluginManager;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.SinkRunner;
import org.apache.flume.SourceRunner;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.MonitoringType;
import org.apache.flume.instrumentation.util.JMXPollUtil;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.lifecycle.LifecycleSupervisor;
import org.apache.flume.node.net.AuthorizationProvider;
import org.apache.flume.node.net.BasicAuthorizationProvider;
import org.apache.flume.node.net.UrlConnectionFactory;
import org.apache.flume.sink.AbstractSingleSinkProcessor;
import org.apache.flume.sink.AbstractSinkProcessor;
import org.apache.flume.tools.FlumeMetricsMgr;
import org.apache.flume.tools.FlumeSendAlarmMgr;
import org.apache.flume.util.SSLUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/node/Application.class */
public class Application {
    public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
    public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
    private static final String JMX_SERVER_NAME = "jmxrmi";
    private static PluginManager pluginManager;
    private static final int DEFAULT_INTERVAL = 300;
    private static final int DEFAULT_FILE_INTERVAL = 30;
    private final List<LifecycleAware> components;
    private final LifecycleSupervisor supervisor;
    private MaterializedConfiguration materializedConfiguration;
    private MonitorService monitorServer;
    private ScheduledThreadPoolExecutor metricsExecutor;
    private ExecutorService callTimeoutPool;
    private static final int MON_THREAD_COUNT = 1;
    private static final int RESTART_COMP_THD_COUNT = 1;
    private static final int MON_THREAD_DELAY = 30;
    private static final int MON_THREAD_PERIOD = 30;
    private static final int COMP_DEFAULT_CAPACITY = 30;
    private static final int RESTART_COMP_TIMEOUT = 50;
    private static final String UPDATE_TIME = "UpdateTime";
    private static final String MON_TIME = "MonTime";
    private static final String COMP_TYPE_SPLIT = ".";
    private static final String RESTART_COMP_THD_NAME = "compment-operate-timeout";
    private final ReentrantLock lifecycleLock;
    private static final Logger logger = LoggerFactory.getLogger(Application.class);
    private static final String JMX_REGISTRY_PORT = System.getProperty("com.huawei.flume.jmxremote.port", "21153");
    private static final String JMX_CONNECTOR_SERVER_PORT = System.getProperty("com.huawei.flume.jmxserver.port", "21153");
    private static final String JMX_REGISTRY_IP = System.getProperty("com.huawei.flume.jmxrmi.ip", "127.0.0.1");
    private static FlumeMetricsMgr metricMgr = null;
    private static FlumeSendAlarmMgr sam = null;
    private static FlumeJmxServer jmxServer = null;
    static Object lock = new Object();

    public Application() {
        this(new ArrayList(0));
    }

    public Application(List<LifecycleAware> list) {
        this.metricsExecutor = null;
        this.callTimeoutPool = null;
        this.lifecycleLock = new ReentrantLock();
        this.components = list;
        this.supervisor = new LifecycleSupervisor();
    }

    public void start() {
        try {
            try {
                this.lifecycleLock.lock();
                Iterator<LifecycleAware> it = this.components.iterator();
                while (it.hasNext()) {
                    this.supervisor.supervise(it.next(), new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
                }
                if (null == metricMgr) {
                    metricMgr = FlumeMetricsMgr.getInstance();
                    metricMgr.start();
                    logger.info("Start FlumeMetricsMgr.");
                }
                if (null == jmxServer) {
                    jmxServer = new FlumeJmxServer();
                    jmxServer.start(JMX_SERVER_NAME, JMX_CONNECTOR_SERVER_PORT, JMX_REGISTRY_IP, JMX_REGISTRY_PORT);
                }
                startCompMon();
                this.lifecycleLock.unlock();
            } catch (IOException e) {
                logger.error("Error while start the Application, ", e);
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            this.lifecycleLock.unlock();
            throw th;
        }
    }

    @Subscribe
    public void handleConfigurationEvent(MaterializedConfiguration materializedConfiguration) {
        try {
            try {
                this.lifecycleLock.lockInterruptibly();
                stopAllComponents();
                initializeAllComponents(materializedConfiguration);
                startAllComponents(materializedConfiguration);
                if (this.lifecycleLock.isHeldByCurrentThread()) {
                    this.lifecycleLock.unlock();
                }
            } catch (InterruptedException e) {
                logger.info("Interrupted while trying to handle configuration event");
                if (this.lifecycleLock.isHeldByCurrentThread()) {
                    this.lifecycleLock.unlock();
                }
            }
        } catch (Throwable th) {
            if (this.lifecycleLock.isHeldByCurrentThread()) {
                this.lifecycleLock.unlock();
            }
            throw th;
        }
    }

    public void stop() {
        try {
            this.lifecycleLock.lock();
            stopAllComponents();
            stopCompMon();
            if (null != metricMgr) {
                metricMgr.stop();
            }
            if (null != jmxServer) {
                jmxServer.stop();
            }
            if (this.monitorServer != null) {
                this.monitorServer.stop();
            }
            this.supervisor.stop();
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    private void stopAllComponents() {
        if (this.materializedConfiguration != null) {
            logger.info("Shutting down configuration: {}", this.materializedConfiguration);
            for (Map.Entry<String, SourceRunner> entry : this.materializedConfiguration.getSourceRunners().entrySet()) {
                try {
                    logger.info("Stopping Source " + entry.getKey());
                    this.supervisor.unsupervise(entry.getValue());
                } catch (Exception e) {
                    logger.error("Error while stopping {}", entry.getValue(), e);
                }
            }
            for (Map.Entry<String, SinkRunner> entry2 : this.materializedConfiguration.getSinkRunners().entrySet()) {
                try {
                    logger.info("Stopping Sink " + entry2.getKey());
                    this.supervisor.unsupervise(entry2.getValue());
                } catch (Exception e2) {
                    logger.error("Error while stopping {}", entry2.getValue(), e2);
                }
            }
            for (Map.Entry<String, Channel> entry3 : this.materializedConfiguration.getChannels().entrySet()) {
                try {
                    logger.info("Stopping Channel " + entry3.getKey());
                    this.supervisor.unsupervise(entry3.getValue());
                } catch (Exception e3) {
                    logger.error("Error while stopping {}", entry3.getValue(), e3);
                }
            }
        }
    }

    private void initializeAllComponents(MaterializedConfiguration materializedConfiguration) {
        logger.info("Initializing components");
        Iterator<Channel> it = materializedConfiguration.getChannels().values().iterator();
        while (it.hasNext()) {
            Initializable initializable = (Channel) it.next();
            while (initializable.getLifecycleState() != LifecycleState.START && (initializable instanceof Initializable)) {
                initializable.initialize(materializedConfiguration);
            }
        }
        Iterator<SinkRunner> it2 = materializedConfiguration.getSinkRunners().values().iterator();
        while (it2.hasNext()) {
            AbstractSinkProcessor policy = it2.next().getPolicy();
            if (policy instanceof AbstractSingleSinkProcessor) {
                Initializable sink = ((AbstractSingleSinkProcessor) policy).getSink();
                if (sink instanceof Initializable) {
                    sink.initialize(materializedConfiguration);
                }
            } else if (policy instanceof AbstractSinkProcessor) {
                for (Initializable initializable2 : policy.getSinks()) {
                    if (initializable2 instanceof Initializable) {
                        initializable2.initialize(materializedConfiguration);
                    }
                }
            }
        }
        Iterator<SourceRunner> it3 = materializedConfiguration.getSourceRunners().values().iterator();
        while (it3.hasNext()) {
            Initializable source = it3.next().getSource();
            if (source instanceof Initializable) {
                source.initialize(materializedConfiguration);
            }
        }
    }

    private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
        logger.info("Starting new configuration:{}", materializedConfiguration);
        logger.info("Begin to Start all Components -----------");
        synchronized (lock) {
            if (sam == null) {
                sam = FlumeSendAlarmMgr.getInstance();
                sam.start();
            }
        }
        this.materializedConfiguration = materializedConfiguration;
        for (Map.Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) {
            try {
                logger.info("Starting Channel " + entry.getKey());
                this.supervisor.supervise(entry.getValue(), new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e) {
                logger.error("Error while starting {}", entry.getValue(), e);
            }
        }
        for (Channel channel : materializedConfiguration.getChannels().values()) {
            while (channel.getLifecycleState() != LifecycleState.START && !this.supervisor.isComponentInErrorState(channel)) {
                try {
                    logger.info("Waiting for channel: " + channel.getName() + " to start. Sleeping for 500 ms");
                    Thread.sleep(500L);
                } catch (InterruptedException e2) {
                    logger.error("Interrupted while waiting for channel to start.", e2);
                    Throwables.propagate(e2);
                }
            }
        }
        for (Map.Entry<String, SinkRunner> entry2 : materializedConfiguration.getSinkRunners().entrySet()) {
            try {
                logger.info("Starting Sink " + entry2.getKey());
                this.supervisor.supervise(entry2.getValue(), new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e3) {
                logger.error("Error while starting {}", entry2.getValue(), e3);
            }
        }
        for (Map.Entry<String, SourceRunner> entry3 : materializedConfiguration.getSourceRunners().entrySet()) {
            try {
                logger.info("Starting Source " + entry3.getKey());
                this.supervisor.supervise(entry3.getValue(), new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e4) {
                logger.error("Error while starting {}", entry3.getValue(), e4);
            }
        }
        synchronized (lock) {
            if (pluginManager == null) {
                pluginManager = new PluginManager();
                pluginManager.start();
            }
        }
    }

    private void loadMonitoring() {
        Class<?> cls;
        logger.info("------------load monitoring--------");
        Properties properties = System.getProperties();
        Set<String> stringPropertyNames = properties.stringPropertyNames();
        try {
            if (stringPropertyNames.contains(CONF_MONITOR_CLASS)) {
                String property = properties.getProperty(CONF_MONITOR_CLASS);
                try {
                    cls = MonitoringType.valueOf(property.toUpperCase(Locale.ENGLISH)).getMonitorClass();
                } catch (Exception e) {
                    cls = Class.forName(property);
                }
                this.monitorServer = (MonitorService) cls.getConstructor(new Class[0]).newInstance(new Object[0]);
                Context context = new Context();
                for (String str : stringPropertyNames) {
                    if (str.startsWith(CONF_MONITOR_PREFIX)) {
                        context.put(str.substring(CONF_MONITOR_PREFIX.length()), properties.getProperty(str));
                    }
                }
                this.monitorServer.configure(context);
                this.monitorServer.start();
            }
        } catch (ReflectiveOperationException e2) {
            logger.warn("Error starting monitoring. Monitoring might not be available.", e2);
        }
    }

    public static void main(String[] strArr) {
        Application application;
        Properties loadConfigOpts = loadConfigOpts();
        try {
            SSLUtil.initGlobalSSLParameters();
            Options options = new Options();
            Option option = new Option("n", "name", true, "the name of this agent");
            option.setRequired(true);
            options.addOption(option);
            Option option2 = new Option("f", "conf-file", true, "specify a config file (required if -c, -u, and -z are missing)");
            option2.setRequired(false);
            options.addOption(option2);
            Option option3 = new Option("u", "conf-uri", true, "specify a config uri (required if -c, -f and -z are missing)");
            option3.setRequired(false);
            options.addOption(option3);
            Option option4 = new Option("a", "auth-provider", true, "specify an authorization provider class");
            option4.setRequired(false);
            options.addOption(option4);
            Option option5 = new Option("prov", "conf-provider", true, "specify a configuration provider class (required if -f, -u, and -z are missing)");
            option5.setRequired(false);
            options.addOption(option5);
            Option option6 = new Option("user", "conf-user", true, "user name to access configuration uri");
            option6.setRequired(false);
            options.addOption(option6);
            Option option7 = new Option("pwd", "conf-password", true, "password to access configuration uri");
            option7.setRequired(false);
            options.addOption(option7);
            Option option8 = new Option("i", "poll-interval", true, "number of seconds between checks for a configuration change");
            option8.setRequired(false);
            options.addOption(option8);
            Option option9 = new Option("b", "backup-directory", true, "directory in which to store the backup configuration file");
            option9.setRequired(false);
            options.addOption(option9);
            options.addOption(new Option((String) null, "no-reload-conf", false, "do not reload config file if changed"));
            Option option10 = new Option("z", "zkConnString", true, "specify the ZooKeeper connection to use (required if -c, -f, and -u are missing)");
            option10.setRequired(false);
            options.addOption(option10);
            Option option11 = new Option("p", "zkBasePath", true, "specify the base path in ZooKeeper for agent configs");
            option11.setRequired(false);
            options.addOption(option11);
            options.addOption(new Option("h", "help", false, "display help text"));
            CommandLine parse = new DefaultParser().parse(options, strArr, loadConfigOpts);
            if (parse.hasOption('h')) {
                new HelpFormatter().printHelp("flume-ng agent", options, true);
                return;
            }
            String optionValue = parse.getOptionValue('n');
            boolean z = !parse.hasOption("no-reload-conf");
            boolean z2 = parse.hasOption('z') || parse.hasOption("zkConnString");
            logger.info("--------isZkConfigured is { }--- ", Boolean.valueOf(z2));
            ArrayList arrayList = null;
            ConfigurationProvider configurationProvider = null;
            int i = 30;
            if (parse.hasOption('u') || parse.hasOption("conf-uri")) {
                arrayList = new ArrayList();
                for (String str : parse.getOptionValues("conf-uri")) {
                    if (str.toLowerCase(Locale.ROOT).startsWith(UrlConnectionFactory.HTTP)) {
                        i = DEFAULT_INTERVAL;
                    }
                    arrayList.add(new URI(str));
                }
            } else if (parse.hasOption("f") || parse.hasOption("conf-file")) {
                arrayList = new ArrayList();
                for (String str2 : parse.getOptionValues("conf-file")) {
                    arrayList.add(new File(str2).toURI());
                }
            }
            if (parse.hasOption("prov") || parse.hasOption("conf-provider")) {
                String optionValue2 = parse.getOptionValue("conf-provider");
                try {
                    configurationProvider = (ConfigurationProvider) Application.class.getClassLoader().loadClass(optionValue2).getConstructor(String[].class).newInstance(strArr);
                } catch (ReflectiveOperationException e) {
                    logger.error("Error creating ConfigurationProvider {}", optionValue2, e);
                }
            }
            if (configurationProvider != null) {
                application = new Application(Lists.newArrayList());
                application.handleConfigurationEvent(configurationProvider.getConfiguration());
            } else if (z2) {
                String optionValue3 = parse.getOptionValue('z');
                String optionValue4 = parse.getOptionValue('p');
                if (z) {
                    EventBus eventBus = new EventBus(optionValue + "-event-bus");
                    ArrayList newArrayList = Lists.newArrayList();
                    newArrayList.add(new PollingZooKeeperConfigurationProvider(optionValue, optionValue3, optionValue4, eventBus));
                    application = new Application(newArrayList);
                    eventBus.register(application);
                } else {
                    StaticZooKeeperConfigurationProvider staticZooKeeperConfigurationProvider = new StaticZooKeeperConfigurationProvider(optionValue, optionValue3, optionValue4);
                    application = new Application();
                    application.handleConfigurationEvent(staticZooKeeperConfigurationProvider.getConfiguration());
                }
            } else {
                if (arrayList == null) {
                    throw new ParseException("No configuiration was provided");
                }
                String optionValue5 = parse.getOptionValue("conf-user");
                String optionValue6 = parse.getOptionValue("conf-password");
                String optionValue7 = parse.getOptionValue("poll-interval");
                String optionValue8 = parse.getOptionValue("backup-directory");
                int parseInt = StringUtils.isNotEmpty(optionValue7) ? Integer.parseInt(optionValue7) : 0;
                boolean parseBoolean = Boolean.parseBoolean(parse.getOptionValue("verify-host", "true"));
                AuthorizationProvider authorizationProvider = null;
                String optionValue9 = parse.getOptionValue("auth-provider");
                if (optionValue9 != null) {
                    try {
                        Object newInstance = Class.forName(optionValue9).getDeclaredConstructor(String[].class).newInstance(strArr);
                        if (!(newInstance instanceof AuthorizationProvider)) {
                            logger.error("The supplied authorization provider does not implement AuthorizationProvider");
                            return;
                        }
                        authorizationProvider = (AuthorizationProvider) newInstance;
                    } catch (ReflectiveOperationException e2) {
                        logger.error("Unable to create authorization provider: {}", e2.getMessage());
                        return;
                    }
                }
                if (authorizationProvider == null && StringUtils.isNotEmpty(optionValue5) && StringUtils.isNotEmpty(optionValue6)) {
                    authorizationProvider = new BasicAuthorizationProvider(optionValue5, optionValue6);
                }
                EventBus eventBus2 = null;
                if (z) {
                    eventBus2 = new EventBus(optionValue + "-event-bus");
                    if (parseInt == 0) {
                        parseInt = i;
                    }
                }
                ArrayList arrayList2 = new ArrayList();
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    ConfigurationSource configurationSource = ConfigurationSourceFactory.getConfigurationSource((URI) it.next(), authorizationProvider, parseBoolean);
                    if (configurationSource != null) {
                        arrayList2.add(configurationSource);
                    }
                }
                ArrayList newArrayList2 = Lists.newArrayList();
                UriConfigurationProvider uriConfigurationProvider = new UriConfigurationProvider(optionValue, arrayList2, optionValue8, eventBus2, parseInt);
                newArrayList2.add(uriConfigurationProvider);
                application = new Application(newArrayList2);
                if (eventBus2 != null) {
                    eventBus2.register(application);
                }
                application.handleConfigurationEvent(uriConfigurationProvider.getConfiguration());
            }
            application.start();
            final Application application2 = application;
            Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") { // from class: org.apache.flume.node.Application.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    synchronized (Application.lock) {
                        Application.logger.warn("Received stop agent singal,stopping the agent");
                        application2.stop();
                        if (Application.sam != null) {
                            Application.sam.stop();
                        }
                        Application.logger.warn("stopped the agent");
                    }
                }
            });
        } catch (ParseException | RuntimeException | URISyntaxException e3) {
            logger.error("A fatal error occurred while running. Exception follows.", e3);
        }
    }

    private static Properties loadConfigOpts() {
        Properties properties = new Properties();
        InputStream inputStream = null;
        try {
            inputStream = Files.newInputStream(Paths.get("/etc/flume/flume.opts", new String[0]), new OpenOption[0]);
        } catch (IOException e) {
        }
        if (inputStream == null) {
            inputStream = Application.class.getClassLoader().getResourceAsStream("flume.opts");
        }
        if (inputStream != null) {
            try {
                try {
                    properties.load(inputStream);
                    try {
                        inputStream.close();
                    } catch (IOException e2) {
                    }
                } catch (Exception e3) {
                    logger.warn("Unable to load options file due to: {}", e3.getMessage());
                    try {
                        inputStream.close();
                    } catch (IOException e4) {
                    }
                }
            } catch (Throwable th) {
                try {
                    inputStream.close();
                } catch (IOException e5) {
                }
                throw th;
            }
        }
        return properties;
    }

    private void startCompMon() {
        logger.info("starting compment mon");
        loadMonitoring();
        this.metricsExecutor = new ScheduledThreadPoolExecutor(1);
        this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat(RESTART_COMP_THD_NAME).build());
        this.metricsExecutor.scheduleWithFixedDelay(new Runnable() { // from class: org.apache.flume.node.Application.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    Map<String, List<String>> restartComps = Application.this.getRestartComps();
                    if (restartComps != null && restartComps.size() > 0) {
                        Application.this.restartCompments(restartComps);
                    }
                } catch (Throwable th) {
                    Application.logger.error("get restart comps exception:", th);
                }
                try {
                    if (Application.pluginManager != null) {
                        Application.pluginManager.checkPluginsStatus();
                    }
                } catch (Throwable th2) {
                    Application.logger.error("check plugin state exception:", th2);
                }
            }
        }, 30L, 30L, TimeUnit.SECONDS);
        logger.info("started compment mon success");
    }

    /* JADX WARN: Type inference failed for: r0v6, types: [org.apache.flume.node.Application$3] */
    private Map<String, List<String>> getRestartComps() {
        int indexOf;
        Map allMBeans = JMXPollUtil.getAllMBeans();
        if (allMBeans == null || allMBeans.isEmpty()) {
            return null;
        }
        String json = new Gson().toJson(allMBeans, new TypeToken<Map<String, Map<String, String>>>() { // from class: org.apache.flume.node.Application.3
        }.getType());
        if (null != json && !"".equals(json)) {
            logger.info("flume current metrics:" + json);
        }
        allMBeans.keySet();
        HashMap hashMap = new HashMap(30);
        long nanoTime = System.nanoTime();
        StringBuffer stringBuffer = new StringBuffer();
        for (Map.Entry entry : allMBeans.entrySet()) {
            String str = (String) entry.getKey();
            Map map = (Map) entry.getValue();
            String str2 = (String) map.get(UPDATE_TIME);
            String str3 = (String) map.get(MON_TIME);
            if (str2 != null && str3 != null) {
                long longValue = Long.valueOf(str2).longValue();
                int intValue = Integer.valueOf(str3).intValue();
                long j = (nanoTime - longValue) / 1000000;
                if (0 < longValue && 0 < intValue && j > intValue && (indexOf = str.indexOf(COMP_TYPE_SPLIT)) > 0) {
                    String substring = str.substring(0, indexOf);
                    String substring2 = str.substring(indexOf + 1);
                    if (substring2 != null && !"".equals(substring2.trim())) {
                        List list = (List) hashMap.get(substring);
                        if (list == null) {
                            list = new ArrayList();
                        }
                        list.add(substring2);
                        hashMap.put(substring, list);
                        stringBuffer.setLength(0);
                        stringBuffer.append("compment is not running for a long time");
                        stringBuffer.append("compType:").append(substring).append(",compName:").append(substring2).append(",currentTime:").append(nanoTime).append(",updateTime:").append(longValue).append(",monTime:").append(intValue).append(",timeDiff:").append(j);
                        logger.info(stringBuffer.toString());
                    }
                }
            }
        }
        return hashMap;
    }

    private synchronized void restartCompments(Map<String, List<String>> map) {
        List<String> list = map.get(ComponentConfiguration.ComponentType.SOURCE.toString().toUpperCase(Locale.ROOT));
        if (null != list && list.size() > 0) {
            for (String str : list) {
                for (Map.Entry<String, SourceRunner> entry : this.materializedConfiguration.getSourceRunners().entrySet()) {
                    if (str.equals(entry.getKey())) {
                        logger.info("restarting source " + entry.getKey());
                        restartCompment(this.supervisor, (LifecycleAware) entry.getValue());
                    }
                }
            }
        }
        List<String> list2 = map.get(ComponentConfiguration.ComponentType.SINK.toString().toUpperCase(Locale.ROOT));
        if (null == list2 || list2.size() <= 0) {
            return;
        }
        for (String str2 : list2) {
            for (Map.Entry<String, SinkRunner> entry2 : this.materializedConfiguration.getSinkRunners().entrySet()) {
                if (str2.equals(entry2.getKey())) {
                    logger.info("restarting sink " + entry2.getKey());
                    restartCompment(this.supervisor, (LifecycleAware) entry2.getValue());
                }
            }
        }
    }

    private void restartCompment(final LifecycleSupervisor lifecycleSupervisor, final LifecycleAware lifecycleAware) {
        try {
            Future submit = this.callTimeoutPool.submit(new Callable<Void>() { // from class: org.apache.flume.node.Application.4
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    try {
                        lifecycleSupervisor.unsupervise(lifecycleAware);
                        try {
                            lifecycleSupervisor.supervise(lifecycleAware, new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
                            return null;
                        } catch (Exception e) {
                            Application.logger.error("Error while starting {}", lifecycleAware, e);
                            return null;
                        }
                    } catch (Exception e2) {
                        Application.logger.error("Error while stopping {}", lifecycleAware, e2);
                        return null;
                    }
                }
            });
            try {
                try {
                    submit.get(50L, TimeUnit.SECONDS);
                    if (!submit.isDone()) {
                        submit.cancel(true);
                    }
                } catch (Exception e) {
                    logger.error("get the result form future error. {}", e);
                    if (!submit.isDone()) {
                        submit.cancel(true);
                    }
                }
            } catch (Throwable th) {
                if (!submit.isDone()) {
                    submit.cancel(true);
                }
                throw th;
            }
        } catch (Exception e2) {
            logger.error("exectutor occur error:", e2);
        }
    }

    private void stopCompMon() {
        logger.info("stopping compment mon");
        for (ExecutorService executorService : new ExecutorService[]{this.callTimeoutPool, this.metricsExecutor}) {
            if (null != executorService) {
                try {
                    executorService.shutdownNow();
                    while (!executorService.isTerminated()) {
                        executorService.awaitTermination(30L, TimeUnit.SECONDS);
                    }
                } catch (Throwable th) {
                    logger.error("stop compment mon occur exception:", th);
                }
            }
        }
        logger.info("stopped compment mon success");
    }
}
