package com.ververica.cdc.connectors.base.source.reader.external;

import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.class */
public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, SourceSplitBase> {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceScanFetcher.class);
    public AtomicBoolean hasNextElement = new AtomicBoolean(false);
    public AtomicBoolean reachEnd = new AtomicBoolean(false);
    private final FetchTask.Context taskContext;
    private final ExecutorService executorService;
    private volatile ChangeEventQueue<DataChangeEvent> queue;
    private volatile Throwable readException;
    private FetchTask<SourceSplitBase> snapshotSplitReadTask;
    private SnapshotSplit currentSnapshotSplit;
    private static final long READER_CLOSE_TIMEOUT_SECONDS = 30;

    public IncrementalSourceScanFetcher(FetchTask.Context context, int i) {
        this.taskContext = context;
        this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("debezium-snapshot-reader-" + i).build());
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.Fetcher
    public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
        this.snapshotSplitReadTask = fetchTask;
        this.currentSnapshotSplit = fetchTask.getSplit().asSnapshotSplit();
        this.taskContext.configure(this.currentSnapshotSplit);
        this.queue = this.taskContext.getQueue();
        this.hasNextElement.set(true);
        this.reachEnd.set(false);
        this.executorService.submit(() -> {
            try {
                this.snapshotSplitReadTask.execute(this.taskContext);
            } catch (Exception e) {
                LOG.error(String.format("Execute snapshot read task for snapshot split %s fail", this.currentSnapshotSplit), e);
                this.readException = e;
            }
        });
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.Fetcher
    public boolean isFinished() {
        return this.currentSnapshotSplit == null || !(this.snapshotSplitReadTask.isRunning() || this.hasNextElement.get() || !this.reachEnd.get());
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.Fetcher
    @Nullable
    public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
        checkReadException();
        if (!this.hasNextElement.get()) {
            this.reachEnd.compareAndSet(false, true);
            return null;
        }
        boolean z = false;
        boolean z2 = false;
        SourceRecord sourceRecord = null;
        SourceRecord sourceRecord2 = null;
        HashMap hashMap = new HashMap();
        while (!z2) {
            checkReadException();
            Iterator<DataChangeEvent> it = this.queue.poll().iterator();
            while (true) {
                if (it.hasNext()) {
                    SourceRecord record = it.next().getRecord();
                    if (sourceRecord == null) {
                        sourceRecord = record;
                        assertLowWatermark(sourceRecord);
                    } else if (sourceRecord2 != null || !WatermarkEvent.isHighWatermarkEvent(record)) {
                        if (z && WatermarkEvent.isEndWatermarkEvent(record)) {
                            z2 = true;
                            break;
                        }
                        if (!z) {
                            hashMap.put((Struct) record.key(), record);
                        } else if (isChangeRecordInChunkRange(record)) {
                            this.taskContext.rewriteOutputBuffer(hashMap, record);
                        }
                    } else {
                        sourceRecord2 = record;
                        z = true;
                    }
                }
            }
        }
        this.hasNextElement.set(false);
        ArrayList arrayList = new ArrayList();
        arrayList.add(sourceRecord);
        arrayList.addAll(this.taskContext.formatMessageTimestamp(hashMap.values()));
        arrayList.add(sourceRecord2);
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(new SourceRecords(arrayList));
        return arrayList2.iterator();
    }

    private void checkReadException() {
        if (this.readException != null) {
            throw new FlinkRuntimeException(String.format("Read split %s error due to %s.", this.currentSnapshotSplit, this.readException.getMessage()), this.readException);
        }
    }

    @Override // com.ververica.cdc.connectors.base.source.reader.external.Fetcher
    public void close() {
        try {
            if (this.taskContext != null) {
                this.taskContext.close();
            }
            if (this.snapshotSplitReadTask != null) {
                this.snapshotSplitReadTask.close();
            }
            if (this.executorService != null) {
                this.executorService.shutdown();
                if (!this.executorService.awaitTermination(READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    LOG.warn("Failed to close the scan fetcher in {} seconds.", Long.valueOf(READER_CLOSE_TIMEOUT_SECONDS));
                }
            }
        } catch (Exception e) {
            LOG.error("Close scan fetcher error", e);
        }
    }

    @VisibleForTesting
    public ExecutorService getExecutorService() {
        return this.executorService;
    }

    private void assertLowWatermark(SourceRecord sourceRecord) {
        Preconditions.checkState(WatermarkEvent.isLowWatermarkEvent(sourceRecord), String.format("The first record should be low watermark signal event, but actual is %s", sourceRecord));
    }

    private boolean isChangeRecordInChunkRange(SourceRecord sourceRecord) {
        if (this.taskContext.isDataChangeRecord(sourceRecord)) {
            return this.taskContext.isRecordBetween(sourceRecord, this.currentSnapshotSplit.getSplitStart(), this.currentSnapshotSplit.getSplitEnd());
        }
        return false;
    }
}
