/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.messaging;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import org.neo4j.causalclustering.messaging.Inbound;
import org.neo4j.causalclustering.messaging.Message;
import org.neo4j.causalclustering.messaging.TestNetwork;

public class TestNetwork<T> {
    private final Map<T, Inbound> inboundChannels = new HashMap<T, Inbound>();
    private final Map<T, Outbound> outboundChannels = new HashMap<T, Outbound>();
    private final AtomicLong seqGen = new AtomicLong();
    private final BiFunction<T, T, Long> latencySpecMillis;

    public TestNetwork(BiFunction<T, T, Long> latencySpecMillis) {
        this.latencySpecMillis = latencySpecMillis;
    }

    public void disconnect(T endpoint) {
        this.disconnectOutbound(endpoint);
        this.disconnectInbound(endpoint);
    }

    public void reconnect(T endpoint) {
        this.reconnectInbound(endpoint);
        this.reconnectOutbound(endpoint);
    }

    public void reset() {
        this.inboundChannels.values().forEach(Inbound::reconnect);
        this.outboundChannels.values().forEach(Outbound::reconnect);
    }

    public void disconnectInbound(T endpoint) {
        this.inboundChannels.get(endpoint).disconnect();
    }

    public void reconnectInbound(T endpoint) {
        this.inboundChannels.get(endpoint).reconnect();
    }

    public void disconnectOutbound(T endpoint) {
        this.outboundChannels.get(endpoint).disconnect();
    }

    public void reconnectOutbound(T endpoint) {
        this.outboundChannels.get(endpoint).reconnect();
    }

    public void start() {
        for (Inbound inbound : this.inboundChannels.values()) {
            inbound.start();
        }
        for (Outbound outbound : this.outboundChannels.values()) {
            outbound.start();
        }
    }

    public void stop() {
        for (Outbound outbound : this.outboundChannels.values()) {
            try {
                outbound.stop();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        for (Inbound inbound : this.inboundChannels.values()) {
            try {
                inbound.stop();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public class Inbound
    implements org.neo4j.causalclustering.messaging.Inbound<Message> {
        private Inbound.MessageHandler<Message> handler;
        private final BlockingQueue<Message> Q = new ArrayBlockingQueue<Message>(64, true);
        private org.neo4j.causalclustering.messaging.TestNetwork$Inbound.NetworkThread networkThread;
        private volatile boolean disconnected = false;

        public Inbound(T endpoint) {
            TestNetwork.this.inboundChannels.put(endpoint, this);
        }

        public void start() {
            this.networkThread = new NetworkThread();
            this.networkThread.start();
        }

        public void stop() throws InterruptedException {
            this.networkThread.kill();
        }

        public synchronized void deliver(Message message) {
            if (!this.disconnected) {
                this.Q.offer(message);
            }
        }

        public void registerHandler(Inbound.MessageHandler<Message> handler) {
            this.handler = handler;
        }

        public void disconnect() {
            this.disconnected = true;
        }

        public void reconnect() {
            this.disconnected = false;
        }

        class NetworkThread
        extends Thread {
            private volatile boolean done = false;

            NetworkThread() {
            }

            public void kill() throws InterruptedException {
                this.done = true;
                this.interrupt();
                this.join();
            }

            @Override
            public void run() {
                while (!this.done) {
                    try {
                        Message message = (Message)Inbound.this.Q.poll(1L, TimeUnit.SECONDS);
                        if (message == null || Inbound.this.handler == null) continue;
                        Inbound.this.handler.handle(message);
                    }
                    catch (InterruptedException e) {
                        this.done = true;
                    }
                }
            }
        }
    }

    public class Outbound
    implements org.neo4j.causalclustering.messaging.Outbound<T, Message> {
        private org.neo4j.causalclustering.messaging.TestNetwork$Outbound.NetworkThread networkThread;
        private volatile boolean disconnected = false;
        private T me;

        public Outbound(T me) {
            this.me = me;
            TestNetwork.this.outboundChannels.put(me, this);
        }

        public void start() {
            this.networkThread = new NetworkThread();
            this.networkThread.start();
        }

        public void stop() throws InterruptedException {
            this.networkThread.kill();
        }

        public void send(T destination, Message message) {
            this.doSend(destination, message, System.currentTimeMillis());
        }

        private void doSend(T destination, Message message, long now) {
            long atMillis = now + (Long)TestNetwork.this.latencySpecMillis.apply(this.me, destination);
            this.networkThread.scheduleDelivery(destination, message, atMillis);
        }

        public void disconnect() {
            this.disconnected = true;
        }

        public void reconnect() {
            this.disconnected = false;
        }

        class NetworkThread
        extends Thread {
            private volatile boolean done = false;
            private final TreeSet<org.neo4j.causalclustering.messaging.TestNetwork$Outbound.NetworkThread.MessageContext> msgQueue = new TreeSet((o1, o2) -> {
                int res = Long.compare(((MessageContext)o1).atMillis, ((MessageContext)o2).atMillis);
                if (res == 0 && o1 != o2) {
                    res = ((MessageContext)o1).seqNum < ((MessageContext)o2).seqNum ? -1 : 1;
                }
                return res;
            });

            NetworkThread() {
            }

            public void kill() throws InterruptedException {
                this.done = true;
                this.interrupt();
                this.join();
            }

            public synchronized void scheduleDelivery(T destination, Message message, long atMillis) {
                if (!Outbound.this.disconnected) {
                    this.msgQueue.add((org.neo4j.causalclustering.messaging.TestNetwork$Outbound.NetworkThread.MessageContext)new MessageContext(destination, message, atMillis));
                    this.notifyAll();
                }
            }

            @Override
            public synchronized void run() {
                while (!this.done) {
                    MessageContext context;
                    long now = System.currentTimeMillis();
                    Iterator<org.neo4j.causalclustering.messaging.TestNetwork$Outbound.NetworkThread.MessageContext> itr = this.msgQueue.iterator();
                    while (itr.hasNext() && (context = (MessageContext)itr.next()).atMillis <= now) {
                        itr.remove();
                        Inbound inbound = (Inbound)TestNetwork.this.inboundChannels.get(context.destination);
                        if (inbound == null) continue;
                        inbound.deliver(context.message);
                    }
                    try {
                        try {
                            MessageContext first = (MessageContext)this.msgQueue.first();
                            long waitTime = first.atMillis - System.currentTimeMillis();
                            if (waitTime <= 0L) continue;
                            this.wait(waitTime);
                        }
                        catch (NoSuchElementException e) {
                            this.wait(1000L);
                        }
                    }
                    catch (InterruptedException e) {
                        this.done = true;
                    }
                }
            }

            private class MessageContext {
                private final T destination;
                private final Message message;
                private long atMillis;
                private long seqNum;

                private MessageContext(T destination, Message message, long atMillis) {
                    this.destination = destination;
                    this.message = message;
                    this.atMillis = atMillis;
                    this.seqNum = TestNetwork.this.seqGen.getAndIncrement();
                }

                public boolean equals(Object o) {
                    if (this == o) {
                        return true;
                    }
                    if (o == null || this.getClass() != o.getClass()) {
                        return false;
                    }
                    MessageContext that = (MessageContext)o;
                    return this.seqNum == that.seqNum;
                }

                public int hashCode() {
                    return Objects.hash(this.seqNum);
                }
            }
        }
    }
}

