/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.examples.streaming;

import com.twitter.algebird.HLL;
import com.twitter.algebird.HyperLogLog$;
import com.twitter.algebird.HyperLogLogMonoid;
import org.apache.spark.SparkConf;
import org.apache.spark.examples.streaming.StreamingExamples$;
import org.apache.spark.examples.streaming.TwitterAlgebirdHLL$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.ReceiverInputDStream;
import org.apache.spark.streaming.twitter.TwitterUtils$;
import scala.Function1;
import scala.Function2;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import twitter4j.Status;

/**
 * Singleton holder for the {@code TwitterAlgebirdHLL} Spark Streaming example.
 *
 * <p>{@link #main(String[])} opens a Twitter stream, extracts each status's user id, and
 * tracks the distinct ids two ways: with an Algebird {@code HLL} sketch and with an
 * immutable Scala {@code Set}. Both per-batch and running totals are printed, together
 * with the percentage difference between the sketch estimate and the exact set size.
 *
 * <p>NOTE(review): this file is decompiler output (see the CFR header). The anonymous
 * {@code Serializable} classes with constructor arguments and {@code $outer} fields look
 * like the decompiler's rendering of Scala closures and are presumably not compilable
 * as plain Java - treat the file as read-only reference material.
 */
public final class TwitterAlgebirdHLL$ {
    /** The single instance; assigned from the private constructor. */
    public static final TwitterAlgebirdHLL$ MODULE$;

    static {
        // Constructing the instance is what populates MODULE$ (see the constructor below).
        new TwitterAlgebirdHLL$();
    }

    /**
     * Builds and runs the streaming job, then blocks until the streaming context terminates.
     *
     * @param args passed unchanged as the filter sequence to {@code TwitterUtils.createStream}
     */
    public void main(String[] args) {
        StreamingExamples$.MODULE$.setStreamingLogLevels();
        // Passed to the HyperLogLogMonoid constructor below.
        // NOTE(review): presumably the HLL bit/precision parameter - confirm against Algebird.
        int BIT_SIZE = 12;
        String[] filters = args;
        SparkConf sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL");
        // Batch duration is Seconds(5).
        StreamingContext ssc = new StreamingContext(sparkConf, Seconds$.MODULE$.apply(5L));
        // No explicit twitter4j authorization is supplied (None); received data uses MEMORY_ONLY_SER.
        ReceiverInputDStream stream = TwitterUtils$.MODULE$.createStream(ssc, (Option)None$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])filters), StorageLevel$.MODULE$.MEMORY_ONLY_SER());
        // Project every status down to the id of the user who posted it.
        DStream users = stream.map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final long apply(Status status) {
                return status.getUser().getId();
            }
        }, ClassTag$.MODULE$.Long());
        HyperLogLogMonoid hll = new HyperLogLogMonoid(BIT_SIZE);
        // Mutable holders for the running totals; the foreachRDD callbacks below
        // reassign their elem fields on every non-empty batch.
        ObjectRef globalHll = new ObjectRef((Object)hll.zero());
        ObjectRef userSet = new ObjectRef((Object)((Set)Predef$.MODULE$.Set().apply((Seq)Nil$.MODULE$)));
        // Approximate path: turn each id into an HLL via the monoid, then combine all of them with +.
        DStream approxUsers = users.mapPartitions((Function1)new Serializable(hll){
            public static final long serialVersionUID = 0L;
            public final HyperLogLogMonoid hll$1;

            public final Iterator<HLL> apply(Iterator<Object> ids) {
                return ids.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ anonfun.2 $outer;

                    public final HLL apply(long id) {
                        // The second argument converts the long id to bytes for the monoid.
                        return this.$outer.hll$1.apply((Object)BoxesRunTime.boxToLong((long)id), (Function1)new Serializable(this){
                            public static final long serialVersionUID = 0L;

                            public final byte[] apply(long i) {
                                return HyperLogLog$.MODULE$.long2Bytes(i);
                            }
                        });
                    }
                    {
                        if ($outer == null) {
                            throw new NullPointerException();
                        }
                        this.$outer = $outer;
                    }
                });
            }
            {
                this.hll$1 = hll$1;
            }
        }, users.mapPartitions$default$2(), ClassTag$.MODULE$.apply(HLL.class)).reduce((Function2)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final HLL apply(HLL x$1, HLL x$2) {
                return x$1.$plus(x$2);
            }
        });
        // Exact path: wrap each id in a one-element Set, then union all the sets with ++.
        DStream exactUsers = users.map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Set<Object> apply(long id) {
                return (Set)Predef$.MODULE$.Set().apply((Seq)Predef$.MODULE$.wrapLongArray(new long[]{id}));
            }
        }, ClassTag$.MODULE$.apply(Set.class)).reduce((Function2)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Set<Object> apply(Set<Object> x$3, Set<Object> x$4) {
                return (Set)x$3.$plus$plus(x$4);
            }
        });
        // Per batch: merge the batch sketch into the running sketch and print both estimates.
        approxUsers.foreachRDD((Function1)new Serializable(globalHll){
            public static final long serialVersionUID = 0L;
            private final ObjectRef globalHll$1;

            public final void apply(RDD<HLL> rdd) {
                // Empty batches are skipped, so first() is only called on a non-empty RDD.
                if (rdd.count() != 0L) {
                    // NOTE(review): presumably the reduced RDD holds a single element, making
                    // first() the whole-batch sketch - confirm against DStream.reduce.
                    HLL partial = (HLL)rdd.first();
                    this.globalHll$1.elem = ((HLL)this.globalHll$1.elem).$plus(partial);
                    // Estimates are cast to int before formatting with %d.
                    Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("Approx distinct users this batch: %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)((int)partial.estimatedSize()))})));
                    Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("Approx distinct users overall: %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)((int)((HLL)this.globalHll$1.elem).estimatedSize()))})));
                }
            }
            {
                this.globalHll$1 = globalHll$1;
            }
        });
        // Per batch: union the batch set into the running set, print exact counts and the error rate.
        exactUsers.foreachRDD((Function1)new Serializable(globalHll, userSet){
            public static final long serialVersionUID = 0L;
            private final ObjectRef globalHll$1;
            private final ObjectRef userSet$1;

            public final void apply(RDD<Set<Object>> rdd) {
                // Empty batches are skipped, so first() is only called on a non-empty RDD.
                if (rdd.count() != 0L) {
                    Set partial = (Set)rdd.first();
                    this.userSet$1.elem = (Set)((Set)this.userSet$1.elem).$plus$plus((GenTraversableOnce)partial);
                    Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("Exact distinct users this batch: %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)partial.size())})));
                    Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("Exact distinct users overall: %d")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)((Set)this.userSet$1.elem).size())})));
                    // Error rate = (running HLL estimate / running exact count - 1) * 100.
                    // NOTE(review): this reads globalHll, which is only updated by the approxUsers
                    // callback above - confirm that callback has already run for the same batch.
                    Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("Error rate: %2.5f%%")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToDouble((double)((((HLL)this.globalHll$1.elem).estimatedSize() / (double)((Set)this.userSet$1.elem).size() - 1.0) * (double)100))})));
                }
            }
            {
                this.globalHll$1 = globalHll$1;
                this.userSet$1 = userSet$1;
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }

    /** Private so the only instance is the one created by the static initializer. */
    private TwitterAlgebirdHLL$() {
        MODULE$ = this;
    }
}

