/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.topologies.xslt;

import eu.europeana.cloud.service.dps.storm.NotificationBolt;
import eu.europeana.cloud.service.dps.storm.io.AddResultToDataSetBolt;
import eu.europeana.cloud.service.dps.storm.io.ReadFileBolt;
import eu.europeana.cloud.service.dps.storm.io.RevisionWriterBolt;
import eu.europeana.cloud.service.dps.storm.io.WriteRecordBolt;
import eu.europeana.cloud.service.dps.storm.topologies.properties.PropertyFileLoader;
import eu.europeana.cloud.service.dps.storm.topologies.xslt.bolt.XsltBolt;
import eu.europeana.cloud.service.dps.storm.utils.TopologyHelper;
import eu.europeana.cloud.service.dps.storm.utils.TopologyPropertiesValidator;
import eu.europeana.cloud.service.dps.storm.utils.TopologySubmitter;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.grouping.ShuffleGrouping;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Assembles and submits the XSLT Storm topology.
 *
 * <p>The bolts are chained as: spouts -> retrieveFileBolt -> XSLT_BOLT -> writeRecordBolt
 * -> revisionWriterBolt -> writeToDataSetBolt. Every bolt in that chain also feeds the
 * notification bolt through the {@code NotificationStream} stream, grouped by {@code TASK_ID}.
 *
 * <p>The topology properties live in a static field, so every instance of this class reads
 * and writes the same {@link Properties} object; constructing a second instance loads into
 * the properties already populated by the first.
 */
public class XSLTTopology {
    private static final Properties topologyProperties = new Properties();
    private static final String TOPOLOGY_PROPERTIES_FILE = "xslt-topology-config.properties";
    private static final Logger LOGGER = LoggerFactory.getLogger(XSLTTopology.class);

    /** Identifier passed to the validator and to the spout helper. */
    private static final String XSLT_TOPOLOGY = "xslt_topology";

    // Component ids used when wiring the topology.
    private static final String RETRIEVE_FILE_BOLT = "retrieveFileBolt";
    private static final String XSLT_BOLT = "XSLT_BOLT";
    private static final String WRITE_RECORD_BOLT = "writeRecordBolt";
    private static final String REVISION_WRITER_BOLT = "revisionWriterBolt";
    private static final String WRITE_TO_DATA_SET_BOLT = "writeToDataSetBolt";
    private static final String NOTIFICATION_BOLT = "notificationBolt";

    /** Stream on which the processing bolts report to the notification bolt. */
    private static final String NOTIFICATION_STREAM = "NotificationStream";
    /** Tuple field used to group notification tuples. */
    private static final String TASK_ID_FIELD = "TASK_ID";

    /**
     * Loads the topology properties and validates them for the XSLT topology.
     *
     * @param defaultPropertyFile  name of the default property file handed to the loader
     * @param providedPropertyFile name of the user-provided property file handed to the loader
     *                             ({@code main} passes an empty string when none was given)
     */
    public XSLTTopology(String defaultPropertyFile, String providedPropertyFile) {
        PropertyFileLoader.loadPropertyFile(defaultPropertyFile, providedPropertyFile, topologyProperties);
        TopologyPropertiesValidator.validateFor(XSLT_TOPOLOGY, topologyProperties);
    }

    /**
     * Builds the Storm topology from the loaded properties.
     *
     * @return the assembled topology
     * @throws NumberFormatException if a parallelism, task-count or port property is missing
     *                               or is not a valid integer
     */
    public StormTopology buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        List<String> spoutNames = TopologyHelper.addSpouts(builder, XSLT_TOPOLOGY, topologyProperties);

        String mcsServer = topologyProperties.getProperty("MCS_URL");
        ReadFileBolt readFileBolt = new ReadFileBolt(mcsServer);
        WriteRecordBolt writeRecordBolt = new WriteRecordBolt(mcsServer);
        RevisionWriterBolt revisionWriterBolt = new RevisionWriterBolt(mcsServer);

        // The first bolt is fed by every spout.
        TopologyHelper.addSpoutShuffleGrouping(
                spoutNames,
                builder.setBolt(RETRIEVE_FILE_BOLT, readFileBolt, getAnInt("RETRIEVE_FILE_BOLT_PARALLEL"))
                        .setNumTasks(getAnInt("RETRIEVE_FILE_BOLT_NUMBER_OF_TASKS")));

        // Each subsequent bolt consumes the default stream of its predecessor.
        builder.setBolt(XSLT_BOLT, new XsltBolt(), getAnInt("XSLT_BOLT_PARALLEL"))
                .setNumTasks(getAnInt("XSLT_BOLT_NUMBER_OF_TASKS"))
                .customGrouping(RETRIEVE_FILE_BOLT, new ShuffleGrouping());

        builder.setBolt(WRITE_RECORD_BOLT, writeRecordBolt, getAnInt("WRITE_BOLT_PARALLEL"))
                .setNumTasks(getAnInt("WRITE_BOLT_NUMBER_OF_TASKS"))
                .customGrouping(XSLT_BOLT, new ShuffleGrouping());

        builder.setBolt(REVISION_WRITER_BOLT, revisionWriterBolt, getAnInt("REVISION_WRITER_BOLT_PARALLEL"))
                .setNumTasks(getAnInt("REVISION_WRITER_BOLT_NUMBER_OF_TASKS"))
                .customGrouping(WRITE_RECORD_BOLT, new ShuffleGrouping());

        builder.setBolt(WRITE_TO_DATA_SET_BOLT, new AddResultToDataSetBolt(mcsServer), getAnInt("ADD_TO_DATASET_BOLT_PARALLEL"))
                .setNumTasks(getAnInt("ADD_TO_DATASET_BOLT_NUMBER_OF_TASKS"))
                .customGrouping(REVISION_WRITER_BOLT, new ShuffleGrouping());

        NotificationBolt notificationBolt = new NotificationBolt(
                topologyProperties.getProperty("CASSANDRA_HOSTS"),
                getAnInt("CASSANDRA_PORT"),
                topologyProperties.getProperty("CASSANDRA_KEYSPACE_NAME"),
                topologyProperties.getProperty("CASSANDRA_USERNAME"),
                topologyProperties.getProperty("CASSANDRA_PASSWORD"));

        // The notification bolt listens to the notification stream of every processing bolt,
        // grouped by task id; the helper then adds the spouts' groupings to the same declarer.
        BoltDeclarer notificationDeclarer =
                builder.setBolt(NOTIFICATION_BOLT, notificationBolt, getAnInt("NOTIFICATION_BOLT_PARALLEL"))
                        .setNumTasks(getAnInt("NOTIFICATION_BOLT_NUMBER_OF_TASKS"))
                        .fieldsGrouping(RETRIEVE_FILE_BOLT, NOTIFICATION_STREAM, new Fields(TASK_ID_FIELD))
                        .fieldsGrouping(XSLT_BOLT, NOTIFICATION_STREAM, new Fields(TASK_ID_FIELD))
                        .fieldsGrouping(WRITE_RECORD_BOLT, NOTIFICATION_STREAM, new Fields(TASK_ID_FIELD))
                        .fieldsGrouping(REVISION_WRITER_BOLT, NOTIFICATION_STREAM, new Fields(TASK_ID_FIELD))
                        .fieldsGrouping(WRITE_TO_DATA_SET_BOLT, NOTIFICATION_STREAM, new Fields(TASK_ID_FIELD));
        TopologyHelper.addSpoutsGroupingToNotificationBolt(spoutNames, notificationDeclarer);

        return builder.createTopology();
    }

    /**
     * Reads an integer-valued topology property.
     *
     * <p>Surrounding whitespace is ignored, because {@link Properties#load} keeps trailing
     * whitespace as part of the value and such a value would otherwise fail to parse.
     *
     * @param parseTasksBoltParallel name of the property to read
     * @return the property value parsed as an {@code int}
     * @throws NumberFormatException if the property is absent or is not a valid integer;
     *                               the message names the offending property
     */
    private static int getAnInt(String parseTasksBoltParallel) {
        String rawValue = topologyProperties.getProperty(parseTasksBoltParallel);
        if (rawValue == null) {
            throw new NumberFormatException(
                    "Missing required integer property '" + parseTasksBoltParallel + "'");
        }
        try {
            return Integer.parseInt(rawValue.trim());
        } catch (NumberFormatException e) {
            // Same exception type as before, but naming the key so the bad entry can be found.
            NumberFormatException detailed = new NumberFormatException(
                    "Property '" + parseTasksBoltParallel + "' is not a valid integer: '" + rawValue + "'");
            detailed.initCause(e);
            throw detailed;
        }
    }

    /**
     * Entry point: builds the topology and submits it under the name given by the
     * {@code TOPOLOGY_NAME} property.
     *
     * <p>Any exception raised during setup or submission is logged and not rethrown.
     *
     * @param args optionally a single element naming a property file; with more than one
     *             argument an error is logged and nothing is submitted
     */
    public static void main(String[] args) {
        try {
            LOGGER.info("Assembling '{}'", XSLT_TOPOLOGY);
            if (args.length > 1) {
                LOGGER.error("Invalid number of parameters");
                return;
            }

            String providedPropertyFile = args.length == 1 ? args[0] : "";
            XSLTTopology xsltTopology = new XSLTTopology(TOPOLOGY_PROPERTIES_FILE, providedPropertyFile);
            StormTopology stormTopology = xsltTopology.buildTopology();
            Config config = TopologyHelper.buildConfig(topologyProperties);

            LOGGER.info("Submitting '{}'...", topologyProperties.getProperty("TOPOLOGY_NAME"));
            TopologySubmitter.submitTopology(topologyProperties.getProperty("TOPOLOGY_NAME"), config, stormTopology);
        } catch (Exception e) {
            LOGGER.error("General error while setting up topology", e);
        }
    }
}

