package org.apache.flume.source.taildirtokafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.FlumeException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.taildirtokafka.ReliableTaildirBuilder;
import org.apache.flume.util.DateUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/taildirtokafka/TaildirExSource.class */
public class TaildirExSource extends AbstractSource implements PollableSource, Configurable {
    private boolean byteOffsetHeader;
    private Map<String, String> filePaths;
    private Map<String, String> oldfilePaths;
    private Table<String, String, String> headerTable;
    private ScheduledExecutorService idleFileChecker;
    private int idleTimeout;
    private int lineMaxLength;
    private String positionFilePath;
    private String directory;
    private String positionFileBackupPath;
    private ScheduledExecutorService replaceFileDateName;
    private ReliableTaildirExEventReader reader;
    private boolean skipToEnd;
    private SourceCounter sourceCounter;
    private int writePosInterval;
    private String timeoffset;
    private boolean isMetricLog;
    private boolean hasTimestamp;
    private Long backoffSleepIncrement;
    private Long maxBackOffSleepInterval;
    private int batchSize;
    private List<ProducerRecord<String, byte[]>> outputRecords;
    private Producer<String, byte[]> producer;
    private static final Logger LOGGER = LoggerFactory.getLogger(TaildirExSource.class);
    private static final Gson GSON = new Gson();
    private String topic = null;
    private Integer partitionId = null;
    private Properties kafkaProps = new Properties();
    private int retryInterval = KafkaConstants.DEFAULT_BATCH_SIZE;
    private int maxRetryInterval = 5000;
    private int checkIdleInterval = 5000;
    private int writePosInitDelay = 5000;
    private List<Long> existingInodes = new CopyOnWriteArrayList();
    private List<Long> idleInodes = new CopyOnWriteArrayList();

    /* loaded from: input_file:org/apache/flume/source/taildirtokafka/TaildirExSource$IdleFileCheckerRunnable.class */
    private class IdleFileCheckerRunnable implements Runnable {
        private IdleFileCheckerRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                for (TailFileEx tailFileEx : TaildirExSource.this.reader.getTailFiles().values()) {
                    if (tailFileEx.getLastUpdated() + TaildirExSource.this.idleTimeout < currentTimeMillis && tailFileEx.getRaf() != null) {
                        TaildirExSource.this.idleInodes.add(Long.valueOf(tailFileEx.getInode()));
                    }
                }
            } catch (Throwable th) {
                TaildirExSource.LOGGER.error("Uncaught exception in IdleFileChecker thread.");
            }
        }
    }

    /* loaded from: input_file:org/apache/flume/source/taildirtokafka/TaildirExSource$ReplaceFileDateNameRunnable.class */
    private class ReplaceFileDateNameRunnable implements Runnable {
        private ReplaceFileDateNameRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            TaildirExSource.this.replaceFileName();
        }
    }

    public void setProducer(Producer<String, byte[]> producer) {
        this.producer = producer;
    }

    public void configure(Context context) {
        String string = context.getString(TaildirExSourceConfigurationConstants.FILE_GROUPS);
        Preconditions.checkNotNull(string, "Missing param: filegroups");
        this.filePaths = selectByKeys(context.getSubProperties(TaildirExSourceConfigurationConstants.FILE_GROUPS_PREFIX), string.split("\\s+"));
        this.oldfilePaths = selectByKeys(context.getSubProperties(TaildirExSourceConfigurationConstants.FILE_GROUPS_PREFIX), string.split("\\s+"));
        Preconditions.checkState(!this.filePaths.isEmpty(), "Mapping for tailing files is empty or invalid: 'filegroups.'");
        String property = System.getProperty("user.home");
        this.positionFilePath = context.getString(TaildirExSourceConfigurationConstants.POSITION_FILE, (property == null ? "" : property.replace('\\', '/')) + "/.flume/taildir_position.json");
        getBackupJson();
        this.headerTable = getTable(context, TaildirExSourceConfigurationConstants.HEADERS_PREFIX);
        this.skipToEnd = context.getBoolean("skipToEnd", Boolean.FALSE).booleanValue();
        this.byteOffsetHeader = context.getBoolean("byteOffsetHeader", Boolean.FALSE).booleanValue();
        this.idleTimeout = context.getInteger("idleTimeout", 120000).intValue();
        this.writePosInterval = context.getInteger("writePosInterval", 3000).intValue();
        this.lineMaxLength = context.getInteger("lineMaxLength", 800000).intValue();
        this.hasTimestamp = context.getBoolean(TaildirExSourceConfigurationConstants.HASTIMESTAMP, Boolean.FALSE).booleanValue();
        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(getName());
        }
        this.timeoffset = context.getString(TaildirExSourceConfigurationConstants.TIMEOFFSET, "");
        if (!this.timeoffset.isEmpty()) {
            DateUtils.replaceFilePath(this.timeoffset, this.filePaths, this.oldfilePaths);
        }
        this.isMetricLog = context.getBoolean(TaildirExSourceConfigurationConstants.METRICLOG, Boolean.FALSE).booleanValue();
        kafkaConfigure(context);
    }

    private void kafkaConfigure(Context context) {
        this.kafkaProps = KafkaUtil.getKafkaProperties(context);
        this.batchSize = context.getInteger("kafka.producer.batch.size", Integer.valueOf(KafkaConstants.DEFAULT_BATCH_SIZE)).intValue();
        LOGGER.debug("Using batch size: {}", Integer.valueOf(this.batchSize));
        this.outputRecords = new ArrayList(this.batchSize);
        this.topic = context.getString("kafka.producer.topic", KafkaConstants.DEFAULT_TOPIC);
        this.partitionId = context.getInteger("kafka.producer.partition.id");
    }

    private void getBackupJson() {
        File file = new File(this.positionFilePath);
        String parent = file.getParent();
        String name = file.getName();
        String str = parent + "/backup_jason";
        File file2 = new File(str);
        boolean z = false;
        if (!file2.exists()) {
            z = file2.mkdirs();
        }
        this.positionFileBackupPath = str + "/" + name;
        if (z) {
            LOGGER.info("already mkdir json file backup directory");
        }
    }

    public PollableSource.Status process() {
        PollableSource.Status status = PollableSource.Status.READY;
        try {
            this.existingInodes.clear();
            this.existingInodes.addAll(this.reader.updateTailFiles(false));
            Iterator<Long> it = this.existingInodes.iterator();
            while (it.hasNext()) {
                TailFileEx tailFileEx = this.reader.getTailFiles().get(Long.valueOf(it.next().longValue()));
                if (tailFileEx.isNeedTail()) {
                    tailFileProcess(tailFileEx, true);
                }
            }
            closeTailFiles();
            try {
                TimeUnit.MILLISECONDS.sleep(this.retryInterval);
            } catch (InterruptedException e) {
                LOGGER.info("Interrupted while sleeping");
            }
        } catch (IOException e2) {
            LOGGER.error("Unable to tail files IOException.");
            status = PollableSource.Status.BACKOFF;
        } catch (InterruptedException e3) {
            LOGGER.error("Unable to tail files InterruptedException.");
            status = PollableSource.Status.BACKOFF;
        }
        return status;
    }

    public long getBackOffSleepIncrement() {
        return 0L;
    }

    public long getMaxBackOffSleepInterval() {
        return 0L;
    }

    public void start() {
        LOGGER.info("TaildirSource source starting with directory: {}", this.filePaths);
        instantiateKafkaProducer();
        try {
            this.reader = new ReliableTaildirBuilder.Builder().setFilePaths(this.filePaths).setHeaderTable(this.headerTable).setPositionFilePath(this.positionFilePath).setPositionFileBackupPath(this.positionFileBackupPath).setSkipToEnd(this.skipToEnd).setAddByteOffset(this.byteOffsetHeader).setLineMaxLength(this.lineMaxLength).setMetricLog(this.isMetricLog).build();
            this.idleFileChecker = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build());
            this.idleFileChecker.scheduleWithFixedDelay(new IdleFileCheckerRunnable(), this.idleTimeout, this.checkIdleInterval, TimeUnit.MILLISECONDS);
            if (!this.timeoffset.isEmpty()) {
                this.replaceFileDateName = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("replaceFileDateName").build());
                long j = 0;
                String[] split = this.timeoffset.split("-");
                if (split[0].equals("D")) {
                    j = 86400;
                } else if (split[0].equals("H")) {
                    j = 3600;
                } else if (split[0].equals("M")) {
                    j = 60;
                }
                this.replaceFileDateName.scheduleWithFixedDelay(new ReplaceFileDateNameRunnable(), DateUtils.timeInterval(split[0]), j, TimeUnit.SECONDS);
            }
            super.start();
            LOGGER.debug("TaildirSource started");
            this.sourceCounter.start();
        } catch (IOException e) {
            throw new FlumeException("Error instantiating ReliableTaildirEventReader.");
        }
    }

    private void instantiateKafkaProducer() {
        LOGGER.info("Starting Kafka Producer for: {}", getName());
        try {
            this.producer = KafkaUtil.initProducer(this.kafkaProps);
            this.producer.initTransactions();
            LOGGER.info("Topic = {}", this.topic);
        } catch (Exception e) {
            if (null != this.producer) {
                try {
                    this.producer.close();
                } catch (Exception e2) {
                    LOGGER.error("starting Kafka occur exception:", e2);
                }
            }
            throw new RuntimeException(e);
        }
    }

    public void stop() {
        try {
            super.stop();
            for (ExecutorService executorService : new ExecutorService[]{this.idleFileChecker, this.replaceFileDateName}) {
                if (executorService != null) {
                    executorService.shutdown();
                    if (!executorService.awaitTermination(1L, TimeUnit.SECONDS)) {
                        executorService.shutdownNow();
                    }
                }
            }
            writePosition(this.positionFilePath);
            writePosition(this.positionFileBackupPath);
            this.reader.close();
            this.producer.close();
        } catch (IOException e) {
            LOGGER.info("Stop IO failed.");
        } catch (InterruptedException e2) {
            LOGGER.info("Interrupted while awaiting termination.");
        }
        this.sourceCounter.stop();
        LOGGER.info("Taildir source {} stopped. Metrics: {}", getName(), this.sourceCounter);
    }

    public String toString() {
        return String.format("Taildir source: { positionFile: %s, skipToEnd: %s, byteOffsetHeader: %s, idleTimeout: %s, writePosInterval: %s }", this.positionFilePath, Boolean.valueOf(this.skipToEnd), Boolean.valueOf(this.byteOffsetHeader), Integer.valueOf(this.idleTimeout), Integer.valueOf(this.writePosInterval));
    }

    @VisibleForTesting
    protected SourceCounter getSourceCounter() {
        return this.sourceCounter;
    }

    private void closeTailFiles() throws IOException, InterruptedException {
        Iterator<Long> it = this.idleInodes.iterator();
        while (it.hasNext()) {
            long longValue = it.next().longValue();
            TailFileEx tailFileEx = this.reader.getTailFiles().get(Long.valueOf(longValue));
            if (tailFileEx.getRaf() != null) {
                tailFileProcess(tailFileEx, false);
                tailFileEx.close();
                Logger logger = LOGGER;
                String path = tailFileEx.getPath();
                tailFileEx.getPos();
                logger.info("Closed file: " + path + ", inode: " + longValue + ", pos: " + logger);
            }
        }
        this.idleInodes.clear();
    }

    private Table<String, String, String> getTable(Context context, String str) {
        HashBasedTable create = HashBasedTable.create();
        for (Map.Entry entry : context.getSubProperties(str).entrySet()) {
            String[] split = ((String) entry.getKey()).split("\\.", 2);
            create.put(split[0], split[1], entry.getValue());
        }
        return create;
    }

    private Map<String, String> selectByKeys(Map<String, String> map, String[] strArr) {
        HashMap newHashMap = Maps.newHashMap();
        for (String str : strArr) {
            if (map.containsKey(str)) {
                newHashMap.put(str, map.get(str));
            }
        }
        return newHashMap;
    }

    private void tailFileProcess(TailFileEx tailFileEx, boolean z) throws IOException, InterruptedException {
        while (true) {
            this.reader.setCurrentFile(tailFileEx);
            this.outputRecords = this.reader.readProducerRecords(this.batchSize, z, this.hasTimestamp, this.topic, this.partitionId);
            if (this.outputRecords.isEmpty()) {
                return;
            }
            this.sourceCounter.addToEventReceivedCount(this.outputRecords.size());
            this.sourceCounter.incrementAppendBatchReceivedCount();
            try {
                sendToKafka();
                this.reader.commit();
                writePosition(this.positionFilePath);
                writePosition(this.positionFileBackupPath);
                this.retryInterval = KafkaConstants.DEFAULT_BATCH_SIZE;
                this.sourceCounter.addToEventAcceptedCount(this.outputRecords.size());
                this.sourceCounter.incrementAppendBatchAcceptedCount();
                this.sourceCounter.addToEventAcceptedCount(this.reader.getReadedSize());
            } catch (KafkaException e) {
                this.producer.abortTransaction();
                LOGGER.error("Failed while reading file: " + tailFileEx.getPath() + "\n Kafka transaction set to abort.", e);
                TimeUnit.MILLISECONDS.sleep(this.retryInterval);
                this.retryInterval <<= 1;
                this.retryInterval = Math.min(this.retryInterval, this.maxRetryInterval);
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e2) {
                LOGGER.error("Failed while reading file: " + tailFileEx.getPath() + "\n Kafka producer has been closed.", e2);
                stop();
                return;
            }
            if (this.outputRecords.size() < this.batchSize) {
                return;
            }
        }
    }

    private void sendToKafka() {
        this.producer.beginTransaction();
        long nanoTime = System.nanoTime();
        Logger logger = LOGGER;
        this.outputRecords.size();
        logger.info("start to send records at " + nanoTime + ", and the size of records is " + logger);
        Iterator<ProducerRecord<String, byte[]>> it = this.outputRecords.iterator();
        while (it.hasNext()) {
            this.producer.send(it.next());
        }
        LOGGER.info("end of sending records at " + System.nanoTime());
        this.producer.commitTransaction();
    }

    private String toPosInfoJson() {
        ArrayList newArrayList = Lists.newArrayList();
        for (Long l : this.existingInodes) {
            TailFileEx tailFileEx = this.reader.getTailFiles().get(l);
            newArrayList.add(ImmutableMap.of("inode", l, "pos", Long.valueOf(tailFileEx.getPos()), "file", tailFileEx.getPath()));
        }
        return new Gson().toJson(newArrayList);
    }

    private void replaceFileName() {
        DateUtils.replaceFilePath(this.timeoffset, this.filePaths, this.oldfilePaths);
        try {
            this.reader = null;
            this.reader = new ReliableTaildirBuilder.Builder().setFilePaths(this.filePaths).setHeaderTable(this.headerTable).setPositionFilePath(this.positionFilePath).setPositionFileBackupPath(this.positionFileBackupPath).setSkipToEnd(this.skipToEnd).setAddByteOffset(this.byteOffsetHeader).setLineMaxLength(this.lineMaxLength).build();
        } catch (IOException e) {
            throw new FlumeException("Error instantiating ReliableTaildirEventReader.");
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:105:0x0132 A[EXC_TOP_SPLITTER, SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:111:0x0118 A[EXC_TOP_SPLITTER, SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:77:0x0184 A[EXC_TOP_SPLITTER, SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:82:0x016a A[EXC_TOP_SPLITTER, SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void writePosition(java.lang.String r6) {
        /*
            Method dump skipped, instructions count: 413
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flume.source.taildirtokafka.TaildirExSource.writePosition(java.lang.String):void");
    }
}
