/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.siddhi.core.query.selector.attribute.aggregator.incremental;

import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.MetaStreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventPool;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.selector.GroupByKeyGenerator;
import org.wso2.siddhi.core.query.selector.attribute.aggregator.incremental.BaseIncrementalValueStore;
import org.wso2.siddhi.core.query.selector.attribute.aggregator.incremental.Executor;
import org.wso2.siddhi.core.query.selector.attribute.processor.executor.GroupByAggregationAttributeExecutor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.IncrementalTimeConverterUtil;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.query.api.aggregation.TimePeriod;

/**
 * Aggregates incoming events for a single {@link TimePeriod.Duration}.
 *
 * <p>Each {@code CURRENT} event is folded into a {@link BaseIncrementalValueStore}. When an
 * event (or timer) carries a timestamp at or beyond the next emit time, the store(s) due for
 * emission are turned into stream events, written to the {@link Table}, and handed to the next
 * {@link Executor} in the chain (if one is set).
 *
 * <p>Four storage layouts are used, chosen in the constructor:
 * <ul>
 *   <li>no group-by, no buffer: the single {@code baseIncrementalValueStore};</li>
 *   <li>no group-by, buffer: a list of {@code bufferSize + 1} stores used as a ring;</li>
 *   <li>group-by, no buffer: one map from group key to store;</li>
 *   <li>group-by, buffer: a list of {@code bufferSize + 1} such maps used as a ring.</li>
 * </ul>
 *
 * <p>Only {@link #processAggregates(StreamEvent)} synchronizes on this instance; the dispatch
 * path does not take that lock.
 */
public class IncrementalExecutor implements Executor {
    private static final Logger LOG = Logger.getLogger(IncrementalExecutor.class);

    private final StreamEvent resetEvent;
    private final ExpressionExecutor timestampExpressionExecutor;
    private final ExpressionExecutor timeZoneExpressionExecutor;
    private final TimePeriod.Duration duration;
    private final Table table;
    private final GroupByKeyGenerator groupByKeyGenerator;
    private final int bufferSize;
    private final StreamEventPool streamEventPool;
    private final boolean isProcessingOnExternalTime;
    private final boolean isGroupBy;
    private final boolean isRoot;

    private long nextEmitTime = -1L;
    private int currentBufferIndex = 0;
    private long startTimeOfAggregates = -1L;
    private boolean timerStarted = false;
    private Executor next;
    private Scheduler scheduler;

    private BaseIncrementalValueStore baseIncrementalValueStore = null;
    private Map<String, BaseIncrementalValueStore> baseIncrementalValueStoreMap = null;
    private ArrayList<BaseIncrementalValueStore> baseIncrementalValueStoreList = null;
    private ArrayList<HashMap<String, BaseIncrementalValueStore>> baseIncrementalValueGroupByStoreList = null;

    /**
     * Creates an executor for one duration.
     *
     * @param duration                   the duration this executor aggregates for
     * @param processExpressionExecutors executors for the event; <b>this list is modified</b>: its
     *                                   first element is removed and used as the timestamp executor.
     *                                   The element that is first afterwards is used as the time-zone
     *                                   executor and stays in the list handed to the value store
     * @param groupByKeyGenerator        key generator for group-by aggregation, or {@code null} when
     *                                   the aggregation is not grouped
     * @param metaStreamEvent            meta event used to create the internal event pool
     * @param bufferSize                 number of extra buffered slots; values &gt; 0 enable the
     *                                   ring of {@code bufferSize + 1} stores
     * @param child                      the next executor in the chain, may be {@code null}
     * @param isRoot                     whether this executor drives the scheduler
     * @param table                      table that receives every dispatched aggregate
     * @param isProcessingOnExternalTime when {@code true}, the root does not start the timer on the
     *                                   first {@code CURRENT} event
     */
    public IncrementalExecutor(TimePeriod.Duration duration, List<ExpressionExecutor> processExpressionExecutors, GroupByKeyGenerator groupByKeyGenerator, MetaStreamEvent metaStreamEvent, int bufferSize, IncrementalExecutor child, boolean isRoot, Table table, boolean isProcessingOnExternalTime) {
        this.duration = duration;
        // Assigned directly rather than through setNextExecutor so that no overridable
        // method runs while the object is still being constructed.
        this.next = child;
        this.isRoot = isRoot;
        this.table = table;
        this.bufferSize = bufferSize;
        this.streamEventPool = new StreamEventPool(metaStreamEvent, 10);
        this.isProcessingOnExternalTime = isProcessingOnExternalTime;
        this.timestampExpressionExecutor = processExpressionExecutors.remove(0);
        this.timeZoneExpressionExecutor = processExpressionExecutors.get(0);
        this.baseIncrementalValueStore = new BaseIncrementalValueStore(-1L, processExpressionExecutors, this.streamEventPool);
        this.groupByKeyGenerator = groupByKeyGenerator;
        this.isGroupBy = groupByKeyGenerator != null;

        if (this.isGroupBy) {
            if (bufferSize > 0) {
                this.baseIncrementalValueGroupByStoreList = new ArrayList<>(bufferSize + 1);
                for (int i = 0; i < bufferSize + 1; ++i) {
                    this.baseIncrementalValueGroupByStoreList.add(new HashMap<>());
                }
            } else {
                this.baseIncrementalValueStoreMap = new HashMap<>();
            }
        } else if (bufferSize > 0) {
            this.baseIncrementalValueStoreList = new ArrayList<>(bufferSize + 1);
            for (int i = 0; i < bufferSize + 1; ++i) {
                this.baseIncrementalValueStoreList.add(this.baseIncrementalValueStore.cloneStore(null, -1L));
            }
        }

        this.resetEvent = this.streamEventPool.borrowEvent();
        this.resetEvent.setType(ComplexEvent.Type.RESET);
    }

    /**
     * Sets the scheduler used by {@link #getTimestamp(StreamEvent, String)} to request timer
     * notifications. It is dereferenced without a null check when this executor is the root.
     *
     * @param scheduler the scheduler to notify
     */
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Consumes every event in the chunk, removing each one from the chunk as it is read.
     *
     * <p>For each event: if its timestamp has reached {@code nextEmitTime}, the emit time and
     * aggregate start time are recomputed, due aggregates are dispatched, and (for timer events)
     * a timer is forwarded to the next executor. {@code CURRENT} events are then aggregated.
     *
     * @param streamEventChunk chunk of {@link StreamEvent}s to process
     */
    @Override
    public void execute(ComplexEventChunk streamEventChunk) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Event Chunk received by " + this.duration + " incremental executor: " + streamEventChunk.toString());
        }
        streamEventChunk.reset();
        while (streamEventChunk.hasNext()) {
            StreamEvent streamEvent = (StreamEvent) streamEventChunk.next();
            streamEventChunk.remove();
            String timeZone = this.getTimeZone(streamEvent);
            long timestamp = this.getTimestamp(streamEvent, timeZone);
            if (timestamp >= this.nextEmitTime) {
                this.nextEmitTime = IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, timeZone);
                this.startTimeOfAggregates = IncrementalTimeConverterUtil.getStartTimeOfAggregates(timestamp, this.duration, timeZone);
                this.dispatchAggregateEvents(this.startTimeOfAggregates);
                this.sendTimerEvent(streamEvent, timestamp, timeZone);
            }
            if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
                this.processAggregates(streamEvent);
            }
        }
    }

    /**
     * Forwards a new {@code TIMER} event to the next executor when the triggering event is
     * itself a timer and a next executor exists. The forwarded timestamp is obtained from
     * {@link IncrementalTimeConverterUtil#getEmitTimeOfLastEventToRemove}.
     */
    private void sendTimerEvent(StreamEvent streamEvent, long timestamp, String timeZone) {
        if (streamEvent.getType() != ComplexEvent.Type.TIMER || this.getNextExecutor() == null) {
            return;
        }
        StreamEvent timerEvent = this.streamEventPool.borrowEvent();
        timerEvent.setType(ComplexEvent.Type.TIMER);
        timerEvent.setTimestamp(IncrementalTimeConverterUtil.getEmitTimeOfLastEventToRemove(timestamp, this.duration, this.bufferSize, timeZone));
        ComplexEventChunk<StreamEvent> timerStreamEventChunk = new ComplexEventChunk<StreamEvent>(true);
        timerStreamEventChunk.add(timerEvent);
        this.next.execute(timerStreamEventChunk);
    }

    /**
     * Resolves the timestamp of an event and, on the root executor, schedules the next timer.
     *
     * <p>{@code CURRENT} events take their timestamp from the timestamp expression executor; the
     * root schedules a notification only once, and only when not processing on external time.
     * All other events use the event's own timestamp, and the root reschedules on every one.
     *
     * @return the timestamp to use for emit-time comparison
     */
    private long getTimestamp(StreamEvent streamEvent, String timeZone) {
        long timestamp;
        if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
            timestamp = (Long) this.timestampExpressionExecutor.execute(streamEvent);
            if (this.isRoot && !this.isProcessingOnExternalTime && !this.timerStarted) {
                this.scheduler.notifyAt(IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, timeZone));
                this.timerStarted = true;
            }
        } else {
            timestamp = streamEvent.getTimestamp();
            if (this.isRoot) {
                this.scheduler.notifyAt(IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, timeZone));
            }
        }
        return timestamp;
    }

    /**
     * Returns the time zone string for an event: the time-zone expression result for
     * {@code CURRENT} events, otherwise the ID of the system default zone's offset at the
     * current instant.
     */
    private String getTimeZone(StreamEvent streamEvent) {
        if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
            return this.timeZoneExpressionExecutor.execute(streamEvent).toString();
        }
        return ZoneOffset.systemDefault().getRules().getOffset(Instant.now()).getId();
    }

    @Override
    public Executor getNextExecutor() {
        return this.next;
    }

    @Override
    public void setNextExecutor(Executor nextExecutor) {
        this.next = nextExecutor;
    }

    /**
     * Folds one {@code CURRENT} event into the store for the current slot (and group, if any).
     *
     * <p>Runs while holding this instance's monitor. For group-by aggregation the group key is
     * published through {@link GroupByAggregationAttributeExecutor#getKeyThreadLocal()} for the
     * duration of the call and always removed afterwards.
     */
    private void processAggregates(StreamEvent streamEvent) {
        synchronized (this) {
            if (this.isGroupBy) {
                try {
                    String groupedByKey = this.groupByKeyGenerator.constructEventKey(streamEvent);
                    GroupByAggregationAttributeExecutor.getKeyThreadLocal().set(groupedByKey);
                    // Exactly one of the two group-by containers is non-null (see constructor),
                    // so the choice between them must be exclusive.
                    Map<String, BaseIncrementalValueStore> groupStores;
                    if (this.baseIncrementalValueGroupByStoreList != null) {
                        groupStores = this.baseIncrementalValueGroupByStoreList.get(this.currentBufferIndex);
                    } else {
                        groupStores = this.baseIncrementalValueStoreMap;
                    }
                    BaseIncrementalValueStore groupStore = groupStores.computeIfAbsent(groupedByKey,
                            k -> this.baseIncrementalValueStore.cloneStore(k, this.startTimeOfAggregates));
                    this.process(streamEvent, groupStore);
                } finally {
                    GroupByAggregationAttributeExecutor.getKeyThreadLocal().remove();
                }
            } else if (this.baseIncrementalValueStoreList != null) {
                this.process(streamEvent, this.baseIncrementalValueStoreList.get(this.currentBufferIndex));
            } else {
                this.process(streamEvent, this.baseIncrementalValueStore);
            }
        }
    }

    /**
     * Runs every expression executor of the store against the event, writes each result into
     * the store, and marks the store as processed.
     */
    private void process(StreamEvent streamEvent, BaseIncrementalValueStore baseIncrementalValueStore) {
        List<ExpressionExecutor> expressionExecutors = baseIncrementalValueStore.getExpressionExecutors();
        for (int i = 0; i < expressionExecutors.size(); ++i) {
            // Values are written at position i + 1; position 0 is presumably reserved by the
            // store (e.g. for the timestamp) - confirm against BaseIncrementalValueStore.
            baseIncrementalValueStore.setValue(expressionExecutors.get(i).execute(streamEvent), i + 1);
        }
        baseIncrementalValueStore.setProcessed(true);
    }

    /**
     * Dispatches the aggregates that are due and, for buffered layouts, advances the ring so
     * the slot just dispatched becomes the current slot.
     *
     * @param startTimeOfNewAggregates timestamp assigned to a single-store slot after it is cleaned
     */
    private void dispatchAggregateEvents(long startTimeOfNewAggregates) {
        if (this.isGroupBy) {
            if (this.baseIncrementalValueGroupByStoreList != null) {
                int dispatchIndex = this.nextBufferIndex();
                this.dispatchEvents(this.baseIncrementalValueGroupByStoreList.get(dispatchIndex));
                this.currentBufferIndex = dispatchIndex;
            } else {
                this.dispatchEvents(this.baseIncrementalValueStoreMap);
            }
        } else if (this.baseIncrementalValueStoreList != null) {
            int dispatchIndex = this.nextBufferIndex();
            this.dispatchEvent(startTimeOfNewAggregates, this.baseIncrementalValueStoreList.get(dispatchIndex));
            this.currentBufferIndex = dispatchIndex;
        } else {
            this.dispatchEvent(startTimeOfNewAggregates, this.baseIncrementalValueStore);
        }
    }

    /**
     * Returns the ring slot that follows {@code currentBufferIndex}, wrapping to 0 after
     * index {@code bufferSize} (the ring holds {@code bufferSize + 1} slots).
     */
    private int nextBufferIndex() {
        int index = this.currentBufferIndex + 1;
        return index > this.bufferSize ? index - (this.bufferSize + 1) : index;
    }

    /**
     * Emits the store's aggregate if it has processed any event, then resets the store for
     * the new aggregation period regardless.
     */
    private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueStore aBaseIncrementalValueStore) {
        if (aBaseIncrementalValueStore.isProcessed()) {
            ComplexEventChunk<StreamEvent> eventChunk = new ComplexEventChunk<StreamEvent>(true);
            eventChunk.add(aBaseIncrementalValueStore.createStreamEvent());
            this.emit(eventChunk);
        }
        this.cleanBaseIncrementalValueStore(startTimeOfNewAggregates, aBaseIncrementalValueStore);
    }

    /**
     * Emits one event per group store in the map (if any), then empties the map.
     */
    private void dispatchEvents(Map<String, BaseIncrementalValueStore> baseIncrementalValueGroupByStore) {
        if (!baseIncrementalValueGroupByStore.isEmpty()) {
            ComplexEventChunk<StreamEvent> eventChunk = new ComplexEventChunk<StreamEvent>(true);
            for (BaseIncrementalValueStore groupStore : baseIncrementalValueGroupByStore.values()) {
                eventChunk.add(groupStore.createStreamEvent());
            }
            this.emit(eventChunk);
        }
        baseIncrementalValueGroupByStore.clear();
    }

    /**
     * Writes the chunk to the table and passes it to the next executor when there is one.
     * The last executor in a chain has no successor, so the hand-off is skipped for it.
     */
    private void emit(ComplexEventChunk<StreamEvent> eventChunk) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Event dispatched by " + this.duration + " incremental executor: " + eventChunk.toString());
        }
        this.table.addEvents(eventChunk);
        if (this.next != null) {
            this.next.execute(eventChunk);
        }
    }

    /**
     * Clears the store, stamps it with the new period start, marks it unprocessed, and sends
     * the shared {@code RESET} event through each of its expression executors.
     */
    private void cleanBaseIncrementalValueStore(long startTimeOfNewAggregates, BaseIncrementalValueStore baseIncrementalValueStore) {
        baseIncrementalValueStore.clearValues();
        baseIncrementalValueStore.setTimestamp(startTimeOfNewAggregates);
        baseIncrementalValueStore.setProcessed(false);
        for (ExpressionExecutor expressionExecutor : baseIncrementalValueStore.getExpressionExecutors()) {
            expressionExecutor.execute(this.resetEvent);
        }
    }

    /** Returns the buffered group-by ring, or {@code null} when that layout is not in use. */
    ArrayList<HashMap<String, BaseIncrementalValueStore>> getBaseIncrementalValueGroupByStoreList() {
        return this.baseIncrementalValueGroupByStoreList;
    }

    /** Returns the unbuffered group-by map, or {@code null} when that layout is not in use. */
    Map<String, BaseIncrementalValueStore> getBaseIncrementalValueStoreMap() {
        return this.baseIncrementalValueStoreMap;
    }

    /** Returns the buffered non-group-by ring, or {@code null} when that layout is not in use. */
    ArrayList<BaseIncrementalValueStore> getBaseIncrementalValueStoreList() {
        return this.baseIncrementalValueStoreList;
    }

    /** Returns the base store created in the constructor. */
    BaseIncrementalValueStore getBaseIncrementalValueStore() {
        return this.baseIncrementalValueStore;
    }

    /**
     * Returns a store from the slot that follows the current one in the ring (the slot that
     * would be dispatched next), or from the single container when there is no buffer.
     *
     * @return for group-by layouts, the first store in the map's iteration order; for other
     *         layouts, the slot's store if it has processed an event; {@code null} otherwise
     */
    public BaseIncrementalValueStore getOldestEvent() {
        if (this.isGroupBy) {
            if (this.baseIncrementalValueGroupByStoreList != null) {
                return firstStoreOrNull(this.baseIncrementalValueGroupByStoreList.get(this.nextBufferIndex()));
            }
            return firstStoreOrNull(this.baseIncrementalValueStoreMap);
        }
        if (this.baseIncrementalValueStoreList != null) {
            return processedOrNull(this.baseIncrementalValueStoreList.get(this.nextBufferIndex()));
        }
        return processedOrNull(this.baseIncrementalValueStore);
    }

    /**
     * Returns a store from the current slot of the ring, or from the single container when
     * there is no buffer.
     *
     * @return for group-by layouts, the first store in the map's iteration order; for other
     *         layouts, the slot's store if it has processed an event; {@code null} otherwise
     */
    public BaseIncrementalValueStore getNewestEvent() {
        if (this.isGroupBy) {
            if (this.baseIncrementalValueGroupByStoreList != null) {
                return firstStoreOrNull(this.baseIncrementalValueGroupByStoreList.get(this.currentBufferIndex));
            }
            return firstStoreOrNull(this.baseIncrementalValueStoreMap);
        }
        if (this.baseIncrementalValueStoreList != null) {
            return processedOrNull(this.baseIncrementalValueStoreList.get(this.currentBufferIndex));
        }
        return processedOrNull(this.baseIncrementalValueStore);
    }

    /** Returns the first store in the map's iteration order, or {@code null} if the map is empty. */
    private static BaseIncrementalValueStore firstStoreOrNull(Map<String, BaseIncrementalValueStore> stores) {
        return stores.isEmpty() ? null : stores.values().iterator().next();
    }

    /** Returns the store if it is marked processed, otherwise {@code null}. */
    private static BaseIncrementalValueStore processedOrNull(BaseIncrementalValueStore store) {
        return store.isProcessed() ? store : null;
    }
}

