/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.sink;

import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import io.mantisrx.common.compression.CompressionUtils;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.network.Endpoint;
import io.mantisrx.common.network.HashFunctions;
import io.mantisrx.common.network.ServerSlotManager;
import io.mantisrx.common.network.WritableEndpoint;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.sink.predicate.Predicate;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivx.mantis.operators.DisableBackPressureOperator;
import io.reactivx.mantis.operators.DropOperator;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import mantis.io.reactivex.netty.protocol.http.server.HttpServerRequest;
import mantis.io.reactivex.netty.protocol.http.server.HttpServerResponse;
import mantis.io.reactivex.netty.protocol.http.server.RequestHandler;
import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

public class ServerSentEventRequestHandler<T>
implements RequestHandler<ByteBuf, ServerSentEvent> {
    protected static final Object BINARY_FORMAT = "binary";
    private static final String TWO_NEWLINES = "\n\n";
    private static final String SSE_DATA_PREFIX = "data: ";
    private static final Logger LOG = LoggerFactory.getLogger(ServerSentEventRequestHandler.class);
    private static final String ENABLE_PINGS_PARAM = "enablePings";
    private static final String SAMPLE_PARAM = "sample";
    private static final String SAMPLE_PARAM_MSEC = "sampleMSec";
    private static final String CLIENT_ID_PARAM = "clientId";
    private static final int PING_INTERVAL = 2000;
    private static final String TEXT_FORMAT = "text";
    private static final String DEFAULT_FORMAT = "text";
    private static final String FORMAT_PARAM = "format";
    private static final byte[] EVENT_PREFIX_BYTES = "event: ".getBytes();
    private static final byte[] NEW_LINE_AS_BYTES = "\n\n".getBytes();
    private static final byte[] ID_PREFIX_AS_BYTES = "id: ".getBytes();
    private static final byte[] DATA_PREFIX_AS_BYTES = "data: ".getBytes();
    private static final String PING = "\ndata: ping\n\n";
    final ServerSlotManager ssm = new ServerSlotManager(HashFunctions.ketama());
    private Observable<T> observableToServe;
    private Func1<T, String> encoder;
    private Func1<Throwable, String> errorEncoder;
    private Predicate<T> predicate;
    private Func2<Map<String, List<String>>, Context, Void> requestPreprocessor;
    private Func2<Map<String, List<String>>, Context, Void> requestPostprocessor;
    private Context context;
    private boolean pingsEnabled = true;
    private int flushIntervalMillis = 250;
    private String format = "text";

    public ServerSentEventRequestHandler(Observable<T> observableToServe, Func1<T, String> encoder, Func1<Throwable, String> errorEncoder, Predicate<T> predicate, Func2<Map<String, List<String>>, Context, Void> requestPreprocessor, Func2<Map<String, List<String>>, Context, Void> requestPostprocessor, Context context, int batchInterval) {
        this.observableToServe = observableToServe;
        this.encoder = encoder;
        this.errorEncoder = errorEncoder;
        this.predicate = predicate;
        this.requestPreprocessor = requestPreprocessor;
        this.requestPostprocessor = requestPostprocessor;
        this.context = context;
        this.flushIntervalMillis = batchInterval;
    }

    @Override
    public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ServerSentEvent> response) {
        InetSocketAddress socketAddress = (InetSocketAddress)response.getChannel().remoteAddress();
        LOG.info("HTTP SSE connection received from " + socketAddress.getAddress() + ":" + socketAddress.getPort() + "  queryParams: " + request.getQueryParameters());
        String socketAddrStr = socketAddress.getAddress().toString();
        final WritableEndpoint sn = new WritableEndpoint(socketAddress.getHostString(), socketAddress.getPort(), Endpoint.uniqueHost(socketAddress.getHostString(), socketAddress.getPort(), null));
        final Map<String, List<String>> queryParameters = request.getQueryParameters();
        final ServerSlotManager.SlotAssignmentManager slotMgr = this.ssm.registerServer(sn, queryParameters);
        AtomicLong lastResponseFlush = new AtomicLong();
        lastResponseFlush.set(-1L);
        final AtomicLong lastResponseSent = new AtomicLong(-1L);
        Observable<Object> requestObservable = this.observableToServe;
        String decoupleSSE = "false";
        if ("true".equals(decoupleSSE)) {
            BasicTag sockAddrTag = new BasicTag("sockAddr", Optional.ofNullable(socketAddrStr).orElse("none"));
            requestObservable = requestObservable.lift(new DropOperator("outgoing_ServerSentEventRequestHandler", new Tag[]{sockAddrTag})).observeOn(Schedulers.io());
        }
        response.getHeaders().set("Access-Control-Allow-Origin", (Object)"*");
        response.getHeaders().set("content-type", (Object)"text/event-stream");
        response.getHeaders().set("Cache-Control", (Object)"no-cache, no-store, max-age=0, must-revalidate");
        response.getHeaders().set("Pragma", (Object)"no-cache");
        response.flush();
        String uniqueClientId = socketAddrStr;
        Tag[] tags = new Tag[2];
        String clientId = Optional.ofNullable(uniqueClientId).orElse("none");
        String sockAddr = Optional.ofNullable(socketAddrStr).orElse("none");
        tags[0] = new BasicTag(CLIENT_ID_PARAM, clientId);
        tags[1] = new BasicTag("sockAddr", sockAddr);
        Metrics sseSinkMetrics = new Metrics.Builder().id("ServerSentEventRequestHandler", tags).addCounter("processedCounter").addCounter("pingCounter").addCounter("errorCounter").addCounter("droppedCounter").addCounter("flushCounter").addCounter("sourceJobNameMismatchRejection").build();
        final Counter msgProcessedCounter = sseSinkMetrics.getCounter("processedCounter");
        final Counter pingCounter = sseSinkMetrics.getCounter("pingCounter");
        final Counter errorCounter = sseSinkMetrics.getCounter("errorCounter");
        final Counter droppedWrites = sseSinkMetrics.getCounter("droppedCounter");
        final Counter flushCounter = sseSinkMetrics.getCounter("flushCounter");
        Counter sourceJobNameMismatchRejectionCounter = sseSinkMetrics.getCounter("sourceJobNameMismatchRejection");
        if (queryParameters != null && queryParameters.containsKey("sourceJobName")) {
            String targetJob = queryParameters.get("sourceJobName").get(0);
            String currentJob = this.context.getWorkerInfo().getJobClusterName();
            if (!currentJob.equalsIgnoreCase(targetJob)) {
                LOG.info("Rejecting connection from {}. Client is targeting job {} but this is job {}.", uniqueClientId, targetJob, currentJob);
                sourceJobNameMismatchRejectionCounter.increment();
                response.setStatus(HttpResponseStatus.BAD_REQUEST);
                response.writeStringAndFlush("data: sourceJobName is " + targetJob + " but this is " + currentJob + "." + TWO_NEWLINES);
                return response.close();
            }
        }
        if (queryParameters != null && queryParameters.containsKey(CLIENT_ID_PARAM)) {
            uniqueClientId = queryParameters.get(CLIENT_ID_PARAM).get(0);
        }
        if (queryParameters != null && queryParameters.containsKey(FORMAT_PARAM)) {
            this.format = queryParameters.get(FORMAT_PARAM).get(0);
        }
        if (queryParameters != null && this.requestPreprocessor != null) {
            this.requestPreprocessor.call(queryParameters, this.context);
        }
        if (queryParameters != null && queryParameters.containsKey(SAMPLE_PARAM_MSEC)) {
            int samplingRate = Integer.parseInt(queryParameters.get(SAMPLE_PARAM_MSEC).get(0));
            requestObservable = requestObservable.sample(samplingRate, TimeUnit.MILLISECONDS);
        }
        if (queryParameters != null && queryParameters.containsKey(SAMPLE_PARAM)) {
            int samplingRate = Integer.parseInt(queryParameters.get(SAMPLE_PARAM).get(0));
            requestObservable = requestObservable.sample(samplingRate, TimeUnit.SECONDS);
        }
        if (queryParameters != null && queryParameters.containsKey(ENABLE_PINGS_PARAM)) {
            String enablePings = queryParameters.get(ENABLE_PINGS_PARAM).get(0);
            this.pingsEnabled = "true".equalsIgnoreCase(enablePings);
        }
        if (queryParameters != null && queryParameters.containsKey("delay")) {
            try {
                int flushInterval = Integer.parseInt(queryParameters.get("delay").get(0));
                if (flushInterval >= 50) {
                    this.flushIntervalMillis = flushInterval;
                } else {
                    LOG.warn("delay parameter too small " + flushInterval + " min. is 100");
                }
            }
            catch (Exception e2) {
                e2.printStackTrace();
            }
        }
        final byte[] delimiter = queryParameters != null && queryParameters.containsKey("mantis.CompressionDelimiter") && queryParameters.get("mantis.CompressionDelimiter").get(0) != null ? queryParameters.get("mantis.CompressionDelimiter").get(0).getBytes() : null;
        Func1 filterFunction = new Func1<T, Boolean>(){

            @Override
            public Boolean call(T t1) {
                return true;
            }
        };
        if (queryParameters != null && this.predicate != null) {
            filterFunction = this.predicate.getPredicate().call(queryParameters);
        }
        final Subscription timerSubscription = Observable.interval(1L, TimeUnit.SECONDS).doOnNext(new Action1<Long>(){

            @Override
            public void call(Long t1) {
                long currentTime = System.currentTimeMillis();
                if (ServerSentEventRequestHandler.this.pingsEnabled && (lastResponseSent.get() == -1L || currentTime > lastResponseSent.get() + 2000L)) {
                    pingCounter.increment();
                    response.writeStringAndFlush(ServerSentEventRequestHandler.PING);
                    lastResponseSent.set(currentTime);
                }
            }
        }).subscribe();
        return requestObservable.filter(filterFunction).map(this.encoder).lift(new DisableBackPressureOperator()).buffer((long)this.flushIntervalMillis, TimeUnit.MILLISECONDS).flatMap(new Func1<List<String>, Observable<Void>>(){

            @Override
            public Observable<Void> call(List<String> valueList) {
                if (response.isCloseIssued() || !response.getChannel().isActive()) {
                    LOG.info("Client closed detected, throwing closed channel exception");
                    return Observable.error(new ClosedChannelException());
                }
                List<String> filteredList = valueList.stream().filter(e2 -> slotMgr.filter(sn, e2.getBytes())).collect(Collectors.toList());
                if (response.getChannel().isWritable()) {
                    flushCounter.increment();
                    if (ServerSentEventRequestHandler.this.format.equals(BINARY_FORMAT)) {
                        boolean useSnappy = true;
                        try {
                            String compressedList = delimiter == null ? CompressionUtils.compressAndBase64Encode(filteredList, useSnappy) : CompressionUtils.compressAndBase64Encode(filteredList, useSnappy, delimiter);
                            StringBuilder sb = new StringBuilder(3);
                            sb.append(ServerSentEventRequestHandler.SSE_DATA_PREFIX);
                            sb.append(compressedList);
                            sb.append(ServerSentEventRequestHandler.TWO_NEWLINES);
                            msgProcessedCounter.increment(valueList.size());
                            lastResponseSent.set(System.currentTimeMillis());
                            return response.writeStringAndFlush(sb.toString());
                        }
                        catch (Exception e3) {
                            LOG.warn("Could not compress data" + e3.getMessage());
                            droppedWrites.increment(valueList.size());
                            return Observable.empty();
                        }
                    }
                    int noOfMsgs = 0;
                    StringBuilder sb = new StringBuilder(valueList.size() * 3);
                    for (String s : filteredList) {
                        sb.append(ServerSentEventRequestHandler.SSE_DATA_PREFIX);
                        sb.append(s);
                        sb.append(ServerSentEventRequestHandler.TWO_NEWLINES);
                        ++noOfMsgs;
                    }
                    msgProcessedCounter.increment(noOfMsgs);
                    lastResponseSent.set(System.currentTimeMillis());
                    return response.writeStringAndFlush(sb.toString());
                }
                droppedWrites.increment(filteredList.size());
                return Observable.empty();
            }
        }).onErrorResumeNext((Func1<Throwable, Observable<Void>>)new Func1<Throwable, Observable<? extends Void>>(){

            @Override
            public Observable<? extends Void> call(Throwable throwable) {
                Throwable cause = throwable.getCause();
                errorCounter.increment();
                if (cause != null && !(cause instanceof ClosedChannelException)) {
                    LOG.warn("Error detected in SSE sink", cause);
                    if (ServerSentEventRequestHandler.this.errorEncoder != null) {
                        ByteBuf errType = response.getAllocator().buffer().writeBytes("error: ".getBytes());
                        ByteBuf errRes = response.getAllocator().buffer().writeBytes(((String)ServerSentEventRequestHandler.this.errorEncoder.call(throwable)).getBytes());
                        response.writeAndFlush(ServerSentEvent.withEventType(errType, errRes));
                    }
                    throwable.printStackTrace();
                }
                if (ServerSentEventRequestHandler.this.requestPostprocessor != null && queryParameters != null) {
                    ServerSentEventRequestHandler.this.requestPostprocessor.call(queryParameters, ServerSentEventRequestHandler.this.context);
                }
                ServerSentEventRequestHandler.this.ssm.deregisterServer(sn, queryParameters);
                timerSubscription.unsubscribe();
                return Observable.error(throwable);
            }
        });
    }
}

