/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.normalization;

import com.google.common.base.Throwables;
import eu.europeana.cloud.normalization.bolts.NormalizationBolt;
import eu.europeana.cloud.service.dps.storm.NotificationBolt;
import eu.europeana.cloud.service.dps.storm.io.AddResultToDataSetBolt;
import eu.europeana.cloud.service.dps.storm.io.ReadFileBolt;
import eu.europeana.cloud.service.dps.storm.io.RevisionWriterBolt;
import eu.europeana.cloud.service.dps.storm.io.WriteRecordBolt;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.MCSReaderSpout;
import eu.europeana.cloud.service.dps.storm.topologies.properties.PropertyFileLoader;
import eu.europeana.cloud.service.dps.storm.utils.TopologyHelper;
import java.util.Map;
import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.grouping.ShuffleGrouping;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NormalizationTopology {
    private static final Logger LOGGER = LoggerFactory.getLogger(NormalizationTopology.class);
    private static final String TOPOLOGY_PROPERTIES_FILE = "normalization-topology-config.properties";
    private static Properties topologyProperties;

    public NormalizationTopology(String defaultPropertyFile, String providedPropertyFile) {
        topologyProperties = new Properties();
        PropertyFileLoader.loadPropertyFile((String)defaultPropertyFile, (String)providedPropertyFile, (Properties)topologyProperties);
    }

    public StormTopology buildTopology(String normalizationTopic, String ecloudMcsAddress) {
        MCSReaderSpout mcsReaderSpout = TopologyHelper.getMcsReaderSpout((Properties)topologyProperties, (String)normalizationTopic, (String)ecloudMcsAddress);
        TopologyBuilder builder = new TopologyBuilder();
        ReadFileBolt retrieveFileBolt = new ReadFileBolt(ecloudMcsAddress);
        WriteRecordBolt writeRecordBolt = new WriteRecordBolt(ecloudMcsAddress);
        RevisionWriterBolt revisionWriterBolt = new RevisionWriterBolt(ecloudMcsAddress);
        NormalizationBolt normalizationBolt = new NormalizationBolt();
        builder.setSpout("spout", (IRichSpout)mcsReaderSpout, (Number)NormalizationTopology.getAnInt("KAFKA_SPOUT_PARALLEL")).setNumTasks((Number)NormalizationTopology.getAnInt("KAFKA_SPOUT_NUMBER_OF_TASKS"));
        ((BoltDeclarer)builder.setBolt("retrieveFileBolt", (IRichBolt)retrieveFileBolt, (Number)NormalizationTopology.getAnInt("RETRIEVE_FILE_BOLT_PARALLEL")).setNumTasks((Number)NormalizationTopology.getAnInt("RETRIEVE_FILE_BOLT_NUMBER_OF_TASKS"))).customGrouping("spout", (CustomStreamGrouping)new ShuffleGrouping());
        ((BoltDeclarer)builder.setBolt("normalizationBolt", (IRichBolt)normalizationBolt, (Number)NormalizationTopology.getAnInt("NORMALIZATION_BOLT_PARALLEL")).setNumTasks((Number)NormalizationTopology.getAnInt("NORMALIZATION_BOLT_NUMBER_OF_TASKS"))).customGrouping("retrieveFileBolt", (CustomStreamGrouping)new ShuffleGrouping());
        ((BoltDeclarer)builder.setBolt("writeRecordBolt", (IRichBolt)writeRecordBolt, (Number)NormalizationTopology.getAnInt("WRITE_BOLT_PARALLEL")).setNumTasks((Number)NormalizationTopology.getAnInt("WRITE_BOLT_NUMBER_OF_TASKS"))).customGrouping("normalizationBolt", (CustomStreamGrouping)new ShuffleGrouping());
        ((BoltDeclarer)builder.setBolt("revisionWriterBolt", (IRichBolt)revisionWriterBolt, (Number)NormalizationTopology.getAnInt("REVISION_WRITER_BOLT_PARALLEL")).setNumTasks((Number)NormalizationTopology.getAnInt("REVISION_WRITER_BOLT_NUMBER_OF_TASKS"))).customGrouping("writeRecordBolt", (CustomStreamGrouping)new ShuffleGrouping());
        AddResultToDataSetBolt addResultToDataSetBolt = new AddResultToDataSetBolt(ecloudMcsAddress);
        ((BoltDeclarer)builder.setBolt("writeToDataSetBolt", (IRichBolt)addResultToDataSetBolt, (Number)NormalizationTopology.getAnInt("ADD_TO_DATASET_BOLT_PARALLEL")).setNumTasks((Number)NormalizationTopology.getAnInt("ADD_TO_DATASET_BOLT_NUMBER_OF_TASKS"))).shuffleGrouping("revisionWriterBolt");
        ((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)builder.setBolt("notificationBolt", (IRichBolt)new NotificationBolt(topologyProperties.getProperty("CASSANDRA_HOSTS"), NormalizationTopology.getAnInt("CASSANDRA_PORT"), topologyProperties.getProperty("CASSANDRA_KEYSPACE_NAME"), topologyProperties.getProperty("CASSANDRA_USERNAME"), topologyProperties.getProperty("CASSANDRA_PASSWORD")), (Number)NormalizationTopology.getAnInt("NOTIFICATION_BOLT_PARALLEL")).setNumTasks((Number)NormalizationTopology.getAnInt("NOTIFICATION_BOLT_NUMBER_OF_TASKS"))).fieldsGrouping("spout", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("retrieveFileBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("normalizationBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("writeRecordBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("revisionWriterBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("writeToDataSetBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}));
        return builder.createTopology();
    }

    public static void main(String ... args) {
        try {
            if (args.length <= 1) {
                String topologyName;
                String providedPropertyFile = "";
                if (args.length == 1) {
                    providedPropertyFile = args[0];
                }
                NormalizationTopology normalizationTopology = new NormalizationTopology(TOPOLOGY_PROPERTIES_FILE, providedPropertyFile);
                String kafkaTopic = topologyName = topologyProperties.getProperty("TOPOLOGY_NAME");
                String ecloudMcsAddress = topologyProperties.getProperty("MCS_URL");
                StormTopology stormTopology = normalizationTopology.buildTopology(kafkaTopic, ecloudMcsAddress);
                Config config = TopologyHelper.configureTopology((Properties)topologyProperties);
                StormSubmitter.submitTopology((String)topologyName, (Map)config, (StormTopology)stormTopology);
            }
        }
        catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString((Throwable)e));
        }
    }

    private static int getAnInt(String propertyName) {
        return Integer.parseInt(topologyProperties.getProperty(propertyName));
    }
}

