package org.apache.carbondata.examples;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.carbondata.examples.util.ExampleUtils$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.CarbonEnv$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import scala.Predef$;
import scala.Some;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

/* compiled from: StreamingUsingBatchLoadExample.scala */
/* loaded from: input_file:org/apache/carbondata/examples/StreamingUsingBatchLoadExample$.class */
public final class StreamingUsingBatchLoadExample$ {
    public static StreamingUsingBatchLoadExample$ MODULE$;

    static {
        new StreamingUsingBatchLoadExample$();
    }

    public void main(String[] strArr) {
        String canonicalPath = new File(new StringBuilder(11).append(getClass().getResource("/").getPath()).append("../../../..").toString()).getCanonicalPath();
        String sb = new StringBuilder(42).append(canonicalPath).append("/examples/spark/target/spark_streaming_cp_").append(BoxesRunTime.boxToLong(System.currentTimeMillis()).toString()).toString();
        SparkSession createSparkSession = ExampleUtils$.MODULE$.createSparkSession("StreamingUsingBatchLoadExample", 4);
        if (1 != 0) {
            createSparkSession.sql(new StringBuilder(21).append("DROP TABLE IF EXISTS ").append("dstream_batch_table").toString());
            createSparkSession.sql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(343).append("\n           | CREATE TABLE ").append("dstream_batch_table").append("(\n           | id INT,\n           | name STRING,\n           | city STRING,\n           | salary FLOAT\n           | )\n           | STORED AS carbondata\n           | TBLPROPERTIES(\n           | 'sort_columns'='name',\n           | 'AUTO_LOAD_MERGE'='true',\n           | 'COMPACTION_LEVEL_THRESHOLD'='4,10')\n           | ").toString())).stripMargin());
            CarbonEnv$.MODULE$.getCarbonTable(new Some("default"), "dstream_batch_table", createSparkSession);
            createSparkSession.sql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(112).append("\n           | LOAD DATA LOCAL INPATH '").append(new StringBuilder(51).append(canonicalPath).append("/examples/spark/src/main/resources/streamSample.csv").toString()).append("'\n           | INTO TABLE ").append("dstream_batch_table").append("\n           | OPTIONS('HEADER'='true')\n         ").toString())).stripMargin());
            ServerSocket serverSocket = new ServerSocket(7071);
            Thread writeSocket = writeSocket(serverSocket);
            Thread showTableCount = showTableCount(createSparkSession, "dstream_batch_table");
            StreamingContext startStreaming = startStreaming(createSparkSession, "dstream_batch_table", sb);
            waitForStopSignal(startStreaming);
            startStreaming.start();
            startStreaming.awaitTermination();
            writeSocket.interrupt();
            showTableCount.interrupt();
            serverSocket.close();
        }
        createSparkSession.sql(new StringBuilder(21).append("select count(*) from ").append("dstream_batch_table").toString()).show(100, false);
        createSparkSession.sql(new StringBuilder(14).append("select * from ").append("dstream_batch_table").toString()).show(100, false);
        createSparkSession.sql(new StringBuilder(55).append("select * ").append("from ").append("dstream_batch_table").append(" ").append("where id = 100000001 or id = 1 limit 100").toString()).show(100, false);
        createSparkSession.sql(new StringBuilder(38).append("select * ").append("from ").append("dstream_batch_table").append(" ").append("where id < 10 limit 100").toString()).show(100, false);
        createSparkSession.sql(new StringBuilder(24).append("SHOW SEGMENTS FOR TABLE ").append("dstream_batch_table").toString()).show(false);
        createSparkSession.sql(new StringBuilder(21).append("DROP TABLE IF EXISTS ").append("dstream_batch_table").toString());
        createSparkSession.stop();
        System.out.println("streaming finished");
    }

    public Thread showTableCount(final SparkSession sparkSession, final String str) {
        Thread thread = new Thread(sparkSession, str) { // from class: org.apache.carbondata.examples.StreamingUsingBatchLoadExample$$anon$1
            private final SparkSession spark$1;
            private final String tableName$1;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1000).foreach$mVc$sp(i -> {
                    this.spark$1.sql(new StringBuilder(21).append("select count(*) from ").append(this.tableName$1).toString()).show(false);
                    this.spark$1.sql(new StringBuilder(24).append("SHOW SEGMENTS FOR TABLE ").append(this.tableName$1).toString()).show(false);
                    Thread.sleep(5000L);
                });
            }

            {
                this.spark$1 = sparkSession;
                this.tableName$1 = str;
            }
        };
        thread.start();
        return thread;
    }

    public Thread waitForStopSignal(final StreamingContext streamingContext) {
        Thread thread = new Thread(streamingContext) { // from class: org.apache.carbondata.examples.StreamingUsingBatchLoadExample$$anon$2
            private final StreamingContext ssc$1;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                new ServerSocket(7072).accept();
                this.ssc$1.stop(false, true);
            }

            {
                this.ssc$1 = streamingContext;
            }
        };
        thread.start();
        return thread;
    }

    public StreamingContext startStreaming(SparkSession sparkSession, String str, String str2) {
        StreamingContext streamingContext = null;
        try {
            streamingContext = new StreamingContext(sparkSession.sparkContext(), Seconds$.MODULE$.apply(15L));
            streamingContext.checkpoint(str2);
            streamingContext.socketTextStream("localhost", 7071, streamingContext.socketTextStream$default$3()).map(str3 -> {
                return str3.split(",");
            }, ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(String.class))).map(strArr -> {
                return new DStreamData(new StringOps(Predef$.MODULE$.augmentString(strArr[0])).toInt(), strArr[1], strArr[2], new StringOps(Predef$.MODULE$.augmentString(strArr[3])).toFloat());
            }, ClassTag$.MODULE$.apply(DStreamData.class)).foreachRDD((rdd, time) -> {
                $anonfun$startStreaming$3(sparkSession, str, rdd, time);
                return BoxedUnit.UNIT;
            });
        } catch (Exception e) {
            e.printStackTrace();
            Predef$.MODULE$.println("Done reading and writing streaming data");
        }
        return streamingContext;
    }

    public Thread writeSocket(final ServerSocket serverSocket) {
        Thread thread = new Thread(serverSocket) { // from class: org.apache.carbondata.examples.StreamingUsingBatchLoadExample$$anon$3
            private final ServerSocket serverSocket$1;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                PrintWriter printWriter = new PrintWriter(this.serverSocket$1.accept().getOutputStream());
                IntRef create = IntRef.create(0);
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 1000).foreach$mVc$sp(i -> {
                    RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1000).foreach$mVc$sp(i -> {
                        create.elem++;
                        printWriter.println(new StringBuilder(30).append(BoxesRunTime.boxToInteger(create.elem).toString()).append(",name_").append(create.elem).append(",city_").append(create.elem).append(",").append(BoxesRunTime.boxToDouble(create.elem * 10000.0d).toString()).append(",school_").append(create.elem).append(":school_").append(create.elem).append(create.elem).append("$").append(create.elem).toString());
                    });
                    printWriter.flush();
                    Thread.sleep(1000L);
                });
                printWriter.close();
                System.out.println("Socket closed");
            }

            {
                this.serverSocket$1 = serverSocket;
            }
        };
        thread.start();
        return thread;
    }

    public static final /* synthetic */ void $anonfun$startStreaming$3(SparkSession sparkSession, String str, RDD rdd, Time time) {
        Dataset df = sparkSession.createDataFrame(rdd, package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(MODULE$.getClass().getClassLoader()), new TypeCreator() { // from class: org.apache.carbondata.examples.StreamingUsingBatchLoadExample$$typecreator1$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                mirror.universe();
                return mirror.staticClass("org.apache.carbondata.examples.DStreamData").asType().toTypeConstructor();
            }
        })).toDF(Predef$.MODULE$.wrapRefArray(new String[]{"id", "name", "city", "salary"}));
        Predef$.MODULE$.println(new StringBuilder(38).append("at time: ").append(time.toString()).append(" the count of received data: ").append(df.count()).toString());
        df.write().format("carbondata").option("tableName", str).mode(SaveMode.Append).save();
    }

    private StreamingUsingBatchLoadExample$() {
        MODULE$ = this;
    }
}
