/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.siddhi.core.window;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.state.StateEvent;
import org.wso2.siddhi.core.event.stream.MetaStreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.StreamEventPool;
import org.wso2.siddhi.core.event.stream.converter.ZeroStreamEventConverter;
import org.wso2.siddhi.core.exception.OperationNotSupportedException;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.input.stream.single.EntryValveProcessor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;
import org.wso2.siddhi.core.stream.StreamJunction;
import org.wso2.siddhi.core.table.EventTable;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.collection.operator.Finder;
import org.wso2.siddhi.core.util.collection.operator.MatchingMetaStateHolder;
import org.wso2.siddhi.core.util.lock.LockWrapper;
import org.wso2.siddhi.core.util.parser.SchedulerParser;
import org.wso2.siddhi.core.util.parser.SingleInputStreamParser;
import org.wso2.siddhi.core.util.snapshot.Snapshotable;
import org.wso2.siddhi.core.util.statistics.LatencyTracker;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.WindowDefinition;
import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
import org.wso2.siddhi.query.api.expression.Expression;

/**
 * A named window that can be shared between queries.
 *
 * <p>Events handed to {@link #add(ComplexEventChunk)} are converted to {@link StreamEvent}s,
 * pushed through the window processor built from the {@link WindowDefinition}, filtered by the
 * definition's output event type and finally sent to the publisher registered through
 * {@link #setPublisher(StreamJunction.Publisher)}.
 *
 * <p>{@link #init} must be called before {@link #add}, {@link #find}, {@link #currentState()} or
 * {@link #restoreState(Object[])}; those methods dereference fields that are only assigned there.
 */
public class EventWindow
implements FindableProcessor,
Snapshotable {
    /** Initial size handed to the {@link StreamEventPool} created in {@link #init}. */
    private static final int STREAM_EVENT_POOL_SIZE = 5;

    private final String elementId;
    private final WindowDefinition windowDefinition;
    private final ExecutionPlanContext executionPlanContext;
    /** Destination of the filtered window output; assigned through {@link #setPublisher}. */
    private StreamJunction.Publisher outputPublisher;
    /** Head of the processing chain (entry valve or the window processor itself). */
    private Processor windowProcessor;
    /** The actual window implementation; target of find/snapshot delegation. */
    private WindowProcessor internalWindowProcessor;
    private final LockWrapper lockWrapper;
    private StreamEventPool streamEventPool;
    private final ZeroStreamEventConverter eventConverter = new ZeroStreamEventConverter();

    /**
     * Creates an uninitialised window.
     *
     * @param windowDefinition     definition describing attributes, window type and output event type
     * @param executionPlanContext context used to obtain the element id and, later, the scheduler service
     */
    public EventWindow(WindowDefinition windowDefinition, ExecutionPlanContext executionPlanContext) {
        this.windowDefinition = windowDefinition;
        this.executionPlanContext = executionPlanContext;
        this.elementId = executionPlanContext.getElementIdGenerator().createNewId();
        this.lockWrapper = new LockWrapper(windowDefinition.getId());
        this.lockWrapper.setLock(new ReentrantLock());
    }

    /**
     * Builds the internal processing chain. Calling this method again after a successful
     * initialisation is a no-op.
     *
     * @param eventTableMap  event tables passed on to the window processor factory
     * @param eventWindowMap not used by this method
     * @param latencyTracker not used by this method
     * @param queryName      name forwarded to the processor factory and to the scheduler
     */
    public void init(Map<String, EventTable> eventTableMap, Map<String, EventWindow> eventWindowMap, LatencyTracker latencyTracker, String queryName) {
        if (this.windowProcessor != null) {
            // Already initialised.
            return;
        }

        MetaStreamEvent metaStreamEvent = new MetaStreamEvent();
        metaStreamEvent.addInputDefinition((AbstractDefinition)this.windowDefinition);
        metaStreamEvent.setWindowEvent(true);
        metaStreamEvent.initializeAfterWindowData();
        for (Attribute attribute : this.windowDefinition.getAttributeList()) {
            metaStreamEvent.addOutputData(attribute);
        }

        this.streamEventPool = new StreamEventPool(metaStreamEvent, STREAM_EVENT_POOL_SIZE);
        StreamEventCloner streamEventCloner = new StreamEventCloner(metaStreamEvent, this.streamEventPool);
        OutputStream.OutputEventType outputEventType = this.windowDefinition.getOutputEventType();
        // Anything other than CURRENT_EVENTS means expired events are part of the output.
        boolean outputExpectsExpiredEvents = outputEventType != OutputStream.OutputEventType.CURRENT_EVENTS;

        WindowProcessor processor = (WindowProcessor)SingleInputStreamParser.generateProcessor((StreamHandler)this.windowDefinition.getWindow(), metaStreamEvent, new ArrayList<VariableExpressionExecutor>(), this.executionPlanContext, eventTableMap, false, outputExpectsExpiredEvents, queryName);
        processor.setStreamEventCloner(streamEventCloner);
        processor.constructStreamEventPopulater(metaStreamEvent, 0);

        if (processor instanceof SchedulingProcessor) {
            // Scheduling windows are fronted by an entry valve that the scheduler feeds into.
            EntryValveProcessor entryValveProcessor = new EntryValveProcessor(this.executionPlanContext);
            Scheduler scheduler = SchedulerParser.parse(this.executionPlanContext.getScheduledExecutorService(), entryValveProcessor, this.executionPlanContext);
            scheduler.init(this.lockWrapper, queryName);
            scheduler.setStreamEventPool(this.streamEventPool);
            ((SchedulingProcessor)processor).setScheduler(scheduler);
            entryValveProcessor.setToLast(processor);
            this.windowProcessor = entryValveProcessor;
        } else {
            this.windowProcessor = processor;
        }

        // The publishing step is always the tail of the chain.
        this.windowProcessor.setToLast(new StreamPublishProcessor(outputEventType));
        this.internalWindowProcessor = processor;
    }

    /**
     * Registers the publisher that receives the window's output.
     *
     * @param publisher publisher the filtered events are sent to
     */
    public void setPublisher(StreamJunction.Publisher publisher) {
        this.outputPublisher = publisher;
    }

    /**
     * @return the definition this window was created from
     */
    public WindowDefinition getWindowDefinition() {
        return this.windowDefinition;
    }

    /**
     * Converts every event of the chunk into a pooled {@link StreamEvent}, links the converted
     * events into a new chain and runs that chain through the window processor while holding
     * this window's lock. An empty chunk is ignored.
     *
     * @param complexEventChunk events to add to the window
     */
    public void add(ComplexEventChunk complexEventChunk) {
        // Acquire outside the try block so the finally clause never unlocks a lock that
        // was not obtained.
        this.lockWrapper.lock();
        try {
            complexEventChunk.reset();
            ComplexEvent sourceEvent = (ComplexEvent)complexEventChunk.getFirst();
            if (sourceEvent == null) {
                // Nothing to add; avoid borrowing a pooled event that would never be used.
                return;
            }

            StreamEvent firstEvent = this.streamEventPool.borrowEvent();
            this.eventConverter.convertComplexEvent(sourceEvent, firstEvent);
            StreamEvent lastEvent = firstEvent;
            for (sourceEvent = sourceEvent.getNext(); sourceEvent != null; sourceEvent = sourceEvent.getNext()) {
                StreamEvent converted = this.streamEventPool.borrowEvent();
                this.eventConverter.convertComplexEvent(sourceEvent, converted);
                lastEvent.setNext(converted);
                lastEvent = converted;
            }
            this.windowProcessor.process(new ComplexEventChunk<StreamEvent>(firstEvent, lastEvent, complexEventChunk.isBatch()));
        } finally {
            this.lockWrapper.unlock();
        }
    }

    /**
     * Delegates the lookup to the internal window processor.
     *
     * @param matchingEvent event to match against
     * @param finder        finder previously obtained from {@link #constructFinder}
     * @return the result of the internal processor's {@code find}
     * @throws OperationNotSupportedException if the internal window processor is not a
     *                                        {@link FindableProcessor}
     */
    @Override
    public StreamEvent find(StateEvent matchingEvent, Finder finder) {
        if (this.internalWindowProcessor instanceof FindableProcessor) {
            return ((FindableProcessor)this.internalWindowProcessor).find(matchingEvent, finder);
        }
        throw new OperationNotSupportedException("Cannot find events in the window " + this.windowDefinition.getWindow());
    }

    /**
     * Delegates finder construction to the internal window processor.
     *
     * @throws OperationNotSupportedException if the internal window processor is not a
     *                                        {@link FindableProcessor}
     */
    @Override
    public Finder constructFinder(Expression expression, MatchingMetaStateHolder matchingMetaStateHolder, ExecutionPlanContext executionPlanContext, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, EventTable> eventTableMap) {
        if (this.internalWindowProcessor instanceof FindableProcessor) {
            return ((FindableProcessor)this.internalWindowProcessor).constructFinder(expression, matchingMetaStateHolder, executionPlanContext, variableExpressionExecutors, eventTableMap);
        }
        throw new OperationNotSupportedException("Cannot construct finder for the window " + this.windowDefinition.getWindow());
    }

    /**
     * @return the lock wrapper held while events are added; also handed to the scheduler in {@link #init}
     */
    public LockWrapper getLock() {
        return this.lockWrapper;
    }

    /**
     * @return a single-element array wrapping the internal window processor's state
     */
    @Override
    public Object[] currentState() {
        return new Object[]{this.internalWindowProcessor.currentState()};
    }

    /**
     * Restores the internal window processor from a snapshot produced by {@link #currentState()}.
     *
     * @param state array whose first element is the internal processor's state array
     */
    @Override
    public void restoreState(Object[] state) {
        this.internalWindowProcessor.restoreState((Object[])state[0]);
    }

    @Override
    public String getElementId() {
        return this.elementId;
    }

    /**
     * Terminal processor of the window chain: drops events whose type is not permitted by the
     * configured output event type and sends whatever remains to the enclosing window's publisher.
     */
    private class StreamPublishProcessor
    implements Processor {
        private final boolean allowCurrentEvents;
        private final boolean allowExpiredEvents;
        private final OutputStream.OutputEventType outputEventType;

        public StreamPublishProcessor(OutputStream.OutputEventType outputEventType) {
            this.outputEventType = outputEventType;
            this.allowCurrentEvents = outputEventType == OutputStream.OutputEventType.CURRENT_EVENTS || outputEventType == OutputStream.OutputEventType.ALL_EVENTS;
            this.allowExpiredEvents = outputEventType == OutputStream.OutputEventType.EXPIRED_EVENTS || outputEventType == OutputStream.OutputEventType.ALL_EVENTS;
        }

        /**
         * Removes every event that is neither an allowed CURRENT nor an allowed EXPIRED event,
         * then publishes the head of the remaining chain if any event is left.
         */
        @Override
        public void process(ComplexEventChunk complexEventChunk) {
            complexEventChunk.reset();
            while (complexEventChunk.hasNext()) {
                ComplexEvent event = (ComplexEvent)complexEventChunk.next();
                boolean allowedCurrent = event.getType() == ComplexEvent.Type.CURRENT && this.allowCurrentEvents;
                boolean allowedExpired = event.getType() == ComplexEvent.Type.EXPIRED && this.allowExpiredEvents;
                if (!allowedCurrent && !allowedExpired) {
                    complexEventChunk.remove();
                }
            }
            complexEventChunk.reset();
            if (complexEventChunk.hasNext()) {
                EventWindow.this.outputPublisher.send((ComplexEvent)complexEventChunk.getFirst());
            }
        }

        /** This processor is always the end of the chain, so there is no next processor. */
        @Override
        public Processor getNextProcessor() {
            return null;
        }

        /** Intentionally a no-op: nothing may follow the publishing step. */
        @Override
        public void setNextProcessor(Processor processor) {
        }

        /** Intentionally a no-op: nothing may follow the publishing step. */
        @Override
        public void setToLast(Processor processor) {
        }

        @Override
        public Processor cloneProcessor(String key) {
            return new StreamPublishProcessor(this.outputEventType);
        }
    }
}

