package org.apache.flume.source;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.SyslogUtils;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark-project.guava.base.Preconditions;
import org.spark-project.guava.base.Throwables;
import org.spark-project.guava.collect.Lists;

/* loaded from: input_file:org/apache/flume/source/MultiportSyslogTCPSource.class */
public class MultiportSyslogTCPSource extends AbstractSource implements EventDrivenSource, Configurable {
    public static final Logger logger = LoggerFactory.getLogger((Class<?>) MultiportSyslogTCPSource.class);
    private String host;
    private NioSocketAcceptor acceptor;
    private Integer numProcessors;
    private int maxEventSize;
    private int batchSize;
    private int readBufferSize;
    private String portHeader;
    private Charset defaultCharset;
    private ThreadSafeDecoder defaultDecoder;
    private Set<String> keepFields;
    private List<Integer> ports = Lists.newArrayList();
    private SourceCounter sourceCounter = null;
    private final ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets = new ConcurrentHashMap();

    /* loaded from: input_file:org/apache/flume/source/MultiportSyslogTCPSource$LineSplitter.class */
    static class LineSplitter {
        private static final byte NEWLINE = 10;
        private final int maxLineLength;

        public LineSplitter(int i) {
            this.maxLineLength = i;
        }

        public boolean parseLine(IoBuffer ioBuffer, IoBuffer ioBuffer2, ParsedBuffer parsedBuffer) {
            parsedBuffer.buffer = null;
            parsedBuffer.incomplete = false;
            ioBuffer.mark();
            int position = ioBuffer2.position();
            boolean z = false;
            while (!z && ioBuffer.hasRemaining() && position < this.maxLineLength) {
                if (ioBuffer.get() == 10) {
                    z = true;
                }
                position++;
            }
            if (z) {
                int position2 = ioBuffer.position();
                ioBuffer.reset();
                int position3 = ioBuffer.position();
                if (ioBuffer2.position() <= 0) {
                    parsedBuffer.buffer = ioBuffer.getSlice((position2 - position3) - 1);
                    ioBuffer.get();
                    return true;
                }
                byte[] bArr = new byte[position2 - position3];
                ioBuffer.get(bArr);
                ioBuffer2.put(bArr);
                int position4 = ioBuffer2.position() - 1;
                ioBuffer2.flip();
                parsedBuffer.buffer = ioBuffer2.getSlice(position4);
                ioBuffer2.clear();
                return true;
            }
            if (position != this.maxLineLength) {
                if (ioBuffer.hasRemaining()) {
                    throw new IllegalStateException("unexpected buffer state: msgPos=" + position + ", buf.hasRemaining=" + ioBuffer.hasRemaining() + ", savedBuf.hasRemaining=" + ioBuffer2.hasRemaining() + ", seenNewline=" + z + ", maxLen=" + this.maxLineLength);
                }
                int position5 = ioBuffer.position();
                ioBuffer.reset();
                byte[] bArr2 = new byte[position5 - ioBuffer.position()];
                ioBuffer.get(bArr2);
                ioBuffer2.put(bArr2);
                return false;
            }
            int position6 = ioBuffer.position();
            ioBuffer.reset();
            int position7 = ioBuffer.position();
            if (ioBuffer2.position() > 0) {
                byte[] bArr3 = new byte[position6 - position7];
                ioBuffer.get(bArr3);
                ioBuffer2.put(bArr3);
                ioBuffer2.flip();
                parsedBuffer.buffer = ioBuffer2.getSlice(position);
                ioBuffer2.clear();
            } else {
                parsedBuffer.buffer = ioBuffer.getSlice(position);
            }
            MultiportSyslogTCPSource.logger.warn("Event size larger than specified event size: {}. Consider increasing the max event size.", Integer.valueOf(this.maxLineLength));
            parsedBuffer.incomplete = true;
            return true;
        }
    }

    /* loaded from: input_file:org/apache/flume/source/MultiportSyslogTCPSource$MultiportSyslogHandler.class */
    static class MultiportSyslogHandler extends IoHandlerAdapter {
        private static final String SAVED_BUF = "savedBuffer";
        private final ChannelProcessor channelProcessor;
        private final int maxEventSize;
        private final int batchSize;
        private final SourceCounter sourceCounter;
        private final String portHeader;
        private final SyslogParser syslogParser = new SyslogParser();
        private final LineSplitter lineSplitter;
        private final ThreadSafeDecoder defaultDecoder;
        private final ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets;
        private Set<String> keepFields;

        public MultiportSyslogHandler(int i, int i2, ChannelProcessor channelProcessor, SourceCounter sourceCounter, String str, ThreadSafeDecoder threadSafeDecoder, ConcurrentMap<Integer, ThreadSafeDecoder> concurrentMap, Set<String> set) {
            this.channelProcessor = channelProcessor;
            this.sourceCounter = sourceCounter;
            this.maxEventSize = i;
            this.batchSize = i2;
            this.portHeader = str;
            this.defaultDecoder = threadSafeDecoder;
            this.portCharsets = concurrentMap;
            this.keepFields = set;
            this.lineSplitter = new LineSplitter(i);
        }

        @Override // org.apache.mina.core.service.IoHandlerAdapter, org.apache.mina.core.service.IoHandler
        public void exceptionCaught(IoSession ioSession, Throwable th) throws Exception {
            MultiportSyslogTCPSource.logger.error("Error in syslog message handler", th);
            if (th instanceof Error) {
                Throwables.propagate(th);
            }
        }

        @Override // org.apache.mina.core.service.IoHandlerAdapter, org.apache.mina.core.service.IoHandler
        public void sessionCreated(IoSession ioSession) {
            MultiportSyslogTCPSource.logger.info("Session created: {}", ioSession);
            ioSession.setAttribute(SAVED_BUF, IoBuffer.allocate(this.maxEventSize, false));
        }

        @Override // org.apache.mina.core.service.IoHandlerAdapter, org.apache.mina.core.service.IoHandler
        public void sessionOpened(IoSession ioSession) {
            MultiportSyslogTCPSource.logger.debug("Session opened: {}", ioSession);
        }

        @Override // org.apache.mina.core.service.IoHandlerAdapter, org.apache.mina.core.service.IoHandler
        public void sessionClosed(IoSession ioSession) {
            MultiportSyslogTCPSource.logger.info("Session closed: {}", ioSession);
        }

        @Override // org.apache.mina.core.service.IoHandlerAdapter, org.apache.mina.core.service.IoHandler
        public void messageReceived(IoSession ioSession, Object obj) {
            IoBuffer ioBuffer = (IoBuffer) obj;
            IoBuffer ioBuffer2 = (IoBuffer) ioSession.getAttribute(SAVED_BUF);
            ParsedBuffer parsedBuffer = new ParsedBuffer();
            ArrayList newArrayList = Lists.newArrayList();
            CharsetDecoder charsetDecoder = this.defaultDecoder.get();
            int port = ((InetSocketAddress) ioSession.getLocalAddress()).getPort();
            if (this.portCharsets.containsKey(Integer.valueOf(port))) {
                charsetDecoder = this.portCharsets.get(Integer.valueOf(port)).get();
            }
            while (ioBuffer.hasRemaining()) {
                newArrayList.clear();
                for (int i = 0; i < this.batchSize && ioBuffer.hasRemaining(); i++) {
                    if (this.lineSplitter.parseLine(ioBuffer, ioBuffer2, parsedBuffer)) {
                        Event parseEvent = parseEvent(parsedBuffer, charsetDecoder);
                        if (this.portHeader != null) {
                            parseEvent.getHeaders().put(this.portHeader, String.valueOf(port));
                        }
                        newArrayList.add(parseEvent);
                    } else {
                        MultiportSyslogTCPSource.logger.trace("Parsed null event");
                    }
                }
                if (newArrayList.isEmpty()) {
                    MultiportSyslogTCPSource.logger.trace("Empty set!");
                    return;
                }
                int size = newArrayList.size();
                this.sourceCounter.addToEventReceivedCount(size);
                try {
                    this.channelProcessor.processEventBatch(newArrayList);
                    this.sourceCounter.addToEventAcceptedCount(size);
                } catch (Throwable th) {
                    MultiportSyslogTCPSource.logger.error("Error writing to channel, event dropped", th);
                    if (th instanceof Error) {
                        Throwables.propagate(th);
                    }
                }
            }
        }

        Event parseEvent(ParsedBuffer parsedBuffer, CharsetDecoder charsetDecoder) {
            Event withBody;
            try {
                String string = parsedBuffer.buffer.getString(charsetDecoder);
                MultiportSyslogTCPSource.logger.trace("Seen raw event: {}", string);
                try {
                    withBody = this.syslogParser.parseMessage(string, charsetDecoder.charset(), this.keepFields);
                    if (parsedBuffer.incomplete) {
                        withBody.getHeaders().put(SyslogUtils.EVENT_STATUS, SyslogUtils.SyslogStatus.INCOMPLETE.getSyslogStatus());
                    }
                } catch (IllegalArgumentException e) {
                    withBody = EventBuilder.withBody(string, charsetDecoder.charset());
                    withBody.getHeaders().put(SyslogUtils.EVENT_STATUS, SyslogUtils.SyslogStatus.INVALID.getSyslogStatus());
                    MultiportSyslogTCPSource.logger.debug("Error parsing syslog event", (Throwable) e);
                }
                return withBody;
            } catch (Throwable th) {
                MultiportSyslogTCPSource.logger.info("Error decoding line with charset (" + charsetDecoder.charset() + "). Exception follows.", th);
                if (th instanceof Error) {
                    Throwables.propagate(th);
                }
                byte[] bArr = new byte[parsedBuffer.buffer.remaining()];
                parsedBuffer.buffer.get(bArr);
                Event withBody2 = EventBuilder.withBody(bArr);
                withBody2.getHeaders().put(SyslogUtils.EVENT_STATUS, SyslogUtils.SyslogStatus.INVALID.getSyslogStatus());
                return withBody2;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flume/source/MultiportSyslogTCPSource$ParsedBuffer.class */
    public static class ParsedBuffer {
        public IoBuffer buffer = null;
        public boolean incomplete = false;

        ParsedBuffer() {
        }
    }

    /* loaded from: input_file:org/apache/flume/source/MultiportSyslogTCPSource$ThreadSafeDecoder.class */
    static class ThreadSafeDecoder extends ThreadLocal<CharsetDecoder> {
        private final Charset charset;

        public ThreadSafeDecoder(Charset charset) {
            this.charset = charset;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public CharsetDecoder initialValue() {
            return this.charset.newDecoder();
        }
    }

    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        String string = context.getString(SyslogSourceConfigurationConstants.CONFIG_PORTS);
        Preconditions.checkNotNull(string, "Must define config parameter for MultiportSyslogTCPSource: ports");
        for (String str : string.split("\\s+")) {
            this.ports.add(Integer.valueOf(Integer.parseInt(str)));
        }
        this.host = context.getString("host");
        this.numProcessors = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_NUMPROCESSORS);
        this.maxEventSize = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_EVENTSIZE, SyslogUtils.DEFAULT_SIZE).intValue();
        String string2 = context.getString(SyslogSourceConfigurationConstants.CONFIG_CHARSET, "UTF-8");
        try {
            this.defaultCharset = Charset.forName(string2);
            this.defaultDecoder = new ThreadSafeDecoder(this.defaultCharset);
            this.portCharsets.clear();
            Iterator it = context.getSubProperties(SyslogSourceConfigurationConstants.CONFIG_PORT_CHARSET_PREFIX).entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                String str2 = (String) entry.getKey();
                String str3 = (String) entry.getValue();
                Integer valueOf = Integer.valueOf(Integer.parseInt(str2));
                Preconditions.checkNotNull(valueOf, "Invalid port number in config");
                try {
                    this.portCharsets.put(valueOf, new ThreadSafeDecoder(Charset.forName(str3)));
                } catch (Exception e) {
                    throw new IllegalArgumentException("Unable to parse charset string (" + str3 + ") from port configuration.", e);
                }
            }
            this.batchSize = context.getInteger("batchSize", 100).intValue();
            this.portHeader = context.getString(SyslogSourceConfigurationConstants.CONFIG_PORT_HEADER);
            this.readBufferSize = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_READBUF_SIZE, 1024).intValue();
            this.keepFields = SyslogUtils.chooseFieldsToKeep(context.getString(SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS, "none"));
            if (this.sourceCounter == null) {
                this.sourceCounter = new SourceCounter(getName());
            }
        } catch (Exception e2) {
            throw new IllegalArgumentException("Unable to parse charset string (" + string2 + ") from port configuration.", e2);
        }
    }

    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        InetSocketAddress inetSocketAddress;
        logger.info("Starting {}...", this);
        if (this.numProcessors != null) {
            this.acceptor = new NioSocketAcceptor(this.numProcessors.intValue());
        } else {
            this.acceptor = new NioSocketAcceptor();
        }
        this.acceptor.setReuseAddress(true);
        this.acceptor.getSessionConfig().setReadBufferSize(this.readBufferSize);
        this.acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        this.acceptor.setHandler(new MultiportSyslogHandler(this.maxEventSize, this.batchSize, getChannelProcessor(), this.sourceCounter, this.portHeader, this.defaultDecoder, this.portCharsets, this.keepFields));
        Iterator<Integer> it = this.ports.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            try {
                inetSocketAddress = this.host != null ? new InetSocketAddress(this.host, intValue) : new InetSocketAddress(intValue);
                this.acceptor.bind(inetSocketAddress);
            } catch (IOException e) {
                logger.error("Could not bind to address: " + String.valueOf(inetSocketAddress), (Throwable) e);
            }
        }
        this.sourceCounter.start();
        super.start();
        logger.info("{} started.", this);
    }

    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        logger.info("Stopping {}...", this);
        this.acceptor.unbind();
        this.acceptor.dispose();
        this.sourceCounter.stop();
        super.stop();
        logger.info("{} stopped. Metrics: {}", this, this.sourceCounter);
    }

    @Override // org.apache.flume.source.AbstractSource
    public String toString() {
        return "Multiport Syslog TCP source " + getName();
    }
}
