/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.api.push;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.netflix.spectator.api.Counter;
import com.netflix.zuul.netty.SpectatorUtils;
import io.mantisrx.api.Util;
import io.mantisrx.api.push.PushConnectionDetails;
import io.mantisrx.api.services.JobDiscoveryService;
import io.mantisrx.api.tunnel.MantisCrossRegionalClient;
import io.mantisrx.client.MantisClient;
import io.mantisrx.client.SinkConnectionFunc;
import io.mantisrx.client.SseSinkConnectionFunction;
import io.mantisrx.common.MantisServerSentEvent;
import io.mantisrx.runtime.parameter.SinkParameters;
import io.mantisrx.server.worker.client.MetricsClient;
import io.mantisrx.server.worker.client.SseWorkerConnectionFunction;
import io.mantisrx.server.worker.client.WorkerConnectionFunc;
import io.mantisrx.server.worker.client.WorkerConnectionsStatus;
import io.mantisrx.server.worker.client.WorkerMetricsClient;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import io.vavr.CheckedFunction0;
import io.vavr.control.Try;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientRequest;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientResponse;
import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.functions.Action1;

@Singleton
public class ConnectionBroker {
    private static final Logger log = LoggerFactory.getLogger(ConnectionBroker.class);
    private final MantisClient mantisClient;
    private final MantisCrossRegionalClient mantisCrossRegionalClient;
    private final WorkerMetricsClient workerMetricsClient;
    private final JobDiscoveryService jobDiscoveryService;
    private final Scheduler scheduler;
    private final ObjectMapper objectMapper;
    private final Map<PushConnectionDetails, Observable<String>> connectionCache = new WeakHashMap<PushConnectionDetails, Observable<String>>();

    @Inject
    public ConnectionBroker(MantisClient mantisClient, MantisCrossRegionalClient mantisCrossRegionalClient, WorkerMetricsClient workerMetricsClient, @Named(value="io-scheduler") Scheduler scheduler, ObjectMapper objectMapper) {
        this.mantisClient = mantisClient;
        this.mantisCrossRegionalClient = mantisCrossRegionalClient;
        this.workerMetricsClient = workerMetricsClient;
        this.jobDiscoveryService = JobDiscoveryService.getInstance(mantisClient, scheduler);
        this.scheduler = scheduler;
        this.objectMapper = objectMapper;
    }

    public Observable<String> connect(PushConnectionDetails details) {
        if (!this.connectionCache.containsKey(details)) {
            switch (details.type) {
                case CONNECT_BY_NAME: {
                    return this.getConnectByNameFor(details).subscribeOn(this.scheduler).doOnUnsubscribe(() -> {
                        log.info("Purging {} from cache.", (Object)details);
                        this.connectionCache.remove(details);
                    }).doOnCompleted(() -> {
                        log.info("Purging {} from cache.", (Object)details);
                        this.connectionCache.remove(details);
                    }).share();
                }
                case CONNECT_BY_ID: {
                    return this.getConnectByIdFor(details).subscribeOn(this.scheduler).doOnUnsubscribe(() -> {
                        log.info("Purging {} from cache.", (Object)details);
                        this.connectionCache.remove(details);
                    }).doOnCompleted(() -> {
                        log.info("Purging {} from cache.", (Object)details);
                        this.connectionCache.remove(details);
                    }).share();
                }
                case METRICS: {
                    return this.getWorkerMetrics(details).subscribeOn(this.scheduler).doOnUnsubscribe(() -> {
                        log.info("Purging {} from cache.", (Object)details);
                        this.connectionCache.remove(details);
                    }).doOnCompleted(() -> {
                        log.info("Purging {} from cache.", (Object)details);
                        this.connectionCache.remove(details);
                    });
                }
                case JOB_STATUS: {
                    this.connectionCache.put(details, (Observable<String>)this.mantisClient.getJobStatusObservable(details.target).subscribeOn(this.scheduler).doOnCompleted(() -> {
                        log.info("Purging {} from cache.", (Object)details);
                        this.connectionCache.remove(details);
                    }).doOnUnsubscribe(() -> {
                        log.info("Purging {} from cache.", (Object)details);
                        this.connectionCache.remove(details);
                    }).replay(25).autoConnect());
                    break;
                }
                case JOB_SCHEDULING_INFO: {
                    this.connectionCache.put(details, (Observable<String>)this.mantisClient.getSchedulingChanges(details.target).subscribeOn(this.scheduler).map(changes -> (String)Try.of((CheckedFunction0 & Serializable)() -> this.objectMapper.writeValueAsString(changes)).getOrElse((Object)"Error")).doOnCompleted(() -> {
                        log.info("Purging {} from cache.", (Object)details);
                        this.connectionCache.remove(details);
                    }).doOnUnsubscribe(() -> {
                        log.info("Purging {} from cache.", (Object)details);
                        this.connectionCache.remove(details);
                    }).replay(1).autoConnect());
                    break;
                }
                case JOB_CLUSTER_DISCOVERY: {
                    this.connectionCache.put(details, (Observable<String>)this.jobDiscoveryService.jobDiscoveryInfoStream(this.jobDiscoveryService.key(JobDiscoveryService.LookupType.JOB_CLUSTER, details.target)).subscribeOn(this.scheduler).map(jdi -> (String)Try.of((CheckedFunction0 & Serializable)() -> this.objectMapper.writeValueAsString(jdi)).getOrElse((Object)"Error")).doOnCompleted(() -> {
                        log.info("Purging {} from cache.", (Object)details);
                        this.connectionCache.remove(details);
                    }).doOnUnsubscribe(() -> {
                        log.info("Purging {} from cache.", (Object)details);
                        this.connectionCache.remove(details);
                    }).replay(1).autoConnect());
                }
            }
            log.info("Caching connection for: {}", (Object)details);
        }
        return this.connectionCache.get(details);
    }

    private Observable<String> getConnectByNameFor(PushConnectionDetails details) {
        return details.regions.isEmpty() ? ConnectionBroker.getResults(false, this.mantisClient, details.target, details.getSinkparameters()).flatMap(m -> m).map(MantisServerSentEvent::getEventAsString) : this.getRemoteDataObservable(details.getUri(), details.target, details.getRegions().asJava());
    }

    private Observable<String> getConnectByIdFor(PushConnectionDetails details) {
        return details.getRegions().isEmpty() ? ConnectionBroker.getResults(true, this.mantisClient, details.target, details.getSinkparameters()).flatMap(m -> m).map(MantisServerSentEvent::getEventAsString) : this.getRemoteDataObservable(details.getUri(), details.target, details.getRegions().asJava());
    }

    private static SinkConnectionFunc<MantisServerSentEvent> getSseConnFunc(String target, SinkParameters sinkParameters) {
        return new SseSinkConnectionFunction(true, t -> log.warn("Reconnecting to sink of job " + target + " after error: " + t.getMessage()), sinkParameters);
    }

    private static Observable<Observable<MantisServerSentEvent>> getResults(boolean isJobId, MantisClient mantisClient, String target, SinkParameters sinkParameters) {
        AtomicBoolean hasError = new AtomicBoolean();
        return isJobId ? mantisClient.getSinkClientByJobId(target, ConnectionBroker.getSseConnFunc(target, sinkParameters), null).getResults() : mantisClient.getSinkClientByJobName(target, ConnectionBroker.getSseConnFunc(target, sinkParameters), null).switchMap(serverSentEventSinkClient -> {
            if (serverSentEventSinkClient.hasError()) {
                hasError.set(true);
                return Observable.error((Throwable)new Exception(serverSentEventSinkClient.getError()));
            }
            return serverSentEventSinkClient.getResults();
        }).takeWhile(o -> !hasError.get());
    }

    private Observable<String> getRemoteDataObservable(String uri, String target, List<String> regions) {
        return Observable.from(regions).flatMap(region -> {
            String originReplacement = "\\{\"mantis.meta.origin\": \"" + region + "\", ";
            if (region.equalsIgnoreCase(Util.getLocalRegion())) {
                return this.connect(PushConnectionDetails.from(uri)).map(datum -> datum.replaceFirst("^\\{", originReplacement));
            }
            log.info("Connecting to remote region {} at {}.", region, (Object)uri);
            return this.mantisCrossRegionalClient.getSecureSseClient((String)region).submit(HttpClientRequest.createGet((String)uri)).retryWhen(Util.getRetryFunc(log, uri + " in " + region)).doOnError(throwable -> log.warn("Error getting response from remote SSE server for uri {} in region {}: {}", new Object[]{uri, region, throwable.getMessage(), throwable})).flatMap(remoteResponse -> {
                if (!remoteResponse.getStatus().reasonPhrase().equals("OK")) {
                    log.warn("Unexpected response from remote sink for uri {} region {}: {}", new Object[]{uri, region, remoteResponse.getStatus().reasonPhrase()});
                    String err = remoteResponse.getHeaders().get("mantis.meta.error.message");
                    if (err == null || err.isEmpty()) {
                        err = remoteResponse.getStatus().reasonPhrase();
                    }
                    return Observable.error((Throwable)new Exception(err)).map(datum -> datum.getEventAsString());
                }
                return this.clientResponseToObservable((HttpClientResponse<ServerSentEvent>)remoteResponse, target, (String)region, uri).map(datum -> datum.replaceFirst("^\\{", originReplacement)).doOnError(t -> log.error(t.getMessage()));
            }).subscribeOn(this.scheduler).observeOn(this.scheduler).doOnError(t -> log.warn("Error streaming in remote data ({}). Will retry: {}", new Object[]{region, t.getMessage(), t})).doOnCompleted(() -> log.info(String.format("remote sink connection complete for uri %s, region=%s", uri, region)));
        }).observeOn(this.scheduler).subscribeOn(this.scheduler).doOnError(t -> log.error("Error in flatMapped cross-regional observable for {}", (Object)uri, t));
    }

    private Observable<String> clientResponseToObservable(HttpClientResponse<ServerSentEvent> response, String target, String region, String uri) {
        Counter numRemoteBytes = SpectatorUtils.newCounter((String)"numRemoteSinkBytes", (String)target, (String[])new String[]{"region", region});
        Counter numRemoteMessages = SpectatorUtils.newCounter((String)"numRemoteMessages", (String)target, (String[])new String[]{"region", region});
        Counter numSseErrors = SpectatorUtils.newCounter((String)"numSseErrors", (String)target, (String[])new String[]{"region", region});
        return response.getContent().doOnError(t -> log.warn(t.getMessage())).timeout(36L, TimeUnit.SECONDS).doOnError(t -> log.warn("Timeout getting data from remote {} connection for {}", (Object)region, (Object)uri)).filter(sse -> sse.hasEventType() && sse.getEventTypeAsString().startsWith("error:") || !"MantisApiTunnelPing".equals(sse.contentAsString())).map(t1 -> {
            String data = "";
            if (t1.hasEventType() && t1.getEventTypeAsString().startsWith("error:")) {
                log.error("SSE has error, type=" + t1.getEventTypeAsString() + ", content=" + t1.contentAsString());
                numSseErrors.increment();
                throw new RuntimeException("Got error SSE event: " + t1.contentAsString());
            }
            try {
                data = t1.contentAsString();
                if (data != null) {
                    numRemoteBytes.increment((long)data.length());
                    numRemoteMessages.increment();
                }
            }
            catch (Exception e) {
                log.error("Could not extract data from SSE " + e.getMessage(), (Throwable)e);
            }
            return data;
        });
    }

    private Observable<String> getWorkerMetrics(PushConnectionDetails details) {
        String jobId = details.target;
        SinkParameters metricNamesFilter = details.getSinkparameters();
        MetricsClient metricsClient = this.workerMetricsClient.getMetricsClientByJobId(jobId, (WorkerConnectionFunc)new SseWorkerConnectionFunction(true, (Action1)new Action1<Throwable>(){

            public void call(Throwable throwable) {
                log.error("Metric connection error: " + throwable.getMessage());
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException ie) {
                    log.error("Interrupted waiting for retrying connection");
                }
            }
        }, metricNamesFilter), (Observer)new Observer<WorkerConnectionsStatus>(){

            public void onCompleted() {
                log.info("got onCompleted in WorkerConnStatus obs");
            }

            public void onError(Throwable e) {
                log.info("got onError in WorkerConnStatus obs");
            }

            public void onNext(WorkerConnectionsStatus workerConnectionsStatus) {
                log.info("got WorkerConnStatus {}", (Object)workerConnectionsStatus);
            }
        });
        return metricsClient.getResults().flatMap(metrics -> metrics.map(MantisServerSentEvent::getEventAsString));
    }
}

