/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.topologies.link.check;

import com.google.common.base.Throwables;
import eu.europeana.cloud.service.dps.storm.NotificationBolt;
import eu.europeana.cloud.service.dps.storm.io.ParseFileBolt;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.MCSReaderSpout;
import eu.europeana.cloud.service.dps.storm.topologies.link.check.LinkCheckBolt;
import eu.europeana.cloud.service.dps.storm.topologies.properties.PropertyFileLoader;
import eu.europeana.cloud.service.dps.storm.utils.TopologyHelper;
import java.util.Map;
import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.grouping.ShuffleGrouping;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LinkCheckTopology {
    private static Properties topologyProperties;
    private static final String TOPOLOGY_PROPERTIES_FILE = "link-check-topology-config.properties";
    private static final Logger LOGGER;

    public LinkCheckTopology(String defaultPropertyFile, String providedPropertyFile) {
        topologyProperties = new Properties();
        PropertyFileLoader.loadPropertyFile(defaultPropertyFile, providedPropertyFile, topologyProperties);
    }

    public final StormTopology buildTopology(String mediaTopic, String ecloudMcsAddress) {
        TopologyBuilder builder = new TopologyBuilder();
        MCSReaderSpout mcsReaderSpout = TopologyHelper.getMcsReaderSpout(topologyProperties, mediaTopic, ecloudMcsAddress);
        builder.setSpout("spout", (IRichSpout)mcsReaderSpout, (Number)LinkCheckTopology.getAnInt("KAFKA_SPOUT_PARALLEL")).setNumTasks((Number)LinkCheckTopology.getAnInt("KAFKA_SPOUT_NUMBER_OF_TASKS"));
        ((BoltDeclarer)builder.setBolt("ParseFileBolt", (IRichBolt)new ParseFileBolt(ecloudMcsAddress), (Number)LinkCheckTopology.getAnInt("PARSE_FILE_BOLT_PARALLEL")).setNumTasks((Number)LinkCheckTopology.getAnInt("PARSE_FILE_BOLT_BOLT_NUMBER_OF_TASKS"))).customGrouping("spout", (CustomStreamGrouping)new ShuffleGrouping());
        ((BoltDeclarer)builder.setBolt("LinkCheckBolt", (IRichBolt)new LinkCheckBolt(), (Number)LinkCheckTopology.getAnInt("LINK_CHECK_BOLT_PARALLEL")).setNumTasks((Number)LinkCheckTopology.getAnInt("LINK_CHECK_BOLT_NUMBER_OF_TASKS"))).fieldsGrouping("ParseFileBolt", new Fields(new String[]{"INPUT_FILES"}));
        ((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)builder.setBolt("notificationBolt", (IRichBolt)new NotificationBolt(topologyProperties.getProperty("CASSANDRA_HOSTS"), Integer.parseInt(topologyProperties.getProperty("CASSANDRA_PORT")), topologyProperties.getProperty("CASSANDRA_KEYSPACE_NAME"), topologyProperties.getProperty("CASSANDRA_USERNAME"), topologyProperties.getProperty("CASSANDRA_PASSWORD")), (Number)LinkCheckTopology.getAnInt("NOTIFICATION_BOLT_PARALLEL")).setNumTasks((Number)LinkCheckTopology.getAnInt("NOTIFICATION_BOLT_NUMBER_OF_TASKS"))).fieldsGrouping("spout", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("ParseFileBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("LinkCheckBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}));
        return builder.createTopology();
    }

    private static int getAnInt(String propertyName) {
        return Integer.parseInt(topologyProperties.getProperty(propertyName));
    }

    public static void main(String[] args) {
        try {
            if (args.length <= 1) {
                String topologyName;
                String providedPropertyFile = "";
                if (args.length == 1) {
                    providedPropertyFile = args[0];
                }
                LinkCheckTopology linkCheckTopology = new LinkCheckTopology(TOPOLOGY_PROPERTIES_FILE, providedPropertyFile);
                String kafkaTopic = topologyName = topologyProperties.getProperty("TOPOLOGY_NAME");
                String ecloudMcsAddress = topologyProperties.getProperty("MCS_URL");
                StormTopology stormTopology = linkCheckTopology.buildTopology(kafkaTopic, ecloudMcsAddress);
                Config config = TopologyHelper.configureTopology(topologyProperties);
                StormSubmitter.submitTopology((String)topologyName, (Map)config, (StormTopology)stormTopology);
            }
        }
        catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
        }
    }

    static {
        LOGGER = LoggerFactory.getLogger(LinkCheckTopology.class);
    }
}

