/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.utils;

import eu.europeana.cloud.common.model.Revision;
import eu.europeana.cloud.service.dps.DpsRecord;
import eu.europeana.cloud.service.dps.DpsRecordDeserializer;
import eu.europeana.cloud.service.dps.OAIPMHHarvestingDetails;
import eu.europeana.cloud.service.dps.metis.indexing.DataSetCleanerParameters;
import eu.europeana.cloud.service.dps.storm.spout.ECloudSpout;
import eu.europeana.cloud.service.dps.storm.spout.MediaSpout;
import eu.europeana.cloud.service.dps.storm.utils.FastCancelingSpoutWaitStrategy;
import eu.europeana.enrichment.rest.client.report.Report;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.Config;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * Static helpers for assembling Storm topologies: building the topology {@link Config},
 * creating Kafka-backed spouts (one per configured topic) and wiring those spouts to bolts.
 *
 * <p>The class is not instantiable; all members are static.
 */
public final class TopologyHelper {
    /** Prefix of generated spout names; the 1-based index of the topic is appended to it. */
    public static final String SPOUT_NAME_PREFIX = "spout_";

    // Component name constants. They are not referenced inside this class.
    public static final String RETRIEVE_FILE_BOLT = "retrieveFileBolt";
    public static final String NOTIFICATION_BOLT = "notificationBolt";
    public static final String WRITE_RECORD_BOLT = "writeRecordBolt";
    public static final String XSLT_BOLT = "XSLT_BOLT";
    public static final String WRITE_TO_DATA_SET_BOLT = "writeToDataSetBolt";
    public static final String REVISION_WRITER_BOLT = "revisionWriterBolt";
    public static final String DUPLICATES_DETECTOR_BOLT = "duplicatesDetectorBolt";
    public static final String VALIDATION_BOLT = "validationBolt";
    public static final String INDEXING_BOLT = "indexingBolt";
    public static final String STATISTICS_BOLT = "statisticsBolt";
    public static final String ENRICHMENT_BOLT = "enrichmentBolt";
    public static final String NORMALIZATION_BOLT = "normalizationBolt";
    public static final String RECORD_HARVESTING_BOLT = "recordHarvestingBolt";
    public static final String RECORD_CATEGORIZATION_BOLT = "recordCategorizationBolt";
    public static final String PARSE_FILE_BOLT = "ParseFileBolt";
    public static final String EDM_ENRICHMENT_BOLT = "EDMEnrichmentBolt";
    public static final String EDM_OBJECT_PROCESSOR_BOLT = "EDMObjectProcessorBolt";
    public static final String RESOURCE_PROCESSING_BOLT = "ResourceProcessingBolt";
    public static final String LINK_CHECK_BOLT = "LinkCheckBolt";

    /** Creates one spout for a single topic; lets both spout flavours share the registration loop. */
    @FunctionalInterface
    private interface SpoutFactory {
        ECloudSpout create(String topologyName, Properties topologyProperties, String topic);
    }

    private TopologyHelper() {
    }

    /**
     * Builds the topology configuration in non-static mode.
     *
     * @param topologyProperties properties the configuration values are read from
     * @return the populated configuration
     * @see #buildConfig(Properties, boolean)
     */
    public static Config buildConfig(Properties topologyProperties) {
        return buildConfig(topologyProperties, false);
    }

    /**
     * Builds the topology configuration.
     *
     * <p>When {@code staticMode} is {@code false}, the worker count, maximum task parallelism,
     * Nimbus thrift port and seeds, and the Storm ZooKeeper servers are read from
     * {@code topologyProperties} and backpressure is enabled. When it is {@code true} those
     * settings are skipped, debug is switched on and missing Cassandra settings fall back to
     * local defaults instead of {@code null}.
     *
     * @param topologyProperties properties the configuration values are read from; may be
     *                           {@code null} only when {@code staticMode} is {@code true}
     * @param staticMode         whether to build the static-mode variant described above
     * @return the populated configuration
     * @throws NumberFormatException if a numeric property that is read is absent (only for the
     *                               required non-static ones) or is not a valid integer
     */
    public static Config buildConfig(Properties topologyProperties, boolean staticMode) {
        Config config = new Config();
        if (!staticMode) {
            config.setNumWorkers(getRequiredInt(topologyProperties, "WORKER_COUNT"));
            config.setMaxTaskParallelism(getRequiredInt(topologyProperties, "MAX_TASK_PARALLELISM"));
            config.put("nimbus.thrift.port", getRequiredInt(topologyProperties, "THRIFT_PORT"));
            // NOTE(review): the map key here is the *value* of INPUT_ZOOKEEPER_ADDRESS rather than
            // a fixed configuration name. This looks unintended, but it is kept unchanged until
            // the intended key is confirmed.
            config.put(
                    topologyProperties.getProperty("INPUT_ZOOKEEPER_ADDRESS"),
                    topologyProperties.getProperty("INPUT_ZOOKEEPER_PORT"));
            config.put("nimbus.seeds",
                    Collections.singletonList(topologyProperties.getProperty("NIMBUS_SEEDS")));
            config.put("storm.zookeeper.servers",
                    Collections.singletonList(topologyProperties.getProperty("STORM_ZOOKEEPER_ADDRESS")));
            config.put("topology.backpressure.enable", true);
        }
        config.setDebug(staticMode);
        config.setMessageTimeoutSecs(getValue(topologyProperties, "MESSAGE_TIMEOUT_IN_SECONDS", 300));

        // Outside static mode a missing Cassandra setting is stored as null.
        config.put("CASSANDRA_HOSTS",
                getValue(topologyProperties, "CASSANDRA_HOSTS", staticMode ? "localhost" : null));
        config.put("CASSANDRA_PORT",
                getValue(topologyProperties, "CASSANDRA_PORT", staticMode ? "9042" : null));
        config.put("CASSANDRA_KEYSPACE_NAME",
                getValue(topologyProperties, "CASSANDRA_KEYSPACE_NAME", staticMode ? "ecloud_dps" : null));
        config.put("CASSANDRA_USERNAME",
                getValue(topologyProperties, "CASSANDRA_USERNAME", staticMode ? "cassandra" : null));
        config.put("CASSANDRA_PASSWORD",
                getValue(topologyProperties, "CASSANDRA_PASSWORD", staticMode ? "cassandra" : null));

        config.setMaxSpoutPending(getValue(topologyProperties, "MAX_SPOUT_PENDING", 500));
        config.put("topology.kryo.register", kryoClassNames());
        config.put("topology.spout.wait.strategy", FastCancelingSpoutWaitStrategy.class.getName());
        config.put("SPOUT_SLEEP_MS", getValue(topologyProperties, "SPOUT_SLEEP_MS", 1));
        config.put("SPOUT_SLEEP_EVERY_N_IDLE_ITERATIONS",
                getValue(topologyProperties, "SPOUT_SLEEP_EVERY_N_IDLE_ITERATIONS", 32));
        return config;
    }

    /**
     * Collects the class names stored under {@code topology.kryo.register}: the declared types of
     * the {@code messageType}, {@code mode} and {@code status} fields of {@link Report}, followed
     * by a fixed set of classes.
     */
    private static List<String> kryoClassNames() {
        List<String> reportFieldNames = Arrays.asList("messageType", "mode", "status");
        // Collect into an explicit ArrayList: Collectors.toList() does not promise a mutable
        // list, and the fixed class names are appended below.
        List<String> classNames = Stream.of(Report.class.getDeclaredFields())
                .filter(field -> reportFieldNames.contains(field.getName()))
                .map(field -> field.getType().getName())
                .collect(Collectors.toCollection(ArrayList::new));
        classNames.addAll(Arrays.asList(
                LinkedHashMap.class.getName(),
                OAIPMHHarvestingDetails.class.getName(),
                Revision.class.getName(),
                Date.class.getName(),
                DataSetCleanerParameters.class.getName(),
                Report.class.getName()));
        return classNames;
    }

    /**
     * Returns the property value for {@code key}, or {@code defaultValue} when
     * {@code properties} is {@code null} or does not directly contain the key.
     */
    private static String getValue(Properties properties, String key, String defaultValue) {
        if (properties != null && properties.containsKey(key)) {
            return properties.getProperty(key);
        }
        return defaultValue;
    }

    /**
     * Returns the property value for {@code key} parsed as an integer, or {@code defaultValue}
     * when {@code properties} is {@code null} or does not directly contain the key.
     *
     * @throws NumberFormatException if the key is present but its value is not a valid integer
     */
    private static int getValue(Properties properties, String key, int defaultValue) {
        if (properties != null && properties.containsKey(key)) {
            return parseIntProperty(key, properties.getProperty(key));
        }
        return defaultValue;
    }

    /**
     * Reads a mandatory integer property.
     *
     * @throws NumberFormatException if the property is absent or is not a valid integer
     */
    private static int getRequiredInt(Properties properties, String key) {
        return parseIntProperty(key, properties.getProperty(key));
    }

    /**
     * Parses {@code rawValue} as an integer, ignoring surrounding whitespace. Failures are
     * reported as {@link NumberFormatException} with the property name in the message.
     */
    private static int parseIntProperty(String key, String rawValue) {
        if (rawValue == null) {
            throw new NumberFormatException("Topology property '" + key + "' has no string value");
        }
        try {
            return Integer.parseInt(rawValue.trim());
        } catch (NumberFormatException e) {
            NumberFormatException detailed = new NumberFormatException(
                    "Topology property '" + key + "' is not a valid integer: '" + rawValue + "'");
            detailed.initCause(e);
            throw detailed;
        }
    }

    /**
     * Creates an {@link ECloudSpout} consuming {@code topic}, using the Kafka and Cassandra
     * settings found in {@code topologyProperties}.
     *
     * @param topologyName       topology name; also used as the Kafka consumer group id
     * @param topologyProperties source of the {@code BOOTSTRAP_SERVERS} and {@code CASSANDRA_*} settings
     * @param topic              Kafka topic the spout reads from
     * @return the new spout
     * @throws NumberFormatException if {@code CASSANDRA_PORT} is absent or not a valid integer
     */
    public static ECloudSpout createECloudSpout(String topologyName, Properties topologyProperties, String topic) {
        return new ECloudSpout(
                topologyName,
                topic,
                createKafkaSpoutConfig(topologyName, topologyProperties, topic,
                        KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE),
                topologyProperties.getProperty("CASSANDRA_HOSTS"),
                getRequiredInt(topologyProperties, "CASSANDRA_PORT"),
                topologyProperties.getProperty("CASSANDRA_KEYSPACE_NAME"),
                topologyProperties.getProperty("CASSANDRA_USERNAME"),
                topologyProperties.getProperty("CASSANDRA_PASSWORD"));
    }

    /**
     * Builds the Kafka spout configuration for a single topic. The consumer group id is the
     * topology name; {@code max.poll.records} and {@code fetch.max.bytes} default to 100 and
     * 20000 when {@code MAX_POLL_RECORDS} / {@code FETCH_MAX_BYTES} are not configured.
     */
    private static KafkaSpoutConfig<String, DpsRecord> createKafkaSpoutConfig(String topologyName, Properties topologyProperties, String topic, KafkaSpoutConfig.ProcessingGuarantee processingGuarantee) {
        KafkaSpoutConfig.Builder<String, DpsRecord> configBuilder =
                new KafkaSpoutConfig.Builder<String, DpsRecord>(
                        topologyProperties.getProperty("BOOTSTRAP_SERVERS"), topic)
                        .setProcessingGuarantee(processingGuarantee)
                        .setProp("key.deserializer", StringDeserializer.class)
                        .setProp("value.deserializer", DpsRecordDeserializer.class)
                        .setProp("group.id", topologyName)
                        .setProp("max.poll.records", getValue(topologyProperties, "MAX_POLL_RECORDS", 100))
                        .setProp("fetch.max.bytes", getValue(topologyProperties, "FETCH_MAX_BYTES", 20000))
                        .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_LATEST);
        return configBuilder.build();
    }

    /**
     * Registers one {@link ECloudSpout} per topic listed in the {@code TOPICS} property.
     *
     * @param builder            topology builder the spouts are added to
     * @param topology           topology name passed to every spout
     * @param topologyProperties source of {@code TOPICS} and of the spout settings
     * @return names of the registered spouts, in topic order ({@code spout_1}, {@code spout_2}, ...)
     * @throws NullPointerException if {@code TOPICS} is not set
     */
    public static List<String> addSpouts(TopologyBuilder builder, String topology, Properties topologyProperties) {
        return registerSpouts(builder, topology, topologyProperties, TopologyHelper::createECloudSpout);
    }

    /**
     * Registers one {@link MediaSpout} per topic listed in the {@code TOPICS} property.
     *
     * @param builder            topology builder the spouts are added to
     * @param topology           topology name passed to every spout
     * @param topologyProperties source of {@code TOPICS} and of the spout settings
     * @return names of the registered spouts, in topic order ({@code spout_1}, {@code spout_2}, ...)
     * @throws NullPointerException if {@code TOPICS} is not set
     */
    public static List<String> addMediaSpouts(TopologyBuilder builder, String topology, Properties topologyProperties) {
        return registerSpouts(builder, topology, topologyProperties, TopologyHelper::createMediaSpout);
    }

    /**
     * Shared registration loop: creates a spout per topic with {@code factory} and adds it to
     * {@code builder} with a parallelism hint of 1 and a single task.
     */
    private static List<String> registerSpouts(TopologyBuilder builder, String topology, Properties topologyProperties, SpoutFactory factory) {
        String[] topics = getTopics(topologyProperties);
        List<String> spoutNames = new ArrayList<>(topics.length);
        for (int i = 0; i < topics.length; i++) {
            String spoutName = SPOUT_NAME_PREFIX + (i + 1);
            ECloudSpout spout = factory.create(topology, topologyProperties, topics[i]);
            builder.setSpout(spoutName, spout, 1).setNumTasks(1);
            spoutNames.add(spoutName);
        }
        return spoutNames;
    }

    /**
     * Creates a {@link MediaSpout} consuming {@code topic}. Takes the same settings as
     * {@link #createECloudSpout(String, Properties, String)} plus the
     * {@code DEFAULT_MAXIMUM_PARALLELIZATION} property, which is passed through as a string.
     *
     * @param topologyName       topology name; also used as the Kafka consumer group id
     * @param topologyProperties source of the Kafka, Cassandra and parallelization settings
     * @param topic              Kafka topic the spout reads from
     * @return the new spout
     * @throws NumberFormatException if {@code CASSANDRA_PORT} is absent or not a valid integer
     */
    public static ECloudSpout createMediaSpout(String topologyName, Properties topologyProperties, String topic) {
        return new MediaSpout(
                topologyName,
                topic,
                createKafkaSpoutConfig(topologyName, topologyProperties, topic,
                        KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE),
                topologyProperties.getProperty("CASSANDRA_HOSTS"),
                getRequiredInt(topologyProperties, "CASSANDRA_PORT"),
                topologyProperties.getProperty("CASSANDRA_KEYSPACE_NAME"),
                topologyProperties.getProperty("CASSANDRA_USERNAME"),
                topologyProperties.getProperty("CASSANDRA_PASSWORD"),
                topologyProperties.getProperty("DEFAULT_MAXIMUM_PARALLELIZATION"));
    }

    /**
     * Subscribes the bolt to the default stream of every listed spout using shuffle grouping.
     *
     * @param spoutNames   names of the spouts to subscribe to
     * @param boltDeclarer declarer of the subscribing bolt
     */
    public static void addSpoutShuffleGrouping(List<String> spoutNames, BoltDeclarer boltDeclarer) {
        for (String spoutName : spoutNames) {
            boltDeclarer.shuffleGrouping(spoutName);
        }
    }

    /**
     * Subscribes the bolt to the {@code NotificationStream} stream of every listed spout,
     * grouped by the {@code TASK_ID} field.
     *
     * @param spoutNames   names of the spouts to subscribe to
     * @param boltDeclarer declarer of the subscribing bolt
     */
    public static void addSpoutsGroupingToNotificationBolt(List<String> spoutNames, BoltDeclarer boltDeclarer) {
        addSpoutFieldGrouping(spoutNames, boltDeclarer, "NotificationStream", "TASK_ID");
    }

    /** Subscribes the bolt to the named stream of every listed spout, grouped by {@code fieldName}. */
    private static void addSpoutFieldGrouping(List<String> spoutNames, BoltDeclarer boltDeclarer, String streamName, String fieldName) {
        for (String spoutName : spoutNames) {
            boltDeclarer.fieldsGrouping(spoutName, streamName, new Fields(fieldName));
        }
    }

    /**
     * Subscribes the bolt to the default stream of every listed spout, grouped by {@code fieldName}.
     *
     * @param spoutNames   names of the spouts to subscribe to
     * @param boltDeclarer declarer of the subscribing bolt
     * @param fieldName    tuple field used for the fields grouping
     */
    public static void addSpoutFieldGrouping(List<String> spoutNames, BoltDeclarer boltDeclarer, String fieldName) {
        for (String spoutName : spoutNames) {
            boltDeclarer.fieldsGrouping(spoutName, new Fields(fieldName));
        }
    }

    /**
     * Splits the comma-separated {@code TOPICS} property into topic names. Whitespace around
     * each name is removed so that a value such as {@code "a, b"} yields {@code a} and {@code b}.
     *
     * @throws NullPointerException if {@code TOPICS} is not set
     */
    private static String[] getTopics(Properties topologyProperties) {
        String topics = Objects.requireNonNull(
                topologyProperties.getProperty("TOPICS"),
                "Required topology property 'TOPICS' is not set");
        return Arrays.stream(topics.split(","))
                .map(String::trim)
                .toArray(String[]::new);
    }
}

