package org.apache.flume.source.exectokafka;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.SystemClock;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/exectokafka/ExecToKafkaSource.class */
public class ExecToKafkaSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger logger = LoggerFactory.getLogger(ExecToKafkaSource.class);
    private String shell;
    private String command;
    private SourceCounter sourceCounter;
    private ExecutorService executor;
    private Future<?> runnerFuture;
    private long restartThrottle;
    private boolean restart;
    private boolean logStderr;
    private long batchTimeout;
    private ExecRunnable runner;
    private Charset charset;
    private int batchSize;
    private List<ProducerRecord<String, String>> outputRecords;
    private Producer<String, String> producer;
    private String topic = null;
    private Integer partitionId = null;
    private Properties kafkaProps = new Properties();

    /* loaded from: input_file:org/apache/flume/source/exectokafka/ExecToKafkaSource$ExecRunnable.class */
    private static class ExecRunnable implements Runnable {
        private final String shell;
        private final String command;
        private final String topic;
        private final Integer partitionId;
        private final Producer<String, String> producer;
        private final List<ProducerRecord<String, String>> outputRecords;
        private final ChannelProcessor channelProcessor;
        private final SourceCounter sourceCounter;
        private volatile boolean restart;
        private final long restartThrottle;
        private final int batchSize;
        private long batchTimeout;
        private final boolean logStderr;
        private final Charset charset;
        private Process process = null;
        private SystemClock systemClock = new SystemClock();
        private Long lastSendToKafka = Long.valueOf(this.systemClock.currentTimeMillis());
        ScheduledExecutorService timedFlushService;
        ScheduledFuture<?> future;

        public ExecRunnable(String str, String str2, ChannelProcessor channelProcessor, SourceCounter sourceCounter, boolean z, long j, boolean z2, int i, long j2, Charset charset, Producer<String, String> producer, String str3, Integer num, List<ProducerRecord<String, String>> list) {
            this.command = str2;
            this.channelProcessor = channelProcessor;
            this.sourceCounter = sourceCounter;
            this.restartThrottle = j;
            this.batchSize = i;
            this.batchTimeout = j2;
            this.restart = z;
            this.logStderr = z2;
            this.charset = charset;
            this.shell = str;
            this.producer = producer;
            this.topic = str3;
            this.partitionId = num;
            this.outputRecords = list;
        }

        @Override // java.lang.Runnable
        public void run() {
            String valueOf;
            do {
                if (null == this.channelProcessor) {
                    ExecToKafkaSource.logger.error("channelProcessor is null");
                }
                this.producer.initTransactions();
                BufferedReader bufferedReader = null;
                this.timedFlushService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("timedFlushExecService" + Thread.currentThread().getId() + "-%d").build());
                try {
                    try {
                        try {
                            try {
                                try {
                                    try {
                                        try {
                                        } catch (AuthorizationException e) {
                                            ExecToKafkaSource.logger.error("Failed while running command: " + this.command + "\n Kafka producer has been closed.", e);
                                            if (bufferedReader != null) {
                                                try {
                                                    bufferedReader.close();
                                                    this.producer.close();
                                                } catch (IOException e2) {
                                                    ExecToKafkaSource.logger.error("Failed to close reader for exec source", e2);
                                                }
                                            }
                                            valueOf = String.valueOf(kill());
                                        }
                                    } catch (ProducerFencedException e3) {
                                        ExecToKafkaSource.logger.error("Failed while running command: " + this.command + "\n Kafka producer has been closed.", e3);
                                        if (0 != 0) {
                                            try {
                                                bufferedReader.close();
                                                this.producer.close();
                                            } catch (IOException e4) {
                                                ExecToKafkaSource.logger.error("Failed to close reader for exec source", e4);
                                            }
                                        }
                                        valueOf = String.valueOf(kill());
                                    }
                                } catch (Exception e5) {
                                    ExecToKafkaSource.logger.error("Failed while running command: " + this.command, e5);
                                    if (0 != 0) {
                                        try {
                                            bufferedReader.close();
                                            this.producer.close();
                                        } catch (IOException e6) {
                                            ExecToKafkaSource.logger.error("Failed to close reader for exec source", e6);
                                        }
                                    }
                                    valueOf = String.valueOf(kill());
                                }
                            } catch (KafkaException e7) {
                                this.producer.abortTransaction();
                                ExecToKafkaSource.logger.error("Failed while running command: " + this.command + "\n Kafka transaction set to abort.", e7);
                                if (0 != 0) {
                                    try {
                                        bufferedReader.close();
                                        this.producer.close();
                                    } catch (IOException e8) {
                                        ExecToKafkaSource.logger.error("Failed to close reader for exec source", e8);
                                    }
                                }
                                valueOf = String.valueOf(kill());
                            }
                        } catch (OutOfOrderSequenceException e9) {
                            ExecToKafkaSource.logger.error("Failed while running command: " + this.command + "\n Kafka producer has been closed.", e9);
                            if (0 != 0) {
                                try {
                                    bufferedReader.close();
                                    this.producer.close();
                                } catch (IOException e10) {
                                    ExecToKafkaSource.logger.error("Failed to close reader for exec source", e10);
                                }
                            }
                            valueOf = String.valueOf(kill());
                        }
                    } catch (InterruptedException e11) {
                        ExecToKafkaSource.logger.error("Failed while running command: " + this.command, e11);
                        Thread.currentThread().interrupt();
                        if (0 != 0) {
                            try {
                                bufferedReader.close();
                                this.producer.close();
                            } catch (IOException e12) {
                                ExecToKafkaSource.logger.error("Failed to close reader for exec source", e12);
                            }
                        }
                        valueOf = String.valueOf(kill());
                    }
                    if (preProcessCommand()) {
                        if (0 != 0) {
                            try {
                                bufferedReader.close();
                                this.producer.close();
                            } catch (IOException e13) {
                                ExecToKafkaSource.logger.error("Failed to close reader for exec source", e13);
                            }
                        }
                        String.valueOf(kill());
                        return;
                    }
                    bufferedReader = new BufferedReader(new InputStreamReader(this.process.getInputStream(), this.charset));
                    StderrReader stderrReader = new StderrReader(new BufferedReader(new InputStreamReader(this.process.getErrorStream(), this.charset)), this.logStderr);
                    stderrReader.setName("StderrReader-[" + this.command + "]");
                    stderrReader.setDaemon(true);
                    stderrReader.start();
                    this.future = this.timedFlushService.scheduleWithFixedDelay(new Runnable() { // from class: org.apache.flume.source.exectokafka.ExecToKafkaSource.ExecRunnable.1
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                synchronized (ExecRunnable.this.outputRecords) {
                                    if (!ExecRunnable.this.outputRecords.isEmpty() && ExecRunnable.this.timeout()) {
                                        ExecRunnable.this.sendToKafka();
                                    }
                                }
                            } catch (InterruptedException e14) {
                                ExecToKafkaSource.logger.error("Exception occured when processing event batch", e14);
                                Thread.currentThread().interrupt();
                            } catch (Exception e15) {
                                ExecToKafkaSource.logger.error("Exception occured when processing event batch", e15);
                            }
                        }
                    }, this.batchTimeout, this.batchTimeout, TimeUnit.MILLISECONDS);
                    generateKafkaMessage(bufferedReader);
                    synchronized (this.outputRecords) {
                        if (!this.outputRecords.isEmpty()) {
                            sendToKafka();
                        }
                    }
                    if (bufferedReader != null) {
                        try {
                            bufferedReader.close();
                            this.producer.close();
                        } catch (IOException e14) {
                            ExecToKafkaSource.logger.error("Failed to close reader for exec source", e14);
                        }
                    }
                    valueOf = String.valueOf(kill());
                    if (this.restart) {
                        ExecToKafkaSource.logger.info("Restarting in {}ms, exit code {}", Long.valueOf(this.restartThrottle), valueOf);
                        try {
                            Thread.sleep(this.restartThrottle);
                        } catch (InterruptedException e15) {
                            Thread.currentThread().interrupt();
                        }
                    } else {
                        ExecToKafkaSource.logger.info("Command [" + this.command + "] exited with " + valueOf);
                    }
                } catch (Throwable th) {
                    if (bufferedReader != null) {
                        try {
                            bufferedReader.close();
                            this.producer.close();
                        } catch (IOException e16) {
                            ExecToKafkaSource.logger.error("Failed to close reader for exec source", e16);
                        }
                    }
                    String.valueOf(kill());
                    throw th;
                }
            } while (this.restart);
        }

        private boolean preProcessCommand() throws IOException {
            if (this.shell == null) {
                this.process = new ProcessBuilder(this.command.split("\\s+")).start();
                return false;
            }
            if (Pattern.matches(".*[@|&;]+.*", this.command)) {
                ExecToKafkaSource.logger.error("Illegal command, \"@\",\"&\" , \"|\" and \";\" are not allowed", this.command);
                return true;
            }
            this.process = Runtime.getRuntime().exec(formulateShellCommand(this.shell, this.command));
            return false;
        }

        private void generateKafkaMessage(BufferedReader bufferedReader) throws IOException {
            while (true) {
                String readLine = bufferedReader.readLine();
                if (readLine == null) {
                    return;
                }
                synchronized (this.outputRecords) {
                    this.sourceCounter.incrementEventReceivedCount();
                    this.outputRecords.add(generateProducerRecord(readLine));
                    if (this.outputRecords.size() >= this.batchSize || timeout()) {
                        try {
                            sendToKafka();
                        } catch (InterruptedException e) {
                            ExecToKafkaSource.logger.error(this + ": Interrupted during sendToKafka", e);
                        }
                    }
                }
            }
        }

        private ProducerRecord<String, String> generateProducerRecord(String str) {
            return this.partitionId != null ? new ProducerRecord<>(this.topic, this.partitionId, (Object) null, str) : new ProducerRecord<>(this.topic, str);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void sendToKafka() throws InterruptedException {
            this.producer.beginTransaction();
            ExecToKafkaSource.logger.info("start to send records at " + System.nanoTime());
            Iterator<ProducerRecord<String, String>> it = this.outputRecords.iterator();
            while (it.hasNext()) {
                this.producer.send(it.next());
            }
            this.outputRecords.clear();
            ExecToKafkaSource.logger.info("end of sending records at " + System.nanoTime());
            this.lastSendToKafka = Long.valueOf(this.systemClock.currentTimeMillis());
            this.producer.commitTransaction();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean timeout() {
            return this.systemClock.currentTimeMillis() - this.lastSendToKafka.longValue() >= this.batchTimeout;
        }

        private static String[] formulateShellCommand(String str, String str2) {
            String[] split = str.split("\\s+");
            String[] strArr = new String[split.length + 1];
            System.arraycopy(split, 0, strArr, 0, split.length);
            strArr[split.length] = str2;
            return strArr;
        }

        public int kill() {
            int waitFor;
            if (this.process == null) {
                return -1073741824;
            }
            synchronized (this.process) {
                this.process.destroy();
                try {
                    waitFor = this.process.waitFor();
                    if (this.future != null) {
                        this.future.cancel(true);
                    }
                    if (this.timedFlushService != null) {
                        this.timedFlushService.shutdown();
                        while (!this.timedFlushService.isTerminated()) {
                            try {
                                this.timedFlushService.awaitTermination(500L, TimeUnit.MILLISECONDS);
                            } catch (InterruptedException e) {
                                ExecToKafkaSource.logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting.");
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    return Integer.MIN_VALUE;
                }
            }
            return waitFor;
        }

        public void setRestart(boolean z) {
            this.restart = z;
        }
    }

    /* loaded from: input_file:org/apache/flume/source/exectokafka/ExecToKafkaSource$StderrReader.class */
    private static class StderrReader extends Thread {
        private BufferedReader input;
        private boolean logStderr;

        protected StderrReader(BufferedReader bufferedReader, boolean z) {
            this.input = bufferedReader;
            this.logStderr = z;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            int i = 0;
            while (true) {
                try {
                    try {
                        String readLine = this.input.readLine();
                        if (readLine == null) {
                            break;
                        } else if (this.logStderr) {
                            i++;
                            ExecToKafkaSource.logger.info("StderrLogger[{}] = '{}'", Integer.valueOf(i), readLine.replaceAll("[\r\n]", ""));
                        }
                    } catch (IOException e) {
                        ExecToKafkaSource.logger.info("StderrLogger exiting", e);
                        try {
                            if (this.input != null) {
                                this.input.close();
                            }
                            return;
                        } catch (IOException e2) {
                            ExecToKafkaSource.logger.error("Failed to close stderr reader for exec source", e2);
                            return;
                        }
                    }
                } catch (Throwable th) {
                    try {
                        if (this.input != null) {
                            this.input.close();
                        }
                    } catch (IOException e3) {
                        ExecToKafkaSource.logger.error("Failed to close stderr reader for exec source", e3);
                    }
                    throw th;
                }
            }
            try {
                if (this.input != null) {
                    this.input.close();
                }
            } catch (IOException e4) {
                ExecToKafkaSource.logger.error("Failed to close stderr reader for exec source", e4);
            }
        }
    }

    public void start() {
        logger.info("Exec source starting with command:{}", this.command);
        this.executor = Executors.newSingleThreadExecutor();
        instantiateKafkaProducer();
        this.runner = new ExecRunnable(this.shell, this.command, getChannelProcessor(), this.sourceCounter, this.restart, this.restartThrottle, this.logStderr, this.batchSize, this.batchTimeout, this.charset, this.producer, this.topic, this.partitionId, this.outputRecords);
        this.runnerFuture = this.executor.submit(this.runner);
        this.sourceCounter.start();
        super.start();
        logger.debug("Exec source started");
    }

    private void instantiateKafkaProducer() {
        logger.info("Starting Kafka Producer for: {}", getName());
        try {
            this.producer = KafkaUtil.initProducer(this.kafkaProps);
            logger.info("Topic = {}", this.topic);
        } catch (Exception e) {
            if (null != this.producer) {
                try {
                    this.producer.close();
                } catch (Exception e2) {
                    logger.error("starting Kafka occur exception:", e2);
                }
            }
            throw new RuntimeException(e);
        }
    }

    public void stop() {
        logger.info("Stopping exec source with command:{}", this.command);
        if (this.runner != null) {
            this.runner.setRestart(false);
            this.runner.kill();
        }
        if (this.runnerFuture != null) {
            logger.debug("Stopping exec runner");
            this.runnerFuture.cancel(true);
            logger.debug("Exec runner stopped");
        }
        this.executor.shutdown();
        while (!this.executor.isShutdown()) {
            logger.debug("Waiting for exec executor service to stop");
            try {
                this.runnerFuture.cancel(true);
                this.executor.shutdownNow();
                this.executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
                this.executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting.");
                Thread.currentThread().interrupt();
            }
        }
        this.producer.close();
        this.sourceCounter.stop();
        super.stop();
        logger.debug("Exec source with command:{} stopped. Metrics:{}", this.command, this.sourceCounter);
    }

    public void configure(Context context) {
        this.command = context.getString("command");
        Preconditions.checkState(this.command != null, "The parameter command must be specified");
        kafkaConfigure(context);
        this.restartThrottle = context.getLong(ExecToKafkaSourceConfigurationConstants.CONFIG_RESTART_THROTTLE, Long.valueOf(ExecToKafkaSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE)).longValue();
        this.restart = context.getBoolean(ExecToKafkaSourceConfigurationConstants.CONFIG_RESTART, false).booleanValue();
        this.logStderr = context.getBoolean(ExecToKafkaSourceConfigurationConstants.CONFIG_LOG_STDERR, false).booleanValue();
        this.batchTimeout = context.getLong(ExecToKafkaSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT, Long.valueOf(ExecToKafkaSourceConfigurationConstants.DEFAULT_BATCH_TIME_OUT)).longValue();
        this.charset = Charset.forName(context.getString(ExecToKafkaSourceConfigurationConstants.CHARSET, "UTF-8"));
        this.shell = context.getString(ExecToKafkaSourceConfigurationConstants.CONFIG_SHELL, (String) null);
        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(getName());
        }
    }

    private void kafkaConfigure(Context context) {
        this.kafkaProps = KafkaUtil.getKafkaProperties(context);
        this.batchSize = context.getInteger("kafka.producer.batch.size", 100).intValue();
        logger.debug("Using batch size: {}", Integer.valueOf(this.batchSize));
        this.outputRecords = new ArrayList(this.batchSize);
        this.topic = context.getString("kafka.producer.topic", KafkaConstants.DEFAULT_TOPIC);
        this.partitionId = context.getInteger("kafka.producer.partition.id");
    }
}
