/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.topology;

import edu.iu.dsc.tws.api.comms.messaging.types.MessageType;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
import edu.iu.dsc.tws.api.compute.graph.OperationMode;
import edu.iu.dsc.tws.api.compute.nodes.ICompute;
import edu.iu.dsc.tws.api.compute.nodes.ISource;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.task.impl.ComputeConnection;
import edu.iu.dsc.tws.task.impl.ComputeGraphBuilder;
import edu.iu.dsc.tws.task.impl.ops.BroadcastConfig;
import edu.iu.dsc.tws.task.impl.ops.DirectConfig;
import edu.iu.dsc.tws.task.impl.ops.KeyedPartitionConfig;
import edu.iu.dsc.tws.task.impl.ops.PartitionConfig;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Logger;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.IWindowedBolt;
import org.apache.storm.topology.SpoutDeclarer;
import org.apache.storm.topology.twister2.Twister2Bolt;
import org.apache.storm.topology.twister2.Twister2BoltGrouping;
import org.apache.storm.topology.twister2.Twister2Spout;
import org.apache.storm.topology.twister2.Twister2StormNode;

/**
 * Storm-style topology builder backed by a Twister2 {@link ComputeGraphBuilder}.
 *
 * <p>Spouts and bolts are registered by id through {@code setSpout} / {@code setBolt};
 * {@link #createTopology()} then adds every registered node to the compute graph, wires
 * each bolt's declared groupings as graph edges, and wraps the built graph in a
 * {@link StormTopology}.
 *
 * <p>Every bolt starts out in {@code sinkNodes}. The callback handed to each
 * {@link Twister2Bolt} moves a component id from {@code sinkNodes} to {@code computeNodes}.
 *
 * <p>The class is {@link Serializable}, but {@code computeGraphBuilder} is {@code transient}
 * and is only assigned in the constructor.
 */
public class TopologyBuilder implements Serializable {
    private static final Logger LOG = Logger.getLogger(TopologyBuilder.class.getName());

    /** Target graph builder; transient, assigned only by the constructor. */
    private transient ComputeGraphBuilder computeGraphBuilder;
    /** Every declared spout and bolt, keyed by component id. */
    private HashMap<String, Twister2StormNode> nodes = new HashMap<>();
    /** Ids of bolts that have not been moved to {@link #computeNodes}. */
    private Set<String> sinkNodes = new HashSet<String>();
    /** Ids of declared spouts. */
    private Set<String> sourceNodes = new HashSet<String>();
    /** Ids of bolts that were moved out of {@link #sinkNodes} by a bolt callback. */
    private Set<String> computeNodes = new HashSet<String>();

    /** Creates a builder over a fresh compute graph built from a default {@link Config}. */
    public TopologyBuilder() {
        this.computeGraphBuilder = ComputeGraphBuilder.newBuilder((Config)Config.newBuilder().build());
    }

    /**
     * Returns the name of the graph edge used for a grouping.
     *
     * <p>The grouping's stream id is used when it is non-null.
     *
     * <p>NOTE(review): the fallback below is only reached when the stream id is null, so it
     * produces {@code "<componentId>_null"}, and {@code destination} is never used. The
     * intended fallback name cannot be determined from this file, so the behaviour is kept
     * as-is — confirm against the emitting side before changing it.
     *
     * @param t2BoltGrouping grouping whose edge is being named
     * @param destination id of the subscribing bolt (currently unused)
     * @return the edge name
     */
    private String generateEdgeName(Twister2BoltGrouping t2BoltGrouping, String destination) {
        if (t2BoltGrouping.getStreamId() != null) {
            return t2BoltGrouping.getStreamId();
        }
        return String.format("%s_%s", t2BoltGrouping.getComponentId(), t2BoltGrouping.getStreamId());
    }

    /**
     * Looks up a declared component, failing with a descriptive message instead of a bare
     * {@link NullPointerException} when a grouping refers to an id that was never declared.
     *
     * @param componentId id of the component a grouping subscribes to
     * @param requestedBy id of the bolt that declared the grouping (used in the message)
     * @return the registered node, never {@code null}
     * @throws IllegalStateException if no component with {@code componentId} is registered
     */
    private Twister2StormNode requireNode(String componentId, String requestedBy) {
        Twister2StormNode node = this.nodes.get(componentId);
        if (node == null) {
            throw new IllegalStateException("Bolt '" + requestedBy + "' declares a grouping on component '"
                    + componentId + "', which has not been added to this topology");
        }
        return node;
    }

    /**
     * Wires every grouping declared on {@code twister2Bolt} into {@code computeConnection}.
     *
     * <p>For each grouping the upstream component's output fields for the stream are
     * registered as inbound fields on the bolt, and then an edge of the matching kind
     * (direct, partition, keyed partition or broadcast) is added with
     * {@link MessageTypes#OBJECT} as data type.
     *
     * @throws IllegalStateException if a grouping refers to an undeclared component
     * @throws UnsupportedOperationException if the grouping technique is not handled here
     */
    private void defineGrouping(Twister2Bolt twister2Bolt, ComputeConnection computeConnection) {
        String nodeId = twister2Bolt.getId();
        twister2Bolt.getBoltDeclarer().getGroupings().forEach(grouping -> {
            String upstreamId = grouping.getComponentId();
            Twister2StormNode upstream = this.requireNode(upstreamId, nodeId);
            twister2Bolt.addInboundFieldsForEdge(grouping.getStreamId(), upstream.getOutFieldsForEdge(grouping.getStreamId()));
            // Computed once per grouping; generateEdgeName only reads getters.
            String edgeName = this.generateEdgeName((Twister2BoltGrouping)grouping, nodeId);
            switch (grouping.getGroupingTechnique()) {
                case DIRECT: {
                    LOG.info("Adding direct grouping : " + grouping);
                    ((DirectConfig)computeConnection.direct(upstreamId).viaEdge(edgeName)).withDataType((MessageType)MessageTypes.OBJECT);
                    break;
                }
                case SHUFFLE: {
                    LOG.info("Adding shuffle grouping : " + grouping + "{" + upstreamId + "," + edgeName);
                    ((PartitionConfig)computeConnection.partition(upstreamId).viaEdge(edgeName)).withDataType((MessageType)MessageTypes.OBJECT);
                    break;
                }
                case FIELD: {
                    LOG.info("Adding field grouping : " + grouping);
                    ((KeyedPartitionConfig)((KeyedPartitionConfig)computeConnection.keyedPartition(upstreamId).viaEdge(edgeName)).withDataType((MessageType)MessageTypes.OBJECT)).withKeyType((MessageType)MessageTypes.OBJECT);
                    // The upstream node is told which key this stream is partitioned on.
                    upstream.setKeyedOutEdges(grouping.getStreamId(), grouping.getGroupingKey());
                    break;
                }
                case ALL: {
                    LOG.info("Adding all grouping : " + grouping);
                    ((BroadcastConfig)computeConnection.broadcast(upstreamId).viaEdge(edgeName)).withDataType((MessageType)MessageTypes.OBJECT);
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Unsupported grouping technique : " + grouping.getGroupingTechnique());
                }
            }
        });
    }

    /**
     * Adds the bolt registered under {@code boltId} to the compute graph and wires its groupings.
     *
     * @param boltId id of a bolt held in {@code computeNodes} or {@code sinkNodes}
     */
    private void addComputeNode(String boltId) {
        Twister2Bolt twister2Bolt = (Twister2Bolt)this.nodes.get(boltId);
        ComputeConnection computeConnection = this.computeGraphBuilder.addCompute(boltId, (ICompute)twister2Bolt, twister2Bolt.getParallelism().intValue());
        this.defineGrouping(twister2Bolt, computeConnection);
    }

    /**
     * Builds the compute graph from everything declared so far.
     *
     * <p>Spouts are added as sources first, then the bolts in {@code computeNodes}, then the
     * bolts in {@code sinkNodes}. The graph is built in {@link OperationMode#STREAMING} mode.
     *
     * @return a {@link StormTopology} wrapping the built graph
     * @throws IllegalStateException if a bolt's grouping refers to an undeclared component
     * @throws UnsupportedOperationException if a bolt uses an unsupported grouping technique
     */
    public StormTopology createTopology() {
        this.sourceNodes.forEach(source -> {
            Twister2Spout twister2Spout = (Twister2Spout)this.nodes.get(source);
            LOG.info("Adding source : " + source);
            this.computeGraphBuilder.addSource(source, (ISource)twister2Spout, twister2Spout.getParallelism().intValue());
        });
        this.computeNodes.forEach(this::addComputeNode);
        this.sinkNodes.forEach(this::addComputeNode);
        this.computeGraphBuilder.setMode(OperationMode.STREAMING);
        return new StormTopology(this.computeGraphBuilder.build());
    }

    /** Declares a rich bolt with a parallelism of 1. */
    public BoltDeclarer setBolt(String id, IRichBolt bolt) {
        return this.setBolt(id, bolt, 1);
    }

    /**
     * Declares a rich bolt.
     *
     * @param id component id of the bolt
     * @param bolt the bolt implementation
     * @param parallelismHint parallelism; its {@code intValue()} is used
     * @return the declarer used to define the bolt's groupings
     * @throws IllegalArgumentException if {@code id} is already declared as a spout
     */
    public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelismHint) {
        return this.createT2Bolt(id, bolt, parallelismHint);
    }

    /** Declares a basic bolt with a parallelism of 1. */
    public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
        return this.setBolt(id, bolt, 1);
    }

    /**
     * Declares a basic bolt.
     *
     * @param id component id of the bolt
     * @param bolt the bolt implementation
     * @param parallelismHint parallelism; its {@code intValue()} is used
     * @return the declarer used to define the bolt's groupings
     * @throws IllegalArgumentException if {@code id} is already declared as a spout
     */
    public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelismHint) {
        return this.createT2Bolt(id, bolt, parallelismHint);
    }

    /**
     * Wraps {@code bolt} in a {@link Twister2Bolt}, registers it under {@code id} and marks it
     * as a sink.
     *
     * <p>The callback passed to the wrapper receives a component id; if that id is currently
     * in {@code sinkNodes} it is moved to {@code computeNodes}. Presumably the wrapper invokes
     * it when this bolt subscribes to another component — confirm in {@code Twister2Bolt}.
     *
     * @throws IllegalArgumentException if {@code id} is already declared as a spout
     */
    private BoltDeclarer createT2Bolt(String id, Object bolt, Number parallelismHint) {
        // A spout and a bolt sharing an id would overwrite each other in `nodes` while the id
        // stays in both id sets, which makes createTopology() fail with a ClassCastException.
        if (this.sourceNodes.contains(id)) {
            throw new IllegalArgumentException("Component id '" + id + "' is already declared as a spout");
        }
        Twister2Bolt twister2Bolt = new Twister2Bolt(id, bolt, source -> {
            boolean sourceWasInSinkNodes = this.sinkNodes.remove(source);
            if (sourceWasInSinkNodes) {
                this.computeNodes.add(source);
            }
        });
        twister2Bolt.setParallelism(parallelismHint.intValue());
        this.nodes.put(id, twister2Bolt);
        this.sinkNodes.add(id);
        return twister2Bolt.getBoltDeclarer();
    }

    /** Declares a windowed bolt with a parallelism of 1. */
    public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumentException {
        return this.setBolt(id, bolt, 1);
    }

    /**
     * Declares a windowed bolt.
     *
     * @param id component id of the bolt
     * @param bolt the bolt implementation
     * @param parallelismHint parallelism; its {@code intValue()} is used
     * @return the declarer used to define the bolt's groupings
     * @throws IllegalArgumentException if {@code id} is already declared as a spout
     */
    public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelismHint) throws IllegalArgumentException {
        return this.createT2Bolt(id, bolt, parallelismHint);
    }

    /** Declares a spout with a parallelism of 1. */
    public SpoutDeclarer setSpout(String id, IRichSpout spout) {
        return this.setSpout(id, spout, 1);
    }

    /**
     * Wraps {@code spout} in a {@link Twister2Spout} and registers it under {@code id} as a source.
     *
     * @param id component id of the spout
     * @param spout the spout implementation
     * @param parallelismHint parallelism; its {@code intValue()} is used
     * @return the declarer of the wrapped spout
     * @throws IllegalArgumentException if {@code id} is already declared as a bolt
     */
    public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelismHint) {
        // Same id-collision guard as createT2Bolt, in the other direction.
        if (this.sinkNodes.contains(id) || this.computeNodes.contains(id)) {
            throw new IllegalArgumentException("Component id '" + id + "' is already declared as a bolt");
        }
        Twister2Spout twister2Spout = new Twister2Spout(id, spout);
        twister2Spout.setParallelism(parallelismHint.intValue());
        this.nodes.put(id, twister2Spout);
        this.sourceNodes.add(id);
        return twister2Spout.getSpoutDeclarer();
    }
}

