package dk.alexandra.fresco.framework.network.async;

import dk.alexandra.fresco.framework.Party;
import dk.alexandra.fresco.framework.configuration.NetworkConfiguration;
import dk.alexandra.fresco.framework.network.CloseableNetwork;
import dk.alexandra.fresco.framework.util.ExceptionConverter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dk/alexandra/fresco/framework/network/async/AsyncNetwork.class */
public class AsyncNetwork implements CloseableNetwork {
    private static final int PARTY_ID_BYTES = 1;
    public static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofMinutes(1);
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) AsyncNetwork.class);
    private ServerSocketChannel server;
    private final Map<Integer, SocketChannel> channelMap;
    private final Map<Integer, BlockingQueue<byte[]>> outQueues;
    private final Map<Integer, BlockingQueue<byte[]>> inQueues;
    private final NetworkConfiguration conf;
    private final AtomicBoolean alive;
    private ExecutorService communicationService;
    private final Map<Integer, Future<Object>> sendFutures;
    private final Map<Integer, Future<Object>> receiveFutures;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dk/alexandra/fresco/framework/network/async/AsyncNetwork$Receiver.class */
    public class Receiver implements Callable<Object> {
        private final SocketChannel channel;
        private final BlockingQueue<byte[]> queue;

        public Receiver(SocketChannel socketChannel, BlockingQueue<byte[]> blockingQueue) {
            this.channel = socketChannel;
            this.queue = blockingQueue;
        }

        @Override // java.util.concurrent.Callable
        public Object call() throws IOException, InterruptedException {
            while (AsyncNetwork.this.alive.get()) {
                ByteBuffer allocate = ByteBuffer.allocate(4);
                while (allocate.hasRemaining()) {
                    this.channel.read(allocate);
                }
                allocate.flip();
                ByteBuffer allocate2 = ByteBuffer.allocate(allocate.getInt());
                while (allocate2.remaining() > 0) {
                    this.channel.read(allocate2);
                }
                this.queue.add(allocate2.array());
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:dk/alexandra/fresco/framework/network/async/AsyncNetwork$Sender.class */
    public class Sender implements Callable<Object> {
        private final SocketChannel channel;
        private final BlockingQueue<byte[]> queue;

        public Sender(SocketChannel socketChannel, BlockingQueue<byte[]> blockingQueue) {
            this.channel = socketChannel;
            this.queue = blockingQueue;
        }

        @Override // java.util.concurrent.Callable
        public Object call() throws IOException, InterruptedException {
            while (true) {
                if (this.queue.isEmpty() && !AsyncNetwork.this.alive.get()) {
                    return null;
                }
                byte[] take = this.queue.take();
                ByteBuffer allocate = ByteBuffer.allocate(4 + take.length);
                allocate.putInt(take.length);
                allocate.put(take);
                allocate.position(0);
                while (allocate.hasRemaining()) {
                    this.channel.write(allocate);
                }
            }
        }
    }

    public AsyncNetwork(NetworkConfiguration networkConfiguration) {
        this(networkConfiguration, DEFAULT_CONNECTION_TIMEOUT);
    }

    public AsyncNetwork(NetworkConfiguration networkConfiguration, Duration duration) {
        this.conf = networkConfiguration;
        this.outQueues = new HashMap(networkConfiguration.noOfParties());
        this.inQueues = new HashMap(networkConfiguration.noOfParties());
        this.channelMap = new HashMap(networkConfiguration.noOfParties() - 1);
        this.sendFutures = new HashMap(networkConfiguration.noOfParties() - 1);
        this.receiveFutures = new HashMap(networkConfiguration.noOfParties() - 1);
        this.alive = new AtomicBoolean(true);
        for (int i = 1; i < networkConfiguration.noOfParties() + 1; i++) {
            this.outQueues.put(Integer.valueOf(i), new LinkedBlockingQueue());
            this.inQueues.put(Integer.valueOf(i), new LinkedBlockingQueue());
        }
        if (networkConfiguration.noOfParties() > 1) {
            connectNetwork(networkConfiguration, duration);
            startCommunication();
        }
        logger.info("P{} successfully connected network", Integer.valueOf(networkConfiguration.getMyId()));
    }

    private void connectNetwork(NetworkConfiguration networkConfiguration, Duration duration) {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(newFixedThreadPool);
        executorCompletionService.submit(() -> {
            return connectClient();
        });
        executorCompletionService.submit(() -> {
            bindServer();
            return connectServer();
        });
        try {
            for (int i = 0; i < 2; i++) {
                try {
                    Future poll = executorCompletionService.poll(duration.toMillis(), TimeUnit.MILLISECONDS);
                    if (poll == null) {
                        throw new TimeoutException("Timed out");
                    }
                    this.channelMap.putAll((Map) poll.get());
                } catch (InterruptedException e) {
                    teardown();
                    throw new RuntimeException("Interrupted while connecting network", e);
                } catch (ExecutionException e2) {
                    teardown();
                    throw new RuntimeException("Failed to connect network", e2.getCause());
                } catch (TimeoutException e3) {
                    teardown();
                    throw new RuntimeException("Timed out connecting network", e3);
                }
            }
        } finally {
            newFixedThreadPool.shutdownNow();
        }
    }

    private Map<Integer, SocketChannel> connectClient() throws InterruptedException {
        HashMap hashMap = new HashMap(this.conf.noOfParties() - this.conf.getMyId());
        for (int myId = this.conf.getMyId() + 1; myId <= this.conf.noOfParties(); myId++) {
            Party party = this.conf.getParty(myId);
            InetSocketAddress inetSocketAddress = new InetSocketAddress(party.getHostname(), party.getPort());
            boolean z = false;
            int i = 0;
            while (!z) {
                try {
                    SocketChannel open = SocketChannel.open();
                    open.connect(inetSocketAddress);
                    open.configureBlocking(true);
                    this.channelMap.put(Integer.valueOf(myId), open);
                    ByteBuffer allocate = ByteBuffer.allocate(1);
                    allocate.put((byte) this.conf.getMyId());
                    allocate.position(0);
                    while (allocate.hasRemaining()) {
                        open.write(allocate);
                    }
                    z = true;
                    hashMap.put(Integer.valueOf(myId), open);
                    logger.info("P{} connected to {}", Integer.valueOf(this.conf.getMyId()), party);
                } catch (IOException e) {
                    i++;
                    Thread.sleep(1 << i);
                }
            }
        }
        return hashMap;
    }

    private void bindServer() {
        InetSocketAddress inetSocketAddress = new InetSocketAddress(this.conf.getMe().getPort());
        try {
            this.server = ServerSocketChannel.open();
            this.server.bind((SocketAddress) inetSocketAddress);
            logger.info("P{} bound at {}", Integer.valueOf(this.conf.getMyId()), inetSocketAddress);
        } catch (IOException e) {
            throw new RuntimeException("Failed to bind to " + inetSocketAddress, e);
        }
    }

    private Map<Integer, SocketChannel> connectServer() throws IOException {
        HashMap hashMap = new HashMap(this.conf.getMyId() - 1);
        for (int i = 1; i < this.conf.getMyId(); i++) {
            SocketChannel accept = this.server.accept();
            accept.configureBlocking(true);
            ByteBuffer allocate = ByteBuffer.allocate(1);
            while (allocate.hasRemaining()) {
                accept.read(allocate);
            }
            allocate.position(0);
            byte b = allocate.get();
            this.channelMap.put(Integer.valueOf(b), accept);
            logger.info("P{} accepted connection from {}", Integer.valueOf(this.conf.getMyId()), this.conf.getParty(b));
            hashMap.put(Integer.valueOf(b), accept);
        }
        this.server.close();
        return hashMap;
    }

    private void startCommunication() {
        this.communicationService = Executors.newFixedThreadPool((this.conf.noOfParties() - 1) * 2);
        for (Map.Entry<Integer, SocketChannel> entry : this.channelMap.entrySet()) {
            int intValue = entry.getKey().intValue();
            SocketChannel value = entry.getValue();
            Future<Object> submit = this.communicationService.submit(new Receiver(value, this.inQueues.get(Integer.valueOf(intValue))));
            this.sendFutures.put(Integer.valueOf(intValue), this.communicationService.submit(new Sender(value, this.outQueues.get(Integer.valueOf(intValue)))));
            this.receiveFutures.put(Integer.valueOf(intValue), submit);
        }
    }

    @Override // dk.alexandra.fresco.framework.network.Network
    public void send(int i, byte[] bArr) {
        inRange(i);
        if (i != this.conf.getMyId() && this.sendFutures.get(Integer.valueOf(i)).isDone()) {
            try {
                this.sendFutures.get(Integer.valueOf(i)).get();
                throw new RuntimeException("Sender for P" + i + " not running. Unable to send");
            } catch (Exception e) {
                throw new RuntimeException("Sender for P" + i + " threw exception. Unable to send", e);
            }
        } else if (i == this.conf.getMyId()) {
            this.inQueues.get(Integer.valueOf(i)).add(bArr);
        } else {
            this.outQueues.get(Integer.valueOf(i)).add(bArr);
        }
    }

    @Override // dk.alexandra.fresco.framework.network.Network
    public byte[] receive(int i) {
        inRange(i);
        if (i == this.conf.getMyId() || !this.receiveFutures.get(Integer.valueOf(i)).isDone()) {
            return (byte[]) ExceptionConverter.safe(() -> {
                return this.inQueues.get(Integer.valueOf(i)).take();
            }, "Receive interrupted");
        }
        try {
            this.receiveFutures.get(Integer.valueOf(i)).get();
            throw new RuntimeException("Receiver for P" + i + " not running. Unable to receive");
        } catch (Exception e) {
            throw new RuntimeException("Receiver for P" + i + " threw exception. Unable to receive", e);
        }
    }

    private void inRange(int i) {
        if (0 >= i || i >= getNoOfParties() + 1) {
            throw new IllegalArgumentException("Party id " + i + " not in range 1 ... " + getNoOfParties());
        }
    }

    private void teardown() {
        if (!this.alive.get()) {
            logger.info("P{}: Network already closed", Integer.valueOf(this.conf.getMyId()));
            return;
        }
        this.alive.set(false);
        if (this.conf.noOfParties() < 2) {
            logger.info("P{}: Network closed", Integer.valueOf(this.conf.getMyId()));
        } else {
            ExceptionConverter.safe(() -> {
                closeThreads();
                closeChannels();
                logger.info("P{}: Network closed", Integer.valueOf(this.conf.getMyId()));
                return null;
            }, "Unable to properly close the network.");
        }
    }

    private void closeChannels() throws IOException {
        if (this.server != null) {
            this.server.close();
        }
        Iterator<SocketChannel> it = this.channelMap.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    private void closeThreads() {
        this.sendFutures.keySet().stream().filter(num -> {
            return !this.sendFutures.get(num).isDone();
        }).filter(num2 -> {
            return this.outQueues.get(num2).isEmpty();
        }).map(num3 -> {
            return this.outQueues.get(num3);
        }).forEach(blockingQueue -> {
            blockingQueue.add(new byte[0]);
        });
        Iterator<Future<Object>> it = this.sendFutures.values().iterator();
        while (it.hasNext()) {
            try {
                it.next().get();
            } catch (Exception e) {
                logger.warn("A failed sender detected while closing network");
            }
        }
        Iterator it2 = ((List) this.receiveFutures.values().stream().filter(future -> {
            return future.isDone();
        }).collect(Collectors.toList())).iterator();
        while (it2.hasNext()) {
            try {
                ((Future) it2.next()).get();
            } catch (Exception e2) {
                logger.warn("A failed receiver detected while closing network");
            }
        }
        if (this.communicationService != null) {
            this.communicationService.shutdownNow();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        teardown();
    }

    @Override // dk.alexandra.fresco.framework.network.Network
    public int getNoOfParties() {
        return this.conf.noOfParties();
    }
}
