package org.apache.hudi.utilities.sources;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.org.apache.hadoop.hbase.io.crypto.KeyProvider;
import org.apache.hudi.org.apache.hbase.thirdparty.io.netty.handler.codec.rtsp.RtspHeaders;
import org.apache.hudi.org.apache.hbase.thirdparty.io.netty.handler.ssl.ApplicationProtocolNames;
import org.apache.hudi.utilities.SqlQueryBuilder;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.storage.StorageLevel;

/**
 * {@link RowSource} that pulls rows from a relational database table through Spark's
 * {@code jdbc} data source.
 *
 * <p>When no checkpoint is available the whole table is fetched. Otherwise only rows whose
 * incremental column is greater than the last checkpoint are requested, optionally falling back
 * to a full fetch when the incremental query fails. The next checkpoint is the maximum value of
 * the incremental column in the fetched batch.
 */
public class JdbcSource extends RowSource {
    private static final Logger LOG = LogManager.getLogger(JdbcSource.class);

    /** JDBC sub-protocols (the scheme that follows {@code jdbc:}) for which a LIMIT clause is generated. */
    private static final List<String> DB_LIMIT_CLAUSE = Arrays.asList("mysql", "postgresql", "h2");
    private static final String URI_JDBC_PREFIX = "jdbc:";

    /** Property keys read by {@link JdbcSource} and the Spark JDBC option names they map to. */
    protected static class Config {
        /** JDBC connection URL; forwarded to Spark as the {@code url} option. */
        private static final String URL = "hoodie.deltastreamer.jdbc.url";
        private static final String URL_PROP = "url";
        /** Database user; forwarded to Spark as the {@code user} option. */
        private static final String USER = "hoodie.deltastreamer.jdbc.user";
        private static final String USER_PROP = "user";
        /** Plain-text password; takes precedence over {@link #PASSWORD_FILE} when both are set. */
        private static final String PASSWORD = "hoodie.deltastreamer.jdbc.password";
        /** Path of a file whose entire content is used as the password. */
        private static final String PASSWORD_FILE = "hoodie.deltastreamer.jdbc.password.file";
        private static final String PASSWORD_PROP = "password";
        /** JDBC driver class name; forwarded to Spark as the {@code driver} option. */
        private static final String DRIVER_CLASS = "hoodie.deltastreamer.jdbc.driver.class";
        private static final String DRIVER_PROP = "driver";
        /** Name of the table to read; forwarded to Spark as the {@code dbtable} option. */
        private static final String RDBMS_TABLE_NAME = "hoodie.deltastreamer.jdbc.table.name";
        private static final String RDBMS_TABLE_PROP = "dbtable";
        /** Column compared against the last checkpoint and aggregated to compute the next one. */
        private static final String INCREMENTAL_COLUMN = "hoodie.deltastreamer.jdbc.table.incr.column.name";
        /** Boolean flag; when true a checkpoint is computed and {@link #INCREMENTAL_COLUMN} is required. */
        private static final String IS_INCREMENTAL = "hoodie.deltastreamer.jdbc.incr.pull";
        /** Key prefix; the remainder of such a key is passed to Spark as a JDBC option name. */
        private static final String EXTRA_OPTIONS = "hoodie.deltastreamer.jdbc.extra.options.";
        /** Spark storage level name used to persist a batch while its checkpoint is computed. */
        private static final String STORAGE_LEVEL = "hoodie.deltastreamer.jdbc.storage.level";
        /** Boolean flag; when true a failed incremental fetch is retried as a full fetch. */
        private static final String FALLBACK_TO_FULL_FETCH = "hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch";

        protected Config() {
        }
    }

    public JdbcSource(TypedProperties typedProperties, JavaSparkContext javaSparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
        super(typedProperties, javaSparkContext, sparkSession, schemaProvider);
    }

    /**
     * Builds a Spark JDBC {@link DataFrameReader} configured from the given properties.
     *
     * <p>The password is taken from {@link Config#PASSWORD} if that key is present, otherwise it
     * is read from the file named by {@link Config#PASSWORD_FILE}. Extra JDBC options are applied
     * afterwards, and for incremental pulls the incremental column property must be present.
     *
     * @param sparkSession    session used to create the reader and to obtain the Hadoop configuration
     * @param typedProperties source configuration
     * @return the configured reader
     * @throws HoodieException wrapping any failure, including a missing password configuration
     */
    private static DataFrameReader validatePropsAndGetDataFrameReader(SparkSession sparkSession, TypedProperties typedProperties) throws HoodieException {
        try {
            DataFrameReader reader = sparkSession.read().format("jdbc")
                .option(Config.URL_PROP, typedProperties.getString(Config.URL))
                .option(Config.USER_PROP, typedProperties.getString(Config.USER))
                .option(Config.DRIVER_PROP, typedProperties.getString(Config.DRIVER_CLASS))
                .option(Config.RDBMS_TABLE_PROP, typedProperties.getString(Config.RDBMS_TABLE_NAME));

            if (typedProperties.containsKey(Config.PASSWORD)) {
                LOG.info("Reading JDBC password from properties file....");
                reader = reader.option(Config.PASSWORD_PROP, typedProperties.getString(Config.PASSWORD));
            } else if (typedProperties.containsKey(Config.PASSWORD_FILE)
                && !StringUtils.isNullOrEmpty(typedProperties.getString(Config.PASSWORD_FILE))) {
                LOG.info(String.format("Reading JDBC password from password file by config %s", Config.PASSWORD_FILE));
                String password = readPasswordFile(sparkSession, typedProperties.getString(Config.PASSWORD_FILE));
                reader = reader.option(Config.PASSWORD_PROP, password);
            } else {
                throw new IllegalArgumentException(String.format("JDBCSource needs either a %s or %s to connect to RDBMS datasource", Config.PASSWORD_FILE, Config.PASSWORD));
            }

            addExtraJdbcOptions(typedProperties, reader);
            if (typedProperties.getBoolean(Config.IS_INCREMENTAL)) {
                DataSourceUtils.checkRequiredProperties(typedProperties, Collections.singletonList(Config.INCREMENTAL_COLUMN));
            }
            return reader;
        } catch (Exception e) {
            throw new HoodieException("Failed to validate properties", e);
        }
    }

    /**
     * Reads the complete content of the password file and returns it as a string.
     *
     * <p>The file system is resolved from the path itself, so a path carrying an explicit scheme
     * does not have to live on the default file system. The content is returned verbatim (no
     * trimming) and decoded as UTF-8.
     *
     * @param sparkSession session providing the Hadoop configuration
     * @param passwordFile location of the password file
     * @return the file content
     * @throws IOException if the file cannot be opened or read completely
     */
    private static String readPasswordFile(SparkSession sparkSession, String passwordFile) throws IOException {
        Path passwordPath = new Path(passwordFile);
        // File systems obtained this way are shared/cached by Hadoop, so only the stream is closed here.
        FileSystem fileSystem = passwordPath.getFileSystem(sparkSession.sparkContext().hadoopConfiguration());
        byte[] passwordBytes = new byte[Math.toIntExact(fileSystem.getFileStatus(passwordPath).getLen())];
        try (InputStream passwordStream = fileSystem.open(passwordPath)) {
            // A single read() may return fewer bytes than requested; readFully loops until the buffer is full.
            IOUtils.readFully(passwordStream, passwordBytes, 0, passwordBytes.length);
        }
        return new String(passwordBytes, StandardCharsets.UTF_8);
    }

    /**
     * Copies every non-empty property whose key starts with {@link Config#EXTRA_OPTIONS} onto the
     * reader, using the key with that prefix removed as the JDBC option name.
     *
     * @param typedProperties source configuration
     * @param dataFrameReader reader that receives the options
     */
    private static void addExtraJdbcOptions(TypedProperties typedProperties, DataFrameReader dataFrameReader) {
        for (Object rawKey : typedProperties.keySet()) {
            String propertyKey = rawKey.toString();
            if (!propertyKey.startsWith(Config.EXTRA_OPTIONS)) {
                continue;
            }
            // Strip the literal prefix only; a regex split would treat the dots as wildcards.
            String optionName = propertyKey.substring(Config.EXTRA_OPTIONS.length());
            String optionValue = typedProperties.getString(propertyKey);
            if (!StringUtils.isNullOrEmpty(optionValue)) {
                LOG.info(String.format("Adding %s -> %s to jdbc options", optionName, optionValue));
                dataFrameReader.option(optionName, optionValue);
            }
        }
    }

    /**
     * Validates the mandatory properties and fetches the next batch.
     *
     * @param lastCheckpoint checkpoint of the previous batch, if any
     * @param sourceLimit    row limit applied when positive and the database is in {@code DB_LIMIT_CLAUSE}
     * @return the fetched dataset together with the new checkpoint string
     * @throws HoodieException on any failure; non-Hoodie exceptions are wrapped
     */
    @Override // org.apache.hudi.utilities.sources.RowSource
    protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCheckpoint, long sourceLimit) throws HoodieException {
        try {
            DataSourceUtils.checkRequiredProperties(this.props, Arrays.asList(Config.URL, Config.DRIVER_CLASS, Config.USER, Config.RDBMS_TABLE_NAME, Config.IS_INCREMENTAL));
            return fetch(lastCheckpoint, sourceLimit);
        } catch (HoodieException e) {
            LOG.error("Exception while running JDBCSource ", e);
            throw e;
        } catch (Exception e) {
            LOG.error("Exception while running JDBCSource ", e);
            throw new HoodieException("Error fetching next batch from JDBC source. Last checkpoint: " + lastCheckpoint.orElse(null), e);
        }
    }

    /**
     * Runs a full or incremental fetch depending on whether a non-empty checkpoint exists, and
     * computes the new checkpoint while the dataset is persisted.
     */
    private Pair<Option<Dataset<Row>>, String> fetch(Option<String> lastCheckpoint, long sourceLimit) {
        Dataset<Row> dataset;
        if (lastCheckpoint.isPresent() && !StringUtils.isNullOrEmpty(lastCheckpoint.get())) {
            dataset = incrementalFetch(lastCheckpoint, sourceLimit);
        } else {
            LOG.info("No checkpoint references found. Doing a full rdbms table fetch");
            dataset = fullFetch(sourceLimit);
        }

        dataset.persist(StorageLevel.fromString(this.props.getString(Config.STORAGE_LEVEL, "MEMORY_AND_DISK_SER")));
        try {
            boolean isIncremental = this.props.getBoolean(Config.IS_INCREMENTAL);
            return Pair.of(Option.of(dataset), checkpoint(dataset, isIncremental, lastCheckpoint));
        } finally {
            // Release the persisted data even when computing the checkpoint fails.
            dataset.unpersist();
        }
    }

    /**
     * Fetches only rows whose incremental column is greater than the last checkpoint.
     *
     * <p>If the query fails and {@link Config#FALLBACK_TO_FULL_FETCH} is set to true, a full fetch
     * is performed instead; otherwise the failure is rethrown.
     */
    private Dataset<Row> incrementalFetch(Option<String> lastCheckpoint, long sourceLimit) {
        try {
            String incrementalColumn = this.props.getString(Config.INCREMENTAL_COLUMN);
            // NOTE(review): the checkpoint value is interpolated verbatim into the SQL text.
            SqlQueryBuilder queryBuilder = SqlQueryBuilder.select("*")
                .from(this.props.getString(Config.RDBMS_TABLE_NAME))
                .where(String.format(" %s > '%s'", incrementalColumn, lastCheckpoint.get()));
            if (shouldApplyLimit(sourceLimit)) {
                queryBuilder.orderBy(this.props.getString(Config.INCREMENTAL_COLUMN)).limit(sourceLimit);
            }
            String query = String.format("(%s) rdbms_table", queryBuilder.toString());
            LOG.info("PPD QUERY: " + query);
            LOG.info(String.format("Referenced last checkpoint and prepared new predicate pushdown query for jdbc pull %s", query));
            return validatePropsAndGetDataFrameReader(this.sparkSession, this.props).option(Config.RDBMS_TABLE_PROP, query).load();
        } catch (Exception e) {
            LOG.error("Error while performing an incremental fetch. Not all database support the PPD query we generate to do an incremental scan", e);
            boolean fallbackEnabled = this.props.containsKey(Config.FALLBACK_TO_FULL_FETCH)
                && this.props.getBoolean(Config.FALLBACK_TO_FULL_FETCH);
            if (!fallbackEnabled) {
                throw e;
            }
            LOG.warn("Falling back to full scan.");
            return fullFetch(sourceLimit);
        }
    }

    /**
     * Fetches the whole table, limited to {@code sourceLimit} rows when a limit applies. The
     * result is ordered by the incremental column when that property is configured.
     */
    private Dataset<Row> fullFetch(long sourceLimit) {
        SqlQueryBuilder queryBuilder = SqlQueryBuilder.select("*").from(this.props.getString(Config.RDBMS_TABLE_NAME));
        if (shouldApplyLimit(sourceLimit)) {
            if (this.props.containsKey(Config.INCREMENTAL_COLUMN)) {
                queryBuilder.orderBy(this.props.getString(Config.INCREMENTAL_COLUMN)).limit(sourceLimit);
            } else {
                queryBuilder.limit(sourceLimit);
            }
        }
        String query = String.format("(%s) rdbms_table", queryBuilder.toString());
        return validatePropsAndGetDataFrameReader(this.sparkSession, this.props).option(Config.RDBMS_TABLE_PROP, query).load();
    }

    /**
     * Tells whether a LIMIT clause should be added: the limit must be positive and the scheme of
     * the JDBC URL (after the {@code jdbc:} prefix) must be listed in {@code DB_LIMIT_CLAUSE}.
     */
    private boolean shouldApplyLimit(long sourceLimit) {
        if (sourceLimit <= 0) {
            return false;
        }
        String jdbcUrl = this.props.getString(Config.URL);
        String scheme = URI.create(jdbcUrl.substring(URI_JDBC_PREFIX.length())).getScheme();
        return DB_LIMIT_CLAUSE.contains(scheme);
    }

    /**
     * Computes the checkpoint for a fetched batch.
     *
     * @param dataset        the fetched rows
     * @param isIncremental  when false, the checkpoint is always the empty string
     * @param lastCheckpoint previous checkpoint, reused when the batch yields no maximum value
     * @return the maximum incremental column value as a string, else the previous non-empty
     *         checkpoint, else the empty string
     * @throws HoodieException if the aggregation fails
     */
    private String checkpoint(Dataset<Row> dataset, boolean isIncremental, Option<String> lastCheckpoint) {
        if (!isIncremental) {
            return "";
        }
        try {
            Column incrementalColumn = dataset.col(this.props.getString(Config.INCREMENTAL_COLUMN));
            String maxValue = dataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first().getString(0);
            LOG.info(String.format("Checkpointing column %s with value: %s ", incrementalColumn, maxValue));
            if (maxValue != null) {
                return maxValue;
            }
            if (lastCheckpoint.isPresent() && !StringUtils.isNullOrEmpty(lastCheckpoint.get())) {
                return lastCheckpoint.get();
            }
            return "";
        } catch (Exception e) {
            LOG.error("Failed to checkpoint");
            throw new HoodieException("Failed to checkpoint. Last checkpoint: " + lastCheckpoint.orElse(null), e);
        }
    }
}
