/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GenericFutureListener;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.Environment;
import reactor.core.processor.CancelException;
import reactor.core.support.Exceptions;
import reactor.fn.Consumer;
import reactor.io.buffer.Buffer;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.Spec;
import reactor.io.net.impl.netty.NettyChannelStream;
import reactor.rx.Stream;
import reactor.rx.action.support.DefaultSubscriber;
import reactor.rx.subscription.PushSubscription;

public class NettyChannelHandlerBridge<IN, OUT>
extends ChannelDuplexHandler {
    /** Logger shared by the bridge and its inner write subscribers. */
    protected static final Logger log = LoggerFactory.getLogger(NettyChannelHandlerBridge.class);
    /** Reactor handler that is applied to {@link #channelStream} when the channel becomes active. */
    protected final ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> handler;
    /** Stream handed to {@link #handler}; also queried for the decoder used on inbound {@link ByteBuf}s. */
    private final NettyChannelStream<IN, OUT> channelStream;
    /**
     * Inbound subscription created by the first {@link ChannelInputSubscriberEvent};
     * {@code null} until then and again after the channel becomes inactive.
     */
    protected PushSubscription<IN> channelSubscription;
    /** Bytes received but not yet consumed by the decoder, kept between {@code channelRead} calls; {@code null} when none. */
    private ByteBuf remainder;

    /**
     * Creates a bridge between a Netty pipeline and a Reactor channel stream.
     *
     * @param handler       the Reactor handler applied to {@code channelStream} once the channel is active
     * @param channelStream the stream passed to {@code handler}; its decoder is used for inbound data
     */
    public NettyChannelHandlerBridge(ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> handler, NettyChannelStream<IN, OUT> channelStream) {
        this.channelStream = channelStream;
        this.handler = handler;
    }

    /**
     * Exposes the inbound subscription currently held by this bridge.
     *
     * @return the subscription installed through a {@link ChannelInputSubscriberEvent},
     *         or {@code null} when none is currently installed
     */
    public PushSubscription<IN> subscription() {
        final PushSubscription<IN> current = this.channelSubscription;
        return current;
    }

    /**
     * Installs the inbound subscriber carried by a {@link ChannelInputSubscriberEvent}.
     *
     * <p>The first such event creates the channel subscription, whose demand callback
     * switches the channel to auto-read for unbounded demand ({@code Long.MAX_VALUE})
     * and triggers a read. Any further event signals an {@link IllegalStateException}
     * to the already installed subscription. The event is always forwarded to the
     * superclass afterwards, whatever its type.
     *
     * @param ctx the handler context used to configure and trigger reads
     * @param evt the user event; only exact instances of {@link ChannelInputSubscriberEvent} are handled here
     * @throws Exception if the superclass handler fails
     */
    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
        final boolean isSubscriberEvent = evt != null && ChannelInputSubscriberEvent.class.equals(evt.getClass());
        if (isSubscriberEvent) {
            if (this.channelSubscription != null) {
                // A subscriber is already attached: report the violation on the existing subscription.
                this.channelSubscription.onError((Throwable)new IllegalStateException("Only one connection input subscriber allowed."));
            } else {
                ChannelInputSubscriberEvent registration = (ChannelInputSubscriberEvent)evt;
                this.channelSubscription = new PushSubscription<IN>(null, registration.inputSubscriber){

                    protected void onRequest(long n) {
                        // Unbounded demand: let Netty read continuously.
                        if (n == Long.MAX_VALUE) {
                            ctx.channel().config().setAutoRead(true);
                        }
                        ctx.read();
                    }
                };
                registration.inputSubscriber.onSubscribe(this.channelSubscription);
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    /**
     * Starts the Reactor handler once the channel is active.
     *
     * <p>The handler is applied to the channel stream and the resulting publisher is
     * subscribed with unbounded demand. When that publisher errors (after logging) or
     * completes, the channel is closed only if no input subscription is installed.
     *
     * @param ctx the handler context whose channel may be closed on termination
     * @throws Exception if the superclass handler fails
     */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        Publisher handlerResult = (Publisher)this.handler.apply(this.channelStream);
        Subscriber terminationWatcher = (Subscriber)new DefaultSubscriber<Void>(){

            @Override
            public void onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onError(Throwable t) {
                log.error("Error processing connection. Closing the channel.", t);
                this.closeUnlessInputSubscribed();
            }

            @Override
            public void onComplete() {
                this.closeUnlessInputSubscribed();
            }

            /** Closes the channel when no input subscription is installed on the bridge. */
            private void closeUnlessInputSubscribed() {
                if (NettyChannelHandlerBridge.this.channelSubscription == null) {
                    ctx.channel().close();
                }
            }
        };
        handlerResult.subscribe(terminationWatcher);
    }

    /**
     * Propagates read completion and asks for another read while bounded demand remains.
     *
     * <p>Without an input subscription the event is dropped (it is not forwarded).
     * Otherwise the event is forwarded and, when the pending demand is bounded and
     * greater than one, another read is requested. Any failure is signalled to the
     * subscription (if still present) and routed to the {@link Environment} when one
     * is alive; it is never rethrown.
     *
     * @param ctx the handler context used to request the next read
     * @throws Exception declared by the overridden method; failures are handled internally
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if (this.channelSubscription == null) {
            return;
        }
        try {
            super.channelReadComplete(ctx);
            if (this.channelSubscription.pendingRequestSignals() != Long.MAX_VALUE) {
                if (this.channelSubscription.pendingRequestSignals() > 1L) {
                    ctx.read();
                }
            }
        }
        catch (Throwable failure) {
            if (this.channelSubscription != null) {
                this.channelSubscription.onError(failure);
            }
            if (Environment.alive()) {
                Environment.get().routeError(failure);
            }
        }
    }

    /**
     * Completes the input subscription and releases buffered data when the channel closes.
     *
     * <p>The installed subscription (if any) is completed and cleared, then the event is
     * forwarded to the superclass. Any failure is signalled to the subscription if it
     * is still present and routed to the {@link Environment} when one is alive; it is
     * never rethrown.
     *
     * <p>Finally, any undecoded {@code remainder} still held by this bridge is released.
     * No further reads will arrive to consume it, so keeping it would leak a
     * reference-counted buffer.
     *
     * @param ctx the handler context of the channel that became inactive
     * @throws Exception declared by the overridden method; failures are handled internally
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        try {
            if (this.channelSubscription != null) {
                this.channelSubscription.onComplete();
                this.channelSubscription = null;
            }
            super.channelInactive(ctx);
        }
        catch (Throwable err) {
            if (this.channelSubscription != null) {
                this.channelSubscription.onError(err);
            }
            if (Environment.alive()) {
                Environment.get().routeError(err);
            }
        }
        finally {
            // Drop the field first so a failing release cannot leave a dangling reference behind.
            ByteBuf leftover = this.remainder;
            this.remainder = null;
            if (leftover != null) {
                leftover.release();
            }
        }
    }

    /**
     * Delivers an inbound message to the input subscription, decoding {@link ByteBuf}s when a decoder is configured.
     *
     * <p>Handling, in order:
     * <ol>
     *   <li>no subscription, or the message is {@code Unpooled.EMPTY_BUFFER}: the message is released and dropped;</li>
     *   <li>the decoder is {@code Spec.NOOP_DECODER}, or the message is not a {@link ByteBuf}: the message is passed on as is
     *       (it is not released here);</li>
     *   <li>no decoder: the bytes are wrapped in a {@link Buffer}, emitted, and the message is released afterwards;</li>
     *   <li>otherwise the bytes are run through the decoder via {@code passToConnection}; unread bytes are kept in
     *       {@code remainder} and prepended to the data of the next call.</li>
     * </ol>
     *
     * <p>Any failure is signalled to the subscription (if present) and routed to the {@link Environment} when one is
     * alive; it is never rethrown.
     *
     * <p>NOTE(review): this is CFR decompiler output, which reported
     * "Removed try catching itself - possible behaviour change" for this method, so the
     * control flow may differ slightly from the original source.
     *
     * @param ctx the handler context; its allocator is used when buffers must be combined
     * @param msg the inbound message
     * @throws Exception declared by the overridden method; failures are handled internally
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        block24: {
            try {
                // Nobody to deliver to, or nothing to deliver: release and drop.
                if (null == this.channelSubscription || msg == Unpooled.EMPTY_BUFFER) {
                    ReferenceCountUtil.release((Object)msg);
                    return;
                }
                // Pass-through: raw delivery without decoding; the message is not released on this path.
                if (this.channelStream.getDecoder() == Spec.NOOP_DECODER || !ByteBuf.class.isAssignableFrom(msg.getClass())) {
                    this.channelSubscription.onNext(msg);
                    return;
                }
                // No decoder: emit the bytes wrapped in a Buffer, then release the Netty buffer.
                if (this.channelStream.getDecoder() == null) {
                    try {
                        this.channelSubscription.onNext((Object)new Buffer(((ByteBuf)msg).nioBuffer()));
                    }
                    finally {
                        ReferenceCountUtil.release((Object)msg);
                    }
                    return;
                }
                ByteBuf data = (ByteBuf)msg;
                // No leftover from a previous read: decode straight from the incoming buffer.
                if (this.remainder == null) {
                    try {
                        this.passToConnection(data);
                    }
                    finally {
                        // Keep the buffer (without releasing it) when bytes are left unread; otherwise release it.
                        if (data.isReadable()) {
                            this.remainder = data;
                        } else {
                            data.release();
                        }
                    }
                    return;
                }
                // Leftover present: append the new bytes, switching to a freshly allocated buffer if they do not fit.
                if (!NettyChannelHandlerBridge.bufferHasSufficientCapacity(this.remainder, data)) {
                    ByteBuf combined = NettyChannelHandlerBridge.createCombinedBuffer(this.remainder, data, ctx);
                    this.remainder.release();
                    this.remainder = combined;
                } else {
                    this.remainder.writeBytes(data);
                }
                // The incoming bytes now live in remainder, so the incoming buffer can be released.
                data.release();
                try {
                    this.passToConnection(this.remainder);
                }
                finally {
                    // Still unread bytes: discard some already-read bytes; fully read: release and forget.
                    if (this.remainder.isReadable()) {
                        this.remainder.discardSomeReadBytes();
                    } else {
                        this.remainder.release();
                        this.remainder = null;
                    }
                }
            }
            catch (Throwable t) {
                if (this.channelSubscription != null) {
                    this.channelSubscription.onError(t);
                }
                // Decompiler-generated labelled break: skip routing when no Environment is alive.
                if (!Environment.alive()) break block24;
                Environment.get().routeError(t);
            }
        }
    }

    /**
     * Writes an outbound message, subscribing to it when it is a Reactor {@link Stream}.
     *
     * <p>Non-stream messages are delegated to the superclass. A stream whose capacity is
     * {@code Long.MAX_VALUE} is consumed by a subscriber that flushes on termination;
     * any other stream is consumed by a subscriber that flushes every {@code capacity} writes.
     *
     * @param ctx     the handler context used for the writes
     * @param msg     the outbound message or stream of messages
     * @param promise the promise completed once the write (or the whole stream) has been handled
     * @throws Exception if the superclass write fails
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (!(msg instanceof Stream)) {
            super.write(ctx, msg, promise);
            return;
        }
        Stream outbound = (Stream)msg;
        Subscriber writer;
        if (outbound.getCapacity() != Long.MAX_VALUE) {
            writer = (Subscriber)new FlushOnCapacitySubscriber(ctx, promise, outbound.getCapacity());
        } else {
            writer = (Subscriber)new FlushOnTerminateSubscriber(ctx, promise);
        }
        outbound.subscribe(writer);
    }

    /**
     * Reports a pipeline exception to the most specific available sink.
     *
     * <p>Preference order: the input subscription, then the live {@link Environment},
     * and finally the logger. The exception is not propagated further down the pipeline.
     *
     * @param ctx   the handler context (unused)
     * @param cause the exception raised in the pipeline
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (this.channelSubscription != null) {
            this.channelSubscription.onError(cause);
            return;
        }
        if (!Environment.alive()) {
            log.error("Unexpected issue", cause);
            return;
        }
        Environment.get().routeError(cause);
    }

    /**
     * Writes a single outbound item to the channel without flushing.
     *
     * <p>A Reactor {@link Buffer} (exact class match) is first copied into a Netty buffer;
     * any other object is written as is, except {@code Unpooled.EMPTY_BUFFER}, which is skipped.
     *
     * @param data the item to write; must not be {@code null}
     * @param ctx  the handler context whose channel receives the write
     * @return the future of the write, or {@code null} when {@code data} is {@code Unpooled.EMPTY_BUFFER}
     */
    protected ChannelFuture doOnWrite(Object data, ChannelHandlerContext ctx) {
        if (Buffer.class.equals(data.getClass())) {
            ByteBuf converted = NettyChannelHandlerBridge.convertBufferToByteBuff(ctx, (Buffer)data);
            return ctx.channel().write((Object)converted);
        }
        if (data == Unpooled.EMPTY_BUFFER) {
            // Nothing to send for the shared empty buffer.
            return null;
        }
        return ctx.channel().write(data);
    }

    /**
     * Copies the remaining bytes of a Reactor {@link Buffer} into a newly allocated Netty buffer.
     *
     * @param ctx  the handler context providing the allocator
     * @param data the Reactor buffer to copy from
     * @return the Netty buffer holding the copied bytes
     */
    protected static ByteBuf convertBufferToByteBuff(ChannelHandlerContext ctx, Buffer data) {
        // Size the target to exactly what is left to read in the source.
        ByteBuf target = ctx.alloc().buffer(data.remaining());
        return target.writeBytes(data.byteBuffer());
    }

    /**
     * Flushes pending writes at the end of an outbound stream and settles the write promise.
     *
     * <p>If the channel is no longer open the promise is completed successfully right away.
     * Otherwise the promise mirrors the outcome of {@code last} after a flush, or, when
     * there is no last write, the outcome of writing and flushing an empty buffer.
     *
     * @param ctx     the handler context used to flush
     * @param last    the future of the last write of the stream, or {@code null} if unknown
     * @param promise the promise to complete
     */
    protected void doOnTerminate(ChannelHandlerContext ctx, ChannelFuture last, final ChannelPromise promise) {
        if (!ctx.channel().isOpen()) {
            promise.trySuccess();
            return;
        }
        ChannelFutureListener settlePromise = new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    promise.tryFailure(future.cause());
                } else {
                    promise.trySuccess();
                }
            }
        };
        if (last == null) {
            // No write to wait for: flush through an empty buffer and track that instead.
            ctx.writeAndFlush((Object)Unpooled.EMPTY_BUFFER).addListener((GenericFutureListener)settlePromise);
            return;
        }
        ctx.flush();
        last.addListener((GenericFutureListener)settlePromise);
    }

    /**
     * Tells whether {@code receiver} can take all readable bytes of {@code provider}
     * without exceeding its maximum capacity.
     *
     * @param receiver the buffer that would be appended to
     * @param provider the buffer whose readable bytes would be appended
     * @return {@code true} if the bytes fit within {@code receiver}'s maximum capacity
     */
    private static boolean bufferHasSufficientCapacity(ByteBuf receiver, ByteBuf provider) {
        int used = receiver.writerIndex();
        int highestStartIndex = receiver.maxCapacity() - provider.readableBytes();
        return used <= highestStartIndex;
    }

    /**
     * Allocates a buffer containing the readable bytes of {@code partOne} followed by those of {@code partTwo}.
     *
     * <p>Neither input is released here; that is left to the caller.
     *
     * @param partOne the buffer whose readable bytes come first
     * @param partTwo the buffer whose readable bytes come second
     * @param ctx     the handler context providing the allocator
     * @return the newly allocated buffer holding both parts
     */
    private static ByteBuf createCombinedBuffer(ByteBuf partOne, ByteBuf partTwo, ChannelHandlerContext ctx) {
        ByteBuf merged = ctx.alloc().buffer(partOne.readableBytes() + partTwo.readableBytes());
        merged.writeBytes(partOne);
        merged.writeBytes(partTwo);
        return merged;
    }

    /**
     * Runs the configured decoder once over {@code data} and emits the decoded value, if any.
     *
     * <p>The decoder works on a Reactor {@link Buffer} built from {@code data.nioBuffer()}.
     * Afterwards {@code data}'s reader index is advanced by however far the decoder moved
     * that buffer's position.
     *
     * @param data the Netty buffer holding the bytes to decode
     */
    private void passToConnection(ByteBuf data) {
        Buffer view = new Buffer(data.nioBuffer());
        int positionBefore = view.position();
        if (this.channelStream.getDecoder() != null && view.byteBuffer() != null) {
            Object decoded = this.channelStream.getDecoder().apply((Object)view);
            if (decoded != null) {
                this.channelSubscription.onNext(decoded);
            }
        }
        // Mirror the decoder's progress onto the Netty buffer.
        data.skipBytes(view.position() - positionBefore);
    }

    /**
     * Ties an outbound subscription to the channel's lifetime and issues the initial demand.
     *
     * <p>A listener is registered on the channel's close future; when it fires, the
     * subscription is cancelled and {@code cb} is invoked with {@code null}. The initial
     * demand is requested after the listener has been registered.
     *
     * @param ctx     the handler context whose channel close is observed
     * @param s       the upstream subscription of the outbound stream
     * @param request the initial demand to request from {@code s}
     * @param cb      callback invoked (with {@code null}) once the subscription has been cancelled on close
     */
    protected void doOnSubscribe(ChannelHandlerContext ctx, final Subscription s, long request, final Consumer<Void> cb) {
        ChannelFuture closed = ctx.channel().closeFuture();
        ChannelFutureListener cancelOnClose = new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (log.isDebugEnabled()) {
                    log.debug("Cancel connection");
                }
                s.cancel();
                cb.accept(null);
            }
        };
        closed.addListener((GenericFutureListener)cancelOnClose);
        s.request(request);
    }

    private class FlushOnCapacitySubscriber
    extends DefaultSubscriber<Object>
    implements Runnable,
    Consumer<Void> {
        private final ChannelHandlerContext ctx;
        private final ChannelPromise promise;
        private final long capacity;
        private Subscription subscription;
        private long written = 0L;
        private final ChannelFutureListener writeListener = new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess() && future.cause() != null) {
                    log.error("error during write");
                    FlushOnCapacitySubscriber.this.promise.tryFailure(future.cause());
                    return;
                }
                if ((FlushOnCapacitySubscriber.this.capacity == 1L || --FlushOnCapacitySubscriber.this.written == 0L) && FlushOnCapacitySubscriber.this.subscription != null) {
                    FlushOnCapacitySubscriber.this.subscription.request(FlushOnCapacitySubscriber.this.capacity);
                }
            }
        };

        public FlushOnCapacitySubscriber(ChannelHandlerContext ctx, ChannelPromise promise, long capacity) {
            this.ctx = ctx;
            this.promise = promise;
            this.capacity = capacity;
        }

        public void onSubscribe(Subscription s) {
            this.subscription = s;
            NettyChannelHandlerBridge.this.doOnSubscribe(this.ctx, s, this.capacity, this);
        }

        public void onNext(Object w) {
            if (this.subscription == null) {
                throw CancelException.INSTANCE;
            }
            try {
                NettyChannelHandlerBridge.this.doOnWrite(w, this.ctx).addListener((GenericFutureListener)this.writeListener);
                if (this.capacity == 1L) {
                    this.ctx.flush();
                } else {
                    this.ctx.channel().eventLoop().execute((Runnable)this);
                }
            }
            catch (Throwable t) {
                this.onError(Exceptions.addValueAsLastCause((Throwable)t, (Object)w));
            }
        }

        public void onError(Throwable t) {
            log.error("Write error", t);
            this.subscription = null;
            NettyChannelHandlerBridge.this.doOnTerminate(this.ctx, null, this.promise);
        }

        public void onComplete() {
            this.subscription = null;
            if (log.isDebugEnabled()) {
                log.debug("Flush Connection");
            }
            NettyChannelHandlerBridge.this.doOnTerminate(this.ctx, null, this.promise);
        }

        @Override
        public void run() {
            if (++this.written == this.capacity) {
                this.ctx.flush();
            }
        }

        public void accept(Void aVoid) {
            this.subscription = null;
        }
    }

    private class FlushOnTerminateSubscriber
    extends DefaultSubscriber<Object>
    implements Consumer<Void> {
        private final ChannelHandlerContext ctx;
        private final ChannelPromise promise;
        ChannelFuture lastWrite;
        Subscription subscription;

        public FlushOnTerminateSubscriber(ChannelHandlerContext ctx, ChannelPromise promise) {
            this.ctx = ctx;
            this.promise = promise;
        }

        public void accept(Void aVoid) {
            this.subscription = null;
        }

        public void onSubscribe(Subscription s) {
            this.subscription = s;
            NettyChannelHandlerBridge.this.doOnSubscribe(this.ctx, s, Long.MAX_VALUE, this);
        }

        public void onNext(final Object w) {
            if (this.subscription == null) {
                throw CancelException.INSTANCE;
            }
            try {
                ChannelFuture cf;
                this.lastWrite = cf = NettyChannelHandlerBridge.this.doOnWrite(w, this.ctx);
                if (cf != null && log.isDebugEnabled()) {
                    cf.addListener((GenericFutureListener)new ChannelFutureListener(){

                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                log.error("write error :" + w, future.cause());
                            }
                        }
                    });
                }
            }
            catch (Throwable t) {
                this.onError(Exceptions.addValueAsLastCause((Throwable)t, (Object)w));
            }
        }

        public void onError(Throwable t) {
            this.subscription = null;
            log.error("Write error", t);
            NettyChannelHandlerBridge.this.doOnTerminate(this.ctx, this.lastWrite, this.promise);
        }

        public void onComplete() {
            if (this.subscription == null) {
                throw new IllegalStateException("already flushed");
            }
            this.subscription = null;
            NettyChannelHandlerBridge.this.doOnTerminate(this.ctx, this.lastWrite, this.promise);
        }
    }

    /**
     * User event that carries the subscriber to attach to a channel's inbound data.
     *
     * @param <IN> the type of the inbound items
     */
    public static final class ChannelInputSubscriberEvent<IN> {
        /** Subscriber to attach; never {@code null}. */
        private final Subscriber<IN> inputSubscriber;

        /**
         * @param inputSubscriber the subscriber that will receive inbound data
         * @throws IllegalArgumentException if {@code inputSubscriber} is {@code null}
         */
        public ChannelInputSubscriberEvent(Subscriber<IN> inputSubscriber) {
            if (inputSubscriber == null) {
                throw new IllegalArgumentException("Connection input subscriber must not be null.");
            }
            this.inputSubscriber = inputSubscriber;
        }
    }
}

