package org.apache.carbondata.examples;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.examples.util.ExampleUtils$;
import org.apache.spark.sql.CarbonEnv$;
import org.apache.spark.sql.CarbonToSparkAdapter$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import scala.Predef$;
import scala.Some;
import scala.collection.immutable.StringOps;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;

/* compiled from: StreamingWithRowParserExample.scala */
/* loaded from: input_file:org/apache/carbondata/examples/StreamingWithRowParserExample$.class */
public final class StreamingWithRowParserExample$ {
    public static StreamingWithRowParserExample$ MODULE$;

    static {
        new StreamingWithRowParserExample$();
    }

    public void main(String[] strArr) {
        String canonicalPath = new File(new StringBuilder(11).append(getClass().getResource("/").getPath()).append("../../../..").toString()).getCanonicalPath();
        SparkSession createSparkSession = ExampleUtils$.MODULE$.createSparkSession("StreamingWithRowParserExample", 4);
        if (1 != 0) {
            createSparkSession.sql(new StringBuilder(21).append("DROP TABLE IF EXISTS ").append("stream_table_with_row_parser").toString());
            if (0 != 0) {
                createSparkSession.sql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(353).append("\n             | CREATE TABLE ").append("stream_table_with_row_parser").append("(\n             | id INT,\n             | name STRING,\n             | city STRING,\n             | salary FLOAT,\n             | file struct<school:array<string>, age:int>\n             | )\n             | STORED AS carbondata\n             | TBLPROPERTIES(\n             | 'streaming'='true', 'sort_columns'='name')\n             | ").toString())).stripMargin());
            } else {
                createSparkSession.sql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(294).append("\n             | CREATE TABLE ").append("stream_table_with_row_parser").append("(\n             | id INT,\n             | name STRING,\n             | city STRING,\n             | salary FLOAT\n             | )\n             | STORED AS carbondata\n             | TBLPROPERTIES(\n             | 'streaming'='true', 'sort_columns'='name')\n             | ").toString())).stripMargin());
            }
            CarbonTable carbonTable = CarbonEnv$.MODULE$.getCarbonTable(new Some("default"), "stream_table_with_row_parser", createSparkSession);
            createSparkSession.sql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(112).append("\n           | LOAD DATA LOCAL INPATH '").append(new StringBuilder(51).append(canonicalPath).append("/examples/spark/src/main/resources/streamSample.csv").toString()).append("'\n           | INTO TABLE ").append("stream_table_with_row_parser").append("\n           | OPTIONS('HEADER'='true')\n         ").toString())).stripMargin());
            ServerSocket serverSocket = new ServerSocket(7071);
            Thread startStreaming = startStreaming(createSparkSession, carbonTable.getTablePath());
            Thread writeSocket = writeSocket(serverSocket);
            Thread showTableCount = showTableCount(createSparkSession, "stream_table_with_row_parser");
            System.out.println("type enter to interrupt streaming");
            System.in.read();
            startStreaming.interrupt();
            writeSocket.interrupt();
            showTableCount.interrupt();
            serverSocket.close();
        }
        createSparkSession.sql(new StringBuilder(21).append("select count(*) from ").append("stream_table_with_row_parser").toString()).show(100, false);
        createSparkSession.sql(new StringBuilder(14).append("select * from ").append("stream_table_with_row_parser").toString()).show(100, false);
        createSparkSession.sql(new StringBuilder(55).append("select * ").append("from ").append("stream_table_with_row_parser").append(" ").append("where id = 100000001 or id = 1 limit 100").toString()).show(100, false);
        createSparkSession.sql(new StringBuilder(38).append("select * ").append("from ").append("stream_table_with_row_parser").append(" ").append("where id < 10 limit 100").toString()).show(100, false);
        if (0 != 0) {
            createSparkSession.sql(new StringBuilder(61).append("select file.age, file.school ").append("from ").append("stream_table_with_row_parser").append(" ").append("where where file.age = 30 ").toString()).show(100, false);
        }
        createSparkSession.stop();
        System.out.println("streaming finished");
    }

    public Thread showTableCount(final SparkSession sparkSession, final String str) {
        Thread thread = new Thread(sparkSession, str) { // from class: org.apache.carbondata.examples.StreamingWithRowParserExample$$anon$1
            private final SparkSession spark$1;
            private final String tableName$1;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1000).foreach$mVc$sp(i -> {
                    this.spark$1.sql(new StringBuilder(21).append("select count(*) from ").append(this.tableName$1).toString()).show(false);
                    Thread.sleep(3000L);
                });
            }

            {
                this.spark$1 = sparkSession;
                this.tableName$1 = str;
            }
        };
        thread.start();
        return thread;
    }

    public Thread startStreaming(final SparkSession sparkSession, final String str) {
        Thread thread = new Thread(sparkSession, str) { // from class: org.apache.carbondata.examples.StreamingWithRowParserExample$$anon$2
            private final SparkSession spark$2;
            private final String tablePath$1;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                StreamingQuery streamingQuery = null;
                try {
                    try {
                        final StreamingWithRowParserExample$$anon$2 streamingWithRowParserExample$$anon$2 = null;
                        streamingQuery = this.spark$2.readStream().format("socket").option("host", "localhost").option("port", 7071L).load().as(this.spark$2.implicits().newStringEncoder()).map(str2 -> {
                            return str2.split(",");
                        }, this.spark$2.implicits().newStringArrayEncoder()).map(strArr -> {
                            String[] split = strArr[4].split("\\$");
                            FileElement fileElement = new FileElement(split[0].split(":"), new StringOps(Predef$.MODULE$.augmentString(split[1])).toInt());
                            return new StringOps(Predef$.MODULE$.augmentString(strArr[0])).toInt() % 2 == 0 ? new StreamData(new StringOps(Predef$.MODULE$.augmentString(strArr[0])).toInt(), null, strArr[2], new StringOps(Predef$.MODULE$.augmentString(strArr[3])).toFloat(), fileElement) : new StreamData(new StringOps(Predef$.MODULE$.augmentString(strArr[0])).toInt(), strArr[1], strArr[2], new StringOps(Predef$.MODULE$.augmentString(strArr[3])).toFloat(), fileElement);
                        }, this.spark$2.implicits().newProductEncoder(package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(StreamingWithRowParserExample$$anon$2.class.getClassLoader()), new TypeCreator(streamingWithRowParserExample$$anon$2) { // from class: org.apache.carbondata.examples.StreamingWithRowParserExample$$anon$2$$typecreator14$1
                            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                                mirror.universe();
                                return mirror.staticClass("org.apache.carbondata.examples.StreamData").asType().toTypeConstructor();
                            }
                        }))).writeStream().format("carbondata").trigger((Trigger) CarbonToSparkAdapter$.MODULE$.getProcessingTime().apply("5 seconds")).option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(this.tablePath$1)).option("dbName", "default").option("tableName", "stream_table_with_row_parser").start();
                        streamingQuery.awaitTermination();
                    } catch (Exception e) {
                        e.printStackTrace();
                        Predef$.MODULE$.println("Done reading and writing streaming data");
                    }
                } finally {
                    streamingQuery.stop();
                }
            }

            {
                this.spark$2 = sparkSession;
                this.tablePath$1 = str;
            }
        };
        thread.start();
        return thread;
    }

    public Thread writeSocket(final ServerSocket serverSocket) {
        Thread thread = new Thread(serverSocket) { // from class: org.apache.carbondata.examples.StreamingWithRowParserExample$$anon$3
            private final ServerSocket serverSocket$1;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                PrintWriter printWriter = new PrintWriter(this.serverSocket$1.accept().getOutputStream());
                IntRef create = IntRef.create(0);
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 1000).foreach$mVc$sp(i -> {
                    RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1000).foreach$mVc$sp(i -> {
                        create.elem++;
                        printWriter.println(new StringBuilder(30).append(BoxesRunTime.boxToInteger(create.elem).toString()).append(",name_").append(create.elem).append(",city_").append(create.elem).append(",").append(BoxesRunTime.boxToDouble(create.elem * 10000.0d).toString()).append(",school_").append(create.elem).append(":school_").append(create.elem).append(create.elem).append("$").append(create.elem).toString());
                    });
                    printWriter.flush();
                    Thread.sleep(1000L);
                });
                printWriter.close();
                System.out.println("Socket closed");
            }

            {
                this.serverSocket$1 = serverSocket;
            }
        };
        thread.start();
        return thread;
    }

    private StreamingWithRowParserExample$() {
        MODULE$ = this;
    }
}
