/*
 * Decompiled with CFR 0.152.
 */
package org.fusesource.stomp.jms;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.apollo.filter.FilterException;
import org.apache.activemq.apollo.selector.SelectorParser;
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.ByteArrayOutputStream;
import org.fusesource.stomp.client.Constants;
import org.fusesource.stomp.codec.StompFrame;
import org.fusesource.stomp.jms.StompChannel;
import org.fusesource.stomp.jms.StompJmsConnection;
import org.fusesource.stomp.jms.StompJmsDestination;
import org.fusesource.stomp.jms.StompJmsDurableTopicSubscriber;
import org.fusesource.stomp.jms.StompJmsMessageConsumer;
import org.fusesource.stomp.jms.StompJmsMessageListener;
import org.fusesource.stomp.jms.StompJmsMessageProducer;
import org.fusesource.stomp.jms.StompJmsPrefetch;
import org.fusesource.stomp.jms.StompJmsQueue;
import org.fusesource.stomp.jms.StompJmsQueueBrowser;
import org.fusesource.stomp.jms.StompJmsQueueReceiver;
import org.fusesource.stomp.jms.StompJmsQueueSender;
import org.fusesource.stomp.jms.StompJmsTopic;
import org.fusesource.stomp.jms.StompJmsTopicPublisher;
import org.fusesource.stomp.jms.StompJmsTopicSubscriber;
import org.fusesource.stomp.jms.message.StompJmsBytesMessage;
import org.fusesource.stomp.jms.message.StompJmsMapMessage;
import org.fusesource.stomp.jms.message.StompJmsMessage;
import org.fusesource.stomp.jms.message.StompJmsMessageTransformation;
import org.fusesource.stomp.jms.message.StompJmsObjectMessage;
import org.fusesource.stomp.jms.message.StompJmsStreamMessage;
import org.fusesource.stomp.jms.message.StompJmsTextMessage;

public class StompJmsSession
implements Session,
QueueSession,
TopicSession,
StompJmsMessageListener {
    static final int SERVER_AUTO_ACKNOWLEDGE = -1;
    long nextMessageSwquence = 0L;
    final StompJmsConnection connection;
    final int acknowledgementMode;
    final List<MessageProducer> producers = new CopyOnWriteArrayList<MessageProducer>();
    final Map<AsciiBuffer, StompJmsMessageConsumer> consumers = new ConcurrentHashMap<AsciiBuffer, StompJmsMessageConsumer>();
    MessageListener messageListener;
    AtomicBoolean closed = new AtomicBoolean();
    AtomicBoolean started = new AtomicBoolean();
    volatile AsciiBuffer currentTransactionId;
    boolean forceAsyncSend;
    long consumerMessageBufferSize = 65536L;
    LinkedBlockingQueue<StompJmsMessage> stoppedMessages = new LinkedBlockingQueue(10000);
    StompChannel channel;
    StompJmsPrefetch prefetch;
    static final OptionalSectorParser SELECTOR_PARSER;
    ExecutorService executor;

    protected StompJmsSession(StompJmsConnection connection, int acknowledgementMode, boolean forceAsyncSend) {
        this.connection = connection;
        this.acknowledgementMode = acknowledgementMode;
        this.forceAsyncSend = forceAsyncSend;
        this.prefetch = new StompJmsPrefetch(connection.prefetch);
    }

    public int getAcknowledgeMode() throws JMSException {
        this.checkClosed();
        return this.acknowledgementMode;
    }

    public boolean getTransacted() throws JMSException {
        this.checkClosed();
        return this.acknowledgementMode == 0;
    }

    public MessageListener getMessageListener() throws JMSException {
        this.checkClosed();
        return this.messageListener;
    }

    public void setMessageListener(MessageListener listener) throws JMSException {
        this.checkClosed();
        this.messageListener = listener;
    }

    public void recover() throws JMSException {
        this.checkClosed();
        if (this.getTransacted()) {
            throw new IllegalStateException("Cannot call recover() on a transacted session");
        }
    }

    public void commit() throws JMSException {
        this.checkClosed();
        if (!this.getTransacted()) {
            throw new IllegalStateException("Not a transacted session");
        }
        for (StompJmsMessageConsumer c : this.consumers.values()) {
            c.commit();
        }
        this.getChannel().commitTransaction(this.currentTransactionId);
        this.currentTransactionId = this.getChannel().startTransaction();
    }

    public void rollback() throws JMSException {
        this.checkClosed();
        if (!this.getTransacted()) {
            throw new IllegalStateException("Not a transacted session");
        }
        for (StompJmsMessageConsumer c : this.consumers.values()) {
            c.rollback();
        }
        this.getChannel().rollbackTransaction(this.currentTransactionId);
        this.currentTransactionId = this.getChannel().startTransaction();
        this.getExecutor().execute(new Runnable(){

            @Override
            public void run() {
                for (StompJmsMessageConsumer c : StompJmsSession.this.consumers.values()) {
                    c.drainMessageQueueToListener();
                }
            }
        });
    }

    public void run() {
    }

    public void close() throws JMSException {
        if (this.closed.compareAndSet(false, true)) {
            this.stop();
            for (StompJmsMessageConsumer c : new ArrayList<StompJmsMessageConsumer>(this.consumers.values())) {
                c.close();
            }
            this.connection.removeSession(this, this.channel);
            this.channel = null;
        }
    }

    public MessageConsumer createConsumer(Destination destination) throws JMSException {
        this.checkClosed();
        StompJmsSession.checkDestination(destination);
        StompJmsDestination dest = StompJmsMessageTransformation.transformDestination(this.connection, destination);
        StompJmsMessageConsumer result = new StompJmsMessageConsumer(this.getChannel().nextId(), this, dest, "");
        result.init();
        return result;
    }

    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
        this.checkClosed();
        StompJmsSession.checkDestination(destination);
        messageSelector = StompJmsSession.checkSelector(messageSelector);
        StompJmsDestination dest = StompJmsMessageTransformation.transformDestination(this.connection, destination);
        StompJmsMessageConsumer result = new StompJmsMessageConsumer(this.getChannel().nextId(), this, dest, messageSelector);
        result.init();
        return result;
    }

    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) throws JMSException {
        this.checkClosed();
        StompJmsSession.checkDestination(destination);
        messageSelector = StompJmsSession.checkSelector(messageSelector);
        StompJmsDestination dest = StompJmsMessageTransformation.transformDestination(this.connection, destination);
        StompJmsTopicSubscriber result = new StompJmsTopicSubscriber(this.getChannel().nextId(), this, dest, NoLocal, messageSelector);
        result.init();
        return result;
    }

    public QueueReceiver createReceiver(Queue queue) throws JMSException {
        this.checkClosed();
        StompJmsSession.checkDestination((Destination)queue);
        StompJmsDestination dest = StompJmsMessageTransformation.transformDestination(this.connection, (Destination)queue);
        StompJmsQueueReceiver result = new StompJmsQueueReceiver(this.getChannel().nextId(), this, dest, "");
        result.init();
        return result;
    }

    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
        this.checkClosed();
        StompJmsSession.checkDestination((Destination)queue);
        messageSelector = StompJmsSession.checkSelector(messageSelector);
        StompJmsDestination dest = StompJmsMessageTransformation.transformDestination(this.connection, (Destination)queue);
        StompJmsQueueReceiver result = new StompJmsQueueReceiver(this.getChannel().nextId(), this, dest, messageSelector);
        result.init();
        return result;
    }

    public QueueBrowser createBrowser(Queue destination) throws JMSException {
        this.checkClosed();
        StompJmsSession.checkDestination((Destination)destination);
        StompJmsDestination dest = StompJmsMessageTransformation.transformDestination(this.connection, (Destination)destination);
        StompJmsQueueBrowser result = new StompJmsQueueBrowser(this, this.getChannel().nextId(), dest, "");
        return result;
    }

    public QueueBrowser createBrowser(Queue destination, String messageSelector) throws JMSException {
        this.checkClosed();
        StompJmsSession.checkDestination((Destination)destination);
        messageSelector = StompJmsSession.checkSelector(messageSelector);
        StompJmsDestination dest = StompJmsMessageTransformation.transformDestination(this.connection, (Destination)destination);
        StompJmsQueueBrowser result = new StompJmsQueueBrowser(this, this.getChannel().nextId(), dest, messageSelector);
        return result;
    }

    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
        this.checkClosed();
        StompJmsSession.checkDestination((Destination)topic);
        StompJmsDestination dest = StompJmsMessageTransformation.transformDestination(this.connection, (Destination)topic);
        StompJmsTopicSubscriber result = new StompJmsTopicSubscriber(this.getChannel().nextId(), this, dest, false, "");
        result.init();
        return result;
    }

    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
        this.checkClosed();
        StompJmsSession.checkDestination((Destination)topic);
        messageSelector = StompJmsSession.checkSelector(messageSelector);
        StompJmsDestination dest = StompJmsMessageTransformation.transformDestination(this.connection, (Destination)topic);
        StompJmsTopicSubscriber result = new StompJmsTopicSubscriber(this.getChannel().nextId(), this, dest, noLocal, messageSelector);
        return result;
    }

    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
        this.checkClosed();
        StompJmsSession.checkDestination((Destination)topic);
        AsciiBuffer id = StompFrame.encodeHeader(this.connection.getClientID() + ":" + name);
        StompJmsDestination dest = StompJmsMessageTransformation.transformDestination(this.connection, (Destination)topic);
        StompJmsDurableTopicSubscriber result = new StompJmsDurableTopicSubscriber(id, this, dest, false, "");
        result.init();
        return result;
    }

    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
        this.checkClosed();
        StompJmsSession.checkDestination((Destination)topic);
        messageSelector = StompJmsSession.checkSelector(messageSelector);
        StompJmsDestination dest = StompJmsMessageTransformation.transformDestination(this.connection, (Destination)topic);
        AsciiBuffer id = name != null ? StompFrame.encodeHeader(name) : this.getChannel().nextId();
        StompJmsDurableTopicSubscriber result = new StompJmsDurableTopicSubscriber(id, this, dest, noLocal, messageSelector);
        result.init();
        return result;
    }

    public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException {
        throw new UnsupportedOperationException("createSharedConsumer(topic, sharedSubscriptionName) is not supported");
    }

    public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector) throws JMSException {
        throw new UnsupportedOperationException("createSharedConsumer(topic, sharedSubscriptionName, messageSelector) is not supported");
    }

    public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
        throw new UnsupportedOperationException("createDurableConsumer(topic, name) is not supported");
    }

    public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
        throw new UnsupportedOperationException("createDurableConsumer(topic, name, messageSelector, noLocal) is not supported");
    }

    public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException {
        throw new UnsupportedOperationException("createSharedDurableConsumer(topic, name) is not supported");
    }

    public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) throws JMSException {
        throw new UnsupportedOperationException("createSharedDurableConsumer(topic, name, messageSelector) is not supported");
    }

    public void unsubscribe(String name) throws JMSException {
        this.checkClosed();
        AsciiBuffer id = StompFrame.encodeHeader(name);
        StompJmsMessageConsumer consumer = this.consumers.remove(id);
        if (consumer != null) {
            consumer.close();
        }
        this.getChannel().unsubscribe(id, true);
    }

    public MessageProducer createProducer(Destination destination) throws JMSException {
        this.checkClosed();
        StompJmsDestination dest = StompJmsMessageTransformation.transformDestination(this.connection, destination);
        StompJmsMessageProducer result = new StompJmsMessageProducer(this, dest);
        this.add(result);
        return result;
    }

    public QueueSender createSender(Queue queue) throws JMSException {
        this.checkClosed();
        StompJmsDestination dest = StompJmsMessageTransformation.transformDestination(this.connection, (Destination)queue);
        StompJmsQueueSender result = new StompJmsQueueSender(this, dest);
        return result;
    }

    public TopicPublisher createPublisher(Topic topic) throws JMSException {
        this.checkClosed();
        StompJmsDestination dest = StompJmsMessageTransformation.transformDestination(this.connection, (Destination)topic);
        StompJmsTopicPublisher result = new StompJmsTopicPublisher(this, dest);
        this.add(result);
        return result;
    }

    public BytesMessage createBytesMessage() throws IllegalStateException {
        this.checkClosed();
        return this.init(new StompJmsBytesMessage());
    }

    public MapMessage createMapMessage() throws IllegalStateException {
        this.checkClosed();
        return this.init(new StompJmsMapMessage());
    }

    public Message createMessage() throws IllegalStateException {
        this.checkClosed();
        return this.init(new StompJmsMessage());
    }

    public ObjectMessage createObjectMessage() throws IllegalStateException {
        this.checkClosed();
        return this.init(new StompJmsObjectMessage());
    }

    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
        ObjectMessage result = this.createObjectMessage();
        result.setObject(object);
        return result;
    }

    public StreamMessage createStreamMessage() throws JMSException {
        this.checkClosed();
        return this.init(new StompJmsStreamMessage());
    }

    public TextMessage createTextMessage() throws JMSException {
        this.checkClosed();
        return this.init(new StompJmsTextMessage());
    }

    public TextMessage createTextMessage(String text) throws JMSException {
        TextMessage result = this.createTextMessage();
        result.setText(text);
        return result;
    }

    public Queue createQueue(String queueName) throws JMSException {
        this.checkClosed();
        return new StompJmsQueue(this.connection, queueName);
    }

    public TemporaryQueue createTemporaryQueue() throws JMSException {
        this.checkClosed();
        return this.getChannel().getServerAdaptor().createTemporaryQueue(this);
    }

    public TemporaryTopic createTemporaryTopic() throws JMSException {
        this.checkClosed();
        return this.getChannel().getServerAdaptor().createTemporaryTopic(this);
    }

    public Topic createTopic(String topicName) throws JMSException {
        this.checkClosed();
        return new StompJmsTopic(this.connection, topicName);
    }

    protected void add(StompJmsMessageConsumer consumer) throws JMSException {
        this.consumers.put(consumer.getId(), consumer);
        if (consumer.tcpFlowControl()) {
            this.getChannel().serverAckSubs.incrementAndGet();
        }
        AsciiBuffer mode = this.acknowledgementMode == -1 ? Constants.AUTO : Constants.CLIENT;
        this.getChannel().subscribe(consumer.getDestination(), consumer.getId(), StompFrame.encodeHeader(consumer.getMessageSelector()), mode, consumer.getNoLocal(), consumer.isDurableSubscription(), consumer.isBrowser(), this.prefetch, StompFrame.encodeHeaders(consumer.getDestination().getSubscribeHeaders()));
        if (this.started.get()) {
            consumer.start();
        }
    }

    protected void remove(StompJmsMessageConsumer consumer) throws JMSException {
        if (this.getChannel().isStarted()) {
            this.getChannel().unsubscribe(consumer.getId(), false);
        }
        this.consumers.remove(consumer.getId());
        if (consumer.tcpFlowControl()) {
            this.getChannel().serverAckSubs.decrementAndGet();
        }
    }

    protected void add(MessageProducer producer) {
        this.producers.add(producer);
    }

    protected void remove(MessageProducer producer) {
        this.producers.remove(producer);
    }

    protected void onException(Exception ex) {
        this.connection.onException(ex);
    }

    protected void onException(JMSException ex) {
        this.connection.onException(ex);
    }

    protected void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId) throws JMSException {
        StompJmsDestination destination = StompJmsMessageTransformation.transformDestination(this.connection, dest);
        this.send(destination, msg, deliveryMode, priority, timeToLive, disableMsgId);
    }

    private void send(StompJmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId) throws JMSException {
        boolean sync;
        boolean nativeMessage;
        original.setJMSDeliveryMode(deliveryMode);
        original.setJMSPriority(priority);
        if (timeToLive > 0L) {
            long timeStamp = System.currentTimeMillis();
            original.setJMSTimestamp(timeStamp);
            original.setJMSExpiration(System.currentTimeMillis() + timeToLive);
        }
        AsciiBuffer msgId = null;
        if (!disableMsgId) {
            msgId = this.getNextMessageId();
        }
        if (nativeMessage = original instanceof StompJmsMessage) {
            ((StompJmsMessage)original).setConnection(this.connection);
            if (!disableMsgId) {
                ((StompJmsMessage)original).setMessageID(msgId);
            }
            original.setJMSDestination((Destination)destination);
        } else if (!disableMsgId) {
            original.setJMSMessageID(msgId.toString());
        }
        StompJmsMessage copy = StompJmsMessageTransformation.transformMessage(this.connection, original);
        if (!nativeMessage) {
            copy.setJMSDestination(destination);
        }
        boolean bl = sync = !this.forceAsyncSend && deliveryMode == 2 && !this.getTransacted();
        if (this.consumers.isEmpty() || this.getTransacted()) {
            StompChannel channel = this.getChannel();
            channel.sendMessage(copy, this.currentTransactionId, sync);
        } else {
            if (!disableMsgId) {
                copy.setMessageID(msgId);
            }
            this.connection.getChannel().sendMessage(copy, this.currentTransactionId, sync);
        }
    }

    protected void checkClosed() throws IllegalStateException {
        if (this.closed.get()) {
            throw new IllegalStateException("The MessageProducer is closed");
        }
    }

    public static String checkSelector(String selector) throws InvalidSelectorException {
        if (selector != null) {
            if (selector.trim().length() == 0) {
                return null;
            }
            if (SELECTOR_PARSER != null) {
                OptionalSectorParser.check(selector);
            }
        }
        return selector;
    }

    public static void checkDestination(Destination dest) throws InvalidDestinationException {
        if (dest == null) {
            throw new InvalidDestinationException("Destination cannot be null");
        }
    }

    @Override
    public void onMessage(StompJmsMessage message) {
        message.setConnection(this.connection);
        if (this.started.get()) {
            this.dispatch(message);
        } else {
            this.stoppedMessages.add(message);
        }
    }

    protected void start() throws JMSException {
        if (this.started.compareAndSet(false, true)) {
            StompJmsMessage message = null;
            while ((message = this.stoppedMessages.poll()) != null) {
                this.dispatch(message);
            }
            if (this.getTransacted() && this.currentTransactionId == null) {
                this.currentTransactionId = this.getChannel().startTransaction();
            }
            for (StompJmsMessageConsumer consumer : this.consumers.values()) {
                consumer.start();
            }
        }
    }

    protected StompChannel getChannel() throws JMSException {
        if (this.channel == null) {
            this.checkClosed();
            this.channel = this.connection.createChannel(this);
        }
        return this.channel;
    }

    public boolean isForceAsyncSend() {
        return this.forceAsyncSend;
    }

    public void setForceAsyncSend(boolean forceAsyncSend) {
        this.forceAsyncSend = forceAsyncSend;
    }

    protected void stop() throws JMSException {
        this.started.set(false);
        if (this.executor != null) {
            this.executor.shutdown();
            this.executor = null;
        }
        for (StompJmsMessageConsumer consumer : this.consumers.values()) {
            consumer.stop();
        }
    }

    protected boolean isStarted() {
        return this.started.get();
    }

    public StompJmsConnection getConnection() {
        return this.connection;
    }

    Executor getExecutor() {
        if (this.executor == null) {
            this.executor = Executors.newSingleThreadExecutor();
        }
        return this.executor;
    }

    private void dispatch(StompJmsMessage message) {
        AsciiBuffer id = message.getConsumerId();
        if (id == null || id.isEmpty()) {
            this.connection.onException(new JMSException("No ConsumerId set for " + message));
        }
        if (this.messageListener != null) {
            this.messageListener.onMessage((Message)message);
        } else {
            StompJmsMessageConsumer consumer = this.consumers.get(id);
            if (consumer != null) {
                consumer.onMessage(message);
            }
        }
    }

    private AsciiBuffer getNextMessageId() throws JMSException {
        AsciiBuffer session = null;
        session = this.channel != null ? this.channel.sessionId() : this.connection.getChannel().sessionId();
        AsciiBuffer id = Buffer.ascii(Long.toString(this.nextMessageSwquence++));
        ByteArrayOutputStream out = new ByteArrayOutputStream(3 + session.length() + 1 + id.length());
        out.write(73);
        out.write(68);
        out.write(58);
        out.write(session);
        out.write(45);
        out.write(id);
        return out.toBuffer().ascii();
    }

    private <T extends StompJmsMessage> T init(T message) {
        message.setConnection(this.connection);
        return message;
    }

    public StompJmsPrefetch getPrefetch() {
        return this.prefetch;
    }

    public void setPrefetch(StompJmsPrefetch prefetch) {
        this.prefetch = prefetch;
    }

    static {
        OptionalSectorParser parser;
        try {
            parser = new OptionalSectorParser();
            OptionalSectorParser.check("x=1");
        }
        catch (Throwable e) {
            parser = null;
        }
        SELECTOR_PARSER = parser;
    }

    static class OptionalSectorParser {
        OptionalSectorParser() {
        }

        public static void check(String selector) throws InvalidSelectorException {
            try {
                SelectorParser.parse(selector);
            }
            catch (FilterException e) {
                throw new InvalidSelectorException(e.getMessage());
            }
        }
    }
}

