/*
 * Decompiled with CFR 0.152.
 */
package org.icij.datashare.com;

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.icij.datashare.CollectionUtils;
import org.icij.datashare.com.Channel;
import org.icij.datashare.com.DataBus;
import org.icij.datashare.com.Message;
import org.icij.datashare.com.ShutdownMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-memory {@link DataBus}: messages published on a channel are handed synchronously,
 * on the publisher's thread, to every subscriber currently registered for that channel.
 *
 * <p>Each call to {@code subscribe} blocks its calling thread until a shutdown message has
 * been delivered to that subscription (either published on one of its channels or sent by
 * {@link #unsubscribe(Consumer)}).
 */
public class MemoryDataBus implements DataBus {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    /** Active subscriptions, keyed by the subscriber they forward to. */
    private final Map<Consumer<Message>, MessageListener> subscribers = new ConcurrentHashMap<>();

    /**
     * Delivers {@code message} to every listener subscribed to {@code channel}.
     *
     * <p>Delivery happens on the calling thread. An exception thrown by a subscriber is not
     * caught here: it propagates to the caller and interrupts delivery to the listeners
     * that have not been reached yet.
     *
     * @param channel the channel the message is published on
     * @param message the message to deliver; must not be {@code null}
     * @throws NullPointerException if {@code message} is {@code null}
     */
    @Override
    public void publish(Channel channel, Message message) {
        Message nonNullMessage = Objects.requireNonNull(message, "cannot publish a null message");
        this.subscribers.values().stream()
                .filter(listener -> listener.hasSubscribedTo(channel))
                .forEach(listener -> listener.accept(nonNullMessage));
    }

    /**
     * Subscribes with a callback that only logs the subscription at debug level.
     *
     * @see #subscribe(Consumer, Runnable, Channel...)
     */
    @Override
    public int subscribe(Consumer<Message> subscriber, Channel ... channels) throws InterruptedException {
        return this.subscribe(
                subscriber,
                () -> this.logger.debug("subscribed {} to {}", subscriber, Arrays.toString(channels)),
                channels);
    }

    /**
     * Registers {@code subscriber} on {@code channels}, runs {@code subscriptionCallback},
     * then blocks until a shutdown message has been delivered to this subscription.
     *
     * <p>The subscription is always removed from the bus when this method exits (normal
     * return, interruption, or a failing callback), so a finished subscriber no longer
     * receives published messages. If {@code subscriber} was already registered, the
     * previous subscription is replaced and its blocked {@code subscribe} call is released.
     *
     * @param subscriber           consumer receiving the messages
     * @param subscriptionCallback run once the subscriber is registered, before blocking
     * @param channels             channels to listen to
     * @return the number of messages successfully handled by the subscriber, shutdown included
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public int subscribe(Consumer<Message> subscriber, Runnable subscriptionCallback, Channel ... channels) throws InterruptedException {
        MessageListener listener = new MessageListener(subscriber, channels);
        MessageListener replaced = this.subscribers.put(subscriber, listener);
        if (replaced != null) {
            // The replaced listener is no longer reachable through the map: release its
            // waiting thread now, otherwise nothing could ever wake it up.
            replaced.requestShutdown();
        }
        try {
            subscriptionCallback.run();
            int nbMessages = listener.loopUntilShutdown();
            this.logger.info("exiting {}", subscriber);
            return nbMessages;
        } finally {
            // Two-argument remove: never drop a newer listener registered for the same subscriber.
            this.subscribers.remove(subscriber, listener);
        }
    }

    /**
     * Removes {@code subscriber} from the bus and sends it a {@link ShutdownMessage} so the
     * thread blocked in {@code subscribe} returns. Does nothing if it is not registered.
     *
     * @param subscriber the consumer to unsubscribe
     */
    @Override
    public void unsubscribe(Consumer<Message> subscriber) {
        MessageListener listener = this.subscribers.remove(subscriber);
        if (listener != null) {
            listener.accept(new ShutdownMessage());
            this.logger.debug("unsubscribed {}", subscriber);
        }
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean getHealth() {
        return true;
    }

    /**
     * Forwards messages to one subscriber, counts them, and lets a thread wait until a
     * shutdown has been requested for this subscription.
     */
    private static class MessageListener implements Consumer<Message> {
        private final Consumer<Message> subscriber;
        private final LinkedHashSet<Channel> channels;
        /** Monitor guarding {@link #shutdownRequested} and {@link #nbMessages}. */
        private final Object lock = new Object();
        /** Sticky: once set it is never cleared, so a later message cannot hide a shutdown. */
        private boolean shutdownRequested;
        /** Number of messages the subscriber handled without throwing. */
        private int nbMessages;

        MessageListener(Consumer<Message> subscriber, Channel ... channels) {
            this.subscriber = subscriber;
            this.channels = CollectionUtils.asSet(channels);
        }

        boolean hasSubscribedTo(Channel channel) {
            return this.channels.contains(channel);
        }

        /**
         * Forwards {@code message} to the subscriber, then records it.
         *
         * <p>The message is counted only if the subscriber returned normally. A shutdown
         * message is recorded even when the subscriber throws, so the waiting thread is
         * always released; the exception still propagates to the caller.
         */
        @Override
        public void accept(Message message) {
            boolean handled = false;
            try {
                this.subscriber.accept(message);
                handled = true;
            } finally {
                synchronized (this.lock) {
                    // Count before signalling so the waiter always sees this message in its total.
                    if (handled) {
                        this.nbMessages++;
                    }
                    if (message != null && message.type == Message.Type.SHUTDOWN) {
                        this.shutdownRequested = true;
                        this.lock.notifyAll();
                    }
                }
            }
        }

        /** Requests shutdown without forwarding anything to the subscriber. */
        void requestShutdown() {
            synchronized (this.lock) {
                this.shutdownRequested = true;
                this.lock.notifyAll();
            }
        }

        boolean shutdownAsked() {
            synchronized (this.lock) {
                return this.shutdownRequested;
            }
        }

        /**
         * Blocks the calling thread until a shutdown has been requested.
         *
         * @return the number of messages handled by the subscriber so far
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        int loopUntilShutdown() throws InterruptedException {
            synchronized (this.lock) {
                // Loop guards against spurious wake-ups.
                while (!this.shutdownRequested) {
                    this.lock.wait();
                }
                return this.nbMessages;
            }
        }
    }
}

