package org.apache.flume.sink.kafka;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.tools.FlumeSendAlarmMgr;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/kafka/KafkaSink.class */
/**
 * Flume sink that drains events from its channel and publishes them to Kafka
 * through a {@code KafkaProducer<String, byte[]>}.
 *
 * <p>Each call to {@link #process()} handles at most {@code batchSize} events
 * inside a single channel transaction. The topic, key and partition of a record
 * can be overridden per event through headers; the payload is either the raw
 * event body or an Avro-encoded {@code AvroFlumeEvent}, depending on
 * configuration.
 *
 * <p>Consecutive failed batches are counted; once the count exceeds the
 * configured {@code failcount} a failure alarm is sent through
 * {@code FlumeSendAlarmMgr}, and it is cleared again after the next successful
 * batch.
 */
public class KafkaSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);

    /** Cause id passed to {@code FlumeSendAlarmMgr} when raising or clearing the sink alarm. */
    private static final int CAUSE_ID_39009 = 39009;

    private KafkaProducer<String, byte[]> producer;
    private String topic;
    private int batchSize;
    /** Futures of the records sent in the current batch; cleared at the start of every batch. */
    private List<Future<RecordMetadata>> kafkaFutures;
    private KafkaSinkCounter counter;
    private boolean useAvroEventFormat;
    private Properties kafkaProps = new Properties();
    /** Name of the event header carrying a per-event partition id, or {@code null} if unused. */
    private String partitionHeader = null;
    /** Partition used for every record unless overridden by {@link #partitionHeader}; may be {@code null}. */
    private Integer staticPartitionId = null;
    // Avro writer, buffer and encoder are created lazily and reused across events.
    private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer = Optional.absent();
    private Optional<ByteArrayOutputStream> tempOutStream = Optional.absent();
    private BinaryEncoder encoder = null;
    /** Number of consecutive failures that must be exceeded before an alarm is sent; {@code 0} disables it. */
    private int sinkFailCount = 0;
    /** Consecutive failures seen so far, capped at {@link #sinkFailCount}. */
    private int sinkCurrentFailCount = 0;
    private final FlumeSendAlarmMgr sendAlarmMgr = FlumeSendAlarmMgr.getInstance();
    /** {@code true} while a failure alarm has been sent and not yet cleared. */
    private boolean isSendAlarm = false;
    private int monTime = 0;

    /** Returns the configured default topic. */
    public String getTopic() {
        return topic;
    }

    /** Returns the maximum number of events handled per {@link #process()} call. */
    public int getBatchSize() {
        return batchSize;
    }

    /**
     * Takes up to {@code batchSize} events from the channel, sends them to Kafka,
     * waits for every send to be acknowledged and then commits the channel
     * transaction.
     *
     * @return {@code BACKOFF} when the channel had no events, {@code READY} otherwise
     * @throws EventDeliveryException if anything fails while taking, sending or
     *         committing; the transaction is rolled back before this is thrown
     */
    public Sink.Status process() throws EventDeliveryException {
        counter.setUpdateTime();
        Sink.Status status = Sink.Status.READY;
        Channel channel = getChannel();
        Transaction transaction = null;
        long processedBytes = 0;
        try {
            long processedEvents = 0;
            transaction = channel.getTransaction();
            transaction.begin();
            kafkaFutures.clear();
            long batchStartNanos = System.nanoTime();

            while (processedEvents < batchSize) {
                Event event = channel.take();
                if (event == null) {
                    // Channel drained: record why the batch is short and stop polling.
                    if (processedEvents == 0) {
                        status = Sink.Status.BACKOFF;
                        counter.incrementBatchEmptyCount();
                    } else {
                        counter.incrementBatchUnderflowCount();
                    }
                    break;
                }
                processedBytes += sendEvent(event, processedEvents);
                processedEvents++;
            }
            if (processedEvents == batchSize) {
                counter.incrementBatchCompleteCount();
            }

            // Push out anything the producer is still buffering before waiting on the futures.
            producer.flush();
            if (processedEvents > 0) {
                for (Future<RecordMetadata> future : kafkaFutures) {
                    future.get();
                }
                counter.addToKafkaEventSendTimer((System.nanoTime() - batchStartNanos) / 1000000);
                counter.addToEventDrainSuccessCount(kafkaFutures.size());
                initCountAndSendAlarm();
            }

            transaction.commit();
            counter.addToSizeCompleteCount(processedBytes);
            initCountAndSendAlarm();
            return status;
        } catch (Throwable th) {
            counter.incrementConnectionFailedCount();
            increaseCountAndSendAlarm();
            logger.error("Failed to publish events", th);
            if (transaction != null) {
                try {
                    kafkaFutures.clear();
                    transaction.rollback();
                    counter.incrementRollbackCount();
                } catch (Exception rollbackFailure) {
                    logger.error("Transaction rollback failed", rollbackFailure);
                    throw Throwables.propagate(rollbackFailure);
                }
            }
            throw new EventDeliveryException("Failed to publish events", th);
        } finally {
            if (transaction != null) {
                transaction.close();
                counter.setUpdateTime();
            }
        }
    }

    /**
     * Builds the Kafka record for one event, hands it to the producer and
     * remembers the resulting future in {@link #kafkaFutures}.
     *
     * @param event      event taken from the channel
     * @param eventIndex zero-based position of the event in the current batch (debug logging only)
     * @return length of the event body in bytes
     * @throws EventDeliveryException if the partition header is not an integer or the send fails
     * @throws IOException if the body cannot be decoded for debug logging
     */
    private long sendEvent(Event event, long eventIndex) throws EventDeliveryException, IOException {
        byte[] body = event.getBody();
        Map<String, String> headers = event.getHeaders();

        String eventTopic = headers.get(KafkaSinkConstants.TOPIC_HEADER);
        if (eventTopic == null) {
            eventTopic = topic;
        }
        String eventKey = headers.get(KafkaSinkConstants.KEY_HEADER);
        if (StringUtils.isEmpty(eventKey)) {
            // No key supplied: use a random one.
            eventKey = String.valueOf(Math.random());
        }
        if (logger.isDebugEnabled()) {
            logger.debug("{Event} " + eventTopic + " : " + eventKey + " : " + new String(body, "UTF-8"));
            logger.debug("event #{}", eventIndex);
        }

        long sendStartMillis = System.currentTimeMillis();
        try {
            // A partition id in the event header takes precedence over the static one.
            Integer partitionId = staticPartitionId;
            if (partitionHeader != null) {
                String headerValue = headers.get(partitionHeader);
                if (headerValue != null) {
                    partitionId = Integer.valueOf(headerValue);
                }
            }

            byte[] payload = serializeEvent(event, useAvroEventFormat);
            ProducerRecord<String, byte[]> record;
            if (partitionId != null) {
                record = new ProducerRecord<String, byte[]>(eventTopic, partitionId, eventKey, payload);
            } else {
                record = new ProducerRecord<String, byte[]>(eventTopic, eventKey, payload);
            }
            kafkaFutures.add(producer.send(record, new SinkCallback(sendStartMillis)));
            return body.length;
        } catch (NumberFormatException e) {
            throw new EventDeliveryException("Non integer partition id specified", e);
        } catch (Exception e) {
            throw new EventDeliveryException("Could not send event", e);
        }
    }

    /**
     * Records one more consecutive failure and sends the failure alarm the first
     * time the count exceeds {@link #sinkFailCount}. The count is capped at the
     * threshold; a threshold of {@code 0} never raises an alarm.
     */
    private void increaseCountAndSendAlarm() {
        sinkCurrentFailCount++;
        boolean overThreshold = sinkCurrentFailCount > sinkFailCount;
        if (sinkFailCount > 0 && overThreshold && !isSendAlarm) {
            sendAlarmMgr.sendSinkFailAlarm(getName(), CAUSE_ID_39009);
            isSendAlarm = true;
        }
        if (overThreshold) {
            sinkCurrentFailCount = sinkFailCount;
        }
    }

    /** Resets the consecutive-failure count and clears a previously sent failure alarm, if any. */
    private void initCountAndSendAlarm() {
        sinkCurrentFailCount = 0;
        if (isSendAlarm) {
            sendAlarmMgr.sendSinkNormalAlarm(getName(), CAUSE_ID_39009);
        }
        isSendAlarm = false;
    }

    /**
     * Creates the Kafka producer from the configured properties, starts the
     * counter and reports the sink as normal to the alarm manager.
     *
     * @throws RuntimeException wrapping any failure; the producer, if already
     *         created, is closed first
     */
    public synchronized void start() {
        try {
            producer = new KafkaProducer<>(kafkaProps);
            counter.start();
            counter.setMonTime(monTime);
            super.start();
            sendAlarmMgr.sendSinkNormalAlarm(getName(), CAUSE_ID_39009);
        } catch (Exception startFailure) {
            if (producer != null) {
                try {
                    producer.close();
                } catch (Exception closeFailure) {
                    logger.error("Kafka sink {} failed to start and closing the producer raised an exception:",
                            getName(), closeFailure);
                }
            }
            throw new RuntimeException(startFailure);
        }
    }

    /** Closes the producer (logging, not propagating, any failure), stops the counter and the sink. */
    public synchronized void stop() {
        if (producer != null) {
            try {
                producer.close();
            } catch (Exception e) {
                // Exception is passed as the trailing argument so SLF4J logs its stack trace.
                logger.error("Kafka Sink {} stopped,occur exception:", getName(), e);
            }
        }
        if (counter != null) {
            counter.stop();
        }
        logger.info("Kafka Sink {} stopped. Metrics: {}", getName(), counter);
        super.stop();
    }

    /**
     * Reads the sink settings from {@code context}: topic, batch size, Avro
     * flag, partition options, bootstrap servers, producer properties,
     * monitoring time and failure-alarm threshold.
     *
     * @param context sink configuration
     * @throws ConfigurationException if no bootstrap servers are configured and
     *         none can be read from the broker properties file
     * @throws IllegalArgumentException if {@code failcount} is negative
     */
    public void configure(Context context) {
        translateOldProps(context);

        String configuredTopic = context.getString(KafkaSinkConstants.TOPIC_CONFIG);
        if (configuredTopic == null || configuredTopic.isEmpty()) {
            configuredTopic = KafkaSinkConstants.DEFAULT_TOPIC;
            logger.warn("Topic was not specified. Using {} as the topic.", configuredTopic);
        } else {
            logger.info("Using the static topic {}. This may be overridden by event headers", configuredTopic);
        }
        topic = configuredTopic;

        batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE,
                Integer.valueOf(KafkaSinkConstants.DEFAULT_BATCH_SIZE));
        logger.debug("Using batch size: {}", batchSize);

        useAvroEventFormat = context.getBoolean(KafkaSinkConstants.AVRO_EVENT, false);
        partitionHeader = context.getString(KafkaSinkConstants.PARTITION_HEADER_NAME);
        staticPartitionId = context.getInteger(KafkaSinkConstants.STATIC_PARTITION_CONF);
        logger.debug("useFlumeEventFormat set to: {}", useAvroEventFormat);

        kafkaFutures = new LinkedList<Future<RecordMetadata>>();

        String bootstrapServers =
                context.getString(KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServer());
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            throw new ConfigurationException("Bootstrap Servers must be specified ");
        }
        kafkaProps = KafkaSinkUtil.getKafkaProperties(context);
        setProducerProps(context, bootstrapServers);
        // NOTE(review): this prints every producer property, which may include
        // credentials if any are configured under the producer prefix - confirm this is acceptable.
        logger.info("Producer properties: {}", kafkaProps.toString());

        if (counter == null) {
            counter = new KafkaSinkCounter(getName());
        }
        monTime = context.getInteger(KafkaSinkConstants.MON_TIME, 0);
        counter.setMonTime(monTime);

        sinkFailCount = context.getInteger("failcount", 10);
        if (sinkFailCount < 0) {
            // Zero is accepted (it disables the failure alarm); only negative values are rejected.
            throw new IllegalArgumentException("sinkFailCount is invalid, it must not be negative");
        }
    }

    /**
     * Looks up {@code bootstrap.servers} in the broker properties file located in
     * the Flume configuration directory.
     *
     * <p>The directory is taken from the {@code flume.conf.dir} system property,
     * then from an environment variable named {@code user.dir} (kept for
     * compatibility with the previous lookup), then from the {@code user.dir}
     * system property.
     *
     * @return the configured value; {@code ""} if the file is missing or cannot
     *         be read; {@code null} if the file has no {@code bootstrap.servers} entry
     */
    private String getBootstrapServer() {
        String confDir = System.getProperty("flume.conf.dir");
        if (confDir == null) {
            confDir = System.getenv("user.dir");
        }
        if (confDir == null) {
            // "user.dir" is a JVM system property, so the environment lookup above normally yields null.
            confDir = System.getProperty("user.dir");
        }
        File brokerFile = new File(confDir + File.separator + KafkaSinkConstants.BROKER_PROPERTIES);
        if (!brokerFile.exists()) {
            return "";
        }

        FileInputStream in = null;
        try {
            in = new FileInputStream(brokerFile);
            Properties brokerProps = new Properties();
            brokerProps.load(in);
            return brokerProps.getProperty("bootstrap.servers");
        } catch (FileNotFoundException e) {
            logger.info("The broker properties file is not found.");
            return "";
        } catch (IOException e) {
            logger.info("IOException occurs when read the broker properties file.");
            return "";
        } finally {
            // The stream is null when the constructor itself failed (e.g. path is a directory).
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    logger.info("Failed to close fis.");
                }
            }
        }
    }

    /**
     * Copies values of deprecated configuration keys to their current names and
     * warns about deprecated serializer keys.
     */
    private void translateOldProps(Context context) {
        migrateDeprecatedKey(context, KafkaSinkConstants.TOPIC_HEADER, KafkaSinkConstants.TOPIC_CONFIG);
        migrateDeprecatedKey(context, KafkaSinkConstants.OLD_BATCH_SIZE, KafkaSinkConstants.BATCH_SIZE);
        migrateDeprecatedKey(context, KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY, "kafka.producer.acks");

        if (context.containsKey(KafkaSinkConstants.KEY_SERIALIZER_KEY)) {
            logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements a different interface for serializers. Please use the parameter {}", KafkaSinkConstants.KEY_SERIALIZER_KEY, "kafka.producer.key.serializer");
        }
        if (context.containsKey(KafkaSinkConstants.MESSAGE_SERIALIZER_KEY)) {
            logger.warn("{} is deprecated. Flume now uses the latest Kafka producer which implements a different interface for serializers. Please use the parameter {}", KafkaSinkConstants.MESSAGE_SERIALIZER_KEY, "kafka.producer.value.serializer");
        }
    }

    /**
     * Copies the value of {@code oldKey} to {@code newKey} and logs a deprecation
     * warning, but only when {@code newKey} is absent and {@code oldKey} holds a
     * non-empty value.
     */
    private static void migrateDeprecatedKey(Context context, String oldKey, String newKey) {
        if (context.containsKey(newKey)) {
            return;
        }
        String oldValue = context.getString(oldKey);
        if (oldValue == null || oldValue.isEmpty()) {
            return;
        }
        context.put(newKey, oldValue);
        logger.warn("{} is deprecated. Please use the parameter {}", oldKey, newKey);
    }

    /**
     * Applies the default acks and serializer settings, then the user-supplied
     * producer sub-properties (which may override those defaults), and finally
     * the bootstrap servers.
     */
    private void setProducerProps(Context context, String bootstrapServers) {
        kafkaProps.put(KafkaSinkConstants.DEFAULT_REQUIRED_ACKS, KafkaSinkConstants.DEFAULT_ACKS);
        kafkaProps.put("key.serializer", KafkaSinkConstants.DEFAULT_KEY_SERIALIZER);
        kafkaProps.put("value.serializer", KafkaSinkConstants.DEFAULT_VALUE_SERIAIZER);
        kafkaProps.putAll(context.getSubProperties(KafkaSinkConstants.KAFKA_PRODUCER_PREFIX));
        kafkaProps.put("bootstrap.servers", bootstrapServers);
    }

    /** Returns the live producer properties object (not a copy). */
    protected Properties getKafkaProps() {
        return kafkaProps;
    }

    /**
     * Produces the record payload for an event.
     *
     * @param event  event to serialize
     * @param asAvro when {@code true}, headers and body are encoded as an
     *               {@code AvroFlumeEvent}; otherwise the raw body is returned
     * @return payload bytes
     * @throws IOException if Avro encoding fails
     */
    private byte[] serializeEvent(Event event, boolean asAvro) throws IOException {
        if (!asAvro) {
            return event.getBody();
        }
        if (!tempOutStream.isPresent()) {
            tempOutStream = Optional.of(new ByteArrayOutputStream());
        }
        if (!writer.isPresent()) {
            writer = Optional.of(new SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
        }

        ByteArrayOutputStream buffer = tempOutStream.get();
        buffer.reset();
        AvroFlumeEvent avroEvent =
                new AvroFlumeEvent(toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody()));
        // Passing the previous encoder lets the factory reuse it.
        encoder = EncoderFactory.get().directBinaryEncoder(buffer, encoder);
        writer.get().write(avroEvent, encoder);
        encoder.flush();
        return buffer.toByteArray();
    }

    /** Returns a new map holding the same entries, typed as {@code CharSequence} for Avro. */
    private static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, String> map) {
        return new HashMap<CharSequence, CharSequence>(map);
    }
}
