/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.source.http.impl;

import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.runtime.source.http.HttpServerProvider;
import io.mantisrx.runtime.source.http.ServerPoller;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import mantis.io.reactivex.netty.client.RxClient;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;

/**
 * {@link HttpServerProvider} backed by a {@link ServerPoller}.
 *
 * <p>Each set of servers published by the poller is compared with the set seen
 * immediately before it (tracked per subscription), and the difference is exposed as two
 * streams: servers that appeared ({@link #getServersToAdd()}) and servers that
 * disappeared ({@link #getServersToRemove()}).
 *
 * <p>Three gauges are registered under the metrics group
 * {@code "DefaultHttpServerProvider"}: {@code discoveryActiveGauge},
 * {@code newServersGauge} and {@code removedServersGauge}.
 */
public class DefaultHttpServerProvider
implements HttpServerProvider {
    private final ServerPoller serverPoller;
    private final Gauge discoveryActiveGauge;
    private final Gauge newServersGauge;
    private final Gauge removedServersGauge;

    /**
     * Creates a provider that derives add/remove events from the given poller.
     *
     * @param serverPoller source of the periodically refreshed server sets
     */
    protected DefaultHttpServerProvider(ServerPoller serverPoller) {
        this.serverPoller = serverPoller;
        Metrics m = new Metrics.Builder().name("DefaultHttpServerProvider").addGauge("discoveryActiveGauge").addGauge("newServersGauge").addGauge("removedServersGauge").build();
        // Use the instance handed back by the registry rather than the one just built.
        m = MetricsRegistry.getInstance().registerAndGet(m);
        this.discoveryActiveGauge = m.getGauge("discoveryActiveGauge");
        this.newServersGauge = m.getGauge("newServersGauge");
        this.removedServersGauge = m.getGauge("removedServersGauge");
    }

    /**
     * Computes the set difference {@code left \ right} without modifying either argument.
     *
     * @param left  the set whose elements are candidates for the result
     * @param right the set whose elements are excluded from the result
     * @return a new set holding every element of {@code left} that is not in {@code right}
     */
    private static Set<RxClient.ServerInfo> diff(Set<RxClient.ServerInfo> left, Set<RxClient.ServerInfo> right) {
        Set<RxClient.ServerInfo> result = new HashSet<RxClient.ServerInfo>(left);
        result.removeAll(right);
        return result;
    }

    /**
     * Returns the servers currently reported by the underlying poller.
     *
     * @return whatever {@link ServerPoller#getServers()} returns
     */
    public Set<RxClient.ServerInfo> getServers() {
        return this.serverPoller.getServers();
    }

    /**
     * Emits each server the first time it shows up in the poller's output, and again
     * whenever it reappears after having been absent from the preceding poll result.
     *
     * <p>{@code Observable.create} is used so that every subscription gets its own
     * "previously seen" set; the first poll result of a subscription is therefore
     * emitted in full. Completion and errors from the poller are forwarded unchanged.
     *
     * @return a stream of newly discovered servers
     */
    @Override
    public final Observable<RxClient.ServerInfo> getServersToAdd() {
        return Observable.create(new Observable.OnSubscribe<RxClient.ServerInfo>(){

            @Override
            public void call(final Subscriber<? super RxClient.ServerInfo> subscriber) {
                // Separate local so the AtomicReference gets the intended element type.
                Set<RxClient.ServerInfo> empty = Collections.emptySet();
                final AtomicReference<Set<RxClient.ServerInfo>> activeServers = new AtomicReference<Set<RxClient.ServerInfo>>(empty);
                Subscription pollerSubscription = DefaultHttpServerProvider.this.serverPoller.servers().subscribe(new Subscriber<Set<RxClient.ServerInfo>>(){

                    @Override
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }

                    @Override
                    public void onError(Throwable e) {
                        subscriber.onError(e);
                    }

                    @Override
                    public void onNext(Set<RxClient.ServerInfo> servers) {
                        DefaultHttpServerProvider.this.discoveryActiveGauge.set(servers.size());
                        Set<RxClient.ServerInfo> previousServers = activeServers.getAndSet(servers);
                        Set<RxClient.ServerInfo> newServers = DefaultHttpServerProvider.diff(servers, previousServers);
                        DefaultHttpServerProvider.this.newServersGauge.set(newServers.size());
                        // Only the delta is emitted; servers already known from the
                        // previous poll must not be announced again.
                        for (RxClient.ServerInfo server : newServers) {
                            subscriber.onNext(server);
                        }
                    }
                });
                // Tie the poller subscription to the downstream subscriber's lifecycle.
                subscriber.add(pollerSubscription);
            }
        });
    }

    /**
     * Emits each server that was present in the previous poll result but is missing
     * from the current one.
     *
     * <p>As with {@link #getServersToAdd()}, the "previously seen" set is kept per
     * subscription and starts out empty, so the first poll result never produces
     * removals. Completion and errors from the poller are forwarded unchanged.
     *
     * @return a stream of servers that are no longer reported by the poller
     */
    @Override
    public Observable<RxClient.ServerInfo> getServersToRemove() {
        return Observable.create(new Observable.OnSubscribe<RxClient.ServerInfo>(){

            @Override
            public void call(final Subscriber<? super RxClient.ServerInfo> subscriber) {
                // Separate local so the AtomicReference gets the intended element type.
                Set<RxClient.ServerInfo> empty = Collections.emptySet();
                final AtomicReference<Set<RxClient.ServerInfo>> activeServers = new AtomicReference<Set<RxClient.ServerInfo>>(empty);
                Subscription pollerSubscription = DefaultHttpServerProvider.this.serverPoller.servers().subscribe(new Subscriber<Set<RxClient.ServerInfo>>(){

                    @Override
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }

                    @Override
                    public void onError(Throwable e) {
                        subscriber.onError(e);
                    }

                    @Override
                    public void onNext(Set<RxClient.ServerInfo> servers) {
                        Set<RxClient.ServerInfo> previousServers = activeServers.getAndSet(servers);
                        Set<RxClient.ServerInfo> serversToRemove = DefaultHttpServerProvider.diff(previousServers, servers);
                        DefaultHttpServerProvider.this.removedServersGauge.set(serversToRemove.size());
                        for (RxClient.ServerInfo server : serversToRemove) {
                            subscriber.onNext(server);
                        }
                    }
                });
                // Tie the poller subscription to the downstream subscriber's lifecycle.
                subscriber.add(pollerSubscription);
            }
        });
    }
}

