/*
 * Decompiled with CFR 0.152.
 */
package org.hswebframework.web.socket.message;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.hswebframework.web.concurrent.counter.CounterManager;
import org.hswebframework.web.concurrent.counter.SimpleCounterManager;
import org.hswebframework.web.message.Message;
import org.hswebframework.web.message.MessageSubject;
import org.hswebframework.web.message.MessageSubscribe;
import org.hswebframework.web.message.Messager;
import org.hswebframework.web.message.builder.StaticMessageBuilder;
import org.hswebframework.web.message.builder.StaticMessageSubjectBuilder;
import org.hswebframework.web.message.support.ObjectMessage;
import org.hswebframework.web.socket.message.WebSocketMessage;
import org.hswebframework.web.socket.message.WebSocketMessager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

/**
 * {@link WebSocketMessager} implementation that bridges a {@link Messager} to WebSocket sessions.
 *
 * <p>Subscriptions are tracked in a nested map keyed by command, then type, then WebSocket
 * session id. A counter per command/type pair (obtained from the {@link CounterManager}) is
 * incremented when a subscription is registered and decremented when it is cancelled.
 */
public class DefaultWebSocketMessager implements WebSocketMessager {
    private static final Logger log = LoggerFactory.getLogger(DefaultWebSocketMessager.class);

    private final Messager messager;
    private final CounterManager counterManager;

    /** command -> type -> WebSocket session id -> active subscription. */
    private final Map<String, Map<String, Map<String, MessageSubscribeSession>>> store = new ConcurrentHashMap<>(32);

    /**
     * Creates a messager that counts subscriptions with a new {@link SimpleCounterManager}.
     *
     * @param messager the messager used to publish and subscribe
     */
    public DefaultWebSocketMessager(Messager messager) {
        this(messager, new SimpleCounterManager());
    }

    /**
     * Creates a messager with an explicit counter manager.
     *
     * @param messager       the messager used to publish and subscribe
     * @param counterManager the source of the per command/type subscription counters
     */
    public DefaultWebSocketMessager(Messager messager, CounterManager counterManager) {
        this.messager = messager;
        this.counterManager = counterManager;
    }

    /**
     * Does nothing; subscriptions are only created through
     * {@link #subscribe(String, String, WebSocketSession)}.
     *
     * @param session the session that connected
     */
    @Override
    public void onSessionConnect(WebSocketSession session) {
    }

    /** Builds the counter name used for the given command/type pair. */
    private String getSubTotalKey(String command, String type) {
        return "sub_".concat(command).concat("_").concat(type).concat("_total");
    }

    /**
     * Selects the message subject for a command: the queue {@code "queue_" + command} when
     * {@code type} is {@code "queue"}, otherwise the topic {@code "topic_" + command}.
     */
    private MessageSubject subjectOf(String command, String type) {
        if ("queue".equals(type)) {
            return StaticMessageSubjectBuilder.queue("queue_" + command);
        }
        return StaticMessageSubjectBuilder.topic("topic_" + command);
    }

    /**
     * Returns the current value of the subscription counter for a command/type pair.
     *
     * @param command the command name
     * @param type    the subscription type
     * @return the counter value narrowed to {@code int}
     */
    @Override
    public int getSubscribeTotal(String command, String type) {
        return (int) this.counterManager.getCounter(this.getSubTotalKey(command, type)).get();
    }

    /**
     * Cancels every subscription registered for the given session and drops it from the store.
     *
     * @param session the session that was closed
     */
    @Override
    public void onSessionClose(WebSocketSession session) {
        String sessionId = session.getId();
        for (Map<String, Map<String, MessageSubscribeSession>> byType : this.store.values()) {
            for (Map<String, MessageSubscribeSession> bySession : byType.values()) {
                // Remove before cancelling: only the caller that actually removed the entry
                // cancels it, so the counter is decremented once and the entry does not linger.
                MessageSubscribeSession removed = bySession.remove(sessionId);
                if (removed != null) {
                    removed.cancel();
                }
            }
        }
    }

    /**
     * Publishes a message to the queue or topic derived from {@code command} and {@code type}.
     *
     * @param command the command name
     * @param type    {@code "queue"} to publish to a queue; any other value publishes to a topic
     * @param message the payload to send
     */
    @Override
    public void publish(String command, String type, WebSocketMessage message) {
        this.messager.publish(StaticMessageBuilder.object((Serializable) message))
                .to(this.subjectOf(command, type))
                .send();
    }

    /** Returns the session-id map for a command/type pair, creating it when missing. */
    private Map<String, MessageSubscribeSession> getSubSession(String command, String type) {
        return this.store
                .computeIfAbsent(command, cmd -> new ConcurrentHashMap<>(128))
                .computeIfAbsent(type, t -> new ConcurrentHashMap<>());
    }

    /**
     * Subscribes a WebSocket session to the subject derived from {@code command} and
     * {@code type}. Each received message is forwarded to the session as a text frame built
     * from the payload's {@code toString()}. Calling this again for a session that is already
     * subscribed neither registers a second subscription nor changes the counter.
     *
     * @param command       the command name
     * @param type          {@code "queue"} to subscribe to a queue; any other value subscribes to a topic
     * @param socketSession the session that receives the messages
     * @return always {@code true}
     */
    @Override
    public boolean subscribe(String command, String type, WebSocketSession socketSession) {
        Map<String, MessageSubscribeSession> sessions = this.getSubSession(command, type);
        sessions.computeIfAbsent(socketSession.getId(), sessionId -> {
            MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe =
                    this.messager.subscribe(this.subjectOf(command, type));
            subscribe.onMessage(message -> {
                try {
                    if (!socketSession.isOpen()) {
                        // The session is gone; stop delivering to it.
                        this.deSubscribe(command, type, socketSession);
                        return;
                    }
                    socketSession.sendMessage(new TextMessage(message.getObject().toString()));
                } catch (IOException e) {
                    log.error("execute WebSocket command {} error", command, e);
                }
            });
            // Count only here, where a new subscription is actually registered, so the
            // increment is always matched by the single decrement performed in cancel().
            this.counterManager.getCounter(this.getSubTotalKey(command, type)).increment();
            return new MessageSubscribeSession(subscribe, socketSession) {

                @Override
                public void cancel() {
                    super.cancel();
                    DefaultWebSocketMessager.this.counterManager
                            .getCounter(DefaultWebSocketMessager.this.getSubTotalKey(command, type))
                            .decrement();
                }
            };
        });
        return true;
    }

    /**
     * Cancels the subscription of a session for a command/type pair, if one exists.
     *
     * @param command       the command name
     * @param type          the subscription type
     * @param socketSession the session to unsubscribe
     * @return {@code true} if a subscription was found and cancelled, {@code false} otherwise
     */
    @Override
    public boolean deSubscribe(String command, String type, WebSocketSession socketSession) {
        // Look up without creating, so unknown command/type pairs do not leave empty maps behind.
        Map<String, Map<String, MessageSubscribeSession>> byType = this.store.get(command);
        if (byType == null) {
            return false;
        }
        Map<String, MessageSubscribeSession> sessions = byType.get(type);
        if (sessions == null) {
            return false;
        }
        // remove() hands the entry to exactly one caller, which then cancels it; the stored
        // session's cancel() also decrements the counter.
        MessageSubscribeSession removed = sessions.remove(socketSession.getId());
        if (removed == null) {
            return false;
        }
        removed.cancel();
        return true;
    }

    /** Pairs a message subscription with the WebSocket session it delivers to. */
    public class MessageSubscribeSession {
        private MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe;
        private WebSocketSession session;

        /**
         * @param subscribe the underlying message subscription
         * @param session   the WebSocket session receiving the messages
         */
        public MessageSubscribeSession(MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe, WebSocketSession session) {
            this.subscribe = subscribe;
            this.session = session;
        }

        /** @return the underlying message subscription */
        public MessageSubscribe<ObjectMessage<WebSocketMessage>> getSubscribe() {
            return this.subscribe;
        }

        /** @param subscribe the message subscription to hold */
        public void setSubscribe(MessageSubscribe<ObjectMessage<WebSocketMessage>> subscribe) {
            this.subscribe = subscribe;
        }

        /** @return the WebSocket session receiving the messages */
        public WebSocketSession getSession() {
            return this.session;
        }

        /** @param session the WebSocket session to hold */
        public void setSession(WebSocketSession session) {
            this.session = session;
        }

        /** Cancels the underlying message subscription. */
        public void cancel() {
            this.subscribe.cancel();
        }
    }
}

