/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.topology.twister2;

import edu.iu.dsc.tws.api.comms.structs.Tuple;
import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.nodes.ICompute;
import edu.iu.dsc.tws.api.config.Config;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import org.apache.storm.task.IOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.WindowedBoltExecutor;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.topology.twister2.EdgeFieldMap;
import org.apache.storm.topology.twister2.MadeASourceListener;
import org.apache.storm.topology.twister2.Twister2BoltDeclarer;
import org.apache.storm.topology.twister2.Twister2StormNode;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Twister2Tuple;
import org.apache.storm.tuple.Twister2TupleWrapper;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class Twister2Bolt
implements ICompute,
Twister2StormNode {
    private static final Logger LOG = Logger.getLogger(Twister2Bolt.class.getName());
    private IRichBolt stormBolt;
    private IBasicBolt stormBasicBolt;
    private BaseWindowedBolt stormWindowedBolt;
    private WindowedBoltExecutor stormWindowedBoltExecutor;
    private Twister2BoltDeclarer boltDeclarer;
    private Integer parallelism = 1;
    private String id;
    private OutputCollector outputCollector;
    private BasicOutputCollector basicOutputCollector;
    private HashMap<String, Fields> inboundEdgeToFieldsMap = new HashMap();
    private final EdgeFieldMap outFieldsForEdge;
    private final EdgeFieldMap keyedOutEdges;

    public Twister2Bolt(String id, Object bolt, MadeASourceListener madeASourceListener) {
        this.id = id;
        this.boltDeclarer = new Twister2BoltDeclarer(madeASourceListener);
        this.outFieldsForEdge = new EdgeFieldMap(Utils.getDefaultStream(id));
        this.keyedOutEdges = new EdgeFieldMap(Utils.getDefaultStream(id));
        if (bolt instanceof IRichBolt) {
            this.stormBolt = (IRichBolt)bolt;
            this.stormBolt.declareOutputFields(this.outFieldsForEdge);
        } else if (bolt instanceof BaseWindowedBolt) {
            this.stormWindowedBolt = (BaseWindowedBolt)bolt;
            this.stormWindowedBolt.declareOutputFields(this.outFieldsForEdge);
            this.stormWindowedBoltExecutor = new WindowedBoltExecutor(this.stormWindowedBolt);
        } else {
            this.stormBasicBolt = (IBasicBolt)bolt;
            this.stormBasicBolt.declareOutputFields(this.outFieldsForEdge);
        }
    }

    public Integer getParallelism() {
        return this.parallelism;
    }

    public void setParallelism(Integer parallelism) {
        this.parallelism = parallelism;
    }

    public Twister2BoltDeclarer getBoltDeclarer() {
        return this.boltDeclarer;
    }

    @Override
    public String getId() {
        return this.id;
    }

    @Override
    public Fields getOutFieldsForEdge(String edge) {
        return (Fields)this.outFieldsForEdge.get(edge);
    }

    @Override
    public void setKeyedOutEdges(String edge, Fields keys) {
        LOG.info(String.format("[Storm-Bolt : %s] Setting out edge %s with key fields %s", this.id, edge, keys));
        this.keyedOutEdges.put(edge, keys);
    }

    public void addInboundFieldsForEdge(String edge, Fields fields) {
        LOG.info(String.format("[Storm-Bolt : %s] Adding inbound fields for edge %s with fields %s", this.id, edge, fields));
        this.inboundEdgeToFieldsMap.put(edge, fields);
    }

    private void createAndFireTuple(Object values, IMessage iMessage) {
        Object extractedValues = values;
        if (values instanceof Twister2TupleWrapper) {
            extractedValues = ((Twister2TupleWrapper)values).getStormValue();
        }
        if (extractedValues instanceof Values) {
            Twister2Tuple twister2Tuple = new Twister2Tuple(this.inboundEdgeToFieldsMap.get(iMessage.edge()), (Values)extractedValues, iMessage);
            if (this.stormBolt != null) {
                this.stormBolt.execute(twister2Tuple);
            } else if (this.stormWindowedBolt != null) {
                this.stormWindowedBoltExecutor.execute(twister2Tuple);
            } else {
                this.stormBasicBolt.execute(twister2Tuple, this.basicOutputCollector);
            }
        } else {
            throw new RuntimeException("Unexpected message format. Expected " + Values.class + " found " + values.getClass());
        }
    }

    public boolean execute(IMessage message) {
        LOG.finest("Message received from edge " + message.edge() + " to " + this.id);
        Object messageContent = message.getContent();
        if (messageContent instanceof Iterator) {
            Iterator valuesIterator = (Iterator)messageContent;
            while (valuesIterator.hasNext()) {
                this.createAndFireTuple(valuesIterator.next(), message);
            }
        } else if (messageContent instanceof List) {
            List valuesList = (List)messageContent;
            for (Object values : valuesList) {
                if (values instanceof Tuple) {
                    this.createAndFireTuple(((Tuple)values).getValue(), message);
                    continue;
                }
                this.createAndFireTuple(values, message);
            }
        } else if (messageContent instanceof Tuple) {
            this.createAndFireTuple(((Tuple)messageContent).getValue(), message);
        } else if (messageContent instanceof Twister2TupleWrapper) {
            this.createAndFireTuple(messageContent, message);
        } else {
            System.out.println(messageContent.getClass());
            throw new RuntimeException("Unexpected message content format.");
        }
        return false;
    }

    public void prepare(Config cfg, final TaskContext context) {
        LOG.info("Preparing storm-bolt : " + this.id);
        this.outputCollector = new OutputCollector(new IOutputCollector(){

            @Override
            public List<Integer> emit(String streamId, Collection<org.apache.storm.tuple.Tuple> anchors, List<Object> tuple) {
                Twister2TupleWrapper tupleWrapper = new Twister2TupleWrapper(tuple);
                if (!Twister2Bolt.this.keyedOutEdges.containsKey(streamId)) {
                    context.write(streamId, (Object)tupleWrapper);
                } else {
                    Fields allFields = (Fields)Twister2Bolt.this.outFieldsForEdge.get(streamId);
                    Fields fieldsForKey = (Fields)Twister2Bolt.this.keyedOutEdges.get(streamId);
                    List<Object> key = allFields.select(fieldsForKey, tuple);
                    context.write(streamId, key, (Object)tupleWrapper);
                }
                return Collections.singletonList(0);
            }

            @Override
            public void emitDirect(int taskId, String streamId, Collection<org.apache.storm.tuple.Tuple> anchors, List<Object> tuple) {
                throw new UnsupportedOperationException("Emit direct is not supported yet");
            }

            @Override
            public void ack(org.apache.storm.tuple.Tuple input) {
            }

            @Override
            public void fail(org.apache.storm.tuple.Tuple input) {
            }

            @Override
            public void reportError(Throwable error) {
                LOG.warning("Error occurred when emitting : " + error.getMessage());
            }
        });
        TopologyContext topologyContext = new TopologyContext(context);
        topologyContext.setTempBoltDeclarer(this.getBoltDeclarer());
        if (this.stormBolt != null) {
            this.stormBolt.prepare(cfg.toMap(), topologyContext, this.outputCollector);
        } else if (this.stormWindowedBolt != null) {
            Map<String, Object> windowConfiguration = this.stormWindowedBolt.getComponentConfiguration();
            Map tw2Configs = cfg.toMap();
            windowConfiguration.putAll(tw2Configs);
            this.stormWindowedBoltExecutor.prepare(windowConfiguration, topologyContext, this.outputCollector);
        } else {
            this.basicOutputCollector = new BasicOutputCollector(this.outputCollector, Utils.getDefaultStream(this.id));
            this.stormBasicBolt.prepare(cfg.toMap(), topologyContext);
        }
    }
}

