/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.siddhi.core.query.processor.stream.window;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.collection.operator.CompiledCondition;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import org.wso2.siddhi.core.util.collection.operator.Operator;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.parser.OperatorParser;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

@Extension(name="externalTimeBatch", namespace="", description="A batch (tumbling) time window based on external time, that holds events arrived during windowTime periods, and gets updated for every windowTime.", parameters={@Parameter(name="timestamp", description="The time which the window determines as current time and will act upon. The value of this parameter should be monotonically increasing.", type={DataType.LONG}), @Parameter(name="window.time", description="The batch time period for which the window should hold events.", type={DataType.INT, DataType.LONG, DataType.TIME}), @Parameter(name="start.time", description="User defined start time. This could either be a constant (of type int, long or time) or an attribute of the corresponding stream (of type long). If an attribute is provided, initial value of attribute would be considered as startTime. When startTime is not given, initial value of timestamp is used as the default.", type={DataType.INT, DataType.LONG, DataType.TIME}, optional=true, defaultValue="0"), @Parameter(name="timeout", description="Time to wait for arrival of new event, before flushing and giving output for events belonging to a specific batch. 
If timeout is not provided, system waits till an event from next batch arrives to flush current batch.", type={DataType.INT, DataType.LONG, DataType.TIME}, optional=true, defaultValue="0")}, examples={@Example(syntax="define window cseEventWindow (symbol string, price float, volume int) externalTimeBatch(eventTime, 1 sec) output expired events;\n@info(name = 'query0')\nfrom cseEventStream\ninsert into cseEventWindow;\n@info(name = 'query1')\nfrom cseEventWindow\nselect symbol, sum(price) as price\ninsert expired events into outputStream ;", description="This will processing events that arrive every 1 seconds from the eventTime."), @Example(syntax="define window cseEventWindow (symbol string, price float, volume int) externalTimeBatch(eventTime, 20 sec, 0) output expired events;", description="This will processing events that arrive every 1 seconds from the eventTime. Starts on 0th millisecond of an hour."), @Example(syntax="define window cseEventWindow (symbol string, price float, volume int) externalTimeBatch(eventTime, 2 sec, eventTimestamp, 100) output expired events;", description="This will processing events that arrive every 2 seconds from the eventTim. Considers the first event's eventTimestamp value as startTime. Waits 100 milliseconds for the arrival of a new event before flushing current batch.")})
public class ExternalTimeBatchWindowProcessor
extends WindowProcessor
implements SchedulingProcessor,
FindableProcessor {
    // Events collected for the batch currently being built (filled by cloneAppend).
    private ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk(false);
    // Events of the previously emitted batch; stays null unless expired output is expected,
    // a timeout is configured, or compileCondition() creates it.
    private ComplexEventChunk<StreamEvent> expiredEventChunk = null;
    // RESET-typed copy of the first event appended to the current batch; null when no batch is open.
    private StreamEvent resetEvent = null;
    // Executor for the 1st window parameter (external timestamp attribute, type long).
    private VariableExpressionExecutor timestampExpressionExecutor;
    // Executor for the 3rd window parameter when it is not a constant; null otherwise.
    private ExpressionExecutor startTimeAsVariable;
    // Batch length taken from the 2nd window parameter.
    private long timeToKeep;
    // End time of the current batch; a negative value means timing has not been initialised yet.
    private long endTime = -1L;
    // Reference start time used to align batch boundaries.
    private long startTime = 0L;
    // True when a 3rd window parameter (start time) was supplied.
    private boolean isStartTimeEnabled = false;
    // Timeout taken from the 4th window parameter; timer scheduling is only done when it is > 0.
    private long schedulerTimeout = 0L;
    private Scheduler scheduler;
    // Most recent time handed to scheduler.notifyAt(); timer events with an earlier timestamp are ignored.
    private long lastScheduledTime;
    // Largest external timestamp seen so far on CURRENT events.
    private long lastCurrentEventTime;
    // Set after a timer-triggered flush; cleared when a later event closes the batch.
    private boolean flushed = false;
    // True when emitted events must be retained in expiredEventChunk (expired output or find()).
    private boolean storeExpiredEvents = false;
    // Taken from the 5th window parameter; when true, buffered events get the batch end time
    // written into their timestamp attribute.
    private boolean replaceTimestampWithBatchEndTime = false;
    // Value passed to init(); controls whether expired events are added to the output.
    private boolean outputExpectsExpiredEvents;

    /**
     * Validates the window parameters and configures this processor from them.
     *
     * <p>Accepted parameters, in order: timestamp attribute (long variable), window time
     * (int/long constant), optional start time (int/long constant or long attribute), optional
     * timeout (int/long constant) and optional replaceTimestampWithBatchEndTime (bool constant).
     *
     * @param attributeExpressionExecutors executors for the window parameters, in declaration order
     * @param configReader                 not read by this method
     * @param outputExpectsExpiredEvents   when true the expired-event buffer is created and expired
     *                                     events are stored for later emission
     * @param siddhiAppContext             not read by this method
     * @throws SiddhiAppValidationException if the parameter count or a parameter type is invalid, if a
     *                                      constant-only parameter is not a constant, or if the
     *                                      window time is not positive
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, boolean outputExpectsExpiredEvents, SiddhiAppContext siddhiAppContext) {
        this.outputExpectsExpiredEvents = outputExpectsExpiredEvents;
        if (outputExpectsExpiredEvents) {
            this.expiredEventChunk = new ComplexEventChunk<StreamEvent>(false);
            this.storeExpiredEvents = true;
        }

        int parameterCount = attributeExpressionExecutors.length;
        if (parameterCount < 2 || parameterCount > 5) {
            throw new SiddhiAppValidationException("ExternalTimeBatch window should only have two to five parameters (<long> timestamp, <int|long|time> windowTime, <long> startTime, <int|long|time> timeout, <bool> replaceTimestampWithBatchEndTime), but found " + attributeExpressionExecutors.length + " input attributes");
        }

        // 1st parameter: the external timestamp attribute.
        if (!(attributeExpressionExecutors[0] instanceof VariableExpressionExecutor)) {
            throw new SiddhiAppValidationException("ExternalTime window's 1st parameter timestamp should be a variable, but found " + attributeExpressionExecutors[0].getClass());
        }
        if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("ExternalTime window's 1st parameter timestamp should be type long, but found " + attributeExpressionExecutors[0].getReturnType());
        }
        this.timestampExpressionExecutor = (VariableExpressionExecutor) attributeExpressionExecutors[0];

        // 2nd parameter: the batch length.
        Attribute.Type windowTimeType = attributeExpressionExecutors[1].getReturnType();
        if (windowTimeType != Attribute.Type.INT && windowTimeType != Attribute.Type.LONG) {
            throw new SiddhiAppValidationException("ExternalTimeBatch window's 2nd parameter windowTime should be either int or long, but found " + attributeExpressionExecutors[1].getReturnType());
        }
        this.timeToKeep = ((Number) requireConstantValue(attributeExpressionExecutors[1], "2nd parameter windowTime")).longValue();
        if (this.timeToKeep <= 0L) {
            // A zero length would otherwise fail later with a modulo-by-zero in findEndTime.
            throw new SiddhiAppValidationException("ExternalTimeBatch window's 2nd parameter windowTime should be greater than zero, but found " + this.timeToKeep);
        }

        // 3rd parameter (optional): start time, either a constant or a long attribute.
        if (parameterCount >= 3) {
            this.isStartTimeEnabled = true;
            ExpressionExecutor startTimeExecutor = attributeExpressionExecutors[2];
            Attribute.Type startTimeType = startTimeExecutor.getReturnType();
            if (startTimeExecutor instanceof ConstantExpressionExecutor) {
                if (startTimeType != Attribute.Type.INT && startTimeType != Attribute.Type.LONG) {
                    throw new SiddhiAppValidationException("ExternalTimeBatch window's 3rd parameter startTime should either be a constant (of type int or long) or an attribute (of type long), but found " + attributeExpressionExecutors[2].getReturnType());
                }
                this.startTime = ((Number) ((ConstantExpressionExecutor) startTimeExecutor).getValue()).longValue();
            } else {
                if (startTimeType != Attribute.Type.LONG) {
                    throw new SiddhiAppValidationException("ExternalTimeBatch window's 3rd parameter startTime should either be a constant (of type int or long) or an attribute (of type long), but found " + attributeExpressionExecutors[2].getReturnType());
                }
                this.startTimeAsVariable = startTimeExecutor;
            }
        }

        // 4th parameter (optional): flush timeout.
        if (parameterCount >= 4) {
            Attribute.Type timeoutType = attributeExpressionExecutors[3].getReturnType();
            if (timeoutType != Attribute.Type.INT && timeoutType != Attribute.Type.LONG) {
                throw new SiddhiAppValidationException("ExternalTimeBatch window's 4th parameter timeout should be either int or long, but found " + attributeExpressionExecutors[3].getReturnType());
            }
            this.schedulerTimeout = ((Number) requireConstantValue(attributeExpressionExecutors[3], "4th parameter timeout")).longValue();
        }

        // 5th parameter (optional): overwrite the timestamp attribute with the batch end time.
        if (parameterCount == 5) {
            if (attributeExpressionExecutors[4].getReturnType() != Attribute.Type.BOOL) {
                throw new SiddhiAppValidationException("ExternalTimeBatch window's 5th parameter replaceTimestampWithBatchEndTime should be bool, but found " + attributeExpressionExecutors[4].getReturnType());
            }
            this.replaceTimestampWithBatchEndTime = Boolean.parseBoolean(String.valueOf(requireConstantValue(attributeExpressionExecutors[4], "5th parameter replaceTimestampWithBatchEndTime")));
        }

        // Timer-triggered flushes copy the emitted batch into the expired buffer, so it must exist
        // whenever a timeout is configured.
        if (this.schedulerTimeout > 0L && this.expiredEventChunk == null) {
            this.expiredEventChunk = new ComplexEventChunk<StreamEvent>(false);
        }
    }

    /**
     * Returns the value of a parameter that must be given as a constant.
     *
     * @param executor             the parameter's executor
     * @param parameterDescription ordinal and name of the parameter, used in the error message
     * @return the constant's value
     * @throws SiddhiAppValidationException if {@code executor} is not a constant executor
     */
    private static Object requireConstantValue(ExpressionExecutor executor, String parameterDescription) {
        if (!(executor instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("ExternalTimeBatch window's " + parameterDescription + " should be a constant, but found " + executor.getClass());
        }
        return ((ConstantExpressionExecutor) executor).getValue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - void declaration
     */
    @Override
    protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner) {
        if (streamEventChunk.getFirst() == null) {
            return;
        }
        ArrayList<ComplexEventChunk<StreamEvent>> complexEventChunks = new ArrayList<ComplexEventChunk<StreamEvent>>();
        ExternalTimeBatchWindowProcessor externalTimeBatchWindowProcessor = this;
        synchronized (externalTimeBatchWindowProcessor) {
            void var6_7;
            this.initTiming(streamEventChunk.getFirst());
            StreamEvent streamEvent = streamEventChunk.getFirst();
            while (var6_7 != null) {
                void currStreamEvent = var6_7;
                StreamEvent streamEvent2 = var6_7.getNext();
                if (currStreamEvent.getType() == ComplexEvent.Type.TIMER) {
                    if (this.lastScheduledTime > currStreamEvent.getTimestamp()) continue;
                    if (!this.flushed) {
                        this.flushToOutputChunk(streamEventCloner, complexEventChunks, this.lastCurrentEventTime, true);
                        this.flushed = true;
                    } else if (this.currentEventChunk.getFirst() != null) {
                        this.appendToOutputChunk(streamEventCloner, complexEventChunks, this.lastCurrentEventTime, true);
                    }
                    this.lastScheduledTime = this.siddhiAppContext.getTimestampGenerator().currentTime() + this.schedulerTimeout;
                    this.scheduler.notifyAt(this.lastScheduledTime);
                    continue;
                }
                if (currStreamEvent.getType() != ComplexEvent.Type.CURRENT) continue;
                long currentEventTime = (Long)this.timestampExpressionExecutor.execute((ComplexEvent)currStreamEvent);
                if (this.lastCurrentEventTime < currentEventTime) {
                    this.lastCurrentEventTime = currentEventTime;
                }
                if (currentEventTime < this.endTime) {
                    this.cloneAppend(streamEventCloner, (StreamEvent)currStreamEvent);
                    continue;
                }
                if (this.flushed) {
                    this.appendToOutputChunk(streamEventCloner, complexEventChunks, this.lastCurrentEventTime, false);
                    this.flushed = false;
                } else {
                    this.flushToOutputChunk(streamEventCloner, complexEventChunks, this.lastCurrentEventTime, false);
                }
                this.endTime = this.findEndTime(this.lastCurrentEventTime, this.startTime, this.timeToKeep);
                this.cloneAppend(streamEventCloner, (StreamEvent)currStreamEvent);
                if (this.schedulerTimeout <= 0L) continue;
                this.lastScheduledTime = this.siddhiAppContext.getTimestampGenerator().currentTime() + this.schedulerTimeout;
                this.scheduler.notifyAt(this.lastScheduledTime);
            }
        }
        for (ComplexEventChunk complexEventChunk : complexEventChunks) {
            nextProcessor.process(complexEventChunk);
        }
    }

    /**
     * Establishes the first batch end time (and, where applicable, the start time) from the first
     * event seen, then arms the timeout timer if a timeout is configured. Does nothing once
     * {@code endTime} is non-negative.
     *
     * @param firstStreamEvent first event of the chunk being processed
     */
    private void initTiming(StreamEvent firstStreamEvent) {
        if (this.endTime >= 0L) {
            return;
        }

        if (!this.isStartTimeEnabled) {
            // No start time supplied: the first event's timestamp becomes the start.
            this.startTime = (Long) this.timestampExpressionExecutor.execute(firstStreamEvent);
            this.endTime = this.startTime + this.timeToKeep;
        } else if (this.startTimeAsVariable != null) {
            // Start time read from an attribute of the first event.
            this.startTime = (Long) this.startTimeAsVariable.execute(firstStreamEvent);
            this.endTime = this.startTime + this.timeToKeep;
        } else {
            // Constant start time: align the batch end relative to it.
            long firstTimestamp = (Long) this.timestampExpressionExecutor.execute(firstStreamEvent);
            this.endTime = this.findEndTime(firstTimestamp, this.startTime, this.timeToKeep);
        }

        if (this.schedulerTimeout > 0L) {
            long now = this.siddhiAppContext.getTimestampGenerator().currentTime();
            this.lastScheduledTime = now + this.schedulerTimeout;
            this.scheduler.notifyAt(this.lastScheduledTime);
        }
    }

    /**
     * Builds one output chunk from the buffered state and appends it to {@code complexEventChunks}
     * when it is non-empty. The chunk holds, in order: the stored expired events (only when expired
     * output is expected), the reset event, and the events of the current batch. Both internal
     * buffers are cleared; the current batch is first copied into the expired buffer as EXPIRED
     * events when {@code preserveCurrentEvents} or {@code storeExpiredEvents} is set.
     *
     * @param streamEventCloner     cloner used to copy current events into the expired buffer
     * @param complexEventChunks    collector for output chunks
     * @param currentTime           timestamp stamped on the expired events and the reset event
     * @param preserveCurrentEvents whether to keep copies of the emitted events in the expired buffer
     */
    private void flushToOutputChunk(StreamEventCloner streamEventCloner, List<ComplexEventChunk<StreamEvent>> complexEventChunks, long currentTime, boolean preserveCurrentEvents) {
        final ComplexEventChunk<StreamEvent> outputChunk = new ComplexEventChunk<StreamEvent>(true);

        // Previously emitted events go out first, re-stamped with the flush time.
        final boolean emitExpired = this.outputExpectsExpiredEvents && this.expiredEventChunk.getFirst() != null;
        if (emitExpired) {
            for (this.expiredEventChunk.reset(); this.expiredEventChunk.hasNext(); ) {
                ((StreamEvent) this.expiredEventChunk.next()).setTimestamp(currentTime);
            }
            outputChunk.add(this.expiredEventChunk.getFirst());
        }
        if (this.expiredEventChunk != null) {
            this.expiredEventChunk.clear();
        }

        if (this.currentEventChunk.getFirst() != null) {
            // The reset event precedes the batch's current events.
            this.resetEvent.setTimestamp(currentTime);
            outputChunk.add(this.resetEvent);
            this.resetEvent = null;

            final boolean keepCopies = preserveCurrentEvents || this.storeExpiredEvents;
            if (keepCopies) {
                for (this.currentEventChunk.reset(); this.currentEventChunk.hasNext(); ) {
                    StreamEvent batchEvent = (StreamEvent) this.currentEventChunk.next();
                    StreamEvent expiredCopy = streamEventCloner.copyStreamEvent(batchEvent);
                    expiredCopy.setType(ComplexEvent.Type.EXPIRED);
                    this.expiredEventChunk.add(expiredCopy);
                }
            }
            outputChunk.add(this.currentEventChunk.getFirst());
        }
        this.currentEventChunk.clear();

        if (outputChunk.getFirst() == null) {
            return;
        }
        complexEventChunks.add(outputChunk);
    }

    /**
     * Emits the current batch together with the events retained in the expired buffer. Unlike
     * {@code flushToOutputChunk}, this works on copies: the expired buffer is not cleared and the
     * stored reset event is copied rather than consumed. When the current batch is non-empty the
     * output chunk holds, in order: copies of the stored expired events (only when expired output
     * is expected), a copy of the reset event, CURRENT-typed copies of the stored expired events,
     * and the current batch. The current buffer is always cleared.
     *
     * @param streamEventCloner     cloner used for all event copies
     * @param complexEventChunks    collector for output chunks
     * @param currentTime           timestamp stamped on the expired copies and the reset copy
     * @param preserveCurrentEvents whether to add copies of the emitted events to the expired buffer
     */
    private void appendToOutputChunk(StreamEventCloner streamEventCloner, List<ComplexEventChunk<StreamEvent>> complexEventChunks, long currentTime, boolean preserveCurrentEvents) {
        final ComplexEventChunk<StreamEvent> outputChunk = new ComplexEventChunk<StreamEvent>(true);
        final ComplexEventChunk<StreamEvent> resentChunk = new ComplexEventChunk<StreamEvent>(true);

        if (this.currentEventChunk.getFirst() != null) {
            if (this.expiredEventChunk.getFirst() != null) {
                for (this.expiredEventChunk.reset(); this.expiredEventChunk.hasNext(); ) {
                    StreamEvent stored = (StreamEvent) this.expiredEventChunk.next();
                    if (this.outputExpectsExpiredEvents) {
                        StreamEvent expiredCopy = streamEventCloner.copyStreamEvent(stored);
                        expiredCopy.setTimestamp(currentTime);
                        outputChunk.add(expiredCopy);
                    }
                    // The same stored event is also sent again as a CURRENT event.
                    StreamEvent resendCopy = streamEventCloner.copyStreamEvent(stored);
                    resendCopy.setType(ComplexEvent.Type.CURRENT);
                    resentChunk.add(resendCopy);
                }
            }

            StreamEvent resetCopy = streamEventCloner.copyStreamEvent(this.resetEvent);
            resetCopy.setTimestamp(currentTime);
            outputChunk.add(resetCopy);
            outputChunk.add((StreamEvent) resentChunk.getFirst());

            final boolean keepCopies = preserveCurrentEvents || this.storeExpiredEvents;
            if (keepCopies) {
                for (this.currentEventChunk.reset(); this.currentEventChunk.hasNext(); ) {
                    StreamEvent batchEvent = (StreamEvent) this.currentEventChunk.next();
                    StreamEvent expiredCopy = streamEventCloner.copyStreamEvent(batchEvent);
                    expiredCopy.setType(ComplexEvent.Type.EXPIRED);
                    this.expiredEventChunk.add(expiredCopy);
                }
            }
            outputChunk.add(this.currentEventChunk.getFirst());
        }
        this.currentEventChunk.clear();

        if (outputChunk.getFirst() == null) {
            return;
        }
        complexEventChunks.add(outputChunk);
    }

    /**
     * Computes the end of the batch that contains {@code currentTime}, with batch boundaries
     * aligned at {@code startTime + k * timeToKeep}.
     *
     * <p>{@link Math#floorMod(long, long)} is used instead of {@code %} so that a
     * {@code currentTime} earlier than {@code startTime} still yields the next aligned boundary;
     * with {@code %} the remainder is negative in that case and the result overshoots it.
     *
     * @param currentTime timestamp to locate
     * @param startTime   reference point for batch alignment
     * @param timeToKeep  batch length; zero causes an {@link ArithmeticException}
     * @return the first aligned boundary strictly greater than {@code currentTime}, for a positive
     *         {@code timeToKeep}
     */
    private long findEndTime(long currentTime, long startTime, long timeToKeep) {
        long elapsedTimeSinceLastEmit = Math.floorMod(currentTime - startTime, timeToKeep);
        return currentTime + (timeToKeep - elapsedTimeSinceLastEmit);
    }

    /**
     * Adds a copy of the given event to the current batch and, if the batch has no reset event
     * yet, derives one from the same event.
     *
     * @param streamEventCloner cloner used to copy the event
     * @param currStreamEvent   event to buffer; it is copied, not stored directly
     */
    private void cloneAppend(StreamEventCloner streamEventCloner, StreamEvent currStreamEvent) {
        final StreamEvent batchCopy = streamEventCloner.copyStreamEvent(currStreamEvent);
        if (this.replaceTimestampWithBatchEndTime) {
            // Overwrite the timestamp attribute of the buffered copy with the batch end time.
            batchCopy.setAttribute(this.endTime, this.timestampExpressionExecutor.getPosition());
        }
        this.currentEventChunk.add(batchCopy);

        if (this.resetEvent != null) {
            return;
        }
        final StreamEvent resetCopy = streamEventCloner.copyStreamEvent(currStreamEvent);
        this.resetEvent = resetCopy;
        resetCopy.setType(ComplexEvent.Type.RESET);
    }

    /**
     * Start hook; intentionally empty, this processor does nothing on start.
     */
    @Override
    public void start() {
    }

    /**
     * Stop hook; intentionally empty, this processor does nothing on stop.
     */
    @Override
    public void stop() {
    }

    /**
     * Captures the processor's mutable state while holding this processor's monitor.
     *
     * @return a map with the timing fields, the head events of the current and expired buffers
     *         (the expired entry is {@code null} when no expired buffer exists), the reset event
     *         and the flushed flag
     */
    @Override
    public Map<String, Object> currentState() {
        final Map<String, Object> snapshot = new HashMap<String, Object>();
        synchronized (this) {
            StreamEvent expiredHead = null;
            if (this.expiredEventChunk != null) {
                expiredHead = this.expiredEventChunk.getFirst();
            }
            snapshot.put("StartTime", this.startTime);
            snapshot.put("EndTime", this.endTime);
            snapshot.put("LastScheduledTime", this.lastScheduledTime);
            snapshot.put("LastCurrentEventTime", this.lastCurrentEventTime);
            snapshot.put("CurrentEventChunk", this.currentEventChunk.getFirst());
            snapshot.put("ExpiredEventChunk", expiredHead);
            snapshot.put("ResetEvent", this.resetEvent);
            snapshot.put("Flushed", this.flushed);
        }
        return snapshot;
    }

    /**
     * Restores the state previously produced by {@code currentState()}.
     *
     * <p>The current buffer is replaced by the saved events. The expired buffer is replaced only
     * if it already exists; otherwise an empty one is created when expired output is expected or
     * a timeout is configured.
     *
     * @param state map produced by {@code currentState()}
     */
    @Override
    public synchronized void restoreState(Map<String, Object> state) {
        this.startTime = (Long) state.get("StartTime");
        this.endTime = (Long) state.get("EndTime");
        this.lastScheduledTime = (Long) state.get("LastScheduledTime");
        this.lastCurrentEventTime = (Long) state.get("LastCurrentEventTime");

        this.currentEventChunk.clear();
        final StreamEvent savedCurrentHead = (StreamEvent) state.get("CurrentEventChunk");
        this.currentEventChunk.add(savedCurrentHead);

        if (this.expiredEventChunk == null) {
            if (this.outputExpectsExpiredEvents || this.schedulerTimeout > 0L) {
                this.expiredEventChunk = new ComplexEventChunk<StreamEvent>(false);
            }
        } else {
            this.expiredEventChunk.clear();
            final StreamEvent savedExpiredHead = (StreamEvent) state.get("ExpiredEventChunk");
            this.expiredEventChunk.add(savedExpiredHead);
        }

        this.resetEvent = (StreamEvent) state.get("ResetEvent");
        this.flushed = (Boolean) state.get("Flushed");
    }

    /**
     * Looks up events in the expired buffer that satisfy the compiled condition.
     *
     * @param matchingEvent     event to match against
     * @param compiledCondition condition from {@code compileCondition}; it is cast to {@link Operator}
     * @return the result of the operator's {@code find} over the expired buffer
     */
    @Override
    public synchronized StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCondition) {
        final Operator operator = (Operator) compiledCondition;
        return operator.find(matchingEvent, this.expiredEventChunk, this.streamEventCloner);
    }

    /**
     * Builds an operator that evaluates {@code condition} against the expired buffer. If the
     * buffer does not exist yet it is created here and storing of expired events is switched on.
     * The operator is built with this processor's own {@code queryName} field, not with the
     * {@code queryName} argument.
     *
     * @return the operator constructed by {@link OperatorParser}
     */
    @Override
    public CompiledCondition compileCondition(Expression condition, MatchingMetaInfoHolder matchingMetaInfoHolder, SiddhiAppContext siddhiAppContext, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, Table> tableMap, String queryName) {
        ComplexEventChunk<StreamEvent> searchableEvents = this.expiredEventChunk;
        if (searchableEvents == null) {
            searchableEvents = new ComplexEventChunk<StreamEvent>(false);
            this.expiredEventChunk = searchableEvents;
            this.storeExpiredEvents = true;
        }
        return OperatorParser.constructOperator(searchableEvents, condition, matchingMetaInfoHolder, siddhiAppContext, variableExpressionExecutors, tableMap, this.queryName);
    }

    /**
     * @return the scheduler previously supplied through {@code setScheduler}, or {@code null} if
     *         none has been set
     */
    @Override
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    /**
     * Stores the scheduler that this processor calls {@code notifyAt} on to arm timeout timers.
     *
     * @param scheduler scheduler to use
     */
    @Override
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }
}

