/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.worker.client;

import com.mantisrx.common.utils.MantisMetricStringConstants;
import io.mantisrx.common.MantisServerSentEvent;
import io.mantisrx.common.compression.CompressionUtils;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.runtime.parameter.SinkParameter;
import io.mantisrx.runtime.parameter.SinkParameters;
import io.netty.buffer.ByteBuf;
import io.reactivx.mantis.operators.DropOperator;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurators;
import mantis.io.reactivex.netty.protocol.http.client.HttpClient;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientBuilder;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientRequest;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientResponse;
import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.PublishSubject;

public class SseWorkerConnection {
    private static final Logger logger = LoggerFactory.getLogger(SseWorkerConnection.class);
    private static final String metricNamePrefix = MantisMetricStringConstants.DROP_OPERATOR_INCOMING_METRIC_GROUP;
    protected final PublishSubject<Boolean> shutdownSubject = PublishSubject.create();
    final AtomicLong lastDataReceived = new AtomicLong(System.currentTimeMillis());
    private final String connectionType;
    private final String hostname;
    private final int port;
    private final MetricGroupId metricGroupId;
    private final Counter pingCounter;
    private final boolean reconnectUponConnectionReset;
    private final Action1<Boolean> updateConxStatus;
    private final Action1<Boolean> updateDataRecvngStatus;
    private final Action1<Throwable> connectionResetHandler;
    private final long dataRecvTimeoutSecs;
    private final CopyOnWriteArraySet<MetricGroupId> metricsSet;
    private final int bufferSize;
    private final SinkParameters sinkParameters;
    private final boolean disablePingFiltering;
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    private final AtomicBoolean isReceivingData = new AtomicBoolean(false);
    HttpClient<ByteBuf, ServerSentEvent> client;
    private boolean compressedBinaryInputEnabled = false;
    private volatile boolean isShutdown = false;
    private final Func1<Observable<? extends Throwable>, Observable<?>> retryLogic = new Func1<Observable<? extends Throwable>, Observable<?>>(){

        public Observable<?> call(Observable<? extends Throwable> attempts) {
            if (!SseWorkerConnection.this.reconnectUponConnectionReset) {
                return Observable.empty();
            }
            return attempts.zipWith(Observable.range((int)1, (int)Integer.MAX_VALUE), (Func2)new Func2<Throwable, Integer, Integer>(){

                public Integer call(Throwable t1, Integer integer) {
                    return integer;
                }
            }).flatMap(new Func1<Integer, Observable<?>>(){

                public Observable<?> call(Integer integer) {
                    if (SseWorkerConnection.this.isShutdown) {
                        logger.info(SseWorkerConnection.this.getName() + ": Is shutdown, stopping retries");
                        return Observable.empty();
                    }
                    long delay = 2 * (integer > 10 ? 10 : integer);
                    logger.info(SseWorkerConnection.this.getName() + ": retrying conx after sleeping for " + delay + " secs");
                    return Observable.timer((long)delay, (TimeUnit)TimeUnit.SECONDS);
                }
            });
        }
    };
    private long lastDataDropValue = 0L;

    public SseWorkerConnection(String connectionType, String hostname, Integer port, Action1<Boolean> updateConxStatus, Action1<Boolean> updateDataRecvngStatus, Action1<Throwable> connectionResetHandler, long dataRecvTimeoutSecs, boolean reconnectUponConnectionReset, CopyOnWriteArraySet<MetricGroupId> metricsSet, int bufferSize, SinkParameters sinkParameters, MetricGroupId metricGroupId) {
        this(connectionType, hostname, port, updateConxStatus, updateDataRecvngStatus, connectionResetHandler, dataRecvTimeoutSecs, reconnectUponConnectionReset, metricsSet, bufferSize, sinkParameters, false, metricGroupId);
    }

    public SseWorkerConnection(String connectionType, String hostname, Integer port, Action1<Boolean> updateConxStatus, Action1<Boolean> updateDataRecvngStatus, Action1<Throwable> connectionResetHandler, long dataRecvTimeoutSecs, boolean reconnectUponConnectionReset, CopyOnWriteArraySet<MetricGroupId> metricsSet, int bufferSize, SinkParameters sinkParameters, boolean disablePingFiltering, MetricGroupId metricGroupId) {
        this.connectionType = connectionType;
        this.hostname = hostname;
        this.port = port;
        this.metricGroupId = metricGroupId;
        MetricGroupId connHealthMetricGroup = new MetricGroupId("ConnectionHealth");
        Metrics m = new Metrics.Builder().id(connHealthMetricGroup).addCounter("pingCount").build();
        this.pingCounter = m.getCounter("pingCount");
        this.updateConxStatus = updateConxStatus;
        this.updateDataRecvngStatus = updateDataRecvngStatus;
        this.connectionResetHandler = connectionResetHandler;
        this.dataRecvTimeoutSecs = dataRecvTimeoutSecs;
        this.reconnectUponConnectionReset = reconnectUponConnectionReset;
        this.metricsSet = metricsSet;
        this.bufferSize = bufferSize;
        this.sinkParameters = sinkParameters;
        if (this.sinkParameters != null) {
            this.compressedBinaryInputEnabled = this.isCompressedBinaryInputEnabled(this.sinkParameters.getSinkParams());
        }
        this.disablePingFiltering = disablePingFiltering;
    }

    private boolean isCompressedBinaryInputEnabled(List<SinkParameter> sinkParams) {
        for (SinkParameter sinkParam : sinkParams) {
            if (!"mantis.EnableCompressedBinary".equals(sinkParam.getName()) || !"true".equalsIgnoreCase(sinkParam.getValue())) continue;
            return true;
        }
        return false;
    }

    public String getName() {
        return "Sse" + this.connectionType + "Connection: " + this.hostname + ":" + this.port;
    }

    public synchronized void close() throws Exception {
        if (this.isShutdown) {
            return;
        }
        logger.info("Closing sse connection to " + this.hostname + ":" + this.port);
        this.shutdownSubject.onNext((Object)true);
        this.shutdownSubject.onCompleted();
        this.isShutdown = true;
        this.resetConnected();
    }

    public synchronized Observable<MantisServerSentEvent> call() {
        String delimiter;
        if (this.isShutdown) {
            return Observable.empty();
        }
        this.client = (HttpClient)((HttpClientBuilder)((HttpClientBuilder)RxNetty.newHttpClientBuilder((String)this.hostname, (int)this.port).pipelineConfigurator(PipelineConfigurators.clientSseConfigurator())).withNoConnectionPooling()).build();
        StringBuilder sp = new StringBuilder();
        String string = delimiter = this.sinkParameters == null ? null : (String)this.sinkParameters.getSinkParams().stream().filter(s -> s.getName().equalsIgnoreCase("mantis.CompressionDelimiter")).findFirst().map(SinkParameter::getValue).orElse(null);
        if (this.sinkParameters != null) {
            sp.append(this.sinkParameters.toString());
        }
        sp.append(sp.length() == 0 ? this.getDefaultSinkParams("?") : this.getDefaultSinkParams("&"));
        String uri = "/" + sp.toString();
        logger.info(this.getName() + ": Using uri: " + uri);
        return this.client.submit(HttpClientRequest.createGet((String)uri)).takeUntil(this.shutdownSubject).takeWhile(serverSentEventHttpClientResponse -> !this.isShutdown).filter(response -> {
            if (!response.getStatus().reasonPhrase().equals("OK")) {
                logger.warn(this.getName() + ":Trying to continue after unexpected response from sink: " + response.getStatus().reasonPhrase());
            }
            return response.getStatus().reasonPhrase().equals("OK");
        }).flatMap(response -> {
            if (!this.isConnected.getAndSet(true) && this.updateConxStatus != null) {
                this.updateConxStatus.call((Object)true);
            }
            return this.streamContent((HttpClientResponse<ServerSentEvent>)response, this.updateDataRecvngStatus, this.dataRecvTimeoutSecs, delimiter);
        }).doOnError(throwable -> {
            this.resetConnected();
            logger.warn(this.getName() + "Error on getting response from SSE server: " + throwable.getMessage());
            this.connectionResetHandler.call(throwable);
        }).retryWhen(this.retryLogic).doOnCompleted(this::resetConnected);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void resetConnected() {
        if (this.isConnected.getAndSet(false) && this.updateConxStatus != null) {
            this.updateConxStatus.call((Object)false);
        }
        if (this.isReceivingData.compareAndSet(true, false) && this.updateDataRecvngStatus != null) {
            Action1<Boolean> action1 = this.updateDataRecvngStatus;
            synchronized (action1) {
                this.updateDataRecvngStatus.call((Object)false);
            }
        }
    }

    protected Observable<MantisServerSentEvent> streamContent(HttpClientResponse<ServerSentEvent> response, Action1<Boolean> updateDataRecvngStatus, long dataRecvTimeoutSecs, String delimiter) {
        long interval = Math.max(1L, dataRecvTimeoutSecs / 2L);
        if (updateDataRecvngStatus != null) {
            Observable.interval((long)interval, (long)interval, (TimeUnit)TimeUnit.SECONDS).doOnNext(aLong -> {
                if (!this.isShutdown) {
                    if (this.hasDataDrop() || System.currentTimeMillis() > this.lastDataReceived.get() + dataRecvTimeoutSecs * 1000L) {
                        if (this.isReceivingData.compareAndSet(true, false)) {
                            Action1 action1 = updateDataRecvngStatus;
                            synchronized (action1) {
                                updateDataRecvngStatus.call((Object)false);
                            }
                        }
                    } else if (this.isConnected.get() && this.isReceivingData.compareAndSet(false, true)) {
                        Action1 action1 = updateDataRecvngStatus;
                        synchronized (action1) {
                            updateDataRecvngStatus.call((Object)true);
                        }
                    }
                }
            }).takeUntil(this.shutdownSubject).takeWhile(o -> !this.isShutdown).doOnCompleted(() -> {
                if (this.isReceivingData.compareAndSet(true, false)) {
                    Action1 action1 = updateDataRecvngStatus;
                    synchronized (action1) {
                        updateDataRecvngStatus.call((Object)false);
                    }
                }
            }).subscribe();
        }
        return response.getContent().lift((Observable.Operator)new DropOperator(this.metricGroupId)).flatMap(t1 -> {
            this.lastDataReceived.set(System.currentTimeMillis());
            if (this.isConnected.get() && this.isReceivingData.compareAndSet(false, true) && updateDataRecvngStatus != null) {
                Action1 action1 = updateDataRecvngStatus;
                synchronized (action1) {
                    updateDataRecvngStatus.call((Object)true);
                }
            }
            if (t1.hasEventType() && t1.getEventTypeAsString().startsWith("error:")) {
                return Observable.error((Throwable)new SseException(ErrorType.Retryable, "Got error SSE event: " + t1.contentAsString()));
            }
            return Observable.just((Object)t1.contentAsString());
        }, 1).filter(data -> {
            if (data.startsWith("ping")) {
                this.pingCounter.increment();
                return this.disablePingFiltering;
            }
            return true;
        }).flatMapIterable(data -> {
            boolean useSnappy = true;
            return CompressionUtils.decompressAndBase64Decode((String)data, (boolean)this.compressedBinaryInputEnabled, (boolean)useSnappy, (String)delimiter);
        }, 1).takeUntil(this.shutdownSubject).takeWhile(event -> !this.isShutdown);
    }

    private boolean hasDataDrop() {
        Collection metrics = MetricsRegistry.getInstance().getMetrics(metricNamePrefix);
        long totalDataDrop = 0L;
        if (metrics != null && !metrics.isEmpty()) {
            for (Metrics m : metrics) {
                Counter dropped = m.getCounter("" + DropOperator.Counters.dropped);
                Counter onNext = m.getCounter("" + DropOperator.Counters.onNext);
                if (dropped == null) continue;
                totalDataDrop += dropped.value();
            }
        }
        if (totalDataDrop > this.lastDataDropValue) {
            this.lastDataDropValue = totalDataDrop;
            return true;
        }
        return false;
    }

    private String getDefaultSinkParams(String prefix) {
        String groupId = System.getenv("JOB_ID");
        String slotId = System.getenv("WORKER_INDEX");
        String id = System.getenv("WORKER_NUMBER");
        if (!(groupId == null || groupId.isEmpty() || slotId == null || slotId.isEmpty() || id == null || id.isEmpty())) {
            return prefix + "groupId=" + groupId + "&slotId=" + slotId + "&id=" + id;
        }
        return "";
    }

    private static class SseException
    extends RuntimeException {
        private final ErrorType type;

        private SseException(ErrorType type, String message) {
            super((Object)((Object)type) + ": " + message);
            this.type = type;
        }

        private SseException(ErrorType type, String message, Throwable cause) {
            super((Object)((Object)type) + ": " + message, cause);
            this.type = type;
        }
    }

    private static enum ErrorType {
        Retryable,
        Unknown;

    }
}

