/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.client;

import com.mantisrx.common.utils.NettyUtils;
import io.mantisrx.client.SinkConnection;
import io.mantisrx.client.SinkConnectionFunc;
import io.mantisrx.common.MantisServerSentEvent;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.runtime.parameter.SinkParameters;
import io.mantisrx.server.core.ServiceRegistry;
import io.mantisrx.server.worker.client.SseWorkerConnection;
import io.reactivx.mantis.operators.DropOperator;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action1;

public class SseSinkConnectionFunction
implements SinkConnectionFunc<MantisServerSentEvent> {
    private static final String DEFAULT_BUFFER_SIZE_STR = "0";
    private static final Logger logger = LoggerFactory.getLogger(SseSinkConnectionFunction.class);
    private static final CopyOnWriteArraySet<MetricGroupId> metricsSet = new CopyOnWriteArraySet();
    private static final Action1<Throwable> defaultConxResetHandler = new Action1<Throwable>(){

        public void call(Throwable throwable) {
            logger.warn("Retrying reset connection");
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException ie) {
                logger.debug("Interrupted waiting for retrying connection");
            }
        }
    };
    private final boolean reconnectUponConnectionRest;
    private final Action1<Throwable> connectionResetHandler;
    private final SinkParameters sinkParameters;
    private final int bufferSize;

    public SseSinkConnectionFunction(boolean reconnectUponConnectionRest, Action1<Throwable> connectionResetHandler) {
        this(reconnectUponConnectionRest, connectionResetHandler, null);
    }

    public SseSinkConnectionFunction(boolean reconnectUponConnectionRest, Action1<Throwable> connectionResetHandler, SinkParameters sinkParameters) {
        this.reconnectUponConnectionRest = reconnectUponConnectionRest;
        this.connectionResetHandler = connectionResetHandler == null ? defaultConxResetHandler : connectionResetHandler;
        this.sinkParameters = sinkParameters;
        String bufferSizeStr = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantisClient.buffer.size", DEFAULT_BUFFER_SIZE_STR);
        this.bufferSize = Integer.parseInt(Optional.ofNullable(bufferSizeStr).orElse(DEFAULT_BUFFER_SIZE_STR));
    }

    public SinkConnection<MantisServerSentEvent> call(String hostname, Integer port) {
        return this.call(hostname, port, null, null, 5L);
    }

    @Override
    public SinkConnection<MantisServerSentEvent> call(final String hostname, final Integer port, final Action1<Boolean> updateConxStatus, final Action1<Boolean> updateDataRecvngStatus, final long dataRecvTimeoutSecs, final boolean disablePingFiltering) {
        return new SinkConnection<MantisServerSentEvent>(){
            private final SseWorkerConnection workerConn;
            {
                this.workerConn = new SseWorkerConnection("Sink", hostname, port, updateConxStatus, updateDataRecvngStatus, SseSinkConnectionFunction.this.connectionResetHandler, dataRecvTimeoutSecs, SseSinkConnectionFunction.this.reconnectUponConnectionRest, metricsSet, SseSinkConnectionFunction.this.bufferSize, SseSinkConnectionFunction.this.sinkParameters, disablePingFiltering);
            }

            @Override
            public String getName() {
                return this.workerConn.getName();
            }

            @Override
            public void close() throws Exception {
                this.workerConn.close();
            }

            public Observable<MantisServerSentEvent> call() {
                return this.workerConn.call();
            }
        };
    }

    static {
        NettyUtils.setNettyThreads();
        logger.info("SETTING UP METRICS PRINTER THREAD");
        new ScheduledThreadPoolExecutor(1).scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                try {
                    HashSet metricGroups = new HashSet(metricsSet);
                    if (!metricGroups.isEmpty()) {
                        for (MetricGroupId metricGroup : metricGroups) {
                            Metrics metric = MetricsRegistry.getInstance().getMetric(metricGroup);
                            if (metric == null) continue;
                            Counter onNext = metric.getCounter("" + DropOperator.Counters.onNext);
                            Counter onError = metric.getCounter("" + DropOperator.Counters.onError);
                            Counter onComplete = metric.getCounter("" + DropOperator.Counters.onComplete);
                            Counter dropped = metric.getCounter("" + DropOperator.Counters.dropped);
                            logger.info(metricGroup.id() + ": onNext=" + onNext.value() + ", onError=" + onError.value() + ", onComplete=" + onComplete.value() + ", dropped=" + dropped.value());
                        }
                    }
                }
                catch (Exception e) {
                    logger.warn("Unexpected error in metrics printer thread: " + e.getMessage(), (Throwable)e);
                }
            }
        }, 60L, 60L, TimeUnit.SECONDS);
    }
}

