package org.apache.carbondata.examples;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.carbondata.examples.util.ExampleUtils$;
import org.apache.spark.sql.CarbonEnv$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

/* compiled from: StreamingUsingBatchLoadExample.scala */
/* loaded from: input_file:org/apache/carbondata/examples/StreamingUsingBatchLoadExample$.class */
public final class StreamingUsingBatchLoadExample$ {
    public static final StreamingUsingBatchLoadExample$ MODULE$ = null;

    static {
        new StreamingUsingBatchLoadExample$();
    }

    public void main(String[] strArr) {
        String canonicalPath = new File(new StringBuilder().append(getClass().getResource("/").getPath()).append("../../../..").toString()).getCanonicalPath();
        String stringBuilder = new StringBuilder().append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", "/examples/spark2/target/spark_streaming_cp_"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{canonicalPath}))).append(BoxesRunTime.boxToLong(System.currentTimeMillis()).toString()).toString();
        String s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"dstream_batch_table"})).s(Nil$.MODULE$);
        SparkSession createCarbonSession = ExampleUtils$.MODULE$.createCarbonSession("StreamingUsingBatchLoadExample", 4);
        if (1 != 0) {
            createCarbonSession.sql(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"DROP TABLE IF EXISTS ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s})));
            createCarbonSession.sql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n           | CREATE TABLE ", "(\n           | id INT,\n           | name STRING,\n           | city STRING,\n           | salary FLOAT\n           | )\n           | STORED BY 'carbondata'\n           | TBLPROPERTIES(\n           | 'sort_columns'='name',\n           | 'dictionary_include'='city',\n           | 'AUTO_LOAD_MERGE'='true',\n           | 'COMPACTION_LEVEL_THRESHOLD'='4,10')\n           | "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s})))).stripMargin());
            CarbonEnv$.MODULE$.getCarbonTable(new Some("default"), s, createCarbonSession);
            createCarbonSession.sql(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n           | LOAD DATA LOCAL INPATH '", "'\n           | INTO TABLE ", "\n           | OPTIONS('HEADER'='true')\n         "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", "/examples/spark2/src/main/resources/streamSample.csv"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{canonicalPath})), s})))).stripMargin());
            ServerSocket serverSocket = new ServerSocket(7071);
            Thread writeSocket = writeSocket(serverSocket);
            Thread showTableCount = showTableCount(createCarbonSession, s);
            StreamingContext startStreaming = startStreaming(createCarbonSession, s, stringBuilder);
            waitForStopSignal(startStreaming);
            startStreaming.start();
            startStreaming.awaitTermination();
            writeSocket.interrupt();
            showTableCount.interrupt();
            serverSocket.close();
        }
        createCarbonSession.sql(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"select count(*) from ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s}))).show(100, false);
        createCarbonSession.sql(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"select * from ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s}))).show(100, false);
        createCarbonSession.sql(new StringBuilder().append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"select * "})).s(Nil$.MODULE$)).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"from ", " "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s}))).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"where id = 100000001 or id = 1 limit 100"})).s(Nil$.MODULE$)).toString()).show(100, false);
        createCarbonSession.sql(new StringBuilder().append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"select * "})).s(Nil$.MODULE$)).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"from ", " "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s}))).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"where id < 10 limit 100"})).s(Nil$.MODULE$)).toString()).show(100, false);
        createCarbonSession.sql(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"SHOW SEGMENTS FOR TABLE ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s}))).show(false);
        createCarbonSession.sql(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"DROP TABLE IF EXISTS ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{s})));
        createCarbonSession.stop();
        System.out.println("streaming finished");
    }

    public Thread showTableCount(SparkSession sparkSession, String str) {
        StreamingUsingBatchLoadExample$$anon$1 streamingUsingBatchLoadExample$$anon$1 = new StreamingUsingBatchLoadExample$$anon$1(sparkSession, str);
        streamingUsingBatchLoadExample$$anon$1.start();
        return streamingUsingBatchLoadExample$$anon$1;
    }

    public Thread waitForStopSignal(final StreamingContext streamingContext) {
        Thread thread = new Thread(streamingContext) { // from class: org.apache.carbondata.examples.StreamingUsingBatchLoadExample$$anon$2
            private final StreamingContext ssc$1;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                new ServerSocket(7072).accept();
                this.ssc$1.stop(false, true);
            }

            {
                this.ssc$1 = streamingContext;
            }
        };
        thread.start();
        return thread;
    }

    public StreamingContext startStreaming(SparkSession sparkSession, String str, String str2) {
        StreamingContext streamingContext = null;
        try {
            streamingContext = new StreamingContext(sparkSession.sparkContext(), Seconds$.MODULE$.apply(15L));
            streamingContext.checkpoint(str2);
            streamingContext.socketTextStream("localhost", 7071, streamingContext.socketTextStream$default$3()).map(new StreamingUsingBatchLoadExample$$anonfun$1(), ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(String.class))).map(new StreamingUsingBatchLoadExample$$anonfun$2(), ClassTag$.MODULE$.apply(DStreamData.class)).foreachRDD(new StreamingUsingBatchLoadExample$$anonfun$startStreaming$1(sparkSession, str));
        } catch (Exception e) {
            e.printStackTrace();
            Predef$.MODULE$.println("Done reading and writing streaming data");
        }
        return streamingContext;
    }

    public Thread writeSocket(ServerSocket serverSocket) {
        Thread thread = new Thread(serverSocket) { // from class: org.apache.carbondata.examples.StreamingUsingBatchLoadExample$$anon$3
            private final ServerSocket serverSocket$1;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                PrintWriter printWriter = new PrintWriter(this.serverSocket$1.accept().getOutputStream());
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 1000).foreach$mVc$sp(new StreamingUsingBatchLoadExample$$anon$3$$anonfun$run$2(this, printWriter, IntRef.create(0)));
                printWriter.close();
                System.out.println("Socket closed");
            }

            {
                this.serverSocket$1 = serverSocket;
            }
        };
        thread.start();
        return thread;
    }

    private StreamingUsingBatchLoadExample$() {
        MODULE$ = this;
    }
}
