/*
 * Decompiled with CFR 0.152.
 */
package org.openremote.agent.protocol.io;

import dev.failsafe.Failsafe;
import dev.failsafe.Policy;
import dev.failsafe.RetryPolicy;
import dev.failsafe.RetryPolicyBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Future;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.validation.constraints.NotNull;
import org.openremote.agent.protocol.io.NettyIOClient;
import org.openremote.container.Container;
import org.openremote.model.asset.agent.ConnectionStatus;
import org.openremote.model.syslog.SyslogCategory;

public abstract class AbstractNettyIOClient<T, U extends SocketAddress>
implements NettyIOClient<T> {
    /** Starting reconnect back-off delay in milliseconds; {@link #scheduleDoConnect(boolean)} applies a floor of 250. */
    public static long RECONNECT_DELAY_INITIAL_MILLIS = 1000L;
    /** Upper bound of the reconnect back-off in milliseconds; {@link #scheduleDoConnect(boolean)} keeps it above the initial delay. */
    public static long RECONNECT_DELAY_MAX_MILLIS = 300000L;
    /** Logger obtained from {@link SyslogCategory#PROTOCOL}. */
    private static final Logger LOG = SyslogCategory.getLogger((SyslogCategory)SyslogCategory.PROTOCOL, AbstractNettyIOClient.class);
    /** Registered message consumers; the add/remove/clear methods synchronize on this list. */
    protected final List<Consumer<T>> messageConsumers = new ArrayList<Consumer<T>>();
    /** Registered connection status consumers; notified from {@link #onConnectionStatusChanged}. */
    protected final List<Consumer<ConnectionStatus>> connectionStatusConsumers = new CopyOnWriteArrayList<Consumer<ConnectionStatus>>();
    /** Current connection state; starts as DISCONNECTED and is written by {@link #onConnectionStatusChanged}. */
    protected ConnectionStatus connectionStatus = ConnectionStatus.DISCONNECTED;
    /** Future returned by the most recent {@link #startChannel()} call; cleared by {@link #doDisconnect()}. */
    protected ChannelFuture channelStartFuture;
    /** Channel taken from {@link #channelStartFuture}; cleared by {@link #doDisconnect()}. */
    protected Channel channel;
    /** Bootstrap re-created on every {@link #doConnect()} call. */
    protected Bootstrap bootstrap;
    /** Event loop group obtained lazily from {@link #getWorkerGroup()} and shut down by {@link #doDisconnect()}. */
    protected EventLoopGroup workerGroup;
    /** Executor used for the retry loop and for notifying connection status consumers. */
    protected ScheduledExecutorService executorService = Container.EXECUTOR_SERVICE;
    /** Future of the current Failsafe retry loop; cancelled and cleared by {@link #disconnect()}. */
    protected CompletableFuture<Void> connectRetry;
    /** Set once by {@link #setPermanentError(String)}; while true {@link #connect()} refuses to run. */
    protected boolean permanentError;
    /** Optional supplier of the handlers that {@link #addEncodersDecoders(Channel)} appends to the pipeline. */
    protected Supplier<ChannelHandler[]> encoderDecoderProvider;

    /**
     * Creates a client in the DISCONNECTED state; no connection work happens until {@link #connect()} is called.
     */
    protected AbstractNettyIOClient() {
    }

    /**
     * Stores the supplier whose handlers are appended to each new channel's pipeline.
     *
     * @param encoderDecoderProvider supplier of channel handlers; may be {@code null}, in which case no handlers are added
     * @throws UnsupportedOperationException declared by the signature; this implementation does not throw it
     */
    @Override
    public void setEncoderDecoderProvider(Supplier<ChannelHandler[]> encoderDecoderProvider) throws UnsupportedOperationException {
        this.encoderDecoderProvider = encoderDecoderProvider;
    }

    /**
     * Supplies the channel implementation class that {@link #doConnect()} passes to the bootstrap.
     *
     * @return the Netty channel class to instantiate
     */
    protected abstract Class<? extends Channel> getChannelClass();

    /**
     * Supplies the event loop group for the bootstrap. {@link #doConnect()} calls this whenever
     * {@link #workerGroup} is {@code null}, and {@link #doDisconnect()} shuts the group down and clears the field.
     *
     * @return the event loop group to use for the next connection
     */
    protected abstract EventLoopGroup getWorkerGroup();

    /**
     * Starts the channel using the already configured {@link #bootstrap}; called by {@link #doConnect()}.
     *
     * @return the future whose completion signals the outcome of the start attempt and which exposes the channel
     */
    protected abstract ChannelFuture startChannel();

    /**
     * Applies channel options to the current {@link #bootstrap}; the default sets a 2000ms connect timeout.
     */
    protected void configureChannel() {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Moves the client from DISCONNECTED to CONNECTING and starts the retrying connect loop.
     * Does nothing (other than logging) when a permanent error has been set or the client is
     * not currently DISCONNECTED.
     */
    @Override
    public void connect() {
        synchronized (this) {
            if (permanentError) {
                LOG.info("Unable to connect as permanent error has been set");
                return;
            }

            if (connectionStatus != ConnectionStatus.DISCONNECTED) {
                LOG.finer("Must be disconnected before calling connect: " + getClientUri());
                return;
            }

            LOG.fine("Connecting IO Client: " + getClientUri());
            onConnectionStatusChanged(ConnectionStatus.CONNECTING);
        }

        // The retry loop is started outside the monitor
        scheduleDoConnect(false);
    }

    /**
     * Builds a Failsafe retry policy and starts an asynchronous connect loop on {@link #executorService},
     * storing the resulting future in {@link #connectRetry}.
     *
     * @param immediate when {@code true} the back-off is 50ms..51ms; otherwise it runs from
     *                  max(250, RECONNECT_DELAY_INITIAL_MILLIS) up to at least one millisecond more than that
     */
    protected void scheduleDoConnect(boolean immediate) {
        long delay = immediate ? 50L : Math.max(250L, RECONNECT_DELAY_INITIAL_MILLIS);
        // The upper bound is always kept strictly greater than the starting delay
        long maxDelay = immediate ? 51L : Math.max(delay + 1L, Math.max(250L, RECONNECT_DELAY_MAX_MILLIS));
        // Policy: jitter + back-off, handles RuntimeException, logs each scheduled retry and failed attempt,
        // and allows Integer.MAX_VALUE retries
        RetryPolicy retryPolicy = ((RetryPolicyBuilder)RetryPolicy.builder().withJitter(Duration.ofMillis(delay)).withBackoff(Duration.ofMillis(delay), Duration.ofMillis(maxDelay)).handle(RuntimeException.class)).onRetryScheduled(execution -> LOG.fine("Re-connection scheduled in '" + execution.getDelay() + "' for: " + this.getClientUri())).onFailedAttempt(execution -> LOG.fine("Re-connection attempt failed '" + execution.getAttemptCount() + "' for: " + this.getClientUri())).withMaxRetries(Integer.MAX_VALUE).build();
        this.connectRetry = Failsafe.with((Policy)retryPolicy, (Policy[])new RetryPolicy[0]).with(this.executorService).runAsyncExecution(execution -> {
            LOG.fine("Connection attempt '" + (execution.getAttemptCount() + 1) + "' for: " + this.getClientUri());
            // Blocks this executor thread until the future returned by doConnect() completes (no timeout)
            boolean success = this.doConnect().get();
            // A cancelled retry future means the outcome is no longer wanted; finish without touching state
            if (this.connectRetry.isCancelled()) {
                execution.recordResult(null);
                return;
            }
            if (success) {
                this.onConnectionStatusChanged(ConnectionStatus.CONNECTED);
                execution.recordResult(null);
            } else {
                // Release channel resources, then record a failure that the policy handles as a RuntimeException
                this.doDisconnect();
                execution.recordFailure((Throwable)new RuntimeException("Connection attempt failed"));
            }
        });
    }

    /**
     * Performs a single connection attempt: prepares a fresh bootstrap, starts the channel and
     * registers a close listener that triggers a reconnect when an established connection drops.
     *
     * @return the future produced by {@link #createConnectedFuture()} for this attempt
     */
    protected java.util.concurrent.Future<Boolean> doConnect() {
        LOG.info("Establishing connection: " + getClientUri());

        // The worker group is only requested when none is currently held
        if (workerGroup == null) {
            workerGroup = getWorkerGroup();
        }

        bootstrap = new Bootstrap();
        bootstrap.channel(getChannelClass());
        configureChannel();
        bootstrap.group(workerGroup);
        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel newChannel) {
                AbstractNettyIOClient.this.initChannel(newChannel);
            }
        });

        channelStartFuture = startChannel();
        channel = channelStartFuture.channel();
        CompletableFuture<Boolean> attemptOutcome = createConnectedFuture();

        channel.closeFuture().addListener(closed -> {
            synchronized (this) {
                // Only a close while CONNECTED counts as unexpected
                if (connectionStatus != ConnectionStatus.CONNECTED) {
                    return;
                }
                LOG.info("Connection closed un-expectedly: " + getClientUri());
                onConnectionStatusChanged(ConnectionStatus.CONNECTING);
                doDisconnect();
                scheduleDoConnect(false);
            }
        });

        return attemptOutcome;
    }

    /**
     * Creates the future that reports the outcome of the current connection attempt and wires it to
     * {@link #channelStartFuture} via {@link #onConnectedFutureComplete}.
     *
     * @return a new future for this attempt's outcome
     */
    protected CompletableFuture<Boolean> createConnectedFuture() {
        final CompletableFuture<Boolean> outcome = new CompletableFuture<>();
        channelStartFuture.addListener(started -> onConnectedFutureComplete(started, outcome));
        return outcome;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Translates completion of the channel start future into completion of the attempt's
     * {@code connectedFuture}.
     * <p>
     * The future is always completed: when the client is no longer CONNECTING (for example a
     * disconnect was requested while the attempt was in flight) it is completed with {@code false}
     * so that the retry task blocked on it is released instead of waiting indefinitely.
     *
     * @param future          the completed channel start future
     * @param connectedFuture the future to complete with the attempt outcome
     */
    protected void onConnectedFutureComplete(Future<? super Void> future, CompletableFuture<Boolean> connectedFuture) {
        synchronized (this) {
            if (connectionStatus != ConnectionStatus.CONNECTING) {
                // Outcome is no longer relevant, but a waiter may still be blocked on the future
                connectedFuture.complete(false);
                return;
            }

            boolean success = future.isSuccess();
            if (success) {
                LOG.log(Level.INFO, "Connected: " + getClientUri());
            } else if (future.cause() != null) {
                LOG.log(Level.WARNING, "Connection error: " + getClientUri(), future.cause());
            }
            connectedFuture.complete(success);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Disconnects the client: marks it DISCONNECTING, cancels any running retry loop, releases the
     * channel resources and finally marks it DISCONNECTED. Does nothing when the client is neither
     * CONNECTED nor CONNECTING.
     * <p>
     * If the calling thread is interrupted while waiting for the retry future, the interrupt flag is
     * restored and the disconnect still runs to completion.
     */
    @Override
    public void disconnect() {
        synchronized (this) {
            if (connectionStatus != ConnectionStatus.CONNECTED && connectionStatus != ConnectionStatus.CONNECTING) {
                LOG.finest("Already disconnected: " + getClientUri());
                return;
            }
            LOG.fine("Disconnecting IO client: " + getClientUri());
            onConnectionStatusChanged(ConnectionStatus.DISCONNECTING);
        }

        // Work on a local reference so the null check and the calls below see the same future
        CompletableFuture<Void> retry = connectRetry;
        if (retry != null) {
            retry.cancel(false);
            try {
                retry.get();
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller; the remaining clean-up does not block
                Thread.currentThread().interrupt();
            } catch (CancellationException | ExecutionException ignored) {
                // Expected outcome of waiting on a future that was just cancelled
            } catch (Exception e) {
                LOG.log(Level.FINE, "Failed to wait for connection retry future: " + getClientUri(), e);
            }
            connectRetry = null;
        }

        doDisconnect();
        onConnectionStatusChanged(ConnectionStatus.DISCONNECTED);
    }

    /**
     * Releases the resources of the current connection attempt: cancels the start future, disconnects
     * and closes the channel, and - even if that throws - shuts down the worker group. Each field is
     * cleared after it has been released.
     */
    protected void doDisconnect() {
        try {
            if (channelStartFuture != null) {
                channelStartFuture.cancel(true);
                channelStartFuture = null;
            }

            if (channel != null) {
                channel.disconnect();
                channel.close();
                channel = null;
            }
        } finally {
            // Runs regardless of failures above so the event loop group is not left behind
            if (workerGroup != null) {
                workerGroup.shutdownGracefully();
                workerGroup = null;
            }
        }
    }

    /**
     * Writes and flushes a message on the current channel; silently ignored unless the client is CONNECTED.
     * <p>
     * The write completes asynchronously, so failures reported through the returned future (for example
     * an encoder error or a closed channel) are logged by a listener; the surrounding try/catch only
     * covers exceptions thrown synchronously while submitting the write.
     *
     * @param message the message to send
     */
    @Override
    public void sendMessage(T message) {
        if (connectionStatus != ConnectionStatus.CONNECTED) {
            return;
        }

        try {
            channel.writeAndFlush(message).addListener(writeResult -> {
                if (!writeResult.isSuccess()) {
                    LOG.log(Level.WARNING, "Message send failed: " + getClientUri(), writeResult.cause());
                }
            });
            LOG.finest("Message sent to server: " + getClientUri());
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Message send failed: " + getClientUri(), e);
        }
    }

    /**
     * @return the connection status most recently set through {@link #onConnectionStatusChanged}
     */
    @Override
    public ConnectionStatus getConnectionStatus() {
        return connectionStatus;
    }

    /**
     * Registers a connection status consumer unless an equal one is already registered.
     *
     * @param connectionStatusConsumer the consumer to register
     */
    @Override
    public void addConnectionStatusConsumer(Consumer<ConnectionStatus> connectionStatusConsumer) {
        if (connectionStatusConsumers.contains(connectionStatusConsumer)) {
            return;
        }
        connectionStatusConsumers.add(connectionStatusConsumer);
    }

    /**
     * Unregisters a connection status consumer; has no effect when it is not registered.
     *
     * @param connectionStatusConsumer the consumer to remove
     */
    @Override
    public void removeConnectionStatusConsumer(Consumer<ConnectionStatus> connectionStatusConsumer) {
        connectionStatusConsumers.remove(connectionStatusConsumer);
    }

    /**
     * Unregisters every connection status consumer.
     */
    @Override
    public void removeAllConnectionStatusConsumers() {
        connectionStatusConsumers.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a message consumer unless an equal one is already registered. The check and the
     * insertion happen under the consumer list's monitor.
     *
     * @param messageConsumer the consumer to register
     */
    @Override
    public void addMessageConsumer(Consumer<T> messageConsumer) {
        synchronized (messageConsumers) {
            if (messageConsumers.contains(messageConsumer)) {
                return;
            }
            messageConsumers.add(messageConsumer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Unregisters a message consumer under the consumer list's monitor; has no effect when it is not registered.
     *
     * @param messageConsumer the consumer to remove
     */
    @Override
    public void removeMessageConsumer(Consumer<T> messageConsumer) {
        synchronized (messageConsumers) {
            messageConsumers.remove(messageConsumer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Unregisters every message consumer under the consumer list's monitor.
     */
    @Override
    public void removeAllMessageConsumers() {
        synchronized (messageConsumers) {
            messageConsumers.clear();
        }
    }

    /**
     * Initialises a newly created channel; the default only installs the configured encoders/decoders.
     *
     * @param channel the channel being initialised
     */
    protected void initChannel(Channel channel) {
        addEncodersDecoders(channel);
    }

    /**
     * Appends the handlers supplied by {@link #encoderDecoderProvider} to the end of the channel's
     * pipeline, one at a time and in array order. Nothing is added when no provider is set or the
     * provider returns {@code null}.
     *
     * @param channel the channel whose pipeline receives the handlers
     */
    protected void addEncodersDecoders(Channel channel) {
        if (encoderDecoderProvider == null) {
            return;
        }

        ChannelHandler[] suppliedHandlers = encoderDecoderProvider.get();
        if (suppliedHandlers == null) {
            return;
        }

        for (ChannelHandler suppliedHandler : suppliedHandlers) {
            channel.pipeline().addLast(suppliedHandler);
        }
    }

    /**
     * Delivers an inbound message to every registered message consumer; ignored unless the client is CONNECTED.
     * <p>
     * The consumer list is copied while holding its monitor (the same monitor the add/remove/clear
     * methods use) and the copy is iterated outside the monitor, so consumers may be registered or
     * removed concurrently - including from within a consumer callback - without causing a
     * {@link java.util.ConcurrentModificationException}. An exception thrown by one consumer is
     * logged and does not prevent delivery to the others.
     *
     * @param message the decoded inbound message
     */
    protected void onMessageReceived(T message) {
        if (connectionStatus != ConnectionStatus.CONNECTED) {
            return;
        }

        LOG.finest("Message received notifying consumers: " + getClientUri());

        List<Consumer<T>> consumerSnapshot;
        synchronized (messageConsumers) {
            consumerSnapshot = new ArrayList<>(messageConsumers);
        }

        for (Consumer<T> consumer : consumerSnapshot) {
            try {
                consumer.accept(message);
            } catch (Exception e) {
                LOG.log(Level.WARNING, "Exception occurred in message handler: " + getClientUri(), e);
            }
        }
    }

    /**
     * Called by the inbound handlers when an exception reaches them; the default logs it at SEVERE.
     *
     * @param ctx   the handler context the exception was raised in
     * @param cause the exception
     */
    protected void onDecodeException(ChannelHandlerContext ctx, Throwable cause) {
        String description = "Exception occurred on in-bound message: " + getClientUri();
        LOG.log(Level.SEVERE, description, cause);
    }

    /**
     * Called by the encoder handler when an exception reaches it; the default logs it at SEVERE.
     *
     * @param ctx   the handler context the exception was raised in
     * @param cause the exception
     */
    protected void onEncodeException(ChannelHandlerContext ctx, Throwable cause) {
        String description = "Exception occurred on out-bound message: " + getClientUri();
        LOG.log(Level.SEVERE, description, cause);
    }

    /**
     * Records the new connection status and submits a task to {@link #executorService} that passes
     * it to every registered status consumer. A consumer that throws is logged and skipped.
     *
     * @param connectionStatus the status to record and publish
     */
    protected void onConnectionStatusChanged(ConnectionStatus connectionStatus) {
        this.connectionStatus = connectionStatus;

        Runnable notifyConsumers = () -> {
            for (Consumer<ConnectionStatus> statusConsumer : connectionStatusConsumers) {
                try {
                    statusConsumer.accept(connectionStatus);
                } catch (Exception e) {
                    LOG.log(Level.WARNING, "Connection status change handler threw an exception: " + getClientUri(), e);
                }
            }
        };
        executorService.submit(notifyConsumers);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Flags the client as permanently failed (only the first call has any effect) and disconnects it.
     * Once flagged, {@link #connect()} refuses to run.
     *
     * @param message description of the error, included in the log entry
     */
    protected void setPermanentError(String message) {
        synchronized (this) {
            if (permanentError) {
                return;
            }
            LOG.info("An unrecoverable error has occurred with client '" + getClientUri() + "' is no longer usable: " + message);
            permanentError = true;
        }

        // Disconnect is invoked after the monitor has been released
        disconnect();
    }

    /**
     * @return the client URI, used as this client's textual representation
     */
    @Override
    public String toString() {
        return getClientUri();
    }

    public static class MessageToByteEncoder<T>
    extends io.netty.handler.codec.MessageToByteEncoder<T> {
        protected AbstractNettyIOClient<T, ?> client;
        protected BiConsumer<T, ByteBuf> encoder;

        public MessageToByteEncoder(Class<? extends T> typeClazz, AbstractNettyIOClient<T, ?> client, BiConsumer<T, ByteBuf> encoder) {
            super(typeClazz);
            this.client = client;
            this.encoder = encoder;
        }

        protected void encode(ChannelHandlerContext ctx, T msg, ByteBuf out) {
            this.encoder.accept(msg, out);
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);
            this.client.onEncodeException(ctx, cause);
        }
    }

    public static class MessageToMessageDecoder<T>
    extends SimpleChannelInboundHandler<T> {
        protected AbstractNettyIOClient<T, ?> client;

        public MessageToMessageDecoder(Class<? extends T> typeClazz, AbstractNettyIOClient<T, ?> client) {
            super(typeClazz);
            this.client = client;
        }

        public void channelRead0(ChannelHandlerContext ctx, T msg) throws Exception {
            this.client.onMessageReceived(msg);
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);
            this.client.onDecodeException(ctx, cause);
        }
    }

    public static class ByteToMessageDecoder<T>
    extends io.netty.handler.codec.ByteToMessageDecoder {
        protected List<T> messages = new ArrayList<T>(1);
        protected AbstractNettyIOClient<T, ?> client;
        protected BiConsumer<ByteBuf, List<T>> decoder;

        public ByteToMessageDecoder(AbstractNettyIOClient<T, ?> client, @NotNull BiConsumer<ByteBuf, List<T>> decoder) {
            this.client = client;
            this.decoder = decoder;
        }

        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            this.decoder.accept(in, this.messages);
            if (!this.messages.isEmpty()) {
                this.messages.forEach(m -> this.client.onMessageReceived(m));
                this.messages.clear();
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);
            this.client.onDecodeException(ctx, cause);
        }
    }
}

