package org.apache.streams.example;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.converter.ActivityConverterProcessor;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter;
import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
import org.apache.streams.filters.VerbDefinitionDropFilter;
import org.apache.streams.filters.VerbDefinitionKeepFilter;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.local.LocalRuntimeConfiguration;
import org.apache.streams.local.builders.LocalStreamBuilder;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.TwitterStreamConfiguration;
import org.apache.streams.twitter.provider.TwitterStreamProvider;
import org.apache.streams.verbs.ObjectCombination;
import org.apache.streams.verbs.VerbDefinition;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsInstanceOf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/example/TwitterUserstreamElasticsearch.class */
public class TwitterUserstreamElasticsearch implements Runnable {
    public static final String STREAMS_ID = "TwitterUserstreamElasticsearch";
    private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserstreamElasticsearch.class);
    private static VerbDefinition deleteVerbDefinition = new VerbDefinition().withValue("delete").withObjects((List) Stream.of(new ObjectCombination()).collect(Collectors.toList()));
    private TwitterUserstreamElasticsearchConfiguration config;

    /* loaded from: input_file:org/apache/streams/example/TwitterUserstreamElasticsearch$SetDeleteIdProcessor.class */
    protected class SetDeleteIdProcessor implements StreamsProcessor {
        protected SetDeleteIdProcessor() {
        }

        public String getId() {
            return "TwitterUserstreamElasticsearch.SetDeleteIdProcessor";
        }

        public List<StreamsDatum> process(StreamsDatum streamsDatum) {
            MatcherAssert.assertThat(streamsDatum.getDocument(), IsInstanceOf.instanceOf(Activity.class));
            streamsDatum.setId(StringUtils.replace(streamsDatum.getId(), "delete", "post"));
            return (List) Stream.of(streamsDatum).collect(Collectors.toList());
        }

        public void prepare(Object obj) {
        }

        public void cleanUp() {
        }
    }

    public TwitterUserstreamElasticsearch() {
        this((TwitterUserstreamElasticsearchConfiguration) new ComponentConfigurator(TwitterUserstreamElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
    }

    public TwitterUserstreamElasticsearch(TwitterUserstreamElasticsearchConfiguration twitterUserstreamElasticsearchConfiguration) {
        this.config = twitterUserstreamElasticsearchConfiguration;
    }

    public static void main(String[] strArr) {
        LOGGER.info(StreamsConfigurator.config.toString());
        new Thread(new TwitterUserstreamElasticsearch()).start();
    }

    @Override // java.lang.Runnable
    public void run() {
        TwitterStreamConfiguration twitter = this.config.getTwitter();
        ElasticsearchWriterConfiguration elasticsearch = this.config.getElasticsearch();
        TwitterStreamProvider twitterStreamProvider = new TwitterStreamProvider(twitter);
        ActivityConverterProcessor activityConverterProcessor = new ActivityConverterProcessor();
        VerbDefinitionDropFilter verbDefinitionDropFilter = new VerbDefinitionDropFilter((Set) Stream.of(deleteVerbDefinition).collect(Collectors.toSet()));
        ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(elasticsearch);
        VerbDefinitionKeepFilter verbDefinitionKeepFilter = new VerbDefinitionKeepFilter((Set) Stream.of(deleteVerbDefinition).collect(Collectors.toSet()));
        SetDeleteIdProcessor setDeleteIdProcessor = new SetDeleteIdProcessor();
        ElasticsearchPersistDeleter elasticsearchPersistDeleter = new ElasticsearchPersistDeleter(elasticsearch);
        LocalStreamBuilder localStreamBuilder = new LocalStreamBuilder((LocalRuntimeConfiguration) StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class));
        localStreamBuilder.newPerpetualStream(TwitterStreamProvider.class.getCanonicalName(), twitterStreamProvider);
        localStreamBuilder.addStreamsProcessor(ActivityConverterProcessor.class.getCanonicalName(), activityConverterProcessor, 2, new String[]{TwitterStreamProvider.class.getCanonicalName()});
        localStreamBuilder.addStreamsProcessor(VerbDefinitionDropFilter.class.getCanonicalName(), verbDefinitionDropFilter, 1, new String[]{ActivityConverterProcessor.class.getCanonicalName()});
        localStreamBuilder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, new String[]{VerbDefinitionDropFilter.class.getCanonicalName()});
        localStreamBuilder.addStreamsProcessor(VerbDefinitionKeepFilter.class.getCanonicalName(), verbDefinitionKeepFilter, 1, new String[]{ActivityConverterProcessor.class.getCanonicalName()});
        localStreamBuilder.addStreamsProcessor(SetDeleteIdProcessor.class.getCanonicalName(), setDeleteIdProcessor, 1, new String[]{VerbDefinitionKeepFilter.class.getCanonicalName()});
        localStreamBuilder.addStreamsPersistWriter(ElasticsearchPersistDeleter.class.getCanonicalName(), elasticsearchPersistDeleter, 1, new String[]{SetDeleteIdProcessor.class.getCanonicalName()});
        localStreamBuilder.start();
    }
}
