/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.siddhi.core.stream;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.lang.reflect.Constructor;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.event.EventFactory;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.stream.input.InputProcessor;
import org.wso2.siddhi.core.stream.output.StreamCallback;
import org.wso2.siddhi.core.util.statistics.ThroughputTracker;
import org.wso2.siddhi.core.util.timestamp.EventTimeBasedMillisTimestampGenerator;
import org.wso2.siddhi.query.api.annotation.Annotation;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.siddhi.query.api.exception.DuplicateAnnotationException;
import org.wso2.siddhi.query.api.util.AnnotationHelper;

public class StreamJunction {
    private static final Log log = LogFactory.getLog(StreamJunction.class);
    private final ExecutionPlanContext executionPlanContext;
    private final StreamDefinition streamDefinition;
    private int bufferSize;
    private List<Receiver> receivers = new CopyOnWriteArrayList<Receiver>();
    private List<Publisher> publishers = new CopyOnWriteArrayList<Publisher>();
    private ExecutorService executorService;
    private Boolean async = null;
    private Disruptor<Event> disruptor;
    private RingBuffer<Event> ringBuffer;
    private ThroughputTracker throughputTracker = null;
    private boolean isTraceEnabled;

    public StreamJunction(StreamDefinition streamDefinition, ExecutorService executorService, int bufferSize, ExecutionPlanContext executionPlanContext) {
        this.streamDefinition = streamDefinition;
        this.bufferSize = bufferSize;
        this.executorService = executorService;
        this.executionPlanContext = executionPlanContext;
        if (executionPlanContext.isStatsEnabled() && executionPlanContext.getStatisticsManager() != null) {
            String metricName = executionPlanContext.getSiddhiContext().getStatisticsConfiguration().getMatricPrefix() + "." + "ExecutionPlans" + "." + executionPlanContext.getName() + "." + "Siddhi" + "." + "Streams" + "." + streamDefinition.getId();
            this.throughputTracker = executionPlanContext.getSiddhiContext().getStatisticsConfiguration().getFactory().createThroughputTracker(metricName, executionPlanContext.getStatisticsManager());
        }
        try {
            Annotation annotation = AnnotationHelper.getAnnotation((String)"Async", (List)streamDefinition.getAnnotations());
            this.async = executionPlanContext.isAsync();
            if (annotation != null) {
                this.async = true;
                String bufferSizeString = annotation.getElement("BufferSize");
                if (bufferSizeString != null) {
                    this.bufferSize = Integer.parseInt(bufferSizeString);
                }
            }
        }
        catch (DuplicateAnnotationException e) {
            throw new DuplicateAnnotationException(e.getMessage() + " for the same Stream " + streamDefinition.getId());
        }
        this.isTraceEnabled = log.isTraceEnabled();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendEvent(ComplexEvent complexEvent) {
        ComplexEvent complexEventList;
        if (this.isTraceEnabled) {
            log.trace((Object)("Event is received by streamJunction " + this));
        }
        if (this.disruptor != null) {
            for (complexEventList = complexEvent; complexEventList != null; complexEventList = complexEventList.getNext()) {
                if (this.throughputTracker != null) {
                    this.throughputTracker.eventIn();
                }
                long sequenceNo = this.ringBuffer.next();
                try {
                    Event existingEvent = (Event)this.ringBuffer.get(sequenceNo);
                    existingEvent.copyFrom(complexEventList);
                    continue;
                }
                finally {
                    this.ringBuffer.publish(sequenceNo);
                }
            }
        } else {
            if (this.throughputTracker != null) {
                int messageCount = 0;
                while (complexEventList != null) {
                    ++messageCount;
                    complexEventList = complexEventList.getNext();
                }
                this.throughputTracker.eventsIn(messageCount);
            }
            for (Receiver receiver : this.receivers) {
                receiver.receive(complexEvent);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendEvent(Event event) {
        if (this.throughputTracker != null) {
            this.throughputTracker.eventIn();
        }
        if (this.isTraceEnabled) {
            log.trace((Object)(event + " event is received by streamJunction " + this));
        }
        if (this.disruptor != null) {
            long sequenceNo = this.ringBuffer.next();
            try {
                Event existingEvent = (Event)this.ringBuffer.get(sequenceNo);
                existingEvent.copyFrom(event);
            }
            finally {
                this.ringBuffer.publish(sequenceNo);
            }
        } else {
            for (Receiver receiver : this.receivers) {
                receiver.receive(event);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendEvent(Event[] events) {
        if (this.throughputTracker != null) {
            this.throughputTracker.eventsIn(events.length);
        }
        if (this.isTraceEnabled) {
            log.trace((Object)("Event is received by streamJunction " + this));
        }
        if (this.disruptor != null) {
            for (Event event : events) {
                long sequenceNo = this.ringBuffer.next();
                try {
                    Event existingEvent = (Event)this.ringBuffer.get(sequenceNo);
                    existingEvent.copyFrom(event);
                }
                finally {
                    this.ringBuffer.publish(sequenceNo);
                }
            }
        } else {
            for (Receiver receiver : this.receivers) {
                receiver.receive(events);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendEvent(List<Event> events) {
        if (this.isTraceEnabled) {
            log.trace((Object)("Event is received by streamJunction " + this));
        }
        if (this.disruptor != null) {
            for (Event event : events) {
                long sequenceNo = this.ringBuffer.next();
                try {
                    Event existingEvent = (Event)this.ringBuffer.get(sequenceNo);
                    existingEvent.copyFrom(event);
                }
                finally {
                    this.ringBuffer.publish(sequenceNo);
                }
            }
        } else {
            for (Receiver receiver : this.receivers) {
                receiver.receive(events.toArray(new Event[events.size()]));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendData(long timeStamp, Object[] data) {
        if (this.executionPlanContext.isPlayback()) {
            ((EventTimeBasedMillisTimestampGenerator)this.executionPlanContext.getTimestampGenerator()).setCurrentTimestamp(timeStamp);
        }
        if (this.throughputTracker != null) {
            this.throughputTracker.eventIn();
        }
        if (this.disruptor != null) {
            long sequenceNo = this.ringBuffer.next();
            try {
                Event existingEvent = (Event)this.ringBuffer.get(sequenceNo);
                existingEvent.setTimestamp(timeStamp);
                existingEvent.setIsExpired(false);
                System.arraycopy(data, 0, existingEvent.getData(), 0, data.length);
            }
            finally {
                this.ringBuffer.publish(sequenceNo);
            }
        } else {
            for (Receiver receiver : this.receivers) {
                receiver.receive(timeStamp, data);
            }
        }
    }

    public synchronized void startProcessing() {
        if (!this.receivers.isEmpty() && this.async.booleanValue()) {
            for (Constructor<?> constructor : Disruptor.class.getConstructors()) {
                if (constructor.getParameterTypes().length != 5) continue;
                ProducerType producerType = ProducerType.MULTI;
                this.disruptor = new Disruptor((com.lmax.disruptor.EventFactory)new EventFactory(this.streamDefinition.getAttributeList().size()), this.bufferSize, (Executor)this.executorService, producerType, (WaitStrategy)new BlockingWaitStrategy());
                this.disruptor.handleExceptionsWith(this.executionPlanContext.getDisruptorExceptionHandler());
                break;
            }
            if (this.disruptor == null) {
                this.disruptor = new Disruptor((com.lmax.disruptor.EventFactory)new EventFactory(this.streamDefinition.getAttributeList().size()), this.bufferSize, (Executor)this.executorService);
                this.disruptor.handleExceptionsWith(this.executionPlanContext.getDisruptorExceptionHandler());
            }
            for (Receiver receiver : this.receivers) {
                this.disruptor.handleEventsWith(new EventHandler[]{new StreamHandler(receiver)});
            }
            this.ringBuffer = this.disruptor.start();
        } else {
            for (Receiver receiver : this.receivers) {
                if (!(receiver instanceof StreamCallback)) continue;
                ((StreamCallback)receiver).startProcessing();
            }
        }
    }

    public synchronized void stopProcessing() {
        if (this.disruptor != null) {
            this.disruptor.shutdown();
        } else {
            for (Receiver receiver : this.receivers) {
                if (!(receiver instanceof StreamCallback)) continue;
                ((StreamCallback)receiver).stopProcessing();
            }
        }
    }

    public synchronized Publisher constructPublisher() {
        Publisher publisher = new Publisher();
        publisher.setStreamJunction(this);
        this.publishers.add(publisher);
        return publisher;
    }

    public synchronized void subscribe(Receiver receiver) {
        if (!this.receivers.contains(receiver)) {
            this.receivers.add(receiver);
        }
    }

    public String getStreamId() {
        return this.streamDefinition.getId();
    }

    public StreamDefinition getStreamDefinition() {
        return this.streamDefinition;
    }

    public class Publisher
    implements InputProcessor {
        private StreamJunction streamJunction;

        public void setStreamJunction(StreamJunction streamJunction) {
            this.streamJunction = streamJunction;
        }

        public void send(ComplexEvent complexEvent) {
            this.streamJunction.sendEvent(complexEvent);
        }

        @Override
        public void send(Event event, int streamIndex) {
            this.streamJunction.sendEvent(event);
        }

        @Override
        public void send(Event[] events, int streamIndex) {
            this.streamJunction.sendEvent(events);
        }

        @Override
        public void send(List<Event> events, int streamIndex) {
            this.streamJunction.sendEvent(events);
        }

        @Override
        public void send(long timeStamp, Object[] data, int streamIndex) {
            this.streamJunction.sendData(timeStamp, data);
        }

        public String getStreamId() {
            return this.streamJunction.getStreamId();
        }
    }

    public class StreamHandler
    implements EventHandler<Event> {
        private Receiver receiver;
        private ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk(false);

        public StreamHandler(Receiver receiver) {
            this.receiver = receiver;
        }

        public void onEvent(Event event, long sequence, boolean endOfBatch) {
            this.receiver.receive(event, endOfBatch);
        }
    }

    public static interface Receiver {
        public String getStreamId();

        public void receive(ComplexEvent var1);

        public void receive(Event var1);

        public void receive(Event var1, boolean var2);

        public void receive(long var1, Object[] var3);

        public void receive(Event[] var1);
    }
}

