package org.apache.flume.source;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.SystemClock;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark-project.guava.base.Preconditions;
import org.spark-project.guava.util.concurrent.ThreadFactoryBuilder;

/* loaded from: input_file:org/apache/flume/source/ExecSource.class */
public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable {
    /** Logger shared by this source and its nested ExecRunnable / StderrReader classes. */
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) ExecSource.class);
    /** Optional shell invocation read in configure(); when null the command is split on whitespace and run directly. */
    private String shell;
    /** Command line to execute; configure() rejects a missing value. */
    private String command;
    /** Metrics counter; created lazily in configure(), started in start() and stopped in stop(). */
    private SourceCounter sourceCounter;
    /** Single-thread executor created in start() that runs the ExecRunnable. */
    private ExecutorService executor;
    /** Handle returned when the runner is submitted; cancelled (with interrupt) in stop(). */
    private Future<?> runnerFuture;
    /** Milliseconds the runner sleeps before restarting the command (configure() default: 10000). */
    private long restartThrottle;
    /** Whether the command is re-run after it exits (configure() default: false). */
    private boolean restart;
    /** Whether lines read from the command's stderr are logged (configure() default: false). */
    private boolean logStderr;
    /** Number of buffered events that triggers a flush; read from the "batchSize" key (default: 20). */
    private Integer bufferCount;
    /** Milliseconds after which a non-empty batch is flushed regardless of size (configure() default: 3000). */
    private long batchTimeout;
    /** The runnable created in start(); stop() disables its restart flag and kills its process. */
    private ExecRunnable runner;
    /** Charset used to decode the command's output and encode event bodies (configure() default: UTF-8). */
    private Charset charset;

    /* loaded from: input_file:org/apache/flume/source/ExecSource$ExecRunnable.class */
    private static class ExecRunnable implements Runnable {
        /** Optional shell invocation; when non-null the command is passed to it as a single trailing argument. */
        private final String shell;
        /** Command line to execute. */
        private final String command;
        /** Destination for flushed event batches. */
        private final ChannelProcessor channelProcessor;
        /** Counter updated with received and accepted event counts. */
        private final SourceCounter sourceCounter;
        /** Whether run() loops after the command exits; volatile because setRestart() is called from another thread. */
        private volatile boolean restart;
        /** Milliseconds to sleep before restarting the command. */
        private final long restartThrottle;
        /** Batch size that triggers an immediate flush. */
        private final int bufferCount;
        /** Milliseconds after which a non-empty batch is flushed; only assigned by the constructor. */
        private final long batchTimeout;
        /** Whether stderr lines are logged by the StderrReader. */
        private final boolean logStderr;
        /** Charset for decoding process output and encoding event bodies. */
        private final Charset charset;
        /**
         * Currently running process, or null before the first successful start.
         * Volatile: written by the run() thread, read by kill() which stop() invokes from a different thread.
         */
        private volatile Process process = null;
        /** Clock used for batch-timeout bookkeeping; must be initialised before lastPushToChannel. */
        private final SystemClock systemClock = new SystemClock();
        /** Time (ms) of the last flush; read and written only while holding the event list's monitor. */
        private Long lastPushToChannel = Long.valueOf(this.systemClock.currentTimeMillis());
        /** Scheduler for the periodic timeout flush; volatile for the same cross-thread reason as {@code process}. */
        volatile ScheduledExecutorService timedFlushService;
        /** Handle of the scheduled periodic flush task; volatile for the same cross-thread reason as {@code process}. */
        volatile ScheduledFuture<?> future;

        /**
         * Captures the configuration needed to run and supervise the external command.
         *
         * @param str              optional shell invocation (may be null)
         * @param str2             command line to execute
         * @param channelProcessor sink for flushed event batches
         * @param sourceCounter    metrics counter to update
         * @param z                whether to restart the command after it exits
         * @param j                delay in milliseconds before a restart
         * @param z2               whether to log the command's stderr
         * @param i                number of buffered events that triggers a flush
         * @param j2               batch timeout in milliseconds
         * @param charset          charset for decoding output and encoding event bodies
         */
        public ExecRunnable(String str, String str2, ChannelProcessor channelProcessor, SourceCounter sourceCounter, boolean z, long j, boolean z2, int i, long j2, Charset charset) {
            // Assignments follow the parameter order so the mapping is easy to audit.
            this.shell = str;
            this.command = str2;
            this.channelProcessor = channelProcessor;
            this.sourceCounter = sourceCounter;
            this.restart = z;
            this.restartThrottle = j;
            this.logStderr = z2;
            this.bufferCount = i;
            this.batchTimeout = j2;
            this.charset = charset;
        }

        /**
         * Runs the configured command, turning each line of its stdout into an event.
         *
         * <p>Per iteration: a fresh event buffer and a single-thread scheduler are created, the
         * process is started (through the shell when one is configured, otherwise by splitting the
         * command on whitespace), a daemon {@link StderrReader} drains stderr, and a periodic task
         * flushes the buffer once the batch timeout has elapsed. The reading loop flushes whenever
         * the buffer reaches {@code bufferCount} events or the timeout has elapsed, and once more
         * at end of stream.
         *
         * <p>Cleanup always happens in a single {@code finally}: the stdout reader is closed and
         * {@link #kill()} is called exactly once, whether the loop ended normally or with an
         * exception. When {@code restart} is set the method sleeps {@code restartThrottle}
         * milliseconds and starts over; otherwise it logs the exit code and returns.
         */
        @Override // java.lang.Runnable
        public void run() {
            do {
                String exitCode;
                BufferedReader reader = null;
                // The buffer is shared with the timed flush task; every access holds its monitor.
                final List<Event> eventList = new ArrayList<Event>();
                this.timedFlushService = Executors.newSingleThreadScheduledExecutor(
                        new ThreadFactoryBuilder()
                                .setNameFormat("timedFlushExecService" + Thread.currentThread().getId() + "-%d")
                                .build());
                try {
                    if (this.shell != null) {
                        this.process = Runtime.getRuntime().exec(formulateShellCommand(this.shell, this.command));
                    } else {
                        this.process = new ProcessBuilder(this.command.split("\\s+")).start();
                    }
                    reader = new BufferedReader(new InputStreamReader(this.process.getInputStream(), this.charset));

                    // Drain stderr on its own daemon thread so the child never blocks on a full pipe.
                    StderrReader stderrReader = new StderrReader(
                            new BufferedReader(new InputStreamReader(this.process.getErrorStream(), this.charset)),
                            this.logStderr);
                    stderrReader.setName("StderrReader-[" + this.command + "]");
                    stderrReader.setDaemon(true);
                    stderrReader.start();

                    // Periodic flush so a slow producer does not hold events past the batch timeout.
                    this.future = this.timedFlushService.scheduleWithFixedDelay(new Runnable() {
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                synchronized (eventList) {
                                    if (!eventList.isEmpty() && ExecRunnable.this.timeout()) {
                                        ExecRunnable.this.flushEventBatch(eventList);
                                    }
                                }
                            } catch (Exception e) {
                                ExecSource.logger.error("Exception occurred when processing event batch", e);
                                if (e instanceof InterruptedException) {
                                    Thread.currentThread().interrupt();
                                }
                            }
                        }
                    }, this.batchTimeout, this.batchTimeout, TimeUnit.MILLISECONDS);

                    String line;
                    while ((line = reader.readLine()) != null) {
                        this.sourceCounter.incrementEventReceivedCount();
                        synchronized (eventList) {
                            eventList.add(EventBuilder.withBody(line.getBytes(this.charset)));
                            if (eventList.size() >= this.bufferCount || timeout()) {
                                flushEventBatch(eventList);
                            }
                        }
                    }

                    // End of stream: push whatever is still buffered.
                    synchronized (eventList) {
                        if (!eventList.isEmpty()) {
                            flushEventBatch(eventList);
                        }
                    }
                } catch (Exception e) {
                    ExecSource.logger.error("Failed while running command: " + this.command, e);
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                } finally {
                    // Close the reader on every path (it may be null if the process never started).
                    if (reader != null) {
                        try {
                            reader.close();
                        } catch (IOException e) {
                            ExecSource.logger.error("Failed to close reader for exec source", e);
                        }
                    }
                    exitCode = String.valueOf(kill());
                }

                if (this.restart) {
                    ExecSource.logger.info("Restarting in {}ms, exit code {}", Long.valueOf(this.restartThrottle), exitCode);
                    try {
                        Thread.sleep(this.restartThrottle);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    ExecSource.logger.info("Command [" + this.command + "] exited with " + exitCode);
                }
            } while (this.restart);
        }

        /**
         * Delivers the buffered events to the channel processor, records how many were accepted,
         * empties the buffer and remembers the time of this flush.
         *
         * <p>Every call site in {@code run()} holds the list's monitor while invoking this method.
         *
         * @param list the event buffer to deliver and then clear
         */
        public void flushEventBatch(List<Event> list) {
            channelProcessor.processEventBatch(list);
            // Size is read after delivery, before the buffer is emptied.
            final int deliveredCount = list.size();
            sourceCounter.addToEventAcceptedCount(deliveredCount);
            list.clear();
            final long flushedAt = systemClock.currentTimeMillis();
            lastPushToChannel = Long.valueOf(flushedAt);
        }

        /**
         * Reports whether at least {@code batchTimeout} milliseconds have passed since the last flush.
         *
         * @return true when the elapsed time since {@code lastPushToChannel} is at or beyond the timeout
         */
        public boolean timeout() {
            final long now = systemClock.currentTimeMillis();
            final long sinceLastFlush = now - lastPushToChannel.longValue();
            return sinceLastFlush >= batchTimeout;
        }

        /**
         * Builds the argument vector for running a command through a shell.
         *
         * <p>The shell string is split on whitespace into its program and options, and the whole
         * command is appended as one final argument (e.g. {@code "/bin/bash -c"} plus
         * {@code "tail -F x"} becomes {@code ["/bin/bash", "-c", "tail -F x"]}).
         *
         * @param str  shell invocation, whitespace-separated
         * @param str2 command passed to the shell as a single argument
         * @return the shell tokens followed by the command
         */
        private static String[] formulateShellCommand(String str, String str2) {
            // Trim first: String.split keeps a leading empty token, which would otherwise
            // become an empty program name when the configured shell starts with whitespace.
            String[] shellTokens = str.trim().split("\\s+");
            String[] argv = new String[shellTokens.length + 1];
            System.arraycopy(shellTokens, 0, argv, 0, shellTokens.length);
            argv[shellTokens.length] = str2;
            return argv;
        }

        /**
         * Destroys the running process (if any), waits for it to exit and stops the timed flush
         * scheduler.
         *
         * <p>The flush scheduler is stopped on every path, including when no process was ever
         * started and when the wait for the process is interrupted; otherwise each failed start
         * would leave a scheduler thread behind.
         *
         * @return the process exit value; {@code Integer.MIN_VALUE / 2} when there is no process;
         *         {@code Integer.MIN_VALUE} when interrupted while waiting for the process to exit
         */
        public int kill() {
            final Process target = this.process;
            if (target == null) {
                // run() creates the scheduler before it starts the process, so it may still exist.
                shutdownTimedFlush();
                return Integer.MIN_VALUE / 2;
            }
            synchronized (target) {
                target.destroy();
                try {
                    return target.waitFor();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return Integer.MIN_VALUE;
                } finally {
                    shutdownTimedFlush();
                }
            }
        }

        /** Cancels the periodic flush task and shuts its scheduler down, giving up the wait if interrupted. */
        private void shutdownTimedFlush() {
            final ScheduledFuture<?> pendingFlush = this.future;
            if (pendingFlush != null) {
                pendingFlush.cancel(true);
            }
            final ScheduledExecutorService flushService = this.timedFlushService;
            if (flushService == null) {
                return;
            }
            flushService.shutdown();
            while (!flushService.isTerminated()) {
                try {
                    flushService.awaitTermination(500L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    ExecSource.logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting.");
                    // Stop waiting: with the interrupt flag restored, awaitTermination would throw
                    // again immediately and the loop would spin.
                    flushService.shutdownNow();
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        /**
         * Updates the flag that {@code run()} checks to decide whether to re-run the command.
         *
         * @param z true to keep restarting the command after it exits, false to stop after the current run
         */
        public void setRestart(boolean z) {
            restart = z;
        }
    }

    /* loaded from: input_file:org/apache/flume/source/ExecSource$StderrReader.class */
    private static class StderrReader extends Thread {
        private BufferedReader input;
        private boolean logStderr;

        protected StderrReader(BufferedReader bufferedReader, boolean z) {
            this.input = bufferedReader;
            this.logStderr = z;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                int i = 0;
                while (true) {
                    try {
                        String readLine = this.input.readLine();
                        if (readLine == null) {
                            break;
                        } else if (this.logStderr) {
                            i++;
                            ExecSource.logger.info("StderrLogger[{}] = '{}'", Integer.valueOf(i), readLine);
                        }
                    } catch (IOException e) {
                        ExecSource.logger.info("StderrLogger exiting", (Throwable) e);
                        try {
                            if (this.input != null) {
                                this.input.close();
                            }
                            return;
                        } catch (IOException e2) {
                            ExecSource.logger.error("Failed to close stderr reader for exec source", (Throwable) e2);
                            return;
                        }
                    }
                }
                try {
                    if (this.input != null) {
                        this.input.close();
                    }
                } catch (IOException e3) {
                    ExecSource.logger.error("Failed to close stderr reader for exec source", (Throwable) e3);
                }
            } catch (Throwable th) {
                try {
                    if (this.input != null) {
                        this.input.close();
                    }
                } catch (IOException e4) {
                    ExecSource.logger.error("Failed to close stderr reader for exec source", (Throwable) e4);
                }
                throw th;
            }
        }
    }

    /**
     * Starts the source: starts the counter, creates a single-thread executor, builds the
     * {@link ExecRunnable} from the configured settings and submits it, then marks the
     * lifecycle as started via the superclass.
     */
    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        logger.info("Exec source starting with command: {}", command);
        sourceCounter.start();

        executor = Executors.newSingleThreadExecutor();

        final ChannelProcessor processor = getChannelProcessor();
        final int batchSize = bufferCount.intValue();
        runner = new ExecRunnable(
                shell,
                command,
                processor,
                sourceCounter,
                restart,
                restartThrottle,
                logStderr,
                batchSize,
                batchTimeout,
                charset);
        runnerFuture = executor.submit(runner);

        super.start();
        logger.debug("Exec source started");
    }

    /**
     * Stops the source: disables restarts and kills the running command, cancels the runner's
     * future, shuts the executor down and waits for it to terminate, then stops the counter
     * and the superclass lifecycle.
     *
     * <p>Safe to call when {@code start()} never ran (no executor exists yet). If the calling
     * thread is interrupted while waiting for the executor, the wait is abandoned, the interrupt
     * status is restored and the remaining shutdown steps still run.
     */
    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        logger.info("Stopping exec source with command: {}", this.command);
        if (this.runner != null) {
            // Clear the restart flag first so the runner does not relaunch the command we kill.
            this.runner.setRestart(false);
            this.runner.kill();
        }
        if (this.runnerFuture != null) {
            logger.debug("Stopping exec runner");
            this.runnerFuture.cancel(true);
            logger.debug("Exec runner stopped");
        }
        if (this.executor != null) {
            this.executor.shutdown();
            while (!this.executor.isTerminated()) {
                logger.debug("Waiting for exec executor service to stop");
                try {
                    this.executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting.");
                    // Leave the loop: with the interrupt flag restored, awaitTermination would
                    // throw again immediately and the loop would spin.
                    this.executor.shutdownNow();
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        this.sourceCounter.stop();
        super.stop();
        logger.debug("Exec source with command:{} stopped. Metrics:{}", this.command, this.sourceCounter);
    }

    /**
     * Reads this source's settings from the given context.
     *
     * <p>The {@code command} key is mandatory. The other settings fall back to defaults:
     * restart throttle 10000 ms, restart off, stderr logging off, batch size 20, batch timeout
     * 3000 ms, charset UTF-8 and no shell. The source counter is created on first use only.
     *
     * @param context configuration to read from
     * @throws IllegalStateException if no command is configured
     */
    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        command = context.getString("command");
        Preconditions.checkState(command != null, "The parameter command must be specified");

        // Restart behaviour.
        restartThrottle = context.getLong(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE, 10000L).longValue();
        restart = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_RESTART, false).booleanValue();

        logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR, false).booleanValue();

        // Batching.
        bufferCount = context.getInteger("batchSize", 20);
        batchTimeout = context.getLong(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT, 3000L).longValue();

        final String charsetName = context.getString("charset", "UTF-8");
        charset = Charset.forName(charsetName);
        shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null);

        if (sourceCounter == null) {
            sourceCounter = new SourceCounter(getName());
        }
    }
}
