/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.source.http.impl;

import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.runtime.source.http.HttpServerProvider;
import io.mantisrx.runtime.source.http.ServerPoller;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import mantis.io.reactivex.netty.client.RxClient;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;

public class DefaultHttpServerProvider
implements HttpServerProvider {
    private final ServerPoller serverPoller;
    private Gauge discoveryActiveGauge;
    private Gauge newServersGauge;
    private Gauge removedServersGauge;

    protected DefaultHttpServerProvider(ServerPoller serverPoller) {
        this.serverPoller = serverPoller;
        Metrics m = new Metrics.Builder().name("DefaultHttpServerProvider").addGauge("discoveryActiveGauge").addGauge("newServersGauge").addGauge("removedServersGauge").build();
        m = MetricsRegistry.getInstance().registerAndGet(m);
        this.discoveryActiveGauge = m.getGauge("discoveryActiveGauge");
        this.newServersGauge = m.getGauge("newServersGauge");
        this.removedServersGauge = m.getGauge("removedServersGauge");
    }

    private static Set<RxClient.ServerInfo> diff(Set<RxClient.ServerInfo> left, Set<RxClient.ServerInfo> right) {
        HashSet<RxClient.ServerInfo> result = new HashSet<RxClient.ServerInfo>(left);
        result.removeAll(right);
        return result;
    }

    public Set<RxClient.ServerInfo> getServers() {
        return this.serverPoller.getServers();
    }

    @Override
    public final Observable<RxClient.ServerInfo> getServersToAdd() {
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<RxClient.ServerInfo>(){

            public void call(final Subscriber<? super RxClient.ServerInfo> subscriber) {
                Set empty = Collections.emptySet();
                final AtomicReference activeServers = new AtomicReference(empty);
                Subscription subs = DefaultHttpServerProvider.this.serverPoller.servers().subscribe((Subscriber)new Subscriber<Set<RxClient.ServerInfo>>(){

                    public void onCompleted() {
                        subscriber.onCompleted();
                    }

                    public void onError(Throwable e) {
                        subscriber.onError(e);
                    }

                    public void onNext(Set<RxClient.ServerInfo> servers) {
                        DefaultHttpServerProvider.this.discoveryActiveGauge.set((long)servers.size());
                        Set<RxClient.ServerInfo> currentServers = activeServers.getAndSet(servers);
                        Set newServers = DefaultHttpServerProvider.diff(servers, currentServers);
                        DefaultHttpServerProvider.this.newServersGauge.set((long)newServers.size());
                        for (RxClient.ServerInfo server : servers) {
                            subscriber.onNext((Object)server);
                        }
                    }
                });
                subscriber.add(subs);
            }
        });
    }

    @Override
    public Observable<RxClient.ServerInfo> getServersToRemove() {
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<RxClient.ServerInfo>(){

            public void call(final Subscriber<? super RxClient.ServerInfo> subscriber) {
                Set empty = Collections.emptySet();
                final AtomicReference activeServers = new AtomicReference(empty);
                Subscription subs = DefaultHttpServerProvider.this.serverPoller.servers().subscribe((Subscriber)new Subscriber<Set<RxClient.ServerInfo>>(){

                    public void onCompleted() {
                        subscriber.onCompleted();
                    }

                    public void onError(Throwable e) {
                        subscriber.onError(e);
                    }

                    public void onNext(Set<RxClient.ServerInfo> servers) {
                        Set<RxClient.ServerInfo> currentServers = activeServers.getAndSet(servers);
                        Set serversToRemove = DefaultHttpServerProvider.diff(currentServers, servers);
                        DefaultHttpServerProvider.this.removedServersGauge.set((long)serversToRemove.size());
                        for (RxClient.ServerInfo server : serversToRemove) {
                            subscriber.onNext((Object)server);
                        }
                    }
                });
                subscriber.add(subs);
            }
        });
    }
}

