package org.apache.kafka.connect.runtime;

import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerSourceTask.class */
/**
 * {@link WorkerTask} that drives a {@link SourceTask}: it polls the task for records, converts them
 * with the configured key/value converters, sends them through a {@link KafkaProducer}, and flushes
 * the matching source offsets through an {@link OffsetStorageWriter}.
 *
 * <p>The monitor of this object guards the outstanding-message bookkeeping ({@code outstandingMessages},
 * {@code outstandingMessagesBacklog}, {@code flushing}) and the start/stop handshake
 * ({@code finishedStart}, {@code startedShutdownBeforeStartCompleted}). It is deliberately NOT held
 * while polling the task or while waiting on offset storage, because {@link #stop()}, the producer
 * callbacks and {@link #commitOffsets()} all need it.
 */
public class WorkerSourceTask extends WorkerTask {
    private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);

    /** How long to wait before retrying after a retriable send failure. */
    private static final long SEND_FAILED_BACKOFF_MS = 100;

    private final WorkerConfig workerConfig;
    private final SourceTask task;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private KafkaProducer<byte[], byte[]> producer;
    private final OffsetStorageReader offsetReader;
    private final OffsetStorageWriter offsetWriter;
    private final Time time;

    // Records returned by the last poll that have not been handed to the producer yet; null when empty.
    private List<SourceRecord> toSend;
    // True when the previous send attempt ended with a retriable failure.
    private boolean lastSendFailed;
    // Identity-keyed sets (key == value) of records handed to the producer and not yet acknowledged.
    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages;
    // Records sent while a flush is in progress; they do not belong to the snapshot being flushed.
    private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
    private boolean flushing;
    private CountDownLatch stopRequestedLatch;

    private Map<String, String> taskConfig;
    private boolean finishedStart;
    private boolean startedShutdownBeforeStartCompleted;

    /**
     * Creates a worker task wrapping the given source task.
     *
     * @param connectorTaskId     id of the task, passed to {@link WorkerTask}
     * @param sourceTask          the source task to drive
     * @param listener            status listener, passed to {@link WorkerTask}
     * @param targetState         initial target state, passed to {@link WorkerTask}
     * @param converter           converter used for record keys
     * @param converter2          converter used for record values
     * @param kafkaProducer       producer used to send converted records
     * @param offsetStorageReader reader exposed to the task through its context
     * @param offsetStorageWriter writer that collects and flushes source offsets
     * @param workerConfig        worker configuration (read for the offset commit timeout)
     * @param time                clock used to measure commit timeouts
     */
    public WorkerSourceTask(ConnectorTaskId connectorTaskId, SourceTask sourceTask, TaskStatus.Listener listener, TargetState targetState, Converter converter, Converter converter2, KafkaProducer<byte[], byte[]> kafkaProducer, OffsetStorageReader offsetStorageReader, OffsetStorageWriter offsetStorageWriter, WorkerConfig workerConfig, Time time) {
        super(connectorTaskId, listener, targetState);
        this.finishedStart = false;
        this.startedShutdownBeforeStartCompleted = false;
        this.workerConfig = workerConfig;
        this.task = sourceTask;
        this.keyConverter = converter;
        this.valueConverter = converter2;
        this.producer = kafkaProducer;
        this.offsetReader = offsetStorageReader;
        this.offsetWriter = offsetStorageWriter;
        this.time = time;
        this.toSend = null;
        this.lastSendFailed = false;
        this.outstandingMessages = new IdentityHashMap<>();
        this.outstandingMessagesBacklog = new IdentityHashMap<>();
        this.flushing = false;
        this.stopRequestedLatch = new CountDownLatch(1);
    }

    /**
     * Captures the task configuration as strings. Any failure is logged and reported through
     * {@code onFailure} instead of being propagated.
     *
     * @param taskConfig configuration to hand to the source task on start
     */
    @Override
    public void initialize(TaskConfig taskConfig) {
        try {
            this.taskConfig = taskConfig.originalsStrings();
        } catch (Throwable th) {
            // Pass `this` for the placeholder; the trailing Throwable is logged as the exception.
            log.error("Task {} failed initialization and will not be started.", this, th);
            onFailure(th);
        }
    }

    /** No resources are released here. */
    @Override
    protected void close() {
    }

    /**
     * Requests shutdown. Wakes up a pending send back-off and stops the source task if it has
     * finished starting; otherwise records the request so {@link #execute()} stops the task as soon
     * as its start completes.
     */
    @Override
    public void stop() {
        super.stop();
        this.stopRequestedLatch.countDown();
        synchronized (this) {
            if (this.finishedStart) {
                this.task.stop();
            } else {
                this.startedShutdownBeforeStartCompleted = true;
            }
        }
    }

    /**
     * Starts the source task and runs the poll/send loop until the task is stopping. Offsets are
     * committed once on the way out, whether the loop ended normally, by interruption, or with an
     * exception (which is still propagated).
     */
    @Override
    public void execute() {
        try {
            this.task.initialize(new WorkerSourceTaskContext(this.offsetReader));
            this.task.start(this.taskConfig);
            log.info("Source task {} finished initialization and start", this);

            // Only the start/stop handshake needs the monitor. Holding it for the whole loop would
            // block stop(), the producer callbacks (recordSent) and concurrent commitOffsets() calls.
            synchronized (this) {
                if (this.startedShutdownBeforeStartCompleted) {
                    this.task.stop();
                    return;
                }
                this.finishedStart = true;
            }

            while (!isStopping()) {
                if (shouldPause()) {
                    awaitUnpause();
                    continue;
                }
                if (this.toSend == null) {
                    log.debug("Nothing to send to Kafka. Polling source for additional records");
                    this.toSend = this.task.poll();
                }
                if (this.toSend == null) {
                    continue;
                }
                log.debug("About to send {} records to Kafka", this.toSend.size());
                if (!sendRecords()) {
                    // Back off before retrying, but wake up immediately if stop() is called.
                    this.stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
                }
            }
        } catch (InterruptedException e) {
            // Interrupted while polling, pausing or backing off: fall through and exit.
        } finally {
            commitOffsets();
        }
    }

    /**
     * Converts and sends every record in {@code toSend}.
     *
     * @return {@code true} if all records were handed to the producer ({@code toSend} is cleared);
     *         {@code false} if a retriable failure occurred, in which case {@code toSend} is trimmed
     *         to the records that still have to be sent
     * @throws ConnectException if the producer fails with a non-retriable {@link KafkaException}
     */
    private boolean sendRecords() {
        int processed = 0;
        for (final SourceRecord sourceRecord : this.toSend) {
            final String topic = sourceRecord.topic();
            final byte[] key = this.keyConverter.fromConnectData(topic, sourceRecord.keySchema(), sourceRecord.key());
            final byte[] value = this.valueConverter.fromConnectData(topic, sourceRecord.valueSchema(), sourceRecord.value());
            final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, sourceRecord.kafkaPartition(), key, value);
            log.trace("Appending record with key {}, value {}", sourceRecord.key(), sourceRecord.value());

            // NOTE(review): after a retriable failure the retried record is a new ProducerRecord
            // instance and is not registered below, while the maps are identity-keyed - confirm the
            // callback for the retried instance is accounted for correctly.
            synchronized (this) {
                if (!this.lastSendFailed) {
                    if (this.flushing) {
                        this.outstandingMessagesBacklog.put(producerRecord, producerRecord);
                    } else {
                        this.outstandingMessages.put(producerRecord, producerRecord);
                    }
                    this.offsetWriter.offset(sourceRecord.sourcePartition(), sourceRecord.sourceOffset());
                }
            }

            try {
                this.producer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                        if (exc != null) {
                            log.error("{} failed to send record to {}: {}", WorkerSourceTask.this.id, sourceRecord.topic(), exc);
                            log.debug("Failed record: topic {}, Kafka partition {}, key {}, value {}, source offset {}, source partition {}", sourceRecord.topic(), sourceRecord.kafkaPartition(), sourceRecord.key(), sourceRecord.value(), sourceRecord.sourceOffset(), sourceRecord.sourcePartition());
                        } else {
                            log.trace("Wrote record successfully: topic {} partition {} offset {}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                            commitTaskRecord(sourceRecord);
                        }
                        recordSent(producerRecord);
                    }
                });
                this.lastSendFailed = false;
                processed++;
            } catch (RetriableException e) {
                // Must be caught before KafkaException: RetriableException is a subclass of it.
                log.warn("Failed to send {}, backing off before retrying:", producerRecord, e);
                this.toSend = this.toSend.subList(processed, this.toSend.size());
                this.lastSendFailed = true;
                return false;
            } catch (KafkaException e) {
                throw new ConnectException("Unrecoverable exception trying to send", e);
            }
        }
        this.toSend = null;
        return true;
    }

    /**
     * Tells the source task that a record was written successfully. Called from the producer
     * callback, so failures are logged rather than propagated; otherwise the callback would be
     * aborted before {@link #recordSent(ProducerRecord)} runs.
     *
     * @param sourceRecord the record that was acknowledged
     */
    public void commitTaskRecord(SourceRecord sourceRecord) {
        try {
            this.task.commitRecord(sourceRecord);
        } catch (InterruptedException e) {
            log.error("Exception thrown", e);
        } catch (Throwable th) {
            log.error("Exception thrown while calling task.commitRecord()", th);
        }
    }

    /**
     * Removes a completed record from the outstanding sets and, when a flush is waiting and the
     * flushed set has drained, wakes up the flushing thread.
     *
     * @param producerRecord the record whose producer callback has fired
     */
    public synchronized void recordSent(ProducerRecord<byte[], byte[]> producerRecord) {
        ProducerRecord<byte[], byte[]> removed = this.outstandingMessages.remove(producerRecord);
        // The backlog is only consulted while a flush is in progress.
        if (removed == null && this.flushing) {
            removed = this.outstandingMessagesBacklog.remove(producerRecord);
        }
        if (removed == null) {
            log.error("CRITICAL Saw callback for record that was not present in the outstanding message set: {}", producerRecord);
        } else if (this.flushing && this.outstandingMessages.isEmpty()) {
            // commitOffsets() waits on this monitor until the flushed set is empty.
            notifyAll();
        }
    }

    /**
     * Waits for all outstanding records to be acknowledged and then flushes the collected source
     * offsets, all within the configured offset commit timeout.
     *
     * @return {@code true} if the flush completed (or there were no offsets to flush);
     *         {@code false} on timeout, interruption, or flush failure
     */
    public boolean commitOffsets() {
        long commitTimeoutMs = this.workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
        log.debug("{} Committing offsets", this);
        long started = this.time.milliseconds();
        long deadline = started + commitTimeoutMs;

        synchronized (this) {
            // From here on new records go to the backlog, so outstandingMessages is the snapshot
            // that has to drain before offsets are written.
            this.flushing = true;
            boolean flushStarted = this.offsetWriter.beginFlush();
            log.debug("{} flushing {} outstanding messages for offset commit", this, this.outstandingMessages.size());
            while (!this.outstandingMessages.isEmpty()) {
                try {
                    long remainingMs = deadline - this.time.milliseconds();
                    if (remainingMs <= 0) {
                        log.error("Failed to flush {}, timed out while waiting for producer to flush outstanding {} messages", this, this.outstandingMessages.size());
                        finishFailedFlush();
                        return false;
                    }
                    // Releases the monitor so recordSent() can run and notify us.
                    wait(remainingMs);
                } catch (InterruptedException e) {
                    log.error("{} Interrupted while flushing messages, offsets will not be committed", this);
                    finishFailedFlush();
                    return false;
                }
            }

            if (!flushStarted) {
                // beginFlush() reported nothing to flush; outstanding records have still drained.
                finishSuccessfulFlush();
                log.debug("Finished {} offset commitOffsets successfully in {} ms", this, this.time.milliseconds() - started);
                commitSourceTask();
                return true;
            }
        }

        // The monitor is released before waiting on offset storage so that sends and producer
        // callbacks are not blocked for the duration of the write.
        Future<Void> flushFuture = this.offsetWriter.doFlush(new org.apache.kafka.connect.util.Callback<Void>() {
            @Override
            public void onCompletion(Throwable th, Void result) {
                // Use the enclosing task for the placeholder; a bare `this` would be this callback.
                if (th != null) {
                    log.error("Failed to flush {} offsets to storage: ", WorkerSourceTask.this, th);
                } else {
                    log.trace("Finished flushing {} offsets to storage", WorkerSourceTask.this);
                }
            }
        });
        if (flushFuture == null) {
            // No future to wait on: treated as a failed flush.
            finishFailedFlush();
            return false;
        }
        try {
            flushFuture.get(Math.max(deadline - this.time.milliseconds(), 0L), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.warn("Flush of {} offsets interrupted, cancelling", this);
            finishFailedFlush();
            return false;
        } catch (ExecutionException e) {
            log.error("Flush of {} offsets threw an unexpected exception: ", this, e);
            finishFailedFlush();
            return false;
        } catch (TimeoutException e) {
            log.error("Timed out waiting to flush {} offsets to storage", this);
            finishFailedFlush();
            return false;
        }

        finishSuccessfulFlush();
        log.info("Finished {} commitOffsets successfully in {} ms", this, this.time.milliseconds() - started);
        commitSourceTask();
        return true;
    }

    /** Calls {@code task.commit()}, logging (not propagating) anything it throws. */
    private void commitSourceTask() {
        try {
            this.task.commit();
        } catch (InterruptedException e) {
            log.warn("Commit interrupted", e);
        } catch (Throwable th) {
            log.error("Exception thrown while calling task.commit()", th);
        }
    }

    /**
     * Cancels the offset flush and merges the backlog back into the outstanding set so those
     * records are waited for by the next flush.
     */
    private synchronized void finishFailedFlush() {
        this.offsetWriter.cancelFlush();
        this.outstandingMessages.putAll(this.outstandingMessagesBacklog);
        this.outstandingMessagesBacklog.clear();
        this.flushing = false;
    }

    /**
     * Ends a successful flush by swapping the two maps: the backlog becomes the new outstanding
     * set and the previous outstanding map is reused as the backlog.
     */
    private synchronized void finishSuccessfulFlush() {
        IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> previous = this.outstandingMessages;
        this.outstandingMessages = this.outstandingMessagesBacklog;
        this.outstandingMessagesBacklog = previous;
        this.flushing = false;
    }

    @Override
    public String toString() {
        return "WorkerSourceTask{id=" + this.id + '}';
    }
}
