/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.topologies.xslt;

import eu.europeana.cloud.service.dps.storm.NotificationBolt;
import eu.europeana.cloud.service.dps.storm.ParseTaskBolt;
import eu.europeana.cloud.service.dps.storm.io.AddResultToDataSetBolt;
import eu.europeana.cloud.service.dps.storm.io.ReadDatasetBolt;
import eu.europeana.cloud.service.dps.storm.io.ReadDatasetsBolt;
import eu.europeana.cloud.service.dps.storm.io.ReadFileBolt;
import eu.europeana.cloud.service.dps.storm.io.ReadRepresentationBolt;
import eu.europeana.cloud.service.dps.storm.io.RevisionWriterBolt;
import eu.europeana.cloud.service.dps.storm.io.WriteRecordBolt;
import eu.europeana.cloud.service.dps.storm.topologies.properties.PropertyFileLoader;
import eu.europeana.cloud.service.dps.storm.xslt.XsltBolt;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import kafka.api.OffsetRequest;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.Scheme;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class XSLTTopology {
    private static Properties topologyProperties;
    private final BrokerHosts brokerHosts;
    private static final String TOPOLOGY_PROPERTIES_FILE = "xslt-topology-config.properties";
    private final String DATASET_STREAM = "DATASET_URLS";
    private final String FILE_STREAM = "FILE_URLS";

    public XSLTTopology(String defaultPropertyFile, String providedPropertyFile) {
        topologyProperties = new Properties();
        PropertyFileLoader.loadPropertyFile((String)defaultPropertyFile, (String)providedPropertyFile, (Properties)topologyProperties);
        this.brokerHosts = new ZkHosts(topologyProperties.getProperty("INPUT_ZOOKEEPER_ADDRESS"));
    }

    public StormTopology buildTopology(String xsltTopic, String ecloudMcsAddress) {
        SpoutConfig kafkaConfig = new SpoutConfig(this.brokerHosts, xsltTopic, "", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme((Scheme)new StringScheme());
        kafkaConfig.startOffsetTime = OffsetRequest.LatestTime();
        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
        TopologyBuilder builder = new TopologyBuilder();
        HashMap<String, String> routingRules = new HashMap<String, String>();
        routingRules.put("FILE_URLS", "DATASET_URLS");
        routingRules.put("DATASET_URLS", "FILE_URLS");
        ReadFileBolt retrieveFileBolt = new ReadFileBolt(ecloudMcsAddress);
        WriteRecordBolt writeRecordBolt = new WriteRecordBolt(ecloudMcsAddress);
        RevisionWriterBolt revisionWriterBolt = new RevisionWriterBolt(ecloudMcsAddress);
        builder.setSpout("spout", (IRichSpout)kafkaSpout, (Number)Integer.parseInt(topologyProperties.getProperty("KAFKA_SPOUT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("KAFKA_SPOUT_NUMBER_OF_TASKS")));
        ((BoltDeclarer)builder.setBolt("parseTaskBolt", (IRichBolt)new ParseTaskBolt(routingRules), (Number)Integer.parseInt(topologyProperties.getProperty("PARSE_TASKS_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("PARSE_TASKS_BOLT_NUMBER_OF_TASKS")))).shuffleGrouping("spout");
        ((BoltDeclarer)builder.setBolt("readDatasetsBolt", (IRichBolt)new ReadDatasetsBolt(), (Number)Integer.parseInt(topologyProperties.getProperty("READ_DATASETS_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("READ_DATASETS_BOLT_NUMBER_OF_TASKS")))).shuffleGrouping("parseTaskBolt", "DATASET_URLS");
        ((BoltDeclarer)builder.setBolt("readDatasetBolt", (IRichBolt)new ReadDatasetBolt(ecloudMcsAddress), (Number)Integer.parseInt(topologyProperties.getProperty("READ_DATASET_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("READ_DATASET_BOLT_NUMBER_OF_TASKS")))).shuffleGrouping("readDatasetsBolt");
        ((BoltDeclarer)builder.setBolt("readRepresentationBolt", (IRichBolt)new ReadRepresentationBolt(ecloudMcsAddress), (Number)Integer.parseInt(topologyProperties.getProperty("READ_REPRESENTATION_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("READ_REPRESENTATION_BOLT_NUMBER_OF_TASKS")))).shuffleGrouping("readDatasetBolt");
        ((BoltDeclarer)((BoltDeclarer)builder.setBolt("retrieveFileBolt", (IRichBolt)retrieveFileBolt, (Number)Integer.parseInt(topologyProperties.getProperty("RETRIEVE_FILE_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("RETRIEVE_FILE_BOLT_NUMBER_OF_TASKS")))).shuffleGrouping("parseTaskBolt", "FILE_URLS")).shuffleGrouping("readRepresentationBolt");
        ((BoltDeclarer)builder.setBolt("XSLT_BOLT", (IRichBolt)new XsltBolt(), (Number)Integer.parseInt(topologyProperties.getProperty("XSLT_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("XSLT_BOLT_NUMBER_OF_TASKS")))).shuffleGrouping("retrieveFileBolt");
        ((BoltDeclarer)builder.setBolt("writeRecordBolt", (IRichBolt)writeRecordBolt, (Number)Integer.parseInt(topologyProperties.getProperty("WRITE_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("WRITE_BOLT_NUMBER_OF_TASKS")))).shuffleGrouping("XSLT_BOLT");
        ((BoltDeclarer)builder.setBolt("revisionWriterBolt", (IRichBolt)revisionWriterBolt, (Number)Integer.parseInt(topologyProperties.getProperty("REVISION_WRITER_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("Revision_WRITER_BOLT_NUMBER_OF_TASKS")))).shuffleGrouping("writeRecordBolt");
        AddResultToDataSetBolt addResultToDataSetBolt = new AddResultToDataSetBolt(ecloudMcsAddress);
        ((BoltDeclarer)builder.setBolt("writeToDataSetBolt", (IRichBolt)addResultToDataSetBolt, (Number)Integer.parseInt(topologyProperties.getProperty("ADD_TO_DATASET_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("ADD_TO_DATASET_BOLT_NUMBER_OF_TASKS")))).shuffleGrouping("revisionWriterBolt");
        ((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)builder.setBolt("notificationBolt", (IRichBolt)new NotificationBolt(topologyProperties.getProperty("CASSANDRA_HOSTS"), Integer.parseInt(topologyProperties.getProperty("CASSANDRA_PORT")), topologyProperties.getProperty("CASSANDRA_KEYSPACE_NAME"), topologyProperties.getProperty("CASSANDRA_USERNAME"), topologyProperties.getProperty("CASSANDRA_PASSWORD")), (Number)Integer.parseInt(topologyProperties.getProperty("NOTIFICATION_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("NOTIFICATION_BOLT_NUMBER_OF_TASKS")))).fieldsGrouping("parseTaskBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("retrieveFileBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("readDatasetsBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("readDatasetBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("readRepresentationBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("XSLT_BOLT", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("writeRecordBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("revisionWriterBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("writeToDataSetBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}));
        return builder.createTopology();
    }

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.put((Object)"topology.trident.batch.emit.interval.millis", (Object)2000);
        if (args.length <= 1) {
            String topologyName;
            String providedPropertyFile = "";
            if (args.length == 1) {
                providedPropertyFile = args[0];
            }
            XSLTTopology XsltTopology = new XSLTTopology(TOPOLOGY_PROPERTIES_FILE, providedPropertyFile);
            String kafkaTopic = topologyName = topologyProperties.getProperty("TOPOLOGY_NAME");
            String ecloudMcsAddress = topologyProperties.getProperty("MCS_URL");
            StormTopology stormTopology = XsltTopology.buildTopology(kafkaTopic, ecloudMcsAddress);
            config.setNumWorkers(Integer.parseInt(topologyProperties.getProperty("WORKER_COUNT")));
            config.setMaxTaskParallelism(Integer.parseInt(topologyProperties.getProperty("MAX_TASK_PARALLELISM")));
            config.put((Object)"nimbus.thrift.port", (Object)Integer.parseInt(topologyProperties.getProperty("THRIFT_PORT")));
            config.put((Object)topologyProperties.getProperty("INPUT_ZOOKEEPER_ADDRESS"), (Object)topologyProperties.getProperty("INPUT_ZOOKEEPER_PORT"));
            config.put((Object)"nimbus.seeds", Arrays.asList(topologyProperties.getProperty("NIMBUS_SEEDS")));
            config.put((Object)"storm.zookeeper.servers", Arrays.asList(topologyProperties.getProperty("STORM_ZOOKEEPER_ADDRESS")));
            StormSubmitter.submitTopology((String)topologyName, (Map)config, (StormTopology)stormTopology);
        }
    }
}

