package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/clients/producer/internals/Sender.class */
public class Sender implements Runnable {
    /** Class-wide logger for the I/O thread. */
    private static final Logger log = LoggerFactory.getLogger((Class<?>) Sender.class);
    /** Network client used to check node readiness, send produce requests, poll, wake up and close. */
    private final KafkaClient client;
    /** Source of record batches; this class asks it for ready nodes, drains it, and re-enqueues or deallocates batches. */
    private final RecordAccumulator accumulator;
    /** Cluster metadata; fetched on every iteration and asked to refresh when leaders are unknown or metadata errors occur. */
    private final Metadata metadata;
    /** When true, the partition of each drained batch is muted until that batch has been completed. */
    private final boolean guaranteeMessageOrder;
    /** Passed to {@code RecordAccumulator.drain}; presumably the maximum request size in bytes (confirm against the accumulator). */
    private final int maxRequestSize;
    /** Acks value placed in each produce request; {@code acks != 0} is also passed to the {@code ClientRequest} constructor. */
    private final short acks;
    /** Upper bound compared against {@code RecordBatch.attempts} when deciding whether a failed batch may be retried. */
    private final int retries;
    /** Clock used for all timestamps taken by this class. */
    private final Time time;
    /** Main-loop flag; cleared by {@link #initiateClose()}. Volatile because it is written outside the I/O thread's loop. */
    private volatile boolean running = true;
    /** Set by {@link #forceClose()}; makes shutdown stop sending and abort incomplete batches instead. */
    private volatile boolean forceClose;
    /** Producer metrics recorded by this sender. */
    private final SenderMetrics sensors;
    /** Client id supplied to the constructor; not read anywhere else in this class. */
    private String clientId;
    /** Passed to {@code RecordAccumulator.abortExpiredBatches} and placed in each produce request as its timeout. */
    private final int requestTimeout;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/producer/internals/Sender$SenderMetrics.class */
    public class SenderMetrics {
        /** Registry on which all sensors and gauges of this class are created and looked up. */
        private final Metrics metrics;
        /** Sensor "record-retries": rate of retried record sends. */
        public final Sensor retrySensor;
        /** Sensor "errors": rate of record sends that resulted in errors. */
        public final Sensor errorSensor;
        /** Sensor "queue-time": avg/max of {@code drainedMs - createdMs} per batch. */
        public final Sensor queueTimeSensor;
        /** Sensor "request-time": avg/max request latency. */
        public final Sensor requestTimeSensor;
        /** Sensor "records-per-request": record send rate and average records per request. */
        public final Sensor recordsPerRequestSensor;
        /** Sensor for batch sizes: avg/max of the batch size in bytes. */
        public final Sensor batchSizeSensor;
        /** Sensor "compression-rate": average compression rate of record batches. */
        public final Sensor compressionRateSensor;
        /** Sensor "record-size-max": max and avg record size. */
        public final Sensor maxRecordSizeSensor;
        /** Sensor "produce-throttle-time": avg/max throttle time. */
        public final Sensor produceThrottleTimeSensor;

        /**
         * Creates every producer-level sensor on the given registry and attaches its metrics in
         * the {@code producer-metrics} group.
         *
         * <p>Two gauges are registered as well: {@code requests-in-flight}, which reads the
         * client's in-flight request count, and {@code metadata-age}, which reports the time
         * since the last successful metadata update in seconds.
         *
         * @param metrics registry that receives the sensors and gauges
         */
        public SenderMetrics(Metrics metrics) {
            this.metrics = metrics;
            // The sensor name is a producer-owned literal. It must not be borrowed from another
            // library's configuration constants, which would couple this class to that library.
            this.batchSizeSensor = metrics.sensor("batch-size");
            this.batchSizeSensor.add(metrics.metricName("batch-size-avg", "producer-metrics", "The average number of bytes sent per partition per-request."), new Avg());
            this.batchSizeSensor.add(metrics.metricName("batch-size-max", "producer-metrics", "The max number of bytes sent per partition per-request."), new Max());
            this.compressionRateSensor = metrics.sensor("compression-rate");
            this.compressionRateSensor.add(metrics.metricName("compression-rate-avg", "producer-metrics", "The average compression rate of record batches."), new Avg());
            this.queueTimeSensor = metrics.sensor("queue-time");
            this.queueTimeSensor.add(metrics.metricName("record-queue-time-avg", "producer-metrics", "The average time in ms record batches spent in the record accumulator."), new Avg());
            this.queueTimeSensor.add(metrics.metricName("record-queue-time-max", "producer-metrics", "The maximum time in ms record batches spent in the record accumulator."), new Max());
            this.requestTimeSensor = metrics.sensor("request-time");
            this.requestTimeSensor.add(metrics.metricName("request-latency-avg", "producer-metrics", "The average request latency in ms"), new Avg());
            this.requestTimeSensor.add(metrics.metricName("request-latency-max", "producer-metrics", "The maximum request latency in ms"), new Max());
            this.produceThrottleTimeSensor = metrics.sensor("produce-throttle-time");
            this.produceThrottleTimeSensor.add(metrics.metricName("produce-throttle-time-avg", "producer-metrics", "The average throttle time in ms"), new Avg());
            this.produceThrottleTimeSensor.add(metrics.metricName("produce-throttle-time-max", "producer-metrics", "The maximum throttle time in ms"), new Max());
            this.recordsPerRequestSensor = metrics.sensor("records-per-request");
            this.recordsPerRequestSensor.add(metrics.metricName("record-send-rate", "producer-metrics", "The average number of records sent per second."), new Rate());
            this.recordsPerRequestSensor.add(metrics.metricName("records-per-request-avg", "producer-metrics", "The average number of records per request."), new Avg());
            this.retrySensor = metrics.sensor("record-retries");
            this.retrySensor.add(metrics.metricName("record-retry-rate", "producer-metrics", "The average per-second number of retried record sends"), new Rate());
            this.errorSensor = metrics.sensor("errors");
            this.errorSensor.add(metrics.metricName("record-error-rate", "producer-metrics", "The average per-second number of record sends that resulted in errors"), new Rate());
            this.maxRecordSizeSensor = metrics.sensor("record-size-max");
            this.maxRecordSizeSensor.add(metrics.metricName("record-size-max", "producer-metrics", "The maximum record size"), new Max());
            this.maxRecordSizeSensor.add(metrics.metricName("record-size-avg", "producer-metrics", "The average record size"), new Avg());

            // Gauge: evaluated on demand, reads the live in-flight count from the network client.
            metrics.addMetric(metrics.metricName("requests-in-flight", "producer-metrics", "The current number of in-flight requests awaiting a response."), new Measurable() {
                @Override
                public double measure(MetricConfig metricConfig, long now) {
                    return Sender.this.client.inFlightRequestCount();
                }
            });
            // Gauge: difference between the supplied timestamp and the last successful metadata
            // update, divided by 1000 to convert milliseconds to seconds.
            metrics.addMetric(metrics.metricName("metadata-age", "producer-metrics", "The age in seconds of the current producer metadata being used."), new Measurable() {
                @Override
                public double measure(MetricConfig metricConfig, long now) {
                    return (now - Sender.this.metadata.lastSuccessfulUpdate()) / 1000.0d;
                }
            });
        }

        /**
         * Registers the per-topic sensors for the given topic unless they already exist.
         *
         * <p>The presence of the {@code topic.<name>.records-per-batch} sensor is used as the
         * marker that all five per-topic sensors have been created. Each sensor's metric is
         * placed in the {@code producer-topic-metrics} group and tagged with the topic name.
         *
         * @param str topic name
         */
        private void maybeRegisterTopicMetrics(String str) {
            String sensorPrefix = "topic." + str;
            String recordsPerBatchName = sensorPrefix + ".records-per-batch";
            if (this.metrics.getSensor(recordsPerBatchName) != null) {
                // Already registered on an earlier call.
                return;
            }
            // The tag key is a producer metric label, so it is spelled out here rather than
            // borrowed from the consumer protocol's schema field names.
            Map<String, String> tags = Collections.singletonMap("topic", str);
            this.metrics.sensor(recordsPerBatchName).add(this.metrics.metricName("record-send-rate", "producer-topic-metrics", tags), new Rate());
            this.metrics.sensor(sensorPrefix + ".bytes").add(this.metrics.metricName("byte-rate", "producer-topic-metrics", tags), new Rate());
            this.metrics.sensor(sensorPrefix + ".compression-rate").add(this.metrics.metricName("compression-rate", "producer-topic-metrics", tags), new Avg());
            this.metrics.sensor(sensorPrefix + ".record-retries").add(this.metrics.metricName("record-retry-rate", "producer-topic-metrics", tags), new Rate());
            this.metrics.sensor(sensorPrefix + ".record-errors").add(this.metrics.metricName("record-error-rate", "producer-topic-metrics", tags), new Rate());
        }

        /**
         * Records per-topic and producer-wide metrics for a set of drained batches.
         *
         * <p>Each map value is the list of batches destined for one node; the total record count
         * of that list is recorded once on the records-per-request sensor.
         *
         * @param map drained batches keyed by node id
         */
        public void updateProduceRequestMetrics(Map<Integer, List<RecordBatch>> map) {
            final long now = Sender.this.time.milliseconds();
            for (List<RecordBatch> nodeBatches : map.values()) {
                int recordsForNode = 0;
                for (RecordBatch batch : nodeBatches) {
                    final String topic = batch.topicPartition.topic();
                    // Make sure the per-topic sensors exist before looking them up below.
                    maybeRegisterTopicMetrics(topic);

                    // Per-topic sensors.
                    Sensor topicRecordCount = Utils.notNull(this.metrics.getSensor("topic." + topic + ".records-per-batch"));
                    topicRecordCount.record(batch.recordCount);
                    Sensor topicByteRate = Utils.notNull(this.metrics.getSensor("topic." + topic + ".bytes"));
                    topicByteRate.record(batch.records.sizeInBytes());
                    Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor("topic." + topic + ".compression-rate"));
                    topicCompressionRate.record(batch.records.compressionRate());

                    // Producer-wide sensors.
                    this.batchSizeSensor.record(batch.records.sizeInBytes(), now);
                    this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now);
                    this.compressionRateSensor.record(batch.records.compressionRate());
                    this.maxRecordSizeSensor.record(batch.maxRecordSize, now);

                    recordsForNode += batch.recordCount;
                }
                this.recordsPerRequestSensor.record(recordsForNode, now);
            }
        }

        /**
         * Records retried records on the producer-wide retry sensor and, if it has been
         * registered, on the topic's retry sensor.
         *
         * @param str topic name
         * @param i   number of records being retried
         */
        public void recordRetries(String str, int i) {
            final long now = Sender.this.time.milliseconds();
            this.retrySensor.record(i, now);

            final Sensor topicRetries = this.metrics.getSensor("topic." + str + ".record-retries");
            if (topicRetries == null) {
                // Per-topic sensors have not been registered for this topic; nothing more to do.
                return;
            }
            topicRetries.record(i, now);
        }

        /**
         * Records failed records on the producer-wide error sensor and, if it has been
         * registered, on the topic's error sensor.
         *
         * @param str topic name
         * @param i   number of records that failed
         */
        public void recordErrors(String str, int i) {
            final long now = Sender.this.time.milliseconds();
            this.errorSensor.record(i, now);

            final Sensor topicErrors = this.metrics.getSensor("topic." + str + ".record-errors");
            if (topicErrors == null) {
                // Per-topic sensors have not been registered for this topic; nothing more to do.
                return;
            }
            topicErrors.record(i, now);
        }

        /**
         * Records a request latency on the producer-wide sensor and, when a node id is given
         * and a matching {@code node-<id>.latency} sensor exists, on that sensor too.
         *
         * @param str node id; an empty string skips the per-node sensor
         * @param j   request latency to record
         */
        public void recordLatency(String str, long j) {
            final long now = Sender.this.time.milliseconds();
            this.requestTimeSensor.record(j, now);

            if (!str.isEmpty()) {
                // The node sensor is only looked up here, never created.
                final Sensor nodeLatency = this.metrics.getSensor("node-" + str + ".latency");
                if (nodeLatency != null) {
                    nodeLatency.record(j, now);
                }
            }
        }

        /**
         * Records a throttle time on the producer-wide throttle-time sensor.
         *
         * @param str node id; not used by this method
         * @param j   throttle time to record
         */
        public void recordThrottleTime(String str, long j) {
            final long now = Sender.this.time.milliseconds();
            this.produceThrottleTimeSensor.record(j, now);
        }
    }

    /**
     * Creates a sender; no I/O happens until {@link #run()} is invoked.
     *
     * @param kafkaClient       network client used for all sends and polls
     * @param metadata          cluster metadata holder
     * @param recordAccumulator accumulator the batches are drained from
     * @param z                 whether to mute a partition while one of its batches is in flight
     * @param i                 maximum request size handed to the accumulator when draining
     * @param s                 acks value placed in produce requests
     * @param i2                retry bound checked before a failed batch is re-enqueued
     * @param metrics           registry on which the sender's metrics are created
     * @param time              clock
     * @param str               client id
     * @param i3                request timeout used for batch expiry and produce requests
     */
    public Sender(KafkaClient kafkaClient, Metadata metadata, RecordAccumulator recordAccumulator, boolean z, int i, short s, int i2, Metrics metrics, Time time, String str, int i3) {
        // Collaborators.
        this.client = kafkaClient;
        this.metadata = metadata;
        this.accumulator = recordAccumulator;
        this.time = time;

        // Configuration values.
        this.clientId = str;
        this.guaranteeMessageOrder = z;
        this.maxRequestSize = i;
        this.acks = s;
        this.retries = i2;
        this.requestTimeout = i3;

        // Metrics; the gauges it registers read client/metadata lazily, not during construction.
        this.sensors = new SenderMetrics(metrics);
    }

    /**
     * Main loop of the producer I/O thread.
     *
     * <p>Runs send iterations until {@link #initiateClose()} clears the running flag. It then
     * keeps iterating while unsent or in-flight data remains, unless a forced close was
     * requested, in which case incomplete batches are aborted. Finally the network client is
     * closed. Exceptions thrown by an iteration are logged and do not end the loop.
     */
    @Override
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // Normal operation.
        while (running) {
            runOnceLoggingErrors();
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // Graceful drain: keep going while work is pending, unless a forced close cuts it short.
        while (!forceClose) {
            boolean workPending = accumulator.hasUnsent() || client.inFlightRequestCount() > 0;
            if (!workPending) {
                break;
            }
            runOnceLoggingErrors();
        }

        if (forceClose) {
            accumulator.abortIncompleteBatches();
        }

        try {
            client.close();
        } catch (Exception closeFailure) {
            log.error("Failed to close network client", closeFailure);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

    /** Runs one send iteration at the current time, logging any exception instead of propagating it. */
    private void runOnceLoggingErrors() {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    /**
     * Performs a single send iteration.
     *
     * <p>Steps, in order: determine which nodes have data ready; request a metadata update for
     * topics whose leader is unknown; drop nodes the client is not ready to send to; drain
     * batches for the remaining nodes; optionally mute the drained partitions; record errors
     * for expired batches; update metrics; build and send the produce requests; poll the client.
     *
     * @param j current time in milliseconds
     */
    void run(long j) {
        final Cluster cluster = metadata.fetch();
        final RecordAccumulator.ReadyCheckResult readiness = accumulator.ready(cluster, j);

        // Leaders we do not know about: register the topics and force a metadata refresh.
        if (!readiness.unknownLeaderTopics.isEmpty()) {
            for (String topic : readiness.unknownLeaderTopics) {
                metadata.add(topic);
            }
            metadata.requestUpdate();
        }

        // Remove nodes the client cannot send to yet, remembering the shortest connection delay.
        long notReadyTimeout = Long.MAX_VALUE;
        for (Iterator<Node> nodes = readiness.readyNodes.iterator(); nodes.hasNext();) {
            Node node = nodes.next();
            if (client.ready(node, j)) {
                continue;
            }
            nodes.remove();
            notReadyTimeout = Math.min(notReadyTimeout, client.connectionDelay(node, j));
        }

        final Map<Integer, List<RecordBatch>> batchesByNode = accumulator.drain(cluster, readiness.readyNodes, maxRequestSize, j);

        // With ordering guaranteed, every drained partition is muted until its batch completes.
        if (guaranteeMessageOrder) {
            for (List<RecordBatch> nodeBatches : batchesByNode.values()) {
                for (RecordBatch batch : nodeBatches) {
                    accumulator.mutePartition(batch.topicPartition);
                }
            }
        }

        // Batches that expired in the accumulator count as errors.
        final List<RecordBatch> expired = accumulator.abortExpiredBatches(requestTimeout, j);
        for (RecordBatch expiredBatch : expired) {
            sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
        }

        sensors.updateProduceRequestMetrics(batchesByNode);
        final List<ClientRequest> requests = createProduceRequests(batchesByNode, j);

        // If any node is ready to send, poll without blocking so the loop can send more right away.
        long pollTimeout = Math.min(readiness.nextReadyCheckDelayMs, notReadyTimeout);
        if (!readiness.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", readiness.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }

        for (ClientRequest request : requests) {
            client.send(request, j);
        }
        client.poll(pollTimeout, j);
    }

    /**
     * Starts a graceful shutdown: closes the accumulator, clears the running flag so
     * {@link #run()} leaves its main loop, and wakes the client up.
     */
    public void initiateClose() {
        accumulator.close();
        running = false;
        // Wake the client so the run loop re-checks the flag.
        wakeup();
    }

    /**
     * Requests an immediate shutdown: sets the force-close flag, which makes {@link #run()}
     * abort incomplete batches rather than send them, then initiates the regular close.
     */
    public void forceClose() {
        // The flag is set before initiateClose() clears the running flag.
        forceClose = true;
        initiateClose();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Completes the batches of one produce request according to the client's response.
     *
     * <ul>
     *   <li>Disconnected: every batch is completed with {@code Errors.NETWORK_EXCEPTION}.</li>
     *   <li>No response body: every batch is completed with {@code Errors.NONE} and offset and
     *       timestamp of -1. Presumably this is the acks == 0 case, since requests are created
     *       with {@code acks != 0} as the second {@code ClientRequest} argument.</li>
     *   <li>Otherwise the body is parsed and each partition's batch is completed with that
     *       partition's error code, base offset and timestamp; request latency and throttle
     *       time are then recorded.</li>
     * </ul>
     *
     * @param clientResponse response (or disconnect notification) delivered by the client
     * @param map            batches that were sent in the request, keyed by partition
     * @param j              current time in milliseconds
     */
    public void handleProduceResponse(ClientResponse clientResponse, Map<TopicPartition, RecordBatch> map, long j) {
        int correlationId = clientResponse.request().request().header().correlationId();
        String destination = clientResponse.request().request().destination();

        if (clientResponse.wasDisconnected()) {
            log.trace("Cancelled request {} due to node {} being disconnected", clientResponse, destination);
            for (RecordBatch batch : map.values()) {
                completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, -1L, correlationId, j);
            }
            return;
        }

        log.trace("Received produce response from node {} with correlation id {}", destination, correlationId);

        if (!clientResponse.hasResponse()) {
            // Nothing to parse: treat every batch in the request as successfully sent.
            for (RecordBatch batch : map.values()) {
                completeBatch(batch, Errors.NONE, -1L, -1L, correlationId, j);
            }
            return;
        }

        // Keep the parsed response in a local: it is needed for the per-partition results
        // and again for the throttle time below.
        ProduceResponse produceResponse = new ProduceResponse(clientResponse.responseBody());
        for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
            TopicPartition partition = entry.getKey();
            ProduceResponse.PartitionResponse partitionResponse = entry.getValue();
            completeBatch(map.get(partition), Errors.forCode(partitionResponse.errorCode), partitionResponse.baseOffset, partitionResponse.timestamp, correlationId, j);
        }
        this.sensors.recordLatency(destination, clientResponse.requestLatencyMs());
        this.sensors.recordThrottleTime(destination, produceResponse.getThrottleTime());
    }

    /**
     * Finishes or retries a single batch.
     *
     * <p>A batch with a retriable error and attempts left is re-enqueued. Otherwise it is
     * marked done (with a {@link TopicAuthorizationException} naming the topic for
     * {@code TOPIC_AUTHORIZATION_FAILED}, or the error's own exception), deallocated, and
     * counted as an error if the error is not {@code NONE}. In both cases a metadata-related
     * error triggers a metadata update, and the partition is unmuted when message ordering is
     * guaranteed.
     *
     * @param recordBatch batch to complete
     * @param errors      result reported for the batch
     * @param j           base offset passed to {@code RecordBatch.done}
     * @param j2          timestamp passed to {@code RecordBatch.done}
     * @param j3          correlation id of the request, used for logging only
     * @param j4          current time in milliseconds, used when re-enqueueing
     */
    private void completeBatch(RecordBatch recordBatch, Errors errors, long j, long j2, long j3, long j4) {
        final boolean retry = errors != Errors.NONE && canRetry(recordBatch, errors);
        if (retry) {
            log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", j3, recordBatch.topicPartition, this.retries - recordBatch.attempts - 1, errors);
            this.accumulator.reenqueue(recordBatch, j4);
            this.sensors.recordRetries(recordBatch.topicPartition.topic(), recordBatch.recordCount);
        } else {
            // Pick the exception handed to the batch; it is whatever errors.exception() yields otherwise.
            final RuntimeException outcome;
            if (errors == Errors.TOPIC_AUTHORIZATION_FAILED) {
                outcome = new TopicAuthorizationException(recordBatch.topicPartition.topic());
            } else {
                outcome = errors.exception();
            }
            recordBatch.done(j, j2, outcome);
            this.accumulator.deallocate(recordBatch);
            if (errors != Errors.NONE) {
                this.sensors.recordErrors(recordBatch.topicPartition.topic(), recordBatch.recordCount);
            }
        }

        // Errors that indicate stale metadata trigger a refresh regardless of retry/fail outcome.
        final RuntimeException cause = errors.exception();
        if (cause instanceof InvalidMetadataException) {
            if (cause instanceof UnknownTopicOrPartitionException) {
                log.warn("Received unknown topic or partition error in produce request on partition {}. The topic/partition may not exist or the user may not have Describe access to it", recordBatch.topicPartition);
            }
            this.metadata.requestUpdate();
        }

        // The batch is no longer in flight, so its partition can be drained again.
        if (this.guaranteeMessageOrder) {
            this.accumulator.unmutePartition(recordBatch.topicPartition);
        }
    }

    /**
     * Tells whether a failed batch may be sent again.
     *
     * @param recordBatch batch that failed
     * @param errors      error reported for the batch
     * @return {@code true} if the batch has been attempted fewer than {@code retries} times
     *         and the error's exception is a {@link RetriableException}
     */
    private boolean canRetry(RecordBatch recordBatch, Errors errors) {
        if (recordBatch.attempts >= this.retries) {
            // Retry budget exhausted.
            return false;
        }
        return errors.exception() instanceof RetriableException;
    }

    /**
     * Builds one produce request per destination node.
     *
     * @param map drained batches keyed by node id
     * @param j   current time in milliseconds, stamped on each request
     * @return one request per map entry, in the map's iteration order
     */
    private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> map, long j) {
        // Parameterized instead of raw so the returned list is type-checked by the compiler.
        List<ClientRequest> requests = new ArrayList<ClientRequest>(map.size());
        for (Map.Entry<Integer, List<RecordBatch>> entry : map.entrySet()) {
            requests.add(produceRequest(j, entry.getKey().intValue(), this.acks, this.requestTimeout, entry.getValue()));
        }
        return requests;
    }

    /**
     * Builds the produce request for one node from its drained batches.
     *
     * <p>The completion handler hands the response, together with the batches keyed by
     * partition, to {@link #handleProduceResponse}.
     *
     * @param j    current time in milliseconds, stamped on the request
     * @param i    id of the destination node
     * @param s    acks value; {@code s != 0} is passed as the second {@code ClientRequest}
     *             argument (presumably "expect a response" — confirm against ClientRequest)
     * @param i2   timeout placed in the produce request
     * @param list batches to send to the node
     * @return the client request ready to be handed to the network client
     */
    private ClientRequest produceRequest(long j, int i, short s, int i2, List<RecordBatch> list) {
        // Both maps are parameterized (previously raw) so the compiler checks what goes into
        // the ProduceRequest and what the completion handler passes back.
        Map<TopicPartition, ByteBuffer> buffersByPartition = new HashMap<TopicPartition, ByteBuffer>(list.size());
        final Map<TopicPartition, RecordBatch> batchesByPartition = new HashMap<TopicPartition, RecordBatch>(list.size());
        for (RecordBatch batch : list) {
            TopicPartition partition = batch.topicPartition;
            buffersByPartition.put(partition, batch.records.buffer());
            batchesByPartition.put(partition, batch);
        }

        ProduceRequest request = new ProduceRequest(s, i2, buffersByPartition);
        RequestSend send = new RequestSend(Integer.toString(i), this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct());
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            @Override
            public void onComplete(ClientResponse clientResponse) {
                Sender.this.handleProduceResponse(clientResponse, batchesByPartition, Sender.this.time.milliseconds());
            }
        };
        return new ClientRequest(j, s != 0, send, callback);
    }

    /** Wakes up the network client by delegating to {@code KafkaClient.wakeup()}. */
    public void wakeup() {
        client.wakeup();
    }
}
