/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.common.metrics.rx;

import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;

public class MonitorOperator<T>
implements Observable.Operator<T, T> {
    private static Logger logger = LoggerFactory.getLogger(MonitorOperator.class);
    private final Counter next;
    private final Gauge nextGauge;
    private final Gauge error;
    private final Gauge complete;
    private final Gauge subscribe;
    private String name;

    public MonitorOperator(String name) {
        this.name = name;
        Metrics m = new Metrics.Builder().name(name).addCounter("onNext").addGauge("onError").addGauge("onComplete").addGauge("subscribe").addGauge("onNextGauge").build();
        m = MetricsRegistry.getInstance().registerAndGet(m);
        this.next = m.getCounter("onNext");
        this.error = m.getGauge("onError");
        this.complete = m.getGauge("onComplete");
        this.subscribe = m.getGauge("subscribe");
        this.nextGauge = m.getGauge("onNextGauge");
    }

    public Subscriber<? super T> call(final Subscriber<? super T> o) {
        this.subscribe.increment();
        o.add(Subscriptions.create((Action0)new Action0(){

            public void call() {
                MonitorOperator.this.subscribe.decrement();
            }
        }));
        return new Subscriber<T>(o){

            public void onCompleted() {
                logger.debug("onCompleted() called for monitored observable with name: " + MonitorOperator.this.name);
                MonitorOperator.this.complete.increment();
                o.onCompleted();
            }

            public void onError(Throwable e) {
                logger.error("onError() called for monitored observable with name: " + MonitorOperator.this.name, e);
                MonitorOperator.this.error.increment();
                o.onError(e);
            }

            public void onNext(T t) {
                MonitorOperator.this.next.increment();
                MonitorOperator.this.nextGauge.set(MonitorOperator.this.next.value());
                o.onNext(t);
            }
        };
    }
}

