/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.mantis.examples.wordcount;

import com.mantisrx.common.utils.JsonUtility;
import com.netflix.mantis.examples.config.StageConfigs;
import com.netflix.mantis.examples.core.WordCountPair;
import com.netflix.mantis.examples.wordcount.sources.TwitterSource;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJob;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.executor.LocalJobExecutorNetworked;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.runtime.sink.SelfDocumentingSink;
import io.mantisrx.runtime.sink.Sinks;
import io.mantisrx.runtime.source.Source;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

public class TwitterJob
extends MantisJobProvider<String> {
    private static final Logger log = LoggerFactory.getLogger(TwitterJob.class);

    public Job<String> getJobInstance() {
        return MantisJob.source((Source)new TwitterSource()).stage((context, dataO) -> dataO.map(JsonUtility::jsonToMap).filter(eventMap -> {
            if (eventMap.containsKey("lang") && eventMap.containsKey("text")) {
                String lang = (String)eventMap.get("lang");
                return "en".equalsIgnoreCase(lang);
            }
            return false;
        }).map(eventMap -> (String)eventMap.get("text")).flatMap(text -> Observable.from(this.tokenize((String)text))).window(10L, TimeUnit.SECONDS).flatMap(wordCountPairObservable -> wordCountPairObservable.groupBy(WordCountPair::getWord).flatMap(groupO -> groupO.reduce((Object)0, (cnt, wordCntPair) -> cnt + 1).map(cnt -> new WordCountPair((String)groupO.getKey(), (int)cnt)))).map(WordCountPair::toString).doOnNext(cnt -> log.info(cnt)), StageConfigs.scalarToScalarConfig()).sink(Sinks.eagerSubscribe((SelfDocumentingSink)Sinks.sse(data -> data))).metadata(new Metadata.Builder().name("TwitterSample").description("Connects to a Twitter feed").build()).create();
    }

    private List<WordCountPair> tokenize(String text) {
        StringTokenizer tokenizer = new StringTokenizer(text);
        ArrayList<WordCountPair> wordCountPairs = new ArrayList<WordCountPair>();
        while (tokenizer.hasMoreTokens()) {
            String word = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase();
            wordCountPairs.add(new WordCountPair(word, 1));
        }
        return wordCountPairs;
    }

    public static void main(String[] args) {
        String consumerKey = null;
        String consumerSecret = null;
        String token = null;
        String tokenSecret = null;
        if (args.length != 4) {
            System.out.println("Usage: java com.netflix.mantis.examples.TwitterJob <consumerKey> <consumerSecret> <token> <tokenSecret");
            System.exit(0);
        } else {
            consumerKey = args[0].trim();
            consumerSecret = args[1].trim();
            token = args[2].trim();
            tokenSecret = args[3].trim();
        }
        LocalJobExecutorNetworked.execute(new TwitterJob().getJobInstance(), (Parameter[])new Parameter[]{new Parameter("consumerKey", consumerKey), new Parameter("consumerSecret", consumerSecret), new Parameter("token", token), new Parameter("tokenSecret", tokenSecret)});
    }
}

