/*
 * Decompiled with CFR 0.152.
 */
package io.snappydata.examples;

import com.typesafe.config.Config;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.JSparkJobValid;
import org.apache.spark.sql.JSparkJobValidation;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.streaming.SchemaDStream;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.JavaSnappyStreamingJob;
import org.apache.spark.streaming.api.java.JavaSnappyStreamingContext;

/**
 * Example streaming job that tracks popular hashtags and retweets.
 *
 * <p>The job creates two stream tables ({@code hashtagtable} and {@code retweettable}),
 * a top-K structure ({@code topktable}) over the hashtag stream and a row table
 * ({@code retweetStore}) that receives every windowed batch of the retweet stream.
 * While the stream runs, the top-K structure is queried every two seconds and the
 * results are written to an output file in the current working directory.
 *
 * <p>Recognised job configuration keys:
 * <ul>
 *   <li>{@code consumerKey}, {@code consumerSecret}, {@code accessToken},
 *       {@code accessTokenSecret} - when all four are present the tables are backed by
 *       {@code twitter_stream}; otherwise they are backed by {@code file_stream}
 *       reading from {@code /tmp/copiedtwitterdata}.</li>
 *   <li>{@code streamRunTime} - how long to keep polling, in seconds
 *       (defaults to 120 seconds).</li>
 * </ul>
 */
public class JavaTwitterPopularTagsJob
extends JavaSnappyStreamingJob {
    /** Row table that accumulates the retweet stream. */
    private static final String RETWEET_STORE = "retweetStore";
    /** Name of the approximate top-K structure built over the hashtag stream. */
    private static final String TOPK_TABLE = "topktable";
    /** Polling interval and look-back window for the top-K query, in milliseconds. */
    private static final long WINDOW_MILLIS = 2000L;
    /** Polling duration used when {@code streamRunTime} is not configured. */
    private static final long DEFAULT_RUN_MILLIS = 120000L;

    /**
     * Runs the streaming example and writes its report to a time-stamped output file.
     *
     * <p>The streaming context is always stopped before this method returns, and the
     * output file (if it could be opened) is always closed.
     *
     * @param snsc streaming context used to create tables, register the continuous
     *             query and run the stream
     * @param jobConfig job configuration; see the class documentation for the keys read
     * @return a {@code String}: {@code "See <dir>/<file>"} when the job completed, or a
     *         message starting with {@code "Job failed"} when an {@link IOException} or
     *         {@link InterruptedException} cut it short
     */
    public Object runJavaJob(JavaSnappyStreamingContext snsc, Config jobConfig) {
        PrintWriter pw = null;
        String currentDirectory = null;
        String outFileName = null;
        Exception failure = null;
        boolean interrupted = false;
        try {
            currentDirectory = new File(".").getCanonicalPath();
            outFileName = String.format("JavaTwitterPopularTagsJob-%d.out", System.currentTimeMillis());
            pw = new PrintWriter(outFileName);

            StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("hashtag", DataTypes.StringType, false)});

            // Start from a clean slate so the example can be re-run.
            snsc.snappyContext().sql("DROP TABLE IF EXISTS topktable");
            snsc.snappyContext().sql("DROP TABLE IF EXISTS hashtagtable");
            snsc.snappyContext().sql("DROP TABLE IF EXISTS retweettable");

            this.createStreamTables(snsc, jobConfig, pw);

            SchemaDStream retweetStream = snsc.registerCQ("SELECT * FROM retweettable WINDOW (DURATION 2 SECONDS, SLIDE 2 SECONDS)");

            HashMap<String, String> topKOption = new HashMap<String, String>();
            topKOption.put("epoch", Long.toString(System.currentTimeMillis()));
            topKOption.put("timeInterval", "2000ms");
            topKOption.put("size", "10");
            topKOption.put("basetable", "hashtagtable");
            snsc.snappyContext().createApproxTSTopK(TOPK_TABLE, "hashtag", schema, topKOption, false);

            snsc.snappyContext().dropTable(RETWEET_STORE, true);
            snsc.snappyContext().sql(String.format("CREATE TABLE %s (retweetId BIGINT PRIMARY KEY, retweetCnt INT, retweetTxt STRING) USING row OPTIONS ()", RETWEET_STORE));

            // Append every windowed batch of retweets to the row table.
            retweetStream.foreachDataFrame(new VoidFunction<DataFrame>(){

                public void call(DataFrame df) {
                    df.write().mode(SaveMode.Append).saveAsTable(RETWEET_STORE);
                }
            });

            snsc.start();

            // streamRunTime is given in seconds; multiply as long so large values cannot overflow.
            long runTimeMillis = jobConfig.hasPath("streamRunTime")
                ? Integer.parseInt(jobConfig.getString("streamRunTime")) * 1000L
                : DEFAULT_RUN_MILLIS;
            long end = System.currentTimeMillis() + runTimeMillis;
            while (end > System.currentTimeMillis()) {
                Thread.sleep(WINDOW_MILLIS);
                pw.println("\n******** Top 10 hash tags of last two seconds *******\n");
                long now = System.currentTimeMillis();
                Row[] recent = snsc.snappyContext().queryApproxTSTopK(TOPK_TABLE, now - WINDOW_MILLIS, now).collect();
                this.printResult(recent, pw);
            }

            Row[] allTopK = snsc.sql("SELECT * FROM topktable").collect();
            this.printResult(allTopK, pw);

            pw.println("\n####### Top 10 popular tweets - Query Row table #######\n");
            Row[] topRetweets = snsc.snappyContext().sql("SELECT retweetId AS RetweetId, retweetCnt AS RetweetsCount, retweetTxt AS Text FROM retweetStore ORDER BY RetweetsCount DESC LIMIT 10").collect();
            this.printResult(topRetweets, pw);
            pw.println("\n#######################################################");
        }
        catch (IOException e) {
            failure = e;
        }
        catch (InterruptedException e) {
            failure = e;
            interrupted = true;
        }
        finally {
            try {
                // pw is still null when the output file could not be opened.
                if (pw != null) {
                    if (failure != null) {
                        // Leave a trace of the failure in the report instead of dropping it.
                        pw.println("Job failed: " + failure);
                        failure.printStackTrace(pw);
                    }
                    pw.close();
                }
                snsc.stop(false, true);
            }
            finally {
                // Restore the interrupt status only after the stop call, so the
                // shutdown is not attempted with an interrupt already pending.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        if (failure == null) {
            return "See " + currentDirectory + "/" + outFileName;
        }
        if (pw == null) {
            return "Job failed before the output file could be created: " + failure;
        }
        return "Job failed: " + failure + ". See " + currentDirectory + "/" + outFileName;
    }

    /**
     * Creates {@code hashtagtable} and {@code retweettable}, backed by the live Twitter
     * stream when all four credentials are configured and by stored tweet files otherwise.
     *
     * @param snsc streaming context on which the DDL is executed
     * @param jobConfig job configuration that may carry the Twitter credentials
     * @param pw report writer; receives a line naming the chosen data source
     */
    private void createStreamTables(JavaSnappyStreamingContext snsc, Config jobConfig, PrintWriter pw) {
        boolean hasCredentials = jobConfig.hasPath("consumerKey")
            && jobConfig.hasPath("consumerSecret")
            && jobConfig.hasPath("accessToken")
            && jobConfig.hasPath("accessTokenSecret");
        if (hasCredentials) {
            pw.println("##### Running example with live twitter stream #####");
            String consumerKey = jobConfig.getString("consumerKey");
            String consumerSecret = jobConfig.getString("consumerSecret");
            String accessToken = jobConfig.getString("accessToken");
            String accessTokenSecret = jobConfig.getString("accessTokenSecret");
            snsc.sql("CREATE STREAM TABLE hashtagtable (hashtag STRING) USING twitter_stream OPTIONS ("
                + this.twitterOptions(consumerKey, consumerSecret, accessToken, accessTokenSecret, "org.apache.spark.sql.streaming.TweetToHashtagRow")
                + ")");
            snsc.sql("CREATE STREAM TABLE retweettable (retweetId LONG, retweetCnt INT, retweetTxt STRING) USING twitter_stream OPTIONS ("
                + this.twitterOptions(consumerKey, consumerSecret, accessToken, accessTokenSecret, "org.apache.spark.sql.streaming.TweetToRetweetRow")
                + ")");
        } else {
            pw.println("##### Running example with stored tweet data #####");
            snsc.sql("CREATE STREAM TABLE hashtagtable (hashtag STRING) USING file_stream OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', rowConverter 'org.apache.spark.sql.streaming.TweetToHashtagRow',directory '/tmp/copiedtwitterdata')");
            snsc.sql("CREATE STREAM TABLE retweettable (retweetId LONG, retweetCnt INT, retweetTxt STRING) USING file_stream OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', rowConverter 'org.apache.spark.sql.streaming.TweetToRetweetRow',directory '/tmp/copiedtwitterdata')");
        }
    }

    /**
     * Builds the comma-separated body of a {@code twitter_stream} OPTIONS clause
     * (without the surrounding parentheses).
     */
    private String twitterOptions(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret, String rowConverter) {
        return "consumerKey '" + consumerKey + "',"
            + "consumerSecret '" + consumerSecret + "',"
            + "accessToken '" + accessToken + "',"
            + "accessTokenSecret '" + accessTokenSecret + "',"
            + "rowConverter '" + rowConverter + "'";
    }

    /**
     * Writes each row's {@code toString()} form to the report, one row per line.
     *
     * @param result rows to print
     * @param pw report writer
     */
    private void printResult(Row[] result, PrintWriter pw) {
        for (Row row : result) {
            pw.println(row.toString());
        }
    }

    /**
     * Accepts every job configuration; no validation is performed.
     *
     * @param snc streaming context (unused)
     * @param jobConfig job configuration (unused)
     * @return a new {@link JSparkJobValid}
     */
    public JSparkJobValidation isValidJob(JavaSnappyStreamingContext snc, Config jobConfig) {
        return new JSparkJobValid();
    }
}

