/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.http;

import com.google.common.base.Throwables;
import eu.europeana.cloud.http.spout.HttpKafkaSpout;
import eu.europeana.cloud.service.dps.storm.NotificationBolt;
import eu.europeana.cloud.service.dps.storm.io.AddResultToDataSetBolt;
import eu.europeana.cloud.service.dps.storm.io.HarvestingWriteRecordBolt;
import eu.europeana.cloud.service.dps.storm.io.RevisionWriterBolt;
import eu.europeana.cloud.service.dps.storm.topologies.properties.PropertyFileLoader;
import eu.europeana.cloud.service.dps.storm.utils.TopologyHelper;
import java.util.Map;
import java.util.Properties;
import kafka.api.OffsetRequest;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.grouping.ShuffleGrouping;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.Scheme;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HTTPHarvestingTopology {
    private static Properties topologyProperties;
    private final BrokerHosts brokerHosts;
    private static final String TOPOLOGY_PROPERTIES_FILE = "http-topology-config.properties";
    private static final Logger LOGGER;

    public HTTPHarvestingTopology(String defaultPropertyFile, String providedPropertyFile) {
        topologyProperties = new Properties();
        PropertyFileLoader.loadPropertyFile(defaultPropertyFile, providedPropertyFile, topologyProperties);
        this.brokerHosts = new ZkHosts(topologyProperties.getProperty("INPUT_ZOOKEEPER_ADDRESS"));
    }

    public final StormTopology buildTopology(String httpTopic, String ecloudMcsAddress, String uisAddress) {
        HarvestingWriteRecordBolt writeRecordBolt = new HarvestingWriteRecordBolt(ecloudMcsAddress, uisAddress);
        RevisionWriterBolt revisionWriterBolt = new RevisionWriterBolt(ecloudMcsAddress);
        SpoutConfig kafkaConfig = new SpoutConfig(this.brokerHosts, httpTopic, "", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme((Scheme)new StringScheme());
        kafkaConfig.ignoreZkOffsets = true;
        kafkaConfig.startOffsetTime = OffsetRequest.LatestTime();
        TopologyBuilder builder = new TopologyBuilder();
        HttpKafkaSpout httpKafkaSpout = new HttpKafkaSpout(kafkaConfig, topologyProperties.getProperty("CASSANDRA_HOSTS"), Integer.parseInt(topologyProperties.getProperty("CASSANDRA_PORT")), topologyProperties.getProperty("CASSANDRA_KEYSPACE_NAME"), topologyProperties.getProperty("CASSANDRA_USERNAME"), topologyProperties.getProperty("CASSANDRA_PASSWORD"));
        builder.setSpout("spout", (IRichSpout)httpKafkaSpout, (Number)HTTPHarvestingTopology.getAnInt("KAFKA_SPOUT_PARALLEL")).setNumTasks((Number)HTTPHarvestingTopology.getAnInt("KAFKA_SPOUT_NUMBER_OF_TASKS"));
        ((BoltDeclarer)builder.setBolt("writeRecordBolt", (IRichBolt)writeRecordBolt, (Number)HTTPHarvestingTopology.getAnInt("WRITE_BOLT_PARALLEL")).setNumTasks((Number)HTTPHarvestingTopology.getAnInt("WRITE_BOLT_NUMBER_OF_TASKS"))).customGrouping("spout", (CustomStreamGrouping)new ShuffleGrouping());
        ((BoltDeclarer)builder.setBolt("revisionWriterBolt", (IRichBolt)revisionWriterBolt, (Number)HTTPHarvestingTopology.getAnInt("REVISION_WRITER_BOLT_PARALLEL")).setNumTasks((Number)HTTPHarvestingTopology.getAnInt("REVISION_WRITER_BOLT_NUMBER_OF_TASKS"))).customGrouping("writeRecordBolt", (CustomStreamGrouping)new ShuffleGrouping());
        AddResultToDataSetBolt addResultToDataSetBolt = new AddResultToDataSetBolt(ecloudMcsAddress);
        ((BoltDeclarer)builder.setBolt("writeToDataSetBolt", (IRichBolt)addResultToDataSetBolt, (Number)HTTPHarvestingTopology.getAnInt("ADD_TO_DATASET_BOLT_PARALLEL")).setNumTasks((Number)HTTPHarvestingTopology.getAnInt("ADD_TO_DATASET_BOLT_NUMBER_OF_TASKS"))).customGrouping("revisionWriterBolt", (CustomStreamGrouping)new ShuffleGrouping());
        ((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)builder.setBolt("notificationBolt", (IRichBolt)new NotificationBolt(topologyProperties.getProperty("CASSANDRA_HOSTS"), Integer.parseInt(topologyProperties.getProperty("CASSANDRA_PORT")), topologyProperties.getProperty("CASSANDRA_KEYSPACE_NAME"), topologyProperties.getProperty("CASSANDRA_USERNAME"), topologyProperties.getProperty("CASSANDRA_PASSWORD")), (Number)HTTPHarvestingTopology.getAnInt("NOTIFICATION_BOLT_PARALLEL")).setNumTasks((Number)HTTPHarvestingTopology.getAnInt("NOTIFICATION_BOLT_NUMBER_OF_TASKS"))).fieldsGrouping("spout", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("writeRecordBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("revisionWriterBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}))).fieldsGrouping("writeToDataSetBolt", "NotificationStream", new Fields(new String[]{"TASK_ID"}));
        return builder.createTopology();
    }

    private static int getAnInt(String propertyName) {
        return Integer.parseInt(topologyProperties.getProperty(propertyName));
    }

    public static void main(String[] args) {
        try {
            if (args.length <= 1) {
                String topologyName;
                String providedPropertyFile = "";
                if (args.length == 1) {
                    providedPropertyFile = args[0];
                }
                HTTPHarvestingTopology httpHarvestingTopology = new HTTPHarvestingTopology(TOPOLOGY_PROPERTIES_FILE, providedPropertyFile);
                String kafkaTopic = topologyName = topologyProperties.getProperty("TOPOLOGY_NAME");
                String ecloudMcsAddress = topologyProperties.getProperty("MCS_URL");
                String ecloudUisAddress = topologyProperties.getProperty("UIS_URL");
                StormTopology stormTopology = httpHarvestingTopology.buildTopology(kafkaTopic, ecloudMcsAddress, ecloudUisAddress);
                Config config = TopologyHelper.configureTopology(topologyProperties);
                config.put((Object)"topology.backpressure.enable", (Object)true);
                config.setNumAckers(0);
                StormSubmitter.submitTopology((String)topologyName, (Map)config, (StormTopology)stormTopology);
            }
        }
        catch (Exception e) {
            LOGGER.error(Throwables.getStackTraceAsString(e));
        }
    }

    static {
        LOGGER = LoggerFactory.getLogger(HTTPHarvestingTopology.class);
    }
}

