/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.utils;

import eu.europeana.cloud.service.dps.storm.spouts.kafka.MCSReaderSpout;
import java.util.Arrays;
import java.util.Properties;
import kafka.api.OffsetRequest;
import org.apache.storm.Config;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.Scheme;
import org.apache.storm.spout.SchemeAsMultiScheme;

/**
 * Static helpers shared by topology definitions: a set of string identifiers
 * and factory methods that build a Storm {@link Config} and an
 * {@link MCSReaderSpout} from a {@link Properties} object.
 *
 * <p>The string constants below are not referenced inside this class;
 * presumably they are the component ids used when wiring spouts and bolts
 * elsewhere (verify against callers).
 */
public final class TopologyHelper {
    public static final String SPOUT = "spout";
    public static final String RETRIEVE_FILE_BOLT = "retrieveFileBolt";
    public static final String IC_BOLT = "icBolt";
    public static final String NOTIFICATION_BOLT = "notificationBolt";
    public static final String WRITE_RECORD_BOLT = "writeRecordBolt";
    public static final String XSLT_BOLT = "XSLT_BOLT";
    public static final String WRITE_TO_DATA_SET_BOLT = "writeToDataSetBolt";
    public static final String REVISION_WRITER_BOLT = "revisionWriterBolt";
    public static final String VALIDATION_BOLT = "validationBolt";
    public static final String INDEXING_BOLT = "indexingBolt";
    public static final String STATISTICS_BOLT = "statisticsBolt";
    public static final String ENRICHMENT_BOLT = "enrichmentBolt";
    public static final String NORMALIZATION_BOLT = "normalizationBolt";
    public static final String RECORD_HARVESTING_BOLT = "recordHarvestingBolt";

    /**
     * Builds a Storm {@link Config} populated from the given properties.
     *
     * <p>Reads {@code WORKER_COUNT}, {@code MAX_TASK_PARALLELISM} and
     * {@code THRIFT_PORT} as integers, copies the Nimbus seed, Storm ZooKeeper
     * address and the {@code CASSANDRA_*} settings into the config, enables
     * {@code topology.backpressure.enable} and sets the number of ackers to 0.
     *
     * @param topologyProperties source of the settings; must not be {@code null}
     * @return a newly created, populated {@link Config}
     * @throws NumberFormatException if {@code WORKER_COUNT},
     *         {@code MAX_TASK_PARALLELISM} or {@code THRIFT_PORT} is missing or
     *         is not a valid integer
     */
    public static Config configureTopology(Properties topologyProperties) {
        Config config = new Config();
        config.setNumWorkers(requireInt(topologyProperties, "WORKER_COUNT"));
        config.setMaxTaskParallelism(requireInt(topologyProperties, "MAX_TASK_PARALLELISM"));
        config.put("nimbus.thrift.port", requireInt(topologyProperties, "THRIFT_PORT"));

        // NOTE(review): the config KEY here is the value of INPUT_ZOOKEEPER_ADDRESS
        // (not a fixed name) and the value is INPUT_ZOOKEEPER_PORT. This looks
        // unintended but is preserved as-is; confirm the intended key with the owners.
        config.put(topologyProperties.getProperty("INPUT_ZOOKEEPER_ADDRESS"),
                topologyProperties.getProperty("INPUT_ZOOKEEPER_PORT"));

        // Each property value is stored as a single-element list; the raw string
        // is not split on any separator.
        config.put("nimbus.seeds", Arrays.asList(topologyProperties.getProperty("NIMBUS_SEEDS")));
        config.put("storm.zookeeper.servers",
                Arrays.asList(topologyProperties.getProperty("STORM_ZOOKEEPER_ADDRESS")));

        // Cassandra settings are passed through verbatim as strings.
        copyProperty(topologyProperties, config, "CASSANDRA_HOSTS");
        copyProperty(topologyProperties, config, "CASSANDRA_PORT");
        copyProperty(topologyProperties, config, "CASSANDRA_KEYSPACE_NAME");
        copyProperty(topologyProperties, config, "CASSANDRA_USERNAME");
        copyProperty(topologyProperties, config, "CASSANDRA_PASSWORD");

        config.put("topology.backpressure.enable", true);
        // NOTE(review): zero ackers; in Storm this is documented to turn off
        // tuple acking - confirm that is intended for these topologies.
        config.setNumAckers(0);
        return config;
    }

    /**
     * Creates an {@link MCSReaderSpout} reading from the given Kafka topic.
     *
     * <p>The Kafka spout configuration uses the {@code INPUT_ZOOKEEPER_ADDRESS}
     * property for its {@link ZkHosts}, a {@link StringScheme} for decoding,
     * {@code ignoreZkOffsets = true} and {@code OffsetRequest.LatestTime()} as
     * the start offset time. The Cassandra connection settings are read from
     * the {@code CASSANDRA_*} properties.
     *
     * @param topologyProperties source of the ZooKeeper and Cassandra settings;
     *        must not be {@code null}
     * @param topic Kafka topic name handed to the spout configuration
     * @param ecloudMcsAddress passed through unchanged to the spout constructor
     * @return a newly constructed spout
     * @throws NumberFormatException if {@code CASSANDRA_PORT} is missing or is
     *         not a valid integer
     */
    public static MCSReaderSpout getMcsReaderSpout(Properties topologyProperties, String topic, String ecloudMcsAddress) {
        ZkHosts brokerHosts = new ZkHosts(topologyProperties.getProperty("INPUT_ZOOKEEPER_ADDRESS"));
        // Third and fourth arguments are presumably the ZooKeeper root path and
        // the consumer id of storm-kafka's SpoutConfig - confirm against its API.
        SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, topic, "", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // Together these two settings should make the spout disregard offsets
        // stored in ZooKeeper and begin at the latest offset (per storm-kafka
        // docs; verify for the deployed version).
        kafkaConfig.ignoreZkOffsets = true;
        kafkaConfig.startOffsetTime = OffsetRequest.LatestTime();
        return new MCSReaderSpout(
                kafkaConfig,
                topologyProperties.getProperty("CASSANDRA_HOSTS"),
                requireInt(topologyProperties, "CASSANDRA_PORT"),
                topologyProperties.getProperty("CASSANDRA_KEYSPACE_NAME"),
                topologyProperties.getProperty("CASSANDRA_USERNAME"),
                topologyProperties.getProperty("CASSANDRA_PASSWORD"),
                ecloudMcsAddress);
    }

    /** Copies the property {@code key} into {@code config} under the same key (value may be {@code null}). */
    private static void copyProperty(Properties source, Config config, String key) {
        config.put(key, source.getProperty(key));
    }

    /**
     * Reads {@code key} as an integer, ignoring surrounding whitespace.
     *
     * @throws NumberFormatException naming the offending key when the property
     *         is absent or cannot be parsed as an integer
     */
    private static int requireInt(Properties properties, String key) {
        String raw = properties.getProperty(key);
        if (raw == null) {
            throw new NumberFormatException("Missing required topology property '" + key + "'");
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            // NumberFormatException has no cause-taking constructor; attach it explicitly.
            NumberFormatException detailed = new NumberFormatException(
                    "Topology property '" + key + "' is not a valid integer: '" + raw + "'");
            detailed.initCause(e);
            throw detailed;
        }
    }
}

