/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.zmq;

import com.gs.collections.api.block.predicate.Predicate;
import com.gs.collections.api.list.MutableList;
import com.gs.collections.impl.block.predicate.checked.CheckedPredicate;
import com.gs.collections.impl.list.mutable.FastList;
import com.gs.collections.impl.list.mutable.SynchronizedMutableList;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Subscriber;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.fn.Consumer;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.Channel;
import reactor.io.net.ChannelStream;
import reactor.io.net.PeerStream;

/**
 * A {@link ChannelStream} that writes to a ZeroMQ socket.
 *
 * <p>Outgoing buffers are turned into {@link ZFrame}s and accumulated in a pending
 * {@link ZMsg} until a flush sends the whole multi-part message through the
 * configured {@link ZMQ.Socket}.
 *
 * @param <IN>  type of the decoded inbound data
 * @param <OUT> type of the outbound data before encoding
 */
public class ZeroMQChannelStream<IN, OUT>
extends ChannelStream<IN, OUT> {
    /** Registration facade returned by {@link #on()}. */
    private final ZeroMQConsumerSpec eventSpec = new ZeroMQConsumerSpec();
    /** Callbacks run (and then discarded) when {@link #close()} is dispatched. */
    private final MutableList<Consumer<Void>> closeHandlers = SynchronizedMutableList.of(FastList.<Consumer<Void>>newList());
    /** Value written as the leading frame of every new message on ROUTER sockets. */
    private volatile String connectionId;
    /** Socket used by {@link #doFlush(Subscriber)} to send messages. */
    private volatile ZMQ.Socket socket;
    /** Message being accumulated; {@code null} when nothing is pending. Guarded by {@code this}. */
    private ZMsg currentMsg;

    /**
     * Creates the stream; all arguments are forwarded to the {@link ChannelStream} constructor.
     *
     * @param env              the environment
     * @param prefetch         the prefetch value handed to the parent stream
     * @param peer             the owning peer
     * @param eventsDispatcher dispatcher for events
     * @param ioDispatcher     dispatcher for IO
     * @param codec            optional codec
     */
    public ZeroMQChannelStream(@Nonnull Environment env, long prefetch, PeerStream<IN, OUT, ChannelStream<IN, OUT>> peer, @Nonnull Dispatcher eventsDispatcher, @Nonnull Dispatcher ioDispatcher, @Nullable Codec<Buffer, IN, OUT> codec) {
        super(env, codec, prefetch, peer, ioDispatcher, eventsDispatcher);
    }

    /**
     * Sets the connection id used as the first frame of new messages on ROUTER sockets.
     *
     * @param connectionId the connection id
     * @return {@code this}
     */
    public ZeroMQChannelStream<IN, OUT> setConnectionId(String connectionId) {
        this.connectionId = connectionId;
        return this;
    }

    /**
     * Sets the socket that messages are sent through.
     *
     * @param socket the ZeroMQ socket
     * @return {@code this}
     */
    public ZeroMQChannelStream<IN, OUT> setSocket(ZMQ.Socket socket) {
        this.socket = socket;
        return this;
    }

    /**
     * Not supported by this implementation.
     *
     * @return always {@code null}
     */
    @Override
    public InetSocketAddress remoteAddress() {
        return null;
    }

    /**
     * Copies the remaining bytes of {@code data} into a new frame and appends it to the
     * pending message, creating that message first if none is pending.
     *
     * <p>A newly created message on a ROUTER socket gets the connection id as its first frame.
     *
     * @param data       bytes to append; its remaining content is consumed
     * @param onComplete subscriber notified of the send outcome when {@code flush} is set; may be {@code null}
     * @param flush      whether to send the pending message immediately after appending
     */
    @Override
    protected void write(ByteBuffer data, Subscriber<?> onComplete, boolean flush) {
        byte[] bytes = new byte[data.remaining()];
        data.get(bytes);

        ZMsg msg;
        boolean isNewMsg;
        synchronized (this) {
            msg = this.currentMsg;
            isNewMsg = msg == null;
            if (isNewMsg) {
                // Allocate only when nothing is pending. Replacing an existing pending
                // message here would orphan the frames already buffered in it, and the
                // next flush would send an empty message instead.
                msg = new ZMsg();
                this.currentMsg = msg;
            }
        }
        if (isNewMsg && this.socket.getType() == ZMQ.ROUTER) {
            msg.add(new ZFrame(this.connectionId));
        }
        msg.add(new ZFrame(bytes));
        if (flush) {
            this.doFlush(onComplete);
        }
    }

    /**
     * Writes the content of {@code data} by delegating to the {@link ByteBuffer} variant.
     */
    @Override
    protected void write(Buffer data, Subscriber<?> onComplete, boolean flush) {
        this.write(data.byteBuffer(), onComplete, flush);
    }

    /**
     * Encodes {@code data} with the stream's encoder and writes the resulting buffer.
     */
    @Override
    protected void write(Object data, Subscriber<?> onComplete, boolean flush) {
        Buffer buff = (Buffer)this.getEncoder().apply(data);
        this.write(buff.byteBuffer(), onComplete, flush);
    }

    /**
     * Sends the pending message, if any, without notifying a subscriber.
     */
    @Override
    protected synchronized void flush() {
        this.doFlush(null);
    }

    /**
     * Detaches the pending message and sends it through the socket.
     *
     * <p>Does nothing (and does not signal {@code onComplete}) when no message is pending.
     *
     * @param onComplete receives {@code onComplete()} if the send succeeded, otherwise
     *                   {@code onError(...)}; may be {@code null}
     */
    private void doFlush(Subscriber<?> onComplete) {
        ZMsg msg;
        synchronized (this) {
            msg = this.currentMsg;
            this.currentMsg = null;
        }
        if (msg == null) {
            return;
        }
        boolean success = msg.send(this.socket);
        if (onComplete == null) {
            return;
        }
        if (success) {
            onComplete.onComplete();
        } else {
            onComplete.onError(new RuntimeException("ZeroMQ Message could not be sent"));
        }
    }

    /**
     * Dispatches a task that invokes every registered close handler and removes it from the list.
     */
    public void close() {
        this.getDispatcher().dispatch(null, new Consumer<Void>(){

            @Override
            public void accept(Void v) {
                // The predicate runs each handler and returns true, so removeIf both
                // invokes and drops every handler.
                ZeroMQChannelStream.this.closeHandlers.removeIf(new CheckedPredicate<Consumer<Void>>(){

                    @Override
                    public boolean safeAccept(Consumer<Void> r) throws Exception {
                        r.accept(null);
                        return true;
                    }
                });
            }
        }, null);
    }

    /**
     * @return the spec used to register lifecycle callbacks on this channel
     */
    @Override
    public Channel.ConsumerSpec on() {
        return this.eventSpec;
    }

    /**
     * @return the underlying ZeroMQ socket, as last set via {@link #setSocket(ZMQ.Socket)}
     */
    public ZMQ.Socket delegate() {
        return this.socket;
    }

    @Override
    public String toString() {
        return "ZeroMQNetChannel{closeHandlers=" + this.closeHandlers + ", connectionId='" + this.connectionId + '\'' + ", socket=" + this.socket + '}';
    }

    /**
     * {@link Channel.ConsumerSpec} for this stream. Only close callbacks are recorded;
     * idle callbacks are accepted but ignored.
     */
    private class ZeroMQConsumerSpec
    implements Channel.ConsumerSpec {
        private ZeroMQConsumerSpec() {
        }

        /**
         * Registers a handler to be run when the enclosing stream is closed.
         */
        @Override
        public Channel.ConsumerSpec close(Consumer<Void> onClose) {
            ZeroMQChannelStream.this.closeHandlers.add(onClose);
            return this;
        }

        /**
         * No-op: the callback is not registered.
         */
        @Override
        public Channel.ConsumerSpec readIdle(long idleTimeout, Consumer<Void> onReadIdle) {
            return this;
        }

        /**
         * No-op: the callback is not registered.
         */
        @Override
        public Channel.ConsumerSpec writeIdle(long idleTimeout, Consumer<Void> onWriteIdle) {
            return this;
        }
    }
}

