/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.zmq.tcp;

import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.core.support.UUIDUtils;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.config.ServerSocketOptions;
import reactor.io.net.config.SslOptions;
import reactor.io.net.impl.zmq.ZeroMQChannelStream;
import reactor.io.net.impl.zmq.ZeroMQServerSocketOptions;
import reactor.io.net.impl.zmq.ZeroMQWorker;
import reactor.io.net.impl.zmq.tcp.ZeroMQ;
import reactor.io.net.tcp.TcpServer;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Stream;
import reactor.rx.action.support.DefaultSubscriber;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.broadcast.SerializedBroadcaster;
import reactor.rx.stream.GroupedStream;

public class ZeroMQTcpServer<IN, OUT>
extends TcpServer<IN, OUT> {
    private static final Logger log = LoggerFactory.getLogger(ZeroMQTcpServer.class);
    private final int ioThreadCount = this.getDefaultEnvironment().getIntProperty("reactor.zmq.ioThreadCount", 1);
    private final ZeroMQServerSocketOptions zmqOpts;
    private final ExecutorService threadPool;
    private volatile ZeroMQWorker worker;
    private volatile Future<?> workerFuture;

    public ZeroMQTcpServer(Environment env, Dispatcher eventsDispatcher, InetSocketAddress listenAddress, ServerSocketOptions options, SslOptions sslOptions, Codec<Buffer, IN, OUT> codec) {
        super(env, eventsDispatcher, listenAddress, options, sslOptions, codec);
        this.zmqOpts = options instanceof ZeroMQServerSocketOptions ? (ZeroMQServerSocketOptions)options : null;
        this.threadPool = Executors.newCachedThreadPool((ThreadFactory)new NamedDaemonThreadFactory("zmq-server"));
    }

    @Override
    protected Promise<Void> doStart(final ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> handler) {
        Assert.isNull((Object)this.worker, (String)"This ZeroMQ server has already been started");
        final Promise promise = Promises.ready((Environment)this.getDefaultEnvironment(), (Dispatcher)this.getDefaultDispatcher());
        final UUID id = UUIDUtils.random();
        final int socketType = null != this.zmqOpts ? this.zmqOpts.socketType() : 6;
        ZContext zmq = null != this.zmqOpts ? this.zmqOpts.context() : null;
        Broadcaster broadcaster = SerializedBroadcaster.create((Environment)this.getDefaultEnvironment());
        final Stream grouped = broadcaster.groupBy((Function)new Function<ZMsg, String>(){

            public String apply(ZMsg msg) {
                String connId;
                switch (socketType) {
                    case 6: {
                        connId = msg.popString();
                        break;
                    }
                    default: {
                        connId = id.toString();
                    }
                }
                return connId;
            }
        });
        this.worker = new ZeroMQWorker(id, socketType, this.ioThreadCount, zmq, broadcaster){

            @Override
            protected void configure(ZMQ.Socket socket) {
                socket.setReceiveBufferSize((long)ZeroMQTcpServer.this.getOptions().rcvbuf());
                socket.setSendBufferSize((long)ZeroMQTcpServer.this.getOptions().sndbuf());
                socket.setBacklog((long)ZeroMQTcpServer.this.getOptions().backlog());
                if (ZeroMQTcpServer.this.getOptions().keepAlive()) {
                    socket.setTCPKeepAlive(1);
                }
                if (null != ZeroMQTcpServer.this.zmqOpts && null != ZeroMQTcpServer.this.zmqOpts.socketConfigurer()) {
                    ZeroMQTcpServer.this.zmqOpts.socketConfigurer().accept((Object)socket);
                }
            }

            @Override
            protected void start(final ZMQ.Socket socket) {
                try {
                    String addr = null != ZeroMQTcpServer.this.zmqOpts && null != ZeroMQTcpServer.this.zmqOpts.listenAddresses() ? ZeroMQTcpServer.this.zmqOpts.listenAddresses() : "tcp://" + ZeroMQTcpServer.this.getListenAddress().getHostString() + ":" + ZeroMQTcpServer.this.getListenAddress().getPort();
                    if (log.isInfoEnabled()) {
                        String type = ZeroMQ.findSocketTypeName(socket.getType());
                        log.info("BIND: starting ZeroMQ {} socket on {}", (Object)type, (Object)addr);
                    }
                    socket.bind(addr);
                    grouped.consume((Consumer)new Consumer<GroupedStream<String, ZMsg>>(){

                        public void accept(final GroupedStream<String, ZMsg> stringZMsgGroupedStream) {
                            final ZeroMQChannelStream netChannel = ZeroMQTcpServer.this.bindChannel().setConnectionId((String)stringZMsgGroupedStream.key()).setSocket(socket);
                            ((Publisher)handler.apply(netChannel)).subscribe((Subscriber)new DefaultSubscriber<Void>(){

                                public void onSubscribe(Subscription s) {
                                    s.request(Long.MAX_VALUE);
                                }

                                public void onComplete() {
                                    log.debug("Closing handler " + (String)stringZMsgGroupedStream.key());
                                    netChannel.close();
                                }

                                public void onError(Throwable t) {
                                    log.error("Error during registration " + (String)stringZMsgGroupedStream.key(), t);
                                    netChannel.close();
                                }
                            });
                            stringZMsgGroupedStream.consume((Consumer)new Consumer<ZMsg>(){

                                public void accept(ZMsg msg) {
                                    ZFrame content;
                                    while (null != (content = msg.pop())) {
                                        if (netChannel.getDecoder() != null) {
                                            netChannel.getDecoder().apply((Object)Buffer.wrap((byte[])content.getData()));
                                            continue;
                                        }
                                        netChannel.doDecoded(Buffer.wrap((byte[])content.getData()));
                                    }
                                    msg.destroy();
                                }
                            }, null, (Consumer)new Consumer<Void>(){

                                public void accept(Void aVoid) {
                                    netChannel.close();
                                }
                            });
                        }
                    });
                    promise.onComplete();
                }
                catch (Exception e) {
                    promise.onError((Throwable)e);
                }
            }
        };
        this.workerFuture = this.threadPool.submit(this.worker);
        return promise;
    }

    protected ZeroMQChannelStream<IN, OUT> bindChannel() {
        return new ZeroMQChannelStream(this.getDefaultEnvironment(), this.getDefaultPrefetchSize(), this.getDefaultDispatcher(), null, this.getDefaultCodec());
    }

    @Override
    protected Promise<Void> doShutdown() {
        if (null == this.worker) {
            return Promises.error((Throwable)new IllegalStateException("This ZeroMQ server has not been started"));
        }
        this.worker.shutdown();
        if (!this.workerFuture.isDone()) {
            this.workerFuture.cancel(true);
        }
        this.threadPool.shutdownNow();
        return Promises.success();
    }
}

