/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Subscriber;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.fn.Consumer;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.Channel;
import reactor.io.net.ChannelStream;
import reactor.io.net.PeerStream;
import reactor.io.net.impl.netty.NettyNetChannelInboundHandler;
import reactor.rx.subscription.PushSubscription;

/**
 * A {@link ChannelStream} whose I/O is carried out by a wrapped channel
 * ({@code ioChannel}). Writes, flushes and lifecycle/idle callbacks are all
 * delegated to that channel and its pipeline.
 *
 * @param <IN>  type of the decoded inbound values
 * @param <OUT> type of the outbound values
 */
public class NettyChannelStream<IN, OUT>
extends ChannelStream<IN, OUT> {
    /** The underlying channel every operation of this stream is delegated to. */
    private final Channel ioChannel;

    /**
     * Creates a stream bound to {@code ioChannel}. All arguments except
     * {@code ioChannel} are handed to the {@link ChannelStream} constructor unchanged.
     *
     * @param env              the environment passed to the parent stream
     * @param codec            optional codec passed to the parent stream
     * @param prefetch         prefetch value passed to the parent stream
     * @param peer             the peer passed to the parent stream
     * @param ioDispatcher     dispatcher passed to the parent stream
     * @param eventsDispatcher dispatcher passed to the parent stream
     * @param ioChannel        the channel this stream reads from and writes to
     */
    public NettyChannelStream(@Nonnull Environment env, @Nullable Codec<Buffer, IN, OUT> codec, long prefetch, @Nonnull PeerStream<IN, OUT, ChannelStream<IN, OUT>> peer, @Nonnull Dispatcher ioDispatcher, @Nonnull Dispatcher eventsDispatcher, @Nonnull Channel ioChannel) {
        super(env, codec, prefetch, peer, ioDispatcher, eventsDispatcher);
        this.ioChannel = ioChannel;
    }

    /**
     * @return the remote address reported by the underlying channel, cast to
     *         {@link InetSocketAddress}
     */
    @Override
    public InetSocketAddress remoteAddress() {
        return (InetSocketAddress) this.ioChannel.remoteAddress();
    }

    /**
     * @return a new spec through which close and idle callbacks can be
     *         registered on the underlying channel's pipeline
     */
    @Override
    public Channel.ConsumerSpec on() {
        return new NettyConsumerSpec();
    }

    /**
     * @return the underlying channel this stream wraps
     */
    public Channel delegate() {
        return this.ioChannel;
    }

    /**
     * Routes a decoded value. When the pipeline contains a
     * {@link NettyNetChannelInboundHandler} that exposes a non-null subscription,
     * the value is pushed to that subscription; otherwise the parent
     * implementation handles it.
     *
     * @param in the decoded inbound value
     */
    @Override
    protected void doDecoded(IN in) {
        NettyNetChannelInboundHandler ch = this.ioChannel.pipeline().get(NettyNetChannelInboundHandler.class);
        PushSubscription subscription = ch == null ? null : ch.subscription();
        if (subscription != null) {
            subscription.onNext(in);
        } else {
            super.doDecoded(in);
        }
    }

    /**
     * Copies the remaining bytes of {@code data} into a buffer obtained from the
     * channel's allocator and writes that buffer via
     * {@link #write(Object, Subscriber, boolean)}.
     *
     * @param data       the bytes to send
     * @param onComplete optional subscriber notified of the write outcome
     * @param flush      whether the write should be flushed immediately
     */
    @Override
    public void write(ByteBuffer data, Subscriber<?> onComplete, boolean flush) {
        ByteBuf buf = this.ioChannel.alloc().buffer(data.remaining());
        buf.writeBytes(data);
        this.write(buf, onComplete, flush);
    }

    /**
     * Writes {@code data} to the underlying channel and reports the outcome.
     * <p>
     * On failure the cause is signalled to {@code onComplete} (when non-null) via
     * {@code onError} and is also cascaded to the peer. On success
     * {@code onComplete.onComplete()} is invoked. Exactly one terminal signal is
     * delivered to the subscriber.
     *
     * @param data       the object to write
     * @param onComplete optional subscriber notified of the write outcome; may be {@code null}
     * @param flush      {@code true} to write and flush, {@code false} to only write
     */
    @Override
    public void write(Object data, final Subscriber<?> onComplete, boolean flush) {
        ChannelFuture writeFuture = flush ? this.ioChannel.writeAndFlush(data) : this.ioChannel.write(data);
        writeFuture.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    Throwable t = future.cause();
                    if (null != onComplete) {
                        onComplete.onError(t);
                    }
                    NettyChannelStream.this.cascadeErrorToPeer(t);
                    // onError is terminal: do not fall through and also signal onComplete.
                    return;
                }
                if (null != onComplete) {
                    onComplete.onComplete();
                }
            }
        });
    }

    /**
     * Flushes the underlying channel, but only while it reports itself active.
     */
    @Override
    public void flush() {
        if (this.ioChannel.isActive()) {
            this.ioChannel.flush();
        }
    }

    @Override
    public String toString() {
        return "NettyNetChannel{channel=" + this.ioChannel + '}';
    }

    /**
     * {@code ConsumerSpec} that registers callbacks by installing handlers on the
     * underlying channel's pipeline. Every method returns this spec so calls can
     * be chained.
     */
    private class NettyConsumerSpec
    implements Channel.ConsumerSpec {
        private NettyConsumerSpec() {
        }

        /**
         * Appends a handler to the pipeline that invokes {@code onClose} with
         * {@code null} when the channel becomes inactive, then propagates the
         * event further along the pipeline.
         *
         * @param onClose callback to run when the channel becomes inactive
         * @return this spec
         */
        @Override
        public Channel.ConsumerSpec close(final Consumer<Void> onClose) {
            NettyChannelStream.this.ioChannel.pipeline().addLast(new ChannelDuplexHandler() {

                @Override
                public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                    onClose.accept(null);
                    super.channelInactive(ctx);
                }
            });
            return this;
        }

        /**
         * Installs, at the head of the pipeline, an {@link IdleStateHandler}
         * configured with a reader idle time of {@code idleTimeout} milliseconds
         * (writer and all-idle times are 0). {@code onReadIdle} is invoked with
         * {@code null} for each {@link IdleState#READER_IDLE} event before the
         * event is passed on to the default handling.
         *
         * @param idleTimeout reader idle time in milliseconds
         * @param onReadIdle  callback to run on a reader-idle event
         * @return this spec
         */
        @Override
        public Channel.ConsumerSpec readIdle(long idleTimeout, final Consumer<Void> onReadIdle) {
            NettyChannelStream.this.ioChannel.pipeline().addFirst(new IdleStateHandler(idleTimeout, 0L, 0L, TimeUnit.MILLISECONDS) {

                @Override
                protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
                    if (evt.state() == IdleState.READER_IDLE) {
                        onReadIdle.accept(null);
                    }
                    super.channelIdle(ctx, evt);
                }
            });
            return this;
        }

        /**
         * Installs, at the tail of the pipeline, an {@link IdleStateHandler}
         * configured with a writer idle time of {@code idleTimeout} milliseconds
         * (reader and all-idle times are 0). {@code onWriteIdle} is invoked with
         * {@code null} for each {@link IdleState#WRITER_IDLE} event before the
         * event is passed on to the default handling.
         *
         * @param idleTimeout writer idle time in milliseconds
         * @param onWriteIdle callback to run on a writer-idle event
         * @return this spec
         */
        @Override
        public Channel.ConsumerSpec writeIdle(long idleTimeout, final Consumer<Void> onWriteIdle) {
            NettyChannelStream.this.ioChannel.pipeline().addLast(new IdleStateHandler(0L, idleTimeout, 0L, TimeUnit.MILLISECONDS) {

                @Override
                protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
                    if (evt.state() == IdleState.WRITER_IDLE) {
                        onWriteIdle.accept(null);
                    }
                    super.channelIdle(ctx, evt);
                }
            });
            return this;
        }
    }
}

