/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.mantis.examples.wordcount;

import com.netflix.mantis.examples.core.WordCountPair;
import com.netflix.mantis.examples.wordcount.sources.IlliadSource;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.core.MantisStream;
import io.mantisrx.runtime.core.WindowSpec;
import io.mantisrx.runtime.core.functions.ReduceFunction;
import io.mantisrx.runtime.core.functions.ReduceFunctionImpl;
import io.mantisrx.runtime.core.sinks.ObservableSinkImpl;
import io.mantisrx.runtime.core.sinks.SinkFunction;
import io.mantisrx.runtime.core.sources.ObservableSourceImpl;
import io.mantisrx.runtime.core.sources.SourceFunction;
import io.mantisrx.runtime.executor.LocalJobExecutorNetworked;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.runtime.sink.SelfDocumentingSink;
import io.mantisrx.runtime.sink.Sinks;
import io.mantisrx.runtime.source.Source;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.StringTokenizer;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCountDslJob
extends MantisJobProvider<String> {
    private static final Logger log = LoggerFactory.getLogger(WordCountDslJob.class);

    public Job<String> getJobInstance() {
        return MantisStream.create(null).source((SourceFunction)new ObservableSourceImpl((Source)new IlliadSource())).flatMap(this::tokenize).map(x -> {
            try {
                Thread.sleep(0L, 10000);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            return x;
        }).keyBy(WordCountPair::getWord).window(WindowSpec.timed((Duration)Duration.ofSeconds(10L))).reduce((ReduceFunction)((ReduceFunctionImpl)(acc, item) -> {
            if (acc.getWord() != null && !acc.getWord().isEmpty() && !acc.getWord().equals(item.getWord())) {
                log.warn("keys dont match: acc ({}) vs item ({})", (Object)acc.getWord(), (Object)item.getWord());
            }
            return new WordCountPair(acc.getWord(), acc.getCount() + item.getCount());
        })).map(WordCountPair::toString).sink((SinkFunction)new ObservableSinkImpl(Sinks.eagerSubscribe((SelfDocumentingSink)Sinks.sse(data -> data)))).metadata(new Metadata.Builder().name("WordCount").description("Reads Homer's The Illiad faster than we can.").build()).create();
    }

    private List<WordCountPair> tokenize(String text) {
        StringTokenizer tokenizer = new StringTokenizer(text);
        ArrayList<WordCountPair> wordCountPairs = new ArrayList<WordCountPair>();
        while (tokenizer.hasMoreTokens()) {
            String word = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase();
            wordCountPairs.add(new WordCountPair(word, 1));
        }
        return wordCountPairs;
    }

    public static void main(String[] args) {
        LocalJobExecutorNetworked.execute(new WordCountDslJob().getJobInstance(), (Parameter[])new Parameter[0]);
    }
}

