/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.mantis.remote.observable;

import io.mantisrx.common.network.Endpoint;
import io.reactivex.mantis.remote.observable.ConnectToGroupedObservable;
import io.reactivex.mantis.remote.observable.ConnectToObservable;
import io.reactivex.mantis.remote.observable.RemoteObservable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subjects.PublishSubject;

/**
 * Exposes a single observable whose items come from remote connections that are
 * (re)established whenever a new {@link Endpoint} is announced.
 *
 * <p>For every endpoint emitted by the endpoint observable, the configured function
 * creates a remote observable for that endpoint and publishes it into an internal
 * {@link PublishSubject}. {@link #observable()} merges the published observables
 * into one stream.
 *
 * <p>NOTE(review): the subject is shared by all subscribers of {@link #observable()},
 * while each subscriber subscribes to the endpoint observable on its own. With more
 * than one concurrent subscriber, every announced endpoint is therefore published once
 * per subscriber and seen by all of them. Confirm this class is only used with a
 * single subscriber.
 *
 * @param <T> type of the items emitted by the merged stream
 */
public class DynamicConnection<T> {
    private static final Logger logger = LoggerFactory.getLogger(DynamicConnection.class);

    /** Source of endpoints; each emission triggers a new connection. */
    private final Observable<Endpoint> changeEndpointObservable;
    /** Carries one observable per established connection; merged in {@link #observable()}. */
    private final PublishSubject<Observable<T>> subject = PublishSubject.create();
    /** Creates the remote observable for a given endpoint. */
    private final Func1<Endpoint, Observable<T>> toObservableFunc;

    /**
     * @param toObservableFunc         creates the observable to merge for an announced endpoint
     * @param changeEndpointObservable stream of endpoints to connect to
     */
    DynamicConnection(Func1<Endpoint, Observable<T>> toObservableFunc, Observable<Endpoint> changeEndpointObservable) {
        this.changeEndpointObservable = changeEndpointObservable;
        this.toObservableFunc = toObservableFunc;
    }

    /**
     * Creates a dynamic connection to grouped remote observables.
     *
     * <p>For each endpoint, a copy of {@code config} is made through the builder's copy
     * constructor, its host, port and slot id are set from the endpoint, and the result
     * is connected through {@link RemoteObservable}.
     *
     * @param config    template configuration that is copied for every endpoint
     * @param endpoints stream of endpoints to connect to
     * @param <K>       group key type
     * @param <V>       group value type
     * @return a dynamic connection emitting the grouped observables of the connected endpoints
     */
    public static <K, V> DynamicConnection<GroupedObservable<K, V>> create(final ConnectToGroupedObservable.Builder<K, V> config, Observable<Endpoint> endpoints) {
        Func1<Endpoint, Observable<GroupedObservable<K, V>>> toObservableFunc =
                new Func1<Endpoint, Observable<GroupedObservable<K, V>>>() {
                    @Override
                    public Observable<GroupedObservable<K, V>> call(Endpoint endpoint) {
                        // Work on a copy so the caller's template builder is not reconfigured per endpoint.
                        ConnectToGroupedObservable.Builder<K, V> configCopy =
                                new ConnectToGroupedObservable.Builder<K, V>(config);
                        configCopy.host(endpoint.getHost()).port(endpoint.getPort()).slotId(endpoint.getSlotId());
                        return RemoteObservable.connect(configCopy.build()).getObservable();
                    }
                };
        return new DynamicConnection<GroupedObservable<K, V>>(toObservableFunc, endpoints);
    }

    /**
     * Creates a dynamic connection to plain remote observables.
     *
     * <p>For each endpoint, a copy of {@code config} is made through the builder's copy
     * constructor, its host, port and slot id are set from the endpoint, and the result
     * is connected through {@link RemoteObservable}.
     *
     * @param config    template configuration that is copied for every endpoint
     * @param endpoints stream of endpoints to connect to
     * @param <T>       item type of the remote observable
     * @return a dynamic connection emitting the items of the connected endpoints
     */
    public static <T> DynamicConnection<T> create(final ConnectToObservable.Builder<T> config, Observable<Endpoint> endpoints) {
        Func1<Endpoint, Observable<T>> toObservableFunc = new Func1<Endpoint, Observable<T>>() {
            @Override
            public Observable<T> call(Endpoint endpoint) {
                // Work on a copy so the caller's template builder is not reconfigured per endpoint.
                ConnectToObservable.Builder<T> configCopy = new ConnectToObservable.Builder<T>(config);
                configCopy.host(endpoint.getHost()).port(endpoint.getPort()).slotId(endpoint.getSlotId());
                return RemoteObservable.connect(configCopy.build()).getObservable();
            }
        };
        return new DynamicConnection<T>(toObservableFunc, endpoints);
    }

    /**
     * Signals completion on the internal subject that feeds {@link #observable()}.
     */
    public void close() {
        this.subject.onCompleted();
    }

    /**
     * Returns the merged stream of all connections established by this instance.
     *
     * <p>On subscription, the subscriber is first attached to the merged stream of the
     * internal subject and then to the endpoint observable; both subscriptions are
     * registered on the subscriber through {@code add(...)}.
     *
     * <p>If the endpoint observable fails, or creating the observable for an endpoint
     * throws, the failure is logged and published into the subject as an error
     * observable, so it reaches the subscriber through the merged stream. Because the
     * subject is shared, the failure is observed by every current subscriber.
     *
     * @return an observable emitting the items of every connection created for an announced endpoint
     */
    public Observable<T> observable() {
        return Observable.create(new Observable.OnSubscribe<T>() {
            @Override
            public void call(final Subscriber<? super T> subscriber) {
                // Attach to the merged stream before subscribing to endpoints (same order as
                // before), so connections published by the endpoint subscription below are seen.
                subscriber.add(Observable.merge(subject).subscribe(new Observer<T>() {
                    @Override
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }

                    @Override
                    public void onError(Throwable e) {
                        subscriber.onError(e);
                    }

                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                }));
                subscriber.add(changeEndpointObservable.subscribe(
                        new Action1<Endpoint>() {
                            @Override
                            public void call(Endpoint endpoint) {
                                logger.debug("New endpoint: {}", endpoint);
                                subject.onNext(toObservableFunc.call(endpoint));
                            }
                        },
                        new Action1<Throwable>() {
                            @Override
                            public void call(Throwable error) {
                                // Without an error handler RxJava throws OnErrorNotImplementedException
                                // on the emitting thread and the subscriber is never notified.
                                // Publishing an error observable routes the failure through the merge,
                                // which delivers it downstream in sequence with the data.
                                logger.error("Endpoint stream failed; terminating merged stream", error);
                                subject.onNext(Observable.<T>error(error));
                            }
                        }));
            }
        });
    }
}

