/*
 * Decompiled with CFR 0.152.
 */
package org.openremote.agent.protocol.io;

import io.netty.bootstrap.AbstractBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.openremote.agent.protocol.io.IOServer;
import org.openremote.container.Container;
import org.openremote.model.asset.agent.ConnectionStatus;
import org.openremote.model.syslog.SyslogCategory;

public abstract class AbstractNettyIOServer<T, U extends Channel, V extends AbstractBootstrap<?, ?>, W extends SocketAddress>
implements IOServer<T, U, W> {
    protected static final int INITIAL_RECONNECT_DELAY_MILLIS = 1000;
    protected static final int MAX_RECONNECT_DELAY_MILLIS = 60000;
    protected static final int RECONNECT_BACKOFF_MULTIPLIER = 2;
    protected static final Logger LOG = SyslogCategory.getLogger((SyslogCategory)SyslogCategory.PROTOCOL, AbstractNettyIOServer.class);
    protected final ScheduledExecutorService executorService;
    protected int clientLimit = 0;
    protected V bootstrap;
    protected ConnectionStatus connectionStatus = ConnectionStatus.DISCONNECTED;
    protected ChannelFuture channelFuture;
    protected EventLoopGroup workerGroup;
    protected U channel;
    protected final ChannelGroup allChannels = new DefaultChannelGroup((EventExecutor)GlobalEventExecutor.INSTANCE);
    protected final List<IOServer.IoServerMessageConsumer<T, U, W>> messageConsumers = new ArrayList<IOServer.IoServerMessageConsumer<T, U, W>>();
    protected final List<Consumer<ConnectionStatus>> connectionStatusConsumers = new ArrayList<Consumer<ConnectionStatus>>();
    protected final List<BiConsumer<U, ConnectionStatus>> clientConnectionStatusConsumers = new ArrayList<BiConsumer<U, ConnectionStatus>>();
    protected ScheduledFuture<?> reconnectTask;
    protected int reconnectDelayMilliseconds = 1000;

    public AbstractNettyIOServer() {
        this.executorService = Container.EXECUTOR_SERVICE;
    }

    @Override
    public synchronized void start() {
        if (this.connectionStatus != ConnectionStatus.DISCONNECTED && this.connectionStatus != ConnectionStatus.WAITING) {
            LOG.finest("Must be disconnected before calling start: " + this.getSocketAddressString());
            return;
        }
        LOG.fine("Starting IO Server: " + this.getSocketAddressString());
        this.onConnectionStatusChanged(ConnectionStatus.CONNECTING);
        if (this.workerGroup == null) {
            this.workerGroup = new NioEventLoopGroup();
        }
        try {
            this.bootstrap = this.createAndConfigureBootstrap();
            this.bootstrap.handler((ChannelHandler)new ChannelInitializer<U>(){

                public void initChannel(U channel) {
                    AbstractNettyIOServer.this.initChannel(channel);
                }
            });
            this.channelFuture = this.bootstrap.bind().sync();
            this.channel = this.channelFuture.channel();
            this.channelFuture.addListener((GenericFutureListener)new ChannelFutureListener(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void operationComplete(ChannelFuture future) {
                    AbstractNettyIOServer abstractNettyIOServer = AbstractNettyIOServer.this;
                    synchronized (abstractNettyIOServer) {
                        AbstractNettyIOServer.this.channelFuture.removeListener((GenericFutureListener)this);
                        if (AbstractNettyIOServer.this.connectionStatus == ConnectionStatus.DISCONNECTING) {
                            return;
                        }
                        if (future.isSuccess()) {
                            LOG.log(Level.INFO, "Connection initialising: " + AbstractNettyIOServer.this.getSocketAddressString());
                            AbstractNettyIOServer.this.reconnectTask = null;
                            AbstractNettyIOServer.this.reconnectDelayMilliseconds = 1000;
                        } else if (future.cause() != null) {
                            LOG.log(Level.WARNING, "Connection error: " + AbstractNettyIOServer.this.getSocketAddressString(), future.cause());
                            AbstractNettyIOServer.this.scheduleReconnect();
                        }
                    }
                }
            });
            this.channel.closeFuture().addListener(future -> {
                if (this.connectionStatus != ConnectionStatus.DISCONNECTING) {
                    this.scheduleReconnect();
                }
            });
        }
        catch (Exception e) {
            LOG.log(Level.SEVERE, "An error occurred whilst starting the server so shutting down", e);
            this.stop();
        }
    }

    @Override
    public synchronized void stop() {
        if (this.connectionStatus == ConnectionStatus.DISCONNECTING || this.connectionStatus == ConnectionStatus.DISCONNECTED) {
            LOG.finest("Already stopping or stopped: " + this.getSocketAddressString());
            return;
        }
        LOG.fine("Stopping IO Server: " + this.getSocketAddressString());
        this.onConnectionStatusChanged(ConnectionStatus.DISCONNECTING);
        try {
            if (this.reconnectTask != null) {
                this.reconnectTask.cancel(false);
            }
            this.allChannels.close().sync();
            this.allChannels.clear();
            if (this.channelFuture != null) {
                this.channelFuture.cancel(true);
                this.channelFuture.sync();
                this.channelFuture = null;
            }
            if (this.channel != null) {
                this.channel.close().sync();
                this.channel = null;
            }
        }
        catch (InterruptedException interruptedException) {
        }
        finally {
            if (this.workerGroup != null) {
                this.workerGroup.shutdownGracefully();
                this.workerGroup = null;
            }
            this.onConnectionStatusChanged(ConnectionStatus.DISCONNECTED);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addMessageConsumer(IOServer.IoServerMessageConsumer<T, U, W> messageConsumer) {
        LOG.finest("Adding message consumer");
        List<IOServer.IoServerMessageConsumer<T, U, W>> list = this.messageConsumers;
        synchronized (list) {
            this.messageConsumers.add(messageConsumer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void removeMessageConsumer(IOServer.IoServerMessageConsumer<T, U, W> messageConsumer) {
        List<IOServer.IoServerMessageConsumer<T, U, W>> list = this.messageConsumers;
        synchronized (list) {
            this.messageConsumers.remove(messageConsumer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void removeAllMessageConsumers() {
        List<IOServer.IoServerMessageConsumer<T, U, W>> list = this.messageConsumers;
        synchronized (list) {
            this.messageConsumers.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addConnectionStatusConsumer(Consumer<ConnectionStatus> connectionStatusConsumer) {
        List<BiConsumer<U, ConnectionStatus>> list = this.clientConnectionStatusConsumers;
        synchronized (list) {
            this.connectionStatusConsumers.add(connectionStatusConsumer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void removeConnectionStatusConsumer(Consumer<ConnectionStatus> connectionStatusConsumer) {
        List<BiConsumer<U, ConnectionStatus>> list = this.clientConnectionStatusConsumers;
        synchronized (list) {
            this.connectionStatusConsumers.remove(connectionStatusConsumer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addConnectionStatusConsumer(BiConsumer<U, ConnectionStatus> connectionStatusConsumer) {
        List<BiConsumer<U, ConnectionStatus>> list = this.clientConnectionStatusConsumers;
        synchronized (list) {
            this.clientConnectionStatusConsumers.add(connectionStatusConsumer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void removeConnectionStatusConsumer(BiConsumer<U, ConnectionStatus> connectionStatusConsumer) {
        List<BiConsumer<U, ConnectionStatus>> list = this.clientConnectionStatusConsumers;
        synchronized (list) {
            this.clientConnectionStatusConsumers.remove(connectionStatusConsumer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void removeAllConnectionStatusConsumers() {
        List<Consumer<ConnectionStatus>> list = this.connectionStatusConsumers;
        synchronized (list) {
            this.connectionStatusConsumers.clear();
        }
    }

    @Override
    public ConnectionStatus getConnectionStatus() {
        return this.connectionStatus;
    }

    @Override
    public ConnectionStatus getConnectionStatus(U client) {
        return client.isActive() ? ConnectionStatus.CONNECTED : ConnectionStatus.DISCONNECTED;
    }

    @Override
    public void disconnectClient(U client) {
        LOG.finer("Disconnecting client: " + this.getClientDescriptor(client));
        client.close();
    }

    protected void initChannel(U channel) {
        channel.pipeline().addLast(new ChannelHandler[]{new ChannelInboundHandlerAdapter(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                AbstractNettyIOServer abstractNettyIOServer = AbstractNettyIOServer.this;
                synchronized (abstractNettyIOServer) {
                    LOG.fine("Connected: " + AbstractNettyIOServer.this.getSocketAddressString());
                    AbstractNettyIOServer.this.onConnectionStatusChanged(ConnectionStatus.CONNECTED);
                }
                super.channelActive(ctx);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                AbstractNettyIOServer abstractNettyIOServer = AbstractNettyIOServer.this;
                synchronized (abstractNettyIOServer) {
                    if (AbstractNettyIOServer.this.connectionStatus != ConnectionStatus.DISCONNECTING) {
                        return;
                    }
                    LOG.fine("Disconnected: " + AbstractNettyIOServer.this.getSocketAddressString());
                    AbstractNettyIOServer.this.onConnectionStatusChanged(ConnectionStatus.DISCONNECTED);
                }
                super.channelInactive(ctx);
            }
        }});
    }

    protected void initClientChannel(U channel) {
        LOG.fine("Client initialising: " + this.getClientDescriptor(channel));
        channel.pipeline().addLast(new ChannelHandler[]{new ChannelInboundHandlerAdapter((Channel)channel){
            final /* synthetic */ Channel val$channel;
            {
                this.val$channel = channel;
            }

            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                AbstractNettyIOServer.this.onClientConnected(this.val$channel);
                super.channelActive(ctx);
            }

            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                AbstractNettyIOServer.this.onClientDisconnected(this.val$channel);
                super.channelInactive(ctx);
            }

            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                AbstractNettyIOServer.this.onDecodeException(ctx, cause);
                super.exceptionCaught(ctx, cause);
            }
        }});
        channel.pipeline().addLast(new ChannelHandler[]{new ChannelOutboundHandlerAdapter(){

            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                AbstractNettyIOServer.this.onEncodeException(ctx, cause);
                super.exceptionCaught(ctx, cause);
            }
        }});
        this.addDecoders(channel);
        this.addEncoders(channel);
        channel.pipeline().addLast(new ChannelHandler[]{new SimpleChannelInboundHandler<T>((Channel)channel){
            final /* synthetic */ Channel val$channel;
            {
                this.val$channel = channel;
            }

            protected void channelRead0(ChannelHandlerContext ctx, T msg) {
                AbstractNettyIOServer.this.handleMessageReceived(this.val$channel, msg);
            }
        }});
    }

    protected void handleMessageReceived(U channel, T message) {
        this.onMessageReceived(message, channel, channel.remoteAddress());
    }

    protected void onClientDisconnected(U client) {
        LOG.fine("Client disconnected: " + this.getClientDescriptor(client));
        this.allChannels.remove(client);
        this.sendClientConnectionStatus(client, ConnectionStatus.DISCONNECTED);
    }

    protected void onClientConnected(U client) {
        LOG.fine("Client connected: " + this.getClientDescriptor(client));
        this.allChannels.add(client);
        this.sendClientConnectionStatus(client, ConnectionStatus.CONNECTED);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected synchronized void onConnectionStatusChanged(ConnectionStatus connectionStatus) {
        this.connectionStatus = connectionStatus;
        List<Consumer<ConnectionStatus>> list = this.connectionStatusConsumers;
        synchronized (list) {
            this.connectionStatusConsumers.forEach(consumer -> consumer.accept(connectionStatus));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void sendClientConnectionStatus(U channel, ConnectionStatus connectionStatus) {
        List<BiConsumer<U, ConnectionStatus>> list = this.clientConnectionStatusConsumers;
        synchronized (list) {
            this.clientConnectionStatusConsumers.forEach(statusConsumer -> statusConsumer.accept(channel, connectionStatus));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void onMessageReceived(T message, U channel, W sender) {
        List<IOServer.IoServerMessageConsumer<T, U, W>> list = this.messageConsumers;
        synchronized (list) {
            this.messageConsumers.forEach(messageConsumer -> messageConsumer.accept(message, channel, sender));
        }
    }

    protected void onDecodeException(ChannelHandlerContext ctx, Throwable cause) {
        LOG.log(Level.SEVERE, "Exception occurred on in-bound message: ", cause);
        ctx.channel().close();
    }

    protected void onEncodeException(ChannelHandlerContext ctx, Throwable cause) {
        LOG.log(Level.SEVERE, "Exception occurred on out-bound message: ", cause);
        ctx.channel().close();
    }

    @Override
    public void sendMessage(T message, U client) {
        try {
            client.writeAndFlush(message);
            LOG.finest("Message sent to client: " + this.getClientDescriptor(client));
        }
        catch (Exception e) {
            LOG.log(Level.WARNING, "Message send failed", e);
        }
    }

    @Override
    public void sendMessage(T message) {
        try {
            this.allChannels.writeAndFlush(message);
            LOG.finest("Message sent to all clients");
        }
        catch (Exception e) {
            LOG.log(Level.WARNING, "Message send failed", e);
        }
    }

    @Override
    public void sendMessage(T message, W recipient) {
        Channel client = this.allChannels.stream().filter(c -> Objects.equals(c.remoteAddress(), recipient)).findFirst().orElse(null);
        if (client == null) {
            LOG.warning("Couldn't find existing connection for recipient '" + recipient.toString() + "': " + this.getSocketAddressString());
            return;
        }
        this.sendMessage(message, (W)client);
    }

    protected synchronized void scheduleReconnect() {
        if (this.reconnectTask != null) {
            return;
        }
        this.onConnectionStatusChanged(ConnectionStatus.WAITING);
        if (this.reconnectDelayMilliseconds < 60000) {
            this.reconnectDelayMilliseconds *= 2;
            this.reconnectDelayMilliseconds = Math.min(60000, this.reconnectDelayMilliseconds);
        }
        LOG.finest("Scheduling reconnection in '" + this.reconnectDelayMilliseconds + "' milliseconds: " + this.getSocketAddressString());
        this.reconnectTask = this.executorService.schedule(() -> {
            AbstractNettyIOServer abstractNettyIOServer = this;
            synchronized (abstractNettyIOServer) {
                this.reconnectTask = null;
                if (this.connectionStatus != ConnectionStatus.DISCONNECTING && this.connectionStatus != ConnectionStatus.DISCONNECTED) {
                    this.start();
                }
            }
        }, (long)this.reconnectDelayMilliseconds, TimeUnit.MILLISECONDS);
    }

    protected abstract String getSocketAddressString();

    protected abstract V createAndConfigureBootstrap();

    protected abstract String getClientDescriptor(U var1);

    protected abstract void addDecoders(U var1);

    protected abstract void addEncoders(U var1);

    protected void addDecoder(U channel, ChannelInboundHandler decoder) {
        channel.pipeline().addLast(new ChannelHandler[]{decoder});
    }

    protected void addEncoder(U channel, ChannelOutboundHandler encoder) {
        channel.pipeline().addLast(new ChannelHandler[]{encoder});
    }
}

