package org.apache.flink.connector.jdbc.core.datastream.source.reader;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions;
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.class */
/**
 * A {@link SplitReader} that executes the query of each assigned {@link JdbcSourceSplit} over a
 * JDBC connection and hands the extracted rows to the source reader in batches.
 *
 * <p>Splits are processed one at a time in the order they were added. Every call to {@link
 * #fetch()} returns at most {@link JdbcSourceOptions#READER_FETCH_BATCH_SIZE} records of the
 * current split; once the result set of a split is exhausted the split is reported as finished.
 *
 * <p>How a split is (re-)opened depends on the configured {@link DeliveryGuarantee}:
 *
 * <ul>
 *   <li>{@code EXACTLY_ONCE}: the query is executed and the cursor is moved past the rows already
 *       covered by the split's reader position. This uses {@link ResultSet#last()} and {@link
 *       ResultSet#absolute(int)}, which JDBC only supports on scrollable result sets.
 *   <li>{@code AT_LEAST_ONCE}: the query is executed and read from the first row again.
 *   <li>{@code NONE}: a split whose reader position is not {@code 0} is skipped and reported as
 *       finished without being queried; other splits are read from the first row.
 * </ul>
 *
 * @param <T> type of the records produced by the {@link ResultExtractor}
 */
public class JdbcSourceSplitReader<T> implements SplitReader<RecordAndOffset<T>, JdbcSourceSplit>, ResultTypeQueryable<T> {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitReader.class);

    private final Configuration config;
    private final TypeInformation<T> typeInformation;
    private final JdbcConnectionProvider connectionProvider;
    private transient Connection connection;
    private transient PreparedStatement statement;
    // Boxed so that "not set" (null) can be told apart from an explicit value.
    private final Boolean autoCommit;
    private transient ResultSet resultSet;
    // Number of records of the current split that were already emitted (or skipped on restore).
    private int currentSplitOffset;
    private final ResultExtractor<T> resultExtractor;
    private final DeliveryGuarantee deliveryGuarantee;
    private final int splitReaderFetchBatchSize;
    private final int resultSetType;
    private final int resultSetConcurrency;
    private final int resultSetFetchSize;
    private final SourceReaderContext context;

    // Split that was skipped without being read; it still has to be reported as finished.
    @Nullable
    private JdbcSourceSplit skippedSplit;

    // Splits that were assigned but not started yet.
    private final Queue<JdbcSourceSplit> splits = new ArrayDeque<>();

    // True while the cursor of the current result set is positioned on an unread row.
    protected boolean hasNextRecordCurrentSplit = false;

    @Nullable
    private JdbcSourceSplit currentSplit = null;

    /**
     * Creates a split reader.
     *
     * @param sourceReaderContext context of the owning source reader
     * @param configuration source configuration; the result set type, concurrency and fetch size,
     *     the auto-commit flag and the reader fetch batch size are read from it
     * @param typeInformation type information of the produced records
     * @param jdbcConnectionProvider provider used to obtain the JDBC connection
     * @param deliveryGuarantee guarantee that decides how a split is opened
     * @param resultExtractor converts the current row of a {@link ResultSet} into a record
     * @throws NullPointerException if any argument is {@code null}
     * @throws IllegalArgumentException if the configured fetch batch size is not within
     *     {@code (0, Integer.MAX_VALUE)}
     */
    public JdbcSourceSplitReader(SourceReaderContext sourceReaderContext, Configuration configuration, TypeInformation<T> typeInformation, JdbcConnectionProvider jdbcConnectionProvider, DeliveryGuarantee deliveryGuarantee, ResultExtractor<T> resultExtractor) {
        this.context = Preconditions.checkNotNull(sourceReaderContext);
        this.config = Preconditions.checkNotNull(configuration);
        this.typeInformation = Preconditions.checkNotNull(typeInformation);
        this.connectionProvider = Preconditions.checkNotNull(jdbcConnectionProvider);
        this.resultSetType = configuration.getInteger(JdbcSourceOptions.RESULTSET_TYPE);
        this.resultSetConcurrency = configuration.getInteger(JdbcSourceOptions.RESULTSET_CONCURRENCY);
        this.resultSetFetchSize = configuration.getInteger(JdbcSourceOptions.RESULTSET_FETCH_SIZE);
        this.autoCommit = Boolean.valueOf(configuration.getBoolean(JdbcSourceOptions.AUTO_COMMIT));
        this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee);
        int fetchBatchSize = configuration.getInteger(JdbcSourceOptions.READER_FETCH_BATCH_SIZE);
        Preconditions.checkArgument(
                fetchBatchSize > 0 && fetchBatchSize < Integer.MAX_VALUE,
                "The reader fetch batch size must be greater than 0 and less than Integer.MAX_VALUE, but was "
                        + fetchBatchSize);
        this.splitReaderFetchBatchSize = fetchBatchSize;
        this.resultExtractor = Preconditions.checkNotNull(resultExtractor);
        this.currentSplitOffset = 0;
    }

    /**
     * Reads the next batch of records.
     *
     * @return an empty result if no split is available, a result that only marks a split as
     *     finished if the started split has no (more) rows, otherwise up to the configured batch
     *     size of records of the current split (plus the finished marker if the split ended)
     * @throws RuntimeException if opening the split, extracting a row or advancing the cursor fails
     */
    @Override
    public RecordsWithSplitIds<RecordAndOffset<T>> fetch() throws IOException {
        if (!checkSplitOrStartNext()) {
            return new RecordsBySplits.Builder<RecordAndOffset<T>>().build();
        }
        if (!this.hasNextRecordCurrentSplit) {
            // The split was skipped or its query returned nothing to read.
            return finishSplit();
        }
        RecordsBySplits.Builder<RecordAndOffset<T>> records = new RecordsBySplits.Builder<>();
        Preconditions.checkState(this.currentSplit != null, "currentSplit");
        int remaining = this.splitReaderFetchBatchSize;
        while (remaining > 0 && this.hasNextRecordCurrentSplit) {
            try {
                T record = this.resultExtractor.extract(this.resultSet);
                // The offset attached to a record is the count of records read including itself.
                this.currentSplitOffset++;
                records.add(this.currentSplit, new RecordAndOffset<>(record, this.currentSplitOffset, 0L));
                remaining--;
                this.hasNextRecordCurrentSplit = this.resultSet.next();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        if (!this.hasNextRecordCurrentSplit) {
            this.currentSplitOffset = 0;
            records.addFinishedSplit(this.currentSplit.splitId());
            closeResultSetAndStatement();
        }
        return records.build();
    }

    /**
     * Releases the JDBC resources of the split that has nothing left to read and reports it
     * (either the current or the skipped split) as finished.
     */
    private RecordsWithSplitIds<RecordAndOffset<T>> finishSplit() {
        closeResultSetAndStatement();
        RecordsBySplits.Builder<RecordAndOffset<T>> records = new RecordsBySplits.Builder<>();
        JdbcSourceSplit finished = this.currentSplit != null ? this.currentSplit : this.skippedSplit;
        Preconditions.checkState(finished != null, "Split to finish mustn't be null.");
        records.addFinishedSplit(finished.splitId());
        this.currentSplit = null;
        this.skippedSplit = null;
        return records.build();
    }

    /** Closes the result set and then the statement; the statement is closed even if the result set fails. */
    private void closeResultSetAndStatement() {
        try {
            closeResultSetIfNeeded();
        } finally {
            closeStatementIfNeeded();
        }
    }

    /** Closes the result set if it is open and always drops the reference to it. */
    private void closeResultSetIfNeeded() {
        try {
            if (this.resultSet != null && !this.resultSet.isClosed()) {
                this.resultSet.close();
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        } finally {
            // Never keep a handle whose close was attempted, even if the attempt failed.
            this.resultSet = null;
        }
    }

    /** Closes the statement if it is open and always drops the reference to it. */
    private void closeStatementIfNeeded() {
        try {
            if (this.statement != null && !this.statement.isClosed()) {
                this.statement.close();
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        } finally {
            this.statement = null;
        }
    }

    /**
     * Queues newly assigned splits.
     *
     * @throws UnsupportedOperationException if the change is not a {@link SplitsAddition}
     */
    @Override
    public void handleSplitsChanges(SplitsChange<JdbcSourceSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        LOG.debug("Handling split change {}", splitsChange);
        this.splits.addAll(splitsChange.splits());
    }

    /** No-op. */
    @Override
    public void wakeUp() {
    }

    /**
     * Closes the result set, the statement and the connection. Each resource is attempted even if
     * closing a previous one failed.
     */
    @Override
    public void close() throws Exception {
        try {
            closeResultSetAndStatement();
        } finally {
            try {
                if (this.connection != null && !this.connection.isClosed()) {
                    this.connection.close();
                }
            } finally {
                this.connection = null;
                this.currentSplit = null;
            }
        }
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return this.typeInformation;
    }

    /** Returns an unmodifiable snapshot of the splits that were assigned but not started yet. */
    @VisibleForTesting
    public List<JdbcSourceSplit> getSplits() {
        return Collections.unmodifiableList(Arrays.asList(this.splits.toArray(new JdbcSourceSplit[0])));
    }

    @VisibleForTesting
    public Connection getConnection() {
        return this.connection;
    }

    @VisibleForTesting
    public PreparedStatement getStatement() {
        return this.statement;
    }

    @VisibleForTesting
    public ResultSet getResultSet() {
        return this.resultSet;
    }

    /**
     * Makes sure there is a split to work on.
     *
     * @return {@code true} if the current result set still has rows or a new split was started
     *     (which may turn out to be empty or skipped), {@code false} if no split is queued
     */
    private boolean checkSplitOrStartNext() {
        try {
            if (this.hasNextRecordCurrentSplit && this.resultSet != null) {
                return true;
            }
            JdbcSourceSplit next = this.splits.poll();
            if (next == null) {
                return false;
            }
            this.currentSplit = next;
            openResultSetForSplit(next);
            return true;
        } catch (ClassNotFoundException | SQLException e) {
            throw new RuntimeException(e);
        }
    }

    /** Obtains the connection from the provider and applies the configured auto-commit mode if it differs. */
    private void getOrEstablishConnection() throws SQLException, ClassNotFoundException {
        this.connection = this.connectionProvider.getOrEstablishConnection();
        if (this.autoCommit != null && this.autoCommit.booleanValue() != this.connection.getAutoCommit()) {
            this.connection.setAutoCommit(this.autoCommit.booleanValue());
        }
    }

    /** Opens the split according to the configured delivery guarantee. */
    private void openResultSetForSplit(JdbcSourceSplit jdbcSourceSplit) throws SQLException, ClassNotFoundException {
        switch (this.deliveryGuarantee) {
            case EXACTLY_ONCE:
                openResultSetForSplitWhenExactlyOnce(jdbcSourceSplit);
                break;
            case AT_LEAST_ONCE:
                openResultSetForSplitWhenAtLeastOnce(jdbcSourceSplit);
                break;
            case NONE:
            default:
                openResultSetForSplitWhenAtMostOnce(jdbcSourceSplit);
                break;
        }
    }

    /**
     * Reads a split that was never started from its first row; a split with a non-zero reader
     * position is not queried again and is only remembered so it can be reported as finished.
     */
    private void openResultSetForSplitWhenAtMostOnce(JdbcSourceSplit jdbcSourceSplit) throws SQLException, ClassNotFoundException {
        if (jdbcSourceSplit.getReaderPosition() == 0) {
            openResultSetForSplitWhenAtLeastOnce(jdbcSourceSplit);
            return;
        }
        this.skippedSplit = this.currentSplit;
        this.currentSplit = null;
        this.hasNextRecordCurrentSplit = false;
        this.currentSplitOffset = 0;
        closeResultSetAndStatement();
    }

    /** Executes the split's query and moves the cursor past the rows covered by its reader position. */
    private void openResultSetForSplitWhenExactlyOnce(JdbcSourceSplit jdbcSourceSplit) throws SQLException, ClassNotFoundException {
        getOrEstablishConnection();
        closeResultSetIfNeeded();
        prepareStatement(jdbcSourceSplit);
        this.resultSet = this.statement.executeQuery();
        this.currentSplitOffset = 0;
        this.hasNextRecordCurrentSplit = this.resultSet.next();
        if (this.hasNextRecordCurrentSplit) {
            moveResultSetCursorByOffset();
        }
    }

    /**
     * Positions the cursor on the first row after the current split's reader position. If the
     * result set does not have more rows than the reader position, the split is marked as having
     * nothing left to read.
     */
    private void moveResultSetCursorByOffset() throws SQLException {
        int readerPosition = this.currentSplit.getReaderPosition();
        if (readerPosition == 0) {
            return;
        }
        // The row number of the last row is the total number of rows in the result set.
        this.resultSet.last();
        int lastRow = this.resultSet.getRow();
        if (readerPosition < lastRow) {
            this.currentSplitOffset = readerPosition;
            this.resultSet.absolute(readerPosition + 1);
        } else {
            this.hasNextRecordCurrentSplit = false;
            LOG.warn("The offset will not be set from splitState, because the last cursor is {}, the expected cursor is {}.", lastRow, readerPosition + 1);
        }
    }

    /** Executes the split's query and starts reading from its first row. */
    private void openResultSetForSplitWhenAtLeastOnce(JdbcSourceSplit jdbcSourceSplit) throws SQLException, ClassNotFoundException {
        getOrEstablishConnection();
        closeResultSetIfNeeded();
        prepareStatement(jdbcSourceSplit);
        this.resultSet = this.statement.executeQuery();
        this.hasNextRecordCurrentSplit = this.resultSet.next();
        this.currentSplitOffset = 0;
    }

    /**
     * Replaces the current statement with one prepared from the split's SQL template, binds the
     * split's parameters (if any) by position and applies the configured fetch size.
     */
    private void prepareStatement(JdbcSourceSplit jdbcSourceSplit) throws SQLException {
        closeStatementIfNeeded();
        this.statement = this.connection.prepareStatement(jdbcSourceSplit.getSqlTemplate(), this.resultSetType, this.resultSetConcurrency);
        Object[] parameters = jdbcSourceSplit.getParameters();
        if (parameters != null) {
            for (int i = 0; i < parameters.length; i++) {
                // JDBC parameter indexes are 1-based.
                this.statement.setObject(i + 1, parameters[i]);
            }
        }
        this.statement.setFetchSize(this.resultSetFetchSize);
    }
}
