/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.mantis.examples.wordcount;

import com.netflix.mantis.examples.core.WordCountPair;
import com.netflix.mantis.examples.wordcount.sources.TwitterSource;
import io.mantisrx.common.JsonSerializer;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.core.MantisStream;
import io.mantisrx.runtime.core.WindowSpec;
import io.mantisrx.runtime.core.functions.ReduceFunction;
import io.mantisrx.runtime.core.functions.ReduceFunctionImpl;
import io.mantisrx.runtime.core.sinks.ObservableSinkImpl;
import io.mantisrx.runtime.core.sinks.SinkFunction;
import io.mantisrx.runtime.core.sources.ObservableSourceImpl;
import io.mantisrx.runtime.core.sources.SourceFunction;
import io.mantisrx.runtime.executor.LocalJobExecutorNetworked;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.runtime.sink.SelfDocumentingSink;
import io.mantisrx.runtime.sink.Sinks;
import io.mantisrx.runtime.source.Source;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.StringTokenizer;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TwitterDslJob
extends MantisJobProvider<String> {
    private static final Logger log = LoggerFactory.getLogger(TwitterDslJob.class);

    public Job<String> getJobInstance() {
        JsonSerializer jsonSerializer = new JsonSerializer();
        return MantisStream.create(null).source((SourceFunction)new ObservableSourceImpl((Source)new TwitterSource())).map(event -> {
            try {
                return jsonSerializer.toMap(event);
            }
            catch (Exception e) {
                log.error("Failed to deserialize event {}", event, (Object)e);
                return null;
            }
        }).filter(eventMap -> {
            if (eventMap.containsKey("lang") && eventMap.containsKey("text")) {
                String lang = (String)eventMap.get("lang");
                return "en".equalsIgnoreCase(lang);
            }
            return false;
        }).map(eventMap -> (String)eventMap.get("text")).flatMap(this::tokenize).keyBy(WordCountPair::getWord).window(WindowSpec.timed((Duration)Duration.ofSeconds(10L))).reduce((ReduceFunction)((ReduceFunctionImpl)(acc, item) -> {
            if (acc.getWord() != null && !acc.getWord().isEmpty() && !acc.getWord().equals(item.getWord())) {
                log.warn("keys dont match: acc ({}) vs item ({})", (Object)acc.getWord(), (Object)item.getWord());
            }
            return new WordCountPair(acc.getWord(), acc.getCount() + item.getCount());
        })).map(WordCountPair::toString).sink((SinkFunction)new ObservableSinkImpl(Sinks.eagerSubscribe((SelfDocumentingSink)Sinks.sse(data -> data)))).metadata(new Metadata.Builder().name("TwitterSample").description("Connects to a Twitter feed").build()).create();
    }

    private List<WordCountPair> tokenize(String text) {
        StringTokenizer tokenizer = new StringTokenizer(text);
        ArrayList<WordCountPair> wordCountPairs = new ArrayList<WordCountPair>();
        while (tokenizer.hasMoreTokens()) {
            String word = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase();
            wordCountPairs.add(new WordCountPair(word, 1));
        }
        return wordCountPairs;
    }

    public static void main(String[] args) {
        String consumerKey = null;
        String consumerSecret = null;
        String token = null;
        String tokenSecret = null;
        if (args.length != 4) {
            System.out.println("Usage: java com.netflix.mantis.examples.TwitterJob <consumerKey> <consumerSecret> <token> <tokenSecret");
            System.exit(0);
        } else {
            consumerKey = args[0].trim();
            consumerSecret = args[1].trim();
            token = args[2].trim();
            tokenSecret = args[3].trim();
        }
        LocalJobExecutorNetworked.execute(new TwitterDslJob().getJobInstance(), (Parameter[])new Parameter[]{new Parameter("consumerKey", consumerKey), new Parameter("consumerSecret", consumerSecret), new Parameter("token", token), new Parameter("tokenSecret", tokenSecret)});
    }
}

