/*
 * Decompiled with CFR 0.152.
 */
package redradishes;

import com.google.common.base.Throwables;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.xnio.IoUtils;
import org.xnio.Pool;
import org.xnio.Pooled;
import org.xnio.StreamConnection;
import org.xnio.channels.Channels;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.channels.SuspendableWriteChannel;
import org.xnio.conduits.ConduitStreamSourceChannel;
import redradishes.ByteBufferBundle;
import redradishes.ByteBufferSink;
import redradishes.encoder.ByteSink;

class RedisClientConnection {
    private final BlockingQueue<ReplyDecoder> decoderQueue = new LinkedBlockingQueue<ReplyDecoder>();
    private final StreamSinkChannel sinkChannel;
    private ReplyDecoder currentDecoder;

    RedisClientConnection(StreamConnection connection, Pool<ByteBuffer> bufferPool, Charset charset, BlockingQueue<CommandEncoderDecoder> commandsQueue) {
        CharsetDecoder charsetDecoder = charset.newDecoder();
        ConduitStreamSourceChannel sourceChannel = connection.getSourceChannel();
        this.sinkChannel = connection.getSinkChannel();
        sourceChannel.getReadSetter().set(inChannel -> {
            try (Pooled pooledByteBuffer = bufferPool.allocate();){
                ByteBuffer readBuffer = (ByteBuffer)pooledByteBuffer.getResource();
                while (inChannel.read(readBuffer) > 0) {
                    readBuffer.flip();
                    try {
                        while (readBuffer.hasRemaining()) {
                            if (!this.decoder().parse(readBuffer, charsetDecoder)) continue;
                            this.currentDecoder = null;
                        }
                    }
                    finally {
                        readBuffer.clear();
                    }
                }
            }
            catch (Throwable e) {
                IoUtils.safeClose((Closeable)this.sinkChannel);
                IoUtils.safeClose((Closeable)inChannel);
                this.failUnfinished(e);
            }
        });
        sourceChannel.getCloseSetter().set(inChannel -> {
            IoUtils.safeClose((Closeable)this.sinkChannel);
            this.failUnfinished(new IOException("Server closed connection"));
        });
        sourceChannel.resumeReads();
        ByteBufferBundle byteBufferBundle = new ByteBufferBundle(bufferPool);
        this.sinkChannel.getWriteSetter().set(outChannel -> {
            try {
                while (!commandsQueue.isEmpty() || !byteBufferBundle.isEmpty()) {
                    CommandEncoderDecoder command;
                    ByteBufferSink sink = new ByteBufferSink(byteBufferBundle);
                    while (byteBufferBundle.allocSize() <= 1 && (command = (CommandEncoderDecoder)commandsQueue.poll()) != null) {
                        this.decoderQueue.add(command);
                        command.writeTo(sink);
                    }
                    long bytesWritten = byteBufferBundle.writeTo((GatheringByteChannel)outChannel);
                    if (bytesWritten != 0L) continue;
                    return;
                }
            }
            catch (Throwable e) {
                IoUtils.safeClose((Closeable)this.sinkChannel);
                this.failUnfinished(e);
            }
            outChannel.suspendWrites();
        });
    }

    private void failUnfinished(Throwable e) {
        ReplyDecoder decoder;
        if (this.currentDecoder != null) {
            this.currentDecoder.fail(e);
            this.currentDecoder = null;
        }
        while ((decoder = (ReplyDecoder)this.decoderQueue.poll()) != null) {
            decoder.fail(e);
        }
    }

    private ReplyDecoder decoder() {
        if (this.currentDecoder == null) {
            this.currentDecoder = (ReplyDecoder)this.decoderQueue.poll();
            if (this.currentDecoder == null) {
                this.currentDecoder = new ReplyDecoder(){

                    @Override
                    public boolean parse(ByteBuffer buffer, CharsetDecoder charsetDecoder) throws IOException {
                        int len = buffer.remaining();
                        byte[] bytes = new byte[len];
                        buffer.get(bytes);
                        throw new IllegalStateException("Unexpected input: " + Arrays.toString(bytes));
                    }

                    @Override
                    public void fail(Throwable e) {
                        throw Throwables.propagate((Throwable)e);
                    }

                    @Override
                    public void cancel() {
                    }
                };
            }
        }
        return this.currentDecoder;
    }

    void commandAdded() {
        Channels.resumeWritesAsync((SuspendableWriteChannel)this.sinkChannel);
    }

    public void close() {
        if (this.currentDecoder != null) {
            this.currentDecoder.cancel();
        }
        this.decoderQueue.forEach(ReplyDecoder::cancel);
    }

    static interface CommandEncoderDecoder
    extends ReplyDecoder {
        public void writeTo(ByteSink var1);
    }

    static interface ReplyDecoder {
        public boolean parse(ByteBuffer var1, CharsetDecoder var2) throws IOException;

        public void fail(Throwable var1);

        public void cancel();
    }
}

