package stream.storm;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.xml.parsers.DocumentBuilderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import stream.Data;
import stream.Processor;
import stream.ProcessorList;
import stream.StormRunner;
import stream.Subscription;
import stream.data.DataFactory;
import stream.io.Sink;
import stream.runtime.Variables;
import stream.runtime.setup.ObjectFactory;
import stream.runtime.setup.ProcessorFactory;

/* loaded from: input_file:stream/storm/ProcessBolt.class */
public class ProcessBolt extends AbstractBolt {
    private static final long serialVersionUID = -924312414467186051L;
    static Logger log = LoggerFactory.getLogger(ProcessBolt.class);
    transient ProcessorList process;
    transient List<OutputRef> outputRefs;
    protected final Variables variables;
    protected String[] outputs;
    final BoltContext ctx;
    final Set<Subscription> subscriptions;

    /* loaded from: input_file:stream/storm/ProcessBolt$DataForwarder.class */
    public final class DataForwarder implements Sink {
        String id;
        final OutputCollector output;

        public DataForwarder(String str, OutputCollector outputCollector) {
            this.id = str;
            this.output = outputCollector;
        }

        public String getId() {
            return this.id;
        }

        public boolean write(Data data) throws Exception {
            if (data == null) {
                return false;
            }
            this.output.emit(this.id, new Values(new Object[]{data}));
            return true;
        }

        public void close() throws Exception {
        }

        public boolean write(Collection<Data> collection) throws Exception {
            Iterator<Data> it = collection.iterator();
            while (it.hasNext()) {
                this.output.emit(this.id, new Values(new Object[]{it.next()}));
            }
            return true;
        }

        public void setId(String str) {
            this.id = str;
        }

        public void init() throws Exception {
        }
    }

    /* loaded from: input_file:stream/storm/ProcessBolt$OutputRef.class */
    public final class OutputRef {
        final Processor processor;
        final String property;
        final String[] refs;

        public OutputRef(Processor processor, String str, String[] strArr) {
            this.processor = processor;
            this.property = str;
            this.refs = strArr;
        }
    }

    public ProcessBolt(String str, String str2, Map<String, String> map) throws Exception {
        super(str, str2);
        this.outputRefs = new ArrayList();
        this.ctx = new BoltContext();
        this.subscriptions = new LinkedHashSet();
        this.variables = new Variables(map);
        createProcess();
    }

    public Set<Subscription> getSubscriptions() {
        return this.subscriptions;
    }

    protected ProcessorList createProcess() throws Exception {
        Element findElementByUUID = StormRunner.findElementByUUID(DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(new ByteArrayInputStream(this.xmlConfig.getBytes())).getDocumentElement(), this.uuid);
        if (findElementByUUID == null) {
            log.error("Failed to find process for uuid '{}' in the XML!", this.uuid);
            throw new Exception("Failed to find process for uuid '" + this.uuid + "' in the XML!");
        }
        ProcessorFactory processorFactory = new ProcessorFactory(ObjectFactory.newInstance());
        QueueInjection queueInjection = new QueueInjection(this.uuid, this.output);
        processorFactory.addCreationHandler(queueInjection);
        log.debug("Creating processor-list from element {}", findElementByUUID);
        List createNestedProcessors = processorFactory.createNestedProcessors(findElementByUUID);
        this.process = new ProcessorList();
        Iterator it = createNestedProcessors.iterator();
        while (it.hasNext()) {
            this.process.getProcessors().add((Processor) it.next());
        }
        if (findElementByUUID.hasAttribute("output")) {
            String attribute = findElementByUUID.getAttribute("output");
            if (attribute.indexOf(",") > 0) {
                this.outputs = attribute.split(",");
            } else {
                this.outputs = new String[]{attribute};
            }
        }
        this.subscriptions.addAll(queueInjection.getSubscriptions());
        log.debug("Found {} subscribers for bolt '{}': " + this.subscriptions, Integer.valueOf(this.subscriptions.size()), this.uuid);
        return this.process;
    }

    public List<Processor> getAllProcessors() {
        return getAllProcessors(this.process);
    }

    protected List<Processor> getAllProcessors(ProcessorList processorList) {
        ArrayList arrayList = new ArrayList();
        for (Processor processor : processorList.getProcessors()) {
            if (processor instanceof ProcessorList) {
                arrayList.addAll(getAllProcessors((ProcessorList) processor));
            } else {
                arrayList.add(processor);
            }
        }
        return arrayList;
    }

    protected Processor createProcessor(Element element, ProcessorFactory processorFactory) throws Exception {
        ProcessorList createProcessor = processorFactory.createProcessor(element);
        if (createProcessor instanceof ProcessorList) {
            ProcessorList processorList = createProcessor;
            NodeList childNodes = element.getChildNodes();
            for (int i = 0; i < childNodes.getLength(); i++) {
                Node item = childNodes.item(i);
                if (item.getNodeType() == 1) {
                    processorList.getProcessors().add(createProcessor((Element) item, processorFactory));
                }
            }
        }
        return createProcessor;
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        log.debug("Preparing ProcessBolt {}", this.uuid);
        this.output = outputCollector;
        log.debug("   output collector: {}", this.output);
        try {
            this.process = createProcess();
            this.process.init(this.ctx);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("Failed to create process!");
        }
    }

    public void execute(Tuple tuple) {
        Data create;
        log.debug("Tuple received: {}", tuple);
        try {
            Object valueByField = tuple.getValueByField("stream.Data");
            log.debug("Data is: {}", valueByField);
            create = valueByField != null ? (Data) valueByField : null;
        } catch (Exception e) {
            log.debug("Error processing tuple: {}", e.getMessage());
            create = DataFactory.create();
            Fields fields = tuple.getFields();
            for (int i = 0; i < fields.size(); i++) {
                String str = fields.get(i);
                Object value = tuple.getValue(i);
                if (value instanceof Serializable) {
                    create.put(str, (Serializable) value);
                }
            }
        }
        if (create == null) {
            log.debug("No item to process!");
            return;
        }
        log.debug("Processing item...");
        Data process = this.process.process(create);
        if (this.outputs == null) {
            log.debug("Emitting item {}", process);
            this.output.emit(new Values(new Object[]{process}));
            return;
        }
        for (String str2 : this.outputs) {
            log.debug("Emitting result item to {}: {}", str2, process);
            this.output.emit(str2, new Values(new Object[]{process}));
        }
    }
}
