/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.utils;

import eu.europeana.cloud.service.dps.service.kafka.util.DpsRecordDeserializer;
import eu.europeana.cloud.service.dps.storm.spout.ECloudSpout;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.Config;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

/**
 * Static helpers for assembling the Storm {@link Config} and the Kafka-backed
 * {@link ECloudSpout} from a topology {@link Properties} object.
 *
 * <p>The class is not instantiable; all members are static.
 */
public final class TopologyHelper {
    // Names for the spout and the individual bolts.
    // NOTE(review): presumably used as component ids when wiring topologies - confirm against callers.
    public static final String SPOUT = "spout";
    public static final String RETRIEVE_FILE_BOLT = "retrieveFileBolt";
    public static final String NOTIFICATION_BOLT = "notificationBolt";
    public static final String WRITE_RECORD_BOLT = "writeRecordBolt";
    public static final String XSLT_BOLT = "XSLT_BOLT";
    public static final String WRITE_TO_DATA_SET_BOLT = "writeToDataSetBolt";
    public static final String REVISION_WRITER_BOLT = "revisionWriterBolt";
    public static final String DUPLICATES_DETECTOR_BOLT = "duplicatesDetectorBolt";
    public static final String VALIDATION_BOLT = "validationBolt";
    public static final String INDEXING_BOLT = "indexingBolt";
    public static final String STATISTICS_BOLT = "statisticsBolt";
    public static final String ENRICHMENT_BOLT = "enrichmentBolt";
    public static final String NORMALIZATION_BOLT = "normalizationBolt";
    public static final String RECORD_HARVESTING_BOLT = "recordHarvestingBolt";
    public static final String RECORD_CATEGORIZATION_BOLT = "recordCategorizationBolt";
    public static final String PARSE_FILE_BOLT = "ParseFileBolt";
    public static final String EDM_ENRICHMENT_BOLT = "EDMEnrichmentBolt";
    public static final String EDM_OBJECT_PROCESSOR_BOLT = "EDMObjectProcessorBolt";
    public static final String RESOURCE_PROCESSING_BOLT = "ResourceProcessingBolt";
    public static final String LINK_CHECK_BOLT = "LinkCheckBolt";

    private TopologyHelper() {
    }

    /**
     * Builds the topology configuration in non-static mode.
     *
     * @param topologyProperties source of the configuration values; must not be {@code null}
     * @return the populated configuration
     * @throws NumberFormatException if a numeric property is missing (when required) or not an integer
     * @see #buildConfig(Properties, boolean)
     */
    public static Config buildConfig(Properties topologyProperties) {
        return buildConfig(topologyProperties, false);
    }

    /**
     * Builds the topology configuration.
     *
     * <p>When {@code staticMode} is {@code false} the worker count, maximum task parallelism,
     * Nimbus thrift port, Nimbus seeds and Storm ZooKeeper servers are read from
     * {@code topologyProperties} and back-pressure is enabled. When it is {@code true} those
     * entries are skipped, debug is switched on and the Cassandra settings fall back to
     * built-in defaults if they are not provided.
     *
     * @param topologyProperties source of the configuration values; may be {@code null} only
     *                           when {@code staticMode} is {@code true}
     * @param staticMode         {@code true} to skip the cluster-related entries and use defaults
     * @return the populated configuration
     * @throws NumberFormatException if a numeric property is missing (when required) or not an integer
     * @throws NullPointerException  if {@code topologyProperties} is {@code null} and
     *                               {@code staticMode} is {@code false}
     */
    public static Config buildConfig(Properties topologyProperties, boolean staticMode) {
        Config config = new Config();
        if (!staticMode) {
            config.setNumWorkers(getRequiredInt(topologyProperties, "WORKER_COUNT"));
            config.setMaxTaskParallelism(getRequiredInt(topologyProperties, "MAX_TASK_PARALLELISM"));
            config.put("nimbus.thrift.port", getRequiredInt(topologyProperties, "THRIFT_PORT"));
            // NOTE(review): the *value* of INPUT_ZOOKEEPER_ADDRESS is used as the config key here,
            // with the port as its value. This looks unintended, but the intended key cannot be
            // determined from this file, so the behaviour is preserved as-is.
            config.put(topologyProperties.getProperty("INPUT_ZOOKEEPER_ADDRESS"),
                    topologyProperties.getProperty("INPUT_ZOOKEEPER_PORT"));
            config.put("nimbus.seeds", splitHosts(topologyProperties.getProperty("NIMBUS_SEEDS")));
            config.put("storm.zookeeper.servers",
                    splitHosts(topologyProperties.getProperty("STORM_ZOOKEEPER_ADDRESS")));
            config.put("topology.backpressure.enable", true);
        }
        config.setDebug(staticMode);
        config.setMessageTimeoutSecs(getValue(topologyProperties, "MESSAGE_TIMEOUT_IN_SECONDS", 300));
        // Outside static mode a missing Cassandra property is stored as null, as before.
        config.put("CASSANDRA_HOSTS",
                getValue(topologyProperties, "CASSANDRA_HOSTS", staticMode ? "localhost" : null));
        config.put("CASSANDRA_PORT",
                getValue(topologyProperties, "CASSANDRA_PORT", staticMode ? "9042" : null));
        config.put("CASSANDRA_KEYSPACE_NAME",
                getValue(topologyProperties, "CASSANDRA_KEYSPACE_NAME", staticMode ? "ecloud_dps" : null));
        config.put("CASSANDRA_USERNAME",
                getValue(topologyProperties, "CASSANDRA_USERNAME", staticMode ? "cassandra" : null));
        config.put("CASSANDRA_PASSWORD",
                getValue(topologyProperties, "CASSANDRA_PASSWORD", staticMode ? "cassandra" : null));
        config.setMaxSpoutPending(getValue(topologyProperties, "MAX_SPOUT_PENDING", 500));
        return config;
    }

    /**
     * Returns the string value of {@code key}, or {@code defaultValue} when it is not set.
     *
     * <p>The lookup goes through {@link Properties#getProperty(String, String)} so that values
     * supplied via the {@code Properties} defaults chain are honoured as well.
     */
    private static String getValue(Properties properties, String key, String defaultValue) {
        if (properties == null) {
            return defaultValue;
        }
        return properties.getProperty(key, defaultValue);
    }

    /**
     * Returns the integer value of {@code key}, or {@code defaultValue} when it is not set.
     *
     * @throws NumberFormatException if the property is set but is not a valid integer
     */
    private static int getValue(Properties properties, String key, int defaultValue) {
        String raw = getValue(properties, key, (String) null);
        return raw == null ? defaultValue : parseInt(key, raw);
    }

    /**
     * Reads a mandatory integer property.
     *
     * @throws NumberFormatException if the property is missing or is not a valid integer
     * @throws NullPointerException  if {@code properties} is {@code null}
     */
    private static int getRequiredInt(Properties properties, String key) {
        String raw = properties.getProperty(key);
        if (raw == null) {
            // Same exception type Integer.parseInt(null) would raise, but naming the property.
            throw new NumberFormatException("Required topology property '" + key + "' is missing");
        }
        return parseInt(key, raw);
    }

    /** Parses {@code rawValue} (ignoring surrounding whitespace), naming {@code key} on failure. */
    private static int parseInt(String key, String rawValue) {
        try {
            return Integer.parseInt(rawValue.trim());
        } catch (NumberFormatException e) {
            NumberFormatException detailed = new NumberFormatException(
                    "Topology property '" + key + "' is not a valid integer: '" + rawValue + "'");
            detailed.initCause(e);
            throw detailed;
        }
    }

    /**
     * Turns a comma-separated host string into a list of trimmed host names.
     *
     * <p>A single host yields a one-element list. A {@code null} input yields a list holding a
     * single {@code null}, which is what the previous {@code Arrays.asList(value)} call produced.
     */
    private static List<String> splitHosts(String commaSeparated) {
        if (commaSeparated == null) {
            return Arrays.asList((String) null);
        }
        return Arrays.asList(commaSeparated.trim().split("\\s*,\\s*"));
    }

    /**
     * Creates the spout with the {@code AT_LEAST_ONCE} processing guarantee.
     *
     * @param topologyName       passed to the spout and used as the Kafka {@code group.id}
     * @param topologyProperties source of the Kafka and Cassandra settings; must not be {@code null}
     * @return the configured spout
     * @see #createECloudSpout(String, Properties, KafkaSpoutConfig.ProcessingGuarantee)
     */
    public static ECloudSpout createECloudSpout(String topologyName, Properties topologyProperties) {
        return createECloudSpout(topologyName, topologyProperties,
                KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
    }

    /**
     * Creates the spout from the {@code BOOTSTRAP_SERVERS}, {@code TOPICS} and
     * {@code CASSANDRA_*} properties.
     *
     * <p>{@code TOPICS} is a comma-separated list; whitespace around entries is ignored.
     * {@code MAX_POLL_RECORDS} and {@code FETCH_MAX_BYTES} are optional and default to
     * 100 and 20000 respectively.
     *
     * @param topologyName        passed to the spout and used as the Kafka {@code group.id}
     * @param topologyProperties  source of the Kafka and Cassandra settings; must not be {@code null}
     * @param processingGuarantee processing guarantee applied to the Kafka spout configuration
     * @return the configured spout
     * @throws NullPointerException  if {@code topologyProperties} is {@code null} or {@code TOPICS} is missing
     * @throws NumberFormatException if a numeric property is not an integer, or
     *                               {@code CASSANDRA_PORT} is missing
     */
    public static ECloudSpout createECloudSpout(String topologyName, Properties topologyProperties, KafkaSpoutConfig.ProcessingGuarantee processingGuarantee) {
        String bootstrapServers = topologyProperties.getProperty("BOOTSTRAP_SERVERS");
        String topicList = Objects.requireNonNull(topologyProperties.getProperty("TOPICS"),
                "Required topology property 'TOPICS' is missing");
        // Trim so that "a, b" does not produce a topic name with leading whitespace.
        String[] topics = topicList.trim().split("\\s*,\\s*");

        // Raw builder type kept from the original; the record value type is not imported in this file.
        KafkaSpoutConfig.Builder configBuilder = new KafkaSpoutConfig.Builder(bootstrapServers, topics)
                .setProcessingGuarantee(processingGuarantee)
                .setProp("key.deserializer", StringDeserializer.class)
                .setProp("value.deserializer", DpsRecordDeserializer.class)
                .setProp("group.id", topologyName)
                .setProp("max.poll.records", getValue(topologyProperties, "MAX_POLL_RECORDS", 100))
                .setProp("fetch.max.bytes", getValue(topologyProperties, "FETCH_MAX_BYTES", 20000))
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST);

        return new ECloudSpout(
                topologyName,
                configBuilder.build(),
                topologyProperties.getProperty("CASSANDRA_HOSTS"),
                getRequiredInt(topologyProperties, "CASSANDRA_PORT"),
                topologyProperties.getProperty("CASSANDRA_KEYSPACE_NAME"),
                topologyProperties.getProperty("CASSANDRA_USERNAME"),
                topologyProperties.getProperty("CASSANDRA_PASSWORD"));
    }
}

