/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.examples.snappydata;

import java.io.File;
import java.sql.Connection;
import java.sql.PreparedStatement;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.examples.snappydata.EmbeddedKafkaUtils;
import org.apache.spark.jdbc.ConnectionConf;
import org.apache.spark.jdbc.ConnectionConfBuilder;
import org.apache.spark.jdbc.ConnectionUtil$;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.streaming.SchemaDStream;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.SnappyStreamingContext;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.util.Random$;

public final class StreamingExample$ {
    public static final StreamingExample$ MODULE$;

    static {
        new StreamingExample$();
    }

    public void main(String[] args) {
        Logger.getLogger((String)"org").setLevel(Level.ERROR);
        Logger.getLogger((String)"akka").setLevel(Level.ERROR);
        String dataDirAbsolutePath = this.createAndGetDataDir();
        Predef$.MODULE$.println((Object)"Initializing a SnappyStreamingContext");
        SparkSession spark = SparkSession$.MODULE$.builder().appName(this.getClass().getSimpleName()).master("local[*]").config("snappydata.store.sys-disk-dir", dataDirAbsolutePath).config("snappydata.store.log-file", new StringBuilder().append((Object)dataDirAbsolutePath).append((Object)"/SnappyDataExample.log").toString()).getOrCreate();
        SnappyStreamingContext snsc = new SnappyStreamingContext(spark.sparkContext(), Seconds$.MODULE$.apply(1L));
        Predef$.MODULE$.println();
        Predef$.MODULE$.println((Object)"Initializing embedded Kafka");
        EmbeddedKafkaUtils utils = new EmbeddedKafkaUtils();
        utils.setup();
        String topic = "kafka_topic";
        utils.createTopic(topic);
        String add = utils.brokerAddress();
        Predef$.MODULE$.println();
        Predef$.MODULE$.println((Object)"Creating a stream table to read data from Kafka");
        snsc.sql("drop table if exists adImpressionStream");
        snsc.sql(new StringBuilder().append((Object)"create stream table adImpressionStream ( time_stamp timestamp, publisher string, advertiser string, website string, geo string, bid double, cookie string)  using directkafka_stream options( rowConverter 'org.apache.spark.examples.snappydata.RowsConverter',").append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{" kafkaParams 'metadata.broker.list->", ";auto.offset.reset->smallest',"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{add}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{" topics '", "')"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{topic}))).toString());
        snsc.sql("create table publisher_bid_counts(publisher string, bidCount int) using row");
        snsc.sql("insert into publisher_bid_counts values('publisher1', 0)");
        snsc.sql("insert into publisher_bid_counts values('publisher2', 0)");
        snsc.sql("insert into publisher_bid_counts values('publisher3', 0)");
        snsc.sql("insert into publisher_bid_counts values('publisher4', 0)");
        Predef$.MODULE$.println();
        Predef$.MODULE$.println((Object)"Registering a continuous query to to be executed every second on the stream table");
        SchemaDStream resultStream = snsc.registerCQ("select publisher, count(bid) as bidCount from adImpressionStream window (duration 1 seconds, slide 1 seconds) group by publisher");
        ConnectionConf conf = new ConnectionConfBuilder(snsc.snappySession()).build();
        Predef$.MODULE$.println();
        resultStream.foreachDataFrame((Function1)new Serializable(conf){
            public static final long serialVersionUID = 0L;
            private final ConnectionConf conf$1;

            public final void apply(Dataset<Row> df) {
                if (df.count() > 0L) {
                    Predef$.MODULE$.println((Object)"Data received in streaming window");
                    df.show();
                    Predef$.MODULE$.println((Object)"Updating table publisher_bid_counts");
                    Connection conn = ConnectionUtil$.MODULE$.getConnection(this.conf$1);
                    Row[] result = (Row[])df.collect();
                    PreparedStatement stmt = conn.prepareStatement(new StringBuilder().append((Object)"update publisher_bid_counts set ").append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"bidCount = bidCount + ? where publisher = ?"})).s((Seq)Nil$.MODULE$)).toString());
                    Predef$.MODULE$.refArrayOps((Object[])result).foreach((Function1)new Serializable(this, stmt){
                        public static final long serialVersionUID = 0L;
                        private final PreparedStatement stmt$1;

                        public final void apply(Row row) {
                            String publisher = row.getString(0);
                            long bidCount = row.getLong(1);
                            this.stmt$1.setLong(1, bidCount);
                            this.stmt$1.setString(2, publisher);
                            this.stmt$1.addBatch();
                        }
                        {
                            this.stmt$1 = stmt$1;
                        }
                    });
                    stmt.executeBatch();
                    conn.close();
                } else {
                    Predef$.MODULE$.println((Object)"No data received in streaming window");
                }
            }
            {
                this.conf$1 = conf$1;
            }
        });
        snsc.start();
        Predef$.MODULE$.println((Object)"Publishing messages on Kafka");
        this.publishKafkaMessages(utils, topic);
        Thread.sleep(3000L);
        Predef$.MODULE$.println((Object)"***Total no of bids per publisher are***");
        snsc.snappySession().sql("select publisher, bidCount from publisher_bid_counts").show();
        Predef$.MODULE$.println((Object)"Exiting");
        snsc.stop(false);
        utils.shutdown();
        System.exit(0);
    }

    /*
     * WARNING - void declaration
     */
    public String createAndGetDataDir() {
        void var2_2;
        String dataDir = "./snappydata_examples_data";
        new File(dataDir).mkdir();
        String dataDirAbsolutePath = new File(dataDir).getAbsolutePath();
        return var2_2;
    }

    public void publishKafkaMessages(EmbeddedKafkaUtils utils, String topic) {
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 10).foreach$mVc$sp((Function1)new Serializable(utils, topic){
            public static final long serialVersionUID = 0L;
            private final EmbeddedKafkaUtils utils$1;
            private final String topic$1;

            public final void apply(int i) {
                this.apply$mcVI$sp(i);
            }

            public void apply$mcVI$sp(int i) {
                long currentTime = System.currentTimeMillis();
                String bid1 = new StringBuilder().append(currentTime).append((Object)",publisher1,advt1,pb1.web,US,").append((Object)BoxesRunTime.boxToDouble((double)Random$.MODULE$.nextDouble())).append((Object)",23543").toString();
                String bid2 = new StringBuilder().append(currentTime).append((Object)",publisher2,advt1,pb1.web,US,").append((Object)BoxesRunTime.boxToDouble((double)Random$.MODULE$.nextDouble())).append((Object)",45445").toString();
                String bid3 = new StringBuilder().append(currentTime).append((Object)",publisher3,advt2,pb1.web,US,").append((Object)BoxesRunTime.boxToDouble((double)Random$.MODULE$.nextDouble())).append((Object)",13434").toString();
                String bid4 = new StringBuilder().append(currentTime).append((Object)",publisher4,advt2,pb1.web,US,").append((Object)BoxesRunTime.boxToDouble((double)Random$.MODULE$.nextDouble())).append((Object)",34324").toString();
                String bid5 = new StringBuilder().append(currentTime).append((Object)",publisher2,advt1,pb1.web,US,").append((Object)BoxesRunTime.boxToDouble((double)Random$.MODULE$.nextDouble())).append((Object)",23233").toString();
                String bid6 = new StringBuilder().append(currentTime).append((Object)",publisher4,advt2,pb1.web,US,").append((Object)BoxesRunTime.boxToDouble((double)Random$.MODULE$.nextDouble())).append((Object)",43545").toString();
                this.utils$1.sendMessages(this.topic$1, (String[])((Object[])new String[]{bid1, bid2, bid3, bid4, bid5, bid6}));
                Predef$.MODULE$.println((Object)"Published message containing 6 rows");
                Thread.sleep(1000L);
            }
            {
                this.utils$1 = utils$1;
                this.topic$1 = topic$1;
            }
        });
        Predef$.MODULE$.println((Object)"Done publishing all messages");
    }

    private StreamingExample$() {
        MODULE$ = this;
    }
}

