/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.zmq.tcp;

import com.gs.collections.api.block.procedure.Procedure2;
import com.gs.collections.api.map.MutableMap;
import com.gs.collections.impl.block.procedure.checked.CheckedProcedure2;
import com.gs.collections.impl.map.mutable.SynchronizedMutableMap;
import com.gs.collections.impl.map.mutable.UnifiedMap;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.core.support.UUIDUtils;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.Reconnect;
import reactor.io.net.config.ClientSocketOptions;
import reactor.io.net.config.SslOptions;
import reactor.io.net.impl.zmq.ZeroMQChannelStream;
import reactor.io.net.impl.zmq.ZeroMQClientSocketOptions;
import reactor.io.net.impl.zmq.ZeroMQWorker;
import reactor.io.net.impl.zmq.tcp.ZeroMQ;
import reactor.io.net.tcp.TcpClient;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Stream;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.broadcast.SerializedBroadcaster;

public class ZeroMQTcpClient<IN, OUT>
extends TcpClient<IN, OUT> {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final MutableMap<ZeroMQWorker, Future<?>> workers = SynchronizedMutableMap.of((Map)UnifiedMap.newMap());
    private final int ioThreadCount = (Integer)this.getEnvironment().getProperty("reactor.zmq.ioThreadCount", Integer.class, (Object)1);
    private final ZeroMQClientSocketOptions zmqOpts;
    private final ExecutorService threadPool;

    public ZeroMQTcpClient(@Nonnull Environment env, @Nonnull Dispatcher eventsDispatcher, @Nonnull InetSocketAddress connectAddress, @Nullable ClientSocketOptions options, @Nullable SslOptions sslOptions, @Nullable Codec<Buffer, IN, OUT> codec) {
        super(env, eventsDispatcher, connectAddress, options, sslOptions, codec);
        this.zmqOpts = options instanceof ZeroMQClientSocketOptions ? (ZeroMQClientSocketOptions)options : null;
        this.threadPool = Executors.newCachedThreadPool((ThreadFactory)new NamedDaemonThreadFactory("zmq-client"));
    }

    @Override
    public Promise<Boolean> open() {
        Promise d = this.map(new Function<ChannelStream<IN, OUT>, Boolean>(){

            public Boolean apply(ChannelStream<IN, OUT> channelStream) {
                return true;
            }
        }).next();
        this.doOpen();
        return d;
    }

    @Override
    public Stream<Boolean> open(Reconnect reconnect) {
        throw new IllegalStateException("Reconnects are handled transparently by the ZeroMQ network library");
    }

    @Override
    public Promise<Boolean> close() {
        if (this.workers.isEmpty()) {
            throw new IllegalStateException("This ZeroMQ server has not been started");
        }
        Promise promise = Promises.ready((Environment)this.getEnvironment(), (Dispatcher)this.getDispatcher());
        this.workers.forEachKeyValue((Procedure2)new CheckedProcedure2<ZeroMQWorker, Future<?>>(){

            public void safeValue(ZeroMQWorker w, Future<?> f) throws Exception {
                w.shutdown();
                if (!f.isDone()) {
                    f.cancel(true);
                }
            }
        });
        this.threadPool.shutdownNow();
        this.notifyShutdown();
        promise.onNext((Object)true);
        return promise;
    }

    @Override
    protected ZeroMQChannelStream<IN, OUT> bindChannel(Object ioChannel, long prefetch) {
        return new ZeroMQChannelStream(this.getEnvironment(), prefetch == -1L ? this.getPrefetchSize() : prefetch, this, this.getDispatcher(), this.getDispatcher(), this.getDefaultCodec());
    }

    private void doOpen() {
        final UUID id = UUIDUtils.random();
        int socketType = null != this.zmqOpts ? this.zmqOpts.socketType() : 5;
        ZContext zmq = null != this.zmqOpts ? this.zmqOpts.context() : null;
        final Broadcaster broadcaster = SerializedBroadcaster.create((Environment)this.getEnvironment());
        ZeroMQWorker worker = new ZeroMQWorker(id, socketType, this.ioThreadCount, zmq, broadcaster){

            @Override
            protected void configure(ZMQ.Socket socket) {
                socket.setReceiveBufferSize((long)ZeroMQTcpClient.this.getOptions().rcvbuf());
                socket.setSendBufferSize((long)ZeroMQTcpClient.this.getOptions().sndbuf());
                if (ZeroMQTcpClient.this.getOptions().keepAlive()) {
                    socket.setTCPKeepAlive(1);
                }
                if (null != ZeroMQTcpClient.this.zmqOpts && null != ZeroMQTcpClient.this.zmqOpts.socketConfigurer()) {
                    ZeroMQTcpClient.this.zmqOpts.socketConfigurer().accept((Object)socket);
                }
            }

            @Override
            protected void start(ZMQ.Socket socket) {
                try {
                    String addr = ZeroMQTcpClient.this.createConnectAddress();
                    if (ZeroMQTcpClient.this.log.isInfoEnabled()) {
                        String type = ZeroMQ.findSocketTypeName(socket.getType());
                        ZeroMQTcpClient.this.log.info("CONNECT: connecting ZeroMQ {} socket to {}", (Object)type, (Object)addr);
                    }
                    socket.connect(addr);
                    final ZeroMQChannelStream netChannel = ((ZeroMQChannelStream)ZeroMQTcpClient.this.bindChannel(null, null != ZeroMQTcpClient.this.zmqOpts ? ZeroMQTcpClient.this.zmqOpts.prefetch() : -1L)).setConnectionId(id.toString()).setSocket(socket);
                    netChannel.registerOnPeer();
                    broadcaster.consume((Consumer)new Consumer<ZMsg>(){

                        public void accept(ZMsg msg) {
                            ZFrame content;
                            while (null != (content = msg.pop())) {
                                if (netChannel.getDecoder() != null) {
                                    netChannel.getDecoder().apply((Object)Buffer.wrap((byte[])content.getData()));
                                    continue;
                                }
                                netChannel.in().onNext((Object)Buffer.wrap((byte[])content.getData()));
                            }
                            msg.destroy();
                        }
                    }, ZeroMQTcpClient.this.createErrorConsumer(netChannel), (Consumer)new Consumer<Void>(){

                        public void accept(Void aVoid) {
                            netChannel.close();
                        }
                    });
                }
                catch (Exception e) {
                    ZeroMQTcpClient.this.notifyError(e);
                }
            }
        };
        this.workers.put((Object)worker, this.threadPool.submit(worker));
    }

    private String createConnectAddress() {
        String addrs = null != this.zmqOpts && null != this.zmqOpts.connectAddresses() ? this.zmqOpts.connectAddresses() : "tcp://" + this.getConnectAddress().getHostString() + ":" + this.getConnectAddress().getPort();
        return addrs;
    }
}

