package org.apache.hudi.utilities.hst;

import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.hst.MetaKafka;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.utilities.HoodieSnapshotExporter;
import org.apache.hudi.utilities.streamer.NoNewDataTerminationStrategy;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.collection.JavaConverters;

/* loaded from: input_file:org/apache/hudi/utilities/hst/HoodieStreamTableSync.class */
public class HoodieStreamTableSync {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/utilities/hst/HoodieStreamTableSync$DebeziumJsonFlatMap.class */
    public static class DebeziumJsonFlatMap implements FlatMapFunction<Row, Row> {
        boolean debeziumJsonSchemaEnabled;

        public DebeziumJsonFlatMap(boolean z) {
            this.debeziumJsonSchemaEnabled = z;
        }

        public Iterator<Row> call(Row row) {
            Row row2 = this.debeziumJsonSchemaEnabled ? (Row) row.getAs("payload") : row;
            int intValue = ((Integer) row.getAs(HoodieRecord.HST_KAFKA_PARTITION_FIELD)).intValue();
            long longValue = ((Long) row.getAs(HoodieRecord.HST_KAFKA_OFFSET_FIELD)).longValue();
            String str = (String) row2.getAs("op");
            Timestamp timestamp = (Timestamp) row2.getAs("ts_ms");
            boolean z = -1;
            switch (str.hashCode()) {
                case 99:
                    if (str.equals("c")) {
                        z = false;
                        break;
                    }
                    break;
                case 100:
                    if (str.equals("d")) {
                        z = 3;
                        break;
                    }
                    break;
                case 105:
                    if (str.equals("i")) {
                        z = true;
                        break;
                    }
                    break;
                case 117:
                    if (str.equals("u")) {
                        z = 2;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                case true:
                    Row enhance = enhance((Row) row2.getAs("after"), HoodieOperation.INSERT.getName(), intValue, longValue, timestamp);
                    System.out.println("enhancedAfter: " + enhance);
                    return Stream.of(enhance).iterator();
                case true:
                    return Stream.of((Object[]) new Row[]{enhance((Row) row2.getAs("before"), HoodieOperation.UPDATE_BEFORE.getName(), intValue, longValue, timestamp), enhance((Row) row2.getAs("after"), HoodieOperation.UPDATE_AFTER.getName(), intValue, longValue, timestamp)}).iterator();
                case NoNewDataTerminationStrategy.DEFAULT_MAX_ROUNDS_WITHOUT_NEW_DATA_TO_SHUTDOWN /* 3 */:
                    return Stream.of(enhance((Row) row2.getAs("before"), HoodieOperation.DELETE.getName(), intValue, longValue, timestamp)).iterator();
                default:
                    throw new IllegalArgumentException("Unknown Debezium operation " + str);
            }
        }

        private Row enhance(Row row, String str, int i, long j, Timestamp timestamp) {
            ArrayList arrayList = new ArrayList();
            arrayList.add(str);
            arrayList.add(Integer.valueOf(i));
            arrayList.add(Long.valueOf(j));
            arrayList.add(timestamp);
            arrayList.addAll(JavaConverters.seqAsJavaList(row.toSeq()));
            return new GenericRow(arrayList.toArray());
        }
    }

    /**
     * Convenience overload that builds the stream writer without a caller-supplied
     * message schema; it passes {@code null} for that argument to the six-argument
     * overload.
     *
     * @param sparkSession session used to open the stream
     * @param str          value for the {@code kafka.bootstrap.servers} option
     * @param str2         value for the Kafka {@code subscribe} option
     * @param str3         Hudi table base path used to look up the starting offsets
     * @param structType   schema of the table row images
     * @return the configured, not yet started, stream writer
     */
    public static DataStreamWriter<Row> hstStreamWriter(SparkSession sparkSession, String str, String str2, String str3, StructType structType) {
        // No explicit message schema: let the full overload fall back to its own default.
        final StructType noMessageSchema = null;
        return hstStreamWriter(sparkSession, str, str2, str3, structType, noMessageSchema);
    }

    /**
     * Builds a streaming writer that reads Debezium JSON change events from Kafka and
     * writes the flattened row images to a Hudi table.
     *
     * <p>The output schema is the operation metadata field, the Kafka partition and offset
     * fields and {@code ts_ms}, followed by every field of {@code structType}. The writer is
     * configured for the insert operation on a MERGE_ON_READ table with the BUCKET index,
     * {@code ts_ms} as precombine field, the operation metadata field enabled, and
     * combine-before-insert/upsert/delete all switched off.
     *
     * @param sparkSession session used to open the stream
     * @param str          value for the {@code kafka.bootstrap.servers} option
     * @param str2         value for the Kafka {@code subscribe} option
     * @param str3         Hudi table base path used to look up the starting offsets
     * @param structType   schema of the {@code before}/{@code after} row images
     * @param structType2  schema used to parse the Kafka value; when {@code null} a default
     *                     envelope with {@code schema} and {@code payload} fields is used
     * @return the configured, not yet started, stream writer
     */
    public static DataStreamWriter<Row> hstStreamWriter(SparkSession sparkSession, String str, String str2, String str3, StructType structType, StructType structType2) {
        // Metadata columns first, then the table's own columns in their declared order.
        StructType outputSchema = new StructType()
                .add(HoodieRecord.OPERATION_METADATA_FIELD, DataTypes.StringType)
                .add(HoodieRecord.HST_KAFKA_PARTITION_FIELD, DataTypes.IntegerType)
                .add(HoodieRecord.HST_KAFKA_OFFSET_FIELD, DataTypes.LongType)
                .add("ts_ms", DataTypes.TimestampType);
        StructField[] tableFields = structType.fields();
        for (int idx = 0; idx < tableFields.length; idx++) {
            outputSchema = outputSchema.add(tableFields[idx]);
        }

        final StructType messageSchema;
        if (structType2 != null) {
            messageSchema = structType2;
        } else {
            StructType payloadSchema = new StructType()
                    .add("op", DataTypes.StringType)
                    .add("before", structType)
                    .add("after", structType)
                    .add("ts_ms", DataTypes.TimestampType)
                    .add("source", DataTypes.StringType);
            messageSchema = new StructType()
                    .add("schema", DataTypes.StringType)
                    .add("payload", payloadSchema);
        }

        // A top-level "schema" field means the change event sits under "payload".
        boolean hasSchemaField = false;
        for (StructField field : messageSchema.fields()) {
            if ("schema".equals(field.name())) {
                hasSchemaField = true;
                break;
            }
        }

        return sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", str)
                .option("subscribe", str2)
                .option("startingOffsets", startingOffsets(sparkSession.sparkContext().hadoopConfiguration(), str3, str2))
                .load()
                .selectExpr("partition", "offset", "CAST(value AS STRING)")
                .select(
                        functions.col("partition").as(HoodieRecord.HST_KAFKA_PARTITION_FIELD),
                        functions.col("offset").as(HoodieRecord.HST_KAFKA_OFFSET_FIELD),
                        functions.from_json(functions.col("value"), messageSchema).as("data"))
                .select(HoodieRecord.HST_KAFKA_PARTITION_FIELD, HoodieRecord.HST_KAFKA_OFFSET_FIELD, "data.*")
                .flatMap(new DebeziumJsonFlatMap(hasSchemaField), RowEncoder.apply(outputSchema))
                .writeStream()
                .format(HoodieSnapshotExporter.OutputFormatValidator.HUDI)
                .option(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
                .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "ts_ms")
                .option(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name())
                .option(DataSourceWriteOptions.TABLE_TYPE().key(), HoodieTableType.MERGE_ON_READ.name())
                .option(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "true")
                .option(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), "false")
                .option(HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(), "false")
                .option(HoodieWriteConfig.COMBINE_BEFORE_DELETE.key(), "false")
                .option(HoodieIndexConfig.INDEX_TYPE.key(), "BUCKET");
    }

    /**
     * Works out the value for the Kafka source's {@code startingOffsets} option.
     *
     * <p>Looks at the last completed instant on the commits timeline of the Hudi table at
     * {@code str} and reads the {@code commitKafkaMetadata} entry from that commit's extra
     * metadata. When present, it is converted with {@code MetaKafka.toOffsetsForSpark()} and
     * wrapped as {@code {"<topic>":<offsets>}}. In every other case, including when the table
     * does not exist, the result is {@code "earliest"}.
     *
     * @param configuration Hadoop configuration used to open the table
     * @param str           base path of the Hudi table
     * @param str2          Kafka topic name used as the key of the returned JSON object
     * @return the offsets JSON for the topic, or {@code "earliest"}
     * @throws HoodieException if reading the commit metadata fails with an I/O error
     */
    private static String startingOffsets(Configuration configuration, String str, String str2) {
        final String fallback = "earliest";
        try {
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                    .setConf(new HadoopStorageConfiguration(configuration))
                    .setBasePath(str)
                    .setLoadActiveTimelineOnLoad(true)
                    .build();
            if (metaClient == null) {
                return fallback;
            }

            HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
            Option<HoodieInstant> latestCommit = completedCommits.lastInstant();
            if (!latestCommit.isPresent()) {
                return fallback;
            }

            String kafkaMetadata = HoodieInputFormatUtils.getCommitMetadata(latestCommit.get(), completedCommits)
                    .getExtraMetadata()
                    .get("commitKafkaMetadata");
            if (kafkaMetadata == null) {
                return fallback;
            }

            String topicOffsets = MetaKafka.fromString(kafkaMetadata).toOffsetsForSpark();
            return topicOffsets == null ? fallback : String.format("{\"%s\":%s}", str2, topicOffsets);
        } catch (TableNotFoundException ignored) {
            // No table at the base path: nothing to resume from, so use the fallback.
            return fallback;
        } catch (IOException ioe) {
            throw new HoodieException(ioe);
        }
    }
}
