/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.mantis.network.push;

import io.mantisrx.common.MantisGroup;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.reactivex.mantis.network.push.HashFunction;
import io.reactivex.mantis.network.push.KeyValuePair;
import io.reactivex.mantis.network.push.PushTrigger;
import io.reactivx.mantis.operators.DisableBackPressureOperator;
import java.util.ArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

/**
 * Static factories that adapt an RxJava {@link Observable} into a {@link PushTrigger}.
 *
 * <p>Every trigger built here owns a {@link Metrics} instance named
 * {@code "ObservableTrigger_" + name} with a single {@code subscriptionActive} gauge that is
 * incremented when the source is subscribed and decremented when it is unsubscribed.
 *
 * <p>The first callback handed to the {@code PushTrigger} constructor subscribes to the source and
 * writes every emitted item into the queue it is called with; the second callback reacts to the
 * trigger being stopped. Only the trigger returned by {@link #o} unsubscribes from the source in
 * its stop callback; the other variants keep the subscription and only log.
 *
 * <p>This class is not instantiable.
 */
public final class ObservableTrigger {
    private static final Logger logger = LoggerFactory.getLogger(ObservableTrigger.class);

    /** Name of the gauge tracking whether the source subscription is currently active. */
    private static final String SUBSCRIPTION_ACTIVE_GAUGE = "subscriptionActive";

    /**
     * Scheduler on which per-group inactivity timeouts run. It is backed by a fixed pool of five
     * threads created with the default thread factory, shared by every grouped trigger, and never
     * shut down by this class.
     */
    private static final Scheduler GROUP_TIMEOUT_SCHEDULER = Schedulers.from(Executors.newFixedThreadPool(5));

    private ObservableTrigger() {
    }

    /**
     * Builds a trigger that keeps its source subscription when stopped.
     *
     * <p>Each start subscribes to {@code o} (dropping {@code null} items) before unsubscribing
     * the subscription created by the previous start, if any.
     *
     * @param name         suffix for the metrics name; also included in log messages
     * @param o            source of items to write into the queue
     * @param doOnComplete invoked when the source completes; may be {@code null}
     * @param doOnError    invoked when the source errors; may be {@code null}
     * @return the configured trigger
     */
    private static <T> PushTrigger<T> trigger(String name, Observable<T> o, Action0 doOnComplete, Action1<Throwable> doOnError) {
        AtomicReference<Subscription> subRef = new AtomicReference<>();
        Metrics metrics = newMetrics(name);
        Gauge subscriptionActive = metrics.getGauge(SUBSCRIPTION_ACTIVE_GAUGE);
        return new PushTrigger<T>(
                queue -> {
                    Subscription current = trackSubscription(o.filter(t1 -> t1 != null), name, subscriptionActive)
                            .subscribe(data -> queue.write(data), errorHandler(name, doOnError), completionHandler(name, doOnComplete));
                    replaceSubscription(subRef, current, name);
                },
                t1 -> {
                    if (subRef.get() != null) {
                        logger.warn("Connections from next stage has dropped to 0. Do not propagate unsubscribe");
                    }
                },
                metrics);
    }

    /**
     * Builds a trigger that unsubscribes from its source when stopped.
     *
     * <p>Each start subscribes to {@code o} (dropping {@code null} items). If an earlier start
     * left a subscription that is still live, it is unsubscribed once the new one is in place,
     * so a repeated start cannot leak a subscription that keeps writing to the queue.
     *
     * @param name         suffix for the metrics name; also included in log messages
     * @param o            source of items to write into the queue
     * @param doOnComplete invoked when the source completes; may be {@code null}
     * @param doOnError    invoked when the source errors; may be {@code null}
     * @return the configured trigger
     */
    private static <T> PushTrigger<T> ssetrigger(String name, Observable<T> o, Action0 doOnComplete, Action1<Throwable> doOnError) {
        AtomicReference<Subscription> subRef = new AtomicReference<>();
        Metrics metrics = newMetrics(name);
        Gauge subscriptionActive = metrics.getGauge(SUBSCRIPTION_ACTIVE_GAUGE);
        return new PushTrigger<T>(
                queue -> {
                    Subscription current = trackSubscription(o.filter(t1 -> t1 != null), name, subscriptionActive)
                            .subscribe(data -> queue.write(data), errorHandler(name, doOnError), completionHandler(name, doOnComplete));
                    Subscription previous = subRef.getAndSet(current);
                    // The stop callback leaves the (already unsubscribed) subscription in subRef, so only
                    // a predecessor that is still live indicates a start without an intervening stop.
                    if (previous != null && !previous.isUnsubscribed()) {
                        logger.info("A new subscription is ACTIVE. Unsubscribe from previous subscription observable trigger with name: {}", name);
                        previous.unsubscribe();
                    }
                },
                t1 -> {
                    Subscription active = subRef.get();
                    if (active != null) {
                        logger.warn("Connections from next stage has dropped to 0 for SSE stage. propagate unsubscribe");
                        active.unsubscribe();
                    }
                },
                metrics);
    }

    /**
     * Builds a trigger over a stream of {@link GroupedObservable}s that keeps its source
     * subscription when stopped.
     *
     * <p>Each group is turned into batches of {@link KeyValuePair}s by
     * {@link #toKeyValueBatches}; every pair of every batch is written into the queue.
     *
     * @param name               suffix for the metrics name; also included in log messages
     * @param o                  source of groups
     * @param doOnComplete       invoked when the merged stream completes; may be {@code null}
     * @param doOnError          invoked when the merged stream errors; may be {@code null}
     * @param groupExpirySeconds seconds without an item after which a group is switched to an empty stream
     * @param keyEncoder         converts a group key to bytes
     * @param hashFunction       hashes the encoded key bytes
     * @return the configured trigger
     */
    private static <K, V> PushTrigger<KeyValuePair<K, V>> groupTrigger(String name, Observable<GroupedObservable<K, V>> o, Action0 doOnComplete, Action1<Throwable> doOnError, long groupExpirySeconds, Func1<K, byte[]> keyEncoder, HashFunction hashFunction) {
        AtomicReference<Subscription> subRef = new AtomicReference<>();
        Metrics metrics = newMetrics(name);
        Gauge subscriptionActive = metrics.getGauge(SUBSCRIPTION_ACTIVE_GAUGE);
        return new PushTrigger<KeyValuePair<K, V>>(
                queue -> {
                    Subscription current = trackSubscription(o, name, subscriptionActive)
                            .flatMap(group -> toKeyValueBatches(group, groupExpirySeconds, keyEncoder, hashFunction))
                            .subscribe(batch -> {
                                for (KeyValuePair<K, V> pair : batch) {
                                    queue.write(pair);
                                }
                            }, errorHandler(name, doOnError), completionHandler(name, doOnComplete));
                    replaceSubscription(subRef, current, name);
                },
                t1 -> {
                    if (subRef.get() != null) {
                        logger.warn("Connections from next stage has dropped to 0. Do not propagate unsubscribe until a new connection is made.");
                    }
                },
                metrics);
    }

    /**
     * Builds a trigger over a stream of {@link MantisGroup}s that keeps its source subscription
     * when stopped.
     *
     * <p>Each group item is converted into one {@link KeyValuePair} carrying the hashed key, the
     * encoded key bytes and the item's value, and is written into the queue.
     *
     * @param name               suffix for the metrics name; also included in log messages
     * @param o                  source of keyed items
     * @param doOnComplete       invoked when the stream completes; may be {@code null}
     * @param doOnError          invoked when the stream errors; may be {@code null}
     * @param groupExpirySeconds accepted for signature parity with {@link #groupTrigger}; not used here
     * @param keyEncoder         converts an item's key to bytes
     * @param hashFunction       hashes the encoded key bytes
     * @return the configured trigger
     */
    private static <K, V> PushTrigger<KeyValuePair<K, V>> mantisGroupTrigger(String name, Observable<MantisGroup<K, V>> o, Action0 doOnComplete, Action1<Throwable> doOnError, long groupExpirySeconds, Func1<K, byte[]> keyEncoder, HashFunction hashFunction) {
        AtomicReference<Subscription> subRef = new AtomicReference<>();
        Metrics metrics = newMetrics(name);
        Gauge subscriptionActive = metrics.getGauge(SUBSCRIPTION_ACTIVE_GAUGE);
        return new PushTrigger<KeyValuePair<K, V>>(
                queue -> {
                    Subscription current = trackSubscription(o, name, subscriptionActive)
                            .map(data -> {
                                byte[] keyBytes = keyEncoder.call(data.getKeyValue());
                                long keyBytesHashed = hashFunction.computeHash(keyBytes);
                                return new KeyValuePair<K, V>(keyBytesHashed, keyBytes, data.getValue());
                            })
                            .subscribe(data -> queue.write(data), errorHandler(name, doOnError), completionHandler(name, doOnComplete));
                    replaceSubscription(subRef, current, name);
                },
                t1 -> {
                    if (subRef.get() != null) {
                        logger.warn("Connections from next stage has dropped to 0. Do not propagate unsubscribe until a new connection is made.");
                    }
                },
                metrics);
    }

    /** Creates the per-trigger metrics group containing the single subscription gauge. */
    private static Metrics newMetrics(String name) {
        return new Metrics.Builder().name("ObservableTrigger_" + name).addGauge(SUBSCRIPTION_ACTIVE_GAUGE).build();
    }

    /** Decorates {@code source} so that subscribing/unsubscribing is logged and reflected in the gauge. */
    private static <T> Observable<T> trackSubscription(Observable<T> source, String name, Gauge subscriptionActive) {
        return source.doOnSubscribe(() -> {
            logger.info("Subscription is ACTIVE for observable trigger with name: {}", name);
            subscriptionActive.increment();
        }).doOnUnsubscribe(() -> {
            logger.info("Subscription is INACTIVE for observable trigger with name: {}", name);
            subscriptionActive.decrement();
        });
    }

    /** Returns an error callback that logs the failure and then delegates to {@code doOnError} when it is non-null. */
    private static Action1<Throwable> errorHandler(String name, Action1<Throwable> doOnError) {
        return e -> {
            // The trailing Throwable is not bound to a placeholder, so SLF4J logs it as the exception.
            logger.warn("Observable used to push data errored, on server with name: {}", name, e);
            if (doOnError != null) {
                doOnError.call(e);
            }
        };
    }

    /** Returns a completion callback that logs and then delegates to {@code doOnComplete} when it is non-null. */
    private static Action0 completionHandler(String name, Action0 doOnComplete) {
        return () -> {
            logger.info("Observable used to push data completed, on server with name: {}", name);
            if (doOnComplete != null) {
                doOnComplete.call();
            }
        };
    }

    /** Stores {@code current} in {@code subRef} and unsubscribes whatever subscription it replaced. */
    private static void replaceSubscription(AtomicReference<Subscription> subRef, Subscription current, String name) {
        Subscription previous = subRef.getAndSet(current);
        if (previous != null) {
            logger.info("A new subscription is ACTIVE. Unsubscribe from previous subscription observable trigger with name: {}", name);
            previous.unsubscribe();
        }
    }

    /**
     * Converts one group into batches of key/value pairs.
     *
     * <p>The key is encoded and hashed once per group. The group's items are collected into
     * 250 ms buffers; empty buffers are dropped. If the group emits nothing for
     * {@code groupExpirySeconds} seconds it is switched to an empty stream.
     */
    private static <K, V> Observable<ArrayList<KeyValuePair<K, V>>> toKeyValueBatches(GroupedObservable<K, V> group, long groupExpirySeconds, Func1<K, byte[]> keyEncoder, HashFunction hashFunction) {
        byte[] keyBytes = keyEncoder.call(group.getKey());
        long keyBytesHashed = hashFunction.computeHash(keyBytes);
        return group
                .timeout(groupExpirySeconds, TimeUnit.SECONDS, Observable.<V>empty(), GROUP_TIMEOUT_SCHEDULER)
                .lift(new DisableBackPressureOperator<V>())
                .buffer(250L, TimeUnit.MILLISECONDS)
                .filter(values -> values != null && !values.isEmpty())
                .map(values -> {
                    ArrayList<KeyValuePair<K, V>> pairs = new ArrayList<>(values.size());
                    for (V value : values) {
                        pairs.add(new KeyValuePair<K, V>(keyBytesHashed, keyBytes, value));
                    }
                    return pairs;
                });
    }

    /**
     * Creates a trigger over a plain observable. The returned trigger unsubscribes from
     * {@code o} in its stop callback.
     *
     * @param name         suffix for the metrics name; also included in log messages
     * @param o            source of items to push
     * @param doOnComplete invoked when the source completes; may be {@code null}
     * @param doOnError    invoked when the source errors; may be {@code null}
     * @return the configured trigger
     */
    public static <T> PushTrigger<T> o(String name, Observable<T> o, Action0 doOnComplete, Action1<Throwable> doOnError) {
        return ObservableTrigger.ssetrigger(name, o, doOnComplete, doOnError);
    }

    /**
     * Creates a trigger over the merge of an observable of observables. The returned trigger
     * keeps its subscription when stopped.
     *
     * @param name         suffix for the metrics name; also included in log messages
     * @param oo           nested source; merged with {@link Observable#merge}
     * @param doOnComplete invoked when the merged stream completes; may be {@code null}
     * @param doOnError    invoked when the merged stream errors; may be {@code null}
     * @return the configured trigger
     */
    public static <T> PushTrigger<T> oo(String name, Observable<Observable<T>> oo, Action0 doOnComplete, Action1<Throwable> doOnError) {
        return ObservableTrigger.trigger(name, Observable.merge(oo), doOnComplete, doOnError);
    }

    /**
     * Creates a trigger over the merge of an observable of observables of
     * {@link GroupedObservable}s, emitting {@link KeyValuePair}s.
     *
     * @param name               suffix for the metrics name; also included in log messages
     * @param oo                 nested source of groups; merged with {@link Observable#merge}
     * @param doOnComplete       invoked when the merged stream completes; may be {@code null}
     * @param doOnError          invoked when the merged stream errors; may be {@code null}
     * @param groupExpirySeconds seconds without an item after which a group is switched to an empty stream
     * @param keyEncoder         converts a group key to bytes
     * @param hashFunction       hashes the encoded key bytes
     * @return the configured trigger
     */
    public static <K, V> PushTrigger<KeyValuePair<K, V>> oogo(String name, Observable<Observable<GroupedObservable<K, V>>> oo, Action0 doOnComplete, Action1<Throwable> doOnError, long groupExpirySeconds, Func1<K, byte[]> keyEncoder, HashFunction hashFunction) {
        return ObservableTrigger.groupTrigger(name, Observable.merge(oo), doOnComplete, doOnError, groupExpirySeconds, keyEncoder, hashFunction);
    }

    /**
     * Creates a trigger over the merge of an observable of observables of {@link MantisGroup}s,
     * emitting {@link KeyValuePair}s.
     *
     * @param name               suffix for the metrics name; also included in log messages
     * @param oo                 nested source of keyed items; merged with {@link Observable#merge}
     * @param doOnComplete       invoked when the merged stream completes; may be {@code null}
     * @param doOnError          invoked when the merged stream errors; may be {@code null}
     * @param groupExpirySeconds forwarded unchanged; the underlying trigger does not use it
     * @param keyEncoder         converts an item's key to bytes
     * @param hashFunction       hashes the encoded key bytes
     * @return the configured trigger
     */
    public static <K, V> PushTrigger<KeyValuePair<K, V>> oomgo(String name, Observable<Observable<MantisGroup<K, V>>> oo, Action0 doOnComplete, Action1<Throwable> doOnError, long groupExpirySeconds, Func1<K, byte[]> keyEncoder, HashFunction hashFunction) {
        return ObservableTrigger.mantisGroupTrigger(name, Observable.merge(oo), doOnComplete, doOnError, groupExpirySeconds, keyEncoder, hashFunction);
    }
}

