/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geronimo.remoting.transport.async.bio;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Properties;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geronimo.proxy.SimpleComponent;
import org.apache.geronimo.remoting.transport.ConnectionFailedException;
import org.apache.geronimo.remoting.transport.TransportException;
import org.apache.geronimo.remoting.transport.URISupport;
import org.apache.geronimo.remoting.transport.async.AsyncMsg;
import org.apache.geronimo.remoting.transport.async.Channel;
import org.apache.geronimo.remoting.transport.async.ChannelListner;

/**
 * A {@link Channel} implementation backed by a {@link SocketChannel} in blocking mode.
 *
 * <p>Wire format, as implemented by {@link #send(AsyncMsg)} and {@link #run()}: every message
 * is a 4-byte big-endian length prefix followed by that many payload bytes. A length of
 * {@code -1} is the "orderly close" marker written by {@link #close()}.
 *
 * <p>When the connection URI carries a {@code compression} query parameter other than
 * {@code -1}, each payload is an independent raw-deflate stream (the shared
 * {@link Deflater}/{@link Inflater} are reset before every message).
 *
 * <p>A daemon worker thread (this object as {@link Runnable}) reads incoming messages and
 * hands them to the registered {@link ChannelListner}.
 */
public class BlockingChannel
extends SimpleComponent
implements Runnable,
Channel {
    private static final Log log = LogFactory.getLog(BlockingChannel.class);

    /** Number of bytes in the length prefix that precedes every payload. */
    private static final int HEADER_SIZE = 4;
    /** Length value that signals an orderly close instead of a payload. */
    private static final int CLOSE_MARKER = -1;
    /** Initial capacity of the reusable payload buffer; it grows on demand. */
    private static final int INITIAL_BODY_CAPACITY = 10240;
    /** Value of the {@code compression} query parameter meaning "no compression". */
    private static final int NO_COMPRESSION = -1;

    static int nextId = 0;

    private ChannelListner listner;
    private Thread worker;
    // volatile: written under the instance monitor but read by send() under sendMutex.
    private volatile SocketChannel socketChannel;
    // volatile: written under the instance monitor but read by send() under sendMutex.
    private volatile boolean closing = false;
    private Inflater inflator;
    private Deflater deflater;
    private URI remoteURI;
    private URI requestedURI;
    private final Object sendMutex = new Object();

    /**
     * Connects to {@code remoteURI}, performs the handshake (writes the requested URI and the
     * back-connect URI as UTF strings) and starts the reader thread.
     *
     * <p>Recognised query parameters on {@code remoteURI}: {@code tcp.nodelay} (default
     * {@code "true"}) and {@code compression} (default {@code "-1"}, i.e. disabled).
     *
     * @param remoteURI      the URI to connect to; host and port are taken from it
     * @param backConnectURI the URI sent to the peer as this side's address
     * @param listner        receives incoming messages and the close notification
     * @throws TransportException if the connection or the handshake fails; the socket is
     *                            released before the exception is thrown
     */
    public void open(URI remoteURI, URI backConnectURI, ChannelListner listner) throws TransportException {
        if (log.isTraceEnabled()) {
            log.trace("Connecting to : " + remoteURI);
        }
        this.listner = listner;
        this.remoteURI = remoteURI;
        int port = remoteURI.getPort();
        Properties params = URISupport.parseQueryParameters(remoteURI);
        boolean enableTcpNoDelay = params.getProperty("tcp.nodelay", "true").equals("true");
        int compression = Integer.parseInt(params.getProperty("compression", "-1"));
        try {
            InetAddress addr = InetAddress.getByName(remoteURI.getHost());
            this.socketChannel = SocketChannel.open();
            this.socketChannel.configureBlocking(true);
            this.socketChannel.connect(new InetSocketAddress(addr, port));
        }
        catch (Exception e) {
            // Do not leak the half-opened socket when the connect attempt fails.
            this.forcedClose();
            throw new ConnectionFailedException("" + e);
        }
        try {
            this.socketChannel.socket().setTcpNoDelay(enableTcpNoDelay);
            DataOutputStream out = new DataOutputStream(this.socketChannel.socket().getOutputStream());
            out.writeUTF(remoteURI.toString());
            out.writeUTF(backConnectURI.toString());
            out.flush();
            this.configureCompression(compression);
        }
        catch (Exception e) {
            // Do not leak the connected socket when the handshake fails.
            this.forcedClose();
            throw new TransportException("Connection handshake failed: " + e);
        }
        this.worker = new Thread(this, "Channel -> " + remoteURI);
        this.worker.setDaemon(true);
        this.worker.start();
    }

    /**
     * Initialises this channel from an already-accepted socket by reading the peer's handshake
     * (requested URI, then source URI) and applying the options found in the requested URI's
     * query string. The reader thread is not started here; see {@link #open(ChannelListner)}.
     *
     * @param localURI      not used by this implementation
     * @param socketChannel the accepted connection
     * @throws IOException        if the handshake cannot be read or the socket cannot be configured
     * @throws URISyntaxException if either URI sent by the peer is malformed
     */
    public void init(URI localURI, SocketChannel socketChannel) throws IOException, URISyntaxException {
        this.socketChannel = socketChannel;
        // Nothing is written during the accept-side handshake; the stream is only flushed.
        DataOutputStream out = new DataOutputStream(socketChannel.socket().getOutputStream());
        out.flush();
        DataInputStream in = new DataInputStream(socketChannel.socket().getInputStream());
        String destURI = in.readUTF();
        String sourceURI = in.readUTF();
        this.remoteURI = new URI(sourceURI);
        this.requestedURI = new URI(destURI);
        if (log.isTraceEnabled()) {
            log.trace("Remote URI    : " + this.remoteURI);
            log.trace("Requested URI : " + this.requestedURI);
        }
        Properties params = URISupport.parseQueryParameters(this.requestedURI);
        boolean enableTcpNoDelay = params.getProperty("tcp.nodelay", "true").equals("true");
        int compression = Integer.parseInt(params.getProperty("compression", "-1"));
        this.configureCompression(compression);
        socketChannel.socket().setTcpNoDelay(enableTcpNoDelay);
        if (log.isTraceEnabled()) {
            log.trace("Compression level : " + compression);
            log.trace("tcp no delay : " + enableTcpNoDelay);
        }
    }

    /**
     * Registers the listener and starts the reader thread for a channel previously set up with
     * {@link #init(URI, SocketChannel)}.
     *
     * @param listner receives incoming messages and the close notification
     * @throws TransportException declared for interface compatibility; not thrown here
     */
    public void open(ChannelListner listner) throws TransportException {
        this.listner = listner;
        this.worker = new Thread(this, "Channel <- " + this.remoteURI);
        this.worker.setDaemon(true);
        this.worker.start();
    }

    /**
     * Returns the next value of the class-wide id counter.
     *
     * <p>Static and class-synchronized because the counter itself is static; an instance-level
     * lock would not protect it across instances.
     */
    private static synchronized int getNextID() {
        return nextId++;
    }

    /** Creates the inflater/deflater pair when {@code level} requests compression. */
    private void configureCompression(int level) {
        if (level != NO_COMPRESSION) {
            // nowrap=true on both sides: raw deflate data without zlib header/trailer.
            this.inflator = new Inflater(true);
            this.deflater = new Deflater(level, true);
        }
    }

    /**
     * Serialises {@code data} and writes it to the socket as one length-prefixed frame.
     *
     * @param data the message to send
     * @throws TransportException if the channel is closing/closed or the write fails
     */
    public void send(AsyncMsg data) throws TransportException {
        try {
            ByteBuffer[] buffers = this.serialize(data);
            synchronized (this.sendMutex) {
                // Snapshot the field: forcedClose() may null it concurrently.
                SocketChannel channel = this.socketChannel;
                if (this.closing || channel == null) {
                    throw new TransportException("connection has been closed.");
                }
                // Loop until both header and payload are fully written so that a partial
                // gathering write can never leave a truncated frame on the wire.
                while (buffers[0].hasRemaining() || buffers[1].hasRemaining()) {
                    channel.write(buffers);
                }
            }
        }
        catch (IOException e) {
            throw new TransportException("" + e);
        }
    }

    /**
     * Encodes {@code data} into a two-element buffer array: {@code [0]} holds the 4-byte
     * payload length, {@code [1]} the (optionally deflated) payload. Both are ready to be read.
     */
    private ByteBuffer[] serialize(AsyncMsg data) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Deflater compressor = this.deflater;
        if (compressor == null) {
            this.writeMessage(data, baos);
        } else {
            // The deflater is shared by all sending threads and is left in the "finished"
            // state by the previous message, so guard it and reset it for every message.
            synchronized (compressor) {
                compressor.reset();
                this.writeMessage(data, new DeflaterOutputStream(baos, compressor));
            }
        }
        ByteBuffer payload = ByteBuffer.wrap(baos.toByteArray());
        ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
        header.putInt(payload.limit());
        header.flip();
        return new ByteBuffer[]{header, payload};
    }

    /** Writes {@code data} to {@code target} and closes it (finishing any deflate stream). */
    private void writeMessage(AsyncMsg data, OutputStream target) throws IOException {
        DataOutputStream out = new DataOutputStream(target);
        data.writeExternal(out);
        out.close();
    }

    /**
     * Decodes one message from {@code message[1]}.
     *
     * <p>Only the bytes between the buffer's position and limit are consumed, so stale content
     * left in a reused backing array is never interpreted as message data. The buffer must be
     * array-backed.
     *
     * @param message two-element array as produced by the reader loop; only index 1 is read
     * @return the decoded message
     * @throws IOException if the payload cannot be inflated or parsed
     */
    public AsyncMsg deserialize(ByteBuffer[] message) throws IOException {
        ByteBuffer body = message[1];
        int offset = body.arrayOffset() + body.position();
        int length = body.remaining();
        Inflater decompressor = this.inflator;
        if (decompressor == null) {
            return this.readMessage(new ByteArrayInputStream(body.array(), offset, length));
        }
        // Inflater documents that, with nowrap=true, one extra dummy input byte must follow
        // the compressed data; supply it explicitly instead of relying on buffer leftovers.
        byte[] padded = new byte[length + 1];
        System.arraycopy(body.array(), offset, padded, 0, length);
        // Reset per message: the inflater is "finished" after the previous payload.
        synchronized (decompressor) {
            decompressor.reset();
            return this.readMessage(new InflaterInputStream(new ByteArrayInputStream(padded), decompressor));
        }
    }

    /** Reads a single {@link AsyncMsg} from {@code source} and closes the stream. */
    private AsyncMsg readMessage(InputStream source) throws IOException {
        AsyncMsg asyncMsg = new AsyncMsg();
        DataInputStream in = new DataInputStream(source);
        asyncMsg.readExternal(in);
        in.close();
        return asyncMsg;
    }

    /**
     * Reader loop executed by the worker thread: reads length-prefixed frames and dispatches
     * each decoded message to the listener until the close marker arrives, the peer drops the
     * connection, or an I/O error occurs. Always ends by calling {@link #asyncClose()}.
     */
    public void run() {
        // Work on a snapshot so a concurrent forcedClose() nulling the field cannot cause an NPE.
        SocketChannel channel = this.socketChannel;
        ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
        ByteBuffer[] message = new ByteBuffer[]{header, ByteBuffer.allocate(INITIAL_BODY_CAPACITY)};
        try {
            while (true) {
                log.trace("Waiting for message");
                header.clear();
                readFully(channel, header);
                header.flip();
                int size = header.getInt();
                if (size == CLOSE_MARKER) break;
                if (size < 0) {
                    // Corrupt frame: report as an I/O failure rather than an unchecked exception.
                    throw new IOException("Invalid message size: " + size);
                }
                // NOTE(review): size comes from the peer and is not capped; consider a maximum.
                if (size > message[1].capacity()) {
                    message[1] = ByteBuffer.allocate(size);
                }
                message[1].clear();
                message[1].limit(size);
                readFully(channel, message[1]);
                message[1].flip();
                this.listner.receiveEvent(this.deserialize(message));
            }
            log.trace("Stopping due to remote end closing.");
        }
        catch (IOException e) {
            log.trace("Stopping due to exception.", e);
        }
        finally {
            this.asyncClose();
        }
        log.trace("Stopped");
    }

    /**
     * Reads from {@code channel} until {@code target} has no space remaining.
     *
     * @throws IOException if the read fails or end-of-stream is reached first; without the
     *                     end-of-stream check the caller would spin forever on a dropped peer
     */
    private static void readFully(SocketChannel channel, ByteBuffer target) throws IOException {
        while (target.hasRemaining()) {
            if (channel.read(target) < 0) {
                throw new IOException("Connection closed by remote end before a complete message was received");
            }
        }
    }

    /**
     * Called by the reader thread when it stops. Shuts down the input side; if the local side
     * already initiated the close the socket is closed, otherwise the channel is marked as
     * closing and the listener is notified via {@code closeEvent()}.
     */
    private synchronized void asyncClose() {
        if (this.socketChannel == null) {
            return;
        }
        try {
            this.socketChannel.socket().shutdownInput();
            if (this.closing) {
                this.socketChannel.close();
                // Mirror close(): a fully closed channel is represented by a null field.
                this.socketChannel = null;
            } else {
                this.closing = true;
                this.listner.closeEvent();
            }
        }
        catch (IOException e) {
            this.forcedClose();
        }
    }

    /**
     * Initiates or completes an orderly shutdown: writes the close marker, shuts down the
     * output side, and closes the socket if the remote side has already finished; otherwise
     * marks the channel as closing and leaves the final close to the reader thread.
     * Any I/O failure results in the socket being closed immediately.
     */
    public synchronized void close() {
        if (this.socketChannel == null) {
            return;
        }
        try {
            ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE);
            // Writing through the int view leaves buffer's own position at 0, ready to send.
            buffer.asIntBuffer().put(CLOSE_MARKER);
            synchronized (this.sendMutex) {
                this.socketChannel.write(buffer);
                this.socketChannel.socket().shutdownOutput();
            }
            if (this.closing) {
                this.socketChannel.close();
                this.socketChannel = null;
            } else {
                this.closing = true;
            }
        }
        catch (IOException e) {
            this.forcedClose();
        }
    }

    /** Best-effort close of the socket; failures are only traced. Leaves the field null. */
    private void forcedClose() {
        SocketChannel channel = this.socketChannel;
        this.socketChannel = null;
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        }
        catch (Exception ignored) {
            // Nothing useful can be done if closing fails; the channel is being discarded.
            log.trace("Ignoring failure while force-closing channel.", ignored);
        }
    }

    /** Returns the URI of the remote end of this channel. */
    public URI getRemoteURI() {
        return this.remoteURI;
    }

    /**
     * Returns the URI the peer asked for during the handshake; only set by
     * {@link #init(URI, SocketChannel)}, otherwise {@code null}.
     */
    public URI getRequestedURI() {
        return this.requestedURI;
    }
}

