/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.siddhi.core.query.processor.stream.window;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.holder.SnapshotableStreamEventQueue;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.window.TimeWindowProcessor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.collection.operator.CompiledCondition;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
import org.wso2.siddhi.core.util.snapshot.state.SnapshotStateList;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

@Extension(name="delay", namespace="", description="A delay window holds events for a specific time period that is regarded as a delay period before processing them.", parameters={@Parameter(name="window.delay", description="The time period (specified in sec, min, ms) for which  the window should delay the events.", type={DataType.INT, DataType.LONG, DataType.TIME})}, examples={@Example(syntax="define window delayWindow(symbol string, volume int) delay(1 hour);\ndefine stream PurchaseStream(symbol string, volume int);\ndefine stream DeliveryStream(symbol string);\ndefine stream OutputStream(symbol string);\n\n@info(name='query1') \nfrom PurchaseStream\nselect symbol, volume\ninsert into delayWindow;\n\n@info(name='query2') \nfrom delayWindow join DeliveryStream\non delayWindow.symbol == DeliveryStream.symbol\nselect delayWindow.symbol\ninsert into OutputStream;", description="In this example, purchase events that arrive in the 'PurchaseStream' stream are directed to a delay window. At any given time, this delay window holds purchase events that have arrived within the last hour. These purchase events in the window are matched by the 'symbol' attribute, with delivery events that arrive in the 'DeliveryStream' stream. This monitors whether the delivery of products is done with a minimum delay of one hour after the purchase.")})
public class DelayWindowProcessor
extends TimeWindowProcessor {
    private long delayInMilliSeconds;
    private SiddhiAppContext siddhiAppContext;
    private SnapshotableStreamEventQueue delayedEventQueue;
    private volatile long lastTimestamp = Long.MIN_VALUE;

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, boolean outputExpectsExpiredEvents, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppContext = siddhiAppContext;
        this.delayedEventQueue = new SnapshotableStreamEventQueue(this.streamEventClonerHolder);
        if (attributeExpressionExecutors.length != 1) throw new SiddhiAppValidationException("Delay window should only have one parameter (<int|long|time> delayTime), but found " + attributeExpressionExecutors.length + " input attributes");
        if (!(attributeExpressionExecutors[0] instanceof ConstantExpressionExecutor)) throw new SiddhiAppValidationException("Delay window should have constant parameter attribute but found a dynamic attribute " + attributeExpressionExecutors[0].getClass().getCanonicalName());
        if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.INT && attributeExpressionExecutors[0].getReturnType() != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("Delay window's parameter attribute should be either int or long, but found " + attributeExpressionExecutors[0].getReturnType());
        }
        this.delayInMilliSeconds = Long.parseLong(((ConstantExpressionExecutor)attributeExpressionExecutors[0]).getValue().toString());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner) {
        DelayWindowProcessor delayWindowProcessor = this;
        synchronized (delayWindowProcessor) {
            while (streamEventChunk.hasNext()) {
                StreamEvent delayedEvent;
                long timeDiff;
                StreamEvent streamEvent = (StreamEvent)streamEventChunk.next();
                long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
                this.delayedEventQueue.reset();
                while (this.delayedEventQueue.hasNext() && (timeDiff = (delayedEvent = this.delayedEventQueue.next()).getTimestamp() - currentTime + this.delayInMilliSeconds) <= 0L) {
                    this.delayedEventQueue.remove();
                    delayedEvent.setTimestamp(currentTime);
                    streamEventChunk.insertBeforeCurrent(delayedEvent);
                }
                if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
                    this.delayedEventQueue.add(streamEvent);
                    if (this.lastTimestamp < streamEvent.getTimestamp()) {
                        this.getScheduler().notifyAt(streamEvent.getTimestamp() + this.delayInMilliSeconds);
                        this.lastTimestamp = streamEvent.getTimestamp();
                    }
                }
                streamEventChunk.remove();
            }
            this.delayedEventQueue.reset();
        }
        if (streamEventChunk.getFirst() != null) {
            nextProcessor.process(streamEventChunk);
        }
    }

    @Override
    public synchronized StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
        return ((Operator)compiledCondition).find(matchingEvent, this.delayedEventQueue, this.streamEventCloner);
    }

    @Override
    public CompiledCondition compileCondition(Expression condition, MatchingMetaInfoHolder matchingMetaInfoHolder, SiddhiAppContext siddhiAppContext, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, Table> tableMap, String queryName) {
        return OperatorParser.constructOperator(this.delayedEventQueue, condition, matchingMetaInfoHolder, siddhiAppContext, variableExpressionExecutors, tableMap, this.queryName);
    }

    @Override
    public Map<String, Object> currentState() {
        HashMap<String, Object> state = new HashMap<String, Object>();
        state.put("DelayedEventQueue", this.delayedEventQueue.getSnapshot());
        return state;
    }

    @Override
    public void restoreState(Map<String, Object> state) {
        this.delayedEventQueue.restore((SnapshotStateList)state.get("DelayedEventQueue"));
    }
}

