/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.utils;

import eu.europeana.cloud.common.model.Revision;
import eu.europeana.cloud.common.properties.CassandraProperties;
import eu.europeana.cloud.service.dps.DpsRecord;
import eu.europeana.cloud.service.dps.DpsRecordDeserializer;
import eu.europeana.cloud.service.dps.OAIPMHHarvestingDetails;
import eu.europeana.cloud.service.dps.metis.indexing.DataSetCleanerParameters;
import eu.europeana.cloud.service.dps.storm.spout.ECloudSpout;
import eu.europeana.cloud.service.dps.storm.spout.MediaSpout;
import eu.europeana.cloud.service.dps.storm.utils.FastCancelingSpoutWaitStrategy;
import eu.europeana.cloud.service.dps.storm.utils.SpoutProperties;
import eu.europeana.enrichment.rest.client.report.Report;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.Config;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * Static helpers for assembling Storm topologies from a {@link Properties} object:
 * building {@link SpoutProperties}, {@link CassandraProperties} and the Storm {@link Config},
 * creating one Kafka-backed spout per configured topic, and wiring spouts to bolts.
 *
 * <p>The class is not instantiable; every member is static.
 */
public final class TopologyHelper {
    /** Prefix of generated spout names; the 1-based topic index is appended (e.g. {@code spout_1}). */
    public static final String SPOUT_NAME_PREFIX = "spout_";

    // Component names. None of them is referenced inside this class; presumably they are
    // used by the topology definitions that call these helpers.
    public static final String RETRIEVE_FILE_BOLT = "retrieveFileBolt";
    public static final String NOTIFICATION_BOLT = "notificationBolt";
    public static final String WRITE_RECORD_BOLT = "writeRecordBolt";
    public static final String XSLT_BOLT = "XSLT_BOLT";
    public static final String REVISION_WRITER_BOLT = "revisionWriterBolt";
    public static final String DUPLICATES_DETECTOR_BOLT = "duplicatesDetectorBolt";
    public static final String VALIDATION_BOLT = "validationBolt";
    public static final String INDEXING_BOLT = "indexingBolt";
    public static final String STATISTICS_BOLT = "statisticsBolt";
    public static final String ENRICHMENT_BOLT = "enrichmentBolt";
    public static final String NORMALIZATION_BOLT = "normalizationBolt";
    public static final String RECORD_HARVESTING_BOLT = "recordHarvestingBolt";
    public static final String RECORD_CATEGORIZATION_BOLT = "recordCategorizationBolt";
    public static final String PARSE_FILE_BOLT = "ParseFileBolt";
    public static final String EDM_ENRICHMENT_BOLT = "EDMEnrichmentBolt";
    public static final String EDM_OBJECT_PROCESSOR_BOLT = "EDMObjectProcessorBolt";
    public static final String RESOURCE_PROCESSING_BOLT = "ResourceProcessingBolt";
    public static final String LINK_CHECK_BOLT = "LinkCheckBolt";

    // Fallbacks applied when the corresponding property is absent.
    private static final int DEFAULT_CASSANDRA_PORT = 9042;
    private static final int DEFAULT_MESSAGE_TIMEOUT_SECONDS = 300;
    private static final int DEFAULT_MAX_SPOUT_PENDING = 500;
    private static final int DEFAULT_SPOUT_SLEEP_MS = 1;
    private static final int DEFAULT_SPOUT_SLEEP_EVERY_N_IDLE_ITERATIONS = 32;
    private static final int DEFAULT_MAX_POLL_RECORDS = 100;
    private static final int DEFAULT_FETCH_MAX_BYTES = 20000;

    /** Names of the {@link Report} fields whose declared types are registered with Kryo. */
    private static final List<String> REPORT_FIELDS_REGISTERED_WITH_KRYO =
            Arrays.asList("messageType", "mode", "status");

    /** Creates one spout for a single topic; lets the spout-registration loop be shared. */
    @FunctionalInterface
    private interface SpoutFactory {
        ECloudSpout create(String topologyName, SpoutProperties spoutProperties,
                CassandraProperties cassandraProperties, String topic);
    }

    private TopologyHelper() {
    }

    /**
     * Copies the spout-related entries of {@code topologyProperties} into a {@link SpoutProperties}.
     *
     * <p>Numeric entries that are absent are passed to the builder as {@code null}.
     *
     * @param topologyProperties source properties
     * @return the populated spout properties
     * @throws NumberFormatException if a numeric entry is present but is not a valid integer
     */
    public static SpoutProperties createSpoutProperties(Properties topologyProperties) {
        return SpoutProperties.builder()
                .workerCount(getIntegerProperty(topologyProperties, "WORKER_COUNT"))
                .maxTaskParallelism(getIntegerProperty(topologyProperties, "MAX_TASK_PARALLELISM"))
                .nimbusThriftPort(getIntegerProperty(topologyProperties, "THRIFT_PORT"))
                .inputZookeeperAddress(topologyProperties.getProperty("INPUT_ZOOKEEPER_ADDRESS"))
                .inputZookeeperPort(topologyProperties.getProperty("INPUT_ZOOKEEPER_PORT"))
                .nimbusSeeds(Collections.singletonList(topologyProperties.getProperty("NIMBUS_SEEDS")))
                .stormZookeeperAddress(
                        Collections.singletonList(topologyProperties.getProperty("STORM_ZOOKEEPER_ADDRESS")))
                .messageTimeoutInSeconds(getIntegerProperty(topologyProperties, "MESSAGE_TIMEOUT_IN_SECONDS"))
                .maxSpoutPending(getIntegerProperty(topologyProperties, "MAX_SPOUT_PENDING"))
                .spoutSleepMilliseconds(getIntegerProperty(topologyProperties, "SPOUT_SLEEP_MS"))
                .spoutSleepEveryNIterations(
                        getIntegerProperty(topologyProperties, "SPOUT_SLEEP_EVERY_N_IDLE_ITERATIONS"))
                .maxPollRecords(getIntegerProperty(topologyProperties, "MAX_POLL_RECORDS"))
                .fetchMaxBytes(getIntegerProperty(topologyProperties, "FETCH_MAX_BYTES"))
                .topics(topologyProperties.getProperty("TOPICS"))
                .bootstrapServers(topologyProperties.getProperty("BOOTSTRAP_SERVERS"))
                .build();
    }

    /**
     * Copies the Cassandra-related entries of {@code topologyProperties} into a
     * {@link CassandraProperties}. The port falls back to 9042 when {@code CASSANDRA_PORT} is absent.
     *
     * @param topologyProperties source properties
     * @return the populated Cassandra properties
     * @throws NumberFormatException if {@code CASSANDRA_PORT} is present but is not a valid integer
     */
    public static CassandraProperties createCassandraProperties(Properties topologyProperties) {
        return CassandraProperties.builder()
                .hosts(topologyProperties.getProperty("CASSANDRA_HOSTS"))
                .keyspace(topologyProperties.getProperty("CASSANDRA_KEYSPACE_NAME"))
                .password(topologyProperties.getProperty("CASSANDRA_PASSWORD"))
                .user(topologyProperties.getProperty("CASSANDRA_USERNAME"))
                .port(getValue(getIntegerProperty(topologyProperties, "CASSANDRA_PORT"), DEFAULT_CASSANDRA_PORT))
                .build();
    }

    /**
     * Reads an optional integer property.
     *
     * @return the parsed value, or {@code null} when the property is absent
     * @throws NumberFormatException if the property is present but is not a valid integer;
     *     the message names the offending key
     */
    private static Integer getIntegerProperty(Properties topologyProperties, String propertyKey) {
        String rawValue = topologyProperties.getProperty(propertyKey);
        if (rawValue == null) {
            return null;
        }
        try {
            // Values loaded from .properties files keep trailing whitespace, so trim before parsing.
            return Integer.valueOf(rawValue.trim());
        } catch (NumberFormatException e) {
            NumberFormatException detailed = new NumberFormatException(
                    "Topology property '" + propertyKey + "' is not a valid integer: '" + rawValue + "'");
            detailed.initCause(e);
            throw detailed;
        }
    }

    /**
     * Builds the Storm {@link Config} for a topology from {@code topologyProperties}.
     *
     * <p>{@code WORKER_COUNT} and {@code MAX_TASK_PARALLELISM} are required. Message timeout,
     * max spout pending and the two spout-sleep settings fall back to 300, 500, 1 and 32
     * respectively when absent.
     *
     * @param topologyProperties source properties
     * @return the populated configuration
     * @throws NullPointerException if {@code WORKER_COUNT} or {@code MAX_TASK_PARALLELISM} is absent
     * @throws NumberFormatException if a numeric entry is present but is not a valid integer
     */
    public static Config buildConfig(Properties topologyProperties) {
        SpoutProperties spoutProperties = createSpoutProperties(topologyProperties);

        Config config = new Config();
        config.setNumWorkers(requireInt(spoutProperties.getWorkerCount(), "WORKER_COUNT"));
        config.setMaxTaskParallelism(requireInt(spoutProperties.getMaxTaskParallelism(), "MAX_TASK_PARALLELISM"));
        config.put("nimbus.thrift.port", spoutProperties.getNimbusThriftPort());
        // NOTE(review): the key used here is the *value* of INPUT_ZOOKEEPER_ADDRESS, not a fixed
        // configuration key. This looks unintentional but is preserved as-is; confirm before changing.
        config.put(topologyProperties.getProperty("INPUT_ZOOKEEPER_ADDRESS"), spoutProperties.getInputZookeeperPort());
        config.put("nimbus.seeds", spoutProperties.getNimbusSeeds());
        config.put("storm.zookeeper.servers", spoutProperties.getStormZookeeperAddress());
        config.put("topology.backpressure.enable", true);
        config.setDebug(false);
        config.setMessageTimeoutSecs(
                getValue(spoutProperties.getMessageTimeoutInSeconds(), DEFAULT_MESSAGE_TIMEOUT_SECONDS));
        config.setMaxSpoutPending(getValue(spoutProperties.getMaxSpoutPending(), DEFAULT_MAX_SPOUT_PENDING));
        config.put("topology.kryo.register", kryoClassNames());
        config.put("topology.spout.wait.strategy", FastCancelingSpoutWaitStrategy.class.getName());
        config.put("SPOUT_SLEEP_MS", getValue(spoutProperties.getSpoutSleepMilliseconds(), DEFAULT_SPOUT_SLEEP_MS));
        config.put("SPOUT_SLEEP_EVERY_N_IDLE_ITERATIONS",
                getValue(spoutProperties.getSpoutSleepEveryNIterations(), DEFAULT_SPOUT_SLEEP_EVERY_N_IDLE_ITERATIONS));
        return config;
    }

    /**
     * Lists the class names put under {@code topology.kryo.register}: the declared types of the
     * selected {@link Report} fields followed by a fixed set of classes.
     */
    private static List<String> kryoClassNames() {
        // Collect into an explicit ArrayList: Collectors.toList() does not guarantee a mutable
        // list, and further entries are appended below.
        List<String> classNames = Stream.of(Report.class.getDeclaredFields())
                .filter(field -> REPORT_FIELDS_REGISTERED_WITH_KRYO.contains(field.getName()))
                .map(field -> field.getType().getName())
                .collect(Collectors.toCollection(ArrayList::new));
        classNames.addAll(Arrays.asList(
                LinkedHashMap.class.getName(),
                OAIPMHHarvestingDetails.class.getName(),
                Revision.class.getName(),
                Date.class.getName(),
                DataSetCleanerParameters.class.getName(),
                Report.class.getName(),
                CassandraProperties.class.getName()));
        return classNames;
    }

    /** Unboxes a required integer setting, failing with a message that names the missing property. */
    private static int requireInt(Integer value, String propertyKey) {
        return Objects.requireNonNull(value, "Missing required topology property: " + propertyKey);
    }

    /** Returns {@code value}, or {@code defaultValue} when {@code value} is {@code null}. */
    private static <T> T getValue(T value, T defaultValue) {
        return value != null ? value : defaultValue;
    }

    /**
     * Creates an {@link ECloudSpout} for one topic, using an at-least-once Kafka spout
     * configuration whose consumer group id is {@code topologyName}.
     *
     * @param topologyName topology name, also used as the Kafka {@code group.id}
     * @param spoutProperties source of bootstrap servers and poll/fetch limits
     * @param cassandraProperties passed through to the spout
     * @param topic Kafka topic the spout subscribes to
     * @return the new spout
     */
    public static ECloudSpout createECloudSpout(String topologyName, SpoutProperties spoutProperties,
            CassandraProperties cassandraProperties, String topic) {
        KafkaSpoutConfig<String, DpsRecord> kafkaConfig = createKafkaSpoutConfig(
                topologyName, spoutProperties, topic, KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
        return new ECloudSpout(topologyName, topic, kafkaConfig, cassandraProperties);
    }

    /**
     * Builds the Kafka spout configuration for a single topic. {@code max.poll.records} and
     * {@code fetch.max.bytes} fall back to 100 and 20000 when not set in {@code spoutProperties}.
     */
    private static KafkaSpoutConfig<String, DpsRecord> createKafkaSpoutConfig(String topologyName,
            SpoutProperties spoutProperties, String topic, KafkaSpoutConfig.ProcessingGuarantee processingGuarantee) {
        return new KafkaSpoutConfig.Builder<String, DpsRecord>(spoutProperties.getBootstrapServers(), topic)
                .setProcessingGuarantee(processingGuarantee)
                .setProp("key.deserializer", StringDeserializer.class)
                .setProp("value.deserializer", DpsRecordDeserializer.class)
                .setProp("group.id", topologyName)
                .setProp("max.poll.records", getValue(spoutProperties.getMaxPollRecords(), DEFAULT_MAX_POLL_RECORDS))
                .setProp("fetch.max.bytes", getValue(spoutProperties.getFetchMaxBytes(), DEFAULT_FETCH_MAX_BYTES))
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_LATEST)
                .build();
    }

    /**
     * Registers one {@link ECloudSpout} per topic listed in the comma-separated {@code TOPICS}
     * property. Spouts are named {@code spout_1}, {@code spout_2}, ... in topic order and each is
     * registered with a parallelism hint of 1 and a single task.
     *
     * @param builder topology builder to register the spouts on
     * @param topology topology name
     * @param topologyProperties source properties
     * @return the names of the registered spouts, in registration order
     * @throws NullPointerException if {@code TOPICS} is absent
     * @throws IllegalArgumentException if {@code TOPICS} contains no non-blank topic name
     */
    public static List<String> addSpouts(TopologyBuilder builder, String topology, Properties topologyProperties) {
        return registerSpouts(builder, topology, topologyProperties, TopologyHelper::createECloudSpout);
    }

    /**
     * Same as {@link #addSpouts(TopologyBuilder, String, Properties)} but the spouts are created
     * with {@link #createMediaSpout(String, SpoutProperties, CassandraProperties, String)}.
     *
     * @param builder topology builder to register the spouts on
     * @param topology topology name
     * @param topologyProperties source properties
     * @return the names of the registered spouts, in registration order
     * @throws NullPointerException if {@code TOPICS} is absent
     * @throws IllegalArgumentException if {@code TOPICS} contains no non-blank topic name
     */
    public static List<String> addMediaSpouts(TopologyBuilder builder, String topology,
            Properties topologyProperties) {
        return registerSpouts(builder, topology, topologyProperties, TopologyHelper::createMediaSpout);
    }

    /** Shared implementation of {@code addSpouts} / {@code addMediaSpouts}. */
    private static List<String> registerSpouts(TopologyBuilder builder, String topology,
            Properties topologyProperties, SpoutFactory spoutFactory) {
        SpoutProperties spoutProperties = createSpoutProperties(topologyProperties);
        CassandraProperties cassandraProperties = createCassandraProperties(topologyProperties);
        List<String> topics = parseTopics(spoutProperties.getTopics());

        List<String> spoutNames = new ArrayList<>(topics.size());
        for (String topic : topics) {
            String spoutName = SPOUT_NAME_PREFIX + (spoutNames.size() + 1);
            ECloudSpout spout = spoutFactory.create(topology, spoutProperties, cassandraProperties, topic);
            builder.setSpout(spoutName, spout, 1).setNumTasks(1);
            spoutNames.add(spoutName);
        }
        return spoutNames;
    }

    /**
     * Creates a {@link MediaSpout} for one topic, using an at-least-once Kafka spout
     * configuration whose consumer group id is {@code topologyName}.
     *
     * @param topologyName topology name, also used as the Kafka {@code group.id}
     * @param spoutProperties source of bootstrap servers and poll/fetch limits; also passed to the spout
     * @param cassandraProperties passed through to the spout
     * @param topic Kafka topic the spout subscribes to
     * @return the new spout
     */
    public static ECloudSpout createMediaSpout(String topologyName, SpoutProperties spoutProperties,
            CassandraProperties cassandraProperties, String topic) {
        KafkaSpoutConfig<String, DpsRecord> kafkaConfig = createKafkaSpoutConfig(
                topologyName, spoutProperties, topic, KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
        return new MediaSpout(topologyName, topic, kafkaConfig, spoutProperties, cassandraProperties);
    }

    /**
     * Subscribes the bolt to the default stream of every listed spout with shuffle grouping.
     *
     * @param spoutNames names of the spouts to subscribe to
     * @param boltDeclarer declarer of the subscribing bolt
     */
    public static void addSpoutShuffleGrouping(List<String> spoutNames, BoltDeclarer boltDeclarer) {
        spoutNames.forEach(boltDeclarer::shuffleGrouping);
    }

    /**
     * Subscribes the bolt to the {@code NotificationStream} stream of every listed spout,
     * grouped on the {@code TASK_ID} field.
     *
     * @param spoutNames names of the spouts to subscribe to
     * @param boltDeclarer declarer of the subscribing bolt
     */
    public static void addSpoutsGroupingToNotificationBolt(List<String> spoutNames, BoltDeclarer boltDeclarer) {
        addSpoutFieldGrouping(spoutNames, boltDeclarer, "NotificationStream", "TASK_ID");
    }

    /** Subscribes the bolt to the named stream of every listed spout, grouped on {@code fieldName}. */
    private static void addSpoutFieldGrouping(List<String> spoutNames, BoltDeclarer boltDeclarer,
            String streamName, String fieldName) {
        for (String spoutName : spoutNames) {
            boltDeclarer.fieldsGrouping(spoutName, streamName, new Fields(fieldName));
        }
    }

    /**
     * Subscribes the bolt to the default stream of every listed spout, grouped on {@code fieldName}.
     *
     * @param spoutNames names of the spouts to subscribe to
     * @param boltDeclarer declarer of the subscribing bolt
     * @param fieldName tuple field to group on
     */
    public static void addSpoutFieldGrouping(List<String> spoutNames, BoltDeclarer boltDeclarer, String fieldName) {
        for (String spoutName : spoutNames) {
            boltDeclarer.fieldsGrouping(spoutName, new Fields(fieldName));
        }
    }

    /**
     * Splits the comma-separated {@code TOPICS} value into topic names, trimming surrounding
     * whitespace and dropping blank entries.
     *
     * @throws NullPointerException if {@code topicsProperty} is {@code null}
     * @throws IllegalArgumentException if no non-blank topic name remains
     */
    private static List<String> parseTopics(String topicsProperty) {
        Objects.requireNonNull(topicsProperty, "Missing required topology property: TOPICS");
        List<String> topics = Arrays.stream(topicsProperty.split(","))
                .map(String::trim)
                .filter(topic -> !topic.isEmpty())
                .collect(Collectors.toList());
        if (topics.isEmpty()) {
            throw new IllegalArgumentException(
                    "Topology property 'TOPICS' contains no topic names: '" + topicsProperty + "'");
        }
        return topics;
    }
}

