/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.mantis.remote.observable;

import io.mantisrx.common.MantisGroup;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.network.Endpoint;
import io.reactivex.mantis.remote.observable.ConnectToGroupedObservable;
import io.reactivex.mantis.remote.observable.ConnectToObservable;
import io.reactivex.mantis.remote.observable.EndpointChange;
import io.reactivex.mantis.remote.observable.EndpointInjector;
import io.reactivex.mantis.remote.observable.RemoteObservable;
import io.reactivex.mantis.remote.observable.RemoteRxConnection;
import io.reactivex.mantis.remote.observable.reconciliator.ConnectionSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func3;
import rx.observables.GroupedObservable;
import rx.subjects.PublishSubject;

/**
 * A {@link ConnectionSet} whose membership follows a stream of {@link EndpointChange} events
 * obtained from an {@link EndpointInjector}.
 *
 * <p>Connections are keyed by {@code Endpoint.uniqueHost(host, port, slotId)}:
 * <ul>
 *   <li>an {@code add} change for a key that is not active registers the key and opens a
 *       connection through the factory function supplied at construction;</li>
 *   <li>a {@code complete} change for an active key removes the key and emits on the close
 *       trigger that was handed to the connections of that key;</li>
 *   <li>when a connection invokes its disconnect callback, the key is removed after a random
 *       delay between the configured minimum and maximum number of seconds.</li>
 * </ul>
 *
 * <p>The active set is guarded by a {@link ReentrantLock}; every mutation publishes a fresh
 * snapshot on {@link #activeConnections()} while the lock is still held.
 *
 * @param <T> element type emitted by each remote connection
 */
public class DynamicConnectionSet<T> implements ConnectionSet<T> {
    private static final Logger logger = LoggerFactory.getLogger(DynamicConnectionSet.class);
    // NOTE(review): this single static queue is passed to every set built through
    // createMGO(config). The type name suggests single-producer/single-consumer use;
    // confirm that sharing one instance across sets is intended.
    private static final SpscArrayQueue<MantisGroup<?, ?>> inputQueue = new SpscArrayQueue<>(1000);
    private static final int MIN_TIME_SEC_DEFAULT = 1;
    private static final int MAX_TIME_SEC_DEFAULT = 10;

    private EndpointInjector endpointInjector;
    private final PublishSubject<EndpointChange> reconciliatorConnector = PublishSubject.create();
    private final Func3<Endpoint, Action0, PublishSubject<Integer>, RemoteRxConnection<T>> toObservableFunc;
    private final Metrics connectionMetrics;
    private final PublishSubject<Set<Endpoint>> activeConnectionsSubject = PublishSubject.create();
    private final Lock activeConnectionsLock = new ReentrantLock();
    private final Map<String, Endpoint> currentActiveConnections = new HashMap<String, Endpoint>();
    // Normalized in the constructor so that 0 <= min <= max always holds.
    private final int minTimeoutOnUnexpectedTerminateSec;
    private final int maxTimeoutOnUnexpectedTerminateSec;
    private final Gauge activeConnectionsGauge;
    private final Gauge closedConnections;
    private final Gauge forceCompletedConnections;
    private final Random random = new Random();

    /**
     * Creates a connection set.
     *
     * <p>The timeout bounds are normalized: negative values are treated as {@code 0}, and a
     * minimum larger than the maximum is lowered to the maximum. Without this, picking the
     * random delay would throw {@link IllegalArgumentException} from
     * {@link Random#nextInt(int)} whenever {@code max < min}.
     *
     * @param toObservableFunc factory invoked with (endpoint, disconnect callback, close
     *        trigger) to open a connection for a newly added endpoint
     * @param minTimeoutOnUnexpectedTerminateSec lower bound, in seconds, of the delay between a
     *        disconnect callback and removal of the endpoint from the active set
     * @param maxTimeoutOnUnexpectedTerminateSec upper bound, in seconds, of that delay
     */
    public DynamicConnectionSet(Func3<Endpoint, Action0, PublishSubject<Integer>, RemoteRxConnection<T>> toObservableFunc, int minTimeoutOnUnexpectedTerminateSec, int maxTimeoutOnUnexpectedTerminateSec) {
        this.toObservableFunc = toObservableFunc;
        this.connectionMetrics = new Metrics.Builder()
                .name("DynamicConnectionSet")
                .addGauge("activeConnections")
                .addGauge("closedConnections")
                .addGauge("forceCompletedConnections")
                .build();
        this.activeConnectionsGauge = this.connectionMetrics.getGauge("activeConnections");
        this.closedConnections = this.connectionMetrics.getGauge("closedConnections");
        this.forceCompletedConnections = this.connectionMetrics.getGauge("forceCompletedConnections");
        int upperSec = Math.max(0, maxTimeoutOnUnexpectedTerminateSec);
        this.maxTimeoutOnUnexpectedTerminateSec = upperSec;
        this.minTimeoutOnUnexpectedTerminateSec = Math.min(Math.max(0, minTimeoutOnUnexpectedTerminateSec), upperSec);
    }

    /**
     * Creates a connection set using the default delay bounds (1 to 10 seconds).
     *
     * @param toObservableFunc factory invoked with (endpoint, disconnect callback, close
     *        trigger) to open a connection for a newly added endpoint
     */
    public DynamicConnectionSet(Func3<Endpoint, Action0, PublishSubject<Integer>, RemoteRxConnection<T>> toObservableFunc) {
        this(toObservableFunc, MIN_TIME_SEC_DEFAULT, MAX_TIME_SEC_DEFAULT);
    }

    /**
     * Builds a set of grouped-observable connections. For each endpoint the given builder is
     * copied and the copy's host, port, close trigger, disconnect callback and slot id are
     * overridden before connecting.
     *
     * @param config template configuration; it is copied, not modified
     * @param maxTimeBeforeDisconnectSec upper bound, in seconds, of the removal delay applied
     *        after a disconnect callback
     * @return a new connection set
     */
    public static <K, V> DynamicConnectionSet<GroupedObservable<K, V>> create(final ConnectToGroupedObservable.Builder<K, V> config, int maxTimeBeforeDisconnectSec) {
        Func3<Endpoint, Action0, PublishSubject<Integer>, RemoteRxConnection<GroupedObservable<K, V>>> toObservableFunc =
                (endpoint, disconnectCallback, closeConnectionTrigger) -> {
                    ConnectToGroupedObservable.Builder<K, V> configCopy = new ConnectToGroupedObservable.Builder<K, V>(config);
                    configCopy.host(endpoint.getHost())
                            .port(endpoint.getPort())
                            .closeTrigger(closeConnectionTrigger)
                            .connectionDisconnectCallback(disconnectCallback)
                            .slotId(endpoint.getSlotId());
                    return RemoteObservable.connect(configCopy.build());
                };
        return new DynamicConnectionSet<GroupedObservable<K, V>>(toObservableFunc, MIN_TIME_SEC_DEFAULT, maxTimeBeforeDisconnectSec);
    }

    /**
     * Builds a set of {@link MantisGroup} connections. Works like
     * {@link #create(ConnectToGroupedObservable.Builder, int)} but connects through
     * {@code RemoteObservable.connectToMGO}, passing along the supplied queue.
     *
     * @param config template configuration; it is copied, not modified
     * @param maxTimeBeforeDisconnectSec upper bound, in seconds, of the removal delay applied
     *        after a disconnect callback
     * @param inputQueue queue forwarded to {@code RemoteObservable.connectToMGO} for every
     *        connection opened by the returned set
     * @return a new connection set
     */
    public static <K, V> DynamicConnectionSet<MantisGroup<K, V>> createMGO(final ConnectToGroupedObservable.Builder<K, V> config, int maxTimeBeforeDisconnectSec, final SpscArrayQueue<MantisGroup<?, ?>> inputQueue) {
        Func3<Endpoint, Action0, PublishSubject<Integer>, RemoteRxConnection<MantisGroup<K, V>>> toObservableFunc =
                (endpoint, disconnectCallback, closeConnectionTrigger) -> {
                    ConnectToGroupedObservable.Builder<K, V> configCopy = new ConnectToGroupedObservable.Builder<K, V>(config);
                    configCopy.host(endpoint.getHost())
                            .port(endpoint.getPort())
                            .closeTrigger(closeConnectionTrigger)
                            .connectionDisconnectCallback(disconnectCallback)
                            .slotId(endpoint.getSlotId());
                    return RemoteObservable.connectToMGO(configCopy.build(), inputQueue);
                };
        return new DynamicConnectionSet<MantisGroup<K, V>>(toObservableFunc, MIN_TIME_SEC_DEFAULT, maxTimeBeforeDisconnectSec);
    }

    /**
     * Same as {@link #create(ConnectToGroupedObservable.Builder, int)} with the default maximum
     * removal delay (10 seconds).
     */
    public static <K, V> DynamicConnectionSet<GroupedObservable<K, V>> create(ConnectToGroupedObservable.Builder<K, V> config) {
        return DynamicConnectionSet.create(config, MAX_TIME_SEC_DEFAULT);
    }

    /**
     * Same as {@link #createMGO(ConnectToGroupedObservable.Builder, int, SpscArrayQueue)} with
     * the default maximum removal delay (10 seconds) and this class's static input queue.
     */
    public static <K, V> DynamicConnectionSet<MantisGroup<K, V>> createMGO(ConnectToGroupedObservable.Builder<K, V> config) {
        return DynamicConnectionSet.createMGO(config, MAX_TIME_SEC_DEFAULT, inputQueue);
    }

    /**
     * Builds a set of plain observable connections. For each endpoint the given builder is
     * copied and the copy's host, port, close trigger, disconnect callback and slot id are
     * overridden before connecting.
     *
     * @param config template configuration; it is copied, not modified
     * @param maxTimeBeforeDisconnectSec upper bound, in seconds, of the removal delay applied
     *        after a disconnect callback
     * @return a new connection set
     */
    public static <T> DynamicConnectionSet<T> create(final ConnectToObservable.Builder<T> config, int maxTimeBeforeDisconnectSec) {
        Func3<Endpoint, Action0, PublishSubject<Integer>, RemoteRxConnection<T>> toObservableFunc =
                (endpoint, disconnectCallback, closeConnectionTrigger) -> {
                    ConnectToObservable.Builder<T> configCopy = new ConnectToObservable.Builder<T>(config);
                    configCopy.host(endpoint.getHost())
                            .port(endpoint.getPort())
                            .closeTrigger(closeConnectionTrigger)
                            .connectionDisconnectCallback(disconnectCallback)
                            .slotId(endpoint.getSlotId());
                    return RemoteObservable.connect(configCopy.build());
                };
        return new DynamicConnectionSet<T>(toObservableFunc, MIN_TIME_SEC_DEFAULT, maxTimeBeforeDisconnectSec);
    }

    /**
     * Same as {@link #create(ConnectToObservable.Builder, int)} with the default maximum
     * removal delay (10 seconds).
     */
    public static <T> DynamicConnectionSet<T> create(ConnectToObservable.Builder<T> config) {
        return DynamicConnectionSet.create(config, MAX_TIME_SEC_DEFAULT);
    }

    /**
     * Sets the source of endpoint changes. Must be called before {@link #observables()}.
     *
     * @param endpointInjector provider of the {@link EndpointChange} stream
     */
    public void setEndpointInjector(EndpointInjector endpointInjector) {
        this.endpointInjector = endpointInjector;
    }

    /**
     * Returns the subject held by this set as an {@link Observer} of endpoint changes. This
     * class itself never subscribes to or otherwise reads from that subject.
     */
    public Observer<EndpointChange> reconciliatorObserver() {
        return this.reconciliatorConnector;
    }

    /**
     * Returns the metrics object holding the {@code activeConnections},
     * {@code closedConnections} and {@code forceCompletedConnections} gauges.
     */
    public Metrics getConnectionMetrics() {
        return this.connectionMetrics;
    }

    /**
     * Returns a stream with one inner observable per accepted {@code add} change.
     *
     * <p>On every subscription the active set is cleared first. Changes are then grouped by
     * their endpoint key and handled per group by {@link #connectionsFor(GroupedObservable)}.
     *
     * @return stream of connection observables
     * @throws NullPointerException if no endpoint injector has been set
     */
    @Override
    public Observable<Observable<T>> observables() {
        if (this.endpointInjector == null) {
            throw new NullPointerException("endpointInjector is null; call setEndpointInjector(...) before observables()");
        }
        return this.endpointInjector.deltas()
                .doOnCompleted(() -> logger.info("onComplete on injector deltas"))
                .doOnError(t -> logger.error("caught unexpected error {}", t.getMessage(), t))
                .doOnSubscribe(() -> {
                    logger.info("Subscribing, clearing active connection set");
                    resetActiveConnections();
                })
                .groupBy(change -> endpointKey(change.getEndpoint()))
                .flatMap(group -> connectionsFor(group));
    }

    /** Computes the key under which an endpoint is tracked in the active set. */
    private static String endpointKey(Endpoint endpoint) {
        return Endpoint.uniqueHost(endpoint.getHost(), endpoint.getPort(), endpoint.getSlotId());
    }

    /**
     * Turns the changes of one endpoint key into connection observables. A single close trigger
     * is created per group and shared by every connection opened for that key.
     */
    private Observable<Observable<T>> connectionsFor(GroupedObservable<String, EndpointChange> group) {
        final String key = group.getKey();
        final PublishSubject<Integer> closeConnectionTrigger = PublishSubject.create();
        return group
                .doOnNext(change -> completeIfActive(key, change, closeConnectionTrigger))
                .filter(change -> isNewAdd(key, change))
                .map(toAdd -> openConnection(key, toAdd, closeConnectionTrigger));
    }

    /** Handles a {@code complete} change for an active key: removes it and fires the close trigger. */
    private void completeIfActive(String key, EndpointChange change, PublishSubject<Integer> closeConnectionTrigger) {
        if (EndpointChange.Type.complete == change.getType() && activeConnectionsContains(key, change.getEndpoint())) {
            logger.info("Received complete request, removing connection from active set, " + change.getEndpoint().getHost() + " port: " + change.getEndpoint().getPort() + " id: " + change.getEndpoint().getSlotId());
            this.forceCompletedConnections.increment();
            removeConnection(key, change.getEndpoint());
            closeConnectionTrigger.onNext(1);
        }
    }

    /** Returns true only for an {@code add} change whose key is not currently active. */
    private boolean isNewAdd(String key, EndpointChange change) {
        boolean contains = activeConnectionsContains(key, change.getEndpoint());
        if (contains) {
            // Logged for any change type whose key is still active, not only for adds.
            logger.info("Skipping latent add for endpoint, already in active set: " + change);
        }
        return EndpointChange.Type.add == change.getType() && !contains;
    }

    /**
     * Registers the endpoint as active and opens its connection through the factory function.
     * The endpoint's own completed/error callbacks are attached to the returned observable.
     */
    private Observable<T> openConnection(final String key, final EndpointChange toAdd, PublishSubject<Integer> closeConnectionTrigger) {
        logger.info("Received add request, adding connection to active set, " + toAdd.getEndpoint().getHost() + " port: " + toAdd.getEndpoint().getPort() + ", with client id: " + toAdd.getEndpoint().getSlotId());
        addConnection(key, toAdd.getEndpoint());
        Action0 disconnectCallback = () -> scheduleRemovalAfterDisconnect(key, toAdd);
        RemoteRxConnection<T> connection = this.toObservableFunc.call(toAdd.getEndpoint(), disconnectCallback, closeConnectionTrigger);
        return connection.getObservable()
                .doOnCompleted(toAdd.getEndpoint().getCompletedCallback())
                .doOnError(toAdd.getEndpoint().getErrorCallback());
    }

    /** Removes the endpoint from the active set after a random delay within the configured bounds. */
    private void scheduleRemovalAfterDisconnect(final String key, final EndpointChange toAdd) {
        int timeToWait = nextDisconnectDelaySec();
        logger.info("Connection disconnected, waiting " + timeToWait + " seconds before removing from active set of connections: " + toAdd);
        Observable.timer(timeToWait, TimeUnit.SECONDS)
                .doOnCompleted(() -> {
                    logger.warn("Removing connection from active set, " + toAdd);
                    this.closedConnections.increment();
                    removeConnection(key, toAdd.getEndpoint());
                })
                .subscribe();
    }

    /**
     * Picks a delay in seconds between the normalized minimum and maximum (both inclusive,
     * except that a span of {@code Integer.MAX_VALUE} excludes the very last value).
     */
    private int nextDisconnectDelaySec() {
        // The constructor guarantees 0 <= min <= max, so span is non-negative and cannot overflow.
        int span = this.maxTimeoutOnUnexpectedTerminateSec - this.minTimeoutOnUnexpectedTerminateSec;
        // Cap before adding 1 so the bound passed to nextInt stays a positive int.
        int bound = Math.min(span, Integer.MAX_VALUE - 1) + 1;
        return this.minTimeoutOnUnexpectedTerminateSec + this.random.nextInt(bound);
    }

    /**
     * Returns the subject on which a snapshot of the active endpoints is published after every
     * change to the active set.
     */
    @Override
    public Observable<Set<Endpoint>> activeConnections() {
        return this.activeConnectionsSubject;
    }

    /**
     * Reports whether a connection is currently registered under the given key.
     *
     * @param id endpoint key
     * @param endpoint not consulted; only {@code id} is used for the lookup
     * @return {@code true} if the key is in the active set
     */
    public boolean activeConnectionsContains(String id, Endpoint endpoint) {
        // Acquire outside the try block so a failed lock() never reaches unlock().
        this.activeConnectionsLock.lock();
        try {
            return this.currentActiveConnections.containsKey(id);
        } finally {
            this.activeConnectionsLock.unlock();
        }
    }

    /** Empties the active set, zeroes the active-connections gauge and publishes an empty snapshot. */
    public void resetActiveConnections() {
        this.activeConnectionsLock.lock();
        try {
            this.currentActiveConnections.clear();
            this.activeConnectionsGauge.set(0L);
            this.activeConnectionsSubject.onNext(new HashSet<Endpoint>());
        } finally {
            this.activeConnectionsLock.unlock();
        }
    }

    /**
     * Registers an endpoint under the given key unless the key is already present. A copy of
     * the endpoint is stored; the gauge is incremented and a new snapshot is published.
     *
     * @param id endpoint key
     * @param toAdd endpoint to register
     */
    public void addConnection(String id, Endpoint toAdd) {
        this.activeConnectionsLock.lock();
        try {
            if (!this.currentActiveConnections.containsKey(id)) {
                this.currentActiveConnections.put(id, new Endpoint(toAdd.getHost(), toAdd.getPort(), toAdd.getSlotId(), toAdd.getCompletedCallback(), toAdd.getErrorCallback()));
                this.activeConnectionsGauge.increment();
                this.activeConnectionsSubject.onNext(new HashSet<Endpoint>(this.currentActiveConnections.values()));
            }
        } finally {
            this.activeConnectionsLock.unlock();
        }
    }

    /**
     * Removes the entry registered under the given key, if any. On removal the gauge is
     * decremented and a new snapshot is published.
     *
     * @param id endpoint key
     * @param toRemove not consulted; only {@code id} is used for the removal
     */
    public void removeConnection(String id, Endpoint toRemove) {
        this.activeConnectionsLock.lock();
        try {
            if (this.currentActiveConnections.containsKey(id)) {
                this.currentActiveConnections.remove(id);
                this.activeConnectionsGauge.decrement();
                this.activeConnectionsSubject.onNext(new HashSet<Endpoint>(this.currentActiveConnections.values()));
            }
        } finally {
            this.activeConnectionsLock.unlock();
        }
    }
}

