/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.siddhi.core.query.processor.stream.window;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.log4j.Logger;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.SessionComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.SessionContainer;
import org.wso2.siddhi.core.util.collection.operator.CompiledCondition;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

@Extension(name="session", namespace="", description="This is a session window that holds events that belong to a specific session. The events that belong to a specific session are identified by a grouping attribute (i.e., a session key). A session gap period is specified to determine the time period after which the session is considered to be expired. A new event that arrives with a specific value for the session key is matched with the session window with the same session key.\n  When performing aggregations for a specific session, you can include events with the matching session key that arrive after the session is expired if required. This is done by specifying a latency time period that is less than the session gap period.\nTo have aggregate functions with session windows, the events need to be grouped by the session key via a 'group by' clause.", parameters={@Parameter(name="window.session", description="The time period for which the session considered is valid. This is specified in seconds, minutes, or milliseconds (i.e., 'min', 'sec', or 'ms'.", type={DataType.INT, DataType.LONG, DataType.TIME}), @Parameter(name="window.key", description="The grouping attribute for events.", type={DataType.STRING}, optional=true, defaultValue="default-key"), @Parameter(name="window.allowedlatency", description="This specifies the time period for which the session window is valid after the expiration of the session. The time period specified here should be less than the session time gap (which is specified via the 'window.session' parameter).", type={DataType.INT, DataType.LONG, DataType.TIME}, optional=true, defaultValue="0")}, examples={@Example(syntax="define stream PurchaseEventStream (user string, item_number int, price float, quantity int);\n\n@info(name='query0) \nfrom PurchaseEventStream#window.session(5 sec, user, 2 sec) \nselect * \ninsert all events into OutputStream;", description="This query processes events that arrive at the PurchaseEvent input stream. 
The 'user' attribute is the session key, and the session gap is 5 seconds. '2 sec' is specified as the allowed latency. Therefore, events with the matching user name that arrive 2 seconds after the expiration of the session are also considered when performing aggregations for the session identified by the given user name.")})
/*
 * Session window processor.
 *
 * CURRENT events are grouped by a session key (or by a single default key when no key
 * attribute is configured). For every key a SessionContainer holds a "current" session and a
 * "previous" session. A session is extended while events keep arriving no later than its end
 * timestamp (last event timestamp + session gap). Non-CURRENT events (the ones produced by the
 * scheduler notifications requested here) trigger the time-out checks:
 *   - allowedLatency <= 0: a timed-out current session is moved straight to the expired chunk;
 *   - allowedLatency  > 0: a timed-out current session is first parked as the previous session
 *     and only emitted once its alive timestamp (end + allowedLatency) has passed.
 *
 * Thread-safety: the event loop of process() and the state/find methods synchronize on this
 * instance. The hand-over to the next processor at the end of process() happens outside that
 * lock, exactly as in the original implementation.
 */
public class SessionWindowProcessor
        extends WindowProcessor
        implements SchedulingProcessor, FindableProcessor {

    private static final Logger log = Logger.getLogger(SessionWindowProcessor.class);

    /** Session key used when the query does not name a key attribute. */
    private static final String DEFAULT_KEY = "default-key";

    /** Maximum distance (same unit as event timestamps) between two events of one session. */
    private long sessionGap = 0L;

    /** Extra time a finished session is kept before it is emitted; 0 disables the previous session. */
    private long allowedLatency = 0L;

    /** Reads the session key from an event; {@code null} when the default key is used. */
    private VariableExpressionExecutor sessionKeyExecutor;

    private Scheduler scheduler;

    /** Session key -> container with the current and previous session of that key. */
    private Map<String, SessionContainer> sessionMap;

    /** Scratch map (session key -> end timestamp), cleared and refilled by every time-out check. */
    private Map<String, Long> sessionKeyEndTimeMap;

    /** Container of the key that was handled most recently by {@link #process}. */
    private SessionContainer sessionContainer;

    /** Sessions that timed out and still have to be passed to the next processor. */
    private SessionComplexEventChunk<StreamEvent> expiredEventChunk;

    @Override
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    @Override
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Validates the window parameters and initialises the internal state.
     *
     * <p>Accepted parameter lists:
     * <ul>
     *   <li>{@code (sessionGap)}</li>
     *   <li>{@code (sessionGap, sessionKey)} - second parameter is a variable</li>
     *   <li>{@code (sessionGap, allowedLatency)} - second parameter is a constant</li>
     *   <li>{@code (sessionGap, sessionKey, allowedLatency)}</li>
     * </ul>
     *
     * @param attributeExpressionExecutors executors of the window parameters, in query order
     * @param configReader                 not used by this window
     * @param outputExpectsExpiredEvents   not used by this window
     * @param siddhiAppContext             not used by this window
     * @throws SiddhiAppValidationException if the number, kind or type of the parameters is
     *                                      invalid, or if the allowed latency exceeds the session gap
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader,
                        boolean outputExpectsExpiredEvents, SiddhiAppContext siddhiAppContext) {
        this.sessionMap = new ConcurrentHashMap<>();
        this.sessionKeyEndTimeMap = new HashMap<>();
        this.sessionContainer = new SessionContainer();
        this.expiredEventChunk = new SessionComplexEventChunk<>();

        int parameterCount = attributeExpressionExecutors.length;
        if (parameterCount < 1 || parameterCount > 3) {
            throw new SiddhiAppValidationException("Session window should only have one to three parameters (<int|long|time> sessionGap, <String> sessionKey, <int|long|time> allowedLatency, but found " + parameterCount + " input attributes");
        }

        this.sessionGap = readConstantDuration(attributeExpressionExecutors[0], "1st", "session gap");

        if (parameterCount == 3) {
            this.sessionKeyExecutor = requireSessionKey(attributeExpressionExecutors[1]);
            this.allowedLatency = readConstantDuration(attributeExpressionExecutors[2], "3rd", "allowedLatency");
            this.validateAllowedLatency(this.allowedLatency, this.sessionGap);
        } else if (parameterCount == 2) {
            // With two parameters the second one is either the key (variable) or the latency (constant).
            if (attributeExpressionExecutors[1] instanceof VariableExpressionExecutor) {
                this.sessionKeyExecutor = requireSessionKey(attributeExpressionExecutors[1]);
            } else {
                this.allowedLatency = readConstantDuration(attributeExpressionExecutors[1], "2nd", "allowedLatency");
                this.validateAllowedLatency(this.allowedLatency, this.sessionGap);
            }
        }
    }

    /**
     * Reads an int or long constant parameter as a {@code long}.
     *
     * <p>The value is widened through {@link Number#longValue()} because a constant of type INT
     * carries an {@code Integer}; casting it directly to {@code Long} would fail.
     *
     * @param executor executor of the parameter
     * @param ordinal  position used in the error message, e.g. {@code "1st"}
     * @param label    parameter name used in the error messages
     * @return the constant value
     * @throws SiddhiAppValidationException if the parameter is not a constant of type int or long
     */
    private static long readConstantDuration(ExpressionExecutor executor, String ordinal, String label) {
        if (!(executor instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("Session window's " + ordinal + " parameter, " + label + " should be a constant parameter attribute but found a dynamic attribute " + executor.getClass().getCanonicalName());
        }
        Attribute.Type type = executor.getReturnType();
        if (type != Attribute.Type.INT && type != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("Session window's " + label + " parameter should be either int or long, but found " + type);
        }
        return ((Number) ((ConstantExpressionExecutor) executor).getValue()).longValue();
    }

    /**
     * Checks that the session key parameter is a string variable.
     *
     * @param executor executor of the session key parameter
     * @return the executor, narrowed to {@link VariableExpressionExecutor}
     * @throws SiddhiAppValidationException if the parameter is not a variable of type string
     */
    private static VariableExpressionExecutor requireSessionKey(ExpressionExecutor executor) {
        if (!(executor instanceof VariableExpressionExecutor)) {
            throw new SiddhiAppValidationException("Session window's 2nd parameter, session key should be a dynamic parameter attribute but found a constant attribute " + executor.getClass().getCanonicalName());
        }
        if (executor.getReturnType() != Attribute.Type.STRING) {
            throw new SiddhiAppValidationException("Session window's session key parameter type should be string, but found " + executor.getReturnType());
        }
        return (VariableExpressionExecutor) executor;
    }

    /**
     * Rejects an allowed latency that is larger than the session gap.
     *
     * @throws SiddhiAppValidationException if {@code allowedLatency > sessionGap}
     */
    private void validateAllowedLatency(long allowedLatency, long sessionGap) {
        if (allowedLatency > sessionGap) {
            throw new SiddhiAppValidationException("Session window's allowedLatency parameter value should not be greater than the session gap parameter value");
        }
    }

    /**
     * Routes every event of the chunk into its session (CURRENT events) or runs the time-out
     * checks (all other event types), then forwards the incoming chunk and, if any session
     * expired, the expired chunk to the next processor.
     *
     * @param streamEventChunk  incoming events; late events that cannot be placed are removed from it
     * @param nextProcessor     receiver of the incoming chunk and of expired sessions
     * @param streamEventCloner used to copy each CURRENT event before it is stored as EXPIRED
     */
    @Override
    protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor,
                           StreamEventCloner streamEventCloner) {
        synchronized (this) {
            while (streamEventChunk.hasNext()) {
                StreamEvent streamEvent = (StreamEvent) streamEventChunk.next();
                if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
                    this.addToSession(streamEventChunk, streamEvent, streamEventCloner);
                } else {
                    long eventTimestamp = streamEvent.getTimestamp();
                    this.currentSessionTimeout(eventTimestamp);
                    if (this.allowedLatency > 0L) {
                        this.previousSessionTimeout(eventTimestamp);
                    }
                }
            }
        }
        // Downstream hand-over is intentionally left outside the lock, as in the original code.
        nextProcessor.process(streamEventChunk);
        if (this.expiredEventChunk != null && this.expiredEventChunk.getFirst() != null) {
            nextProcessor.process(this.expiredEventChunk);
            this.expiredEventChunk.clear();
        }
    }

    /**
     * Stores an EXPIRED-typed copy of a CURRENT event in the session of its key and asks the
     * scheduler for a notification at the (new) session end.
     */
    private void addToSession(ComplexEventChunk<StreamEvent> streamEventChunk, StreamEvent streamEvent,
                              StreamEventCloner streamEventCloner) {
        long eventTimestamp = streamEvent.getTimestamp();
        long maxTimestamp = eventTimestamp + this.sessionGap;
        long aliveTimestamp = maxTimestamp + this.allowedLatency;

        String key = this.sessionKeyExecutor != null
                ? (String) this.sessionKeyExecutor.execute(streamEvent)
                : DEFAULT_KEY;
        SessionContainer container = this.sessionMap.get(key);
        if (container == null) {
            container = new SessionContainer(key);
            this.sessionMap.put(key, container);
        }
        this.sessionContainer = container;

        StreamEvent clonedStreamEvent = streamEventCloner.copyStreamEvent(streamEvent);
        clonedStreamEvent.setType(ComplexEvent.Type.EXPIRED);

        SessionComplexEventChunk<StreamEvent> currentSession = container.getCurrentSession();
        if (currentSession.getFirst() == null) {
            // First event of a brand-new session.
            currentSession.add(clonedStreamEvent);
            currentSession.setTimestamps(eventTimestamp, maxTimestamp, aliveTimestamp);
            this.scheduler.notifyAt(maxTimestamp);
        } else if (eventTimestamp < currentSession.getStartTimestamp()) {
            // Event is older than the running session.
            this.addLateEvent(streamEventChunk, eventTimestamp, clonedStreamEvent);
        } else if (eventTimestamp <= currentSession.getEndTimestamp()) {
            // Event falls inside the running session: extend it.
            currentSession.setTimestamps(currentSession.getStartTimestamp(), maxTimestamp, aliveTimestamp);
            currentSession.add(clonedStreamEvent);
            this.scheduler.notifyAt(maxTimestamp);
        } else if (this.allowedLatency > 0L) {
            // Event starts a new session: park the running one as previous session.
            this.moveCurrentSessionToPreviousSession();
            currentSession.clear();
            currentSession.setTimestamps(eventTimestamp, maxTimestamp, aliveTimestamp);
            currentSession.add(clonedStreamEvent);
            this.scheduler.notifyAt(maxTimestamp);
        }
        // NOTE(review): with allowedLatency <= 0 an event past the session end is not stored in
        // any session here (unchanged from the original) - confirm this is intended.
    }

    /**
     * Advances the chunk's cursor by one element if possible and inserts {@code event} before
     * the cursor position.
     */
    private static void insertAtCursor(SessionComplexEventChunk<StreamEvent> session, StreamEvent event) {
        if (session.hasNext()) {
            session.next();
        }
        session.insertBeforeCurrent(event);
    }

    /**
     * Folds {@code previousWindow} into {@code nextWindow} when the previous window is not empty
     * and ends no earlier than one session gap before the next window starts.
     */
    private void mergeWindows(SessionComplexEventChunk<StreamEvent> previousWindow,
                              SessionComplexEventChunk<StreamEvent> nextWindow) {
        if (previousWindow.getFirst() == null
                || previousWindow.getEndTimestamp() < nextWindow.getStartTimestamp() - this.sessionGap) {
            return;
        }
        insertAtCursor(nextWindow, (StreamEvent) previousWindow.getFirst());
        nextWindow.setStartTimestamp(previousWindow.getStartTimestamp());
        previousWindow.clear();
    }

    /** Appends {@code session} to the expired chunk (taking over its key and timestamps) and clears it. */
    private void expireSession(SessionComplexEventChunk<StreamEvent> session) {
        this.expiredEventChunk.setKey(session.getKey());
        this.expiredEventChunk.setTimestamps(session.getStartTimestamp(), session.getEndTimestamp(),
                session.getAliveTimestamp());
        this.expiredEventChunk.add((StreamEvent) session.getFirst());
        session.clear();
    }

    /**
     * Adds the events of {@code currentSession} to {@code previousSession}, copies its timestamps
     * and schedules a notification at its alive timestamp. The current session is not cleared.
     */
    private void copyIntoPreviousSession(SessionComplexEventChunk<StreamEvent> currentSession,
                                         SessionComplexEventChunk<StreamEvent> previousSession) {
        previousSession.add((StreamEvent) currentSession.getFirst());
        previousSession.setTimestamps(currentSession.getStartTimestamp(), currentSession.getEndTimestamp(),
                currentSession.getAliveTimestamp());
        this.scheduler.notifyAt(currentSession.getAliveTimestamp());
    }

    /**
     * Makes the current session of the active container its previous session. A previous
     * session that is still held is expired first.
     */
    private void moveCurrentSessionToPreviousSession() {
        SessionComplexEventChunk<StreamEvent> currentSession = this.sessionContainer.getCurrentSession();
        SessionComplexEventChunk<StreamEvent> previousSession = this.sessionContainer.getPreviousSession();
        if (previousSession.getFirst() != null) {
            this.expireSession(previousSession);
        }
        this.copyIntoPreviousSession(currentSession, previousSession);
    }

    /**
     * Places an event whose timestamp lies before the start of the current session.
     *
     * <p>The event is put into the current session if it is at most one session gap older than
     * that session's start; otherwise, when latency is enabled and a previous session exists,
     * into the previous session under the same rule. An event that fits nowhere is removed from
     * {@code streamEventChunk} and logged.
     *
     * @param streamEventChunk incoming chunk, positioned on the event being handled
     * @param eventTimestamp   timestamp of the late event
     * @param streamEvent      EXPIRED-typed copy of the late event
     */
    private void addLateEvent(ComplexEventChunk<StreamEvent> streamEventChunk, long eventTimestamp,
                              StreamEvent streamEvent) {
        SessionComplexEventChunk<StreamEvent> currentSession = this.sessionContainer.getCurrentSession();
        SessionComplexEventChunk<StreamEvent> previousSession = this.sessionContainer.getPreviousSession();
        boolean latencyEnabled = this.allowedLatency > 0L;

        if (eventTimestamp >= currentSession.getStartTimestamp() - this.sessionGap) {
            insertAtCursor(currentSession, streamEvent);
            currentSession.setStartTimestamp(eventTimestamp);
            if (latencyEnabled) {
                // The earlier start may now touch the previous session.
                this.mergeWindows(previousSession, currentSession);
            }
            return;
        }

        if (latencyEnabled && previousSession.getFirst() != null
                && eventTimestamp >= previousSession.getStartTimestamp() - this.sessionGap) {
            previousSession.add(streamEvent);
            if (eventTimestamp <= previousSession.getEndTimestamp() - this.sessionGap
                    && eventTimestamp < previousSession.getStartTimestamp()) {
                previousSession.setStartTimestamp(eventTimestamp);
            } else {
                previousSession.setEndTimestamp(eventTimestamp + this.sessionGap);
                previousSession.setAliveTimestamp(eventTimestamp + this.sessionGap + this.allowedLatency);
                this.mergeWindows(previousSession, currentSession);
            }
            return;
        }

        streamEventChunk.remove();
        log.info("The event, " + streamEvent + " is late and it's session window has been timeout");
    }

    /**
     * Returns the entries ordered by ascending end timestamp. Maps with fewer than two entries
     * are returned unchanged.
     */
    private static Map<String, Long> sortedByEndTime(Map<String, Long> endTimestamps) {
        if (endTimestamps.size() <= 1) {
            return endTimestamps;
        }
        return endTimestamps.entrySet().stream()
                .sorted(Map.Entry.comparingByValue())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (first, second) -> first, LinkedHashMap::new));
    }

    /**
     * Closes every current session whose end timestamp is not after {@code eventTimestamp}.
     * Sessions are visited by ascending end timestamp and the scan stops at the first one that
     * is empty or still open.
     */
    private void currentSessionTimeout(long eventTimestamp) {
        Map<String, Long> endTimestamps = sortedByEndTime(this.findAllCurrentEndTimestamps(this.sessionMap));
        for (Map.Entry<String, Long> entry : endTimestamps.entrySet()) {
            long sessionEndTime = entry.getValue();
            SessionContainer container = this.sessionMap.get(entry.getKey());
            SessionComplexEventChunk<StreamEvent> currentSession = container.getCurrentSession();
            SessionComplexEventChunk<StreamEvent> previousSession = container.getPreviousSession();
            if (currentSession.getFirst() == null || eventTimestamp < sessionEndTime) {
                break;
            }
            if (this.allowedLatency > 0L) {
                // Keep the session around as previous session until its alive timestamp.
                this.copyIntoPreviousSession(currentSession, previousSession);
                currentSession.clear();
            } else {
                this.expireSession(currentSession);
            }
        }
    }

    /**
     * Expires every previous session whose alive timestamp is not after {@code eventTimestamp}.
     * Sessions are visited by ascending end timestamp and the scan stops at the first one that
     * is missing, empty or still alive.
     */
    private void previousSessionTimeout(long eventTimestamp) {
        Map<String, Long> endTimestamps = sortedByEndTime(this.findAllPreviousEndTimestamps(this.sessionMap));
        for (Map.Entry<String, Long> entry : endTimestamps.entrySet()) {
            SessionComplexEventChunk<StreamEvent> previousSession =
                    this.sessionMap.get(entry.getKey()).getPreviousSession();
            if (previousSession == null || previousSession.getFirst() == null
                    || eventTimestamp < previousSession.getAliveTimestamp()) {
                break;
            }
            this.expireSession(previousSession);
        }
    }

    /**
     * Refills the shared scratch map with the end timestamp of every current session; containers
     * reporting {@code -1} are skipped.
     *
     * @return the shared scratch map (overwritten by the next call of either lookup method)
     */
    private Map<String, Long> findAllCurrentEndTimestamps(Map<String, SessionContainer> sessionMap) {
        this.sessionKeyEndTimeMap.clear();
        for (SessionContainer container : sessionMap.values()) {
            long endTimestamp = container.getCurrentSessionEndTimestamp();
            if (endTimestamp != -1L) {
                this.sessionKeyEndTimeMap.put(container.getKey(), endTimestamp);
            }
        }
        return this.sessionKeyEndTimeMap;
    }

    /**
     * Refills the shared scratch map with the end timestamp of every previous session; containers
     * reporting {@code -1} are skipped.
     *
     * @return the shared scratch map (overwritten by the next call of either lookup method)
     */
    private Map<String, Long> findAllPreviousEndTimestamps(Map<String, SessionContainer> sessionMap) {
        this.sessionKeyEndTimeMap.clear();
        for (SessionContainer container : sessionMap.values()) {
            long endTimestamp = container.getPreviousSessionEndTimestamp();
            if (endTimestamp != -1L) {
                this.sessionKeyEndTimeMap.put(container.getKey(), endTimestamp);
            }
        }
        return this.sessionKeyEndTimeMap;
    }

    @Override
    public void start() {
        // Nothing to start.
    }

    @Override
    public void stop() {
        // Nothing to stop.
    }

    /**
     * Returns the session map, the active container and the expired chunk under the keys
     * {@code "sessionMap"}, {@code "sessionContainer"} and {@code "expiredEventChunk"}.
     * The live objects are returned, not copies.
     */
    @Override
    public synchronized Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<>();
        state.put("sessionMap", this.sessionMap);
        state.put("sessionContainer", this.sessionContainer);
        state.put("expiredEventChunk", this.expiredEventChunk);
        return state;
    }

    /**
     * Replaces the internal state with the objects stored by {@link #currentState()}.
     *
     * @param state map holding the three entries written by {@link #currentState()}
     */
    @Override
    @SuppressWarnings("unchecked") // the entries are exactly what currentState() stored
    public synchronized void restoreState(Map<String, Object> state) {
        this.sessionMap = (ConcurrentHashMap<String, SessionContainer>) state.get("sessionMap");
        this.sessionContainer = (SessionContainer) state.get("sessionContainer");
        this.expiredEventChunk = (SessionComplexEventChunk<StreamEvent>) state.get("expiredEventChunk");
    }

    /**
     * Looks up matching events in the expired chunk using the given compiled condition, which
     * must be an {@link Operator}.
     */
    @Override
    public synchronized StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
        return ((Operator) compiledCondition).find(matchingEvent, this.expiredEventChunk, this.streamEventCloner);
    }

    /**
     * Builds the operator used by {@link #find} over the expired chunk.
     *
     * <p>NOTE(review): the inherited {@code this.queryName} field is passed on; the
     * {@code queryName} argument is not used (unchanged from the original).
     */
    @Override
    public CompiledCondition compileCondition(Expression condition, MatchingMetaInfoHolder matchingMetaInfoHolder,
                                              SiddhiAppContext siddhiAppContext,
                                              List<VariableExpressionExecutor> variableExpressionExecutors,
                                              Map<String, Table> tableMap, String queryName) {
        return OperatorParser.constructOperator(this.expiredEventChunk, condition, matchingMetaInfoHolder,
                siddhiAppContext, variableExpressionExecutors, tableMap, this.queryName);
    }
}

