package org.cempaka.cyclone.protocol;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.cempaka.cyclone.protocol.payloads.Payload;
import org.cempaka.cyclone.utils.Preconditions;

/* loaded from: input_file:org/cempaka/cyclone/protocol/UdpDaemonChannel.class */
public class UdpDaemonChannel implements DaemonChannel {
    private final MessageEncoder messageEncoder = new MessageEncoder();
    private final Lock runningLock = new ReentrantLock();
    private final Queue<Consumer<Payload>> writeListeners = new ConcurrentLinkedQueue();
    private final Queue<Consumer<Exception>> failureListeners = new ConcurrentLinkedQueue();
    private final Queue<BiConsumer<Integer, Payload>> readListeners = new ConcurrentLinkedQueue();
    private final ExecutorService executorService = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable);
        thread.setName("UdpDaemonChannel");
        return thread;
    });
    private final ExecutorService listenersService = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable);
        thread.setName("UdpDaemonChannel-listeners");
        return thread;
    });
    private boolean running;
    private DatagramSocket socket;

    @Override // org.cempaka.cyclone.protocol.DaemonChannel
    public void connect(int i) throws SocketException {
        try {
            this.runningLock.lock();
            Preconditions.checkState(this.socket == null, "Channel already connected");
            this.socket = new DatagramSocket(i, InetAddress.getLoopbackAddress());
            this.running = true;
            this.executorService.submit(this::awaitPacket);
        } finally {
            this.runningLock.unlock();
        }
    }

    @Override // org.cempaka.cyclone.protocol.DaemonChannel
    public void connect() throws SocketException {
        try {
            this.runningLock.lock();
            Preconditions.checkState(this.socket == null, "Channel already connected");
            this.socket = new DatagramSocket();
            this.running = true;
        } finally {
            this.runningLock.unlock();
        }
    }

    private void awaitPacket() {
        while (this.running) {
            try {
                byte[] bArr = new byte[MessageEncoder.SIZE];
                DatagramPacket datagramPacket = new DatagramPacket(bArr, bArr.length);
                this.socket.receive(datagramPacket);
                Payload decode = this.messageEncoder.decode(datagramPacket.getData());
                this.readListeners.forEach(biConsumer -> {
                    this.listenersService.submit(() -> {
                        biConsumer.accept(Integer.valueOf(datagramPacket.getPort()), decode);
                    });
                });
            } catch (Exception e) {
                this.failureListeners.forEach(consumer -> {
                    consumer.accept(e);
                });
            }
        }
    }

    @Override // org.cempaka.cyclone.protocol.DaemonChannel
    public void write(Payload payload, int i) {
        Preconditions.checkState(this.socket != null, "socket not initialized");
        byte[] array = this.messageEncoder.encode(payload).array();
        try {
            this.socket.send(new DatagramPacket(array, array.length, InetAddress.getLoopbackAddress(), i));
            this.writeListeners.forEach(consumer -> {
                consumer.accept(payload);
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override // org.cempaka.cyclone.protocol.DaemonChannel
    public void addWriteListener(Consumer<Payload> consumer) {
        this.writeListeners.add(consumer);
    }

    @Override // org.cempaka.cyclone.protocol.DaemonChannel
    public void addReadListener(BiConsumer<Integer, Payload> biConsumer) {
        this.readListeners.add(biConsumer);
    }

    @Override // org.cempaka.cyclone.protocol.DaemonChannel
    public void addFailureListener(Consumer<Exception> consumer) {
        this.failureListeners.add(consumer);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            this.runningLock.lock();
            this.running = false;
            if (this.socket != null) {
                this.socket.close();
            }
            this.socket = null;
        } finally {
            this.runningLock.unlock();
        }
    }
}
