/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.mantis.remote.observable;

import io.mantisrx.common.codec.Encoder;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.network.HashFunctions;
import io.mantisrx.server.core.ServiceRegistry;
import io.reactivex.mantis.remote.observable.Group;
import io.reactivex.mantis.remote.observable.MutableReference;
import io.reactivex.mantis.remote.observable.ServeConfig;
import io.reactivex.mantis.remote.observable.filter.ServerSideFilters;
import io.reactivex.mantis.remote.observable.slotting.ConsistentHashing;
import io.reactivex.mantis.remote.observable.slotting.SlottingStrategy;
import io.reactivx.mantis.operators.DisableBackPressureOperator;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;

public class ServeGroupedObservable<K, V>
extends ServeConfig<K, Group<String, V>> {
    private static final Logger logger = LoggerFactory.getLogger(ServeGroupedObservable.class);
    private Encoder<String> keyEncoder;
    private Encoder<V> valueEncoder;
    private int groupBufferTimeMSec = 250;
    private long expiryInSecs = Long.MAX_VALUE;
    private Counter groupsExpiredCounter;

    public ServeGroupedObservable(Builder<K, V> builder) {
        super(((Builder)builder).name, ((Builder)builder).slottingStrategy, ((Builder)builder).filterFunction, ((Builder)builder).maxWriteAttempts);
        String groupBufferTimeMSecStr = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.remoteObservable.groupBufferMSec", "250");
        if (groupBufferTimeMSecStr != null && !groupBufferTimeMSecStr.equals("250")) {
            this.groupBufferTimeMSec = Integer.parseInt(groupBufferTimeMSecStr);
        }
        this.keyEncoder = ((Builder)builder).keyEncoder;
        this.valueEncoder = ((Builder)builder).valueEncoder;
        this.expiryInSecs = ((Builder)builder).expiryTimeInSecs;
        Metrics m3 = new Metrics.Builder().name("ServeGroupedObservable").addCounter("groupsExpiredCounter").build();
        m3 = MetricsRegistry.getInstance().registerAndGet(m3);
        this.groupsExpiredCounter = m3.getCounter("groupsExpiredCounter");
        this.applySlottingSideEffectToObservable(((Builder)builder).observable, ((Builder)builder).minConnectionsToSubscribe);
    }

    private void applySlottingSideEffectToObservable(Observable<Observable<GroupedObservable<String, V>>> o, Observable<Integer> minConnectionsToSubscribe) {
        final AtomicInteger currentMinConnectionsToSubscribe = new AtomicInteger();
        minConnectionsToSubscribe.subscribe(new Action1<Integer>(){

            @Override
            public void call(Integer t1) {
                currentMinConnectionsToSubscribe.set(t1);
            }
        });
        Observable listOfGroups = o.map(new Func1<Observable<GroupedObservable<String, V>>, Observable<List<Group<String, V>>>>(){

            @Override
            public Observable<List<Group<String, V>>> call(Observable<GroupedObservable<String, V>> og) {
                return og.flatMap(new Func1<GroupedObservable<String, V>, Observable<List<Group<String, V>>>>(){

                    @Override
                    public Observable<List<Group<String, V>>> call(GroupedObservable<String, V> group) {
                        final byte[] keyBytes = ServeGroupedObservable.this.keyEncoder.encode(group.getKey());
                        final String keyValue = group.getKey();
                        return group.doOnUnsubscribe(new Action0(){

                            @Override
                            public void call() {
                                ServeGroupedObservable.this.groupsExpiredCounter.increment();
                            }
                        }).timeout(ServeGroupedObservable.this.expiryInSecs, TimeUnit.SECONDS, Observable.empty()).materialize().lift(new DisableBackPressureOperator()).buffer((long)ServeGroupedObservable.this.groupBufferTimeMSec, TimeUnit.MILLISECONDS).filter(new Func1<List<Notification<V>>, Boolean>(){

                            @Override
                            public Boolean call(List<Notification<V>> t1) {
                                return t1 != null && !t1.isEmpty();
                            }
                        }).map(new Func1<List<Notification<V>>, List<Group<String, V>>>(){

                            @Override
                            public List<Group<String, V>> call(List<Notification<V>> notifications) {
                                ArrayList groups = new ArrayList(notifications.size());
                                for (Notification notification : notifications) {
                                    groups.add(new Group(keyValue, keyBytes, notification));
                                }
                                return groups;
                            }
                        });
                    }
                });
            }
        });
        final Observable withSideEffects = Observable.merge(listOfGroups).doOnEach(new Observer<List<Group<String, V>>>(){

            @Override
            public void onCompleted() {
                ServeGroupedObservable.this.slottingStrategy.completeAllConnections();
            }

            @Override
            public void onError(Throwable e2) {
                e2.printStackTrace();
                ServeGroupedObservable.this.slottingStrategy.errorAllConnections(e2);
            }

            @Override
            public void onNext(List<Group<String, V>> listOfGroups) {
                for (Group group : listOfGroups) {
                    ServeGroupedObservable.this.slottingStrategy.writeOnSlot(group.getKeyBytes(), group);
                }
            }
        });
        final MutableReference subscriptionRef = new MutableReference();
        final AtomicInteger connectionCount = new AtomicInteger(0);
        final AtomicBoolean isSubscribed = new AtomicBoolean();
        this.slottingStrategy.registerDoOnEachConnectionAdded(new Action0(){

            @Override
            public void call() {
                Integer minNeeded = currentMinConnectionsToSubscribe.get();
                Integer current = connectionCount.incrementAndGet();
                if (current >= minNeeded) {
                    if (isSubscribed.compareAndSet(false, true)) {
                        logger.info("MinConnectionsToSubscribe: " + minNeeded + ", has been met, subscribing to observable, current connection count: " + current);
                        subscriptionRef.setValue(withSideEffects.subscribe());
                    }
                } else {
                    logger.info("MinConnectionsToSubscribe: " + minNeeded + ", has NOT been met, current connection count: " + current);
                }
            }
        });
        this.slottingStrategy.registerDoAfterLastConnectionRemoved(new Action0(){

            @Override
            public void call() {
                ((Subscription)subscriptionRef.getValue()).unsubscribe();
                logger.info("All connections deregistered, unsubscribed to observable, resetting current connection count: 0");
                connectionCount.set(0);
                isSubscribed.set(false);
            }
        });
    }

    public Encoder<String> getKeyEncoder() {
        return this.keyEncoder;
    }

    public Encoder<V> getValueEncoder() {
        return this.valueEncoder;
    }

    public static class Builder<K, V> {
        private String name;
        private Observable<Observable<GroupedObservable<String, V>>> observable;
        private SlottingStrategy<Group<String, V>> slottingStrategy;
        private Encoder<String> keyEncoder;
        private Encoder<V> valueEncoder;
        private Func1<Map<String, String>, Func1<K, Boolean>> filterFunction;
        private int maxWriteAttempts;
        private long expiryTimeInSecs;
        private Observable<Integer> minConnectionsToSubscribe;

        public Builder() {
            this.slottingStrategy = new ConsistentHashing<Group<String, V>>(this.name, HashFunctions.ketama());
            this.filterFunction = ServerSideFilters.noFiltering();
            this.maxWriteAttempts = 3;
            this.expiryTimeInSecs = Long.MAX_VALUE;
            this.minConnectionsToSubscribe = Observable.just(1);
        }

        public Builder<K, V> name(String name2) {
            if (name2 != null && name2.length() > 127) {
                throw new IllegalArgumentException("Observable name must be less than 127 characters");
            }
            this.name = name2;
            return this;
        }

        public Builder<K, V> observable(Observable<Observable<GroupedObservable<String, V>>> observable) {
            this.observable = observable;
            return this;
        }

        public Builder<K, V> maxWriteAttempts(int maxWriteAttempts) {
            this.maxWriteAttempts = maxWriteAttempts;
            return this;
        }

        public Builder<K, V> withExpirySecs(long expiryInSecs) {
            this.expiryTimeInSecs = expiryInSecs;
            return this;
        }

        public Builder<K, V> minConnectionsToSubscribe(Observable<Integer> minConnectionsToSubscribe) {
            this.minConnectionsToSubscribe = minConnectionsToSubscribe;
            return this;
        }

        public Builder<K, V> slottingStrategy(SlottingStrategy<Group<String, V>> slottingStrategy) {
            this.slottingStrategy = slottingStrategy;
            return this;
        }

        public Builder<K, V> keyEncoder(Encoder<String> keyEncoder) {
            this.keyEncoder = keyEncoder;
            return this;
        }

        public Builder<K, V> valueEncoder(Encoder<V> valueEncoder) {
            this.valueEncoder = valueEncoder;
            return this;
        }

        public Builder<K, V> serverSideFilter(Func1<Map<String, String>, Func1<K, Boolean>> filterFunc) {
            this.filterFunction = filterFunc;
            return this;
        }

        public ServeGroupedObservable<K, V> build() {
            return new ServeGroupedObservable(this);
        }
    }
}

