/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.EmptyByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.io.buffer.Buffer;
import reactor.io.net.Spec;
import reactor.io.net.impl.netty.NettyChannelStream;
import reactor.rx.subscription.PushSubscription;

/**
 * Netty inbound handler that forwards channel events to a Reactive Streams
 * {@link Subscriber} through a {@link PushSubscription}.
 *
 * <p>The subscription is created on the first {@code channelActive} event. Until then
 * {@link #subscription()} returns {@code null}, and every other callback tolerates that
 * state instead of failing with a {@code NullPointerException}.
 *
 * <p>When the channel stream has a decoder, bytes the decoder did not consume are kept in
 * an internal remainder buffer and prepended to the next read. That buffer is released
 * when the channel becomes inactive.
 *
 * @param <IN> type of the values emitted to the subscriber
 */
public class NettyNetChannelInboundHandler<IN> extends ChannelInboundHandlerAdapter {
    // Per-instance logger so that subclasses log under their own class name.
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final Subscriber<? super IN> subscriber;
    protected final NettyChannelStream<IN, ?> channelStream;
    /** Bytes left undecoded by the previous read; {@code null} when nothing is pending. */
    private volatile ByteBuf remainder;
    /** Created by the first {@code channelActive}; {@code null} before that. */
    private volatile PushSubscription<IN> channelSubscription;

    /**
     * @param subscriber    receiver of decoded values and terminal signals
     * @param channelStream stream that supplies the decoder and is registered on activation
     */
    public NettyNetChannelInboundHandler(Subscriber<? super IN> subscriber, NettyChannelStream<IN, ?> channelStream) {
        this.subscriber = subscriber;
        this.channelStream = channelStream;
    }

    /**
     * @return the subscription handed to the subscriber, or {@code null} if
     *         {@code channelActive} has not created it yet
     */
    public PushSubscription<IN> subscription() {
        return this.channelSubscription;
    }

    /**
     * @return the channel stream this handler was constructed with
     */
    public NettyChannelStream<IN, ?> channelStream() {
        return this.channelStream;
    }

    /**
     * Creates the subscription on first activation and hands it to the subscriber; on any
     * later activation only propagates the event and logs a resume message.
     *
     * <p>Any failure is reported through {@code subscriber.onError}.
     */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        try {
            if (this.channelSubscription != null) {
                super.channelActive(ctx);
                this.log.debug("RESUME: {}", ctx.channel());
                return;
            }
            this.channelSubscription = new PushSubscription<IN>(null, this.subscriber) {

                @Override
                protected void onRequest(long n) {
                    // An unbounded request switches the channel to auto-read.
                    if (n == Long.MAX_VALUE) {
                        ctx.channel().config().setAutoRead(true);
                    }
                    ctx.read();
                }

                @Override
                public void cancel() {
                    super.cancel();
                    if (ctx.channel().isOpen()) {
                        ctx.close();
                    }
                }
            };
            this.channelStream.registerOnPeer();
            this.subscriber.onSubscribe(this.channelSubscription);
            super.channelActive(ctx);
        } catch (Throwable err) {
            this.subscriber.onError(err);
        }
    }

    /**
     * Propagates the event and triggers another read while bounded demand is outstanding.
     * Does nothing once the subscription is complete.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        PushSubscription<IN> subscription = this.channelSubscription;
        if (subscription == null) {
            // No subscription yet: nothing to drive, just pass the event along.
            super.channelReadComplete(ctx);
            return;
        }
        if (subscription.isComplete()) {
            return;
        }
        try {
            super.channelReadComplete(ctx);
            // Read the counter once so both comparisons see the same value.
            long pending = subscription.pendingRequestSignals();
            // NOTE(review): threshold is "> 1", not "> 0"; kept as in the original - confirm intent.
            if (pending != Long.MAX_VALUE && pending > 1L) {
                ctx.read();
            }
        } catch (Throwable throwable) {
            subscription.onError(throwable);
        }
    }

    /**
     * Completes the subscription, propagates the event and releases any buffered remainder.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        PushSubscription<IN> subscription = this.channelSubscription;
        try {
            if (subscription == null) {
                super.channelInactive(ctx);
                return;
            }
            try {
                subscription.onComplete();
                super.channelInactive(ctx);
            } catch (Throwable err) {
                subscription.onError(err);
            }
        } finally {
            // No further reads will arrive, so the pending bytes can never be decoded.
            this.releaseRemainder();
        }
    }

    /**
     * Delivers an inbound message to the subscription.
     *
     * <ul>
     *   <li>No subscription, completed subscription, or an {@link EmptyByteBuf}: the
     *       message is released and dropped.</li>
     *   <li>{@code Spec.NOOP_DECODER} or a non-{@link ByteBuf} message: passed through
     *       unchanged.</li>
     *   <li>No decoder: the bytes are wrapped in a {@link Buffer} and emitted.</li>
     *   <li>Otherwise the bytes (prefixed by any remainder) go through the decoder, and
     *       whatever the decoder did not consume is kept for the next read.</li>
     * </ul>
     *
     * <p>Any failure is reported through the subscription's {@code onError}.
     */
    @Override
    @SuppressWarnings("unchecked") // pass-through and Buffer emissions rely on IN matching the pipeline's message type
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        PushSubscription<IN> subscription = this.channelSubscription;
        if (subscription == null) {
            // Nobody to deliver to; release so reference-counted messages do not leak.
            ReferenceCountUtil.release(msg);
            return;
        }
        try {
            if (subscription.isComplete() || msg.getClass() == EmptyByteBuf.class) {
                // The message is consumed here, so this handler must release it.
                ReferenceCountUtil.release(msg);
                return;
            }
            if (this.channelStream.getDecoder() == Spec.NOOP_DECODER || !(msg instanceof ByteBuf)) {
                subscription.onNext((IN) msg);
                return;
            }
            ByteBuf data = (ByteBuf) msg;
            if (this.channelStream.getDecoder() == null) {
                // NOTE(review): data is not released on this path; the emitted Buffer wraps its
                // NIO view, so releasing here could invalidate it for the subscriber - confirm ownership.
                subscription.onNext((IN) new Buffer(data.nioBuffer()));
                return;
            }
            if (this.remainder == null) {
                try {
                    this.passToConnection(data);
                } finally {
                    if (data.isReadable()) {
                        // Keep the undecoded tail for the next read.
                        this.remainder = data;
                    } else {
                        data.release();
                    }
                }
                return;
            }
            if (!this.bufferHasSufficientCapacity(this.remainder, data)) {
                ByteBuf combined = this.createCombinedBuffer(this.remainder, data, ctx);
                this.remainder.release();
                this.remainder = combined;
            } else {
                this.remainder.writeBytes(data);
            }
            data.release();
            try {
                this.passToConnection(this.remainder);
            } finally {
                if (this.remainder.isReadable()) {
                    this.remainder.discardSomeReadBytes();
                } else {
                    this.remainder.release();
                    this.remainder = null;
                }
            }
        } catch (Throwable t) {
            subscription.onError(t);
        }
    }

    /**
     * Reports the failure through the subscription's {@code onError}. "Broken pipe" and
     * "Connection reset by peer" are additionally logged at debug level. When no
     * subscription exists yet, the exception is forwarded along the pipeline instead of
     * being lost.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        String message = cause.getMessage();
        if ("Broken pipe".equals(message) || "Connection reset by peer".equals(message)) {
            this.log.debug("{} {}", ctx.channel(), message);
        }
        PushSubscription<IN> subscription = this.channelSubscription;
        if (subscription == null) {
            super.exceptionCaught(ctx, cause);
            return;
        }
        subscription.onError(cause);
    }

    /**
     * @return {@code true} if {@code receiver} can take all readable bytes of
     *         {@code provider} without exceeding its maximum capacity
     */
    private boolean bufferHasSufficientCapacity(ByteBuf receiver, ByteBuf provider) {
        return receiver.writerIndex() <= receiver.maxCapacity() - provider.readableBytes();
    }

    /**
     * Allocates a buffer sized for the readable bytes of both parts and copies them into
     * it in order. The inputs are not released here.
     */
    private ByteBuf createCombinedBuffer(ByteBuf partOne, ByteBuf partTwo, ChannelHandlerContext ctx) {
        ByteBuf combined = ctx.alloc().buffer(partOne.readableBytes() + partTwo.readableBytes());
        combined.writeBytes(partOne);
        combined.writeBytes(partTwo);
        return combined;
    }

    /**
     * Runs the decoder over {@code data}, emits a non-null result, and advances
     * {@code data}'s reader index by the number of bytes the decoder consumed.
     */
    private void passToConnection(ByteBuf data) {
        Buffer b = new Buffer(data.nioBuffer());
        int start = b.position();
        if (null != this.channelStream.getDecoder() && null != b.byteBuffer()) {
            IN read = this.channelStream.getDecoder().apply(b);
            if (read != null) {
                this.channelSubscription.onNext(read);
            }
        }
        data.skipBytes(b.position() - start);
    }

    /** Releases and clears the remainder buffer, if there is one. */
    private void releaseRemainder() {
        ByteBuf leftover = this.remainder;
        if (leftover != null) {
            this.remainder = null;
            leftover.release();
        }
    }
}

