package org.apache.flume.source.taildir;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.taildir.ReliableTaildirEventReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/taildir/TaildirSource.class */
public class TaildirSource extends AbstractSource implements PollableSource, Configurable {
    private static final Logger logger = LoggerFactory.getLogger(TaildirSource.class);
    private Map<String, String> filePaths;
    private Table<String, String, String> headerTable;
    private int batchSize;
    private String positionFilePath;
    private boolean skipToEnd;
    private boolean byteOffsetHeader;
    private SourceCounter sourceCounter;
    private ReliableTaildirEventReader reader;
    private ScheduledExecutorService idleFileChecker;
    private ScheduledExecutorService positionWriter;
    private int idleTimeout;
    private int writePosInterval;
    private boolean cachePatternMatching;
    private boolean fileHeader;
    private String fileHeaderKey;
    private int retryInterval = TaildirSourceConfigurationConstants.DEFAULT_BATCH_SIZE;
    private int maxRetryInterval = 5000;
    private int checkIdleInterval = 5000;
    private int writePosInitDelay = 5000;
    private List<Long> existingInodes = new CopyOnWriteArrayList();
    private List<Long> idleInodes = new CopyOnWriteArrayList();
    private int monTime = 0;

    /* loaded from: input_file:org/apache/flume/source/taildir/TaildirSource$PositionWriterRunnable.class */
    private class PositionWriterRunnable implements Runnable {
        private PositionWriterRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            TaildirSource.this.writePosition();
        }
    }

    /* loaded from: input_file:org/apache/flume/source/taildir/TaildirSource$idleFileCheckerRunnable.class */
    private class idleFileCheckerRunnable implements Runnable {
        private idleFileCheckerRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                for (TailFile tailFile : TaildirSource.this.reader.getTailFiles().values()) {
                    if (tailFile.getLastUpdated() + TaildirSource.this.idleTimeout < currentTimeMillis && tailFile.getRaf() != null) {
                        TaildirSource.this.idleInodes.add(Long.valueOf(tailFile.getInode()));
                    }
                }
            } catch (Throwable th) {
                TaildirSource.logger.error("Uncaught exception in IdleFileChecker thread", th);
            }
        }
    }

    public synchronized void start() {
        logger.info("{} TaildirSource source starting with directory: {}", getName(), this.filePaths);
        try {
            this.reader = new ReliableTaildirEventReader.Builder().filePaths(this.filePaths).headerTable(this.headerTable).positionFilePath(this.positionFilePath).skipToEnd(this.skipToEnd).addByteOffset(this.byteOffsetHeader).cachePatternMatching(this.cachePatternMatching).annotateFileName(this.fileHeader).fileNameHeader(this.fileHeaderKey).build();
            this.idleFileChecker = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build());
            this.idleFileChecker.scheduleWithFixedDelay(new idleFileCheckerRunnable(), this.idleTimeout, this.checkIdleInterval, TimeUnit.MILLISECONDS);
            this.positionWriter = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
            this.positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(), this.writePosInitDelay, this.writePosInterval, TimeUnit.MILLISECONDS);
            super.start();
            logger.debug("TaildirSource started");
            this.sourceCounter.setMonTime(this.monTime);
            this.sourceCounter.start();
        } catch (IOException e) {
            throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
        }
    }

    public synchronized void stop() {
        try {
            super.stop();
            for (ExecutorService executorService : new ExecutorService[]{this.idleFileChecker, this.positionWriter}) {
                executorService.shutdown();
                if (!executorService.awaitTermination(1L, TimeUnit.SECONDS)) {
                    executorService.shutdownNow();
                }
            }
            writePosition();
            this.reader.close();
        } catch (IOException e) {
            logger.info("Failed: " + e.getMessage(), e);
        } catch (InterruptedException e2) {
            logger.info("Interrupted while awaiting termination", e2);
        }
        this.sourceCounter.stop();
        logger.info("Taildir source {} stopped. Metrics: {}", getName(), this.sourceCounter);
    }

    public String toString() {
        return String.format("Taildir source: { positionFile: %s, skipToEnd: %s, byteOffsetHeader: %s, idleTimeout: %s, writePosInterval: %s }", this.positionFilePath, Boolean.valueOf(this.skipToEnd), Boolean.valueOf(this.byteOffsetHeader), Integer.valueOf(this.idleTimeout), Integer.valueOf(this.writePosInterval));
    }

    public synchronized void configure(Context context) {
        String string = context.getString(TaildirSourceConfigurationConstants.FILE_GROUPS);
        Preconditions.checkState(string != null, "Missing param: filegroups");
        this.filePaths = selectByKeys(context.getSubProperties(TaildirSourceConfigurationConstants.FILE_GROUPS_PREFIX), string.split("\\s+"));
        Preconditions.checkState(!this.filePaths.isEmpty(), "Mapping for tailing files is empty or invalid: 'filegroups.'");
        this.positionFilePath = context.getString(TaildirSourceConfigurationConstants.POSITION_FILE, System.getProperty("user.home").replace('\\', '/') + TaildirSourceConfigurationConstants.DEFAULT_POSITION_FILE);
        try {
            Files.createDirectories(Paths.get(this.positionFilePath, new String[0]).getParent(), new FileAttribute[0]);
            this.headerTable = getTable(context, TaildirSourceConfigurationConstants.HEADERS_PREFIX);
            this.batchSize = context.getInteger(TaildirSourceConfigurationConstants.BATCH_SIZE, Integer.valueOf(TaildirSourceConfigurationConstants.DEFAULT_BATCH_SIZE)).intValue();
            this.skipToEnd = context.getBoolean(TaildirSourceConfigurationConstants.SKIP_TO_END, false).booleanValue();
            this.byteOffsetHeader = context.getBoolean(TaildirSourceConfigurationConstants.BYTE_OFFSET_HEADER, false).booleanValue();
            this.idleTimeout = context.getInteger(TaildirSourceConfigurationConstants.IDLE_TIMEOUT, Integer.valueOf(TaildirSourceConfigurationConstants.DEFAULT_IDLE_TIMEOUT)).intValue();
            this.writePosInterval = context.getInteger(TaildirSourceConfigurationConstants.WRITE_POS_INTERVAL, Integer.valueOf(TaildirSourceConfigurationConstants.DEFAULT_WRITE_POS_INTERVAL)).intValue();
            this.cachePatternMatching = context.getBoolean(TaildirSourceConfigurationConstants.CACHE_PATTERN_MATCHING, true).booleanValue();
            this.fileHeader = context.getBoolean(TaildirSourceConfigurationConstants.FILENAME_HEADER, false).booleanValue();
            this.fileHeaderKey = context.getString(TaildirSourceConfigurationConstants.FILENAME_HEADER_KEY, TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY);
            if (this.sourceCounter == null) {
                this.sourceCounter = new SourceCounter(getName());
            }
            this.monTime = context.getInteger(TaildirSourceConfigurationConstants.MON_TIME, 0).intValue();
            this.sourceCounter.setMonTime(this.monTime);
        } catch (IOException e) {
            throw new FlumeException("Error creating positionFile parent directories", e);
        }
    }

    private Map<String, String> selectByKeys(Map<String, String> map, String[] strArr) {
        HashMap newHashMap = Maps.newHashMap();
        for (String str : strArr) {
            if (map.containsKey(str)) {
                newHashMap.put(str, map.get(str));
            }
        }
        return newHashMap;
    }

    private Table<String, String, String> getTable(Context context, String str) {
        HashBasedTable create = HashBasedTable.create();
        UnmodifiableIterator it = context.getSubProperties(str).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            String[] split = ((String) entry.getKey()).split("\\.", 2);
            create.put(split[0], split[1], entry.getValue());
        }
        return create;
    }

    @VisibleForTesting
    protected SourceCounter getSourceCounter() {
        return this.sourceCounter;
    }

    public PollableSource.Status process() {
        PollableSource.Status status = PollableSource.Status.READY;
        this.sourceCounter.setUpdateTime();
        try {
            this.existingInodes.clear();
            this.existingInodes.addAll(this.reader.updateTailFiles());
            Iterator<Long> it = this.existingInodes.iterator();
            while (it.hasNext()) {
                TailFile tailFile = this.reader.getTailFiles().get(Long.valueOf(it.next().longValue()));
                if (tailFile.needTail()) {
                    tailFileProcess(tailFile, true);
                }
            }
            this.sourceCounter.setUpdateTime();
            closeTailFiles();
            try {
                TimeUnit.MILLISECONDS.sleep(this.retryInterval);
            } catch (InterruptedException e) {
                logger.info("Interrupted while sleeping");
            }
            this.sourceCounter.setUpdateTime();
        } catch (Throwable th) {
            logger.error("Unable to tail files", th);
            status = PollableSource.Status.BACKOFF;
            this.sourceCounter.setUpdateTime();
        }
        return status;
    }

    private void tailFileProcess(TailFile tailFile, boolean z) throws IOException, InterruptedException {
        while (true) {
            this.reader.setCurrentFile(tailFile);
            List<Event> readEvents = this.reader.readEvents(this.batchSize, z);
            if (readEvents.isEmpty()) {
                return;
            }
            this.sourceCounter.addToEventReceivedCount(readEvents.size());
            this.sourceCounter.incrementAppendBatchReceivedCount();
            try {
                getChannelProcessor().processEventBatch(readEvents);
                this.reader.commit();
                this.retryInterval = TaildirSourceConfigurationConstants.DEFAULT_BATCH_SIZE;
                this.sourceCounter.addToEventAcceptedCount(readEvents.size());
                this.sourceCounter.incrementAppendBatchAcceptedCount();
                this.sourceCounter.AddToSizeAcceptedCount(this.reader.getReadedSize());
            } catch (ChannelException e) {
                logger.warn("The channel is full or unexpected failure. The source will try again after " + this.retryInterval + " ms");
                TimeUnit.MILLISECONDS.sleep(this.retryInterval);
                this.retryInterval <<= 1;
                this.retryInterval = Math.min(this.retryInterval, this.maxRetryInterval);
            }
            if (readEvents.size() < this.batchSize) {
                return;
            }
        }
    }

    private void closeTailFiles() throws IOException, InterruptedException {
        Iterator<Long> it = this.idleInodes.iterator();
        while (it.hasNext()) {
            long longValue = it.next().longValue();
            TailFile tailFile = this.reader.getTailFiles().get(Long.valueOf(longValue));
            if (tailFile.getRaf() != null) {
                tailFileProcess(tailFile, false);
                tailFile.close();
                logger.info("Closed file: " + tailFile.getPath() + ", inode: " + longValue + ", pos: " + tailFile.getPos());
            }
        }
        this.idleInodes.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void writePosition() {
        FileWriter fileWriter = null;
        try {
            try {
                fileWriter = new FileWriter(new File(this.positionFilePath));
                if (!this.existingInodes.isEmpty()) {
                    fileWriter.write(toPosInfoJson());
                }
                if (fileWriter != null) {
                    try {
                        fileWriter.close();
                    } catch (IOException e) {
                        logger.error("Error: " + e.getMessage(), e);
                    }
                }
            } catch (Throwable th) {
                logger.error("Failed writing positionFile", th);
                if (fileWriter != null) {
                    try {
                        fileWriter.close();
                    } catch (IOException e2) {
                        logger.error("Error: " + e2.getMessage(), e2);
                    }
                }
            }
        } catch (Throwable th2) {
            if (fileWriter != null) {
                try {
                    fileWriter.close();
                } catch (IOException e3) {
                    logger.error("Error: " + e3.getMessage(), e3);
                    throw th2;
                }
            }
            throw th2;
        }
    }

    private String toPosInfoJson() {
        ArrayList newArrayList = Lists.newArrayList();
        for (Long l : this.existingInodes) {
            TailFile tailFile = this.reader.getTailFiles().get(l);
            newArrayList.add(ImmutableMap.of("inode", l, "pos", Long.valueOf(tailFile.getPos()), TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY, tailFile.getPath()));
        }
        return new Gson().toJson(newArrayList);
    }
}
