package org.apache.flume.source.scribe;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.scribe.Scribe;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/scribe/ScribeSource.class */
/**
 * Flume source that exposes a Thrift {@code Scribe} endpoint and forwards every
 * received {@link LogEntry} to the channel processor as a Flume event.
 *
 * <p>Configuration keys read by {@link #configure(Context)}:
 * <ul>
 *   <li>{@code port} - TCP port to listen on (default 1499)</li>
 *   <li>{@code workerThreads} - maximum worker threads of the Thrift server
 *       (default 5; non-positive values fall back to the default)</li>
 *   <li>{@code maxReadBufferBytes} - frame / read buffer limit in bytes
 *       (default 16384000; non-positive values fall back to the default)</li>
 * </ul>
 */
public class ScribeSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(ScribeSource.class);

    /** Event header key under which the Scribe category of an entry is stored. */
    public static final String SCRIBE_CATEGORY = "category";

    private static final int DEFAULT_PORT = 1499;
    private static final int DEFAULT_WORKERS = 5;
    private static final int DEFAULT_MAX_READ_BUFFER_BYTES = 16384000;

    /** Upper bound on how long {@link #start()} waits for the server to come up. */
    private static final long STARTUP_TIMEOUT_MILLIS = 3000L;

    /** Interval between two "is the server serving yet?" checks during start-up. */
    private static final long STARTUP_POLL_MILLIS = 50L;

    /**
     * Charset used to turn the entry message back into bytes. Thrift's binary
     * protocol transports strings as UTF-8, so encoding with UTF-8 reproduces the
     * bytes the client sent; the platform default charset could mangle non-ASCII text.
     */
    private static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8");

    /**
     * Thrift server instance. Assigned by the {@link Startup} thread and read by
     * the thread calling {@link #start()} / {@link #stop()}, hence volatile.
     */
    private volatile TServer server;
    private int port;
    private int workers;
    private int maxReadBufferBytes;
    private SourceCounter sourceCounter;

    /** Thrift handler that converts incoming Scribe entries into Flume events. */
    class Receiver implements Scribe.Iface {

        /**
         * Handles one batch of Scribe log entries.
         *
         * <p>Each entry becomes an event whose body is the entry message and whose
         * headers contain the entry category under {@link ScribeSource#SCRIBE_CATEGORY}
         * (omitted when the category is {@code null}). The whole batch is handed to
         * the channel processor in a single call.
         *
         * @param list the entries sent by the client; may be {@code null}
         * @return {@code ResultCode.OK} when the batch was handed over successfully,
         *         {@code ResultCode.TRY_LATER} when {@code list} is {@code null} or
         *         any exception occurred while building or processing the batch
         */
        @Override // org.apache.flume.source.scribe.Scribe.Iface
        public ResultCode Log(List<LogEntry> list) throws TException {
            if (list == null) {
                return ResultCode.TRY_LATER;
            }
            sourceCounter.addToEventReceivedCount(list.size());
            try {
                // NOTE(review): the list is left raw because its element type (whatever
                // EventBuilder.withBody returns) is not imported in this file.
                List events = new ArrayList(list.size());
                for (LogEntry entry : list) {
                    // At most one header is ever stored, so size the map for exactly one entry.
                    Map<String, String> headers = new HashMap<String, String>(1, 1.0f);
                    String category = entry.getCategory();
                    if (category != null) {
                        headers.put(SCRIBE_CATEGORY, category);
                    }
                    byte[] body = entry.getMessage().getBytes(MESSAGE_CHARSET);
                    events.add(EventBuilder.withBody(body, headers));
                }
                if (!events.isEmpty()) {
                    getChannelProcessor().processEventBatch(events);
                }
                sourceCounter.addToEventAcceptedCount(list.size());
                return ResultCode.OK;
            } catch (Exception e) {
                // Any failure is reported to the client as TRY_LATER so it can resend the batch.
                LOG.warn("Scribe source handling failure", e);
                return ResultCode.TRY_LATER;
            }
        }
    }

    /**
     * Background thread that builds the Thrift server and runs its serve loop.
     * {@code serve()} is invoked on this thread so that {@link ScribeSource#start()}
     * does not have to run it itself.
     */
    private class Startup extends Thread {

        /**
         * Creates a half-sync/half-async Thrift server bound to the configured port,
         * publishes it in {@link ScribeSource#server} and then serves requests.
         * Any failure is logged and ends the thread; {@link ScribeSource#start()}
         * detects it because the server never reports that it is serving.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                Scribe.Processor processor = new Scribe.Processor(new Receiver());
                THsHaServer.Args args = new THsHaServer.Args(new TNonblockingServerSocket(ScribeSource.this.port));
                args.maxWorkerThreads(ScribeSource.this.workers);
                args.processor(processor);
                args.transportFactory(new TFramedTransport.Factory(ScribeSource.this.maxReadBufferBytes));
                args.protocolFactory(new TBinaryProtocol.Factory(false, false));
                args.maxReadBufferBytes = ScribeSource.this.maxReadBufferBytes;
                ScribeSource.this.server = new THsHaServer(args);
                LOG.info("Starting Scribe Source on port {}", ScribeSource.this.port);
                ScribeSource.this.server.serve();
            } catch (Exception e) {
                LOG.warn("Scribe failed", e);
            }
        }
    }

    /**
     * Reads port, read-buffer limit and worker-thread count from {@code context},
     * replacing non-positive buffer/worker values with their defaults, and lazily
     * creates the source counter.
     *
     * @param context the configuration of this source
     */
    @Override
    public void configure(Context context) {
        this.port = context.getInteger("port", DEFAULT_PORT);

        this.maxReadBufferBytes = context.getInteger("maxReadBufferBytes", DEFAULT_MAX_READ_BUFFER_BYTES);
        if (this.maxReadBufferBytes <= 0) {
            this.maxReadBufferBytes = DEFAULT_MAX_READ_BUFFER_BYTES;
        }

        this.workers = context.getInteger("workerThreads", DEFAULT_WORKERS);
        if (this.workers <= 0) {
            this.workers = DEFAULT_WORKERS;
        }

        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(getName());
        }
    }

    /**
     * Launches the Thrift server on a background thread and waits up to
     * {@value #STARTUP_TIMEOUT_MILLIS} ms for it to start serving.
     *
     * @throws IllegalStateException if the server is not serving once the wait
     *         is over (for example because it could not be created at all)
     */
    @Override
    public void start() {
        // Forget any server from a previous start so only the new instance is observed below.
        this.server = null;
        new Startup().start();
        if (!awaitServing(STARTUP_TIMEOUT_MILLIS)) {
            throw new IllegalStateException("Failed initialization of ScribeSource");
        }
        this.sourceCounter.start();
        super.start();
    }

    /** Stops the Thrift server (if one was created) and the source counter. */
    @Override
    public void stop() {
        LOG.info("Scribe source stopping");
        TServer current = this.server;
        if (current != null) {
            current.stop();
        }
        this.sourceCounter.stop();
        super.stop();
        LOG.info("Scribe source stopped. Metrics:{}", this.sourceCounter);
    }

    /**
     * Polls until the server reports that it is serving or the timeout elapses.
     * If the waiting thread is interrupted, the interrupt flag is restored and
     * the current serving state is returned immediately.
     */
    private boolean awaitServing(long timeoutMillis) {
        long deadlineNanos = System.nanoTime() + timeoutMillis * 1000000L;
        try {
            while (!serverIsServing()) {
                long remainingMillis = (deadlineNanos - System.nanoTime()) / 1000000L;
                if (remainingMillis <= 0) {
                    return false;
                }
                Thread.sleep(Math.min(remainingMillis, STARTUP_POLL_MILLIS));
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return serverIsServing();
        }
    }

    /** Null-safe check: true only if a server exists and reports that it is serving. */
    private boolean serverIsServing() {
        TServer current = this.server;
        return current != null && current.isServing();
    }
}
