/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.mantis.remote.observable;

import io.mantisrx.common.codec.Encoder;
import io.mantisrx.common.network.WritableEndpoint;
import io.reactivex.mantis.remote.observable.Group;
import io.reactivex.mantis.remote.observable.MutableReference;
import io.reactivex.mantis.remote.observable.RemoteObservable;
import io.reactivex.mantis.remote.observable.RemoteObservableException;
import io.reactivex.mantis.remote.observable.RemoteRxEvent;
import io.reactivex.mantis.remote.observable.RxMetrics;
import io.reactivex.mantis.remote.observable.ServeConfig;
import io.reactivex.mantis.remote.observable.ServeGroupedObservable;
import io.reactivex.mantis.remote.observable.ServeNestedObservable;
import io.reactivex.mantis.remote.observable.ServeObservable;
import io.reactivex.mantis.remote.observable.WriteBytesObserver;
import io.reactivex.mantis.remote.observable.ingress.IngressPolicy;
import io.reactivex.mantis.remote.observable.slotting.SlottingStrategy;
import io.reactivx.mantis.operators.DisableBackPressureOperator;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import mantis.io.reactivex.netty.channel.ConnectionHandler;
import mantis.io.reactivex.netty.channel.ObservableConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Notification;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;

public class RemoteObservableConnectionHandler
implements ConnectionHandler<RemoteRxEvent, List<RemoteRxEvent>> {
    private static final Logger logger = LoggerFactory.getLogger(RemoteObservableConnectionHandler.class);
    /** Serve configurations keyed by observable name; consulted for every subscribe request. */
    private final Map<String, ServeConfig> observables;
    /** Receives subscribed/unsubscribed counts and is passed to every {@link WriteBytesObserver}. */
    private final RxMetrics serverMetrics;
    /** Asked whether a connection is allowed before it is set up. */
    private final IngressPolicy ingressPolicy;
    /** Width, in milliseconds, of the time window used to batch outgoing notifications. */
    private final int writeBufferTimeMSec;

    /**
     * Creates a connection handler.
     *
     * <p>All four values are stored as-is and never reassigned afterwards, so the fields are
     * declared {@code final}.
     *
     * @param observables serve configurations keyed by observable name
     * @param ingressPolicy policy consulted for every connection passed to {@code handle}
     * @param metrics metrics sink for subscribe/unsubscribe counts
     * @param writeBufferTimeMSec buffering window, in milliseconds, applied to outgoing events
     */
    public RemoteObservableConnectionHandler(Map<String, ServeConfig> observables, IngressPolicy ingressPolicy, RxMetrics metrics, int writeBufferTimeMSec) {
        this.observables = observables;
        this.ingressPolicy = ingressPolicy;
        this.serverMetrics = metrics;
        this.writeBufferTimeMSec = writeBufferTimeMSec;
    }

    /**
     * Handles an incoming connection.
     *
     * <p>The remote address is logged first. The ingress policy is then asked whether the
     * connection is allowed: if it is not, an observable failing with a
     * {@link RemoteObservableException} is returned; otherwise the connection is wired up by
     * {@code setupConnection}.
     *
     * @param connection the connection to handle
     * @return the observable produced by {@code setupConnection}, or an error observable when the
     *         ingress policy rejects the connection
     */
    public Observable<Void> handle(ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection) {
        logger.info("Connection received: " + connection.getChannel().remoteAddress());
        // Guard clause: rejected connections never reach setupConnection.
        if (!this.ingressPolicy.allowed(connection)) {
            RemoteObservableException rejection = new RemoteObservableException("Connection rejected due to ingress policy");
            return Observable.error((Throwable)rejection);
        }
        return this.setupConnection(connection);
    }

    /**
     * Streams {@code observable} to a single client connection.
     *
     * <p>The pipeline built here, in order:
     * <ol>
     *   <li>filters items with the predicate obtained by applying {@code filterFunction} to the
     *       subscribe parameters carried by {@code event};</li>
     *   <li>logs completion and errors of the source;</li>
     *   <li>materializes the stream into {@link Notification}s, lifts it through
     *       {@link DisableBackPressureOperator}, and collects the notifications into time windows
     *       of {@code writeBufferTimeMSec} milliseconds, dropping empty windows;</li>
     *   <li>turns every window into a list of {@link RemoteRxEvent}s named after {@code event},
     *       encoding {@code OnNext} values with {@code encoder};</li>
     *   <li>subscribes a {@link WriteBytesObserver} bound to {@code connection} to the resulting
     *       non-empty lists.</li>
     * </ol>
     *
     * @param observable source to serve
     * @param connection connection handed to the {@link WriteBytesObserver}
     * @param event the subscribe event; supplies the subscribe parameters and the event name
     * @param filterFunction maps the subscribe parameters to the item predicate
     * @param encoder encodes {@code OnNext} values
     * @param serveConfig supplies the slotting strategy handed to the observer
     * @param endpoint endpoint handed to the observer
     * @return the pipeline's subscription, read back from the reference shared with the observer
     */
    private <T> Subscription serveObservable(Observable<T> observable, ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection, final RemoteRxEvent event, Func1<Map<String, String>, Func1<T, Boolean>> filterFunction, final Encoder<T> encoder, ServeObservable<T> serveConfig, WritableEndpoint<Observable<T>> endpoint) {
        MutableReference<Subscription> subReference = new MutableReference<Subscription>();

        // Stage 1: apply the client-specific filter and log terminal events of the source.
        Func1 itemFilter = (Func1)filterFunction.call(event.getSubscribeParameters());
        Observable source = observable.filter(itemFilter).doOnCompleted(new Action0(){

            public void call() {
                logger.info("OnCompleted recieved in serveObservable, sending to client.");
            }
        }).doOnError((Action1)new Action1<Throwable>(){

            public void call(Throwable t1) {
                logger.info("OnError received in serveObservable, sending to client: ", t1);
            }
        });

        // Stage 2: materialize and batch notifications by time window; skip empty windows.
        Observable windows = source.materialize().lift((Observable.Operator)new DisableBackPressureOperator()).buffer((long)this.writeBufferTimeMSec, TimeUnit.MILLISECONDS).filter(new Func1<List<Notification<T>>, Boolean>(){

            public Boolean call(List<Notification<T>> window) {
                return window != null && !window.isEmpty();
            }
        });

        // Stage 3: translate each window into the events written to the connection.
        Observable outbound = windows.map(new Func1<List<Notification<T>>, List<RemoteRxEvent>>(){

            public List<RemoteRxEvent> call(List<Notification<T>> notifications) {
                List<RemoteRxEvent> rxEvents = new ArrayList<RemoteRxEvent>(notifications.size());
                for (Notification<T> notification : notifications) {
                    Notification.Kind kind = notification.getKind();
                    RemoteRxEvent rxEvent;
                    if (kind == Notification.Kind.OnNext) {
                        rxEvent = RemoteRxEvent.next(event.getName(), encoder.encode(notification.getValue()));
                    } else if (kind == Notification.Kind.OnError) {
                        rxEvent = RemoteRxEvent.error(event.getName(), RemoteObservable.fromThrowableToBytes(notification.getThrowable()));
                    } else if (kind == Notification.Kind.OnCompleted) {
                        rxEvent = RemoteRxEvent.completed(event.getName());
                    } else {
                        throw new RuntimeException("Unsupported notification kind: " + kind);
                    }
                    rxEvents.add(rxEvent);
                }
                return rxEvents;
            }
        }).filter((Func1)new Func1<List<RemoteRxEvent>, Boolean>(){

            public Boolean call(List<RemoteRxEvent> batch) {
                return batch != null && !batch.isEmpty();
            }
        });

        // The observer receives subReference before it is populated; the value is set right after subscribing.
        Subscription subscription = outbound.subscribe(new WriteBytesObserver<Observable<T>>(connection, subReference, this.serverMetrics, serveConfig.getSlottingStrategy(), endpoint));
        subReference.setValue(subscription);
        return subReference.getValue();
    }

    /**
     * Streams {@code observable} to a single client connection, encoding values before they are
     * materialized.
     *
     * <p>The pipeline built here, in order: items are filtered with the predicate obtained by
     * applying {@code filterFunction} to the subscribe parameters of {@code event}; completion and
     * errors are logged; every item is encoded to {@code byte[]} with {@code encoder}; the stream is
     * materialized and every {@link Notification} is converted to a {@link RemoteRxEvent} named
     * after {@code event}; the events are lifted through {@link DisableBackPressureOperator} and
     * collected into time windows of {@code writeBufferTimeMSec} milliseconds; non-empty windows
     * are delivered to a {@link WriteBytesObserver} bound to {@code connection}.
     *
     * <p>The non-empty check is applied exactly once. A second, identical filter directly after
     * the first could never reject anything and has been removed.
     *
     * @param observable source to serve
     * @param connection connection handed to the {@link WriteBytesObserver}
     * @param event the subscribe event; supplies the subscribe parameters and the event name
     * @param filterFunction maps the subscribe parameters to the item predicate
     * @param encoder encodes each item to bytes
     * @param serveConfig supplies the slotting strategy handed to the observer
     * @param endpoint endpoint handed to the observer
     * @return the pipeline's subscription, read back from the reference shared with the observer
     */
    private <T> Subscription serveNestedObservable(Observable<T> observable, ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection, final RemoteRxEvent event, Func1<Map<String, String>, Func1<T, Boolean>> filterFunction, final Encoder<T> encoder, ServeNestedObservable<Observable<T>> serveConfig, WritableEndpoint<Observable<T>> endpoint) {
        MutableReference<Subscription> subReference = new MutableReference<Subscription>();
        subReference.setValue(observable.filter((Func1)filterFunction.call(event.getSubscribeParameters())).doOnCompleted(new Action0(){

            public void call() {
                logger.info("OnCompleted recieved in serveNestedObservable, sending to client.");
            }
        }).doOnError((Action1)new Action1<Throwable>(){

            public void call(Throwable t1) {
                logger.info("OnError received in serveNestedObservable, sending to client: ", t1);
            }
        }).map(new Func1<T, byte[]>(){

            public byte[] call(T t1) {
                return encoder.encode(t1);
            }
        }).materialize().map((Func1)new Func1<Notification<byte[]>, RemoteRxEvent>(){

            public RemoteRxEvent call(Notification<byte[]> notification) {
                if (notification.getKind() == Notification.Kind.OnNext) {
                    return RemoteRxEvent.next(event.getName(), (byte[])notification.getValue());
                }
                if (notification.getKind() == Notification.Kind.OnError) {
                    return RemoteRxEvent.error(event.getName(), RemoteObservable.fromThrowableToBytes(notification.getThrowable()));
                }
                if (notification.getKind() == Notification.Kind.OnCompleted) {
                    return RemoteRxEvent.completed(event.getName());
                }
                throw new RuntimeException("Unsupported notification kind: " + notification.getKind());
            }
        }).lift((Observable.Operator)new DisableBackPressureOperator()).buffer((long)this.writeBufferTimeMSec, TimeUnit.MILLISECONDS).filter((Func1)new Func1<List<RemoteRxEvent>, Boolean>(){

            // Time windows in which nothing was emitted produce empty lists; do not write those.
            public Boolean call(List<RemoteRxEvent> t1) {
                return t1 != null && !t1.isEmpty();
            }
        }).subscribe(new WriteBytesObserver<Observable<T>>(connection, subReference, this.serverMetrics, serveConfig.getSlottingStrategy(), endpoint)));
        return subReference.getValue();
    }

    /**
     * Streams a flattened grouped observable to a single client connection.
     *
     * <p>Each {@link Group} carries a key and a {@link Notification} for that key. Groups are
     * filtered by key, materialized, lifted through {@link DisableBackPressureOperator}, batched
     * into time windows of {@code writeBufferTimeMSec} milliseconds and converted to
     * {@link RemoteRxEvent}s that are delivered to a {@link WriteBytesObserver}.
     *
     * <p>The key predicate is obtained from {@code filterFunction} once, when the pipeline is
     * built, the same way {@code serveObservable} and {@code serveNestedObservable} do it. It is
     * no longer rebuilt for every element that passes through the filter.
     *
     * <p>Payload layout of a {@code next} event produced for a group element (written with
     * {@link ByteBuffer}'s default byte order):
     * <pre>
     *   byte   tag        1 = value, 2 = group completed, 3 = group error
     *   int    keyLength
     *   byte[] key        keyLength bytes
     *   byte[] body       encoded value (tag 1) or serialized throwable (tag 3); absent for tag 2
     * </pre>
     * Completion or failure of the outer stream is sent as a {@code completed} / {@code error}
     * event instead.
     *
     * @param groups source of group elements
     * @param connection connection handed to the {@link WriteBytesObserver}
     * @param event the subscribe event; supplies the subscribe parameters and the event name
     * @param filterFunction maps the subscribe parameters to the key predicate
     * @param keyEncoder not used by this method; key bytes are read from the {@link Group} itself
     * @param valueEncoder encodes the value of an inner {@code OnNext} notification
     * @param serveConfig supplies the slotting strategy handed to the observer
     * @param endpoint endpoint handed to the observer
     * @return the pipeline's subscription, read back from the reference shared with the observer
     */
    private <K, V> Subscription serveGroupedObservable(Observable<Group<K, V>> groups, ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection, final RemoteRxEvent event, final Func1<Map<String, String>, Func1<K, Boolean>> filterFunction, Encoder<K> keyEncoder, final Encoder<V> valueEncoder, ServeGroupedObservable<K, V> serveConfig, WritableEndpoint<GroupedObservable<K, V>> endpoint) {
        MutableReference<Subscription> subReference = new MutableReference<Subscription>();
        // Build the key predicate once per subscription instead of once per element.
        final Func1<K, Boolean> keyFilter = filterFunction.call(event.getSubscribeParameters());
        subReference.setValue(groups.filter(new Func1<Group<K, V>, Boolean>(){

            public Boolean call(Group<K, V> group) {
                return keyFilter.call(group.getKeyValue());
            }
        }).doOnCompleted(new Action0(){

            public void call() {
                logger.info("OnCompleted recieved in serveGroupedObservable, sending to client.");
            }
        }).doOnError((Action1)new Action1<Throwable>(){

            public void call(Throwable t1) {
                logger.info("OnError received in serveGroupedObservable, sending to client: ", t1);
            }
        }).materialize().lift((Observable.Operator)new DisableBackPressureOperator()).buffer((long)this.writeBufferTimeMSec, TimeUnit.MILLISECONDS).filter(new Func1<List<Notification<Group<K, V>>>, Boolean>(){

            public Boolean call(List<Notification<Group<K, V>>> t1) {
                return t1 != null && !t1.isEmpty();
            }
        }).map(new Func1<List<Notification<Group<K, V>>>, List<RemoteRxEvent>>(){

            public List<RemoteRxEvent> call(List<Notification<Group<K, V>>> groupNotifications) {
                List<RemoteRxEvent> rxEvents = new ArrayList<RemoteRxEvent>(groupNotifications.size());
                for (Notification<Group<K, V>> groupNotification : groupNotifications) {
                    Notification.Kind outerKind = groupNotification.getKind();
                    if (Notification.Kind.OnNext == outerKind) {
                        Group<K, V> group = groupNotification.getValue();
                        byte[] keyBytes = group.getKeyBytes();
                        int keyLength = keyBytes.length;
                        Notification<V> notification = group.getNotification();
                        Notification.Kind innerKind = notification.getKind();
                        byte[] data;
                        // 5 = one tag byte plus a four-byte key length.
                        if (Notification.Kind.OnNext == innerKind) {
                            byte[] valueBytes = valueEncoder.encode(notification.getValue());
                            data = ByteBuffer.allocate(5 + keyLength + valueBytes.length).put((byte)1).putInt(keyLength).put(keyBytes).put(valueBytes).array();
                        } else if (Notification.Kind.OnCompleted == innerKind) {
                            data = ByteBuffer.allocate(5 + keyLength).put((byte)2).putInt(keyLength).put(keyBytes).array();
                        } else if (Notification.Kind.OnError == innerKind) {
                            byte[] errorBytes = RemoteObservable.fromThrowableToBytes(notification.getThrowable());
                            data = ByteBuffer.allocate(5 + keyLength + errorBytes.length).put((byte)3).putInt(keyLength).put(keyBytes).put(errorBytes).array();
                        } else {
                            // Defensive: fail like the outer chain does rather than send a null payload.
                            throw new RuntimeException("Unsupported notification type: " + innerKind);
                        }
                        rxEvents.add(RemoteRxEvent.next(event.getName(), data));
                        continue;
                    }
                    if (Notification.Kind.OnCompleted == outerKind) {
                        rxEvents.add(RemoteRxEvent.completed(event.getName()));
                        continue;
                    }
                    if (Notification.Kind.OnError == outerKind) {
                        rxEvents.add(RemoteRxEvent.error(event.getName(), RemoteObservable.fromThrowableToBytes(groupNotification.getThrowable())));
                        continue;
                    }
                    throw new RuntimeException("Unsupported notification type: " + outerKind);
                }
                return rxEvents;
            }
        }).filter((Func1)new Func1<List<RemoteRxEvent>, Boolean>(){

            public Boolean call(List<RemoteRxEvent> t1) {
                return t1 != null && !t1.isEmpty();
            }
        }).subscribe(new WriteBytesObserver<GroupedObservable<K, V>>(connection, subReference, this.serverMetrics, serveConfig.getSlottingStrategy(), endpoint)));
        return subReference.getValue();
    }

    /**
     * Starts serving the configured observable to {@code connection} and records the resulting
     * subscription in {@code unsubscribeCallbackReference}.
     *
     * <p>Dispatch is by the concrete type of {@code configuration}:
     * <ul>
     *   <li>{@link ServeObservable}: when {@code isSubscriptionPerConnection()} is true the
     *       configuration's own observable is served (wrapped with {@code share()} when
     *       {@code isHotStream()} is true); otherwise {@code endpoint.read()} is served;</li>
     *   <li>{@link ServeGroupedObservable}: {@code endpoint.read()} is served as a grouped
     *       stream;</li>
     *   <li>{@link ServeNestedObservable}: {@code endpoint.read()} is served as a nested
     *       stream.</li>
     * </ul>
     * For any other configuration type nothing is served and {@code null} is stored in the
     * reference.
     *
     * @param unsubscribeCallbackReference receives the subscription (or {@code null})
     * @param event the subscribe event forwarded to the serving method
     * @param connection connection to serve on
     * @param configuration serve configuration for the requested observable
     * @param endpoint endpoint created for this connection
     */
    private void subscribe(MutableReference<Subscription> unsubscribeCallbackReference, RemoteRxEvent event, ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection, ServeConfig configuration, WritableEndpoint endpoint) {
        Func1 subscriptionFilter = configuration.getFilterFunction();
        Subscription served = null;
        if (configuration instanceof ServeObservable) {
            ServeObservable plainConfig = (ServeObservable)configuration;
            // Pick the source first so both variants share a single serveObservable call.
            Observable source;
            if (!plainConfig.isSubscriptionPerConnection()) {
                source = endpoint.read();
            } else {
                source = plainConfig.getObservable();
                if (plainConfig.isHotStream()) {
                    source = source.share();
                }
            }
            served = this.serveObservable(source, connection, event, subscriptionFilter, plainConfig.getEncoder(), plainConfig, endpoint);
        } else if (configuration instanceof ServeGroupedObservable) {
            ServeGroupedObservable groupedConfig = (ServeGroupedObservable)configuration;
            served = this.serveGroupedObservable(endpoint.read(), connection, event, subscriptionFilter, groupedConfig.getKeyEncoder(), groupedConfig.getValueEncoder(), groupedConfig, endpoint);
        } else if (configuration instanceof ServeNestedObservable) {
            ServeNestedObservable nestedConfig = (ServeNestedObservable)configuration;
            served = this.serveNestedObservable(endpoint.read(), connection, event, subscriptionFilter, nestedConfig.getEncoder(), nestedConfig, endpoint);
        }
        unsubscribeCallbackReference.setValue(served);
    }

    /**
     * Processes a subscribe request received on {@code connection}.
     *
     * <p>The serve configuration is looked up by the event's name; an unknown name yields an
     * observable failing with {@link RemoteObservableException}. For a {@code subscribed} event a
     * {@link WritableEndpoint} is created from the remote address and the optional {@code "slotId"}
     * subscribe parameter, the endpoint and slotting strategy are published through the supplied
     * references, the subscribed count is incremented, and serving starts before the endpoint is
     * registered with the slotting strategy.
     *
     * <p>If the slotting strategy refuses the endpoint, the subscription that was just created is
     * unsubscribed before the connection is closed, so it does not stay active for a connection
     * that is being closed.
     *
     * @param event the subscribe request
     * @param connection connection the request arrived on
     * @param slottingStrategyReference receives the configuration's slotting strategy
     * @param unsubscribeCallbackReference receives the subscription created for this request
     * @param slottingIdReference receives the endpoint created for this connection
     * @return an empty observable, or an error observable when no configuration matches the name
     */
    private Observable<Void> handleSubscribeRequest(RemoteRxEvent event, ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection, MutableReference<SlottingStrategy> slottingStrategyReference, MutableReference<Subscription> unsubscribeCallbackReference, MutableReference<WritableEndpoint> slottingIdReference) {
        String observableName = event.getName();
        ServeConfig config = this.observables.get(observableName);
        if (config == null) {
            return Observable.error((Throwable)new RemoteObservableException("No remote observable configuration found for name: " + observableName));
        }
        if (event.getType() == RemoteRxEvent.Type.subscribed) {
            String slotId = null;
            Map<String, String> subscriptionParameters = event.getSubscribeParameters();
            if (subscriptionParameters != null) {
                slotId = subscriptionParameters.get("slotId");
            }
            InetSocketAddress address = (InetSocketAddress)connection.getChannel().remoteAddress();
            WritableEndpoint endpoint = new WritableEndpoint(address.getHostName(), address.getPort(), slotId, connection);
            SlottingStrategy slottingStrategy = config.getSlottingStrategy();
            slottingIdReference.setValue(endpoint);
            slottingStrategyReference.setValue(slottingStrategy);
            logger.info("Connection received on server from client endpoint: " + endpoint + ", subscribed to observable: " + observableName);
            this.serverMetrics.incrementSubscribedCount();
            // Serving is started first; the endpoint is registered with the slotting strategy afterwards.
            this.subscribe(unsubscribeCallbackReference, event, connection, config, endpoint);
            if (!slottingStrategy.addConnection(endpoint)) {
                logger.warn("Failed to slot connection for endpoint: " + endpoint);
                // Tear down the subscription created above; it must not outlive the connection.
                Subscription subscription = unsubscribeCallbackReference.getValue();
                if (subscription != null) {
                    subscription.unsubscribe();
                }
                connection.close(true);
            }
        }
        return Observable.empty();
    }

    /**
     * Wires the subscribe/unsubscribe protocol onto {@code connection}.
     *
     * <p>Three references are created per connection: the active subscription, the slotting
     * strategy and the endpoint. They are filled in by {@code handleSubscribeRequest} and read
     * back when an unsubscribe arrives. Incoming events other than {@code subscribed} and
     * {@code unsubscribed} are ignored.
     *
     * <p>On {@code unsubscribed}: the stored subscription (if any) is unsubscribed, the
     * unsubscribed count is incremented, the endpoint is removed from the slotting strategy (a
     * failed removal is logged as an error), and the connection is closed.
     *
     * @param connection the accepted connection
     * @return the observable obtained by flat-mapping the connection's filtered input
     */
    private Observable<Void> setupConnection(final ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection) {
        final MutableReference<Subscription> unsubscribeCallbackReference = new MutableReference<Subscription>();
        final MutableReference<SlottingStrategy> slottingStrategyReference = new MutableReference<SlottingStrategy>();
        final MutableReference<WritableEndpoint> slottingIdReference = new MutableReference<WritableEndpoint>();

        // Only subscribe and unsubscribe requests are acted upon.
        Func1<RemoteRxEvent, Boolean> isControlEvent = new Func1<RemoteRxEvent, Boolean>(){

            public Boolean call(RemoteRxEvent event) {
                RemoteRxEvent.Type type = event.getType();
                return type == RemoteRxEvent.Type.subscribed || type == RemoteRxEvent.Type.unsubscribed;
            }
        };

        Func1<RemoteRxEvent, Observable<Void>> dispatch = new Func1<RemoteRxEvent, Observable<Void>>(){

            public Observable<Void> call(RemoteRxEvent event) {
                RemoteRxEvent.Type type = event.getType();
                if (type == RemoteRxEvent.Type.subscribed) {
                    return RemoteObservableConnectionHandler.this.handleSubscribeRequest(event, connection, slottingStrategyReference, unsubscribeCallbackReference, slottingIdReference);
                }
                if (type != RemoteRxEvent.Type.unsubscribed) {
                    return Observable.empty();
                }
                Subscription active = unsubscribeCallbackReference.getValue();
                if (active != null) {
                    active.unsubscribe();
                }
                RemoteObservableConnectionHandler.this.serverMetrics.incrementUnsubscribedCount();
                SlottingStrategy slottingStrategy = slottingStrategyReference.getValue();
                if (slottingStrategy != null && !slottingStrategy.removeConnection(slottingIdReference.getValue())) {
                    logger.error("Failed to remove endpoint from slot,  endpoint: " + slottingIdReference.getValue());
                }
                logger.info("Connection: " + connection.getChannel().remoteAddress() + " unsubscribed, closing connection");
                connection.close(true);
                return Observable.empty();
            }
        };

        return connection.getInput().filter((Func1)isControlEvent).flatMap((Func1)dispatch);
    }
}

