package org.streampipes.wrapper.siddhi.engine;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.model.graph.DataProcessorInvocation;
import org.streampipes.model.runtime.EventFactory;
import org.streampipes.model.runtime.SchemaInfo;
import org.streampipes.model.runtime.SourceInfo;
import org.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.streampipes.wrapper.routing.SpOutputCollector;
import org.streampipes.wrapper.runtime.EventProcessor;
import org.streampipes.wrapper.siddhi.manager.SpSiddhiManager;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;

/* loaded from: input_file:org/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.class */
/**
 * Base class for event processors that evaluate their logic with a Siddhi app.
 *
 * <p>On invocation the engine builds a Siddhi app definition as text: one
 * {@code define stream} statement per input event type, followed by the
 * {@code from}/{@code select} clauses supplied by the subclass and an
 * {@code insert into} clause targeting the output topic. Incoming events are
 * converted to {@code Object[]} rows (ordered by {@link #sortedEventKeys}) and
 * pushed into the matching Siddhi {@link InputHandler}; results are received via
 * a {@link StreamCallback} and either forwarded to the output collector or, in
 * debug mode, handed to a {@link SiddhiDebugCallback}.
 *
 * @param <B> binding parameter type of the concrete processor
 */
public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> implements EventProcessor<B> {
    private static final Logger LOG = LoggerFactory.getLogger(SiddhiEventEngine.class);

    /** Accumulates the textual Siddhi app definition (stream definitions + query). */
    private final StringBuilder siddhiAppString;
    /** Created in {@link #onInvocation}; {@code null} until then. */
    private SiddhiAppRuntime siddhiAppRuntime;
    /** Input handlers keyed by the (unprepared) input event type name. */
    private final Map<String, InputHandler> siddhiInputHandlers;
    /** Prepared (sanitized) names of all registered input streams. */
    private final List<String> inputStreamNames;
    /** Attribute names in the order used for Siddhi rows and stream definitions. */
    private List<String> sortedEventKeys;
    private boolean debugMode;
    /** Only set (and only used) when the engine was created in debug mode. */
    private SiddhiDebugCallback debugCallback;

    /** Creates an engine that forwards Siddhi output to the output collector. */
    public SiddhiEventEngine() {
        this.siddhiAppString = new StringBuilder();
        this.siddhiInputHandlers = new HashMap<>();
        this.inputStreamNames = new ArrayList<>();
        this.sortedEventKeys = new ArrayList<>();
        this.debugMode = false;
    }

    /**
     * Creates an engine in debug mode: Siddhi output is passed to the given
     * callback instead of the output collector.
     *
     * @param siddhiDebugCallback receiver of the raw Siddhi output events
     */
    public SiddhiEventEngine(SiddhiDebugCallback siddhiDebugCallback) {
        this();
        this.debugCallback = siddhiDebugCallback;
        this.debugMode = true;
    }

    /**
     * Builds the Siddhi app from the binding parameters, creates its runtime,
     * resolves one input handler per input event type and registers the output
     * callback.
     *
     * @param b binding parameters describing input event types and the graph
     * @param spOutputCollector collector receiving converted output events (non-debug mode)
     * @param eventProcessorRuntimeContext supplies output schema/source info for converted events
     * @throws IllegalArgumentException if the number of input event types differs
     *     from the number of input streams of the graph
     */
    public void onInvocation(final B b, final SpOutputCollector spOutputCollector, final EventProcessorRuntimeContext eventProcessorRuntimeContext) {
        if (b.getInEventTypes().size() != b.getGraph().getInputStreams().size()) {
            throw new IllegalArgumentException("Input parameters do not match!");
        }
        SiddhiManager siddhiManager = SpSiddhiManager.INSTANCE.getSiddhiManager();
        LOG.info("Configuring event types for graph {}", b.getGraph().getName());
        b.getInEventTypes().forEach((eventTypeName, typeMap) -> {
            registerEventTypeIfNotExists(eventTypeName, typeMap);
            this.inputStreamNames.add(prepareName(eventTypeName));
        });
        registerStatements(fromStatement(this.inputStreamNames, b), selectStatement(b), getOutputTopicName(b));
        this.siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(this.siddhiAppString.toString());
        // Handlers are keyed by the raw type name; Siddhi knows the stream by its prepared name.
        b.getInEventTypes().forEach((eventTypeName, typeMap) ->
                this.siddhiInputHandlers.put(eventTypeName, this.siddhiAppRuntime.getInputHandler(prepareName(eventTypeName))));

        String outputStreamName = prepareName(getOutputTopicName(b));
        if (this.debugMode) {
            this.siddhiAppRuntime.addCallback(outputStreamName, new StreamCallback() {
                public void receive(Event[] eventArr) {
                    SiddhiEventEngine.LOG.info("Siddhi is firing");
                    // Only the last event of a batch is reported.
                    if (eventArr.length > 0) {
                        SiddhiEventEngine.this.debugCallback.onEvent(eventArr[eventArr.length - 1]);
                    }
                }
            });
        } else {
            this.siddhiAppRuntime.addCallback(outputStreamName, new StreamCallback() {
                public void receive(Event[] eventArr) {
                    // Only the last event of a batch is forwarded.
                    if (eventArr.length > 0) {
                        spOutputCollector.collect(SiddhiEventEngine.this.toSpEvent(eventArr[eventArr.length - 1], b, eventProcessorRuntimeContext.getOutputSchemaInfo(), eventProcessorRuntimeContext.getOutputSourceInfo()));
                    }
                }
            });
        }
        // NOTE(review): the runtime is never start()-ed in this class and subclasses cannot
        // reach the private field — confirm whether SiddhiAppRuntime.start() is required here.
    }

    /** Returns the actual topic name of the graph's output stream transport protocol. */
    private String getOutputTopicName(B b) {
        return b.getGraph().getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
    }

    /**
     * Converts a Siddhi output event into a StreamPipes event by pairing each
     * entry of {@link #sortedEventKeys} with the Siddhi data value at the same index.
     *
     * @param event Siddhi output event
     * @param b binding parameters (unused)
     * @param schemaInfo schema info attached to the created event
     * @param sourceInfo source info attached to the created event
     * @return the converted event
     */
    public org.streampipes.model.runtime.Event toSpEvent(Event event, B b, SchemaInfo schemaInfo, SourceInfo sourceInfo) {
        Map<String, Object> outMap = new HashMap<>();
        for (int i = 0; i < this.sortedEventKeys.size(); i++) {
            outMap.put(this.sortedEventKeys.get(i), event.getData(i));
        }
        return EventFactory.fromMap(outMap, sourceInfo, schemaInfo);
    }

    /**
     * Appends a {@code define stream} statement for the given event type to the
     * Siddhi app. The keys of {@code map} are added to {@link #sortedEventKeys},
     * which is then sorted; every key in that list becomes an attribute named
     * {@code s0<key>} with a type derived from the class stored in {@code map}.
     *
     * @param str event type name (sanitized via {@link #prepareName})
     * @param map attribute name to attribute class
     */
    private void registerEventTypeIfNotExists(String str, Map<String, Object> map) {
        String defineStreamPrefix = "define stream " + prepareName(str);
        if (!map.isEmpty()) {
            // Add everything first and sort once; re-sorting after each insertion
            // yields the same order at a much higher cost.
            this.sortedEventKeys.addAll(map.keySet());
            Collections.sort(this.sortedEventKeys);
        }
        // NOTE(review): the key list is shared across all input streams, so for a second
        // stream this loop also visits keys of earlier streams; map.get() then returns null
        // for keys absent from this map and toType() will throw — confirm multi-input usage.
        StringJoiner attributes = new StringJoiner(",");
        for (String key : this.sortedEventKeys) {
            attributes.add("s0" + key + " " + toType((Class<?>) map.get(key)));
        }
        this.siddhiAppString.append(defineStreamPrefix)
                .append("(")
                .append(attributes.toString())
                .append(");\n");
    }

    /**
     * Maps a Java class to the Siddhi attribute type name used in stream
     * definitions; anything that is not Long, Integer, Double, Float or Boolean
     * is declared as {@code STRING}.
     */
    private String toType(Class<?> cls) {
        if (cls.equals(Long.class)) {
            return "LONG";
        }
        if (cls.equals(Integer.class)) {
            return "INT";
        }
        if (cls.equals(Double.class) || cls.equals(Float.class)) {
            return "DOUBLE";
        }
        if (cls.equals(Boolean.class)) {
            return "BOOL";
        }
        return "STRING";
    }

    /**
     * Appends the query ({@code from} clause, {@code select} clause and
     * {@code insert into <output>}) to the Siddhi app and logs the full app text.
     *
     * @param str the from clause
     * @param str2 the select clause
     * @param str3 the output topic name (sanitized via {@link #prepareName})
     */
    private void registerStatements(String str, String str2, String str3) {
        this.siddhiAppString.append(str).append("\n").append(str2).append("\n").append("insert into ").append(prepareName(str3)).append(";");
        LOG.info("Registering statement: \n{}", this.siddhiAppString);
    }

    /**
     * Sends the raw event data to the Siddhi input handler registered under the
     * event's source id.
     *
     * <p>If the sending thread is interrupted, the interrupt flag is restored
     * and the event is dropped.
     *
     * @param event incoming event
     * @param spOutputCollector unused here; output is emitted from the Siddhi callback
     */
    public void onEvent(org.streampipes.model.runtime.Event event, SpOutputCollector spOutputCollector) {
        try {
            // NOTE(review): throws NullPointerException if no handler is registered for the source id.
            this.siddhiInputHandlers.get(event.getSourceInfo().getSourceId()).send(toObjArr(event.getRaw()));
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack instead of swallowing it.
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while sending event to Siddhi", e);
        }
    }

    /**
     * Flattens an event map into a row whose column order follows
     * {@link #sortedEventKeys}; keys missing from the map yield {@code null}.
     */
    private Object[] toObjArr(Map<String, Object> map) {
        int size = this.sortedEventKeys.size();
        Object[] row = new Object[size];
        for (int i = 0; i < size; i++) {
            row[i] = map.get(this.sortedEventKeys.get(i));
        }
        return row;
    }

    /** Shuts down the Siddhi app runtime; does nothing if it was never created. */
    public void onDetach() {
        if (this.siddhiAppRuntime != null) {
            this.siddhiAppRuntime.shutdown();
        }
    }

    /**
     * Supplies the {@code from} clause of the Siddhi query.
     *
     * @param list prepared names of the registered input streams
     * @param b binding parameters
     */
    protected abstract String fromStatement(List<String> list, B b);

    /**
     * Supplies the {@code select} clause of the Siddhi query.
     *
     * @param b binding parameters
     */
    protected abstract String selectStatement(B b);

    /**
     * Sanitizes a name for use as a Siddhi stream name by removing every
     * {@code .}, {@code -} and {@code ::}.
     */
    protected String prepareName(String str) {
        // Literal replacement: none of the patterns needs regex semantics.
        return str.replace(".", "").replace("-", "").replace("::", "");
    }

    /**
     * Builds a {@code select} clause listing every key of {@link #sortedEventKeys}
     * as {@code <str>.s0<key>}, comma-separated. With no keys the result is
     * {@code "select "}.
     *
     * @param dataProcessorInvocation unused
     * @param str stream reference used as prefix of each attribute
     */
    protected String getCustomOutputSelectStatement(DataProcessorInvocation dataProcessorInvocation, String str) {
        StringJoiner columns = new StringJoiner(",", "select ", "");
        for (String key : this.sortedEventKeys) {
            columns.add(str + ".s0" + key);
        }
        return columns.toString();
    }

    /** Same as the two-argument variant, using {@code e1} as the stream reference. */
    protected String getCustomOutputSelectStatement(DataProcessorInvocation dataProcessorInvocation) {
        return getCustomOutputSelectStatement(dataProcessorInvocation, "e1");
    }

    /**
     * Replaces the attribute key list. The given list is stored as-is (not
     * copied) and is modified by later stream registration.
     */
    public void setSortedEventKeys(List<String> list) {
        this.sortedEventKeys = list;
    }
}
