package org.apache.flume.channel;

import com.google.common.base.Preconditions;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.flume.ChannelException;
import org.apache.flume.ChannelFullException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.annotations.Recyclable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.ChannelCounter;
import org.apache.flume.tools.FlumeSendAlarmMgr;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;

@InterfaceAudience.Public
@InterfaceStability.Stable
@Recyclable
/* loaded from: input_file:org/apache/flume/channel/MemoryFileChannel.class */
public class MemoryFileChannel extends BasicChannelSemantics {
    // Divisor applied to byte counts before they are used as bytesRemaining permits
    // (see MemoryTransaction.doPut and configure()).
    private static final double byteCapacitySlotSize = 100.0d;
    // Context key read by configure() to select the compression type.
    private static final String COMPRESSION_TYPE = "compression-type";
    // Default for the "channelfullcount" context key.
    private static final int defaultFullCount = 10;
    // Directory in which OutputStreamInfo creates its files; read from the "dataDir" context key.
    private String dataDir;
    // File-name prefix that start() uses to select the files it replays.
    private static final String FILE_PREFIX = "mf-";
    // Compared (case-insensitively) against "snappy" when events are written and replayed.
    private String compressionType;
    // Cause id passed to FlumeSendAlarmMgr for both the fail and the normal alarm.
    private static final int CAUSE_ID_39006 = 39006;
    // Sub-queues available to takers; every access in this file is wrapped in synchronized (queueLock).
    private BlockingQueue<FileEventQueue> queue;
    // One OutputStreamInfo per committing thread, keyed by Thread.getId().
    private ConcurrentHashMap<Long, OutputStreamInfo> threadMap;
    // Permits for events the channel may still accept; sized from the "capacity" setting.
    private Semaphore capacityRemaining;
    // Acquired by doTake() before it looks for an event; released by doCommit() per committed put.
    private Semaphore queueStored;
    private volatile Integer transCapacity;
    // Timeout, in seconds, used for the semaphore tryAcquire calls.
    private volatile int keepAlive;
    // Byte capacity expressed in slots of byteCapacitySlotSize bytes.
    private volatile int byteCapacity;
    // Size threshold compared against OutputStreamInfo.size() when deciding to hand a sub-queue to takers.
    private volatile int subqueueByteCapacity;
    // Age threshold, in milliseconds, used by OutputStreamInfo.isExpired().
    private volatile int queueSplitInterval;
    // Capacities that the bytesRemaining / capacityRemaining semaphores currently reflect.
    private volatile int lastByteCapacity;
    private volatile int lastCapacity;
    private volatile int byteCapacityBufferPercentage;
    // Permits (in slots) for event bytes the channel may still accept.
    private Semaphore bytesRemaining;
    private ChannelCounter channelCounter;
    private static final Logger LOGGER = LoggerFactory.getLogger(MemoryFileChannel.class);
    private static final Integer defaultCapacity = 50000;
    private static final Integer defaultSubqueueNumber = 2000;
    private static final Integer defaultTransCapacity = 5000;
    // 80% of the JVM's reported maximum memory.
    private static final Long defaultByteCapacity = Long.valueOf((long) (Runtime.getRuntime().maxMemory() * 0.8d));
    private static final Integer defaultByteCapacityBufferPercentage = 20;
    private static final Integer defaultKeepAlive = 3;
    private static final Integer defaultSubqueueByteCapacity = 20971520;
    private static final Integer defaultSubqueueIntervalMills = 2000;
    // Buffer size handed to the BufferedOutputStream created in OutputStreamInfo.reset().
    private static final Integer defaultWriterBufferSize = 1048576;
    // Number of "channel full" observations tolerated before the fail alarm is sent (0 disables it).
    private int channelFullCount = 0;
    // Running count of "channel full" observations since the last successful put commit.
    private int currentChannelFullCount = 0;
    private FlumeSendAlarmMgr sendAlarmMgr = FlumeSendAlarmMgr.getInstance();
    // True after a fail alarm has been sent and before the matching normal alarm.
    private boolean isSendAlarm = false;
    // Monitor guarding reads and writes of the queue field.
    private Object queueLock = new Object();

    /* loaded from: input_file:org/apache/flume/channel/MemoryFileChannel$FileEventQueue.class */
    /**
     * An in-memory list of events paired with the name of the file associated with it.
     *
     * <p>The sub-queue passed to the constructor may be {@code null}; {@link #size()} and
     * {@link #poll()} both treat that as an empty queue.
     */
    public static class FileEventQueue {
        private String fileName;
        private LinkedList<Event> subQueue;

        /**
         * @param str        name of the file associated with this queue (removed by {@link #delete()})
         * @param linkedList events held by this queue
         */
        public FileEventQueue(String str, LinkedList<Event> linkedList) {
            this.fileName = str;
            this.subQueue = linkedList;
        }

        /**
         * @return the number of events currently held, or 0 when there is no sub-queue
         */
        public long size() {
            // Mirror the null guard in poll() so callers that poll and then ask for the
            // size do not fail with a NullPointerException.
            if (this.subQueue == null) {
                return 0L;
            }
            return this.subQueue.size();
        }

        /**
         * @return the head event, or {@code null} when the queue is empty or absent
         */
        public Event poll() {
            if (this.subQueue == null) {
                return null;
            }
            return this.subQueue.poll();
        }

        /**
         * Moves every event of the given deque back to the front of this queue. Events are
         * removed from the tail of the deque first, so their original order is preserved.
         *
         * @param linkedBlockingDeque events to put back; left empty on return
         */
        public void rollback(LinkedBlockingDeque<Event> linkedBlockingDeque) {
            while (!linkedBlockingDeque.isEmpty()) {
                this.subQueue.addFirst(linkedBlockingDeque.removeLast());
            }
        }

        /**
         * Deletes the associated file and logs whether the deletion succeeded.
         */
        public synchronized void delete() {
            if (new File(this.fileName).delete()) {
                MemoryFileChannel.LOGGER.debug("Delete file:" + this.fileName);
            } else {
                MemoryFileChannel.LOGGER.warn("Delete file:" + this.fileName + " is failed");
            }
        }
    }

    /* loaded from: input_file:org/apache/flume/channel/MemoryFileChannel$MemoryTransaction.class */
    private class MemoryTransaction extends BasicTransactionSemantics {
        /** Events taken in this transaction; bounded by the capacity given to the constructor. */
        private LinkedBlockingDeque<Event> takeList;
        /** Events put in this transaction; bounded by the capacity given to the constructor. */
        private LinkedBlockingDeque<Event> putList;
        private final ChannelCounter channelCounter;
        /** Slots (event size / byteCapacitySlotSize, rounded up) accumulated by puts. */
        private int putByteCounter = 0;
        /** Slots accumulated by takes. */
        private int takeByteCounter = 0;
        /** Sum of the estimated sizes of the events put. */
        private long putDataSize = 0;
        /** Sum of the estimated sizes of the events taken. */
        private long takeDataSize = 0;
        /** Sub-queues emptied by this transaction; their files are deleted on commit. */
        private List<FileEventQueue> deleteEventQueues = new ArrayList<>();

        /**
         * @param i              capacity of both the put list and the take list
         * @param channelCounter counter updated by this transaction
         */
        public MemoryTransaction(int i, ChannelCounter channelCounter) {
            this.putList = new LinkedBlockingDeque<>(i);
            this.takeList = new LinkedBlockingDeque<>(i);
            this.channelCounter = channelCounter;
        }

        /**
         * Buffers an event in this transaction's put list and accounts for its size.
         *
         * @param event the event to buffer
         * @throws ChannelException when the put list has no room left
         */
        protected void doPut(Event event) throws InterruptedException {
            this.channelCounter.incrementEventPutAttemptCount();
            double rawSlots = MemoryFileChannel.this.estimateEventSize(event) / MemoryFileChannel.byteCapacitySlotSize;
            int slots = (int) Math.ceil(rawSlots);
            boolean accepted = this.putList.offer(event);
            if (!accepted) {
                MemoryFileChannel.LOGGER.info("kk putList full");
                increseCountAndSendAlarm();
                throw new ChannelException("Put queue for MemoryTransaction of capacity " + this.putList.size() + " full, consider committing more frequently, increasing capacity or increasing thread count");
            }
            this.putByteCounter += slots;
            this.putDataSize += MemoryFileChannel.this.estimateEventSize(event);
        }

        /**
         * Takes one event from the channel and records it in this transaction's take list.
         *
         * @return the event taken, or {@code null} when no permit could be acquired within
         *         {@code keepAlive} seconds or no event could be obtained
         * @throws ChannelException when the take list has no room left
         */
        protected Event doTake() throws InterruptedException {
            this.channelCounter.incrementEventTakeAttemptCount();
            if (this.takeList.remainingCapacity() == 0) {
                throw new ChannelException("Take list for MemoryTransaction, capacity " + this.takeList.size() + " full, consider committing more frequently, increasing capacity, or increasing thread count");
            }
            boolean permitAcquired = MemoryFileChannel.this.queueStored.tryAcquire(MemoryFileChannel.this.keepAlive, TimeUnit.SECONDS);
            if (!permitAcquired) {
                return null;
            }
            Event taken = takeFromSubQueue();
            if (taken == null) {
                return null;
            }
            Preconditions.checkNotNull(taken, "Queue.poll returned NULL despite semaphore signalling existence of entry");
            this.takeList.put(taken);
            double rawSlots = MemoryFileChannel.this.estimateEventSize(taken) / MemoryFileChannel.byteCapacitySlotSize;
            this.takeByteCounter += (int) Math.ceil(rawSlots);
            this.takeDataSize += MemoryFileChannel.this.estimateEventSize(taken);
            return taken;
        }

        /**
         * Obtains the next event from the head sub-queue.
         *
         * <p>When the channel queue is empty, the per-thread writers in {@code threadMap} are
         * scanned; the first one that is expired or larger than {@code subqueueByteCapacity} is
         * closed and its sub-queue is published to the channel queue. When none qualifies, the
         * permit acquired by the caller is returned and {@code null} is the result.
         *
         * <p>A sub-queue that becomes empty is removed from the channel queue and remembered so
         * that its file can be deleted on commit.
         *
         * @return the event taken, or {@code null} when none is available
         * @throws InterruptedException when closing a writer fails; the {@link IOException} is
         *                              attached as the cause
         */
        private Event takeFromSubQueue() throws InterruptedException {
            FileEventQueue peek;
            FileEventQueue peek2;
            synchronized (MemoryFileChannel.this.queueLock) {
                peek = MemoryFileChannel.this.queue.peek();
            }
            if (peek == null) {
                boolean z = false;
                for (OutputStreamInfo outputStreamInfo : MemoryFileChannel.this.threadMap.values()) {
                    if (outputStreamInfo.isExpired() || outputStreamInfo.size() > MemoryFileChannel.this.subqueueByteCapacity) {
                        peek = new FileEventQueue(outputStreamInfo.getFileName(), outputStreamInfo.getSubQueue());
                        try {
                            outputStreamInfo.close();
                            // NOTE(review): put() may block while queueLock is held if the queue is full - confirm.
                            synchronized (MemoryFileChannel.this.queueLock) {
                                MemoryFileChannel.this.queue.put(peek);
                            }
                            z = true;
                            break;
                        } catch (IOException e) {
                            // Keep the original failure reachable through getCause().
                            InterruptedException wrapped = new InterruptedException(e.getMessage());
                            wrapped.initCause(e);
                            throw wrapped;
                        }
                    }
                }
                if (!z) {
                    // Nothing to take: hand back the permit the caller acquired.
                    MemoryFileChannel.this.queueStored.release(1);
                    return null;
                }
                MemoryFileChannel.LOGGER.debug("Subqueue reached interval time and create a new subqueue");
            }
            Event poll = peek.poll();
            long size = peek.size();
            if (size == 0) {
                synchronized (MemoryFileChannel.this.queueLock) {
                    MemoryFileChannel.this.queue.poll();
                }
                this.deleteEventQueues.add(peek);
            } else {
                // NOTE(review): releases one permit per event still in the sub-queue on every
                // take - confirm this accounting is intended.
                MemoryFileChannel.this.queueStored.release((int) size);
            }
            if (poll == null) {
                // The head sub-queue had nothing; fall back to whatever is now at the head.
                synchronized (MemoryFileChannel.this.queueLock) {
                    peek2 = MemoryFileChannel.this.queue.peek();
                }
                if (peek2 == null) {
                    return null;
                }
                poll = peek2.poll();
            }
            return poll;
        }

        /**
         * Commits this transaction: reserves byte and event capacity for a net increase in
         * events, hands the put list to the calling thread's file-backed writer, updates the
         * counters and semaphores, and deletes the files of sub-queues emptied by the takes.
         *
         * @throws ChannelException     when byte capacity cannot be reserved within
         *                              {@code keepAlive} seconds
         * @throws ChannelFullException when event capacity cannot be reserved within
         *                              {@code keepAlive} seconds
         * @throws InterruptedException when interrupted while waiting, or when writing fails
         */
        protected void doCommit() throws InterruptedException {
            // Negative when this transaction adds more events than it removes.
            int size = this.takeList.size() - this.putList.size();
            if (size < 0) {
                if (MemoryFileChannel.this.bytesRemaining.availablePermits() < this.putByteCounter) {
                    MemoryFileChannel.LOGGER.info("bytesRemaining full");
                    increseCountAndSendAlarm();
                }
                if (!MemoryFileChannel.this.bytesRemaining.tryAcquire(this.putByteCounter, MemoryFileChannel.this.keepAlive, TimeUnit.SECONDS)) {
                    throw new ChannelException("Cannot commit transaction. Byte capacity allocated to store event body " + (MemoryFileChannel.this.byteCapacity * MemoryFileChannel.byteCapacitySlotSize) + " reached. Please increase heap space/byte capacity allocated to the channel as the sinks may not be keeping up with the sources");
                }
                if (MemoryFileChannel.this.capacityRemaining.availablePermits() < (-size)) {
                    // This branch is about event capacity, so name the right resource in the log.
                    MemoryFileChannel.LOGGER.info("capacityRemaining full");
                    increseCountAndSendAlarm();
                }
                if (!MemoryFileChannel.this.capacityRemaining.tryAcquire(-size, MemoryFileChannel.this.keepAlive, TimeUnit.SECONDS)) {
                    MemoryFileChannel.LOGGER.info("capacityRemaining full");
                    increseCountAndSendAlarm();
                    // Undo the byte reservation made above before giving up.
                    MemoryFileChannel.this.bytesRemaining.release(this.putByteCounter);
                    throw new ChannelFullException("Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight");
                }
            }
            int size2 = this.putList.size();
            int size3 = this.takeList.size();
            synchronized (MemoryFileChannel.this.queueLock) {
                if (size2 > 0) {
                    saveToFileQueue(this.putList);
                }
                this.putList.clear();
                this.takeList.clear();
            }
            this.channelCounter.addToChannelStoreSize(this.putDataSize);
            releaseBytesRemaining(this.takeByteCounter);
            if (this.channelCounter.getChannelStoreSize() - this.takeDataSize <= 0) {
                this.channelCounter.addToChannelStoreSize(0L);
            } else {
                this.channelCounter.addToChannelStoreSize(-this.takeDataSize);
            }
            this.takeByteCounter = 0;
            this.putByteCounter = 0;
            MemoryFileChannel.this.queueStored.release(size2);
            if (size > 0) {
                releaseCapacityRemaining(size);
            }
            if (size2 > 0) {
                this.channelCounter.addToEventPutSuccessCount(size2);
                initCountAndSendAlarm();
            }
            if (size3 > 0) {
                this.channelCounter.addToEventTakeSuccessCount(size3);
                for (FileEventQueue emptied : this.deleteEventQueues) {
                    emptied.delete();
                }
                this.deleteEventQueues.clear();
            }
            this.channelCounter.setChannelSize(MemoryFileChannel.this.getChannelSize());
        }

        /**
         * Returns byte slots to {@code bytesRemaining} without letting the available permits
         * exceed {@code byteCapacity}.
         *
         * @param i number of slots to return
         */
        private void releaseBytesRemaining(int i) {
            int available = MemoryFileChannel.this.bytesRemaining.availablePermits();
            int limit = MemoryFileChannel.this.byteCapacity;
            // Sum in long: byteCapacity may be Integer.MAX_VALUE, so an int sum could wrap.
            if ((long) available + i <= limit) {
                MemoryFileChannel.this.bytesRemaining.release(i);
                return;
            }
            // Semaphore.release rejects negative arguments, which would occur when the
            // available permits already exceed the limit; release only real headroom.
            int headroom = limit - available;
            if (headroom > 0) {
                MemoryFileChannel.this.bytesRemaining.release(headroom);
            }
            MemoryFileChannel.LOGGER.info("Release more bytes. takeByteCounter=" + i + ",byteCapacity=" + limit);
        }

        /**
         * Returns event permits to {@code capacityRemaining} without letting the available
         * permits exceed {@code lastCapacity}.
         *
         * @param i number of permits to return
         */
        private void releaseCapacityRemaining(int i) {
            int available = MemoryFileChannel.this.capacityRemaining.availablePermits();
            int limit = MemoryFileChannel.this.lastCapacity;
            // Sum in long so that a large permit count cannot wrap around.
            if ((long) available + i <= limit) {
                MemoryFileChannel.this.capacityRemaining.release(i);
                return;
            }
            // Semaphore.release rejects negative arguments, which would occur when the
            // available permits already exceed the limit; release only real headroom.
            int headroom = limit - available;
            if (headroom > 0) {
                MemoryFileChannel.this.capacityRemaining.release(headroom);
            }
            MemoryFileChannel.LOGGER.info("Release more capacity. remainingChange=" + i + ",lastCapacity=" + limit);
        }

        /**
         * Drains the given deque into the calling thread's {@link OutputStreamInfo}, creating
         * the writer on first use and resetting it when it has been closed.
         *
         * @param linkedBlockingDeque events to persist, in order; emptied as they are written
         * @throws InterruptedException when creating the writer or writing an event fails; the
         *                              underlying exception is attached as the cause
         */
        public void saveToFileQueue(LinkedBlockingDeque<Event> linkedBlockingDeque) throws InterruptedException {
            try {
                Thread currentThread = Thread.currentThread();
                Long threadId = Long.valueOf(currentThread.getId());
                OutputStreamInfo outputStreamInfo = MemoryFileChannel.this.threadMap.get(threadId);
                if (outputStreamInfo == null) {
                    outputStreamInfo = new OutputStreamInfo();
                    MemoryFileChannel.this.threadMap.put(threadId, outputStreamInfo);
                    MemoryFileChannel.LOGGER.info("Create outputStreamInfo for thread: " + currentThread.getName());
                } else if (outputStreamInfo.isClosed()) {
                    outputStreamInfo.reset();
                }
                while (!linkedBlockingDeque.isEmpty()) {
                    outputStreamInfo.add(linkedBlockingDeque.removeFirst());
                }
            } catch (Exception e) {
                // Callers only expect InterruptedException; keep the real failure as the cause
                // so that it is not reduced to a bare message.
                InterruptedException wrapped = new InterruptedException(e.getMessage());
                wrapped.initCause(e);
                throw wrapped;
            }
        }

        /**
         * Rolls this transaction back: taken events are pushed back onto the head sub-queue
         * (when one exists) and the put-side state and counters are reset.
         *
         * <p>The put list and counters are reset even when the channel queue is empty, so a
         * rolled-back put-only transaction never keeps stale state.
         */
        protected void doRollback() {
            int size = this.takeList.size();
            MemoryFileChannel.LOGGER.info("rollback:" + size);
            synchronized (MemoryFileChannel.this.queueLock) {
                FileEventQueue head = MemoryFileChannel.this.queue.peek();
                boolean restorable = head != null;
                if (size > 0) {
                    if (restorable) {
                        head.rollback(this.takeList);
                    } else {
                        // No sub-queue to return the events to; make the situation visible.
                        MemoryFileChannel.LOGGER.warn("Rollback could not restore {} taken event(s): channel queue is empty", size);
                    }
                }
                this.putList.clear();
                this.putByteCounter = 0;
                this.takeByteCounter = 0;
                this.putDataSize = 0L;
                this.takeDataSize = 0L;
                if (restorable) {
                    // Only advertise the events again when they were actually put back.
                    MemoryFileChannel.this.queueStored.release(size);
                }
                this.channelCounter.setChannelSize(MemoryFileChannel.this.getChannelSize());
            }
        }

        /**
         * Reports whether both the take list and the put list can still accept an event.
         * When either is full, a warning is logged and the channel-full counter is bumped.
         *
         * @return {@code true} when neither list is full
         */
        public boolean remainingCapacity() {
            boolean anyListFull = this.takeList.remainingCapacity() == 0 || this.putList.remainingCapacity() == 0;
            if (anyListFull) {
                MemoryFileChannel.LOGGER.warn("Take/Put list for MemoryTransaction,capacity is full.");
                increseCountAndSendAlarm();
                return false;
            }
            return true;
        }

        /**
         * Counts one more "channel full" observation and, once the count exceeds a positive
         * {@code channelFullCount}, sends the channel-fail alarm, caps the count at the
         * threshold and remembers that an alarm is outstanding.
         */
        public void increseCountAndSendAlarm() {
            MemoryFileChannel.this.currentChannelFullCount += 1;
            MemoryFileChannel.LOGGER.debug("currentChannelFullCount:" + MemoryFileChannel.this.currentChannelFullCount + ",channelFullCount:" + MemoryFileChannel.this.channelFullCount);
            boolean thresholdExceeded = MemoryFileChannel.this.channelFullCount > 0
                    && MemoryFileChannel.this.currentChannelFullCount > MemoryFileChannel.this.channelFullCount;
            if (thresholdExceeded) {
                MemoryFileChannel.this.sendAlarmMgr.sendChannelFailAlarm(MemoryFileChannel.this.getName(), MemoryFileChannel.CAUSE_ID_39006);
                MemoryFileChannel.this.currentChannelFullCount = MemoryFileChannel.this.channelFullCount;
                MemoryFileChannel.this.isSendAlarm = true;
            }
        }

        /**
         * Resets the "channel full" count and, when a fail alarm is outstanding, sends the
         * matching normal alarm. The outstanding flag is cleared in either case.
         */
        private void initCountAndSendAlarm() {
            MemoryFileChannel.this.currentChannelFullCount = 0;
            boolean alarmOutstanding = MemoryFileChannel.this.isSendAlarm;
            if (alarmOutstanding) {
                MemoryFileChannel.this.sendAlarmMgr.sendChannelNormalAlarm(MemoryFileChannel.this.getName(), MemoryFileChannel.CAUSE_ID_39006);
            }
            MemoryFileChannel.this.isSendAlarm = false;
        }
    }

    /* loaded from: input_file:org/apache/flume/channel/MemoryFileChannel$OutputStreamInfo.class */
    /**
     * Per-thread writer that appends serialized events to a file in {@code dataDir} while
     * keeping the same events in an in-memory sub-queue.
     */
    public class OutputStreamInfo {
        private ChunkedOutputStream out;
        /** Bytes accounted for since the last {@link #reset()}. */
        private long curSize = 0;
        private LinkedList<Event> subQueue;
        private String fileName;
        /** Wall-clock milliseconds used by {@link #isExpired()} as the start of the interval. */
        private long startMills;

        /**
         * Creates the writer and opens its first file.
         *
         * @throws IOException when the file cannot be opened
         */
        public OutputStreamInfo() throws IOException {
            reset();
        }

        /**
         * Serializes the event, writes it as one block to the file, flushes, and appends the
         * event to the in-memory sub-queue.
         *
         * @param event the event to persist
         * @throws IOException when serialization or writing fails
         */
        public void add(Event event) throws UnsupportedEncodingException, IOException {
            byte[] serializer = serializer(event);
            if (serializer == null) {
                // serializer() reports failure by returning null; fail with a clear message
                // instead of a NullPointerException further down.
                throw new IOException("Unable to serialize event for file " + this.fileName);
            }
            this.out.writeBlock(serializer);
            if (MemoryFileChannel.this.compressionType.equalsIgnoreCase("snappy")) {
                // NOTE(review): the bytes produced by serializer() are passed to
                // Snappy.uncompress here - confirm the expected input format.
                serializer = Snappy.uncompress(serializer);
                event.setBody(serializer);
            }
            this.subQueue.add(event);
            this.curSize += serializer.length;
            this.out.flush();
        }

        /**
         * Java-serializes the event's header map after storing the body in it under the key
         * {@code "body"}. Note that this modifies the event's own header map.
         *
         * @return the serialized bytes, or {@code null} when serialization fails (logged)
         */
        private byte[] serializer(Event event) {
            Map<String, String> headers = event.getHeaders();
            headers.put("body", new String(event.getBody()));
            try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                 ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
                try {
                    objectOutputStream.writeObject(headers);
                    return byteArrayOutputStream.toByteArray();
                } catch (IOException e) {
                    MemoryFileChannel.LOGGER.error("Serializer error", e);
                    return null;
                }
            } catch (IOException e2) {
                MemoryFileChannel.LOGGER.error("Unable to open/close the ByteArrayOutputStream bos or ObjectOutputStream oos", e2);
                return null;
            }
        }

        /** @return bytes accounted for since the last {@link #reset()} */
        public long size() {
            return this.curSize;
        }

        /** @return the in-memory sub-queue of events added since the last {@link #reset()} */
        public LinkedList<Event> getSubQueue() {
            return this.subQueue;
        }

        /** @return the name of the file currently (or most recently) written */
        public String getFileName() {
            return this.fileName;
        }

        /**
         * Starts a new file named {@code <dataDir>/mf-<current millis>}, with a fresh
         * sub-queue, a zero size and a restarted expiry interval.
         *
         * @throws IOException when the file cannot be opened
         */
        public final void reset() throws IOException {
            this.fileName = MemoryFileChannel.this.dataDir + (File.separator + MemoryFileChannel.FILE_PREFIX) + System.currentTimeMillis();
            MemoryFileChannel.LOGGER.debug("Create file:" + this.fileName);
            this.out = new ChunkedOutputStream(new BufferedOutputStream(new FileOutputStream(this.fileName), MemoryFileChannel.defaultWriterBufferSize.intValue()));
            this.subQueue = new LinkedList<>();
            this.curSize = 0L;
            this.startMills = System.currentTimeMillis();
        }

        /**
         * Reports whether more than {@code queueSplitInterval} milliseconds have passed since
         * the interval started. A {@code true} result restarts the interval as a side effect.
         */
        public boolean isExpired() {
            long now = System.currentTimeMillis();
            if (now - this.startMills <= MemoryFileChannel.this.queueSplitInterval) {
                return false;
            }
            this.startMills = now;
            return true;
        }

        /**
         * Restarts the expiry interval and closes the file stream if it is open.
         *
         * @throws IOException when closing the stream fails
         */
        public void close() throws IOException {
            this.startMills = System.currentTimeMillis();
            if (this.out != null) {
                this.out.close();
                this.out = null;
            }
        }

        /** @return {@code true} when no file stream is open */
        public boolean isClosed() {
            return this.out == null;
        }
    }

    /**
     * Reports whether at least 20% of the byte capacity is still available.
     *
     * <p>When it is not, a warning is logged and the current transaction's
     * {@code remainingCapacity()} check is invoked for its side effects only.
     * NOTE(review): the result of that check is discarded and {@code false} is returned
     * either way - confirm this is the intended contract.
     *
     * @return {@code true} when enough byte capacity is available, {@code false} otherwise
     */
    public boolean applyToken() {
        boolean enoughBytesLeft = this.bytesRemaining.availablePermits() >= this.byteCapacity * 0.2d;
        if (enoughBytesLeft) {
            return true;
        }
        LOGGER.warn("The available memory is less than 20% of the total.");
        BasicTransactionSemantics transaction = getTransaction();
        if (BasicTransactionSemantics.class.isInstance(transaction)) {
            transaction.remainingCapacity();
        }
        return false;
    }

    /**
     * Reads the channel settings from the context and creates or resizes the queue and the
     * capacity semaphores accordingly.
     *
     * <p>Keys read: {@code channelfullcount}, {@code capacity}, {@code subqueueNumber},
     * {@code compression-type}, {@code transactionCapacity}, {@code subqueueByteCapacity},
     * {@code subqueueInterval}, {@code byteCapacityBufferPercentage}, {@code byteCapacity},
     * {@code keep-alive} and {@code dataDir}. Unparsable or non-positive numeric values fall
     * back to their defaults with a warning where the code below says so.
     *
     * @param context configuration source
     * @throws IllegalArgumentException when {@code channelfullcount} is negative
     * @throws IllegalStateException    when the transaction capacity exceeds the capacity
     */
    public void configure(Context context) {
        Integer capacity;
        Integer subqueueNumber;
        this.channelFullCount = context.getInteger("channelfullcount", Integer.valueOf(defaultFullCount)).intValue();
        if (this.channelFullCount < 0) {
            throw new IllegalArgumentException("channelFullCount is invalid,it must greater than zero");
        }
        try {
            capacity = context.getInteger("capacity", defaultCapacity);
        } catch (NumberFormatException e) {
            capacity = defaultCapacity;
            LOGGER.warn("Invalid capacity specified, initializing channel to default capacity of {}", defaultCapacity);
        }
        if (capacity.intValue() <= 0) {
            capacity = defaultCapacity;
            LOGGER.warn("Invalid capacity specified, initializing channel to default capacity of {}", defaultCapacity);
        }
        try {
            subqueueNumber = context.getInteger("subqueueNumber", defaultSubqueueNumber);
        } catch (NumberFormatException e) {
            subqueueNumber = defaultSubqueueNumber;
            LOGGER.warn("Invalid subqueue number specified, initializing channel to default subqueue number of {}", defaultSubqueueNumber);
        }
        if (subqueueNumber.intValue() <= 0) {
            subqueueNumber = defaultSubqueueNumber;
            // Report the default that is actually applied.
            LOGGER.warn("Invalid subqueue number specified, initializing channel to default subqueue number of {}", defaultSubqueueNumber);
        }
        this.compressionType = context.getString(COMPRESSION_TYPE, "none");
        try {
            this.transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity);
        } catch (NumberFormatException e) {
            this.transCapacity = defaultTransCapacity;
            LOGGER.warn("Invalid transation capacity specified, initializing channel to default capacity of {}", defaultTransCapacity);
        }
        if (this.transCapacity.intValue() <= 0) {
            this.transCapacity = defaultTransCapacity;
            LOGGER.warn("Invalid transation capacity specified, initializing channel to default capacity of {}", defaultTransCapacity);
        }
        Preconditions.checkState(this.transCapacity.intValue() <= capacity.intValue(), "Transaction Capacity of Memory file Channel cannot be higher than the capacity.");
        try {
            this.subqueueByteCapacity = context.getInteger("subqueueByteCapacity", defaultSubqueueByteCapacity).intValue();
        } catch (NumberFormatException e) {
            this.subqueueByteCapacity = defaultSubqueueByteCapacity.intValue();
            LOGGER.warn("Invalid subqueue capacity specified, initializing channel to default capacity of {}", defaultSubqueueByteCapacity);
        }
        if (this.subqueueByteCapacity <= 0) {
            this.subqueueByteCapacity = defaultSubqueueByteCapacity.intValue();
            LOGGER.warn("Invalid subqueue capacity specified, initializing channel to default capacity of {}", defaultSubqueueByteCapacity);
        }
        try {
            this.queueSplitInterval = context.getInteger("subqueueInterval", defaultSubqueueIntervalMills).intValue();
        } catch (NumberFormatException e) {
            this.queueSplitInterval = defaultSubqueueIntervalMills.intValue();
            LOGGER.warn("Invalid queue split interval specified, initializing channel to default capacity of {}", defaultSubqueueIntervalMills);
        }
        // Validate the configured interval itself (not the byte-capacity default constant).
        if (this.queueSplitInterval <= 0) {
            this.queueSplitInterval = defaultSubqueueIntervalMills.intValue();
            LOGGER.warn("Invalid squeue split interval specified, initializing channel to default capacity of {}", defaultSubqueueIntervalMills);
        }
        try {
            this.byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage).intValue();
        } catch (NumberFormatException e) {
            this.byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage.intValue();
        }
        // Usable share of the byte capacity once the buffer percentage is set aside.
        double usableFraction = 1.0d - (this.byteCapacityBufferPercentage * 0.01d);
        try {
            this.byteCapacity = (int) ((context.getLong("byteCapacity", defaultByteCapacity).longValue() * usableFraction) / byteCapacitySlotSize);
            if (this.byteCapacity < 1) {
                this.byteCapacity = Integer.MAX_VALUE;
            }
        } catch (NumberFormatException e) {
            this.byteCapacity = (int) ((defaultByteCapacity.longValue() * usableFraction) / byteCapacitySlotSize);
        }
        try {
            this.keepAlive = context.getInteger("keep-alive", defaultKeepAlive).intValue();
        } catch (NumberFormatException e) {
            this.keepAlive = defaultKeepAlive.intValue();
        }
        this.dataDir = context.getString("dataDir", System.getProperty("user.home").replace('\\', '/') + "/.flume/memoryFileChannel/data").trim();
        if (this.queue != null) {
            try {
                resizeQueue(subqueueNumber.intValue());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else {
            synchronized (this.queueLock) {
                this.queue = new ArrayBlockingQueue<>(subqueueNumber.intValue());
                this.queueStored = new Semaphore(0);
            }
        }
        if (this.capacityRemaining == null) {
            this.capacityRemaining = new Semaphore(capacity.intValue());
            this.lastCapacity = capacity.intValue();
        } else if (capacity.intValue() > this.lastCapacity) {
            this.capacityRemaining.release(capacity.intValue() - this.lastCapacity);
            this.lastCapacity = capacity.intValue();
        } else {
            try {
                if (this.capacityRemaining.tryAcquire(this.lastCapacity - capacity.intValue(), this.keepAlive, TimeUnit.SECONDS)) {
                    this.lastCapacity = capacity.intValue();
                } else {
                    // This branch shrinks the event capacity, not the byte capacity.
                    LOGGER.warn("Couldn't acquire permits to downsize the capacity, resizing has been aborted");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        if (this.bytesRemaining == null) {
            this.bytesRemaining = new Semaphore(this.byteCapacity);
            this.lastByteCapacity = this.byteCapacity;
        } else if (this.byteCapacity > this.lastByteCapacity) {
            this.bytesRemaining.release(this.byteCapacity - this.lastByteCapacity);
            this.lastByteCapacity = this.byteCapacity;
        } else {
            try {
                if (this.bytesRemaining.tryAcquire(this.lastByteCapacity - this.byteCapacity, this.keepAlive, TimeUnit.SECONDS)) {
                    this.lastByteCapacity = this.byteCapacity;
                } else {
                    LOGGER.warn("Couldn't acquire permits to downsize the byte capacity, resizing has been aborted");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        if (this.channelCounter == null) {
            this.channelCounter = new ChannelCounter(getName());
        }
        if (this.threadMap == null) {
            this.threadMap = new ConcurrentHashMap<>();
        }
    }

    /**
     * @return the number of event permits currently in use, i.e. the capacity the
     *         semaphore reflects minus the permits still available
     */
    private int getChannelSize() {
        final int total = this.lastCapacity;
        final int free = this.capacityRemaining.availablePermits();
        return total - free;
    }

    /** @return the event capacity that {@code capacityRemaining} currently reflects */
    private int getChannelCapacity() {
        final int current = lastCapacity;
        return current;
    }

    /** @return the configured byte capacity, expressed in slots of {@code byteCapacitySlotSize} bytes */
    private int getChannelByteCapacity() {
        final int slots = byteCapacity;
        return slots;
    }

    /**
     * Replaces the channel queue with one of the requested capacity, carrying over the
     * sub-queues it currently holds.
     *
     * <p>The resize is skipped when the capacity is unchanged, and aborted with a warning
     * when the queue holds more sub-queues than the new capacity allows (copying them would
     * otherwise fail with an {@link IllegalStateException}).
     *
     * @param i the new queue capacity; must be positive
     * @throws InterruptedException never thrown by the current implementation; kept for callers
     */
    private void resizeQueue(int i) throws InterruptedException {
        // Measure and swap under one lock so the size cannot change in between.
        synchronized (this.queueLock) {
            int stored = this.queue.size();
            int currentCapacity = stored + this.queue.remainingCapacity();
            if (currentCapacity == i) {
                return;
            }
            if (stored > i) {
                LOGGER.warn("Couldn't downsize the subqueue number to {} because {} subqueues are stored, resizing has been aborted", i, stored);
                return;
            }
            BlockingQueue<FileEventQueue> resized = new ArrayBlockingQueue<>(i);
            resized.addAll(this.queue);
            this.queue = resized;
        }
    }

    /**
     * Starts the channel after replaying the events persisted in the data directory.
     *
     * <p>Every regular file in {@code dataDir} whose name starts with {@code FILE_PREFIX} is read
     * block by block, Snappy-decompressed when {@code compressionType} is "snappy", deserialized
     * and put on the queue as one {@code FileEventQueue}. Afterwards permits for the replayed data
     * are taken from {@code bytesRemaining} and {@code capacityRemaining} (each request capped at
     * the respective capacity), {@code queueStored} is released by the replayed event count, the
     * channel counter is initialised and the superclass is started.
     *
     * <p>A file that cannot be replayed is logged and skipped, and only events of files that were
     * actually enqueued are counted. Blocks that cannot be deserialized are skipped with a warning.
     * If the thread is interrupted while replaying, start-up still completes and the interrupt
     * status is restored before this method returns.
     *
     * @throws IllegalArgumentException if the data directory does not exist and cannot be created
     */
    public synchronized void start() {
        LOGGER.info("start");
        File dataDirectory = new File(this.dataDir);
        Preconditions.checkArgument(dataDirectory.isDirectory() || dataDirectory.mkdirs(), "LogDir " + dataDirectory + " could not be created");
        LOGGER.info("dataDir=" + this.dataDir);
        File[] replayFiles = dataDirectory.listFiles(new FileFilter() {
            @Override
            public boolean accept(File candidate) {
                return candidate.getName().startsWith(MemoryFileChannel.FILE_PREFIX);
            }
        });
        if (replayFiles == null) {
            // listFiles() returns null on an I/O error; start with nothing to replay instead of an NPE.
            LOGGER.error("Could not list replay files in dataDir=" + this.dataDir);
            replayFiles = new File[0];
        }
        // Null-safe comparison, evaluated once rather than once per block.
        boolean snappyCompressed = "snappy".equalsIgnoreCase(this.compressionType);
        // Interrupts are remembered and restored at the end so the remaining start-up steps
        // (permit acquisition, counters) still run exactly once.
        boolean interrupted = false;
        int eventCount = 0;
        long replayedBytes = 0L;
        try {
            for (File replayFile : replayFiles) {
                if (!replayFile.isFile()) {
                    continue;
                }
                ChunkedInputStream blockStream = null;
                try {
                    blockStream = new ChunkedInputStream(new BufferedInputStream(new FileInputStream(replayFile)));
                    LinkedList<Event> events = new LinkedList<>();
                    long fileBytes = 0L;
                    byte[] block;
                    while ((block = blockStream.readBlock()) != null) {
                        if (snappyCompressed) {
                            block = Snappy.uncompress(block);
                        }
                        Event event = unserializer(block);
                        if (event == null) {
                            // unserializer() already logged the cause; never enqueue a null event.
                            LOGGER.warn("Skipping undecodable event block in file:" + replayFile.getName());
                            continue;
                        }
                        events.add(event);
                        fileBytes += block.length;
                    }
                    FileEventQueue fileEventQueue = new FileEventQueue(replayFile.getCanonicalPath(), events);
                    LOGGER.info("Replayed file:" + replayFile.getName());
                    this.queue.put(fileEventQueue);
                    // Count only once the file is really in the queue, so a file that fails
                    // half-way does not inflate the permits taken below.
                    eventCount += events.size();
                    replayedBytes += fileBytes;
                    this.sendAlarmMgr.sendChannelNormalAlarm(getName(), CAUSE_ID_39006);
                    this.isSendAlarm = false;
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        interrupted = true;
                    }
                    LOGGER.error("reply file:" + replayFile.getName() + " failed", e);
                } finally {
                    if (blockStream != null) {
                        try {
                            blockStream.close();
                        } catch (IOException e) {
                            LOGGER.error("Close file failed" + replayFile.getName(), e);
                        }
                    }
                }
            }

            // Byte permits are accounted in slots of byteCapacitySlotSize bytes.
            int byteSlots = (int) Math.ceil(replayedBytes / byteCapacitySlotSize);
            LOGGER.info("eventByteSize=" + byteSlots + ",bytesRemaining=" + this.bytesRemaining.availablePermits() + ",size=" + eventCount + ",capacity=" + this.capacityRemaining.availablePermits());
            if (byteSlots > 0) {
                try {
                    // Cap the request at byteCapacity.
                    int bytePermits = Math.min(byteSlots, this.byteCapacity);
                    if (!this.bytesRemaining.tryAcquire(bytePermits, this.keepAlive, TimeUnit.SECONDS)) {
                        LOGGER.error("Space for replay. eventByteSize=" + byteSlots + ",byteCapacity=" + getChannelByteCapacity());
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOGGER.error("Acquire permit interrupted when replaying", e);
                }
            }
            if (eventCount > 0) {
                try {
                    // Cap the request at the channel capacity.
                    int channelCapacity = getChannelCapacity();
                    int eventPermits = Math.min(eventCount, channelCapacity);
                    if (!this.capacityRemaining.tryAcquire(eventPermits, this.keepAlive, TimeUnit.SECONDS)) {
                        LOGGER.error("Space for replay. putSize=" + eventCount + ",capacity=" + channelCapacity);
                    }
                    LOGGER.info("capacityRemaining permit=" + this.capacityRemaining.availablePermits());
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOGGER.error("Acquire permit interrupted when replaying", e);
                }
            }

            this.channelCounter.start();
            this.channelCounter.setChannelSize(eventCount);
            this.queueStored.release(eventCount);
            this.channelCounter.setChannelCapacity(getChannelCapacity());
            // NOTE(review): the replayed byte total, not the event count, is added to the
            // put-success counter here; kept as-is, confirm whether eventCount was intended.
            this.channelCounter.addToEventPutSuccessCount(replayedBytes);
            super.start();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Rebuilds an event from one Java-serialized block.
     *
     * <p>The block is deserialized into a map; the value stored under "body" becomes the event
     * body and the whole map is installed as the event headers.
     *
     * @param bArr serialized bytes of a single event
     * @return the reconstructed event, or {@code null} when the block cannot be read
     *         ({@link IOException}) or references an unknown class
     */
    private Event unserializer(byte[] bArr) {
        try (ByteArrayInputStream rawBytes = new ByteArrayInputStream(bArr);
                ObjectInputStream objectReader = new ObjectInputStream(rawBytes)) {
            try {
                // The stream content is untyped; the cast is only checked when values are read.
                @SuppressWarnings("unchecked")
                Map<String, String> fields = (Map<String, String>) objectReader.readObject();
                // NOTE(review): getBytes() uses the platform default charset; confirm this
                // matches the encoding used when the block was written.
                String bodyText = fields.get("body");
                Event event = EventBuilder.withBody(bodyText.getBytes());
                // NOTE(review): the "body" entry stays in the header map; confirm intended.
                event.setHeaders(fields);
                return event;
            } catch (ClassNotFoundException e) {
                LOGGER.info("Class not found : ", e);
                return null;
            }
        } catch (IOException e) {
            LOGGER.info("unserial error : ", e);
            return null;
        }
    }

    /**
     * Stops the channel: records the current channel size in the counter, stops the counter
     * and then delegates to the superclass.
     */
    public synchronized void stop() {
        long sizeAtStop = getChannelSize();
        channelCounter.setChannelSize(sizeAtStop);
        channelCounter.stop();
        super.stop();
    }

    /**
     * Creates a new transaction for this channel.
     *
     * @return a {@code MemoryTransaction} built from the configured {@code transCapacity} and
     *         this channel's counter
     */
    protected BasicTransactionSemantics createTransaction() {
        int transactionCapacity = transCapacity.intValue();
        return new MemoryTransaction(transactionCapacity, channelCounter);
    }

    /**
     * Estimates the size of an event from its body.
     *
     * @param event the event to measure
     * @return the body length in bytes, or 1 when the body is {@code null} or empty
     */
    private long estimateEventSize(Event event) {
        byte[] payload = event.getBody();
        boolean hasPayload = payload != null && payload.length != 0;
        return hasPayload ? payload.length : 1L;
    }
}
