/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.zmq.tcp;

import com.gs.collections.api.block.function.Function0;
import com.gs.collections.api.block.predicate.Predicate;
import com.gs.collections.api.list.MutableList;
import com.gs.collections.impl.block.function.checked.CheckedFunction0;
import com.gs.collections.impl.block.predicate.checked.CheckedPredicate;
import com.gs.collections.impl.list.mutable.FastList;
import com.gs.collections.impl.list.mutable.SynchronizedMutableList;
import com.gs.collections.impl.map.mutable.SynchronizedMutableMap;
import com.gs.collections.impl.map.mutable.UnifiedMap;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.codec.StandardCodecs;
import reactor.io.net.ChannelStream;
import reactor.io.net.Client;
import reactor.io.net.NetStreams;
import reactor.io.net.Server;
import reactor.io.net.Spec;
import reactor.io.net.impl.zmq.ZeroMQClientSocketOptions;
import reactor.io.net.impl.zmq.ZeroMQServerSocketOptions;
import reactor.io.net.impl.zmq.tcp.ZeroMQTcpClient;
import reactor.io.net.impl.zmq.tcp.ZeroMQTcpServer;
import reactor.io.net.tcp.TcpClient;
import reactor.io.net.tcp.TcpServer;
import reactor.rx.Promise;

public class ZeroMQ<T> {
    private static final SynchronizedMutableMap<Integer, String> SOCKET_TYPES = SynchronizedMutableMap.of((Map)UnifiedMap.newMap());
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final MutableList<TcpClient> clients = SynchronizedMutableList.of((List)FastList.newList());
    private final MutableList<TcpServer> servers = SynchronizedMutableList.of((List)FastList.newList());
    private final Environment env;
    private final Dispatcher dispatcher;
    private final ZContext zmqCtx;
    private volatile Codec<Buffer, T, T> codec = StandardCodecs.PASS_THROUGH_CODEC;
    private volatile boolean shutdown = false;

    public ZeroMQ(Environment env) {
        this(env, env.getDefaultDispatcher());
    }

    public ZeroMQ(Environment env, String dispatcher) {
        this(env, env.getDispatcher(dispatcher));
    }

    public ZeroMQ(Environment env, Dispatcher dispatcher) {
        this.env = env;
        this.dispatcher = dispatcher;
        this.zmqCtx = new ZContext();
        this.zmqCtx.setLinger(100);
    }

    public static String findSocketTypeName(final int socketType) {
        return (String)SOCKET_TYPES.getIfAbsentPut((Object)socketType, (Function0)new CheckedFunction0<String>(){

            public String safeValue() throws Exception {
                for (Field f : ZMQ.class.getDeclaredFields()) {
                    if (!Integer.TYPE.isAssignableFrom(f.getType())) continue;
                    f.setAccessible(true);
                    try {
                        int val = f.getInt(null);
                        if (socketType != val) continue;
                        return f.getName();
                    }
                    catch (IllegalAccessException e) {
                        // empty catch block
                    }
                }
                return "";
            }
        });
    }

    public ZeroMQ<T> codec(Codec<Buffer, T, T> codec) {
        this.codec = codec;
        return this;
    }

    public Promise<ChannelStream<T, T>> dealer(String addrs) {
        return this.createClient(addrs, 5);
    }

    public Promise<ChannelStream<T, T>> push(String addrs) {
        return this.createClient(addrs, 8);
    }

    public Promise<ChannelStream<T, T>> pull(String addrs) {
        return this.createServer(addrs, 7);
    }

    public Promise<ChannelStream<T, T>> request(String addrs) {
        return this.createClient(addrs, 3);
    }

    public Promise<ChannelStream<T, T>> reply(String addrs) {
        return this.createServer(addrs, 4);
    }

    public Promise<ChannelStream<T, T>> router(String addrs) {
        return this.createServer(addrs, 6);
    }

    public Promise<ChannelStream<T, T>> createClient(final String addrs, final int socketType) {
        Assert.isTrue((!this.shutdown ? 1 : 0) != 0, (String)"This ZeroMQ instance has been shut down");
        TcpClient client = NetStreams.tcpClient(ZeroMQTcpClient.class, new Function<Spec.TcpClient<T, T>, Spec.TcpClient<T, T>>(){

            public Spec.TcpClient<T, T> apply(Spec.TcpClient<T, T> spec) {
                return ((Spec.TcpClient)((Spec.TcpClient)spec.env(ZeroMQ.this.env)).dispatcher(ZeroMQ.this.dispatcher)).codec(ZeroMQ.this.codec).options(new ZeroMQClientSocketOptions().context(ZeroMQ.this.zmqCtx).connectAddresses(addrs).socketType(socketType));
            }
        });
        this.clients.add(client);
        Promise promise = client.next();
        client.open();
        return promise;
    }

    public Promise<ChannelStream<T, T>> createServer(final String addrs, final int socketType) {
        Assert.isTrue((!this.shutdown ? 1 : 0) != 0, (String)"This ZeroMQ instance has been shut down");
        TcpServer server = NetStreams.tcpServer(ZeroMQTcpServer.class, new Function<Spec.TcpServer<T, T>, Spec.TcpServer<T, T>>(){

            public Spec.TcpServer<T, T> apply(Spec.TcpServer<T, T> spec) {
                return (Spec.TcpServer)((Object)((Spec.TcpServer)((Object)((Spec.TcpServer)((Spec.TcpServer)spec.env(ZeroMQ.this.env)).dispatcher(ZeroMQ.this.dispatcher)).codec(ZeroMQ.this.codec))).options(new ZeroMQServerSocketOptions().context(ZeroMQ.this.zmqCtx).listenAddresses(addrs).socketType(socketType)));
            }
        });
        Promise d = server.next();
        this.servers.add(server);
        server.start();
        return d;
    }

    public void shutdown() {
        if (this.shutdown) {
            return;
        }
        this.shutdown = true;
        this.servers.removeIf((Predicate)new CheckedPredicate<Server>(){

            public boolean safeAccept(Server server) throws Exception {
                Promise<Boolean> promise = server.shutdown();
                promise.await(60L, TimeUnit.SECONDS);
                Assert.isTrue((boolean)promise.isSuccess(), (String)("Server " + server + " not properly shut down"));
                return true;
            }
        });
        this.clients.removeIf((Predicate)new CheckedPredicate<Client>(){

            public boolean safeAccept(Client client) throws Exception {
                Promise<Boolean> promise = client.close();
                promise.await(60L, TimeUnit.SECONDS);
                Assert.isTrue((boolean)promise.isSuccess(), (String)("Client " + client + " not properly shut down"));
                return true;
            }
        });
    }
}

