/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.siddhi.core.aggregation;

import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.aggregation.BaseIncrementalValueStore;
import org.wso2.siddhi.core.aggregation.Executor;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.MetaStreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventPool;
import org.wso2.siddhi.core.exception.SiddhiAppRuntimeException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.selector.GroupByKeyGenerator;
import org.wso2.siddhi.core.query.selector.attribute.processor.executor.GroupByAggregationAttributeExecutor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.IncrementalTimeConverterUtil;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.snapshot.Snapshotable;
import org.wso2.siddhi.query.api.aggregation.TimePeriod;

/**
 * One link in a chain of incremental aggregation executors, responsible for a single
 * {@link TimePeriod.Duration}.
 *
 * <p>Incoming events are folded into one or more {@link BaseIncrementalValueStore}s. When an event's
 * timestamp reaches the next emit time (or, for a buffered root executor, moves the buffer window
 * forward), the accumulated values are turned into stream events, written to {@link #table} and
 * forwarded to the {@link #next} executor, if there is one.
 *
 * <p>Exactly one of the following storage layouts is used, chosen in the constructor:
 * <ul>
 *   <li>buffered root with group-by: {@link #baseIncrementalValueGroupByStoreList}
 *       ({@code bufferSize + 1} maps keyed by group-by key);</li>
 *   <li>buffered root without group-by: {@link #baseIncrementalValueStoreList}
 *       ({@code bufferSize + 1} stores);</li>
 *   <li>unbuffered with group-by: {@link #baseIncrementalValueStoreMap};</li>
 *   <li>unbuffered without group-by: {@link #baseIncrementalValueStore} itself.</li>
 * </ul>
 */
public class IncrementalExecutor implements Executor, Snapshotable {
    private static final Logger LOG = Logger.getLogger(IncrementalExecutor.class);

    /** Event of type RESET that is fed to every expression executor when a store is cleaned. */
    private final StreamEvent resetEvent;
    /** Evaluated against CURRENT events to obtain their timestamp. */
    private final ExpressionExecutor timestampExpressionExecutor;
    /** Evaluated against CURRENT events to obtain their time zone. */
    private final ExpressionExecutor timeZoneExpressionExecutor;
    private TimePeriod.Duration duration;
    private Table table;
    private GroupByKeyGenerator groupByKeyGenerator;
    private int bufferSize;
    private boolean ignoreEventsOlderThanBuffer;
    private StreamEventPool streamEventPool;
    private long nextEmitTime = -1L;
    private boolean isProcessingOnExternalTime;
    /** Slot of the buffer the current event is aggregated into; -1 until the first event arrives. */
    private int currentBufferIndex = -1;
    private long startTimeOfAggregates = -1L;
    private boolean timerStarted = false;
    private boolean isGroupBy;
    private Executor next;
    private Scheduler scheduler;
    private boolean isRoot;
    private long millisecondsPerDuration;
    private boolean eventOlderThanBuffer;
    /** Slot of the buffer that holds the aggregates with the largest start time. */
    private int maxTimestampPosition;
    private long maxTimestampInBuffer;
    private long minTimestampInBuffer;
    /** Binary semaphore guarding the buffer bookkeeping of a buffered root executor. */
    private Semaphore mutex;
    private boolean isRootAndLoadedFromTable = false;
    private String elementId;
    private BaseIncrementalValueStore baseIncrementalValueStore = null;
    private Map<String, BaseIncrementalValueStore> baseIncrementalValueStoreMap = null;
    private ArrayList<BaseIncrementalValueStore> baseIncrementalValueStoreList = null;
    private ArrayList<HashMap<String, BaseIncrementalValueStore>> baseIncrementalValueGroupByStoreList = null;

    /**
     * Creates the executor, allocates the storage layout matching the configuration and registers
     * the executor with the snapshot service under {@code aggregatorName}.
     *
     * @param duration                    duration this executor aggregates for
     * @param processExpressionExecutors  executors for the aggregation; element 0 must be the
     *                                    timestamp executor and element 1 the time zone executor.
     *                                    <b>The list is modified</b>: element 0 is removed before
     *                                    the list is handed to the base value store
     * @param groupByKeyGenerator         key generator, or {@code null} when there is no group-by
     * @param metaStreamEvent             meta event used to create the internal event pool
     * @param bufferSize                  number of additional buffer slots; buffering is only used
     *                                    when this is positive and {@code isRoot} is true
     * @param ignoreEventsOlderThanBuffer whether events that fall before the buffer are dropped
     * @param child                       next executor in the chain, may be {@code null}
     * @param isRoot                      whether this is the root executor of the chain
     * @param table                       table the dispatched aggregates are added to
     * @param isProcessingOnExternalTime  when false, timer events are sent to the next executor
     *                                    and the scheduler is used
     * @param siddhiAppContext            context providing the element id generator and the
     *                                    snapshot service
     * @param aggregatorName              name used for the base store and snapshot registration
     */
    public IncrementalExecutor(TimePeriod.Duration duration, List<ExpressionExecutor> processExpressionExecutors, GroupByKeyGenerator groupByKeyGenerator, MetaStreamEvent metaStreamEvent, int bufferSize, boolean ignoreEventsOlderThanBuffer, IncrementalExecutor child, boolean isRoot, Table table, boolean isProcessingOnExternalTime, SiddhiAppContext siddhiAppContext, String aggregatorName) {
        this.duration = duration;
        this.next = child;
        this.isRoot = isRoot;
        this.table = table;
        this.bufferSize = bufferSize;
        this.ignoreEventsOlderThanBuffer = ignoreEventsOlderThanBuffer;
        this.streamEventPool = new StreamEventPool(metaStreamEvent, 10);
        this.isProcessingOnExternalTime = isProcessingOnExternalTime;
        // The timestamp executor is taken out of the list; the time zone executor (now at index 0)
        // stays in the list that is passed on to the base value store.
        this.timestampExpressionExecutor = processExpressionExecutors.remove(0);
        this.timeZoneExpressionExecutor = processExpressionExecutors.get(0);
        this.baseIncrementalValueStore = new BaseIncrementalValueStore(-1L, processExpressionExecutors, this.streamEventPool, siddhiAppContext, aggregatorName);
        this.groupByKeyGenerator = groupByKeyGenerator;
        this.isGroupBy = groupByKeyGenerator != null;

        boolean buffered = bufferSize > 0 && isRoot;
        if (buffered) {
            int slotCount = bufferSize + 1;
            if (this.isGroupBy) {
                this.baseIncrementalValueGroupByStoreList = new ArrayList<HashMap<String, BaseIncrementalValueStore>>(slotCount);
                for (int i = 0; i < slotCount; ++i) {
                    this.baseIncrementalValueGroupByStoreList.add(new HashMap<String, BaseIncrementalValueStore>());
                }
            } else {
                this.baseIncrementalValueStoreList = new ArrayList<BaseIncrementalValueStore>(slotCount);
                for (int i = 0; i < slotCount; ++i) {
                    this.baseIncrementalValueStoreList.add(this.baseIncrementalValueStore.cloneStore(null, -1L));
                }
            }
            this.millisecondsPerDuration = IncrementalTimeConverterUtil.getMillisecondsPerDuration(duration);
        } else if (this.isGroupBy) {
            this.baseIncrementalValueStoreMap = new HashMap<String, BaseIncrementalValueStore>();
        }

        this.resetEvent = this.streamEventPool.borrowEvent();
        this.resetEvent.setType(ComplexEvent.Type.RESET);
        this.mutex = new Semaphore(1);
        this.elementId = "IncrementalExecutor-" + siddhiAppContext.getElementIdGenerator().createNewId();
        siddhiAppContext.getSnapshotService().addSnapshotable(aggregatorName, this);
    }

    /**
     * Sets the scheduler used to request timer notifications. It is dereferenced by a root
     * executor while processing events, so it must be set before such events arrive.
     *
     * @param scheduler scheduler to notify
     */
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Consumes every event of the chunk (each event is removed from the chunk as it is read),
     * dispatching finished aggregates and folding CURRENT events into the active store.
     *
     * @param streamEventChunk chunk of {@link StreamEvent}s to process
     * @throws SiddhiAppRuntimeException if the thread is interrupted while waiting for the buffer
     *                                   mutex (the interrupt flag is restored)
     */
    @Override
    public void execute(ComplexEventChunk streamEventChunk) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Event Chunk received by " + this.duration + " incremental executor: " + streamEventChunk.toString());
        }
        streamEventChunk.reset();
        while (streamEventChunk.hasNext()) {
            StreamEvent streamEvent = (StreamEvent) streamEventChunk.next();
            streamEventChunk.remove();
            String timeZone = this.getTimeZone(streamEvent);
            long timestamp = this.getTimestamp(streamEvent, timeZone);
            this.startTimeOfAggregates = IncrementalTimeConverterUtil.getStartTimeOfAggregates(timestamp, this.duration, timeZone);

            if (this.isRootAndLoadedFromTable) {
                // After a reload from the table, skip everything until the first event that
                // reaches the emit time recorded for the table's latest event.
                if (timestamp < this.nextEmitTime) {
                    continue;
                }
                this.isRootAndLoadedFromTable = false;
            }

            if (this.bufferSize > 0 && this.isRoot) {
                // Acquire outside the try/finally so that an interrupted acquire never leads to a
                // release of a permit that was not obtained.
                Semaphore held = this.acquireMutex("Error when dispatching events from buffer");
                try {
                    this.dispatchBufferedAggregateEvents(this.startTimeOfAggregates);
                } finally {
                    held.release();
                }
                if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
                    if (!this.eventOlderThanBuffer) {
                        this.processAggregates(streamEvent);
                    } else if (!this.ignoreEventsOlderThanBuffer) {
                        // Too old for the buffer: aggregate it into the oldest slot instead.
                        this.startTimeOfAggregates = this.minTimestampInBuffer;
                        this.processAggregates(streamEvent);
                    }
                }
                continue;
            }

            if (timestamp >= this.nextEmitTime) {
                this.nextEmitTime = IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, timeZone);
                this.dispatchAggregateEvents(this.startTimeOfAggregates);
                if (!this.isProcessingOnExternalTime) {
                    this.sendTimerEvent(timeZone);
                }
            }
            if (streamEvent.getType() != ComplexEvent.Type.CURRENT) {
                continue;
            }
            if (this.nextEmitTime == IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, timeZone)) {
                // The event belongs to the time slot that is currently being aggregated.
                this.processAggregates(streamEvent);
            } else if (!this.ignoreEventsOlderThanBuffer) {
                // NOTE(review): minTimestampInBuffer is only assigned by the buffered dispatch
                // path in this class; confirm the value used here for unbuffered executors.
                this.startTimeOfAggregates = this.minTimestampInBuffer;
                this.processAggregates(streamEvent);
            }
        }
    }

    /**
     * Acquires one permit of the current {@link #mutex}.
     *
     * @param errorMessage message of the exception thrown when the wait is interrupted
     * @return the semaphore the permit was taken from, so the caller releases that same instance
     * @throws SiddhiAppRuntimeException if interrupted while waiting; the interrupt flag is restored
     */
    private Semaphore acquireMutex(String errorMessage) {
        Semaphore semaphore = this.mutex;
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SiddhiAppRuntimeException(errorMessage, e);
        }
        return semaphore;
    }

    /**
     * Sends a TIMER event to the next executor (if any). Its timestamp is the start time that
     * precedes the current {@link #startTimeOfAggregates} for this executor's duration.
     *
     * @param timeZone time zone passed to the start time calculation
     */
    private void sendTimerEvent(String timeZone) {
        if (this.getNextExecutor() != null) {
            StreamEvent timerEvent = this.streamEventPool.borrowEvent();
            timerEvent.setType(ComplexEvent.Type.TIMER);
            timerEvent.setTimestamp(IncrementalTimeConverterUtil.getPreviousStartTime(this.startTimeOfAggregates, this.duration, timeZone));
            ComplexEventChunk<StreamEvent> timerStreamEventChunk = new ComplexEventChunk<StreamEvent>(true);
            timerStreamEventChunk.add(timerEvent);
            this.next.execute(timerStreamEventChunk);
        }
    }

    /**
     * Determines the timestamp of an event and, for a root executor, asks the scheduler to notify
     * at the next emit time.
     *
     * <p>CURRENT events take their timestamp from the timestamp expression executor; the scheduler
     * is notified only once, and only when not processing on external time. All other event types
     * use the event's own timestamp and notify the scheduler every time.
     *
     * @param streamEvent event to inspect
     * @param timeZone    time zone used to compute the next emit time
     * @return timestamp of the event
     */
    private long getTimestamp(StreamEvent streamEvent, String timeZone) {
        long timestamp;
        if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
            timestamp = (Long) this.timestampExpressionExecutor.execute(streamEvent);
            if (this.isRoot && !this.isProcessingOnExternalTime && !this.timerStarted) {
                this.scheduler.notifyAt(IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, timeZone));
                this.timerStarted = true;
            }
        } else {
            timestamp = streamEvent.getTimestamp();
            if (this.isRoot) {
                this.scheduler.notifyAt(IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, timeZone));
            }
        }
        return timestamp;
    }

    /**
     * Returns the time zone to use for an event: the value of the time zone expression for CURRENT
     * events, otherwise the id of the system default zone's offset at the present instant.
     *
     * @param streamEvent event to inspect
     * @return time zone string
     */
    private String getTimeZone(StreamEvent streamEvent) {
        if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
            return this.timeZoneExpressionExecutor.execute(streamEvent).toString();
        }
        return ZoneOffset.systemDefault().getRules().getOffset(Instant.now()).getId();
    }

    @Override
    public Executor getNextExecutor() {
        return this.next;
    }

    @Override
    public void setNextExecutor(Executor nextExecutor) {
        this.next = nextExecutor;
    }

    /**
     * Folds a CURRENT event into the store selected by the active storage layout. Runs while
     * holding this executor's monitor.
     *
     * <p>For group-by aggregations the group key is published through the
     * {@link GroupByAggregationAttributeExecutor} thread local for the duration of the call and
     * always removed afterwards.
     *
     * @param streamEvent event to aggregate
     */
    private void processAggregates(StreamEvent streamEvent) {
        synchronized (this) {
            if (this.isGroupBy) {
                try {
                    String groupedByKey = this.groupByKeyGenerator.constructEventKey(streamEvent);
                    GroupByAggregationAttributeExecutor.getKeyThreadLocal().set(groupedByKey);
                    // Only one of the two group-by structures exists (see constructor), so the
                    // event must go to exactly one of them: the current buffer slot when
                    // buffering, otherwise the single map.
                    Map<String, BaseIncrementalValueStore> groupStores;
                    if (this.baseIncrementalValueGroupByStoreList != null) {
                        groupStores = this.baseIncrementalValueGroupByStoreList.get(this.currentBufferIndex);
                    } else {
                        groupStores = this.baseIncrementalValueStoreMap;
                    }
                    BaseIncrementalValueStore groupStore = groupStores.computeIfAbsent(groupedByKey, k -> this.baseIncrementalValueStore.cloneStore(k, this.startTimeOfAggregates));
                    this.process(streamEvent, groupStore);
                } finally {
                    GroupByAggregationAttributeExecutor.getKeyThreadLocal().remove();
                }
            } else if (this.baseIncrementalValueStoreList != null) {
                BaseIncrementalValueStore slotStore = this.baseIncrementalValueStoreList.get(this.currentBufferIndex);
                if (!slotStore.isProcessed()) {
                    // First event for this slot since it was cleaned: stamp it with the start time.
                    slotStore.setTimestamp(this.startTimeOfAggregates);
                }
                this.process(streamEvent, slotStore);
            } else {
                this.process(streamEvent, this.baseIncrementalValueStore);
            }
        }
    }

    /**
     * Executes every expression executor of the store against the event, writes the i-th result to
     * position {@code i + 1} of the store and marks the store as processed.
     *
     * @param streamEvent               event to evaluate
     * @param baseIncrementalValueStore store receiving the values
     */
    private void process(StreamEvent streamEvent, BaseIncrementalValueStore baseIncrementalValueStore) {
        List<ExpressionExecutor> expressionExecutors = baseIncrementalValueStore.getExpressionExecutors();
        for (int i = 0; i < expressionExecutors.size(); ++i) {
            ExpressionExecutor expressionExecutor = expressionExecutors.get(i);
            baseIncrementalValueStore.setValue(expressionExecutor.execute(streamEvent), i + 1);
        }
        baseIncrementalValueStore.setProcessed(true);
    }

    /**
     * Dispatches the aggregates of an unbuffered executor: the whole group-by map, or the single
     * base store.
     *
     * @param startTimeOfNewAggregates timestamp the base store is reset to after dispatching
     */
    private void dispatchAggregateEvents(long startTimeOfNewAggregates) {
        if (this.isGroupBy) {
            this.dispatchEvents(this.baseIncrementalValueStoreMap);
        } else {
            this.dispatchEvent(startTimeOfNewAggregates, this.baseIncrementalValueStore);
        }
    }

    /**
     * Updates the circular buffer of a buffered root executor for an event whose aggregates start
     * at {@code startTimeOfNewAggregates}, dispatching slots that fall out of the window.
     *
     * <p>Afterwards {@link #currentBufferIndex} is the slot the event belongs to and
     * {@link #eventOlderThanBuffer} tells whether the event was older than the whole buffer.
     * Callers in this class invoke it while holding {@link #mutex}.
     *
     * @param startTimeOfNewAggregates start time of the aggregates the incoming event belongs to
     */
    private void dispatchBufferedAggregateEvents(long startTimeOfNewAggregates) {
        if (this.currentBufferIndex == -1) {
            // Very first event: it defines the newest slot and nothing needs to be dispatched.
            this.maxTimestampPosition = 0;
            this.maxTimestampInBuffer = startTimeOfNewAggregates;
            this.currentBufferIndex = 0;
            this.eventOlderThanBuffer = false;
            return;
        }
        int slotCount = this.bufferSize + 1;
        if (startTimeOfNewAggregates > this.maxTimestampInBuffer) {
            // Event is newer than everything buffered: the window moves forward.
            long durationsAhead = (startTimeOfNewAggregates - this.maxTimestampInBuffer) / this.millisecondsPerDuration;
            if (durationsAhead >= (long) slotCount) {
                // The jump is at least as large as the buffer: flush every slot.
                if (this.isGroupBy) {
                    for (int i = 0; i <= this.bufferSize; ++i) {
                        this.dispatchEvents(this.baseIncrementalValueGroupByStoreList.get(i));
                    }
                } else {
                    for (int i = 0; i <= this.bufferSize; ++i) {
                        this.dispatchEvent(startTimeOfNewAggregates, this.baseIncrementalValueStoreList.get(i));
                    }
                }
                this.currentBufferIndex = (int) (durationsAhead % (long) slotCount);
                this.maxTimestampPosition = this.currentBufferIndex;
            } else {
                int lastDispatchIndex = (this.maxTimestampPosition + (int) durationsAhead) % slotCount;
                // Oldest slot sits bufferSize positions behind the newest one (circularly).
                int minTimestampIndex = this.maxTimestampPosition - this.bufferSize;
                if (minTimestampIndex < 0) {
                    minTimestampIndex += slotCount;
                }
                // Walk from the oldest slot up to and including lastDispatchIndex, wrapping around.
                int stopIndex = (lastDispatchIndex + 1) % slotCount;
                if (this.isGroupBy) {
                    do {
                        Map<String, BaseIncrementalValueStore> slotStores = this.baseIncrementalValueGroupByStoreList.get(minTimestampIndex);
                        if (slotStores.size() > 0) {
                            this.dispatchEvents(slotStores);
                        }
                        minTimestampIndex = (minTimestampIndex + 1) % slotCount;
                    } while (minTimestampIndex != stopIndex);
                } else {
                    do {
                        BaseIncrementalValueStore slotStore = this.baseIncrementalValueStoreList.get(minTimestampIndex);
                        if (slotStore.isProcessed()) {
                            this.dispatchEvent(startTimeOfNewAggregates, slotStore);
                        }
                        minTimestampIndex = (minTimestampIndex + 1) % slotCount;
                    } while (minTimestampIndex != stopIndex);
                }
                this.currentBufferIndex = lastDispatchIndex;
                this.maxTimestampPosition = lastDispatchIndex;
            }
            this.maxTimestampInBuffer = startTimeOfNewAggregates;
            this.eventOlderThanBuffer = false;
        } else if ((int) ((this.maxTimestampInBuffer - startTimeOfNewAggregates) / this.millisecondsPerDuration) <= this.bufferSize) {
            // Event is not newer than the newest slot but still inside the window: select its slot.
            int tempIndex = this.maxTimestampPosition - (int) ((this.maxTimestampInBuffer - startTimeOfNewAggregates) / this.millisecondsPerDuration);
            this.currentBufferIndex = tempIndex >= 0 ? tempIndex : slotCount + tempIndex;
            this.eventOlderThanBuffer = false;
        } else {
            // Event is older than the whole window: point at the oldest slot and flag it.
            int minTimestampPosition = this.maxTimestampPosition - this.bufferSize;
            this.currentBufferIndex = minTimestampPosition < 0 ? slotCount + minTimestampPosition : minTimestampPosition;
            this.minTimestampInBuffer = this.maxTimestampInBuffer - (long) this.bufferSize * this.millisecondsPerDuration;
            this.eventOlderThanBuffer = true;
        }
    }

    /**
     * If the store has been processed, creates its stream event, adds it to the table and passes it
     * to the next executor. The store is cleaned and re-stamped in every case.
     *
     * @param startTimeOfNewAggregates   timestamp assigned to the store after cleaning
     * @param aBaseIncrementalValueStore store to dispatch and reset
     */
    private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueStore aBaseIncrementalValueStore) {
        if (aBaseIncrementalValueStore.isProcessed()) {
            StreamEvent streamEvent = aBaseIncrementalValueStore.createStreamEvent();
            ComplexEventChunk<StreamEvent> eventChunk = new ComplexEventChunk<StreamEvent>(true);
            eventChunk.add(streamEvent);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Event dispatched by " + this.duration + " incremental executor: " + eventChunk.toString());
            }
            this.table.addEvents(eventChunk, 1);
            if (this.getNextExecutor() != null) {
                this.next.execute(eventChunk);
            }
        }
        this.cleanBaseIncrementalValueStore(startTimeOfNewAggregates, aBaseIncrementalValueStore);
    }

    /**
     * Creates one stream event per store in the map, adds them to the table as a single chunk,
     * passes the chunk to the next executor and finally empties the map.
     *
     * @param baseIncrementalValueGroupByStore stores keyed by group-by key; cleared on return
     */
    private void dispatchEvents(Map<String, BaseIncrementalValueStore> baseIncrementalValueGroupByStore) {
        int noOfEvents = baseIncrementalValueGroupByStore.size();
        if (noOfEvents > 0) {
            ComplexEventChunk<StreamEvent> eventChunk = new ComplexEventChunk<StreamEvent>(true);
            for (BaseIncrementalValueStore aBaseIncrementalValueStore : baseIncrementalValueGroupByStore.values()) {
                eventChunk.add(aBaseIncrementalValueStore.createStreamEvent());
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Event dispatched by " + this.duration + " incremental executor: " + eventChunk.toString());
            }
            this.table.addEvents(eventChunk, noOfEvents);
            if (this.getNextExecutor() != null) {
                this.next.execute(eventChunk);
            }
        }
        baseIncrementalValueGroupByStore.clear();
    }

    /**
     * Clears the store's values, stamps it with the given start time, marks it unprocessed and
     * feeds the RESET event to each of its expression executors.
     *
     * @param startTimeOfNewAggregates  new timestamp of the store
     * @param baseIncrementalValueStore store to reset
     */
    private void cleanBaseIncrementalValueStore(long startTimeOfNewAggregates, BaseIncrementalValueStore baseIncrementalValueStore) {
        baseIncrementalValueStore.clearValues();
        baseIncrementalValueStore.setTimestamp(startTimeOfNewAggregates);
        baseIncrementalValueStore.setProcessed(false);
        for (ExpressionExecutor expressionExecutor : baseIncrementalValueStore.getExpressionExecutors()) {
            expressionExecutor.execute(this.resetEvent);
        }
    }

    /** @return the buffered group-by stores, or {@code null} when that layout is not in use */
    ArrayList<HashMap<String, BaseIncrementalValueStore>> getBaseIncrementalValueGroupByStoreList() {
        return this.baseIncrementalValueGroupByStoreList;
    }

    /** @return the unbuffered group-by stores, or {@code null} when that layout is not in use */
    Map<String, BaseIncrementalValueStore> getBaseIncrementalValueStoreMap() {
        return this.baseIncrementalValueStoreMap;
    }

    /** @return the buffered stores without group-by, or {@code null} when that layout is not in use */
    ArrayList<BaseIncrementalValueStore> getBaseIncrementalValueStoreList() {
        return this.baseIncrementalValueStoreList;
    }

    /** @return the base store created in the constructor */
    BaseIncrementalValueStore getBaseIncrementalValueStore() {
        return this.baseIncrementalValueStore;
    }

    /**
     * Returns the start time of the oldest aggregates held in memory.
     *
     * <p>For a buffered root this is the newest buffered start time minus {@code bufferSize}
     * durations; otherwise it is the timestamp of the (first) in-memory store.
     *
     * @return the timestamp, or {@code -1} when nothing has been aggregated yet
     * @throws SiddhiAppRuntimeException if interrupted while waiting for the buffer mutex
     */
    public long getOldestEventTimestamp() {
        if (this.bufferSize > 0 && this.isRoot) {
            Semaphore held = this.acquireMutex("Error when getting the oldest in-memory event timestamp");
            try {
                if (this.currentBufferIndex == -1) {
                    return -1L;
                }
                return this.maxTimestampInBuffer - (long) this.bufferSize * this.millisecondsPerDuration;
            } finally {
                held.release();
            }
        }
        return this.getUnbufferedStoreTimestamp();
    }

    /**
     * Returns the start time of the newest aggregates held in memory.
     *
     * <p>For a buffered root this is the newest buffered start time; otherwise it is the timestamp
     * of the (first) in-memory store.
     *
     * @return the timestamp, or {@code -1} when nothing has been aggregated yet
     * @throws SiddhiAppRuntimeException if interrupted while waiting for the buffer mutex
     */
    public long getNewestEventTimestamp() {
        if (this.bufferSize > 0 && this.isRoot) {
            Semaphore held = this.acquireMutex("Error when getting the newest in-memory event timestamp");
            try {
                if (this.currentBufferIndex == -1) {
                    return -1L;
                }
                return this.maxTimestampInBuffer;
            } finally {
                held.release();
            }
        }
        return this.getUnbufferedStoreTimestamp();
    }

    /**
     * Timestamp of the in-memory store of an unbuffered executor: the first store of the group-by
     * map in iteration order, or the base store when it has been processed.
     *
     * @return the timestamp, or {@code -1} when the map is empty / the base store is unprocessed
     */
    private long getUnbufferedStoreTimestamp() {
        if (this.isGroupBy) {
            if (this.baseIncrementalValueStoreMap.isEmpty()) {
                return -1L;
            }
            return this.baseIncrementalValueStoreMap.values().iterator().next().getTimestamp();
        }
        return this.baseIncrementalValueStore.isProcessed() ? this.baseIncrementalValueStore.getTimestamp() : -1L;
    }

    /** @return the time at or after which the next dispatch of an unbuffered executor happens */
    public long getNextEmitTime() {
        return this.nextEmitTime;
    }

    /**
     * Prepares the executor after in-memory data has been recreated from the table.
     *
     * @param isRootAndLoadedFromTable     when true, events with a timestamp before
     *                                     {@code emitTimeOfLatestEventInTable} are skipped by
     *                                     {@link #execute(ComplexEventChunk)} until a later one arrives
     * @param emitTimeOfLatestEventInTable value assigned to the next emit time
     */
    public void setValuesForInMemoryRecreateFromTable(boolean isRootAndLoadedFromTable, long emitTimeOfLatestEventInTable) {
        this.isRootAndLoadedFromTable = isRootAndLoadedFromTable;
        this.nextEmitTime = emitTimeOfLatestEventInTable;
    }

    /**
     * Captures the executor's bookkeeping fields for a snapshot.
     *
     * @return map of state name to value, including the mutex instance itself
     */
    @Override
    public Map<String, Object> currentState() {
        HashMap<String, Object> state = new HashMap<String, Object>();
        state.put("NextEmitTime", this.nextEmitTime);
        state.put("CurrentBufferIndex", this.currentBufferIndex);
        state.put("StartTimeOfAggregates", this.startTimeOfAggregates);
        state.put("TimerStarted", this.timerStarted);
        state.put("EventOlderThanBuffer", this.eventOlderThanBuffer);
        state.put("MaxTimestampPosition", this.maxTimestampPosition);
        state.put("MaxTimestampInBuffer", this.maxTimestampInBuffer);
        state.put("MinTimestampInBuffer", this.minTimestampInBuffer);
        state.put("IsRootAndLoadedFromTable", this.isRootAndLoadedFromTable);
        // NOTE(review): the semaphore object is stored as part of the state; confirm that a
        // snapshot can never be taken while a permit is held, otherwise the restored semaphore
        // may come back without an available permit.
        state.put("Mutex", this.mutex);
        return state;
    }

    /**
     * Restores the fields written by {@link #currentState()}. Every key written there must be
     * present, since the values are unboxed directly.
     *
     * @param state state map previously produced by {@link #currentState()}
     */
    @Override
    public void restoreState(Map<String, Object> state) {
        this.nextEmitTime = (Long) state.get("NextEmitTime");
        this.currentBufferIndex = (Integer) state.get("CurrentBufferIndex");
        this.startTimeOfAggregates = (Long) state.get("StartTimeOfAggregates");
        this.timerStarted = (Boolean) state.get("TimerStarted");
        this.eventOlderThanBuffer = (Boolean) state.get("EventOlderThanBuffer");
        this.maxTimestampPosition = (Integer) state.get("MaxTimestampPosition");
        this.maxTimestampInBuffer = (Long) state.get("MaxTimestampInBuffer");
        this.minTimestampInBuffer = (Long) state.get("MinTimestampInBuffer");
        this.isRootAndLoadedFromTable = (Boolean) state.get("IsRootAndLoadedFromTable");
        this.mutex = (Semaphore) state.get("Mutex");
    }

    @Override
    public String getElementId() {
        return this.elementId;
    }
}

