/*
 * Decompiled with CFR 0.152.
 */
package io.snappydata.examples;

import com.typesafe.config.Config;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.streaming.SchemaDStream;
import org.apache.spark.sql.streaming.SnappyStreamingJob;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.streaming.SnappyStreamingContext;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import spark.jobserver.SparkJobValid$;
import spark.jobserver.SparkJobValidation;

public final class TwitterPopularTagsJob$
implements SnappyStreamingJob {
    public static final TwitterPopularTagsJob$ MODULE$;

    static {
        new TwitterPopularTagsJob$();
    }

    /*
     * WARNING - void declaration
     */
    public Object runJob(SnappyStreamingContext snsc, Config jobConfig) {
        DataFrame dataFrame;
        Object stream = null;
        String outFileName = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"TwitterPopularTagsJob-", ".out"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)System.currentTimeMillis())}));
        PrintWriter pw = new PrintWriter(outFileName);
        StructType schema = StructType$.MODULE$.apply((Seq)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new StructField[]{new StructField("hashtag", (DataType)StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4())})));
        snsc.snappyContext().sql("DROP TABLE IF EXISTS topktable");
        snsc.snappyContext().sql("DROP TABLE IF EXISTS hashtagtable");
        snsc.snappyContext().sql("DROP TABLE IF EXISTS retweettable");
        if (jobConfig.hasPath("consumerKey") && jobConfig.hasPath("consumerKey") && jobConfig.hasPath("accessToken") && jobConfig.hasPath("accessTokenSecret")) {
            pw.println("##### Running example with live twitter stream #####");
            snsc.sql(new StringBuilder().append((Object)"CREATE STREAM TABLE hashtagtable (hashtag STRING) USING twitter_stream OPTIONS (").append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"consumerKey '", "', "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{jobConfig.getString("consumerKey")}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"consumerSecret '", "', "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{jobConfig.getString("consumerSecret")}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"accessToken '", "', "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{jobConfig.getString("accessToken")}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"accessTokenSecret '", "', "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{jobConfig.getString("accessTokenSecret")}))).append((Object)"rowConverter 'org.apache.spark.sql.streaming.TweetToHashtagRow')").toString());
            dataFrame = snsc.sql(new StringBuilder().append((Object)"CREATE STREAM TABLE retweettable (retweetId LONG, retweetCnt INT, retweetTxt STRING) USING twitter_stream OPTIONS (").append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"consumerKey '", "', "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{jobConfig.getString("consumerKey")}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"consumerSecret '", "', "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{jobConfig.getString("consumerSecret")}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"accessToken '", "', "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{jobConfig.getString("accessToken")}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"accessTokenSecret '", "', "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{jobConfig.getString("accessTokenSecret")}))).append((Object)"rowConverter 'org.apache.spark.sql.streaming.TweetToRetweetRow')").toString());
        } else {
            pw.println("##### Running example with stored tweet data #####");
            snsc.sql("CREATE STREAM TABLE hashtagtable (hashtag STRING) USING file_stream OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', rowConverter 'org.apache.spark.sql.streaming.TweetToHashtagRow',directory '/tmp/copiedtwitterdata')");
            dataFrame = snsc.sql("CREATE STREAM TABLE retweettable (retweetId LONG, retweetCnt INT, retweetTxt STRING) USING file_stream OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', rowConverter 'org.apache.spark.sql.streaming.TweetToRetweetRow',directory '/tmp/copiedtwitterdata')");
        }
        SchemaDStream retweetStream = snsc.registerCQ("SELECT * FROM retweettable WINDOW (DURATION 2 SECONDS, SLIDE 2 SECONDS)");
        Map topKOption = (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)"epoch"), (Object)((Object)BoxesRunTime.boxToLong((long)System.currentTimeMillis())).toString()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)"timeInterval"), (Object)"2000ms"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)"size"), (Object)"10"), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)"basetable"), (Object)"hashtagtable")}));
        snsc.snappyContext().createApproxTSTopK("topktable", "hashtag", schema, topKOption, snsc.snappyContext().createApproxTSTopK$default$5());
        String tableName = "retweetStore";
        snsc.snappyContext().dropTable(tableName, true);
        snsc.snappyContext().sql(new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"CREATE TABLE ", " (retweetId BIGINT PRIMARY KEY, "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{tableName}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"retweetCnt INT, retweetTxt STRING) USING row OPTIONS ()"})).s((Seq)Nil$.MODULE$)).toString());
        retweetStream.foreachDataFrame((Function1)new Serializable(tableName){
            public static final long serialVersionUID = 0L;
            private final String tableName$1;

            public final void apply(DataFrame df) {
                df.write().mode(SaveMode.Append).saveAsTable(this.tableName$1);
            }
            {
                this.tableName$1 = tableName$1;
            }
        });
        snsc.start();
        try {
            int runTime = jobConfig.hasPath("streamRunTime") ? new StringOps(Predef$.MODULE$.augmentString(jobConfig.getString("streamRunTime"))).toInt() * 1000 : 120000;
            long end = System.currentTimeMillis() + (long)runTime;
            while (end > System.currentTimeMillis()) {
                Thread.sleep(2000L);
                pw.println("\n******** Top 10 hash tags of last two seconds *******\n");
                Predef$.MODULE$.refArrayOps((Object[])snsc.snappyContext().queryApproxTSTopK("topktable", System.currentTimeMillis() - 2000L, System.currentTimeMillis()).collect()).foreach((Function1)new Serializable(pw){
                    public static final long serialVersionUID = 0L;
                    private final PrintWriter pw$1;

                    public final void apply(Row result) {
                        this.pw$1.println(result.toString());
                    }
                    {
                        this.pw$1 = pw$1;
                    }
                });
            }
            pw.println("\n************ Top 10 hash tags until now ***************\n");
            Predef$.MODULE$.refArrayOps((Object[])snsc.sql("SELECT * FROM topktable").collect()).foreach((Function1)new Serializable(pw){
                public static final long serialVersionUID = 0L;
                private final PrintWriter pw$1;

                public final void apply(Row result) {
                    this.pw$1.println(result.toString());
                }
                {
                    this.pw$1 = pw$1;
                }
            });
            pw.println("\n####### Top 10 popular tweets - Query Row table #######\n");
            Predef$.MODULE$.refArrayOps((Object[])snsc.snappyContext().sql(new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"SELECT retweetId AS RetweetId, "})).s((Seq)Nil$.MODULE$)).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"retweetCnt AS RetweetsCount, retweetTxt AS Text FROM ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{tableName}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{" ORDER BY RetweetsCount DESC LIMIT 10"})).s((Seq)Nil$.MODULE$)).toString()).collect()).foreach((Function1)new Serializable(pw){
                public static final long serialVersionUID = 0L;
                private final PrintWriter pw$1;

                public final void apply(Row row) {
                    this.pw$1.println(row.toString());
                }
                {
                    this.pw$1 = pw$1;
                }
            });
            pw.println("\n#######################################################");
            pw.close();
        }
        catch (Throwable throwable) {
            void var5_5;
            var5_5.close();
            snsc.stop(false, true);
            throw throwable;
        }
        snsc.stop(false, true);
        return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"See ", "/", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.getCurrentDirectory$1(), outFileName}));
    }

    public SparkJobValidation validate(SnappyStreamingContext snsc, Config config) {
        return SparkJobValid$.MODULE$;
    }

    private final String getCurrentDirectory$1() {
        return new File(".").getCanonicalPath();
    }

    private TwitterPopularTagsJob$() {
        MODULE$ = this;
    }
}

