/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.mantis.remote.observable;

import com.mantisrx.common.utils.NettyUtils;
import io.mantisrx.common.MantisGroup;
import io.mantisrx.common.codec.Decoder;
import io.mantisrx.common.codec.Encoder;
import io.mantisrx.server.core.ServiceRegistry;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.compression.JdkZlibDecoder;
import io.netty.handler.codec.compression.JdkZlibEncoder;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.reactivex.mantis.remote.observable.BatchedRxEventPipelineConfigurator;
import io.reactivex.mantis.remote.observable.ConnectToConfig;
import io.reactivex.mantis.remote.observable.ConnectToGroupedObservable;
import io.reactivex.mantis.remote.observable.ConnectToObservable;
import io.reactivex.mantis.remote.observable.HeartbeatHandler;
import io.reactivex.mantis.remote.observable.RemoteRxConnection;
import io.reactivex.mantis.remote.observable.RemoteRxEvent;
import io.reactivex.mantis.remote.observable.RemoteRxServer;
import io.reactivex.mantis.remote.observable.RemoteUnsubscribe;
import io.reactivex.mantis.remote.observable.RxMetrics;
import io.reactivex.mantis.remote.observable.ServeObservable;
import io.reactivex.mantis.remote.observable.ThrowableWithCount;
import io.reactivex.mantis.remote.observable.ingress.IngressPolicies;
import io.reactivex.mantis.remote.observable.ingress.IngressPolicy;
import io.reactivx.mantis.operators.DropOperator;
import io.reactivx.mantis.operators.GroupedObservableUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.TimeUnit;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.channel.ObservableConnection;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurator;
import mantis.io.reactivex.netty.pipeline.PipelineConfiguratorComposite;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.GroupedObservable;

public class RemoteObservable {
    private static final Logger logger = LoggerFactory.getLogger(RemoteObservable.class);
    // Settings shared by every client pipeline built in this class. The initialisers below
    // are the built-in defaults; loadFastProperties() may overwrite them from the
    // "mantis.netty.*" properties each time a connection is created.

    // When true, the idle-state and heartbeat handlers are added to the client pipeline.
    private static boolean enableHeartBeating = true;
    // When true, a Netty LoggingHandler is installed at the head of the client pipeline.
    private static boolean enableNettyLogging = false;
    // When true, the gzip (zlib) encoder/decoder pair is added to the client pipeline.
    private static boolean enableCompression = true;
    // Maximum frame length handed to the length-field frame decoder: 0x500000 == 5242880 bytes.
    private static int maxFrameLength = 0x500000;

    // Static-only utility holder: never instantiated.
    private RemoteObservable() {
    }

    /**
     * Refreshes the static pipeline settings from the {@code mantis.netty.*} properties.
     *
     * <p>Each boolean flag is recomputed from the current property value on every call, so a
     * property that is reset to its default takes effect for connections created afterwards
     * (previously a flag could only ever move away from its default). Comparisons are
     * null-safe.
     *
     * <p>{@code mantis.netty.maxFrameLength} is applied only when it parses as a positive
     * integer; a malformed or non-positive value is logged and the current value is kept,
     * instead of failing connection setup with a {@link NumberFormatException}.
     */
    private static void loadFastProperties() {
        String heartBeatingValue = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.netty.enableHeartBeating", "true");
        // Enabled unless explicitly set to "false" (same rule as before, but reversible).
        enableHeartBeating = !"false".equals(heartBeatingValue);

        String nettyLoggingValue = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.netty.enableLogging", "false");
        // Disabled unless explicitly set to "true".
        enableNettyLogging = "true".equals(nettyLoggingValue);

        String compressionValue = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.netty.enableCompression", "true");
        enableCompression = !"false".equals(compressionValue);

        String maxFrameLengthValue = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.netty.maxFrameLength", "5242880");
        if (maxFrameLengthValue == null) {
            return;
        }
        String trimmed = maxFrameLengthValue.trim();
        if (trimmed.isEmpty()) {
            return;
        }
        try {
            int parsed = Integer.parseInt(trimmed);
            if (parsed > 0) {
                maxFrameLength = parsed;
            } else {
                // A non-positive frame length is not usable by the frame decoder.
                logger.warn("Ignoring non-positive mantis.netty.maxFrameLength: {}; keeping {}", (Object)trimmed, (Object)maxFrameLength);
            }
        }
        catch (NumberFormatException e) {
            logger.warn("Ignoring malformed mantis.netty.maxFrameLength: {}; keeping {}", (Object)trimmed, (Object)maxFrameLength);
        }
    }

    /**
     * Builds the function handed to {@code retryWhen} for remote subscriptions.
     *
     * <p>Every failure is zipped with a 1-based attempt number drawn from
     * {@code range(1, params.getSubscribeAttempts())}. Each failed attempt is logged; the last
     * permitted attempt re-emits its throwable as an error, and every earlier attempt resolves
     * to a timer of {@code attempt} seconds.
     *
     * @param params connection settings supplying the name, host, port and attempt limit
     * @return the retry-notification handler
     */
    private static Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> retryLogic(final ConnectToConfig params) {
        // Pairs a failure with the number of the attempt that produced it.
        final Func2<Throwable, Integer, ThrowableWithCount> pairWithAttempt = new Func2<Throwable, Integer, ThrowableWithCount>(){

            @Override
            public ThrowableWithCount call(Throwable failure, Integer attemptNumber) {
                return new ThrowableWithCount(failure, attemptNumber);
            }
        };
        // Decides, per failed attempt, between giving up and waiting before the next try.
        final Func1<ThrowableWithCount, Observable<?>> giveUpOrDelay = new Func1<ThrowableWithCount, Observable<?>>(){

            @Override
            public Observable<?> call(ThrowableWithCount failedAttempt) {
                logger.debug("Failed to subscribe to remote observable: " + params.getName(), failedAttempt.getThrowable());
                logger.info("Failed to subscribe to remote observable: " + params.getName() + " at host: " + params.getHost() + " on port: " + params.getPort() + " subscribe attempt: " + failedAttempt.getCount() + " of: " + params.getSubscribeAttempts());
                boolean attemptsExhausted = failedAttempt.getCount().intValue() == params.getSubscribeAttempts();
                if (attemptsExhausted && failedAttempt.getThrowable() != null) {
                    return Observable.error((Throwable)failedAttempt.getThrowable());
                }
                // Delay grows with the attempt number: attempt N waits N seconds.
                return Observable.timer((long)failedAttempt.getCount().intValue(), (TimeUnit)TimeUnit.SECONDS);
            }
        };
        return new Func1<Observable<? extends Throwable>, Observable<?>>(){

            @Override
            public Observable<?> call(Observable<? extends Throwable> attempts) {
                return attempts.zipWith(Observable.range((int)1, (int)params.getSubscribeAttempts()), (Func2)pairWithAttempt).flatMap(giveUpOrDelay);
            }
        };
    }

    /**
     * Creates a lazily-connected handle to a remote observable of {@code T}.
     *
     * <p>Nothing is opened until the returned connection's observable is subscribed. Each
     * subscription registers a {@link RemoteUnsubscribe} on the subscriber and then subscribes
     * it to a new TCP-backed stream. No {@code retryWhen} is applied in this overload.
     *
     * @param params connection settings, including the close trigger and disconnect callback
     * @return the connection wrapper holding the observable, its metrics and the close trigger
     */
    public static <T> RemoteRxConnection<T> connect(final ConnectToObservable<T> params) {
        final RxMetrics metrics = new RxMetrics();
        Observable.OnSubscribe<T> connectOnSubscribe = new Observable.OnSubscribe<T>(){

            @Override
            public void call(Subscriber<? super T> subscriber) {
                // Registered before connecting so an unsubscribe reaches the remote side.
                RemoteUnsubscribe unsubscribeHook = new RemoteUnsubscribe(params.getName());
                subscriber.add((Subscription)unsubscribeHook);
                Observable<T> remoteStream = RemoteObservable.createTcpConnectionToServer(params, unsubscribeHook, metrics, params.getConnectionDisconnectCallback(), (Observable<Integer>)params.getCloseTrigger());
                remoteStream.subscribe(subscriber);
            }
        };
        return new RemoteRxConnection<T>(Observable.create((Observable.OnSubscribe)connectOnSubscribe), metrics, (Observer<Integer>)params.getCloseTrigger());
    }

    /**
     * Creates a lazily-connected handle to a remote stream of grouped observables.
     *
     * <p>Each subscription registers a {@link RemoteUnsubscribe} on the subscriber, builds a
     * new TCP-backed stream of groups and subscribes through {@code retryWhen} using
     * {@code retryLogic(config)}.
     *
     * @param config connection settings, including key/value decoders and the close trigger
     * @return the connection wrapper holding the observable, its metrics and the close trigger
     */
    public static <K, V> RemoteRxConnection<GroupedObservable<K, V>> connect(final ConnectToGroupedObservable<K, V> config) {
        final RxMetrics metrics = new RxMetrics();
        Observable.OnSubscribe<GroupedObservable<K, V>> connectOnSubscribe = new Observable.OnSubscribe<GroupedObservable<K, V>>(){

            @Override
            public void call(Subscriber<? super GroupedObservable<K, V>> subscriber) {
                // Registered before connecting so an unsubscribe reaches the remote side.
                RemoteUnsubscribe unsubscribeHook = new RemoteUnsubscribe(config.getName());
                subscriber.add((Subscription)unsubscribeHook);
                Observable<GroupedObservable<K, V>> remoteGroups = RemoteObservable.createTcpConnectionToServer(config, unsubscribeHook, metrics, config.getConnectionDisconnectCallback(), (Observable<Integer>)config.getCloseTrigger());
                remoteGroups.retryWhen(RemoteObservable.retryLogic(config)).subscribe(subscriber);
            }
        };
        return new RemoteRxConnection<GroupedObservable<K, V>>(Observable.create((Observable.OnSubscribe)connectOnSubscribe), metrics, (Observer<Integer>)config.getCloseTrigger());
    }

    /**
     * Creates a lazily-connected handle to a remote stream delivered as flat
     * {@link MantisGroup} key/value pairs.
     *
     * <p>Each subscription registers a {@link RemoteUnsubscribe} on the subscriber, builds a
     * new TCP-backed stream and subscribes through {@code retryWhen} using
     * {@code retryLogic(config)}.
     *
     * @param config     connection settings, including key/value decoders and the close trigger
     * @param inputQueue queue forwarded unchanged to the connection factory
     * @return the connection wrapper holding the observable, its metrics and the close trigger
     */
    public static <K, V> RemoteRxConnection<MantisGroup<K, V>> connectToMGO(final ConnectToGroupedObservable<K, V> config, final SpscArrayQueue<MantisGroup<?, ?>> inputQueue) {
        final RxMetrics metrics = new RxMetrics();
        Observable.OnSubscribe<MantisGroup<K, V>> connectOnSubscribe = new Observable.OnSubscribe<MantisGroup<K, V>>(){

            @Override
            public void call(Subscriber<? super MantisGroup<K, V>> subscriber) {
                // Registered before connecting so an unsubscribe reaches the remote side.
                RemoteUnsubscribe unsubscribeHook = new RemoteUnsubscribe(config.getName());
                subscriber.add((Subscription)unsubscribeHook);
                Observable<MantisGroup<K, V>> remotePairs = RemoteObservable.createTcpConnectionToGOServer(config, unsubscribeHook, metrics, config.getConnectionDisconnectCallback(), (Observable<Integer>)config.getCloseTrigger(), inputQueue);
                remotePairs.retryWhen(RemoteObservable.retryLogic(config)).subscribe(subscriber);
            }
        };
        return new RemoteRxConnection<MantisGroup<K, V>>(Observable.create((Observable.OnSubscribe)connectOnSubscribe), metrics, (Observer<Integer>)config.getCloseTrigger());
    }

    /**
     * Convenience overload: connects to {@code host:port} with the given decoder, leaving
     * every other builder setting untouched, and returns the resulting observable.
     *
     * @param host    remote host name
     * @param port    remote port
     * @param decoder decoder applied to each received payload
     * @return the observable exposed by the created connection
     */
    public static <T> Observable<T> connect(String host, int port, Decoder<T> decoder) {
        RemoteRxConnection connection = RemoteObservable.connect(
                new ConnectToObservable.Builder()
                        .host(host)
                        .port(port)
                        .decoder(decoder)
                        .build());
        return connection.getObservable();
    }

    /**
     * Builds the client-side stream for a remote grouped observable.
     *
     * <p>Steps, in the order they appear in the chain:
     * <ol>
     *   <li>reload the pipeline settings and create an RxNetty TCP client for
     *       {@code params.getHost()}/{@code params.getPort()};</li>
     *   <li>on connect, send a "subscribed" event carrying the name and subscribe parameters,
     *       and hand the connection to {@code remoteUnsubscribe};</li>
     *   <li>on connection completion or error, log a warning and invoke
     *       {@code connectionDisconnectCallback}; a connection error is replaced by an empty
     *       stream rather than propagated;</li>
     *   <li>stop when {@code closeTrigger} emits;</li>
     *   <li>turn each {@link RemoteRxEvent} into a notification (updating {@code metrics}),
     *       dematerialize, then group the raw payloads by their decoded key.</li>
     * </ol>
     *
     * <p>Payload layout as read by the selectors below: 1 byte notification type, 4 byte key
     * length, the key bytes, then the remaining bytes (value or serialized error).
     *
     * @param params                       host, port, name, subscribe parameters and decoders
     * @param remoteUnsubscribe            receives the live connection once established
     * @param metrics                      counters incremented per next/error/completed event
     * @param connectionDisconnectCallback invoked when the connection completes or fails
     * @param closeTrigger                 any emission ends the stream via {@code takeUntil}
     * @return observable of per-key groups of decoded values
     */
    private static <K, V> Observable<GroupedObservable<K, V>> createTcpConnectionToServer(final ConnectToGroupedObservable<K, V> params, final RemoteUnsubscribe remoteUnsubscribe, final RxMetrics metrics, final Action0 connectionDisconnectCallback, Observable<Integer> closeTrigger) {
        final Decoder<K> keyDecoder = params.getKeyDecoder();
        final Decoder<V> valueDecoder = params.getValueDecoder();
        // Refresh the static enable*/maxFrameLength settings before the pipeline is configured.
        RemoteObservable.loadFastProperties();
        return RxNetty.createTcpClient((String)params.getHost(), (int)params.getPort(), (PipelineConfigurator)new PipelineConfiguratorComposite(new PipelineConfigurator[]{new PipelineConfigurator<RemoteRxEvent, List<RemoteRxEvent>>(){

            // Handler order: optional logging (head), optional idle/heartbeat, optional gzip,
            // then 4-byte length-field framing.
            public void configureNewPipeline(ChannelPipeline pipeline) {
                if (enableNettyLogging) {
                    pipeline.addFirst(new ChannelHandler[]{new LoggingHandler(LogLevel.ERROR)});
                }
                if (enableHeartBeating) {
                    // Netty IdleStateHandler(readerIdle, writerIdle, allIdle) in seconds:
                    // 10s read idle, 2s write idle, all-idle check disabled.
                    pipeline.addLast("idleStateHandler", (ChannelHandler)new IdleStateHandler(10, 2, 0));
                    pipeline.addLast("heartbeat", (ChannelHandler)new HeartbeatHandler());
                }
                if (enableCompression) {
                    // NOTE(review): the handler names look swapped relative to the handler
                    // types ("gzipInflater" is the encoder, "gzipDeflater" the decoder).
                    // Left untouched because the names may be referenced elsewhere.
                    pipeline.addLast("gzipInflater", (ChannelHandler)new JdkZlibEncoder(ZlibWrapper.GZIP));
                    pipeline.addLast("gzipDeflater", (ChannelHandler)new JdkZlibDecoder(ZlibWrapper.GZIP));
                }
                // Outbound frames get a 4-byte length prefix; inbound frames are split on a
                // 4-byte length field at offset 0, which is stripped from the frame.
                pipeline.addLast("frameEncoder", (ChannelHandler)new LengthFieldPrepender(4));
                pipeline.addLast("frameDecoder", (ChannelHandler)new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, 4));
            }
        }, new BatchedRxEventPipelineConfigurator()})).connect().flatMap((Func1)new Func1<ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>>, Observable<RemoteRxEvent>>(){

            // Runs once the connection is established: announce the subscription, expose the
            // connection for remote unsubscribe, then stream the incoming events.
            public Observable<RemoteRxEvent> call(ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection) {
                connection.writeAndFlush(RemoteRxEvent.subscribed(params.getName(), params.getSubscribeParameters()));
                remoteUnsubscribe.setConnection(connection);
                // DropOperator is defined outside this file; presumably it drops events under
                // backpressure and reports under the given name — confirm.
                return connection.getInput().lift((Observable.Operator)new DropOperator("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups"));
            }
        }).doOnCompleted(new Action0(){

            // Connection-level completion: log and notify the caller.
            public void call() {
                logger.warn("Detected connection completed when trying to connect to host: " + params.getHost() + " port: " + params.getPort());
                connectionDisconnectCallback.call();
            }
        }).onErrorResumeNext((Func1)new Func1<Throwable, Observable<RemoteRxEvent>>(){

            // Connection-level error: log, notify the caller and end the stream without
            // propagating the error downstream.
            public Observable<RemoteRxEvent> call(Throwable t1) {
                logger.warn("Detected connection error when trying to connect to host: " + params.getHost() + " port: " + params.getPort(), t1);
                connectionDisconnectCallback.call();
                return Observable.empty();
            }
        }).takeUntil(closeTrigger).map((Func1)new Func1<RemoteRxEvent, Notification<byte[]>>(){

            // Translate the remote event type into the matching Rx notification and count it.
            public Notification<byte[]> call(RemoteRxEvent rxEvent) {
                if (rxEvent.getType() == RemoteRxEvent.Type.next) {
                    metrics.incrementNextCount();
                    return Notification.createOnNext((Object)rxEvent.getData());
                }
                if (rxEvent.getType() == RemoteRxEvent.Type.error) {
                    metrics.incrementErrorCount();
                    return Notification.createOnError((Throwable)RemoteObservable.fromBytesToThrowable(rxEvent.getData()));
                }
                if (rxEvent.getType() == RemoteRxEvent.Type.completed) {
                    metrics.incrementCompletedCount();
                    return Notification.createOnCompleted();
                }
                throw new RuntimeException("RemoteRxEvent of type:" + (Object)((Object)rxEvent.getType()) + ", not supported.");
            }
        }).dematerialize().groupBy(new Func1<byte[], K>(){

            // Key selector: skip the type byte, read the 4-byte key length, decode the key.
            public K call(byte[] bytes) {
                ByteBuffer buff = ByteBuffer.wrap(bytes);
                buff.get();
                int keyLength = buff.getInt();
                byte[] key = new byte[keyLength];
                buff.get(key);
                return keyDecoder.decode(key);
            }
        }, new Func1<byte[], Notification<V>>(){

            // Value selector: the leading byte selects the per-group notification
            // (1 = next, 2 = completed, 3 = error).
            public Notification<V> call(byte[] bytes) {
                ByteBuffer buff = ByteBuffer.wrap(bytes);
                byte notificationType = buff.get();
                if (notificationType == 1) {
                    int keyLength = buff.getInt();
                    int end = buff.limit();
                    // Remaining payload = total - 4 (length field) - 1 (type byte) - key bytes.
                    int dataLength = end - 4 - 1 - keyLength;
                    byte[] valueBytes = new byte[dataLength];
                    // Jump past the type byte, the length field and the key.
                    buff.position(5 + keyLength);
                    buff.get(valueBytes, 0, dataLength);
                    Object value = valueDecoder.decode(valueBytes);
                    return Notification.createOnNext((Object)value);
                }
                if (notificationType == 2) {
                    return Notification.createOnCompleted();
                }
                if (notificationType == 3) {
                    int keyLength = buff.getInt();
                    int end = buff.limit();
                    // Same layout as the value case; the tail is a serialized Throwable.
                    int dataLength = end - 4 - 1 - keyLength;
                    byte[] errorBytes = new byte[dataLength];
                    buff.position(5 + keyLength);
                    buff.get(errorBytes, 0, dataLength);
                    return Notification.createOnError((Throwable)RemoteObservable.fromBytesToThrowable(errorBytes));
                }
                throw new RuntimeException("Notification encoding not support: " + notificationType);
            }
        }).map(new Func1<GroupedObservable<K, Notification<V>>, GroupedObservable<K, V>>(){

            // Unwrap the per-group notifications so each group emits plain values and
            // terminates on its own completed/error notification.
            public GroupedObservable<K, V> call(GroupedObservable<K, Notification<V>> group) {
                return GroupedObservableUtils.createGroupedObservable((Object)group.getKey(), (Observable)group.dematerialize());
            }
        }).doOnEach(new Observer<GroupedObservable<K, V>>(){

            // Logging only; no effect on the stream.
            public void onCompleted() {
                logger.info("RemoteRxEvent, name: {} onCompleted()", (Object)params.getName());
            }

            public void onError(Throwable e) {
                logger.error("RemoteRxEvent, name: {} onError()", (Object)params.getName(), (Object)e);
            }

            public void onNext(GroupedObservable<K, V> group) {
                if (logger.isDebugEnabled()) {
                    logger.debug("RemoteRxEvent, name: {} new key: {}", (Object)params.getName(), group.getKey());
                }
            }
        });
    }

    /**
     * Builds the client-side stream for a remote grouped observable, delivered as flat
     * {@link MantisGroup} key/value pairs instead of nested groups.
     *
     * <p>The connection setup (pipeline, "subscribed" handshake, disconnect callback,
     * swallowed connection errors, {@code takeUntil(closeTrigger)}) is followed by a filter
     * that keeps only {@code Type.next} events, so remote error and completed events are not
     * forwarded by this variant. Each surviving payload is decoded into one
     * {@code MantisGroup}.
     *
     * <p>Payload layout as read below: 1 byte notification type, 4 byte key length, the key
     * bytes, then the value bytes. A notification type other than 1 raises a
     * {@link RuntimeException}.
     *
     * @param params                       host, port, name, subscribe parameters and decoders
     * @param remoteUnsubscribe            receives the live connection once established
     * @param metrics                      next-counter incremented per forwarded event
     * @param connectionDisconnectCallback invoked when the connection completes or fails
     * @param closeTrigger                 any emission ends the stream via {@code takeUntil}
     * @param inputQueue                   not referenced anywhere in this method body
     * @return observable of decoded key/value pairs
     */
    private static <K, V> Observable<MantisGroup<K, V>> createTcpConnectionToGOServer(final ConnectToGroupedObservable<K, V> params, final RemoteUnsubscribe remoteUnsubscribe, final RxMetrics metrics, final Action0 connectionDisconnectCallback, Observable<Integer> closeTrigger, SpscArrayQueue<MantisGroup<?, ?>> inputQueue) {
        final Decoder<K> keyDecoder = params.getKeyDecoder();
        final Decoder<V> valueDecoder = params.getValueDecoder();
        // Refresh the static enable*/maxFrameLength settings before the pipeline is configured.
        RemoteObservable.loadFastProperties();
        return RxNetty.createTcpClient((String)params.getHost(), (int)params.getPort(), (PipelineConfigurator)new PipelineConfiguratorComposite(new PipelineConfigurator[]{new PipelineConfigurator<RemoteRxEvent, List<RemoteRxEvent>>(){

            // Handler order: optional logging (head), optional idle/heartbeat, optional gzip,
            // then 4-byte length-field framing.
            public void configureNewPipeline(ChannelPipeline pipeline) {
                if (enableNettyLogging) {
                    pipeline.addFirst(new ChannelHandler[]{new LoggingHandler(LogLevel.ERROR)});
                }
                if (enableHeartBeating) {
                    // Netty IdleStateHandler(readerIdle, writerIdle, allIdle) in seconds:
                    // 10s read idle, 2s write idle, all-idle check disabled.
                    pipeline.addLast("idleStateHandler", (ChannelHandler)new IdleStateHandler(10, 2, 0));
                    pipeline.addLast("heartbeat", (ChannelHandler)new HeartbeatHandler());
                }
                if (enableCompression) {
                    // NOTE(review): the handler names look swapped relative to the handler
                    // types ("gzipInflater" is the encoder, "gzipDeflater" the decoder).
                    // Left untouched because the names may be referenced elsewhere.
                    pipeline.addLast("gzipInflater", (ChannelHandler)new JdkZlibEncoder(ZlibWrapper.GZIP));
                    pipeline.addLast("gzipDeflater", (ChannelHandler)new JdkZlibDecoder(ZlibWrapper.GZIP));
                }
                // Outbound frames get a 4-byte length prefix; inbound frames are split on a
                // 4-byte length field at offset 0, which is stripped from the frame.
                pipeline.addLast("frameEncoder", (ChannelHandler)new LengthFieldPrepender(4));
                pipeline.addLast("frameDecoder", (ChannelHandler)new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, 4));
            }
        }, new BatchedRxEventPipelineConfigurator()})).connect().flatMap((Func1)new Func1<ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>>, Observable<RemoteRxEvent>>(){

            // Runs once the connection is established: announce the subscription, expose the
            // connection for remote unsubscribe, then stream the incoming events.
            public Observable<RemoteRxEvent> call(ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection) {
                connection.writeAndFlush(RemoteRxEvent.subscribed(params.getName(), params.getSubscribeParameters()));
                remoteUnsubscribe.setConnection(connection);
                // The operator name reuses the "_createTcpConnectionToServerGroups" suffix of
                // the grouped variant. DropOperator is defined outside this file; presumably
                // it drops events under backpressure — confirm.
                return connection.getInput().lift((Observable.Operator)new DropOperator("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups"));
            }
        }).doOnCompleted(new Action0(){

            // Connection-level completion: log and notify the caller.
            public void call() {
                logger.warn("Detected connection completed when trying to connect to host: " + params.getHost() + " port: " + params.getPort());
                connectionDisconnectCallback.call();
            }
        }).onErrorResumeNext((Func1)new Func1<Throwable, Observable<RemoteRxEvent>>(){

            // Connection-level error: log, notify the caller and end the stream without
            // propagating the error downstream.
            public Observable<RemoteRxEvent> call(Throwable t1) {
                logger.warn("Detected connection error when trying to connect to host: " + params.getHost() + " port: " + params.getPort(), t1);
                connectionDisconnectCallback.call();
                return Observable.empty();
            }
        }).takeUntil(closeTrigger).filter((Func1)new Func1<RemoteRxEvent, Boolean>(){

            // Only "next" events pass; error and completed events are discarded here.
            public Boolean call(RemoteRxEvent rxEvent) {
                return rxEvent.getType() == RemoteRxEvent.Type.next;
            }
        }).map((Func1)new Func1<RemoteRxEvent, Notification<byte[]>>(){

            // Every event reaching this point is a "next": count it and wrap the payload.
            public Notification<byte[]> call(RemoteRxEvent rxEvent) {
                metrics.incrementNextCount();
                return Notification.createOnNext((Object)rxEvent.getData());
            }
        }).dematerialize().map(new Func1<byte[], MantisGroup<K, V>>(){

            // Decode one payload into a key/value pair.
            public MantisGroup<K, V> call(byte[] bytes) {
                // First pass over the buffer: skip the type byte and decode the key.
                ByteBuffer buff = ByteBuffer.wrap(bytes);
                buff.get();
                int keyLength = buff.getInt();
                byte[] key = new byte[keyLength];
                buff.get(key);
                Object keyVal = keyDecoder.decode(key);
                Object value = null;
                // Second pass from the start: read the type byte, then the value bytes.
                buff = ByteBuffer.wrap(bytes);
                byte notificationType = buff.get();
                if (notificationType != 1) {
                    throw new RuntimeException("Notification encoding not support: " + notificationType);
                }
                int end = buff.limit();
                // Remaining payload = total - 4 (length field) - 1 (type byte) - key bytes.
                int dataLength = end - 4 - 1 - keyLength;
                byte[] valueBytes = new byte[dataLength];
                // Jump past the type byte, the length field and the key.
                buff.position(5 + keyLength);
                buff.get(valueBytes, 0, dataLength);
                value = valueDecoder.decode(valueBytes);
                return new MantisGroup(keyVal, value);
            }
        }).doOnEach(new Observer<MantisGroup<K, V>>(){

            // Logging only; no effect on the stream.
            public void onCompleted() {
                logger.info("RemoteRxEvent, name: " + params.getName() + " onCompleted()");
            }

            public void onError(Throwable e) {
                logger.error("RemoteRxEvent, name: " + params.getName() + " onError()", e);
            }

            public void onNext(MantisGroup<K, V> group) {
                if (logger.isDebugEnabled()) {
                    logger.debug("RemoteRxEvent, name: " + params.getName() + " new key: " + group.getKeyValue());
                }
            }
        });
    }

    /**
     * Builds the client-side stream for a remote (non-grouped) observable of {@code T}.
     *
     * <p>Steps, in the order they appear in the chain:
     * <ol>
     *   <li>reload the pipeline settings and create an RxNetty TCP client for
     *       {@code params.getHost()}/{@code params.getPort()};</li>
     *   <li>on connect, send a "subscribed" event carrying the name and subscribe parameters,
     *       and hand the connection to {@code remoteUnsubscribe};</li>
     *   <li>on connection completion or error, log a warning and invoke
     *       {@code connectionDisconnectCallback}; a connection error is replaced by an empty
     *       stream rather than propagated;</li>
     *   <li>stop when {@code closeTrigger} emits;</li>
     *   <li>map each {@link RemoteRxEvent} to a notification (decoding "next" payloads with
     *       the configured decoder, updating {@code metrics}) and dematerialize.</li>
     * </ol>
     *
     * @param params                       host, port, name, subscribe parameters and decoder
     * @param remoteUnsubscribe            receives the live connection once established
     * @param metrics                      counters incremented per next/error/completed event
     * @param connectionDisconnectCallback invoked when the connection completes or fails
     * @param closeTrigger                 any emission ends the stream via {@code takeUntil}
     * @return observable of decoded values
     */
    private static <T> Observable<T> createTcpConnectionToServer(final ConnectToObservable<T> params, final RemoteUnsubscribe remoteUnsubscribe, final RxMetrics metrics, final Action0 connectionDisconnectCallback, Observable<Integer> closeTrigger) {
        final Decoder<T> decoder = params.getDecoder();
        // Refresh the static enable*/maxFrameLength settings before the pipeline is configured.
        RemoteObservable.loadFastProperties();
        return RxNetty.createTcpClient((String)params.getHost(), (int)params.getPort(), (PipelineConfigurator)new PipelineConfiguratorComposite(new PipelineConfigurator[]{new PipelineConfigurator<RemoteRxEvent, List<RemoteRxEvent>>(){

            // Handler order: optional logging (head), optional idle/heartbeat, optional gzip,
            // then 4-byte length-field framing.
            public void configureNewPipeline(ChannelPipeline pipeline) {
                if (enableNettyLogging) {
                    pipeline.addFirst(new ChannelHandler[]{new LoggingHandler(LogLevel.ERROR)});
                }
                if (enableHeartBeating) {
                    // Netty IdleStateHandler(readerIdle, writerIdle, allIdle) in seconds:
                    // 10s read idle, 2s write idle, all-idle check disabled.
                    pipeline.addLast("idleStateHandler", (ChannelHandler)new IdleStateHandler(10, 2, 0));
                    pipeline.addLast("heartbeat", (ChannelHandler)new HeartbeatHandler());
                }
                if (enableCompression) {
                    // NOTE(review): the handler names look swapped relative to the handler
                    // types ("gzipInflater" is the encoder, "gzipDeflater" the decoder).
                    // Left untouched because the names may be referenced elsewhere.
                    pipeline.addLast("gzipInflater", (ChannelHandler)new JdkZlibEncoder(ZlibWrapper.GZIP));
                    pipeline.addLast("gzipDeflater", (ChannelHandler)new JdkZlibDecoder(ZlibWrapper.GZIP));
                }
                // Outbound frames get a 4-byte length prefix; inbound frames are split on a
                // 4-byte length field at offset 0, which is stripped from the frame.
                pipeline.addLast("frameEncoder", (ChannelHandler)new LengthFieldPrepender(4));
                pipeline.addLast("frameDecoder", (ChannelHandler)new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, 4));
            }
        }, new BatchedRxEventPipelineConfigurator()})).connect().flatMap((Func1)new Func1<ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>>, Observable<RemoteRxEvent>>(){

            // Runs once the connection is established: announce the subscription, expose the
            // connection for remote unsubscribe, then stream the incoming events.
            public Observable<RemoteRxEvent> call(ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection) {
                connection.writeAndFlush(RemoteRxEvent.subscribed(params.getName(), params.getSubscribeParameters()));
                remoteUnsubscribe.setConnection(connection);
                // DropOperator is defined outside this file; presumably it drops events under
                // backpressure and reports under the given name — confirm.
                return connection.getInput().lift((Observable.Operator)new DropOperator("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServer"));
            }
        }).doOnCompleted(new Action0(){

            // Connection-level completion: log and notify the caller.
            public void call() {
                logger.warn("Detected connection completed when trying to connect to host: " + params.getHost() + " port: " + params.getPort());
                connectionDisconnectCallback.call();
            }
        }).onErrorResumeNext((Func1)new Func1<Throwable, Observable<RemoteRxEvent>>(){

            // Connection-level error: log, notify the caller and end the stream without
            // propagating the error downstream.
            public Observable<RemoteRxEvent> call(Throwable t1) {
                logger.warn("Detected connection error when trying to connect to host: " + params.getHost() + " port: " + params.getPort(), t1);
                connectionDisconnectCallback.call();
                return Observable.empty();
            }
        }).takeUntil(closeTrigger).map(new Func1<RemoteRxEvent, Notification<T>>(){

            // Translate the remote event type into the matching Rx notification and count it;
            // "next" payloads are decoded here, "error" payloads are deserialized throwables.
            public Notification<T> call(RemoteRxEvent rxEvent) {
                if (rxEvent.getType() == RemoteRxEvent.Type.next) {
                    metrics.incrementNextCount();
                    return Notification.createOnNext((Object)decoder.decode(rxEvent.getData()));
                }
                if (rxEvent.getType() == RemoteRxEvent.Type.error) {
                    metrics.incrementErrorCount();
                    return Notification.createOnError((Throwable)RemoteObservable.fromBytesToThrowable(rxEvent.getData()));
                }
                if (rxEvent.getType() == RemoteRxEvent.Type.completed) {
                    metrics.incrementCompletedCount();
                    return Notification.createOnCompleted();
                }
                throw new RuntimeException("RemoteRxEvent of type: " + (Object)((Object)rxEvent.getType()) + ", not supported.");
            }
        }).dematerialize().doOnEach(new Observer<T>(){

            // Logging only; no effect on the stream.
            public void onCompleted() {
                logger.info("RemoteRxEvent: " + params.getName() + " onCompleted()");
            }

            public void onError(Throwable e) {
                logger.error("RemoteRxEvent: " + params.getName() + " onError()", e);
            }

            public void onNext(T t) {
                if (logger.isDebugEnabled()) {
                    logger.debug("RemoteRxEvent: " + params.getName() + " onNext(): " + t);
                }
            }
        });
    }

    /**
     * Creates a server exposing {@code observable} on {@code port} without a name
     * ({@code null} is passed as the name) and with the allow-all ingress policy.
     *
     * @param port       port the server is configured with
     * @param observable stream to serve
     * @param encoder    encoder for the served values
     * @return the configured server
     */
    public static <T> RemoteRxServer serve(int port, Observable<T> observable, Encoder<T> encoder) {
        RemoteRxServer.Builder unnamedServer = RemoteObservable.configureServerFromParams(null, port, observable, encoder, IngressPolicies.allowAll());
        return new RemoteRxServer(unnamedServer);
    }

    /**
     * Creates a server exposing {@code observable} on {@code port} under {@code name}, with
     * the allow-all ingress policy.
     *
     * @param port       port the server is configured with
     * @param name       name the observable is registered under
     * @param observable stream to serve
     * @param encoder    encoder for the served values
     * @return the configured server
     */
    public static <T> RemoteRxServer serve(int port, String name, Observable<T> observable, Encoder<T> encoder) {
        RemoteRxServer.Builder namedServer = RemoteObservable.configureServerFromParams(name, port, observable, encoder, IngressPolicies.allowAll());
        return new RemoteRxServer(namedServer);
    }

    /**
     * Assembles a server builder for a single served observable configured with
     * {@code subscriptionPerConnection()}.
     *
     * @param name          name of the served observable (callers in this class may pass null)
     * @param port          port the server is configured with
     * @param observable    stream to serve
     * @param encoder       encoder for the served values
     * @param ingressPolicy ingress policy set on the server builder
     * @return the populated builder
     */
    private static <T> RemoteRxServer.Builder configureServerFromParams(String name, int port, Observable<T> observable, Encoder<T> encoder, IngressPolicy ingressPolicy) {
        return new RemoteRxServer.Builder()
                .port(port)
                .ingressPolicy(ingressPolicy)
                .addObservable(
                        new ServeObservable.Builder()
                                .name(name)
                                .encoder(encoder)
                                .observable(observable)
                                .subscriptionPerConnection()
                                .build());
    }

    /**
     * Serializes a throwable for transport to a remote subscriber.
     *
     * <p>A copy carrying only the message and stack trace is serialized rather than {@code t}
     * itself. The copy is created through the throwable class's public {@code (String)}
     * constructor. When that constructor is missing, inaccessible or fails, a plain
     * {@link RuntimeException} whose message is {@code "<class name>: <message>"} is sent
     * instead, so the error still reaches the remote side (previously such throwables made
     * this method throw).
     *
     * @param t the throwable to serialize; must not be null
     * @return the Java-serialized bytes of the transport copy
     * @throws RuntimeException wrapping the {@link IOException} if serialization fails
     */
    static byte[] fromThrowableToBytes(Throwable t) {
        Throwable transportCopy;
        try {
            Constructor<?> con = t.getClass().getConstructor(String.class);
            transportCopy = (Throwable)con.newInstance(t.getMessage());
        }
        catch (ReflectiveOperationException | IllegalArgumentException | SecurityException e) {
            // No usable (String) constructor: keep the original type visible in the message.
            logger.warn("Cannot re-create throwable of type {}; sending a RuntimeException instead", (Object)t.getClass().getName(), (Object)e);
            transportCopy = new RuntimeException(t.getClass().getName() + ": " + t.getMessage());
        }
        transportCopy.setStackTrace(t.getStackTrace());

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // Closing the object stream flushes it into baos before the bytes are read below.
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(transportCopy);
        }
        catch (IOException e) {
            logger.error("Failed to convert throwable to bytes", (Throwable)e);
            throw new RuntimeException(e);
        }
        return baos.toByteArray();
    }

    /**
     * Rebuilds a throwable from its Java-serialized form.
     *
     * <p>NOTE(review): this uses native Java deserialization on bytes that arrive from a
     * remote peer. If that peer is not fully trusted this is a deserialization risk; consider
     * a stream filter or a non-native wire format.
     *
     * @param bytes Java-serialized throwable
     * @return the deserialized throwable
     * @throws RuntimeException wrapping the {@link IOException} or
     *         {@link ClassNotFoundException} raised while reading
     */
    static Throwable fromBytesToThrowable(byte[] bytes) {
        try (ByteArrayInputStream rawBytes = new ByteArrayInputStream(bytes);
             ObjectInputStream objectStream = new ObjectInputStream(rawBytes)) {
            return (Throwable)objectStream.readObject();
        }
        catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    // Class initializer: runs once when RemoteObservable is first initialised, before any
    // connect/serve call. NettyUtils is defined outside this file; presumably this sets up
    // Netty thread counts — confirm.
    static {
        NettyUtils.setNettyThreads();
    }
}

