/*
 * Decompiled with CFR 0.152.
 */
package org.hswebframework.web.message.memory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.hswebframework.web.message.Message;
import org.hswebframework.web.message.MessagePublish;
import org.hswebframework.web.message.MessageSubject;
import org.hswebframework.web.message.MessageSubscribe;
import org.hswebframework.web.message.Messager;
import org.hswebframework.web.message.memory.MemoryPublish;
import org.hswebframework.web.message.memory.MemoryTopic;
import org.hswebframework.web.message.memory.MemoryTopicSubscribe;
import org.hswebframework.web.message.support.QueueMessageSubject;
import org.hswebframework.web.message.support.TopicMessageSubject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Messager} implementation that keeps all topics and queues in the memory of the
 * current JVM.
 *
 * <p>Topics and queues are created lazily, the first time a name is published to or
 * subscribed to, and are held in concurrent maps keyed by that name.
 */
public class MemoryMessager implements Messager {
    private static final Logger log = LoggerFactory.getLogger(MemoryMessager.class);

    /** Maximum time, in seconds, that cancelling a queue subscription waits for the write lock. */
    private static final long CANCEL_LOCK_TIMEOUT_SECONDS = 5L;

    /** Topics by name. */
    private final Map<String, MemoryTopic<? extends Message>> topicStore = new ConcurrentHashMap<>(256);

    /** Queues by name. */
    private final Map<String, QueueConsumer<? extends Message>> queueStore = new ConcurrentHashMap<>(512);

    /** Stored for configuration; not used by any code visible in this class. */
    private Executor executor;

    /**
     * Creates a messager holding the given executor.
     *
     * @param executor the executor to store
     */
    public MemoryMessager(Executor executor) {
        this.executor = executor;
    }

    /**
     * Creates a messager backed by a new cached thread pool
     * ({@link Executors#newCachedThreadPool()}).
     */
    public MemoryMessager() {
        this(Executors.newCachedThreadPool());
    }

    /**
     * Replaces the stored executor.
     *
     * @param executor the new executor
     */
    public void setExecutor(Executor executor) {
        this.executor = executor;
    }

    /**
     * Creates a publish handle for the message. The handle is given this messager's queue and
     * topic lookups, so destinations are created on demand.
     *
     * @param message the message to publish
     * @return a new {@link MemoryPublish} for the message
     */
    @Override
    public MessagePublish publish(Message message) {
        return new MemoryPublish(this::getQueue, this::getTopic, message);
    }

    /**
     * Returns the queue registered under the name, creating it if absent.
     *
     * @param name the queue name
     * @param <M>  the message type the caller expects on this queue
     * @return the existing or newly created queue
     */
    @SuppressWarnings("unchecked")
    private <M extends Message> QueueConsumer<M> getQueue(String name) {
        // The store is keyed by name only, so the element type is not tracked at runtime;
        // the caller chooses M and the cast is unchecked by design.
        return (QueueConsumer<M>) this.queueStore.computeIfAbsent(name, queueName -> new QueueConsumer<M>());
    }

    /**
     * Returns the topic registered under the name, creating it if absent.
     *
     * @param name the topic name
     * @param <M>  the message type the caller expects on this topic
     * @return the existing or newly created topic
     */
    @SuppressWarnings("unchecked")
    public <M extends Message> MemoryTopic<M> getTopic(String name) {
        // Same reasoning as getQueue: the store does not track the element type.
        return (MemoryTopic<M>) this.topicStore.computeIfAbsent(name, topic -> new MemoryTopic<M>());
    }

    /**
     * Subscribes to a queue or topic subject.
     *
     * <p>For a {@link QueueMessageSubject} the returned subscription registers a single
     * dispatcher with the queue immediately; listeners added later through
     * {@code onMessage} are invoked by that dispatcher. For a {@link TopicMessageSubject}
     * a {@link MemoryTopicSubscribe} over the named topic is returned.
     *
     * @param subject the subject to subscribe to
     * @param <M>     the message type delivered to listeners
     * @return the subscription
     * @throws UnsupportedOperationException if the subject is neither a queue nor a topic subject
     */
    @Override
    public <M extends Message> MessageSubscribe<M> subscribe(MessageSubject subject) {
        if (subject instanceof QueueMessageSubject) {
            final QueueConsumer<M> queue = this.getQueue(((QueueMessageSubject) subject).getQueueName());
            return new MessageSubscribe<M>() {
                // Copy-on-write: the dispatcher is registered with the queue before the caller
                // can add listeners, so iteration and onMessage() may run concurrently.
                private final List<Consumer<M>> consumers = new CopyOnWriteArrayList<>();
                private final Consumer<M> consumer = m -> this.consumers.forEach(cons -> cons.accept(m));

                {
                    queue.lock.writeLock().lock();
                    try {
                        queue.consumers.add(this.consumer);
                    } finally {
                        queue.lock.writeLock().unlock();
                    }
                }

                @Override
                public MessageSubscribe<M> onMessage(Consumer<M> consumer) {
                    this.consumers.add(consumer);
                    return this;
                }

                @Override
                public void cancel() {
                    boolean locked;
                    try {
                        locked = queue.lock.writeLock().tryLock(CANCEL_LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for the caller; the subscription stays registered.
                        Thread.currentThread().interrupt();
                        log.warn(e.getMessage(), e);
                        return;
                    }
                    if (!locked) {
                        // Never touch the shared list or call unlock() without owning the lock.
                        log.warn("Timed out after {}s waiting for the queue write lock; subscription was not cancelled",
                                CANCEL_LOCK_TIMEOUT_SECONDS);
                        return;
                    }
                    try {
                        queue.consumers.remove(this.consumer);
                    } finally {
                        queue.lock.writeLock().unlock();
                    }
                }
            };
        }
        if (subject instanceof TopicMessageSubject) {
            String topicName = ((TopicMessageSubject) subject).getTopic();
            // MemoryTopicSubscribe's generic signature is not visible from this file, so the
            // constructor call is left raw exactly as before.
            @SuppressWarnings({"unchecked", "rawtypes"})
            MessageSubscribe<M> topicSubscribe = new MemoryTopicSubscribe(this.getTopic(topicName));
            return topicSubscribe;
        }
        throw new UnsupportedOperationException(subject.getClass().getName());
    }

    /**
     * A named queue: the dispatchers registered for it plus the lock that guards them.
     *
     * @param <M> the message type delivered to the dispatchers
     */
    class QueueConsumer<M extends Message> {
        /** Registered dispatchers; this class mutates the list only while holding the write lock. */
        final List<Consumer<M>> consumers = new ArrayList<>();

        /** Guards {@link #consumers}. */
        final ReadWriteLock lock = new ReentrantReadWriteLock();

        QueueConsumer() {
        }
    }
}

