/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.siddhi.core.aggregation;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.aggregation.BaseIncrementalValueStore;
import org.wso2.siddhi.core.aggregation.Executor;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.MetaStreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventPool;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.selector.GroupByKeyGenerator;
import org.wso2.siddhi.core.query.selector.attribute.processor.executor.GroupByAggregationAttributeExecutor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.IncrementalTimeConverterUtil;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.snapshot.Snapshotable;
import org.wso2.siddhi.query.api.aggregation.TimePeriod;

/**
 * One stage in a chain of incremental-aggregation executors, responsible for a single
 * {@link TimePeriod.Duration}.
 *
 * <p>Incoming {@code CURRENT} events are folded into a {@link BaseIncrementalValueStore}
 * (one per group-by key when a {@link GroupByKeyGenerator} is supplied). When an event's
 * timestamp reaches {@code nextEmitTime}, the values accumulated so far are emitted: they are
 * written to {@code table} if this executor is flagged as the processing executor, and forwarded
 * to the next executor in the chain if one is set. A {@code TIMER} event is then sent down the
 * chain as well.
 *
 * <p>Only the mutation of the value stores in {@code processAggregates} is guarded by this
 * object's monitor; the remaining state is accessed without synchronisation.
 */
public class IncrementalExecutor
implements Executor,
Snapshotable {
    private static final Logger LOG = Logger.getLogger(IncrementalExecutor.class);

    /** Event of type {@code RESET} fed to every expression executor when a store is cleaned. */
    private final StreamEvent resetEvent;
    /** Extracts the event timestamp from {@code CURRENT} events. */
    private final ExpressionExecutor timestampExpressionExecutor;
    private final TimePeriod.Duration duration;
    private final Table table;
    /** {@code null} when the aggregation has no group-by clause. */
    private final GroupByKeyGenerator groupByKeyGenerator;
    private final StreamEventPool streamEventPool;
    private final boolean isGroupBy;
    private final boolean isRoot;
    private final String elementId;
    /** Template store; used directly when not grouping, cloned per key when grouping. */
    private final BaseIncrementalValueStore baseIncrementalValueStore;
    /** Per-key stores; {@code null} when the aggregation has no group-by clause. */
    private final Map<String, BaseIncrementalValueStore> baseIncrementalValueStoreGroupByMap;

    private long nextEmitTime = -1L;
    private long startTimeOfAggregates = -1L;
    private boolean timerStarted = false;
    private Executor next;
    private Scheduler scheduler;
    private boolean isProcessingExecutor;

    /**
     * Creates an executor for the given duration and registers it with the snapshot service of
     * {@code siddhiAppContext} under {@code aggregatorName}.
     *
     * <p>Note: the first element of {@code processExpressionExecutors} is <em>removed from the
     * caller's list</em> and used as the timestamp executor; the remaining elements are handed to
     * the value store.
     *
     * @param duration                       the time granularity this executor aggregates for
     * @param processExpressionExecutors     timestamp executor followed by the value executors;
     *                                       mutated by this constructor
     * @param groupByKeyGenerator            key generator for group-by, or {@code null} for none
     * @param metaStreamEvent                meta event used to build the internal event pool
     * @param child                          the next executor in the chain, or {@code null}
     * @param isRoot                         whether this executor drives the scheduler
     * @param table                          table that receives emitted aggregates when this is
     *                                       the processing executor
     * @param siddhiAppContext               application context (id generator, snapshot service)
     * @param aggregatorName                 name used for the store and snapshot registration
     * @param shouldUpdateExpressionExecutor passed through to the value store
     */
    public IncrementalExecutor(TimePeriod.Duration duration, List<ExpressionExecutor> processExpressionExecutors, GroupByKeyGenerator groupByKeyGenerator, MetaStreamEvent metaStreamEvent, IncrementalExecutor child, boolean isRoot, Table table, SiddhiAppContext siddhiAppContext, String aggregatorName, ExpressionExecutor shouldUpdateExpressionExecutor) {
        this.duration = duration;
        this.next = child;
        this.isRoot = isRoot;
        this.table = table;
        this.streamEventPool = new StreamEventPool(metaStreamEvent, 10);
        // The timestamp executor always travels at index 0; strip it off before the rest of the
        // list is given to the value store.
        this.timestampExpressionExecutor = processExpressionExecutors.remove(0);
        this.baseIncrementalValueStore = new BaseIncrementalValueStore(-1L, processExpressionExecutors, this.streamEventPool, siddhiAppContext, aggregatorName, shouldUpdateExpressionExecutor);
        this.isProcessingExecutor = false;
        this.isGroupBy = groupByKeyGenerator != null;
        this.groupByKeyGenerator = groupByKeyGenerator;
        this.baseIncrementalValueStoreGroupByMap = this.isGroupBy ? new HashMap<>() : null;
        this.resetEvent = this.streamEventPool.borrowEvent();
        this.resetEvent.setType(ComplexEvent.Type.RESET);
        this.elementId = "IncrementalExecutor-" + siddhiAppContext.getElementIdGenerator().createNewId();
        siddhiAppContext.getSnapshotService().addSnapshotable(aggregatorName, this);
    }

    /**
     * Sets the scheduler that a root executor notifies of upcoming emit times.
     *
     * @param scheduler the scheduler to notify; must be set before a root executor receives events
     */
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Consumes every event in the chunk (each event is removed from it). For each event the
     * aggregate start time is recomputed; if the event's timestamp has reached the next emit time
     * the accumulated aggregates are dispatched and a timer event is sent down the chain. Only
     * {@code CURRENT} events are then folded into the aggregates.
     *
     * @param streamEventChunk the events to process
     */
    @Override
    public void execute(ComplexEventChunk streamEventChunk) {
        // Guarded so that the chunk is not stringified on every call when debug logging is off.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Event Chunk received by " + this.duration + " incremental executor: " + streamEventChunk.toString());
        }
        streamEventChunk.reset();
        while (streamEventChunk.hasNext()) {
            StreamEvent streamEvent = (StreamEvent)streamEventChunk.next();
            streamEventChunk.remove();
            long timestamp = this.getTimestamp(streamEvent);
            this.startTimeOfAggregates = IncrementalTimeConverterUtil.getStartTimeOfAggregates(timestamp, this.duration);
            if (timestamp >= this.nextEmitTime) {
                this.nextEmitTime = IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, null);
                this.dispatchAggregateEvents(this.startTimeOfAggregates);
                this.sendTimerEvent();
            }
            if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
                this.processAggregates(streamEvent);
            }
        }
    }

    /**
     * Sends a {@code TIMER} event, stamped with the start time of the period preceding the
     * current aggregate start time, to the next executor. Does nothing when there is no next
     * executor.
     */
    private void sendTimerEvent() {
        if (this.next == null) {
            return;
        }
        StreamEvent timerEvent = this.streamEventPool.borrowEvent();
        timerEvent.setType(ComplexEvent.Type.TIMER);
        timerEvent.setTimestamp(IncrementalTimeConverterUtil.getPreviousStartTime(this.startTimeOfAggregates, this.duration));
        ComplexEventChunk<StreamEvent> timerStreamEventChunk = new ComplexEventChunk<>(true);
        timerStreamEventChunk.add(timerEvent);
        this.next.execute(timerStreamEventChunk);
    }

    /**
     * Resolves the timestamp to use for an event and, on a root executor, schedules the next
     * notification.
     *
     * <p>For {@code CURRENT} events the timestamp comes from the timestamp expression executor and
     * the scheduler is notified only once (the first time). For every other event type the
     * event's own timestamp is used and the scheduler is notified each time.
     *
     * @param streamEvent the event being processed
     * @return the timestamp associated with the event
     */
    private long getTimestamp(StreamEvent streamEvent) {
        long timestamp;
        if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
            timestamp = (Long)this.timestampExpressionExecutor.execute(streamEvent);
            if (this.isRoot && !this.timerStarted) {
                this.scheduler.notifyAt(IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, null));
                this.timerStarted = true;
            }
        } else {
            timestamp = streamEvent.getTimestamp();
            if (this.isRoot) {
                this.scheduler.notifyAt(IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, null));
            }
        }
        return timestamp;
    }

    /**
     * @return the next executor in the chain, or {@code null} if this is the last one
     */
    @Override
    public Executor getNextExecutor() {
        return this.next;
    }

    /**
     * @param nextExecutor the executor that should receive this executor's emitted events
     */
    @Override
    public void setNextExecutor(Executor nextExecutor) {
        this.next = nextExecutor;
    }

    /**
     * Folds a {@code CURRENT} event into the appropriate value store while holding this object's
     * monitor. In group-by mode the store for the event's key is created on demand by cloning the
     * template store, and the key is published through the
     * {@link GroupByAggregationAttributeExecutor} thread-local for the duration of the call.
     *
     * @param streamEvent the event to aggregate
     */
    private void processAggregates(StreamEvent streamEvent) {
        synchronized (this) {
            if (!this.isGroupBy) {
                this.process(streamEvent, this.baseIncrementalValueStore);
                return;
            }
            try {
                String groupedByKey = this.groupByKeyGenerator.constructEventKey(streamEvent);
                GroupByAggregationAttributeExecutor.getKeyThreadLocal().set(groupedByKey);
                BaseIncrementalValueStore storeForKey = this.baseIncrementalValueStoreGroupByMap.computeIfAbsent(groupedByKey, k -> this.baseIncrementalValueStore.cloneStore(k, this.startTimeOfAggregates));
                this.process(streamEvent, storeForKey);
            }
            finally {
                // Always clear the thread-local, even if key construction or processing throws.
                GroupByAggregationAttributeExecutor.getKeyThreadLocal().remove();
            }
        }
    }

    /**
     * Executes the store's expression executors against the event and writes each result into the
     * store at position {@code index + 1}. When the store's should-update executor evaluates to
     * {@code false}, {@link VariableExpressionExecutor}s are skipped and only the remaining
     * executors are applied. The store is marked as processed afterwards.
     *
     * @param streamEvent               the event to evaluate
     * @param baseIncrementalValueStore the store to update
     */
    private void process(StreamEvent streamEvent, BaseIncrementalValueStore baseIncrementalValueStore) {
        List<ExpressionExecutor> expressionExecutors = baseIncrementalValueStore.getExpressionExecutors();
        boolean shouldUpdate = true;
        ExpressionExecutor shouldUpdateExpressionExecutor = baseIncrementalValueStore.getShouldUpdateExpressionExecutor();
        if (shouldUpdateExpressionExecutor != null) {
            shouldUpdate = (Boolean)shouldUpdateExpressionExecutor.execute(streamEvent);
        }
        for (int i = 0; i < expressionExecutors.size(); ++i) {
            ExpressionExecutor expressionExecutor = expressionExecutors.get(i);
            if (shouldUpdate || !(expressionExecutor instanceof VariableExpressionExecutor)) {
                baseIncrementalValueStore.setValue(expressionExecutor.execute(streamEvent), i + 1);
            }
        }
        baseIncrementalValueStore.setProcessed(true);
    }

    /**
     * Emits whatever has been accumulated so far, using the group-by map or the single store
     * depending on the mode.
     *
     * @param startTimeOfNewAggregates timestamp to stamp on the single store after it is cleaned
     *                                 (unused in group-by mode)
     */
    private void dispatchAggregateEvents(long startTimeOfNewAggregates) {
        if (this.isGroupBy) {
            this.dispatchEvents(this.baseIncrementalValueStoreGroupByMap);
        } else {
            this.dispatchEvent(startTimeOfNewAggregates, this.baseIncrementalValueStore);
        }
    }

    /**
     * Emits the single store's aggregate, if it has processed any event, to the table (when this
     * is the processing executor) and to the next executor (when present). The store is cleaned
     * and re-stamped regardless of whether anything was emitted.
     *
     * @param startTimeOfNewAggregates   timestamp for the cleaned store
     * @param aBaseIncrementalValueStore the store to emit and clean
     */
    private void dispatchEvent(long startTimeOfNewAggregates, BaseIncrementalValueStore aBaseIncrementalValueStore) {
        if (aBaseIncrementalValueStore.isProcessed()) {
            StreamEvent streamEvent = aBaseIncrementalValueStore.createStreamEvent();
            ComplexEventChunk<StreamEvent> eventChunk = new ComplexEventChunk<>(true);
            eventChunk.add(streamEvent);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Event dispatched by " + this.duration + " incremental executor: " + eventChunk.toString());
            }
            if (this.isProcessingExecutor) {
                this.table.addEvents(eventChunk, 1);
            }
            if (this.next != null) {
                this.next.execute(eventChunk);
            }
        }
        this.cleanBaseIncrementalValueStore(startTimeOfNewAggregates, aBaseIncrementalValueStore);
    }

    /**
     * Emits one event per group-by store to the table (when this is the processing executor) and
     * to the next executor (when present), then empties the map. Nothing is emitted for an empty
     * map.
     *
     * @param baseIncrementalValueGroupByStore the per-key stores to emit and discard
     */
    private void dispatchEvents(Map<String, BaseIncrementalValueStore> baseIncrementalValueGroupByStore) {
        int noOfEvents = baseIncrementalValueGroupByStore.size();
        if (noOfEvents > 0) {
            ComplexEventChunk<StreamEvent> eventChunk = new ComplexEventChunk<>(true);
            for (BaseIncrementalValueStore storeForKey : baseIncrementalValueGroupByStore.values()) {
                eventChunk.add(storeForKey.createStreamEvent());
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Event dispatched by " + this.duration + " incremental executor: " + eventChunk.toString());
            }
            if (this.isProcessingExecutor) {
                this.table.addEvents(eventChunk, noOfEvents);
            }
            if (this.next != null) {
                this.next.execute(eventChunk);
            }
        }
        baseIncrementalValueGroupByStore.clear();
    }

    /**
     * Clears the store's values, stamps it with the given timestamp, marks it unprocessed, and
     * feeds the reset event to each of its expression executors.
     *
     * @param startTimeOfNewAggregates  timestamp to set on the store
     * @param baseIncrementalValueStore the store to clean
     */
    private void cleanBaseIncrementalValueStore(long startTimeOfNewAggregates, BaseIncrementalValueStore baseIncrementalValueStore) {
        baseIncrementalValueStore.clearValues();
        baseIncrementalValueStore.setTimestamp(startTimeOfNewAggregates);
        baseIncrementalValueStore.setProcessed(false);
        for (ExpressionExecutor expressionExecutor : baseIncrementalValueStore.getExpressionExecutors()) {
            expressionExecutor.execute(this.resetEvent);
        }
    }

    /**
     * @return the live per-key store map, or {@code null} when not in group-by mode
     */
    Map<String, BaseIncrementalValueStore> getBaseIncrementalValueStoreGroupByMap() {
        return this.baseIncrementalValueStoreGroupByMap;
    }

    /**
     * @return the template value store (the only store when not in group-by mode)
     */
    BaseIncrementalValueStore getBaseIncrementalValueStore() {
        return this.baseIncrementalValueStore;
    }

    /**
     * @return the aggregate start time computed for the most recently seen event, or {@code -1}
     *         if no event has been seen
     */
    public long getAggregationStartTimestamp() {
        return this.startTimeOfAggregates;
    }

    /**
     * @return the timestamp at or after which the next event triggers an emit, or {@code -1} if
     *         it has not been set yet
     */
    public long getNextEmitTime() {
        return this.nextEmitTime;
    }

    /**
     * Overrides the next emit time.
     *
     * @param emitTimeOfLatestEventInTable the value to use as the next emit time
     */
    public void setValuesForInMemoryRecreateFromTable(long emitTimeOfLatestEventInTable) {
        this.nextEmitTime = emitTimeOfLatestEventInTable;
    }

    /**
     * @return whether emitted aggregates are written to the table by this executor
     */
    public boolean isProcessingExecutor() {
        return this.isProcessingExecutor;
    }

    /**
     * @param processingExecutor {@code true} to have this executor write emitted aggregates to
     *                           the table
     */
    public void setProcessingExecutor(boolean processingExecutor) {
        this.isProcessingExecutor = processingExecutor;
    }

    /**
     * Discards accumulated aggregates without emitting them: empties the per-key map in group-by
     * mode, otherwise cleans the single store and stamps it with {@code -1}.
     */
    public void clearExecutor() {
        if (this.isGroupBy) {
            this.baseIncrementalValueStoreGroupByMap.clear();
        } else {
            this.cleanBaseIncrementalValueStore(-1L, this.baseIncrementalValueStore);
        }
    }

    /**
     * Captures the timing state of this executor under the keys {@code "NextEmitTime"},
     * {@code "StartTimeOfAggregates"} and {@code "TimerStarted"}. The value stores themselves are
     * not included.
     *
     * @return a new map holding the timing state
     */
    @Override
    public Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<>();
        state.put("NextEmitTime", this.nextEmitTime);
        state.put("StartTimeOfAggregates", this.startTimeOfAggregates);
        state.put("TimerStarted", this.timerStarted);
        return state;
    }

    /**
     * Restores the timing state previously produced by {@link #currentState()}. All three keys
     * must be present; a missing key results in a {@link NullPointerException} on unboxing.
     *
     * @param state the state map to restore from
     */
    @Override
    public void restoreState(Map<String, Object> state) {
        this.nextEmitTime = (Long)state.get("NextEmitTime");
        this.startTimeOfAggregates = (Long)state.get("StartTimeOfAggregates");
        this.timerStarted = (Boolean)state.get("TimerStarted");
    }

    /**
     * @return the unique element id generated for this executor at construction time
     */
    @Override
    public String getElementId() {
        return this.elementId;
    }
}

