package org.streampipes.wrapper.siddhi.engine;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.streampipes.wrapper.routing.SpOutputCollector;
import org.streampipes.wrapper.runtime.EventProcessor;
import org.streampipes.wrapper.siddhi.manager.SpSiddhiManager;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;

/* loaded from: input_file:org/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.class */
public abstract class SiddhiEventEngine<T extends EventProcessorBindingParams> extends EventProcessor<T> {
    private StringBuilder siddhiAppString;
    private SiddhiAppRuntime siddhiAppRuntime;
    private Map<String, InputHandler> siddhiInputHandlers;
    private List<String> inputStreamNames;
    private static final Logger LOG = LoggerFactory.getLogger(SiddhiEventEngine.class);

    public SiddhiEventEngine(T t) {
        super(t);
        this.siddhiAppString = new StringBuilder();
        this.siddhiInputHandlers = new HashMap();
        this.inputStreamNames = new ArrayList();
    }

    public void bind(final T t, final SpOutputCollector spOutputCollector) {
        if (t.getInEventTypes().size() != t.getGraph().getInputStreams().size()) {
            throw new IllegalArgumentException("Input parameters do not match!");
        }
        SiddhiManager siddhiManager = SpSiddhiManager.INSTANCE.getSiddhiManager();
        LOG.info("Configuring event types for graph " + t.getGraph().getName());
        t.getInEventTypes().forEach((str, map) -> {
            registerEventTypeIfNotExists(str, map);
            this.inputStreamNames.add(fixEventName(str));
        });
        registerStatements(fromStatement(this.inputStreamNames, t), selectStatement(t), getOutputTopicName(t));
        this.siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(this.siddhiAppString.toString());
        t.getInEventTypes().forEach((str2, map2) -> {
            this.siddhiInputHandlers.put(str2, this.siddhiAppRuntime.getInputHandler(fixEventName(str2)));
        });
        this.siddhiAppRuntime.addCallback(fixEventName(getOutputTopicName(t)), new StreamCallback() { // from class: org.streampipes.wrapper.siddhi.engine.SiddhiEventEngine.1
            public void receive(Event[] eventArr) {
                for (Event event : eventArr) {
                    spOutputCollector.onEvent(SiddhiEventEngine.this.toMap(event, t));
                }
            }
        });
    }

    private String getOutputTopicName(T t) {
        return t.getGraph().getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Map<String, Object> toMap(Event event, T t) {
        HashMap hashMap = new HashMap();
        int i = 0;
        Iterator it = t.getOutEventType().keySet().iterator();
        while (it.hasNext()) {
            hashMap.put((String) it.next(), event.getData(i));
            i++;
        }
        return hashMap;
    }

    private void registerEventTypeIfNotExists(String str, Map<String, Object> map) {
        String str2 = "define stream " + fixEventName(str);
        StringJoiner stringJoiner = new StringJoiner(",");
        for (String str3 : map.keySet()) {
            stringJoiner.add(str3 + " " + toType((Class) map.get(str3)));
        }
        this.siddhiAppString.append(str2);
        this.siddhiAppString.append("(");
        this.siddhiAppString.append(stringJoiner.toString());
        this.siddhiAppString.append(");\n");
    }

    private String toType(Class<?> cls) {
        System.out.println(cls.getCanonicalName());
        return cls.equals(Long.class) ? "LONG" : cls.equals(Integer.class) ? "INT" : cls.equals(Double.class) ? "DOUBLE" : cls.equals(Float.class) ? "FLOAT" : cls.equals(Boolean.class) ? "BOOL" : "STRING";
    }

    private void registerStatements(String str, String str2, String str3) {
        this.siddhiAppString.append(str).append("\n").append(str2).append("\n").append("insert into ").append(fixEventName(str3)).append(";");
        LOG.info("Registering statement: \n" + this.siddhiAppString.toString());
    }

    public void onEvent(Map<String, Object> map, String str) {
        try {
            this.siddhiInputHandlers.get(str).send(toObjArr(map));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private Object[] toObjArr(Map<String, Object> map) {
        return map.values().toArray();
    }

    public void discard() {
        this.siddhiAppRuntime.shutdown();
    }

    protected abstract String fromStatement(List<String> list, T t);

    protected abstract String selectStatement(T t);

    private String fixEventName(String str) {
        return str.replaceAll("\\.", "");
    }
}
