package org.apache.flink.connector.mongodb.source.reader.split;

import com.mongodb.MongoException;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCursor;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
import org.apache.flink.connector.mongodb.common.utils.MongoUtils;
import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
import org.apache.flink.connector.mongodb.source.reader.MongoSourceReaderContext;
import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
import org.apache.flink.util.CollectionUtil;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.class */
public class MongoScanSourceSplitReader implements MongoSourceSplitReader<MongoSourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoScanSourceSplitReader.class);
    private final MongoConnectionOptions connectionOptions;
    private final MongoReadOptions readOptions;
    private final MongoSourceReaderContext readerContext;

    @Nullable
    private final List<String> projectedFields;
    private final Bson filter;
    private boolean closed = false;
    private boolean finished = false;
    private MongoClient mongoClient;
    private MongoCursor<BsonDocument> currentCursor;
    private MongoScanSourceSplit currentSplit;

    public MongoScanSourceSplitReader(MongoConnectionOptions mongoConnectionOptions, MongoReadOptions mongoReadOptions, @Nullable List<String> list, Bson bson, MongoSourceReaderContext mongoSourceReaderContext) {
        this.connectionOptions = mongoConnectionOptions;
        this.readOptions = mongoReadOptions;
        this.projectedFields = list;
        this.filter = bson;
        this.readerContext = mongoSourceReaderContext;
    }

    public RecordsWithSplitIds<BsonDocument> fetch() throws IOException {
        if (this.closed) {
            throw new IllegalStateException("Cannot fetch records from a closed split reader");
        }
        RecordsBySplits.Builder builder = new RecordsBySplits.Builder();
        if (this.currentSplit == null) {
            return builder.build();
        }
        if (this.readerContext.isOverLimit()) {
            builder.addFinishedSplit(this.currentSplit.splitId());
            this.currentSplit = null;
            this.finished = true;
            return builder.build();
        }
        this.currentCursor = getOrCreateCursor();
        int fetchSize = this.readOptions.getFetchSize();
        int i = 0;
        while (true) {
            if (i >= fetchSize) {
                break;
            }
            try {
                try {
                    if (!this.currentCursor.hasNext()) {
                        builder.addFinishedSplit(this.currentSplit.splitId());
                        this.finished = true;
                        break;
                    }
                    builder.add(this.currentSplit, (BsonDocument) this.currentCursor.next());
                    this.readerContext.getReadCount().incrementAndGet();
                    if (this.readerContext.isOverLimit()) {
                        builder.addFinishedSplit(this.currentSplit.splitId());
                        this.finished = true;
                        break;
                    }
                    i++;
                } catch (MongoException e) {
                    throw new IOException("Scan records form MongoDB failed", e);
                }
            } catch (Throwable th) {
                if (this.finished) {
                    closeCursor();
                }
                throw th;
            }
        }
        RecordsBySplits build = builder.build();
        if (this.finished) {
            closeCursor();
        }
        return build;
    }

    public void handleSplitsChanges(SplitsChange<MongoSourceSplit> splitsChange) {
        LOG.debug("Handle split changes {}", splitsChange);
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        MongoSourceSplit mongoSourceSplit = (MongoSourceSplit) splitsChange.splits().get(0);
        if (!(mongoSourceSplit instanceof MongoScanSourceSplit)) {
            throw new UnsupportedOperationException(String.format("The SourceSplit type of %s is not supported.", mongoSourceSplit.getClass()));
        }
        this.currentSplit = (MongoScanSourceSplit) mongoSourceSplit;
        this.finished = false;
    }

    public void wakeUp() {
        closeCursor();
    }

    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        closeCursor();
    }

    private MongoCursor<BsonDocument> getOrCreateCursor() {
        if (this.currentCursor == null) {
            LOG.debug("Opened cursor for split: {}", this.currentSplit);
            this.mongoClient = MongoClients.create(this.connectionOptions.getUri());
            FindIterable noCursorTimeout = this.mongoClient.getDatabase(this.connectionOptions.getDatabase()).getCollection(this.connectionOptions.getCollection(), BsonDocument.class).find(this.filter).min(this.currentSplit.getMin()).max(this.currentSplit.getMax()).hint(this.currentSplit.getHint()).noCursorTimeout(this.readOptions.isNoCursorTimeout());
            if (this.currentSplit.getOffset() > 0) {
                noCursorTimeout.skip(this.currentSplit.getOffset());
            }
            if (this.readerContext.isLimitPushedDown()) {
                noCursorTimeout.limit(this.readerContext.getLimit());
            }
            if (!CollectionUtil.isNullOrEmpty(this.projectedFields)) {
                noCursorTimeout.projection(MongoUtils.project(this.projectedFields));
            }
            this.currentCursor = noCursorTimeout.cursor();
        }
        return this.currentCursor;
    }

    private void closeCursor() {
        if (this.currentCursor != null) {
            LOG.debug("Closing cursor for split: {}", this.currentSplit);
            try {
                this.currentCursor.close();
                this.currentCursor = null;
                try {
                    this.mongoClient.close();
                } finally {
                }
            } catch (Throwable th) {
                this.currentCursor = null;
                try {
                    this.mongoClient.close();
                    throw th;
                } finally {
                }
            }
        }
    }
}
