/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.topologies.xslt;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.Scheme;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import eu.europeana.cloud.service.dps.storm.EndBolt;
import eu.europeana.cloud.service.dps.storm.NotificationBolt;
import eu.europeana.cloud.service.dps.storm.ParseTaskBolt;
import eu.europeana.cloud.service.dps.storm.io.GrantPermissionsToFileBolt;
import eu.europeana.cloud.service.dps.storm.io.ReadFileBolt;
import eu.europeana.cloud.service.dps.storm.io.RemovePermissionsToFileBolt;
import eu.europeana.cloud.service.dps.storm.io.WriteRecordBolt;
import eu.europeana.cloud.service.dps.storm.topologies.eCloudAbstractTopology;
import eu.europeana.cloud.service.dps.storm.xslt.XsltBolt;
import java.util.Arrays;
import java.util.Map;
import kafka.api.OffsetRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class XSLTTopology
extends eCloudAbstractTopology {
    private final BrokerHosts brokerHosts = new ZkHosts(topologyProperties.getProperty("INPUT_ZOOKEEPER_ADDRESS"));
    private static final String TOPOLOGY_PROPERTIES_FILE = "xslt-topology-config.properties";
    public static final Logger LOGGER = LoggerFactory.getLogger(XSLTTopology.class);

    public XSLTTopology(String defaultPropertyFile, String providedPropertyFile) {
        super(defaultPropertyFile, providedPropertyFile);
    }

    public StormTopology buildTopology(String dpsZkAddress, String xsltTopic, String ecloudMcsAddress, String username, String password) {
        ReadFileBolt retrieveFileBolt = new ReadFileBolt(ecloudMcsAddress, username, password);
        WriteRecordBolt writeRecordBolt = new WriteRecordBolt(ecloudMcsAddress, username, password);
        GrantPermissionsToFileBolt grantPermBolt = new GrantPermissionsToFileBolt(ecloudMcsAddress, username, password);
        RemovePermissionsToFileBolt removePermBolt = new RemovePermissionsToFileBolt(ecloudMcsAddress, username, password);
        SpoutConfig kafkaConfig = new SpoutConfig(this.brokerHosts, xsltTopic, "", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme((Scheme)new StringScheme());
        kafkaConfig.forceFromStart = true;
        kafkaConfig.startOffsetTime = OffsetRequest.LatestTime();
        TopologyBuilder builder = new TopologyBuilder();
        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
        builder.setSpout("kafkaReader", (IRichSpout)kafkaSpout, (Number)Integer.parseInt(topologyProperties.getProperty("KAFKA_SPOUT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("NUMBER_OF_TASKS")));
        ((BoltDeclarer)builder.setBolt("parseKafkaInput", (IRichBolt)new ParseTaskBolt(), (Number)Integer.parseInt(topologyProperties.getProperty("PARSE_TASKS_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("NUMBER_OF_TASKS")))).shuffleGrouping("kafkaReader");
        ((BoltDeclarer)builder.setBolt("retrieveFileBolt", (IRichBolt)retrieveFileBolt, (Number)Integer.parseInt(topologyProperties.getProperty("RETRIEVE_FILE_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("NUMBER_OF_TASKS")))).shuffleGrouping("parseKafkaInput");
        ((BoltDeclarer)builder.setBolt("xsltTransformationBolt", (IRichBolt)new XsltBolt(), (Number)Integer.parseInt(topologyProperties.getProperty("XSLT_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("NUMBER_OF_TASKS")))).shuffleGrouping("retrieveFileBolt");
        ((BoltDeclarer)builder.setBolt("writeRecordBolt", (IRichBolt)writeRecordBolt, (Number)Integer.parseInt(topologyProperties.getProperty("WRITE_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("NUMBER_OF_TASKS")))).shuffleGrouping("xsltTransformationBolt");
        ((BoltDeclarer)builder.setBolt("grantPermBolt", (IRichBolt)grantPermBolt, (Number)Integer.parseInt(topologyProperties.getProperty("GRANT_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("NUMBER_OF_TASKS")))).shuffleGrouping("writeRecordBolt");
        ((BoltDeclarer)builder.setBolt("removePermBolt", (IRichBolt)removePermBolt, (Number)Integer.parseInt(topologyProperties.getProperty("REMOVE_BOLT_PARALLEL"))).setNumTasks((Number)Integer.parseInt(topologyProperties.getProperty("NUMBER_OF_TASKS")))).shuffleGrouping("grantPermBolt");
        builder.setBolt("endBolt", (IRichBolt)new EndBolt(), (Number)Integer.parseInt(topologyProperties.getProperty("END_BOLT_PARALLEL"))).shuffleGrouping("removePermBolt");
        ((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)builder.setBolt("notificationBolt", (IRichBolt)new NotificationBolt(topologyProperties.getProperty("CASSANDRA_HOSTS"), Integer.parseInt(topologyProperties.getProperty("CASSANDRA_PORT")), topologyProperties.getProperty("CASSANDRA_KEYSPACE_NAME"), topologyProperties.getProperty("CASSANDRA_USERNAME"), topologyProperties.getProperty("CASSANDRA_PASSWORD"), Boolean.valueOf(true)), (Number)Integer.parseInt(topologyProperties.getProperty("NOTIFICATION_BOLT_PARALLEL"))).fieldsGrouping("parseKafkaInput", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("retrieveFileBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("xsltTransformationBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("writeRecordBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("grantPermBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("removePermBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("endBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}));
        return builder.createTopology();
    }

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.put((Object)"topology.trident.batch.emit.interval.millis", (Object)2000);
        if (args.length <= 1) {
            String topologyName;
            String providedPropertyFile = "";
            if (args.length == 1) {
                providedPropertyFile = args[0];
            }
            XSLTTopology XsltTopology = new XSLTTopology(TOPOLOGY_PROPERTIES_FILE, providedPropertyFile);
            String kafkaTopic = topologyName = topologyProperties.getProperty("TOPOLOGY_NAME");
            String ecloudMcsAddress = topologyProperties.getProperty("MCS_URL");
            String username = topologyProperties.getProperty("MCS_USER_NAME");
            String password = topologyProperties.getProperty("MCS_USER_PASS");
            StormTopology stormTopology = XsltTopology.buildTopology(topologyProperties.getProperty("INPUT_ZOOKEEPER_ADDRESS"), kafkaTopic, ecloudMcsAddress, username, password);
            config.setNumWorkers(Integer.parseInt(topologyProperties.getProperty("WORKER_COUNT")));
            config.setMaxTaskParallelism(Integer.parseInt(topologyProperties.getProperty("MAX_TASK_PARALLELISM")));
            config.put((Object)"nimbus.thrift.port", (Object)Integer.parseInt(topologyProperties.getProperty("THRIFT_PORT")));
            config.put((Object)topologyProperties.getProperty("INPUT_ZOOKEEPER_ADDRESS"), (Object)topologyProperties.getProperty("INPUT_ZOOKEEPER_PORT"));
            config.put((Object)"nimbus.host", (Object)"localhost");
            config.put((Object)"storm.zookeeper.servers", Arrays.asList(topologyProperties.getProperty("INPUT_ZOOKEEPER_ADDRESS")));
            StormSubmitter.submitTopology((String)topologyName, (Map)config, (StormTopology)stormTopology);
        }
    }
}

