/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.source.http.impl;

import com.mantisrx.common.utils.MantisMetricStringConstants;
import com.mantisrx.common.utils.NettyUtils;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.source.Index;
import io.mantisrx.runtime.source.Source;
import io.mantisrx.runtime.source.http.ClientResumePolicy;
import io.mantisrx.runtime.source.http.HttpClientFactory;
import io.mantisrx.runtime.source.http.HttpRequestFactory;
import io.mantisrx.runtime.source.http.HttpServerProvider;
import io.mantisrx.runtime.source.http.impl.OperatorResumeOnCompleted;
import io.mantisrx.runtime.source.http.impl.OperatorResumeOnError;
import io.mantisrx.runtime.source.http.impl.ResumeOnCompletedPolicy;
import io.mantisrx.runtime.source.http.impl.ResumeOnErrorPolicy;
import io.mantisrx.runtime.source.http.impl.ServerClientContext;
import io.mantisrx.runtime.source.http.impl.ServerContext;
import io.mantisrx.server.core.ServiceRegistry;
import io.mantisrx.shaded.io.netty.util.ReferenceCountUtil;
import io.reactivx.mantis.operators.DropOperator;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import mantis.io.reactivex.netty.client.RxClient;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

public class HttpSourceImpl<R, E, T>
implements Source<T> {
    /** Default for the {@code httpSource.buffer.size} property; a string because the property is read as a string. */
    private static final String DEFAULT_BUFFER_SIZE = "0";
    // The logger is never reassigned, so it is declared final.
    private static final Logger logger = LoggerFactory.getLogger(HttpSourceImpl.class);

    // Collaborators handed in through the constructor.
    private final HttpRequestFactory<R> requestFactory;
    private final HttpServerProvider serverProvider;
    private final HttpClientFactory<R, E> clientFactory;
    /** Receives an {@link HttpSourceEvent} for each lifecycle step reported by this source. */
    private final Observer<HttpSourceEvent> observer;
    /** Converts each raw content element, together with its server context, into the emitted type. */
    private final Func2<ServerContext<HttpClientResponse<E>>, E, T> postProcessor;
    private final ClientResumePolicy<R, E> resumePolicy;
    /** Relays the servers reported by {@code serverProvider.getServersToRemove()}. */
    private final PublishSubject<RxClient.ServerInfo> serversToRemove;

    // Gauges and counters looked up from the metrics group named after this class.
    private final Gauge connectionGauge;
    private final Gauge retryListGauge;
    private final Gauge connectionAttemptedGauge;
    private final Counter connectionEstablishedCounter;
    private final Counter connectionUnsubscribedCounter;
    private final Counter sourceCompletedCounter;
    private final Counter subscriptionEndedCounter;
    private final Counter subscriptionEstablishedCounter;
    private final Counter subscriptionFailedCounter;
    private final Counter serverFoundCounter;
    private final Counter subscriptionCancelledCounter;
    /** The "dropped" counter of {@link #incomingDataMetrics}. */
    private final Counter dropped;
    /** Metrics group passed to the {@code DropOperator} lifted onto each server's content stream. */
    private final Metrics incomingDataMetrics;
    /** Tracks which servers are connected, being attempted, or waiting for retry. */
    private final ConnectionManager<E> connectionManager = new ConnectionManager<>();
    /** Parsed value of the {@code httpSource.buffer.size} property. */
    private final int bufferSize;

    /**
     * Wires the collaborators, registers both metric groups, reads the buffer-size property and starts
     * relaying the provider's "servers to remove" stream into {@code serversToRemove}.
     *
     * @param requestFactory factory for the requests sent to each server
     * @param serverProvider supplies the servers to add and to remove
     * @param clientFactory  creates a client for each discovered server
     * @param observer       receives {@link HttpSourceEvent}s describing the source's activity
     * @param postProcessor  converts each content element (with its server context) into the emitted type
     * @param resumePolicy   consulted when a server's response stream errors or completes
     * @throws NumberFormatException if the {@code httpSource.buffer.size} property is not an integer
     */
    HttpSourceImpl(HttpRequestFactory<R> requestFactory, HttpServerProvider serverProvider, HttpClientFactory<R, E> clientFactory, Observer<HttpSourceEvent> observer, Func2<ServerContext<HttpClientResponse<E>>, E, T> postProcessor, ClientResumePolicy<R, E> resumePolicy) {
        this.requestFactory = requestFactory;
        this.serverProvider = serverProvider;
        this.clientFactory = clientFactory;
        this.observer = observer;
        this.postProcessor = postProcessor;
        this.resumePolicy = resumePolicy;

        Metrics m = new Metrics.Builder()
                .name(HttpSourceImpl.class.getCanonicalName())
                .addGauge("connectionGauge")
                .addGauge("retryListGauge")
                .addGauge("connectionAttemptedGauge")
                .addCounter("connectionEstablishedCounter")
                .addCounter("connectionUnsubscribedCounter")
                .addCounter("sourceCompletedCounter")
                .addCounter("subscriptionEndedCounter")
                .addCounter("subscriptionEstablishedCounter")
                .addCounter("subscriptionFailedCounter")
                .addCounter("serverFoundCounter")
                .addCounter("subscriptionCancelledCounter")
                .build();
        // Use the instance handed back by the registry, not the one just built.
        m = MetricsRegistry.getInstance().registerAndGet(m);
        this.connectionGauge = m.getGauge("connectionGauge");
        this.retryListGauge = m.getGauge("retryListGauge");
        this.connectionAttemptedGauge = m.getGauge("connectionAttemptedGauge");
        this.connectionEstablishedCounter = m.getCounter("connectionEstablishedCounter");
        this.connectionUnsubscribedCounter = m.getCounter("connectionUnsubscribedCounter");
        this.sourceCompletedCounter = m.getCounter("sourceCompletedCounter");
        this.subscriptionEndedCounter = m.getCounter("subscriptionEndedCounter");
        this.subscriptionEstablishedCounter = m.getCounter("subscriptionEstablishedCounter");
        this.subscriptionFailedCounter = m.getCounter("subscriptionFailedCounter");
        this.serverFoundCounter = m.getCounter("serverFoundCounter");
        this.subscriptionCancelledCounter = m.getCounter("subscriptionCancelledCounter");

        Metrics incomingMetrics = new Metrics.Builder()
                .name(MantisMetricStringConstants.DROP_OPERATOR_INCOMING_METRIC_GROUP + "_HttpSourceImpl")
                .addCounter("onNext")
                .addCounter("onError")
                .addCounter("onComplete")
                .addGauge("subscribe")
                .addCounter("dropped")
                .addGauge("requested")
                .addGauge("bufferedGauge")
                .build();
        // Same treatment as the first group: keep whatever the registry returns so that this source
        // updates the registered group. Presumably the registry returns an already-registered group
        // when the name is taken (e.g. a second source instance) - verify against MetricsRegistry.
        this.incomingDataMetrics = MetricsRegistry.getInstance().registerAndGet(incomingMetrics);
        this.dropped = this.incomingDataMetrics.getCounter("dropped");

        String bufferSizeStr = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("httpSource.buffer.size", DEFAULT_BUFFER_SIZE);
        this.bufferSize = Integer.parseInt(bufferSizeStr);

        this.serversToRemove = PublishSubject.create();
        // Only onNext is forwarded; the provider's termination is deliberately not passed to the subject.
        serverProvider.getServersToRemove().subscribe(server -> this.serversToRemove.onNext(server));
    }

    /**
     * Starts a {@link Builder} that uses the builder's default resume policy.
     *
     * @param clientFactory  creates a client per server
     * @param requestFactory creates the request sent to each server
     * @param postProcessor  converts each content element into the emitted type
     * @return a new builder
     */
    public static <R, E, T> Builder<R, E, T> builder(HttpClientFactory<R, E> clientFactory, HttpRequestFactory<R> requestFactory, Func2<ServerContext<HttpClientResponse<E>>, E, T> postProcessor) {
        Builder<R, E, T> sourceBuilder = new Builder<>(clientFactory, requestFactory, postProcessor);
        return sourceBuilder;
    }

    /**
     * Starts a {@link Builder} with an explicit resume policy.
     *
     * @param clientFactory  creates a client per server
     * @param requestFactory creates the request sent to each server
     * @param postProcessor  converts each content element into the emitted type
     * @param resumePolicy   policy consulted when a response stream errors or completes
     * @return a new builder
     */
    public static <R, E, T> Builder<R, E, T> builder(HttpClientFactory<R, E> clientFactory, HttpRequestFactory<R> requestFactory, Func2<ServerContext<HttpClientResponse<E>>, E, T> postProcessor, ClientResumePolicy<R, E> resumePolicy) {
        Builder<R, E, T> sourceBuilder = new Builder<>(clientFactory, requestFactory, postProcessor, resumePolicy);
        return sourceBuilder;
    }

    /**
     * Post-processor that pairs each content element with the server it came from.
     *
     * @return a function producing a {@link ServerContext} built from the response context's server and the element
     */
    public static <E> Func2<ServerContext<HttpClientResponse<E>>, E, ServerContext<E>> contextWrapper() {
        return (responseContext, element) -> new ServerContext<E>(responseContext.getServer(), element);
    }

    /**
     * Post-processor that passes each content element through untouched and ignores the server context.
     *
     * @return a function returning its second argument
     */
    public static <E> Func2<ServerContext<HttpClientResponse<E>>, E, E> identityConverter() {
        return (ignoredContext, element) -> element;
    }

    /**
     * Builds the source stream: one inner observable of processed content per newly discovered server.
     *
     * <p>Servers that are already connected, or for which a connection attempt is in progress, are
     * filtered out. Neither parameter is read by this method.
     *
     * @param context job context (unused here)
     * @param index2  worker index (unused here)
     * @return an observable emitting one content observable per connected server
     */
    @Override
    public Observable<Observable<T>> call(Context context, Index index2) {
        return this.serverProvider.getServersToAdd()
                .filter(serverInfo -> !this.connectionManager.alreadyConnected(serverInfo)
                        && !this.connectionManager.connectionAlreadyAttempted(serverInfo))
                .flatMap(serverInfo -> this.streamServers(Observable.just(serverInfo)))
                .doOnError(error -> {
                    // The error message must not be used as a format string: a '%' in it would make
                    // String.format throw. Passing the throwable as the last argument also makes the
                    // logger record the stack trace.
                    logger.error("The source encountered an error " + error.getMessage(), error);
                    this.observer.onError(error);
                })
                // NOTE(review): this runs after error termination as well, so the activity observer
                // can receive onCompleted() after onError(). Left unchanged; confirm it is intended.
                .doAfterTerminate(() -> {
                    this.observer.onCompleted();
                    this.connectionManager.reset();
                })
                .lift(new Observable.Operator<Observable<T>, Observable<T>>() {

                    @Override
                    public Subscriber<? super Observable<T>> call(Subscriber<? super Observable<T>> subscriber2) {
                        // Forget all connection state once the downstream subscriber unsubscribes.
                        subscriber2.add(Subscriptions.create(() -> HttpSourceImpl.this.connectionManager.reset()));
                        return subscriber2;
                    }
                });
    }

    /**
     * Turns a stream of servers into a stream of per-server content observables.
     *
     * <p>For each server a SERVER_FOUND event is published and a client context is created; the
     * response stream is then opened and every successful response is mapped to its processed content.
     *
     * @param servers the servers to connect to
     * @return one observable of post-processed content per response received
     */
    private Observable<Observable<T>> streamServers(Observable<RxClient.ServerInfo> servers) {
        return servers
                .map(server -> {
                    HttpSourceEvent.EventType.SERVER_FOUND.newEvent(this.observer, server);
                    this.serverFoundCounter.increment();
                    return new ServerClientContext<R, E>(server, this.clientFactory.createClient(server), this.requestFactory, this.observer);
                })
                .flatMap(clientContext -> this.connectToServer(clientContext))
                .map(responseContext -> this.streamProcessedContent(responseContext));
    }

    /** Opens the response stream for one server, reporting connection establishment and unsubscription. */
    private Observable<ServerContext<HttpClientResponse<E>>> connectToServer(final ServerClientContext<R, E> clientContext) {
        return this.streamResponseUntilServerIsRemoved(clientContext)
                .map(response -> {
                    HttpSourceEvent.EventType.CONNECTION_ESTABLISHED.newEvent(this.observer, clientContext.getServer());
                    this.connectionEstablishedCounter.increment();
                    return new ServerContext<HttpClientResponse<E>>(clientContext.getServer(), response);
                })
                .lift(new Observable.Operator<ServerContext<HttpClientResponse<E>>, ServerContext<HttpClientResponse<E>>>() {

                    @Override
                    public Subscriber<? super ServerContext<HttpClientResponse<E>>> call(Subscriber<? super ServerContext<HttpClientResponse<E>>> subscriber2) {
                        // Runs when the downstream subscriber unsubscribes from this server's responses.
                        subscriber2.add(Subscriptions.create(() -> {
                            HttpSourceImpl.this.connectionUnsubscribedCounter.increment();
                            HttpSourceEvent.EventType.CONNECTION_UNSUBSCRIBED.newEvent(HttpSourceImpl.this.observer, clientContext.getServer());
                        }));
                        return subscriber2;
                    }
                });
    }

    /** Records a response as an established subscription and returns its post-processed content stream. */
    private Observable<T> streamProcessedContent(final ServerContext<HttpClientResponse<E>> context) {
        final HttpClientResponse<E> response = context.getValue();
        final RxClient.ServerInfo server = context.getServer();
        HttpSourceEvent.EventType.SUBSCRIPTION_ESTABLISHED.newEvent(this.observer, server);
        this.subscriptionEstablishedCounter.increment();
        this.connectionManager.serverConnected(server, response);
        this.connectionGauge.set(this.getConnectedServers().size());
        return this.streamResponseContent(server, response)
                .map(element -> {
                    // The element is retained before being handed to the post-processor.
                    // NOTE(review): presumably a downstream consumer releases it - confirm.
                    ReferenceCountUtil.retain(element);
                    return this.postProcessor.call(context, element);
                })
                .lift(new DropOperator<T>(this.incomingDataMetrics))
                .lift(new Observable.Operator<T, T>() {

                    @Override
                    public Subscriber<? super T> call(Subscriber<? super T> subscriber2) {
                        // Runs when the downstream subscriber unsubscribes from this server's content.
                        subscriber2.add(Subscriptions.create(() -> {
                            HttpSourceEvent.EventType.SUBSCRIPTION_ENDED.newEvent(HttpSourceImpl.this.observer, context.getServer());
                            HttpSourceImpl.this.subscriptionEndedCounter.increment();
                        }));
                        return subscriber2;
                    }
                });
    }

    /**
     * Rejects any response whose status code is not exactly 200.
     *
     * @param response the response to inspect
     * @throws RuntimeException if the status code differs from 200; the message carries code and reason phrase
     */
    private void checkResponseIsSuccessful(HttpClientResponse<E> response) {
        final int code = response.getStatus().code();
        if (code == 200) {
            return;
        }
        final String reason = response.getStatus().reasonPhrase();
        throw new RuntimeException(String.format("Expected 200 but got status %d and reason: %s", code, reason));
    }

    /**
     * Streams the content of one response until the server is reported as removed.
     *
     * <p>Errors are reported as SUBSCRIPTION_FAILED and then replaced by an empty stream, so the
     * returned observable never signals onError. Because of that replacement, the completion handler
     * also runs after an error.
     *
     * @param server2  the server the response came from
     * @param response the response whose content is streamed
     * @return the response content, ending when the server is removed, fails or completes
     */
    private Observable<E> streamResponseContent(RxClient.ServerInfo server2, HttpClientResponse<E> response) {
        return response.getContent()
                .takeUntil(this.serversToRemove.filter(toRemove -> toRemove != null && toRemove.equals(server2)))
                .doOnError(throwable -> {
                    HttpSourceEvent.EventType.SUBSCRIPTION_FAILED.newEvent(this.observer, server2);
                    this.subscriptionFailedCounter.increment();
                    logger.info("server disconnected onError1: " + server2);
                    this.connectionManager.serverDisconnected(server2);
                    // Read the gauges only after the manager has moved the server to the retry set,
                    // otherwise the retry gauge lags by one (the completion branch already does this).
                    this.retryListGauge.set(this.getRetryServers().size());
                    this.connectionGauge.set(this.getConnectedServers().size());
                })
                .onErrorResumeNext(Observable.<E>empty())
                .doOnCompleted(() -> {
                    HttpSourceEvent.EventType.SOURCE_COMPLETED.newEvent(this.observer, server2);
                    this.sourceCompletedCounter.increment();
                    logger.info("server disconnected onComplete1: " + server2);
                    this.connectionManager.serverDisconnected(server2);
                    this.retryListGauge.set(this.getRetryServers().size());
                    this.connectionGauge.set(this.getConnectedServers().size());
                });
    }

    /**
     * Opens the response stream for one server and keeps it alive, subject to the resume policy,
     * until the server is reported as removed.
     *
     * <p>Responses with a non-200 status are turned into errors. Errors that survive the resume
     * policy are reported as SUBSCRIPTION_FAILED and replaced by an empty stream, so the returned
     * observable never signals onError.
     *
     * @param clientContext the server, client and request factory to connect with
     * @return the responses received from the server
     */
    private Observable<HttpClientResponse<E>> streamResponseUntilServerIsRemoved(final ServerClientContext<R, E> clientContext) {
        return clientContext
                .newResponse(server -> {
                    // Record the attempt first so the gauge includes the server being attempted.
                    this.connectionManager.serverConnectionAttempted(server);
                    this.connectionAttemptedGauge.set(this.getConnectionAttemptedServers().size());
                })
                .lift(new OperatorResumeOnError<HttpClientResponse<E>>(new ResumeOnErrorPolicy<HttpClientResponse<E>>() {

                    @Override
                    public Observable<HttpClientResponse<E>> call(Integer attempts, Throwable error) {
                        return HttpSourceImpl.this.resumePolicy.onError(clientContext, attempts, error);
                    }
                }))
                .lift(new OperatorResumeOnCompleted<HttpClientResponse<E>>(new ResumeOnCompletedPolicy<HttpClientResponse<E>>() {

                    @Override
                    public Observable<HttpClientResponse<E>> call(Integer attempts) {
                        return HttpSourceImpl.this.resumePolicy.onCompleted(clientContext, attempts);
                    }
                }))
                .takeUntil(this.serversToRemove
                        .filter(toRemove -> {
                            boolean shouldUnsubscribe = toRemove != null && toRemove.equals(clientContext.getServer());
                            if (shouldUnsubscribe) {
                                this.subscriptionCancelledCounter.increment();
                                HttpSourceEvent.EventType.SUBSCRIPTION_CANCELED.newEvent(this.observer, toRemove);
                            }
                            return shouldUnsubscribe;
                        })
                        .doOnNext(server -> {
                            logger.info("server removed: " + server);
                            this.connectionManager.serverRemoved(server);
                            this.connectionGauge.set(this.getConnectedServers().size());
                        }))
                .doOnNext(response -> this.checkResponseIsSuccessful(response))
                .doOnError(error -> {
                    logger.error(String.format("Connecting to server %s failed: %s", clientContext.getServer(), error.getMessage()), error);
                    HttpSourceEvent.EventType.SUBSCRIPTION_FAILED.newEvent(this.observer, clientContext.getServer());
                    this.subscriptionFailedCounter.increment();
                    logger.info("server disconnected onError2: " + clientContext.getServer());
                    this.connectionManager.serverDisconnected(clientContext.getServer());
                    this.retryListGauge.set(this.getRetryServers().size());
                    this.connectionGauge.set(this.connectionManager.getConnectedServers().size());
                })
                .onErrorResumeNext(Observable.<HttpClientResponse<E>>empty());
    }

    /**
     * Returns the servers the connection manager currently records as connected.
     *
     * @return the connection manager's unmodifiable set of connected servers
     */
    Set<RxClient.ServerInfo> getConnectedServers() {
        return this.connectionManager.getConnectedServers();
    }

    /**
     * Returns the servers the connection manager has recorded as disconnected and not yet reconnected or removed.
     *
     * @return the connection manager's unmodifiable set of retry servers
     */
    Set<RxClient.ServerInfo> getRetryServers() {
        return this.connectionManager.getRetryServers();
    }

    /**
     * Returns the servers for which a connection attempt has been recorded but not yet resolved.
     *
     * @return the connection manager's unmodifiable set of attempted servers
     */
    Set<RxClient.ServerInfo> getConnectionAttemptedServers() {
        return this.connectionManager.getConnectionAttemptedServers();
    }

    // Runs once, when this class is initialized, before any instance is created.
    // NOTE(review): presumably configures Netty's thread settings - confirm in NettyUtils.
    static {
        NettyUtils.setNettyThreads();
    }

    /**
     * Bookkeeping for the state of each server: connected, connection attempted, or waiting for retry.
     * Backed by concurrent collections; each method touches the three collections one after another.
     */
    private static class ConnectionManager<E> {
        /** Connected servers mapped to the response that established the connection. */
        private final ConcurrentMap<RxClient.ServerInfo, HttpClientResponse<E>> connectedServers = new ConcurrentHashMap<>();
        /** Servers that were disconnected and have neither reconnected nor been removed. */
        private final Set<RxClient.ServerInfo> retryServers = new CopyOnWriteArraySet<>();
        /** Servers for which a connection attempt was recorded and is not yet resolved. */
        private final Set<RxClient.ServerInfo> connectionAttempted = new CopyOnWriteArraySet<>();

        private ConnectionManager() {
        }

        /** Marks the server as connected and clears it from the retry and attempted sets. */
        public void serverConnected(RxClient.ServerInfo server, HttpClientResponse<E> establishedResponse) {
            connectedServers.put(server, establishedResponse);
            retryServers.remove(server);
            connectionAttempted.remove(server);
            logger.info("CM: Server connected: " + server + " count " + connectedServers.size());
        }

        /** Records that a connection to the server is being attempted. */
        public void serverConnectionAttempted(RxClient.ServerInfo server) {
            connectionAttempted.add(server);
        }

        /** @return whether the server is currently recorded as connected */
        public boolean alreadyConnected(RxClient.ServerInfo server) {
            return connectedServers.containsKey(server);
        }

        /** @return whether a connection attempt to the server is currently recorded */
        public boolean connectionAlreadyAttempted(RxClient.ServerInfo server) {
            return connectionAttempted.contains(server);
        }

        /** Drops the server from the connected and attempted sets and adds it to the retry set. */
        public void serverDisconnected(RxClient.ServerInfo server) {
            connectedServers.remove(server);
            connectionAttempted.remove(server);
            retryServers.add(server);
            logger.info("CM: Server disconnected: " + server + " count " + connectedServers.size());
        }

        /** Forgets the server entirely: connected, attempted and retry state are all cleared. */
        public void serverRemoved(RxClient.ServerInfo server) {
            connectedServers.remove(server);
            connectionAttempted.remove(server);
            retryServers.remove(server);
            logger.info("CM: Server removed: " + server + " count " + connectedServers.size());
        }

        /** @return an unmodifiable view of the connected servers */
        public Set<RxClient.ServerInfo> getConnectedServers() {
            final Set<RxClient.ServerInfo> connected = connectedServers.keySet();
            return Collections.unmodifiableSet(connected);
        }

        /** @return an unmodifiable view of the servers waiting for retry */
        public Set<RxClient.ServerInfo> getRetryServers() {
            return Collections.unmodifiableSet(retryServers);
        }

        /** @return an unmodifiable view of the servers with a recorded connection attempt */
        public Set<RxClient.ServerInfo> getConnectionAttemptedServers() {
            return Collections.unmodifiableSet(connectionAttempted);
        }

        /** Clears all three collections. */
        public void reset() {
            connectedServers.clear();
            connectionAttempted.clear();
            retryServers.clear();
            logger.info("CM: reset");
        }
    }

    /**
     * An activity notification published to the source's observer: which server, and what happened.
     * Instances are created through {@link EventType#newEvent}.
     */
    public static class HttpSourceEvent {
        private final RxClient.ServerInfo server;
        private final EventType eventType;

        private HttpSourceEvent(RxClient.ServerInfo server2, EventType eventType) {
            this.server = server2;
            this.eventType = eventType;
        }

        /** @return the server this event is about */
        public RxClient.ServerInfo getServer() {
            return server;
        }

        /** @return what happened */
        public EventType getEventType() {
            return eventType;
        }

        /** Two events are equal when they are of the same class and have the same type and equal servers. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            final HttpSourceEvent other = (HttpSourceEvent) o;
            if (eventType != other.eventType) {
                return false;
            }
            if (server == null) {
                return other.server == null;
            }
            return server.equals(other.server);
        }

        /** Combines the server hash (times 31) with the event type hash; a null field contributes 0. */
        @Override
        public int hashCode() {
            final int serverHash = server == null ? 0 : server.hashCode();
            final int typeHash = eventType == null ? 0 : eventType.hashCode();
            return 31 * serverHash + typeHash;
        }

        /** The kinds of activity a source reports. */
        public static enum EventType {
            SERVER_FOUND,
            CONNECTION_ATTEMPTED,
            CONNECTION_ESTABLISHED,
            CONNECTION_UNSUBSCRIBED,
            SOURCE_COMPLETED,
            SUBSCRIPTION_ESTABLISHED,
            SUBSCRIPTION_FAILED,
            SUBSCRIPTION_CANCELED,
            SUBSCRIPTION_ENDED;

            /**
             * Creates an event of this type for the given server and pushes it to the observer.
             *
             * @param observer receives the new event through {@code onNext}
             * @param server2  the server the event is about
             * @return the event that was published
             */
            public HttpSourceEvent newEvent(Observer<HttpSourceEvent> observer, RxClient.ServerInfo server2) {
                final HttpSourceEvent published = new HttpSourceEvent(server2, this);
                observer.onNext(published);
                return published;
            }
        }
    }

    /**
     * Fluent builder for {@link HttpSourceImpl}. Unless overridden, the source uses an empty server
     * provider, a fresh {@link PublishSubject} as activity observer, and a resume policy that returns
     * {@code null} from both callbacks.
     */
    public static class Builder<R, E, T> {
        /** Server provider that never adds and never removes a server. */
        public static final HttpServerProvider EMPTY_HTTP_SERVER_PROVIDER = new HttpServerProvider() {

            @Override
            public Observable<RxClient.ServerInfo> getServersToAdd() {
                return Observable.empty();
            }

            @Override
            public Observable<RxClient.ServerInfo> getServersToRemove() {
                return Observable.empty();
            }
        };

        private final HttpRequestFactory<R> requestFactory;
        private final HttpClientFactory<R, E> httpClientFactory;
        private final Func2<ServerContext<HttpClientResponse<E>>, E, T> postProcessor;
        private HttpServerProvider serverProvider;
        private Observer<HttpSourceEvent> observer;
        private ClientResumePolicy<R, E> clientResumePolicy;

        /**
         * Creates a builder with the default resume policy.
         *
         * @param clientFactory  creates a client per server
         * @param requestFactory creates the request sent to each server
         * @param postProcessor  converts each content element into the emitted type
         */
        public Builder(HttpClientFactory<R, E> clientFactory, HttpRequestFactory<R> requestFactory, Func2<ServerContext<HttpClientResponse<E>>, E, T> postProcessor) {
            this(clientFactory, requestFactory, postProcessor, Builder.<R, E>nullReturningPolicy());
        }

        /**
         * Creates a builder with an explicit resume policy.
         *
         * @param clientFactory  creates a client per server
         * @param requestFactory creates the request sent to each server
         * @param postProcessor  converts each content element into the emitted type
         * @param resumePolicy   policy consulted when a response stream errors or completes
         */
        public Builder(HttpClientFactory<R, E> clientFactory, HttpRequestFactory<R> requestFactory, Func2<ServerContext<HttpClientResponse<E>>, E, T> postProcessor, ClientResumePolicy<R, E> resumePolicy) {
            this.requestFactory = requestFactory;
            this.httpClientFactory = clientFactory;
            this.serverProvider = EMPTY_HTTP_SERVER_PROVIDER;
            this.postProcessor = postProcessor;
            this.clientResumePolicy = resumePolicy;
            this.observer = PublishSubject.create();
        }

        /** Default policy: both callbacks return {@code null}. */
        private static <R, E> ClientResumePolicy<R, E> nullReturningPolicy() {
            return new ClientResumePolicy<R, E>() {

                @Override
                public Observable<HttpClientResponse<E>> onError(ServerClientContext<R, E> clientContext, int attempts, Throwable error) {
                    return null;
                }

                @Override
                public Observable<HttpClientResponse<E>> onCompleted(ServerClientContext<R, E> clientContext, int attempts) {
                    return null;
                }
            };
        }

        /** Replaces the server provider. */
        public Builder<R, E, T> withServerProvider(HttpServerProvider serverProvider) {
            this.serverProvider = serverProvider;
            return this;
        }

        /** Replaces the observer that receives {@link HttpSourceEvent}s. */
        public Builder<R, E, T> withActivityObserver(Observer<HttpSourceEvent> observer) {
            this.observer = observer;
            return this;
        }

        /** Replaces the resume policy. */
        public Builder<R, E, T> resumeWith(ClientResumePolicy<R, E> policy) {
            this.clientResumePolicy = policy;
            return this;
        }

        /** @return a new source configured with the builder's current settings */
        public HttpSourceImpl<R, E, T> build() {
            return new HttpSourceImpl<>(requestFactory, serverProvider, httpClientFactory, observer, postProcessor, clientResumePolicy);
        }
    }

    /**
     * Pass-through operator that logs, at debug level, when the downstream subscriber unsubscribes.
     * It returns the subscriber it is given, so items and terminal events are not altered.
     */
    private static class UnsubscriptionLoggingOperator<R>
    implements Observable.Operator<R, R> {
        /** Label written in front of the "unsubscribed" log message. */
        private final String description;

        public UnsubscriptionLoggingOperator(String streamDesciption) {
            this.description = streamDesciption;
        }

        /** Factory equivalent of the constructor. */
        public static <R> UnsubscriptionLoggingOperator<R> create(String stream) {
            return new UnsubscriptionLoggingOperator<>(stream);
        }

        @Override
        public Subscriber<? super R> call(Subscriber<? super R> subscriber2) {
            final Action0 logUnsubscription = () -> logger.debug("{} unsubscribed", (Object) description);
            subscriber2.add(Subscriptions.create(logUnsubscription));
            return subscriber2;
        }
    }
}

