package org.apache.flume.source.taildirtokafka;

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.gson.stream.JsonReader;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.client.avro.ReliableEventReader;
import org.apache.flume.tools.PlatformDetect;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.nio.ch.FileKey;

/* loaded from: input_file:org/apache/flume/source/taildirtokafka/ReliableTaildirExEventReader.class */
public final class ReliableTaildirExEventReader implements ReliableEventReader {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReliableTaildirExEventReader.class);
    private boolean addByteOffset;
    private final Table<String, String, String> headerTable;
    private int lineMaxLength;
    private final Table<String, File, Pattern> tailFileTable;
    private long updateTime;
    private boolean isMetricLog;
    private long acceptedSize = 0;
    private boolean committed = true;
    private TailFileEx currentFile = null;
    private Map<Long, TailFileEx> tailFiles = Maps.newHashMap();

    public ReliableTaildirExEventReader(Map<String, String> map, Table<String, String, String> table, String str, String str2, boolean z, boolean z2, int i, boolean z3) throws IOException {
        Preconditions.checkNotNull(map);
        Preconditions.checkNotNull(str);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Initializing {} with directory={}, metaDir={}", new Object[]{ReliableTaildirExEventReader.class.getSimpleName(), map});
        }
        HashBasedTable create = HashBasedTable.create();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            File file = new File(entry.getValue());
            File parentFile = file.getParentFile();
            Preconditions.checkNotNull(parentFile);
            Preconditions.checkState(parentFile.exists(), "Directory does not exist: " + parentFile.getAbsolutePath());
            create.put(entry.getKey(), parentFile, Pattern.compile(file.getName()));
        }
        LOGGER.info("tailFileTable: " + create.toString());
        LOGGER.info("headerTable: " + table.toString());
        this.tailFileTable = create;
        this.headerTable = table;
        this.addByteOffset = z2;
        this.lineMaxLength = i;
        this.isMetricLog = z3;
        updateTailFiles(z);
        LOGGER.info("Updating position from position file: " + str);
        loadPositionFile(this.tailFiles, str, str2);
    }

    private void loadPositionFile(Map<Long, TailFileEx> map, String str, String str2) throws IOException {
        BufferedReader bufferedReader = null;
        InputStreamReader inputStreamReader = null;
        FileInputStream fileInputStream = null;
        JsonReader jsonReader = null;
        try {
            try {
                fileInputStream = new FileInputStream(str);
                inputStreamReader = new InputStreamReader(fileInputStream, "UTF-8");
                bufferedReader = new BufferedReader(inputStreamReader);
                jsonReader = new JsonReader(bufferedReader);
                jsonReader.beginArray();
                while (jsonReader.hasNext()) {
                    Long l = null;
                    Long l2 = null;
                    String str3 = null;
                    jsonReader.beginObject();
                    while (jsonReader.hasNext()) {
                        String nextName = jsonReader.nextName();
                        if ("inode".equals(nextName)) {
                            l = Long.valueOf(jsonReader.nextLong());
                        } else if ("pos".equals(nextName)) {
                            l2 = Long.valueOf(jsonReader.nextLong());
                        } else if ("file".equals(nextName)) {
                            str3 = jsonReader.nextString();
                        }
                    }
                    jsonReader.endObject();
                    Iterator it = Arrays.asList(l, l2, str3).iterator();
                    while (it.hasNext()) {
                        Preconditions.checkNotNull(it.next(), "Detected missing value in position file. inode: " + l + ", pos: " + l2 + ", path: " + str3);
                    }
                    TailFileEx tailFileEx = map.get(l);
                    if (tailFileEx != null && l != null && l2 != null && tailFileEx.updatePos(str3, l.longValue(), l2.longValue())) {
                        map.put(l, tailFileEx);
                    }
                }
                jsonReader.endArray();
                IOUtils.closeQuietly(jsonReader);
                IOUtils.closeQuietly(bufferedReader);
                IOUtils.closeQuietly(inputStreamReader);
                IOUtils.closeQuietly(fileInputStream);
            } catch (FileNotFoundException e) {
                LOGGER.info("File not found: " + str + ", not updating position");
                IOUtils.closeQuietly(jsonReader);
                IOUtils.closeQuietly(bufferedReader);
                IOUtils.closeQuietly(inputStreamReader);
                IOUtils.closeQuietly(fileInputStream);
            } catch (IOException e2) {
                LOGGER.error("Failed loading positionFile: " + str);
                rewritejson(str2, str);
                IOUtils.closeQuietly(jsonReader);
                IOUtils.closeQuietly(bufferedReader);
                IOUtils.closeQuietly(inputStreamReader);
                IOUtils.closeQuietly(fileInputStream);
            }
        } catch (Throwable th) {
            IOUtils.closeQuietly(jsonReader);
            IOUtils.closeQuietly(bufferedReader);
            IOUtils.closeQuietly(inputStreamReader);
            IOUtils.closeQuietly(fileInputStream);
            throw th;
        }
    }

    private void rewritejson(String str, String str2) throws IOException {
        File file = new File(str);
        FileInputStream fileInputStream = null;
        FileOutputStream fileOutputStream = null;
        if (!file.exists() || file.length() == 0) {
            return;
        }
        File file2 = new File(str2);
        if (file2.exists() && file2.delete()) {
            LOGGER.info("corrupted file is deleted " + str2);
        }
        try {
            try {
                fileInputStream = new FileInputStream(file);
                fileOutputStream = FileUtils.openOutputStream(file2);
                byte[] bArr = new byte[1024];
                while (true) {
                    int read = fileInputStream.read(bArr);
                    if (read <= 0) {
                        break;
                    } else {
                        fileOutputStream.write(bArr, 0, read);
                    }
                }
                if (null != fileOutputStream) {
                    try {
                        fileOutputStream.close();
                    } finally {
                        if (null != fileInputStream) {
                            try {
                                fileInputStream.close();
                            } catch (Exception e) {
                                LOGGER.error("close FileInputStream fail...");
                            }
                        }
                    }
                }
            } catch (IOException e2) {
                LOGGER.error("replace json from backup json fail...");
                if (null != fileOutputStream) {
                    try {
                        fileOutputStream.close();
                    } finally {
                        if (null != fileInputStream) {
                            try {
                                fileInputStream.close();
                            } catch (Exception e3) {
                                LOGGER.error("close FileInputStream fail...");
                            }
                        }
                    }
                }
            }
            LOGGER.info("copy file successful " + str2);
            throw new IOException();
        } catch (Throwable th) {
            if (null != fileOutputStream) {
                try {
                    fileOutputStream.close();
                } finally {
                    if (null != fileInputStream) {
                        try {
                            fileInputStream.close();
                        } catch (Exception e4) {
                            LOGGER.error("close FileInputStream fail...");
                        }
                    }
                }
            }
            if (null != fileInputStream) {
                try {
                    fileInputStream.close();
                } catch (Exception e5) {
                    LOGGER.error("close FileInputStream fail...");
                    throw th;
                }
            }
            throw th;
        }
    }

    public void close() throws IOException {
        for (TailFileEx tailFileEx : this.tailFiles.values()) {
            if (tailFileEx.getRaf() != null) {
                tailFileEx.getRaf().close();
            }
        }
    }

    public void commit() throws IOException {
        if (this.committed || this.currentFile == null) {
            return;
        }
        this.currentFile.setPos(this.currentFile.getLineReadPos());
        this.currentFile.setLastUpdated(this.updateTime);
        this.committed = true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public long getReadedSize() {
        return this.acceptedSize;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<Long, TailFileEx> getTailFiles() {
        return this.tailFiles;
    }

    public Event readEvent() throws IOException {
        return null;
    }

    public List<Event> readEvents(int i) throws IOException {
        return null;
    }

    public List<ProducerRecord<String, byte[]>> readProducerRecords(int i, boolean z, boolean z2, String str, Integer num) throws IOException {
        if (this.currentFile == null) {
            throw new IllegalStateException("current file does not exist. ");
        }
        if (!this.committed) {
            LOGGER.info("Last read was never committed - resetting position");
            this.currentFile.updateFilePos(this.currentFile.getPos());
        }
        List<FileContent> readRecords = this.currentFile.readRecords(i, z);
        List<FileContent> arrayList = new ArrayList();
        if (readRecords.isEmpty()) {
            return Lists.newLinkedList();
        }
        if (this.isMetricLog) {
            Pattern compile = Pattern.compile(".*flume current metrics:.*");
            for (FileContent fileContent : readRecords) {
                String str2 = new String(fileContent.content, Charsets.UTF_8);
                if (compile.matcher(str2).matches()) {
                    String[] split = str2.split("\\|");
                    if (split[3] != null && !split[3].equals("")) {
                        fileContent.content = split[3].replace("flume current metrics:", "").replace(" ", "").getBytes("utf-8");
                        arrayList.add(fileContent);
                    }
                }
            }
        } else {
            arrayList = readRecords;
        }
        List<FileContent> arrayList2 = new ArrayList();
        if (z2) {
            for (FileContent fileContent2 : arrayList) {
                fileContent2.content = (new String(fileContent2.content, "utf-8") + ",\"SourceTimestamp\":" + System.currentTimeMillis()).getBytes("utf-8");
                arrayList2.add(fileContent2);
            }
        } else {
            arrayList2 = arrayList;
        }
        this.acceptedSize = 0L;
        this.committed = false;
        ArrayList arrayList3 = new ArrayList(i);
        Iterator<FileContent> it = arrayList2.iterator();
        while (it.hasNext()) {
            arrayList3.add(generateProducerRecord(it.next(), str, num));
        }
        return arrayList3;
    }

    private ProducerRecord<String, byte[]> generateProducerRecord(FileContent fileContent, String str, Integer num) {
        Map<String, String> headers = this.currentFile.getHeaders();
        String str2 = headers.get("kafka.producer.topic");
        String str3 = isExists(str2).booleanValue() ? str2 : str;
        String str4 = headers.get("kafka.producer.partition.id");
        Integer valueOf = isExists(str4).booleanValue() ? Integer.valueOf(str4) : num;
        if (this.addByteOffset) {
            headers.put("byteoffset", String.valueOf(fileContent.posTmp));
        }
        return valueOf != null ? new ProducerRecord<>(str3, valueOf, (Object) null, fileContent.content, KafkaUtil.generateKafkaHeaders(headers)) : new ProducerRecord<>(str3, (Integer) null, (Object) null, fileContent.content, KafkaUtil.generateKafkaHeaders(headers));
    }

    private Boolean isExists(String str) {
        return Boolean.valueOf((str == null || str.isEmpty()) ? false : true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setCurrentFile(TailFileEx tailFileEx) {
        this.currentFile = tailFileEx;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<Long> updateTailFiles(boolean z) throws IOException {
        this.updateTime = System.currentTimeMillis();
        ArrayList newArrayList = Lists.newArrayList();
        for (Table.Cell cell : this.tailFileTable.cellSet()) {
            Map<String, String> row = this.headerTable.row((String) cell.getRowKey());
            Iterator<File> it = getMatchFiles((File) cell.getColumnKey(), (Pattern) cell.getValue()).iterator();
            while (it.hasNext()) {
                newArrayList.add(Long.valueOf(updateTailFileEx(z, row, it.next())));
            }
        }
        return newArrayList;
    }

    private long updateTailFileEx(boolean z, Map<String, String> map, File file) throws IOException {
        long inode = getInode(file);
        TailFileEx tailFileEx = this.tailFiles.get(Long.valueOf(inode));
        if (tailFileEx == null) {
            tailFileEx = openFile(file, map, inode, z ? file.length() : 0L);
        } else {
            boolean z2 = tailFileEx.getLastUpdated() < file.lastModified();
            if (z2) {
                if (tailFileEx.getRaf() == null) {
                    tailFileEx = openFile(file, map, inode, tailFileEx.getPos());
                }
                if (file.length() < tailFileEx.getPos()) {
                    LOGGER.info("Pos " + tailFileEx.getPos() + " is larger than file size! Restarting from pos 0, file: " + tailFileEx.getPath() + ", inode: " + inode);
                    tailFileEx.updatePos(tailFileEx.getPath(), inode, 0L);
                }
            }
            tailFileEx.setNeedTail(z2);
        }
        this.tailFiles.put(Long.valueOf(inode), tailFileEx);
        return inode;
    }

    private long getInode(File file) throws IOException {
        if (!PlatformDetect.isWindows()) {
            return ((Long) Files.getAttribute(file.toPath(), "unix:ino", new LinkOption[0])).longValue();
        }
        FileInputStream fileInputStream = new FileInputStream(file);
        Throwable th = null;
        try {
            try {
                long hashCode = FileKey.create(fileInputStream.getFD()).hashCode();
                if (fileInputStream != null) {
                    if (0 != 0) {
                        try {
                            fileInputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fileInputStream.close();
                    }
                }
                return hashCode;
            } finally {
            }
        } catch (Throwable th3) {
            if (fileInputStream != null) {
                if (th != null) {
                    try {
                        fileInputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fileInputStream.close();
                }
            }
            throw th3;
        }
    }

    private List<File> getMatchFiles(File file, Pattern pattern) {
        RegxFileFilter regxFileFilter = new RegxFileFilter(pattern);
        ArrayList newArrayList = Lists.newArrayList();
        File[] listFiles = file.listFiles(regxFileFilter);
        if (listFiles != null && listFiles.length > 0) {
            Iterator it = Arrays.asList(listFiles).iterator();
            while (it.hasNext()) {
                newArrayList.add((File) it.next());
            }
        }
        return newArrayList;
    }

    private TailFileEx openFile(File file, Map<String, String> map, long j, long j2) {
        try {
            if (!file.exists()) {
                LOGGER.error("source file is empty");
            }
            LOGGER.info("Opening file: " + file + ", inode: " + j + ", pos: " + j2);
            return new TailFileEx(file, map, j, j2, this.lineMaxLength);
        } catch (IOException e) {
            throw new FlumeException("Failed opening file: " + file);
        }
    }
}
