package org.mbari.vcr4j.sharktopoda.client.udp;

import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.Subject;
import java.lang.System;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.mbari.vcr4j.sharktopoda.client.gson.DurationConverter;
import org.mbari.vcr4j.sharktopoda.client.model.GenericCommand;
import org.mbari.vcr4j.sharktopoda.client.model.GenericResponse;

/* loaded from: input_file:org/mbari/vcr4j/sharktopoda/client/udp/UdpIO.class */
/**
 * UDP transport that receives JSON-encoded {@link GenericCommand}s on a local port and sends
 * JSON-encoded {@link GenericResponse}s back to the address/port recorded on each response.
 *
 * <p>Incoming datagrams are read on a dedicated daemon thread and published on
 * {@link #getCommandSubject()}. Responses pushed into {@link #getResponseSubject()} are
 * serialized with Gson and sent through the same socket when
 * {@link GenericResponse#isResponseExpected()} is true.
 *
 * <p>The socket is created lazily and re-created after an I/O failure. All access to it goes
 * through synchronized helpers because the receiver thread and the thread delivering responses
 * both use it.
 */
class UdpIO {
    private static final System.Logger log = System.getLogger(UdpIO.class.getName());

    /** Size in bytes of the receive buffer; longer datagrams are truncated by the socket. */
    private static final int BUFFER_SIZE = 4096;

    /** Pause between attempts after a socket failure so a persistent error cannot spin the CPU. */
    private static final long RETRY_DELAY_MILLIS = 500L;

    private final int port;
    /** Lazily created socket; only touched through {@link #getServer()} and {@link #closeServer()}. */
    private DatagramSocket server;
    private final Thread receiverThread;
    private final Subject<GenericResponse> responseSubject;
    /** Cleared by {@link #close()}; tells the receiver loop to stop. */
    private volatile boolean ok = true;
    private final Gson gson = newGson();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    // Explicit type witness: without it the chained call infers Subject<Object>.
    private final Subject<GenericCommand> commandSubject =
            PublishSubject.<GenericCommand>create().toSerialized();

    /**
     * Creates the transport and immediately starts the daemon receiver thread.
     *
     * @param port local UDP port to bind; the socket is bound lazily on first use
     */
    public UdpIO(int port) {
        this.port = port;
        Scheduler from = Schedulers.from(this.executor);
        this.responseSubject = PublishSubject.<GenericResponse>create().toSerialized();
        // NOTE(review): subscribeOn only moves the act of subscribing onto the executor; items are
        // still handled on whichever thread calls onNext. If sending on the executor thread was
        // the intent, observeOn would be needed - confirm before changing, as it alters threading.
        this.responseSubject.subscribeOn(from).subscribe(this::doResponse);
        this.receiverThread = buildReceiverThread();
        this.receiverThread.setDaemon(true);
        this.receiverThread.start();
    }

    /**
     * Stops the receiver loop, closes the socket, shuts down the executor and completes both
     * subjects. Calls after the first have no effect.
     */
    public void close() {
        if (this.ok) {
            this.ok = false;
            // Closing the socket makes a receive() that is currently blocked throw, so the
            // receiver thread stops now instead of waiting for one more datagram.
            closeServer();
            this.executor.shutdown();
            this.commandSubject.onComplete();
            this.responseSubject.onComplete();
        }
    }

    /**
     * Serializes a response to JSON and sends it to the address and port stored on the response.
     * Responses that do not expect a reply are ignored. Failures are logged, never propagated.
     *
     * @param genericResponse the response to send
     */
    private void doResponse(GenericResponse genericResponse) {
        if (!genericResponse.isResponseExpected()) {
            return;
        }
        try {
            DatagramSocket socket = getServer();
            String json = this.gson.toJson(genericResponse);
            log.log(System.Logger.Level.DEBUG, "Sending >>> " + json);
            byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
            socket.send(new DatagramPacket(bytes, bytes.length,
                    genericResponse.getPacketAddress(), genericResponse.getPacketPort()));
        } catch (Exception e) {
            log.log(System.Logger.Level.ERROR, "UDP response failed", e);
        }
    }

    /**
     * Builds (but does not start) the thread that reads datagrams until {@link #close()} is
     * called. A socket failure discards the socket and retries after a short pause; a datagram
     * that cannot be decoded is logged and skipped without touching the socket.
     *
     * @return the unstarted receiver thread
     */
    private Thread buildReceiverThread() {
        return new Thread(() -> {
            byte[] buffer = new byte[BUFFER_SIZE];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            while (this.ok) {
                try {
                    getServer().receive(packet);
                } catch (Exception e) {
                    if (!this.ok) {
                        // close() shut the socket to unblock receive(); this is a normal shutdown.
                        break;
                    }
                    log.log(System.Logger.Level.INFO, "Error while reading UDP datagram", e);
                    closeServer();
                    if (!pauseBeforeRetry()) {
                        break;
                    }
                    continue;
                }
                publishCommand(packet);
            }
            closeServer();
            log.log(System.Logger.Level.INFO, "Shutting down UDP server");
        });
    }

    /**
     * Decodes the payload of a received datagram as a {@link GenericCommand}, stamps it with the
     * sender's address and port, and publishes it on the command subject. Decoding problems are
     * logged and the datagram is dropped.
     *
     * @param packet the datagram that was just received
     */
    private void publishCommand(DatagramPacket packet) {
        try {
            String json = new String(packet.getData(), packet.getOffset(), packet.getLength(),
                    StandardCharsets.UTF_8);
            log.log(System.Logger.Level.DEBUG, "Received <<< " + json);
            GenericCommand command = this.gson.fromJson(json, GenericCommand.class);
            if (command == null) {
                // Gson returns null for an empty payload; there is nothing to publish.
                log.log(System.Logger.Level.INFO, "Ignoring empty UDP datagram");
                return;
            }
            command.setPacketAddress(packet.getAddress());
            command.setPacketPort(packet.getPort());
            this.commandSubject.onNext(command);
        } catch (Exception e) {
            log.log(System.Logger.Level.INFO, "Error while reading UDP datagram", e);
        }
    }

    /**
     * Sleeps for {@link #RETRY_DELAY_MILLIS} before the receiver retries after a socket failure.
     *
     * @return {@code true} if the pause completed, {@code false} if the thread was interrupted
     */
    private boolean pauseBeforeRetry() {
        try {
            Thread.sleep(RETRY_DELAY_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Returns the open socket, binding a new one to {@link #port} if none exists or the current
     * one has been closed.
     *
     * @return an open socket bound to the configured port
     * @throws SocketException if this instance has been closed or the socket cannot be bound
     */
    private synchronized DatagramSocket getServer() throws SocketException {
        if (!this.ok) {
            // Refuse to re-bind after close() so the port is not silently re-acquired.
            throw new SocketException("UdpIO on port " + this.port + " is closed");
        }
        if (this.server == null || this.server.isClosed()) {
            this.server = new DatagramSocket(this.port);
        }
        return this.server;
    }

    /** Closes and forgets the current socket, if there is one. */
    private synchronized void closeServer() {
        if (this.server != null) {
            this.server.close();
            this.server = null;
        }
    }

    /**
     * Creates a Gson instance configured with pretty printing, lower_case_with_underscores field
     * names, the date format {@code yyyy-MM-dd'T'HH:mm:ss'Z'} and a {@link DurationConverter}
     * registered for {@link Duration}.
     *
     * @return a new, configured Gson instance
     */
    public static Gson newGson() {
        return new GsonBuilder()
                .setPrettyPrinting()
                .setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES)
                .setDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
                .registerTypeAdapter(Duration.class, new DurationConverter())
                .create();
    }

    /**
     * @return the subject into which callers push responses to be sent over UDP
     */
    public Subject<GenericResponse> getResponseSubject() {
        return this.responseSubject;
    }

    /**
     * @return the subject on which commands received over UDP are published
     */
    public Subject<GenericCommand> getCommandSubject() {
        return this.commandSubject;
    }
}
