package dk.alexandra.fresco.framework.network;

import ch.qos.logback.core.CoreConstants;
import dk.alexandra.fresco.framework.Party;
import dk.alexandra.fresco.framework.configuration.NetworkConfiguration;
import dk.alexandra.fresco.framework.util.ExceptionConverter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dk/alexandra/fresco/framework/network/AsyncNetwork.class */
public class AsyncNetwork implements CloseableNetwork {
    // Size of the party-id handshake in bytes. NOTE(review): no code visible in this class reads
    // this constant; the handshake buffers below are allocated with a literal 1.
    private static final int PARTY_ID_BYTES = 1;
    // Holds messages this party sends to itself; send/receive use it when the id equals our own.
    private final BlockingQueue<byte[]> selfQueue;
    // Configuration handed to the constructor (own id, number of parties, party addresses).
    private final NetworkConfiguration conf;
    // Set to true by the constructor and to false by teardown(); a plain, non-volatile field.
    private boolean alive;
    // Executor running the Sender and Receiver tasks; assigned in startCommunication(), which the
    // constructor only calls when there is more than one party.
    private ExecutorService communicationService;
    // Values view of the connected channels; left unassigned (null) when there is only one party.
    private Collection<SocketChannel> channels;
    // Outgoing side of each peer connection, keyed by the peer's party id.
    private final Map<Integer, Sender> senders;
    // Incoming side of each peer connection, keyed by the peer's party id.
    private final Map<Integer, Receiver> receivers;
    // Timeout used by the single-argument constructor.
    public static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofMinutes(1);
    // How long receive() waits on a receiver's queue before re-checking that the receiver is alive.
    private static final Duration RECEIVE_TIMEOUT = Duration.ofMillis(100);
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) AsyncNetwork.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:dk/alexandra/fresco/framework/network/AsyncNetwork$Receiver.class */
    /**
     * Reads length-prefixed messages from a single peer's channel on a background task and buffers
     * them until they are polled.
     *
     * <p>Each message on the wire is a 4-byte length followed by that many payload bytes. The task
     * is submitted to the supplied executor from the constructor and ends when it is stopped, when
     * the peer closes its side of the connection, or when reading fails.
     */
    public static class Receiver implements Callable<Object> {
        /** Size in bytes of the length prefix that precedes every message. */
        private static final int LENGTH_PREFIX_BYTES = 4;

        private final SocketChannel channel;
        private final BlockingQueue<byte[]> queue;
        private final Future<Object> future;
        private final AtomicBoolean run;

        /**
         * Creates the receiver and immediately submits it to the executor.
         *
         * @param socketChannel the channel to read from, must not be null
         * @param executorService the executor that runs the read loop, must not be null
         */
        Receiver(SocketChannel socketChannel, ExecutorService executorService) {
            Objects.requireNonNull(socketChannel);
            Objects.requireNonNull(executorService);
            this.channel = socketChannel;
            this.queue = new LinkedBlockingQueue<>();
            this.run = new AtomicBoolean(true);
            // Submitted last so that every field read by call() is already assigned.
            this.future = executorService.submit(this);
        }

        /**
         * Tells whether this receiver can still deliver messages.
         *
         * @return true while the read loop is active, or while messages buffered before the peer
         *     closed the connection are still waiting to be polled; false otherwise
         * @throws ExecutionException if the read loop terminated with an exception
         * @throws InterruptedException if interrupted while fetching the task result
         */
        boolean isRunning() throws InterruptedException, ExecutionException {
            if (!this.future.isDone()) {
                return true;
            }
            // Surfaces a failure of the read loop as an ExecutionException.
            this.future.get();
            // The loop ended without stop() being called only when the peer closed the stream.
            // Messages that were fully received before that must remain retrievable.
            return this.run.get() && !this.queue.isEmpty();
        }

        /**
         * Stops the read loop and waits for the task to finish.
         *
         * @throws ExecutionException if the read loop terminated with an exception
         * @throws InterruptedException if interrupted while waiting for the task
         * @throws IOException if shutting down the input side of the channel fails
         */
        void stop() throws InterruptedException, ExecutionException, IOException {
            if (isRunning()) {
                this.run.set(false);
                // Makes a read that is blocked in call() return end-of-stream.
                this.channel.shutdownInput();
                this.future.get();
            }
        }

        /**
         * Waits up to the given duration for the next buffered message.
         *
         * @param duration maximum time to wait
         * @return the next message, or null if none arrived in time
         */
        byte[] pollMessage(Duration duration) {
            return (byte[]) ExceptionConverter.safe(() -> {
                return this.queue.poll(duration.toMillis(), TimeUnit.MILLISECONDS);
            }, "Receive interrupted");
        }

        /**
         * Read loop: repeatedly reads a length prefix and the payload it announces, and queues the
         * payload.
         *
         * @return always null
         * @throws IOException if reading fails, the stream ends in the middle of a message, or a
         *     negative length is received
         */
        @Override // java.util.concurrent.Callable
        public Object call() throws IOException, InterruptedException {
            while (this.run.get()) {
                ByteBuffer header = ByteBuffer.allocate(LENGTH_PREFIX_BYTES);
                if (!fill(header)) {
                    if (this.run.get() && header.position() > 0) {
                        throw new IOException("Connection closed in the middle of a message header");
                    }
                    // Stopped, or the peer closed cleanly between two messages.
                    return null;
                }
                header.flip();
                int length = header.getInt();
                if (length < 0) {
                    throw new IOException("Received negative message length " + length);
                }
                ByteBuffer payload = ByteBuffer.allocate(length);
                if (!fill(payload)) {
                    if (this.run.get()) {
                        throw new IOException("Connection closed in the middle of a message");
                    }
                    return null;
                }
                this.queue.add(payload.array());
            }
            return null;
        }

        /**
         * Reads from the channel until the buffer is full.
         *
         * @return true if the buffer was filled; false if the receiver was stopped or the channel
         *     reached end-of-stream first
         */
        private boolean fill(ByteBuffer buffer) throws IOException {
            while (buffer.hasRemaining()) {
                // read() returns -1 at end-of-stream; without this check the loop never ends.
                if (!this.run.get() || this.channel.read(buffer) < 0) {
                    return false;
                }
            }
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:dk/alexandra/fresco/framework/network/AsyncNetwork$Sender.class */
    /**
     * Writes queued messages to a single peer's channel on a background task.
     *
     * <p>Every message is framed as a 4-byte length followed by the payload. The task is submitted
     * to the supplied executor from the constructor and keeps running until {@link #stop()} asks
     * it to flush and the queue has been emptied.
     */
    public static class Sender implements Callable<Object> {
        private final SocketChannel channel;
        private final BlockingQueue<byte[]> queue;
        private final AtomicBoolean flush;
        private final AtomicBoolean ignoreNext;
        private Future<Object> future;

        /**
         * Creates the sender and immediately submits it to the executor.
         *
         * @param socketChannel the channel to write to, must not be null
         * @param executorService the executor that runs the write loop, must not be null
         */
        Sender(SocketChannel socketChannel, ExecutorService executorService) {
            this.channel = Objects.requireNonNull(socketChannel);
            Objects.requireNonNull(executorService);
            this.queue = new LinkedBlockingQueue<>();
            this.flush = new AtomicBoolean(false);
            this.ignoreNext = new AtomicBoolean(false);
            this.future = executorService.submit(this);
        }

        /**
         * Requests that the write loop finish. If nothing is queued, an empty placeholder is added
         * so a loop blocked in take() wakes up; ignoreNext tells the loop not to transmit it.
         */
        private void unblock() {
            this.flush.set(true);
            if (!this.queue.isEmpty()) {
                return;
            }
            this.ignoreNext.set(true);
            this.queue.add(new byte[0]);
        }

        /**
         * Appends a message to the outgoing queue.
         *
         * @param bArr the payload to send
         */
        void queueMessage(byte[] bArr) {
            this.queue.add(bArr);
        }

        /**
         * Tells whether the write loop is still active.
         *
         * @return true if the task has not completed, false if it completed normally
         * @throws ExecutionException if the write loop terminated with an exception
         * @throws InterruptedException if interrupted while fetching the task result
         */
        boolean isRunning() throws InterruptedException, ExecutionException {
            boolean finished = this.future.isDone();
            if (finished) {
                // Rethrows the failure, if any, of the completed task.
                this.future.get();
            }
            return !finished;
        }

        /**
         * Flushes what is queued, waits for the write loop to end and shuts down the output side
         * of the channel.
         *
         * @throws ExecutionException if the write loop terminated with an exception
         * @throws InterruptedException if interrupted while waiting for the task
         * @throws IOException if shutting down the output side of the channel fails
         */
        void stop() throws InterruptedException, ExecutionException, IOException {
            boolean active = isRunning();
            if (active) {
                unblock();
            }
            this.future.get();
            this.channel.shutdownOutput();
        }

        /**
         * Write loop: takes messages off the queue and writes each as one length-prefixed frame,
         * until a flush was requested and the queue is empty.
         *
         * @return always null
         */
        @Override // java.util.concurrent.Callable
        public Object call() throws IOException, InterruptedException {
            while (!this.queue.isEmpty() || !this.flush.get()) {
                byte[] message = this.queue.take();
                if (this.ignoreNext.get()) {
                    continue;
                }
                ByteBuffer frame = ByteBuffer.allocate(Integer.BYTES + message.length);
                frame.putInt(message.length).put(message);
                // The buffer is completely filled, so rewinding exposes the whole frame.
                frame.rewind();
                while (frame.hasRemaining()) {
                    this.channel.write(frame);
                }
            }
            return null;
        }
    }

    /**
     * Creates and connects the network using {@link #DEFAULT_CONNECTION_TIMEOUT}.
     *
     * @param networkConfiguration the configuration describing this party and its peers
     */
    public AsyncNetwork(NetworkConfiguration networkConfiguration) {
        this(networkConfiguration, DEFAULT_CONNECTION_TIMEOUT);
    }

    /**
     * Creates the network and, when there is more than one party, connects to all peers and
     * starts the sender and receiver tasks before returning.
     *
     * @param networkConfiguration the configuration describing this party and its peers
     * @param duration the timeout handed to the connection phase
     */
    public AsyncNetwork(NetworkConfiguration networkConfiguration, Duration duration) {
        this.conf = networkConfiguration;
        final int partyCount = networkConfiguration.noOfParties();
        final int peerCount = partyCount - 1;
        this.selfQueue = new LinkedBlockingQueue<>();
        this.senders = new HashMap<>(peerCount);
        this.receivers = new HashMap<>(peerCount);
        this.alive = true;
        final boolean hasPeers = partyCount > 1;
        if (hasPeers) {
            Map<Integer, SocketChannel> connected = connectNetwork(duration);
            this.channels = connected.values();
            startCommunication(connected);
        }
        logger.info("P{}: successfully connected network", Integer.valueOf(networkConfiguration.getMyId()));
    }

    /**
     * Establishes the connections to all peers by running the client side and the server side of
     * the handshake concurrently and merging their results.
     *
     * <p>The timeout is applied to each of the two waits separately. If connecting fails, the
     * channels collected so far are closed so they do not leak.
     *
     * @param duration how long to wait for each of the two connection tasks
     * @return the connected channels keyed by peer party id
     * @throws RuntimeException if interrupted, timed out, or if a connection task failed
     */
    private Map<Integer, SocketChannel> connectNetwork(Duration duration) {
        Map<Integer, SocketChannel> channelMap = new HashMap<>(this.conf.noOfParties());
        ExecutorService connector = Executors.newFixedThreadPool(2);
        ExecutorCompletionService<Map<Integer, SocketChannel>> completion =
                new ExecutorCompletionService<>(connector);
        completion.submit(this::connectClient);
        completion.submit(() -> connectServer(bindServer()));
        boolean connected = false;
        try {
            for (int i = 0; i < 2; i++) {
                Future<Map<Integer, SocketChannel>> finished =
                        completion.poll(duration.toMillis(), TimeUnit.MILLISECONDS);
                if (finished == null) {
                    throw new TimeoutException("Timed out");
                }
                channelMap.putAll(finished.get());
            }
            connected = true;
            return channelMap;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while connecting network", e);
        } catch (TimeoutException e) {
            throw new RuntimeException("Timed out connecting network", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed to connect network", e.getCause());
        } finally {
            // Cancels whichever connection task is still running.
            connector.shutdownNow();
            if (!connected) {
                for (SocketChannel channel : channelMap.values()) {
                    try {
                        channel.close();
                    } catch (IOException ignored) {
                        // Best effort: the connection attempt has already failed.
                    }
                }
            }
        }
    }

    /**
     * Connects to every party with an id higher than our own and sends each one our id as a
     * single byte.
     *
     * <p>A failed attempt is retried after an exponentially growing pause until it succeeds or
     * the thread is interrupted. The channel of a failed attempt is closed before retrying.
     *
     * @return the connected channels keyed by peer party id
     * @throws InterruptedException if interrupted while pausing between attempts
     */
    private Map<Integer, SocketChannel> connectClient() throws InterruptedException {
        final int myId = this.conf.getMyId();
        final int partyCount = this.conf.noOfParties();
        Map<Integer, SocketChannel> channelMap = new HashMap<>(partyCount - myId);
        for (int partyId = myId + 1; partyId <= partyCount; partyId++) {
            Party party = this.conf.getParty(partyId);
            InetSocketAddress address = new InetSocketAddress(party.getHostname(), party.getPort());
            int attempts = 0;
            SocketChannel connected = null;
            while (connected == null) {
                SocketChannel candidate = null;
                try {
                    candidate = SocketChannel.open();
                    candidate.socket().setTcpNoDelay(true);
                    candidate.connect(address);
                    candidate.configureBlocking(true);
                    ByteBuffer idBuffer = ByteBuffer.allocate(1);
                    idBuffer.put((byte) myId);
                    idBuffer.position(0);
                    while (idBuffer.hasRemaining()) {
                        candidate.write(idBuffer);
                    }
                    connected = candidate;
                } catch (IOException e) {
                    // Release the socket of the failed attempt; a new one is opened on retry.
                    if (candidate != null) {
                        try {
                            candidate.close();
                        } catch (IOException ignored) {
                            // Nothing more can be done for a channel that failed to connect.
                        }
                    }
                    attempts++;
                    // Shift is capped so the delay can never overflow into a negative value.
                    Thread.sleep(1L << Math.min(attempts, 30));
                }
            }
            channelMap.put(partyId, connected);
            logger.info("P{}: connected to {}", myId, party);
        }
        return channelMap;
    }

    /**
     * Opens a server channel and binds it to this party's configured port.
     *
     * @return the bound server channel
     * @throws RuntimeException if opening or binding fails; the channel is closed in that case
     */
    private ServerSocketChannel bindServer() {
        InetSocketAddress address = new InetSocketAddress(this.conf.getMe().getPort());
        ServerSocketChannel server = null;
        try {
            server = ServerSocketChannel.open();
            server.bind(address);
            logger.info("P{}: bound at {}", Integer.valueOf(this.conf.getMyId()), address);
            return server;
        } catch (IOException e) {
            // Do not leave an unbound channel open when bind() fails.
            if (server != null) {
                try {
                    server.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            throw new RuntimeException("Failed to bind to " + address, e);
        }
    }

    /**
     * Accepts one connection from every party with an id lower than our own and reads the
     * single-byte id each of them sends.
     *
     * <p>The server channel is closed once, after all expected connections were accepted or an
     * error occurred, including when no connections are expected.
     *
     * @param serverSocketChannel the bound server channel to accept on
     * @return the accepted channels keyed by the party id each peer announced
     * @throws IOException if accepting or reading fails, or a peer disconnects before sending its id
     */
    private Map<Integer, SocketChannel> connectServer(ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            final int myId = this.conf.getMyId();
            Map<Integer, SocketChannel> channelMap = new HashMap<>(myId - 1);
            for (int i = 1; i < myId; i++) {
                SocketChannel accepted = serverSocketChannel.accept();
                try {
                    accepted.socket().setTcpNoDelay(true);
                    accepted.configureBlocking(true);
                    ByteBuffer idBuffer = ByteBuffer.allocate(1);
                    while (idBuffer.hasRemaining()) {
                        // read() returns -1 at end-of-stream; looping on it would never end.
                        if (accepted.read(idBuffer) < 0) {
                            throw new IOException("Connection closed before the party id was received");
                        }
                    }
                    idBuffer.position(0);
                    int peerId = idBuffer.get();
                    channelMap.put(peerId, accepted);
                    logger.info("P{}: accepted connection from {}", myId, this.conf.getParty(peerId));
                } catch (IOException | RuntimeException e) {
                    // The channel was never handed out, so it has to be released here.
                    try {
                        accepted.close();
                    } catch (IOException closeFailure) {
                        e.addSuppressed(closeFailure);
                    }
                    throw e;
                }
            }
            return channelMap;
        } finally {
            serverSocketChannel.close();
        }
    }

    /**
     * Creates the communication executor and starts one receiver and one sender per connected
     * peer.
     *
     * @param map the connected channels keyed by peer party id
     */
    private void startCommunication(Map<Integer, SocketChannel> map) {
        // Two tasks (one receiver, one sender) per peer.
        final int workerCount = 2 * (this.conf.noOfParties() - 1);
        this.communicationService = Executors.newFixedThreadPool(workerCount);
        map.forEach((partyId, channel) -> {
            this.receivers.put(partyId, new Receiver(channel, this.communicationService));
            this.senders.put(partyId, new Sender(channel, this.communicationService));
        });
    }

    /**
     * Sends a message to the given party. A message addressed to ourselves is placed directly on
     * the self queue; otherwise it is handed to that party's sender after checking that the
     * sender is still running.
     *
     * @param i the id of the receiving party
     * @param bArr the message payload
     * @throws IllegalArgumentException if the id is outside the valid range
     */
    @Override // dk.alexandra.fresco.framework.network.Network
    public void send(int i, byte[] bArr) {
        final int myId = this.conf.getMyId();
        if (i != myId) {
            inRange(i);
            final Sender sender = this.senders.get(Integer.valueOf(i));
            ExceptionConverter.safe(() -> {
                if (!sender.isRunning()) {
                    throw new RuntimeException("Sender not running");
                }
                return null;
            }, "P" + this.conf.getMyId() + ": Unable to send to P" + i);
            sender.queueMessage(bArr);
        } else {
            this.selfQueue.add(bArr);
        }
    }

    /**
     * Blocks until a message from the given party is available and returns it. Messages from
     * ourselves come from the self queue; otherwise the party's receiver is polled repeatedly,
     * checking before every poll that the receiver is still running.
     *
     * @param i the id of the sending party
     * @return the next message from that party
     * @throws IllegalArgumentException if the id is outside the valid range
     */
    @Override // dk.alexandra.fresco.framework.network.Network
    public byte[] receive(int i) {
        if (i == this.conf.getMyId()) {
            return (byte[]) ExceptionConverter.safe(this.selfQueue::take, "Receiving from self iterrupted");
        }
        inRange(i);
        final Receiver receiver = this.receivers.get(Integer.valueOf(i));
        byte[] message;
        do {
            ExceptionConverter.safe(() -> {
                if (!receiver.isRunning()) {
                    throw new RuntimeException("Receiver not running");
                }
                return null;
            }, "P" + this.conf.getMyId() + ": Unable to receive from P" + i);
            message = receiver.pollMessage(RECEIVE_TIMEOUT);
        } while (message == null);
        return message;
    }

    /**
     * Verifies that a party id lies between 1 and the number of parties, inclusive.
     *
     * @param i the party id to check
     * @throws IllegalArgumentException if the id is outside that range
     */
    private void inRange(int i) {
        if (i > 0 && i < getNoOfParties() + 1) {
            return;
        }
        throw new IllegalArgumentException("Party id " + i + " not in range 1 ... " + getNoOfParties());
    }

    /**
     * Closes the network once. A second call only logs that the network is already closed. With
     * fewer than two parties there is nothing to shut down; otherwise the senders, receivers and
     * channels are closed through closeCommunication().
     */
    private void teardown() {
        if (this.alive) {
            this.alive = false;
            if (this.conf.noOfParties() >= 2) {
                ExceptionConverter.safe(() -> {
                    closeCommunication();
                    logger.info("P{}: Network closed", Integer.valueOf(this.conf.getMyId()));
                    return null;
                }, "Unable to properly close the network.");
            } else {
                logger.info("P{}: Network closed", Integer.valueOf(this.conf.getMyId()));
            }
        } else {
            logger.info("P{}: Network already closed", Integer.valueOf(this.conf.getMyId()));
        }
    }

    /**
     * Stops all senders and receivers, closes every channel and shuts down the communication
     * executor.
     *
     * <p>A sender or receiver that fails to stop is logged with its cause and does not prevent
     * the rest of the shutdown. Every channel is closed even if closing an earlier one failed,
     * and the executor is always shut down. The first channel close failure is rethrown at the
     * end, with later ones attached as suppressed.
     */
    private void closeCommunication() {
        final int myId = this.conf.getMyId();
        boolean interrupted = false;
        RuntimeException closeFailure = null;
        try {
            for (Sender sender : this.senders.values()) {
                try {
                    sender.stop();
                } catch (Exception e) {
                    interrupted |= e instanceof InterruptedException;
                    logger.warn("P{}: A failed sender detected while closing network", myId, e);
                }
            }
            for (Receiver receiver : this.receivers.values()) {
                try {
                    receiver.stop();
                } catch (Exception e) {
                    interrupted |= e instanceof InterruptedException;
                    logger.warn("P{}: A failed receiver detected while closing network", myId, e);
                }
            }
            for (SocketChannel channel : this.channels) {
                try {
                    ExceptionConverter.safe(() -> {
                        channel.close();
                        return null;
                    }, CoreConstants.EMPTY_STRING);
                } catch (RuntimeException e) {
                    // Remember the failure but keep closing the remaining channels.
                    if (closeFailure == null) {
                        closeFailure = e;
                    } else {
                        closeFailure.addSuppressed(e);
                    }
                }
            }
        } finally {
            this.communicationService.shutdownNow();
            if (interrupted) {
                // Restored only now so the remaining stop() calls were not cut short by it.
                Thread.currentThread().interrupt();
            }
        }
        if (closeFailure != null) {
            throw closeFailure;
        }
    }

    /**
     * Closes the network by delegating to teardown(), which logs and returns without further
     * action if the network was already closed.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        teardown();
    }

    /**
     * Returns the number of parties as reported by the network configuration.
     *
     * @return the value of {@code conf.noOfParties()}
     */
    @Override // dk.alexandra.fresco.framework.network.Network
    public int getNoOfParties() {
        return this.conf.noOfParties();
    }
}
