/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.zmq.tcp;

import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.core.support.UUIDUtils;
import reactor.fn.Consumer;
import reactor.fn.Supplier;
import reactor.fn.tuple.Tuple2;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.Reconnect;
import reactor.io.net.config.ClientSocketOptions;
import reactor.io.net.config.SslOptions;
import reactor.io.net.impl.zmq.ZeroMQChannelStream;
import reactor.io.net.impl.zmq.ZeroMQClientSocketOptions;
import reactor.io.net.impl.zmq.ZeroMQWorker;
import reactor.io.net.impl.zmq.tcp.ZeroMQ;
import reactor.io.net.tcp.TcpClient;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Stream;
import reactor.rx.action.support.DefaultSubscriber;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.broadcast.SerializedBroadcaster;

/**
 * A {@link TcpClient} implementation that connects through a ZeroMQ socket.
 *
 * <p>Every call to {@link #doStart(ReactorChannelHandler)} creates a {@link ZeroMQWorker},
 * hands it to a cached daemon thread pool, and bridges the {@link ZMsg} instances published
 * on the worker's broadcaster into a {@link ZeroMQChannelStream}.
 *
 * @param <IN>  type of the decoded inbound data
 * @param <OUT> type of the outbound data
 */
public class ZeroMQTcpClient<IN, OUT>
extends TcpClient<IN, OUT> {
    private static final Logger log = LoggerFactory.getLogger(ZeroMQTcpClient.class);

    /** Value of the {@code reactor.zmq.ioThreadCount} environment property (default 1), passed to each worker. */
    private final int ioThreadCount = this.getDefaultEnvironment().getIntProperty("reactor.zmq.ioThreadCount", 1);

    /** The ZeroMQ-specific view of the socket options, or {@code null} when plain options were supplied. */
    private final ZeroMQClientSocketOptions zmqOpts;

    /** Pool on which the {@link ZeroMQWorker} tasks are run. */
    private final ExecutorService threadPool;

    /**
     * Creates a new client.
     *
     * @param env              environment forwarded to the parent client
     * @param eventsDispatcher dispatcher forwarded to the parent client
     * @param connectAddress   supplier of the remote address, forwarded to the parent client
     * @param options          socket options; ZeroMQ-specific settings are only honoured when this
     *                         is a {@link ZeroMQClientSocketOptions}; may be {@code null}
     * @param sslOptions       SSL options forwarded to the parent client; may be {@code null}
     * @param codec            codec forwarded to the parent client; may be {@code null}
     */
    public ZeroMQTcpClient(@Nonnull Environment env, @Nonnull Dispatcher eventsDispatcher, @Nonnull Supplier<InetSocketAddress> connectAddress, @Nullable ClientSocketOptions options, @Nullable SslOptions sslOptions, @Nullable Codec<Buffer, IN, OUT> codec) {
        super(env, eventsDispatcher, connectAddress, options, sslOptions, codec);
        this.zmqOpts = options instanceof ZeroMQClientSocketOptions ? (ZeroMQClientSocketOptions) options : null;
        this.threadPool = Executors.newCachedThreadPool(new NamedDaemonThreadFactory("zmq-client"));
    }

    /**
     * Not supported by this client.
     *
     * @throws IllegalStateException always
     */
    @Override
    protected Stream<Tuple2<InetSocketAddress, Integer>> doStart(ReactorChannelHandler handler, Reconnect reconnect) {
        throw new IllegalStateException("Reconnects are handled transparently by the ZeroMQ network library");
    }

    /**
     * Shuts the worker pool down immediately and returns an already completed promise.
     *
     * @return a promise that has been completed before it is returned
     */
    @Override
    protected Promise<Void> doShutdown() {
        Promise<Void> promise = Promises.prepare();
        this.threadPool.shutdownNow();
        promise.onComplete();
        return promise;
    }

    /**
     * Creates the channel stream for a new connection from this client's default environment,
     * prefetch size, dispatcher, connect address and codec.
     *
     * @return a new, not yet connected, channel stream
     */
    protected ZeroMQChannelStream<IN, OUT> bindChannel() {
        return new ZeroMQChannelStream<IN, OUT>(this.getDefaultEnvironment(), this.getDefaultPrefetchSize(), this.getDefaultDispatcher(), this.getConnectAddress(), this.getDefaultCodec());
    }

    /**
     * Starts a {@link ZeroMQWorker} on the thread pool. Once the worker invokes its
     * {@code start} callback, the socket is connected, the handler is applied to a new channel
     * and incoming messages are routed to that channel.
     *
     * @param handler handler applied to the newly created channel
     * @return a promise completed when the {@code start} callback finished, or failed with the
     *         exception that callback raised
     */
    @Override
    protected Promise<Void> doStart(final ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> handler) {
        final UUID id = UUIDUtils.random();
        final Promise<Void> p = Promises.prepare();
        // DEALER (numeric value 5) is used when no ZeroMQ-specific options were supplied.
        int socketType = null != this.zmqOpts ? this.zmqOpts.socketType() : ZMQ.DEALER;
        ZContext zmq = null != this.zmqOpts ? this.zmqOpts.context() : null;
        final Broadcaster<ZMsg> broadcaster = SerializedBroadcaster.create(this.getDefaultEnvironment());
        ZeroMQWorker worker = new ZeroMQWorker(id, socketType, this.ioThreadCount, zmq, broadcaster){

            @Override
            protected void configure(ZMQ.Socket socket) {
                // The constructor accepts null options, so guard before reading them; an
                // exception here would be captured by the Future returned from submit()
                // and the start promise would never be resolved.
                ClientSocketOptions opts = ZeroMQTcpClient.this.getOptions();
                if (null != opts) {
                    socket.setReceiveBufferSize((long) opts.rcvbuf());
                    socket.setSendBufferSize((long) opts.sndbuf());
                    if (opts.keepAlive()) {
                        socket.setTCPKeepAlive(1);
                    }
                }
                if (null != ZeroMQTcpClient.this.zmqOpts && null != ZeroMQTcpClient.this.zmqOpts.socketConfigurer()) {
                    ZeroMQTcpClient.this.zmqOpts.socketConfigurer().accept(socket);
                }
            }

            @Override
            protected void start(ZMQ.Socket socket) {
                try {
                    String addr = ZeroMQTcpClient.this.createConnectAddress();
                    if (log.isInfoEnabled()) {
                        String type = ZeroMQ.findSocketTypeName(socket.getType());
                        log.info("CONNECT: connecting ZeroMQ {} socket to {}", type, addr);
                    }
                    socket.connect(addr);
                    final ZeroMQChannelStream<IN, OUT> netChannel = ZeroMQTcpClient.this.bindChannel().setConnectionId(id.toString()).setSocket(socket);
                    // The channel is closed as soon as the handler's publisher terminates,
                    // whether normally or with an error.
                    handler.apply(netChannel).subscribe(new DefaultSubscriber<Void>(){

                        @Override
                        public void onSubscribe(Subscription s) {
                            s.request(Long.MAX_VALUE);
                        }

                        @Override
                        public void onComplete() {
                            log.debug("Closing Client Worker {}", id);
                            netChannel.close();
                        }

                        @Override
                        public void onError(Throwable t) {
                            log.error("Error during registration", t);
                            netChannel.close();
                        }
                    });
                    broadcaster.consume(new Consumer<ZMsg>(){

                        @Override
                        @SuppressWarnings("unchecked")
                        public void accept(ZMsg msg) {
                            try {
                                ZFrame content;
                                while (null != (content = msg.pop())) {
                                    Buffer data = Buffer.wrap(content.getData());
                                    if (netChannel.getDecoder() != null) {
                                        netChannel.getDecoder().apply(data);
                                    } else {
                                        // Without a decoder the raw buffer is delivered as-is.
                                        netChannel.doDecoded((IN) data);
                                    }
                                }
                            }
                            finally {
                                // Release the message even when decoding a frame failed.
                                msg.destroy();
                            }
                        }
                    });
                    p.onComplete();
                }
                catch (Exception e) {
                    p.onError(e);
                }
            }
        };
        this.threadPool.submit(worker);
        return p;
    }

    /**
     * Resolves the address string handed to {@code ZMQ.Socket.connect}.
     *
     * @return the addresses configured on the ZeroMQ options when present, otherwise
     *         {@code tcp://<host>:<port>} built from this client's connect address
     */
    private String createConnectAddress() {
        if (null != this.zmqOpts) {
            String configured = this.zmqOpts.connectAddresses();
            if (null != configured) {
                return configured;
            }
        }
        InetSocketAddress target = this.getConnectAddress();
        return "tcp://" + target.getHostString() + ":" + target.getPort();
    }
}

