/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.http;

import eu.europeana.cloud.harvesting.DuplicatedRecordsProcessorBolt;
import eu.europeana.cloud.http.bolts.HttpHarvestedRecordCategorizationBolt;
import eu.europeana.cloud.http.bolts.HttpHarvestingBolt;
import eu.europeana.cloud.service.dps.storm.NotificationBolt;
import eu.europeana.cloud.service.dps.storm.io.HarvestingWriteRecordBolt;
import eu.europeana.cloud.service.dps.storm.io.RevisionWriterBoltForHarvesting;
import eu.europeana.cloud.service.dps.storm.topologies.properties.PropertyFileLoader;
import eu.europeana.cloud.service.dps.storm.utils.DbConnectionDetails;
import eu.europeana.cloud.service.dps.storm.utils.TopologyHelper;
import eu.europeana.cloud.service.dps.storm.utils.TopologySubmitter;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.grouping.ShuffleGrouping;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HTTPHarvestingTopology {
    private static final Properties topologyProperties = new Properties();
    private static final String TOPOLOGY_PROPERTIES_FILE = "http-topology-config.properties";
    private static final Logger LOGGER = LoggerFactory.getLogger(HTTPHarvestingTopology.class);

    public HTTPHarvestingTopology(String defaultPropertyFile, String providedPropertyFile) {
        PropertyFileLoader.loadPropertyFile((String)defaultPropertyFile, (String)providedPropertyFile, (Properties)topologyProperties);
    }

    public final StormTopology buildTopology() {
        String ecloudMcsAddress = topologyProperties.getProperty("MCS_URL");
        String uisAddress = topologyProperties.getProperty("UIS_URL");
        TopologyBuilder builder = new TopologyBuilder();
        List spoutNames = TopologyHelper.addSpouts((TopologyBuilder)builder, (String)"http_topology", (Properties)topologyProperties);
        HarvestingWriteRecordBolt writeRecordBolt = new HarvestingWriteRecordBolt(ecloudMcsAddress, uisAddress, topologyProperties.getProperty("TOPOLOGY_USER_NAME"), topologyProperties.getProperty("TOPOLOGY_USER_PASSWORD"));
        RevisionWriterBoltForHarvesting revisionWriterBolt = new RevisionWriterBoltForHarvesting(ecloudMcsAddress, topologyProperties.getProperty("TOPOLOGY_USER_NAME"), topologyProperties.getProperty("TOPOLOGY_USER_PASSWORD"));
        TopologyHelper.addSpoutShuffleGrouping((List)spoutNames, (BoltDeclarer)((BoltDeclarer)builder.setBolt("recordHarvestingBolt", (IRichBolt)new HttpHarvestingBolt(), (Number)HTTPHarvestingTopology.getAnInt("RECORD_HARVESTING_BOLT_PARALLEL")).setNumTasks((Number)HTTPHarvestingTopology.getAnInt("RECORD_HARVESTING_BOLT_NUMBER_OF_TASKS"))));
        ((BoltDeclarer)builder.setBolt("recordCategorizationBolt", (IRichBolt)new HttpHarvestedRecordCategorizationBolt(this.prepareConnectionDetails()), (Number)HTTPHarvestingTopology.getAnInt("RECORD_HARVESTING_BOLT_PARALLEL")).setNumTasks((Number)HTTPHarvestingTopology.getAnInt("RECORD_HARVESTING_BOLT_NUMBER_OF_TASKS"))).customGrouping("recordHarvestingBolt", (CustomStreamGrouping)new ShuffleGrouping());
        ((BoltDeclarer)builder.setBolt("writeRecordBolt", (IRichBolt)writeRecordBolt, (Number)HTTPHarvestingTopology.getAnInt("WRITE_BOLT_PARALLEL")).setNumTasks((Number)HTTPHarvestingTopology.getAnInt("WRITE_BOLT_NUMBER_OF_TASKS"))).customGrouping("recordCategorizationBolt", (CustomStreamGrouping)new ShuffleGrouping());
        ((BoltDeclarer)builder.setBolt("revisionWriterBolt", (IRichBolt)revisionWriterBolt, (Number)HTTPHarvestingTopology.getAnInt("REVISION_WRITER_BOLT_PARALLEL")).setNumTasks((Number)HTTPHarvestingTopology.getAnInt("REVISION_WRITER_BOLT_NUMBER_OF_TASKS"))).customGrouping("writeRecordBolt", (CustomStreamGrouping)new ShuffleGrouping());
        ((BoltDeclarer)builder.setBolt("duplicatesDetectorBolt", (IRichBolt)new DuplicatedRecordsProcessorBolt(topologyProperties.getProperty("MCS_URL"), topologyProperties.getProperty("TOPOLOGY_USER_NAME"), topologyProperties.getProperty("TOPOLOGY_USER_PASSWORD")), (Number)HTTPHarvestingTopology.getAnInt("DUPLICATES_BOLT_PARALLEL")).setNumTasks((Number)HTTPHarvestingTopology.getAnInt("DUPLICATES_BOLT_NUMBER_OF_TASKS"))).fieldsGrouping("revisionWriterBolt", new Fields(new String[]{"TASK_ID"}));
        TopologyHelper.addSpoutsGroupingToNotificationBolt((List)spoutNames, (BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)builder.setBolt("notificationBolt", (IRichBolt)new NotificationBolt(topologyProperties.getProperty("CASSANDRA_HOSTS"), Integer.parseInt(topologyProperties.getProperty("CASSANDRA_PORT")), topologyProperties.getProperty("CASSANDRA_KEYSPACE_NAME"), topologyProperties.getProperty("CASSANDRA_USERNAME"), topologyProperties.getProperty("CASSANDRA_PASSWORD")), (Number)HTTPHarvestingTopology.getAnInt("NOTIFICATION_BOLT_PARALLEL")).setNumTasks((Number)HTTPHarvestingTopology.getAnInt("NOTIFICATION_BOLT_NUMBER_OF_TASKS"))).fieldsGrouping("recordHarvestingBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("recordCategorizationBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("writeRecordBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("revisionWriterBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("duplicatesDetectorBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))));
        return builder.createTopology();
    }

    private static int getAnInt(String propertyName) {
        return Integer.parseInt(topologyProperties.getProperty(propertyName));
    }

    private DbConnectionDetails prepareConnectionDetails() {
        return DbConnectionDetails.builder().hosts(topologyProperties.getProperty("CASSANDRA_HOSTS")).port(HTTPHarvestingTopology.getAnInt("CASSANDRA_PORT")).keyspaceName(topologyProperties.getProperty("CASSANDRA_KEYSPACE_NAME")).userName(topologyProperties.getProperty("CASSANDRA_USERNAME")).password(topologyProperties.getProperty("CASSANDRA_PASSWORD")).build();
    }

    public static void main(String[] args) {
        try {
            LOGGER.info("Assembling '{}'", (Object)"http_topology");
            if (args.length <= 1) {
                String providedPropertyFile = args.length == 1 ? args[0] : "";
                HTTPHarvestingTopology httpHarvestingTopology = new HTTPHarvestingTopology(TOPOLOGY_PROPERTIES_FILE, providedPropertyFile);
                StormTopology stormTopology = httpHarvestingTopology.buildTopology();
                Config config = TopologyHelper.buildConfig((Properties)topologyProperties);
                LOGGER.info("Submitting '{}'...", (Object)topologyProperties.getProperty("TOPOLOGY_NAME"));
                TopologySubmitter.submitTopology((String)topologyProperties.getProperty("TOPOLOGY_NAME"), (Map)config, (StormTopology)stormTopology);
            } else {
                LOGGER.error("Invalid number of parameters");
            }
        }
        catch (Exception e) {
            LOGGER.error("General error in HTTP harvesting topology", (Throwable)e);
        }
    }
}

