/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.mantis.network.push;

import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import io.mantisrx.common.compression.CompressionUtils;
import io.mantisrx.common.messages.MantisMetaDroppedMessage;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.shaded.io.netty.channel.Channel;
import io.mantisrx.shaded.io.netty.util.concurrent.GenericFutureListener;
import io.reactivex.mantis.network.push.AsyncConnection;
import io.reactivex.mantis.network.push.ConnectionManager;
import io.reactivex.mantis.network.push.MonitoredQueue;
import io.reactivex.mantis.network.push.MonitoredThreadPool;
import io.reactivex.mantis.network.push.NamedThreadFactory;
import io.reactivex.mantis.network.push.PushTrigger;
import io.reactivex.mantis.network.push.ServerConfig;
import io.reactivex.mantis.network.push.SingleThreadedChunker;
import io.reactivex.mantis.network.push.TimedChunker;
import io.reactivx.mantis.operators.DisableBackPressureOperator;
import io.reactivx.mantis.operators.DropOperator;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import mantis.io.reactivex.netty.channel.DefaultChannelWriter;
import mantis.io.reactivex.netty.server.RxServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;

public abstract class PushServer<T, R> {
    private static final Logger logger = LoggerFactory.getLogger(PushServer.class);

    /**
     * Bytes written in front of each server-sent-event payload. Encoded with an explicit charset
     * so the wire format does not depend on the JVM's platform default.
     */
    final byte[] prefix = "data: ".getBytes(StandardCharsets.UTF_8);
    /** Bytes written after each server-sent-event payload (a blank line). */
    final byte[] nwnw = "\n\n".getBytes(StandardCharsets.UTF_8);

    /** Port taken from the server configuration. */
    protected int port;
    /** Queue handed to the push trigger and to the chunker tasks created in the constructor. */
    protected MonitoredQueue<T> outboundBuffer;
    /** Tracks the connections registered through {@code manageConnection*}. */
    protected ConnectionManager<T> connectionManager;

    /** Number of times a failed channel write is retried before it is counted as failed. */
    private int writeRetryCount;
    /** Server instance produced by {@link #createServer()}; {@code null} until the server is started. */
    private RxServer<?, ?> server;

    // Write accounting, all expressed in number of messages.
    private Counter processedWrites;
    private Counter successfulWrites;
    private Counter failedWrites;
    /** Size of the most recently attempted batch write. */
    private Gauge batchWriteSize;

    /** Handles of the submitted chunker tasks, cancelled on {@link #shutdown()}. */
    private final Set<Future<Void>> consumerThreadFutures = new HashSet<>();
    /** Signal stream that is logged by {@link #start()} and awaited by the blocking methods. */
    private Observable<String> serverSignals;
    /** Configured server name; used in log messages and as the metrics group tag. */
    private String serverName;

    /**
     * Wires up the outbound queue, connection manager, queue-consumer pool and metrics.
     *
     * <p>The outbound buffer is created from the configuration and handed to {@code trigger}.
     * The trigger is started on the first connection and stopped when the connection count
     * reaches zero, via the callbacks given to the {@link ConnectionManager}. With an SPSC queue
     * a single {@link SingleThreadedChunker} is submitted; otherwise one {@link TimedChunker} is
     * submitted per configured queue consumer.
     *
     * @param trigger       source whose buffer is set to this server's outbound queue
     * @param config        server configuration (name, port, buffer and chunking settings, metrics registry)
     * @param serverSignals signal stream observed by {@link #start()} and the blocking methods
     */
    public PushServer(final PushTrigger<T> trigger, ServerConfig<T> config, Observable<String> serverSignals) {
        this.serverSignals = serverSignals;
        this.serverName = config.getName();
        final MetricsRegistry registry = config.getMetricsRegistry();

        // One queue shared between the trigger and the chunker tasks submitted below.
        this.outboundBuffer = new MonitoredQueue<>(this.serverName, config.getBufferCapacity(), config.useSpscQueue());
        trigger.setBuffer(this.outboundBuffer);

        final Action0 onFirstConnection = () -> trigger.start();
        final Action0 onZeroConnections = () -> {
            logger.info("doOnZeroConnections Triggered");
            trigger.stop();
        };

        // Metrics are grouped per server name; unnamed servers share the "none" group.
        final String groupIdValue = this.serverName != null ? this.serverName : "none";
        final BasicTag groupIdTag = new BasicTag("groupId", groupIdValue);
        final MetricGroupId metricsGroup = new MetricGroupId("PushServer", new Tag[]{groupIdTag});
        this.connectionManager = new ConnectionManager<>(registry, onFirstConnection, onZeroConnections);

        // Fixed-size pool: one thread and one queue slot per configured consumer.
        final int consumerCount = config.getNumQueueConsumers();
        final ThreadPoolExecutor consumerExecutor = new ThreadPoolExecutor(
                consumerCount,
                consumerCount,
                5L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(consumerCount),
                new NamedThreadFactory("QueueConsumerPool"));
        final MonitoredThreadPool consumerPool = new MonitoredThreadPool("QueueConsumerPool", consumerExecutor);

        if (!config.useSpscQueue()) {
            for (int remaining = consumerCount; remaining > 0; --remaining) {
                this.consumerThreadFutures.add(consumerPool.submit(new TimedChunker<T>(this.outboundBuffer, config.getMaxChunkSize(), config.getMaxChunkTimeMSec(), config.getChunkProcessor(), this.connectionManager)));
            }
        } else {
            this.consumerThreadFutures.add(consumerPool.submit(new SingleThreadedChunker<T>(config.getChunkProcessor(), this.outboundBuffer, config.getMaxChunkSize(), config.getMaxChunkTimeMSec(), this.connectionManager)));
        }

        final Metrics pushServerMetrics = new Metrics.Builder()
                .id(metricsGroup)
                .addCounter("numProcessedWrites")
                .addCounter("numSuccessfulWrites")
                .addCounter("numFailedWrites")
                .addGauge(this.connectionManager.getActiveConnections(metricsGroup))
                .addGauge("batchWriteSize")
                .build();
        this.successfulWrites = pushServerMetrics.getCounter("numSuccessfulWrites");
        this.failedWrites = pushServerMetrics.getCounter("numFailedWrites");
        this.batchWriteSize = pushServerMetrics.getGauge("batchWriteSize");
        this.processedWrites = pushServerMetrics.getCounter("numProcessedWrites");

        this.registerMetrics(
                registry,
                pushServerMetrics,
                consumerPool.getMetrics(),
                this.outboundBuffer.getMetrics(),
                trigger.getMetrics(),
                config.getChunkProcessor().router.getMetrics());

        this.port = config.getPort();
        this.writeRetryCount = config.getWriteRetryCount();
    }

    /**
     * Registers every metrics group owned by this server with the given registry, in the order
     * server, consumer pool, queue, push trigger, router.
     */
    private void registerMetrics(MetricsRegistry registry, Metrics serverMetrics, Metrics consumerPoolMetrics, Metrics queueMetrics, Metrics pushTriggerMetrics, Metrics routerMetrics) {
        final Metrics[] inRegistrationOrder = {
            serverMetrics, consumerPoolMetrics, queueMetrics, pushTriggerMetrics, routerMetrics
        };
        for (Metrics metrics : inRegistrationOrder) {
            registry.registerAndGet(metrics);
        }
    }

    /**
     * Convenience overload for connections that have no meta-message channel: delegates to the
     * 18-argument {@code manageConnection} with a {@code null} meta-message subject and a
     * {@code null} meta-message subscription. All other arguments are passed through unchanged.
     *
     * @return the observable produced by the delegate
     */
    protected Observable<Void> manageConnection(DefaultChannelWriter<R> writer2, String host, int port, String groupId, String slotId, String id, AtomicLong lastWriteTime, boolean applicationHeartbeats, Subscription heartbeatSubscription, boolean applySampling, long samplingRateMSec, Func1<T, Boolean> predicate, Action0 connectionClosedCallback, Counter legacyMsgProcessedCounter, Counter legacyDroppedWrites, Action0 connectionSubscribeCallback) {
        final SerializedSubject<String, String> noMetaMsgSubject = null;
        final Subscription noMetaMsgSubscription = null;
        return manageConnection(
                writer2, host, port, groupId, slotId, id,
                lastWriteTime, applicationHeartbeats, heartbeatSubscription,
                applySampling, samplingRateMSec,
                noMetaMsgSubject, noMetaMsgSubscription,
                predicate, connectionClosedCallback,
                legacyMsgProcessedCounter, legacyDroppedWrites, connectionSubscribeCallback);
    }

    /**
     * Manages an uncompressed, non-SSE connection by delegating to
     * {@link #manageConnectionWithCompression} with compression and SSE framing disabled and no
     * delimiter.
     *
     * <p>The meta-message subject and subscription supplied by the caller are forwarded to the
     * delegate, so dropped-write notifications are published to {@code metaMsgSubject} and
     * {@code metaMsgSubscription} is unsubscribed when the connection closes. Both may be
     * {@code null}.
     *
     * @param metaMsgSubject      receives a dropped-message notification for each batch that could not be written; may be {@code null}
     * @param metaMsgSubscription unsubscribed when the channel closes; may be {@code null}
     * @return the observable produced by the delegate
     */
    protected Observable<Void> manageConnection(DefaultChannelWriter<R> writer2, String host, int port, String groupId, String slotId, String id, AtomicLong lastWriteTime, boolean applicationHeartbeats, Subscription heartbeatSubscription, boolean applySampling, long samplingRateMSec, SerializedSubject<String, String> metaMsgSubject, Subscription metaMsgSubscription, Func1<T, Boolean> predicate, Action0 connectionClosedCallback, Counter legacyMsgProcessedCounter, Counter legacyDroppedWrites, Action0 connectionSubscribeCallback) {
        // Forward the caller's meta-message arguments instead of discarding them.
        return this.manageConnectionWithCompression(writer2, host, port, groupId, slotId, id, lastWriteTime, applicationHeartbeats, heartbeatSubscription, applySampling, samplingRateMSec, metaMsgSubject, metaMsgSubscription, predicate, connectionClosedCallback, legacyMsgProcessedCounter, legacyDroppedWrites, connectionSubscribeCallback, false, false, null);
    }

    /**
     * Registers a client connection and returns the stream that writes batched data to it.
     *
     * <p>Blank {@code id}, {@code slotId} and {@code groupId} values are defaulted: the id to
     * {@code host_port_currentTimeMillis}, and the slot and group ids to the (possibly generated)
     * id. An {@link AsyncConnection} is created around a serialized publish subject. It is added
     * to the connection manager when the returned observable is subscribed, and removed (followed
     * by {@link #connectionCleanup}) when the channel's close future completes.
     *
     * <p>Data pushed to the connection is collected in 200 ms windows. For every non-empty window
     * the processed-writes counter is incremented by the number of messages in it. If the channel
     * is active and writable, the messages are assembled into one byte array and written with
     * {@code writeBytesAndFlush}, retried up to the configured retry count. Otherwise the window
     * is reported through {@link #failedToWriteBatch} and nothing is written.
     *
     * @param writer2                     channel writer used for all writes; its channel gates each write
     * @param host                        remote host, used for the default id and passed to the connection
     * @param port                        remote port, used for the default id and passed to the connection
     * @param groupId                     connection group id; defaults to {@code id} when null or empty
     * @param slotId                      connection slot id; defaults to {@code id} when null or empty
     * @param id                          connection id; generated when null or empty
     * @param lastWriteTime               set to the current time after a completed write when
     *                                    {@code applicationHeartbeats} is true; may be {@code null}
     * @param applicationHeartbeats       whether {@code lastWriteTime} should be maintained
     * @param heartbeatSubscription       unsubscribed when the channel closes; may be {@code null}
     * @param applySampling               when true, the stream is sampled every {@code samplingRateMSec}
     *                                    milliseconds and only the last message of each sampled list is kept
     * @param samplingRateMSec            sampling period in milliseconds
     * @param metaMsgSubject              receives a dropped-message notification for failed batches; may be {@code null}
     * @param metaMsgSubscription         unsubscribed when the channel closes; may be {@code null}
     * @param predicate                   passed to the {@link AsyncConnection}; not evaluated here
     * @param connectionClosedCallback    invoked when the channel closes; may be {@code null}
     * @param legacyMsgProcessedCounter   incremented by the batch size after a completed write; may be {@code null}
     * @param legacyDroppedWrites         incremented by the batch size for failed batches; may be {@code null}
     * @param connectionSubscribeCallback invoked on subscription, after the connection is registered; may be {@code null}
     * @param compressOutput              only consulted when {@code isSSE} is true; selects the compressed SSE frame
     * @param isSSE                       whether payloads are framed with the SSE prefix and terminator
     * @param delimiter                   passed to the compression utility when non-null
     * @return an observable that performs the writes; it emits whatever the channel writer emits
     */
    protected Observable<Void> manageConnectionWithCompression(DefaultChannelWriter<R> writer2, String host, int port, String groupId, String slotId, String id, AtomicLong lastWriteTime, boolean applicationHeartbeats, final Subscription heartbeatSubscription, boolean applySampling, long samplingRateMSec, SerializedSubject<String, String> metaMsgSubject, final Subscription metaMsgSubscription, Func1<T, Boolean> predicate, final Action0 connectionClosedCallback, Counter legacyMsgProcessedCounter, Counter legacyDroppedWrites, Action0 connectionSubscribeCallback, boolean compressOutput, boolean isSSE, byte[] delimiter) {
        if (id == null || id.isEmpty()) {
            id = host + "_" + port + "_" + System.currentTimeMillis();
        }
        if (slotId == null || slotId.isEmpty()) {
            slotId = id;
        }
        if (groupId == null || groupId.isEmpty()) {
            groupId = id;
        }

        BasicTag slotIdTag = new BasicTag("slotId", slotId);
        SerializedSubject<List<byte[]>, List<byte[]>> subject =
                new SerializedSubject<List<byte[]>, List<byte[]>>(PublishSubject.<List<byte[]>>create());
        Observable<List<byte[]>> observable =
                subject.lift(new DropOperator<List<byte[]>>("batch_writes", new Tag[]{slotIdTag}));
        if (applySampling) {
            observable = observable.sample(samplingRateMSec, TimeUnit.MILLISECONDS).map(list -> {
                // Keep only the most recent message of the sampled list.
                List<byte[]> singleItem = new LinkedList<byte[]>();
                if (!list.isEmpty()) {
                    singleItem.add(list.get(list.size() - 1));
                }
                return singleItem;
            });
        }

        final AsyncConnection<T> connection = new AsyncConnection<T>(host, port, id, slotId, groupId, subject, predicate);
        final Channel channel = writer2.getChannel();
        channel.closeFuture().addListener(new GenericFutureListener<io.mantisrx.shaded.io.netty.util.concurrent.Future<Void>>() {

            @Override
            public void operationComplete(io.mantisrx.shaded.io.netty.util.concurrent.Future<Void> future) throws Exception {
                PushServer.this.connectionManager.remove(connection);
                PushServer.this.connectionCleanup(heartbeatSubscription, connectionClosedCallback, metaMsgSubscription);
            }
        });

        return observable
                .doOnSubscribe(() -> {
                    this.connectionManager.add(connection);
                    if (connectionSubscribeCallback != null) {
                        connectionSubscribeCallback.call();
                    }
                })
                .lift(new DisableBackPressureOperator<List<byte[]>>())
                .buffer(200L, TimeUnit.MILLISECONDS)
                .flatMap((List<List<byte[]>> bufferOfBuffers) -> {
                    if (bufferOfBuffers == null || bufferOfBuffers.isEmpty()) {
                        return Observable.<Void>empty();
                    }
                    final int batchSize = countMessages(bufferOfBuffers);
                    this.processedWrites.increment(batchSize);
                    if (!channel.isActive() || !channel.isWritable()) {
                        // Channel cannot take data right now: account for the window as dropped.
                        this.failedToWriteBatch(connection, batchSize, legacyDroppedWrites, metaMsgSubject);
                        return Observable.<Void>empty();
                    }
                    byte[] payload = this.buildPayload(bufferOfBuffers, isSSE, compressOutput, delimiter);
                    return writer2.writeBytesAndFlush(payload)
                            .retry(this.writeRetryCount)
                            .doOnError(t1 -> this.failedToWriteBatch(connection, batchSize, legacyDroppedWrites, metaMsgSubject))
                            .doOnCompleted(() -> {
                                if (applicationHeartbeats && lastWriteTime != null) {
                                    lastWriteTime.set(System.currentTimeMillis());
                                }
                                if (legacyMsgProcessedCounter != null) {
                                    legacyMsgProcessedCounter.increment(batchSize);
                                }
                                this.successfulWrites.increment(batchSize);
                                this.connectionManager.successfulWrites(connection, batchSize);
                            })
                            .doOnTerminate(() -> this.batchWriteSize.set(batchSize));
                });
    }

    /** Returns the total number of messages across all lists of a buffered window. */
    private static int countMessages(List<List<byte[]>> batches) {
        int count = 0;
        for (List<byte[]> batch : batches) {
            count += batch.size();
        }
        return count;
    }

    /** Assembles the bytes for one window: raw concatenation, per-message SSE frames, or one compressed SSE frame. */
    private byte[] buildPayload(List<List<byte[]>> batches, boolean isSSE, boolean compressOutput, byte[] delimiter) {
        if (!isSSE) {
            return this.concatenate(batches, false);
        }
        if (!compressOutput) {
            return this.concatenate(batches, true);
        }
        boolean useSnappy = true;
        byte[] compressedData = delimiter == null
                ? CompressionUtils.compressAndBase64EncodeBytes(batches, useSnappy)
                : CompressionUtils.compressAndBase64EncodeBytes(batches, useSnappy, delimiter);
        ByteBuffer frame = ByteBuffer.allocate(this.prefix.length + compressedData.length + this.nwnw.length);
        frame.put(this.prefix);
        frame.put(compressedData);
        frame.put(this.nwnw);
        return frame.array();
    }

    /** Concatenates all messages in order, wrapping each one in the SSE prefix and terminator when {@code sseFramed}. */
    private byte[] concatenate(List<List<byte[]>> batches, boolean sseFramed) {
        int perMessageOverhead = sseFramed ? this.prefix.length + this.nwnw.length : 0;
        int totalBytes = 0;
        for (List<byte[]> batch : batches) {
            for (byte[] message : batch) {
                totalBytes += message.length + perMessageOverhead;
            }
        }
        ByteBuffer block = ByteBuffer.wrap(new byte[totalBytes]);
        for (List<byte[]> batch : batches) {
            for (byte[] message : batch) {
                if (sseFramed) {
                    block.put(this.prefix);
                }
                block.put(message);
                if (sseFramed) {
                    block.put(this.nwnw);
                }
            }
        }
        return block.array();
    }

    /**
     * Records a batch that could not be written to a connection.
     *
     * <p>In order: increments the legacy dropped-writes counter (if given), publishes the string
     * form of a {@link MantisMetaDroppedMessage} to the meta-message subject (if given), then
     * updates this server's failed-writes counter and the connection manager.
     *
     * @param connection          connection the batch was destined for
     * @param batchSize           number of messages in the batch
     * @param legacyDroppedWrites optional counter to increment by {@code batchSize}; may be {@code null}
     * @param metaMsgSubject      optional subject to notify; may be {@code null}
     */
    protected void failedToWriteBatch(AsyncConnection<T> connection, int batchSize, Counter legacyDroppedWrites, SerializedSubject<String, String> metaMsgSubject) {
        if (null != legacyDroppedWrites) {
            legacyDroppedWrites.increment(batchSize);
        }
        if (null != metaMsgSubject) {
            final long droppedAtMillis = System.currentTimeMillis();
            final String notification = new MantisMetaDroppedMessage(batchSize, droppedAtMillis).toString();
            metaMsgSubject.onNext(notification);
        }
        failedWrites.increment(batchSize);
        connectionManager.failedWrites(connection, batchSize);
    }

    /**
     * Releases the per-connection resources once a connection has closed. Each argument is
     * optional; {@code null} values are skipped.
     *
     * @param heartbeatSubscription    heartbeat subscription to unsubscribe
     * @param connectionClosedCallback callback to invoke last
     * @param metaMsgSubscription      meta-message subscription to unsubscribe
     */
    protected void connectionCleanup(Subscription heartbeatSubscription, Action0 connectionClosedCallback, Subscription metaMsgSubscription) {
        final boolean hasHeartbeat = heartbeatSubscription != null;
        if (hasHeartbeat) {
            logger.info("Unsubscribing from heartbeats");
            heartbeatSubscription.unsubscribe();
        }

        final boolean hasMetaMsg = metaMsgSubscription != null;
        if (hasMetaMsg) {
            logger.info("Unsubscribing from metaMsg subject");
            metaMsgSubscription.unsubscribe();
        }

        // The close callback runs after both subscriptions have been released.
        if (connectionClosedCallback == null) {
            return;
        }
        connectionClosedCallback.call();
    }

    /**
     * Creates the underlying server for this push server.
     *
     * <p>Called by {@link #start()} and {@link #startAndBlock()}, which store the returned
     * instance and immediately call {@code start()} on it.
     *
     * @return the server to start; it is dereferenced without a null check by the callers
     */
    public abstract RxServer<?, ?> createServer();

    /**
     * Creates and starts the server, then subscribes to the server signals to log each signal,
     * a terminal error, and completion. Returns without waiting for the signal stream.
     */
    public void start() {
        server = createServer();
        server.start();
        serverSignals.subscribe(
                message -> logger.info("Signal received for server: " + this.serverName + " signal: " + message),
                t -> logger.info("Signal received for server: " + this.serverName + " signal: SERVER_ERROR", (Throwable) t),
                () -> logger.info("Signal received for server: " + this.serverName + " signal: SERVER_COMPLETED"));
    }

    /**
     * Consumes the server signals through a blocking iteration, discarding every signal.
     * The call returns only once that iteration ends.
     */
    public void blockUntilShutdown() {
        serverSignals
                .toBlocking()
                .forEach(ignoredSignal -> {
                    // Signals are intentionally ignored; only the blocking matters here.
                });
    }

    /**
     * Creates and starts the server, then consumes the server signals through a blocking
     * iteration, logging each one. A throwable raised by that iteration is logged as
     * {@code SERVER_ERROR} and rethrown; otherwise {@code SERVER_COMPLETED} is logged once the
     * iteration ends.
     */
    public void startAndBlock() {
        server = createServer();
        server.start();
        try {
            serverSignals
                    .toBlocking()
                    .forEach(signal -> logger.info("Signal received for server: " + this.serverName + " signal: " + signal));
        } catch (Throwable failure) {
            logger.info("Signal received for server: " + serverName + " signal: SERVER_ERROR", failure);
            throw failure;
        }
        logger.info("Signal received for server: " + serverName + " signal: SERVER_COMPLETED");
    }

    /**
     * Stops the queue-consumer tasks and shuts the server down.
     *
     * <p>Every submitted chunker task is cancelled with interruption. If the server was never
     * started there is nothing further to do. If the calling thread is interrupted while the
     * server shuts down, the interrupt flag is restored before the exception is rethrown
     * unchecked, so callers further up the stack can still observe the interruption.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} raised by the server shutdown
     */
    public void shutdown() {
        for (Future<Void> consumerTask : this.consumerThreadFutures) {
            consumerTask.cancel(true);
        }
        if (this.server == null) {
            // start()/startAndBlock() was never called, so no server exists to shut down.
            return;
        }
        try {
            this.server.shutdown();
        }
        catch (InterruptedException e2) {
            // Preserve the interrupt status that catching the exception cleared.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e2);
        }
    }
}

