/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geronimo.remoting.transport.async.nio;

import EDU.oswego.cs.dl.util.concurrent.Mutex;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Properties;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geronimo.proxy.SimpleComponent;
import org.apache.geronimo.remoting.transport.ConnectionFailedException;
import org.apache.geronimo.remoting.transport.TransportException;
import org.apache.geronimo.remoting.transport.URISupport;
import org.apache.geronimo.remoting.transport.async.AsyncMsg;
import org.apache.geronimo.remoting.transport.async.Channel;
import org.apache.geronimo.remoting.transport.async.ChannelListner;
import org.apache.geronimo.remoting.transport.async.nio.SelectionEventListner;
import org.apache.geronimo.remoting.transport.async.nio.SelectorManager;

/**
 * A {@link Channel} implementation on top of a non-blocking NIO {@link SocketChannel}.
 *
 * <p>Wire format, as implemented by {@link #serialize}, {@link #close()} and
 * {@link #serviceRead()}: every message is a 4-byte big-endian length header followed by
 * that many body bytes. A header value of {@code -1} is the close marker sent by
 * {@link #close()}.
 *
 * <p>The connection parameters are taken from the query part of the URI: {@code tcp.nodelay}
 * (default {@code "true"}) and {@code compression} (a deflate level, default {@code -1}
 * meaning no compression).
 *
 * <p>Threading: {@link #send(AsyncMsg)} serializes senders with {@link #sendMutex}; selector
 * callbacks, {@link #close()} and the internal close paths synchronize on this instance.
 */
public class NonBlockingChannel
extends SimpleComponent
implements Channel,
SelectionEventListner {
    private static final Log log = LogFactory.getLog(NonBlockingChannel.class);

    /** Size in bytes of the length header that precedes every message body. */
    private static final int HEADER_SIZE = 4;
    /** Header value that announces "the sender is closing" instead of a body length. */
    private static final int CLOSE_MARKER = -1;
    /** Value of the {@code compression} parameter that disables compression. */
    private static final int NO_COMPRESSION = -1;
    /** How long {@link #send(AsyncMsg)} waits for a previous send to finish, in milliseconds. */
    private static final long SEND_TIMEOUT_MILLIS = 10000L;

    private ChannelListner listner;
    // Never read or written in this class; kept so the field layout stays unchanged.
    private Thread worker;
    private SocketChannel socketChannel;
    private URI remoteURI;
    // volatile: written under the instance monitor but read by send() without it.
    private volatile boolean closing = false;
    // Set once the remote side finished sending (close marker or end-of-stream);
    // from then on the channel must not ask the selector for read events again.
    private volatile boolean inputClosed = false;
    private Inflater inflator;
    private Deflater deflater;
    private SelectorManager selectorManager;
    private SelectionKey selectionKey;
    private URI requestedURI;
    static int nextId = 0;
    /** Held from the start of a send until its last byte has been written. */
    Mutex sendMutex = new Mutex();
    /** [0] = length header, [1] = body of the message currently being received. */
    ByteBuffer[] receiveBuffer = new ByteBuffer[]{ByteBuffer.allocate(4), ByteBuffer.allocate(10240)};
    /** [0] = length header, [1] = body of the message currently being sent. */
    ByteBuffer[] sendBuffer = new ByteBuffer[]{ByteBuffer.allocate(4), ByteBuffer.allocate(0)};

    /**
     * Connects to {@code remoteURI}, performs the handshake (writes the remote URI and the
     * back-connect URI as UTF strings) and registers the channel for read events.
     *
     * @param remoteURI      address to connect to; its query string supplies the
     *                       {@code tcp.nodelay} and {@code compression} parameters
     * @param backConnectURI URI sent to the peer as the second handshake string
     * @param listner        receiver of message and close events
     * @throws ConnectionFailedException if the socket cannot be connected
     * @throws TransportException        if the handshake or selector registration fails
     */
    public void open(URI remoteURI, URI backConnectURI, ChannelListner listner) throws TransportException {
        if (log.isTraceEnabled()) {
            log.trace("Connecting to : " + remoteURI);
        }
        this.listner = listner;
        this.remoteURI = remoteURI;
        int port = remoteURI.getPort();
        Properties params = URISupport.parseQueryParameters(remoteURI);
        boolean enableTcpNoDelay = params.getProperty("tcp.nodelay", "true").equals("true");
        int compression = Integer.parseInt(params.getProperty("compression", "-1"));
        try {
            InetAddress addr = InetAddress.getByName(remoteURI.getHost());
            this.socketChannel = SocketChannel.open();
            // The handshake below uses the blocking socket streams.
            this.socketChannel.configureBlocking(true);
            this.socketChannel.connect(new InetSocketAddress(addr, port));
        }
        catch (Exception e) {
            // Do not leak the half-opened channel.
            this.discardSocketChannel();
            throw new ConnectionFailedException("" + e);
        }
        try {
            this.socketChannel.socket().setTcpNoDelay(enableTcpNoDelay);
            DataOutputStream out = new DataOutputStream(this.socketChannel.socket().getOutputStream());
            out.writeUTF(remoteURI.toString());
            out.writeUTF(backConnectURI.toString());
            out.flush();
            this.enableCompression(compression);
            this.socketChannel.configureBlocking(false);
            this.selectorManager = SelectorManager.getInstance();
            this.selectorManager.start();
            this.selectionKey = this.selectorManager.register(this.socketChannel, SelectionKey.OP_READ, this);
        }
        catch (Exception e) {
            // Do not leak the connected socket when the handshake fails.
            this.discardSocketChannel();
            throw new TransportException("Connection handshake failed: " + e);
        }
    }

    /**
     * Server-side half of the handshake: adopts an already connected channel, reads the
     * requested URI and the remote (back-connect) URI from it and applies the
     * {@code tcp.nodelay} and {@code compression} parameters of the requested URI.
     * Call {@link #open(ChannelListner)} afterwards to start receiving.
     *
     * @param localURI      not used by this method
     * @param socketChannel the connected channel; presumably still in blocking mode since
     *                      the handshake reads from the socket stream (TODO confirm with caller)
     * @throws IOException        if the handshake strings cannot be read
     * @throws URISyntaxException if either handshake string is not a valid URI
     */
    public void init(URI localURI, SocketChannel socketChannel) throws IOException, URISyntaxException {
        this.socketChannel = socketChannel;
        DataOutputStream out = new DataOutputStream(socketChannel.socket().getOutputStream());
        out.flush();
        DataInputStream in = new DataInputStream(socketChannel.socket().getInputStream());
        // The connecting side writes the destination first, then its own address.
        String destURI = in.readUTF();
        String sourceURI = in.readUTF();
        this.remoteURI = new URI(sourceURI);
        this.requestedURI = new URI(destURI);
        if (log.isTraceEnabled()) {
            log.trace("Remote URI    : " + this.remoteURI);
            log.trace("Requested URI : " + this.requestedURI);
        }
        Properties params = URISupport.parseQueryParameters(this.requestedURI);
        boolean enableTcpNoDelay = params.getProperty("tcp.nodelay", "true").equals("true");
        int compression = Integer.parseInt(params.getProperty("compression", "-1"));
        this.enableCompression(compression);
        socketChannel.socket().setTcpNoDelay(enableTcpNoDelay);
        if (log.isTraceEnabled()) {
            log.trace("Compression level : " + compression);
            log.trace("tcp no delay : " + enableTcpNoDelay);
        }
    }

    /**
     * Switches the channel adopted by {@link #init} to non-blocking mode and registers it
     * for read events.
     *
     * @param listner receiver of message and close events
     * @throws TransportException if the channel cannot be registered
     */
    public void open(ChannelListner listner) throws TransportException {
        try {
            this.listner = listner;
            this.socketChannel.configureBlocking(false);
            this.selectorManager = SelectorManager.getInstance();
            this.selectorManager.start();
            this.selectionKey = this.selectorManager.register(this.socketChannel, SelectionKey.OP_READ, this);
        }
        catch (Exception e) {
            throw new TransportException("Connection handshake failed: " + e);
        }
    }

    /** Creates the raw-deflate compressor pair unless {@code level} disables compression. */
    private void enableCompression(int level) {
        if (level != NO_COMPRESSION) {
            this.inflator = new Inflater(true);
            this.deflater = new Deflater(level, true);
        }
    }

    /** Returns the next value of the class-wide id counter. */
    private int getNextID() {
        // nextId is static, so it must be guarded by a class-wide lock, not by this instance.
        synchronized (NonBlockingChannel.class) {
            return nextId++;
        }
    }

    /**
     * Encodes a message as {@code {header, body}} buffers, both positioned at 0.
     * The header holds the body length; the body is deflated when compression is enabled.
     */
    private ByteBuffer[] serialize(AsyncMsg data) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        if (this.deflater == null) {
            DataOutputStream out = new DataOutputStream(baos);
            data.writeExternal(out);
            out.close();
        } else {
            // send() serializes before it owns sendMutex, so concurrent senders would
            // otherwise share the deflater's state.
            synchronized (this.deflater) {
                // Closing the previous DeflaterOutputStream left the deflater "finished";
                // without a reset every message after the first fails to compress.
                this.deflater.reset();
                DataOutputStream out = new DataOutputStream(new DeflaterOutputStream(baos, this.deflater));
                data.writeExternal(out);
                out.close();
            }
        }
        byte[] body = baos.toByteArray();
        ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
        header.putInt(body.length);
        header.flip();
        return new ByteBuffer[]{header, ByteBuffer.wrap(body)};
    }

    /**
     * Decodes the body buffer {@code message[1]} into an {@link AsyncMsg}. Only the bytes up
     * to the buffer's limit belong to the message; the buffer's position is ignored.
     *
     * @param message {@code {header, body}}; the body must be backed by an accessible array
     * @return the decoded message
     * @throws IOException if the body cannot be decoded
     */
    public AsyncMsg deserialize(ByteBuffer[] message) throws IOException {
        ByteBuffer body = message[1];
        byte[] bytes = body.array();
        int offset = body.arrayOffset();
        int length = body.limit();
        InputStream source;
        if (this.inflator == null) {
            // Stop at the limit: the reusable receive buffer still holds bytes of
            // earlier, longer messages behind it.
            source = new ByteArrayInputStream(bytes, offset, length);
        } else {
            // The previous message left the inflater "finished"; reset it or every
            // compressed message after the first reads as end-of-stream.
            this.inflator.reset();
            // A raw ("nowrap") inflater may want one extra dummy input byte after the
            // compressed data, so expose one more byte when the array has room.
            int visible = Math.min(length + 1, bytes.length - offset);
            source = new InflaterInputStream(new ByteArrayInputStream(bytes, offset, visible), this.inflator);
        }
        AsyncMsg asyncMsg = new AsyncMsg();
        DataInputStream in = new DataInputStream(source);
        asyncMsg.readExternal(in);
        in.close();
        return asyncMsg;
    }

    /**
     * Close path driven by the connection itself (remote close or I/O failure). Shuts the
     * input side down; if a close was already in progress the socket is released,
     * otherwise the listener is told about the close.
     */
    private synchronized void asyncClose() {
        if (this.socketChannel == null) {
            return;
        }
        try {
            this.socketChannel.socket().shutdownInput();
            if (this.closing) {
                this.forcedClose();
            } else {
                this.closing = true;
                this.listner.closeEvent();
            }
        }
        catch (IOException e) {
            log.debug("Could not shut down the input side, forcing close: ", e);
            this.forcedClose();
        }
    }

    /**
     * Locally initiated close: writes the close marker, shuts the output side down and,
     * if the other direction is already closing, releases the socket.
     */
    public synchronized void close() {
        if (this.socketChannel == null) {
            return;
        }
        try {
            ByteBuffer marker = ByteBuffer.allocate(HEADER_SIZE);
            marker.putInt(CLOSE_MARKER);
            marker.flip();
            // NOTE(review): this takes the monitor of the Mutex object, it does not call
            // attempt()/acquire() on it, so it presumably does not wait for a partially
            // written message to finish - confirm. The channel is non-blocking, so the
            // single write below is also not guaranteed to transfer all four bytes.
            synchronized (this.sendMutex) {
                this.socketChannel.write(marker);
                this.socketChannel.socket().shutdownOutput();
            }
            if (this.closing) {
                this.forcedClose();
            } else {
                this.closing = true;
            }
        }
        catch (IOException e) {
            log.debug("Could not send the close marker, forcing close: ", e);
            this.forcedClose();
        }
    }

    /** Releases the selector registration and the socket; safe to call more than once. */
    private void forcedClose() {
        if (this.socketChannel == null) {
            return;
        }
        // Make send() fail with a TransportException instead of touching the null channel.
        this.closing = true;
        // The key is still null when init() ran but open() did not; that must not
        // prevent the socket from being closed.
        if (this.selectionKey != null) {
            this.selectionKey.cancel();
        }
        this.discardSocketChannel();
        try {
            SelectorManager.getInstance().stop();
        }
        catch (Exception e) {
            // Best effort: the socket is already released at this point.
            log.debug("Could not stop the selector manager: ", e);
        }
    }

    /** Closes and forgets the socket channel, logging instead of propagating failures. */
    private void discardSocketChannel() {
        SocketChannel channel = this.socketChannel;
        this.socketChannel = null;
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        }
        catch (IOException e) {
            log.debug("Error while closing the socket channel: ", e);
        }
    }

    /** @return the URI of the remote end of this channel */
    public URI getRemoteURI() {
        return this.remoteURI;
    }

    /**
     * Selector callback: continues a pending write and/or reads incoming data, depending
     * on which operations the key is ready for.
     *
     * @param selection the key registered for this channel
     */
    public synchronized void selectionEvent(SelectionKey selection) {
        if (!selection.isValid()) {
            return;
        }
        // Sample the ready set once: servicing the write may cancel the key, after
        // which isReadable() would throw CancelledKeyException.
        int ready = selection.readyOps();
        if ((ready & SelectionKey.OP_WRITE) != 0) {
            this.serviceWrite();
        }
        if ((ready & SelectionKey.OP_READ) != 0) {
            this.serviceRead();
        }
    }

    /**
     * Sends one message. Waits up to ten seconds for a previous send to finish; the bytes
     * that cannot be written immediately are flushed later from the selector callback.
     *
     * @param data the message to send
     * @throws TransportException on timeout, interruption, I/O failure, or if the channel
     *                            is closing
     */
    public void send(AsyncMsg data) throws TransportException {
        boolean ownsMutex = false;
        try {
            ByteBuffer[] buffers = this.serialize(data);
            if (!this.sendMutex.attempt(SEND_TIMEOUT_MILLIS)) {
                throw new TransportException("Send timeout.");
            }
            ownsMutex = true;
            if (this.closing) {
                throw new TransportException("connection has been closed.");
            }
            this.sendBuffer = buffers;
            this.flushSendBuffer();
            // From here on the mutex is either already released by flushSendBuffer()
            // or will be released by the selector callback that finishes the write.
            ownsMutex = false;
        }
        catch (IOException e) {
            throw new TransportException("" + e);
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            throw new TransportException("" + e);
        }
        finally {
            // Every failure after the acquire must give the mutex back, otherwise all
            // later senders run into the send timeout.
            if (ownsMutex) {
                this.sendMutex.release();
            }
        }
    }

    /**
     * Writes as much of {@link #sendBuffer} as the socket accepts. While bytes remain the
     * key additionally watches for writability; once everything is written the write
     * interest is dropped and {@link #sendMutex} is released.
     */
    private void flushSendBuffer() throws IOException {
        this.socketChannel.write(this.sendBuffer);
        // Never re-arm read interest after the remote side has finished sending.
        int readInterest = this.inputClosed ? 0 : SelectionKey.OP_READ;
        // Check the header as well: with an empty body a partly written header would
        // otherwise be treated as a completed send.
        if (this.sendBuffer[0].hasRemaining() || this.sendBuffer[1].hasRemaining()) {
            this.selectorManager.setInterestOps(this.selectionKey, readInterest | SelectionKey.OP_WRITE);
        } else {
            this.selectorManager.setInterestOps(this.selectionKey, readInterest);
            this.sendMutex.release();
        }
    }

    /** Continues a partially written message when the socket becomes writable. */
    private void serviceWrite() {
        if (this.socketChannel == null) {
            return;
        }
        try {
            this.flushSendBuffer();
        }
        catch (IOException e) {
            log.debug("Communications error, closing connection: ", e);
            // The in-flight message is abandoned; give its mutex back so waiting
            // senders fail right away instead of running into the send timeout.
            this.sendMutex.release();
            this.asyncClose();
        }
    }

    /**
     * Reads as many complete messages as are available and hands each one to the
     * listener. A close marker or end-of-stream starts the close sequence.
     */
    private void serviceRead() {
        boolean tracing = log.isTraceEnabled();
        if (tracing) {
            log.trace("ReadDataAction triggered.");
        }
        if (this.socketChannel == null || this.inputClosed) {
            return;
        }
        try {
            while (true) {
                if (this.receiveBuffer[0].hasRemaining()) {
                    if (tracing) {
                        log.trace("Reading header");
                    }
                    if (this.socketChannel.read(this.receiveBuffer[0]) < 0) {
                        // End of stream: without this the key stays readable forever.
                        this.remoteClosed();
                        return;
                    }
                    if (this.receiveBuffer[0].hasRemaining()) break;
                    this.receiveBuffer[0].flip();
                    int size = this.receiveBuffer[0].getInt();
                    if (size == CLOSE_MARKER) {
                        this.remoteClosed();
                        return;
                    }
                    if (size < 0) {
                        throw new IOException("Invalid message size: " + size);
                    }
                    // NOTE(review): size comes straight from the peer and is not capped,
                    // so a hostile peer can request a very large allocation here.
                    if (size > this.receiveBuffer[1].capacity()) {
                        this.receiveBuffer[1] = ByteBuffer.allocate(size);
                    }
                    this.receiveBuffer[1].clear();
                    this.receiveBuffer[1].limit(size);
                }
                if (this.receiveBuffer[1].hasRemaining()) {
                    if (tracing) {
                        log.trace("Reading body");
                    }
                    if (this.socketChannel.read(this.receiveBuffer[1]) < 0) {
                        this.remoteClosed();
                        return;
                    }
                    if (this.receiveBuffer[1].hasRemaining()) break;
                }
                // Header and body are complete (the body may be empty).
                this.receiveBuffer[0].flip();
                this.listner.receiveEvent(this.deserialize(this.receiveBuffer));
                this.receiveBuffer[0].clear();
                if (this.socketChannel == null) {
                    // The listener closed the channel while handling the message.
                    return;
                }
            }
            if (tracing) {
                log.trace("No more data available to be read.");
            }
        }
        catch (IOException e) {
            log.debug("Communications error, closing connection: ", e);
            this.asyncClose();
        }
    }

    /**
     * The remote side finished sending: stop asking for read events (a pending write
     * keeps its write interest) and start the close sequence.
     */
    private void remoteClosed() throws IOException {
        this.inputClosed = true;
        int remainingOps = this.selectionKey.interestOps() & ~SelectionKey.OP_READ;
        this.selectorManager.setInterestOps(this.selectionKey, remainingOps);
        this.asyncClose();
    }

    /** @return the URI the remote side asked for during {@link #init}, or null if {@link #init} was not used */
    public URI getRequestedURI() {
        return this.requestedURI;
    }
}

