/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.mantis.examples.wordcount.sources;

import com.netflix.mantis.examples.core.ObservableQueue;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.runtime.source.Index;
import io.mantisrx.runtime.source.Source;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import rx.Observable;

/**
 * Mantis {@link Source} that emits raw messages read from Twitter's streaming
 * filter endpoint.
 *
 * <p>{@link #init(Context, Index)} reads the OAuth1 credentials and the
 * comma-separated track terms from the job parameters, builds a Hosebird
 * {@link BasicClient} whose processor writes into an internal
 * {@link ObservableQueue}, and connects it. {@link #call(Context, Index)}
 * exposes that queue as a single inner observable. {@link #close()} stops the
 * client.
 */
public class TwitterSource
implements Source<String> {
    public static final String CONSUMER_KEY_PARAM = "consumerKey";
    public static final String CONSUMER_SECRET_PARAM = "consumerSecret";
    public static final String TOKEN_PARAM = "token";
    public static final String TOKEN_SECRET_PARAM = "tokenSecret";
    public static final String TERMS_PARAM = "terms";

    /** Queue the Hosebird processor writes into and that {@link #call} observes. */
    private final ObservableQueue<String> twitterObservable = new ObservableQueue<>();

    /** Streaming client; stays {@code null} until {@link #init} has built it. */
    private transient BasicClient client;

    /**
     * Returns an observable that emits exactly one inner observable: the view
     * of the internal queue.
     *
     * @param context job context (not used here)
     * @param index   worker index (not used here)
     * @return a single-element observable wrapping the queue's observable
     */
    public Observable<Observable<String>> call(Context context, Index index) {
        return Observable.just(this.twitterObservable.observe());
    }

    /**
     * Declares the job parameters this source reads: four required credential
     * strings and the optional {@value #TERMS_PARAM} list, which defaults to
     * {@code "Netflix,Dark"}.
     *
     * @return a new mutable list of parameter definitions
     */
    public List<ParameterDefinition<?>> getParameters() {
        List<ParameterDefinition<?>> params = new ArrayList<>();
        params.add(requiredString(CONSUMER_KEY_PARAM, "twitter consumer key"));
        params.add(requiredString(CONSUMER_SECRET_PARAM, "twitter consumer secret"));
        params.add(requiredString(TOKEN_PARAM, "twitter token"));
        params.add(requiredString(TOKEN_SECRET_PARAM, "twitter token secret"));
        params.add(new StringParameter()
                .name(TERMS_PARAM)
                .description("terms to follow")
                .validator(Validators.notNullOrEmpty())
                .defaultValue("Netflix,Dark")
                .build());
        return params;
    }

    /**
     * Builds the streaming client from the job parameters and connects it.
     *
     * @param context job context the parameters are read from
     * @param index   worker index (not used here)
     * @throws IllegalArgumentException if {@value #TERMS_PARAM} contains no
     *         non-blank term after splitting on commas
     */
    public void init(Context context, Index index) {
        String consumerKey = stringParam(context, CONSUMER_KEY_PARAM);
        String consumerSecret = stringParam(context, CONSUMER_SECRET_PARAM);
        String token = stringParam(context, TOKEN_PARAM);
        String tokenSecret = stringParam(context, TOKEN_SECRET_PARAM);

        List<String> termsList = parseTerms(stringParam(context, TERMS_PARAM));
        if (termsList.isEmpty()) {
            // The notNullOrEmpty validator lets values such as "," through,
            // which would otherwise open a stream that tracks nothing.
            throw new IllegalArgumentException(
                    "Parameter '" + TERMS_PARAM + "' must contain at least one non-blank term");
        }

        OAuth1 auth = new OAuth1(consumerKey, consumerSecret, token, tokenSecret);
        StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
        endpoint.trackTerms(termsList);

        this.client = new ClientBuilder()
                .name("twitter-source")
                .hosts("https://stream.twitter.com")
                .endpoint(endpoint)
                .authentication(auth)
                .processor(new StringDelimitedProcessor(this.twitterObservable))
                .build();
        this.client.connect();
    }

    /**
     * Stops the streaming client if one was created. Calling this before
     * {@link #init} (or after an {@code init} that failed before the client
     * was built) is a no-op instead of a {@link NullPointerException}.
     *
     * @throws IOException declared for interface compatibility; not thrown by
     *         this implementation directly
     */
    public void close() throws IOException {
        BasicClient current = this.client;
        if (current != null) {
            current.stop();
        }
    }

    /** Builds a required, non-empty string parameter definition. */
    private static ParameterDefinition<String> requiredString(String name, String description) {
        return new StringParameter()
                .name(name)
                .description(description)
                .validator(Validators.notNullOrEmpty())
                .required()
                .build();
    }

    /** Reads the named job parameter as a string. */
    private static String stringParam(Context context, String name) {
        return (String)context.getParameters().get(name);
    }

    /**
     * Splits a comma-separated list into trimmed, non-empty terms, preserving
     * their order. A {@code null} input yields an empty list.
     */
    private static List<String> parseTerms(String terms) {
        List<String> parsed = new ArrayList<>();
        if (terms == null) {
            return parsed;
        }
        for (String raw : terms.split(",")) {
            String term = raw.trim();
            if (!term.isEmpty()) {
                parsed.add(term);
            }
        }
        return parsed;
    }
}

