/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geronimo.remoting.transport.async;

import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.Semaphore;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geronimo.remoting.router.Router;
import org.apache.geronimo.remoting.transport.Msg;
import org.apache.geronimo.remoting.transport.TransportException;
import org.apache.geronimo.remoting.transport.async.AsyncMsg;
import org.apache.geronimo.remoting.transport.async.Channel;
import org.apache.geronimo.remoting.transport.async.ChannelListner;
import org.apache.geronimo.remoting.transport.async.Correlator;
import org.apache.geronimo.remoting.transport.async.Registry;
import org.apache.geronimo.remoting.transport.async.TransportFactory;

public class ChannelPool
implements Router {
    private static final Log log = LogFactory.getLog((Class)ChannelPool.class);
    private final URI remoteURI;
    private URI backConnectURI;
    private final List available = new ArrayList();
    private final Correlator responseManager = new Correlator();
    private Router dispatcher;
    private int createdChannelCount = 0;
    private Executor workManager = Registry.instance.getWorkManager();
    private Semaphore maxOpenConnections = new Semaphore((long)Registry.MAX_CONNECTION_POOL_SIZE);

    public ChannelPool(URI uri, Router dispatcher) {
        this.remoteURI = uri;
        this.dispatcher = dispatcher;
        try {
            this.backConnectURI = Registry.instance.getServerForClientRequest() == null ? new URI("async://localhost:0") : Registry.instance.getServerForClientRequest().getClientConnectURI();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - void declaration
     */
    public void dispose() {
        void var1_4;
        List list = this.available;
        synchronized (list) {
            ArrayList al = new ArrayList();
            al.addAll(this.available);
            Iterator iterator = al.iterator();
        }
        while (var1_4.hasNext()) {
            PooledAsynchChannel c = (PooledAsynchChannel)var1_4.next();
            try {
                c.closeInternal();
            }
            catch (Exception exception) {}
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void associate(Channel c) throws TransportException {
        List list = this.available;
        synchronized (list) {
            PooledAsynchChannel channel = new PooledAsynchChannel(c);
            channel.open();
            this.available.add(channel);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void returnToPool(PooledAsynchChannel c) {
        List list = this.available;
        synchronized (list) {
            this.available.add(c);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void expireIdleConnections(long connectionTimeout) {
        List list = this.available;
        synchronized (list) {
            if (this.available.isEmpty()) {
                return;
            }
            long limit = System.currentTimeMillis() - connectionTimeout;
            for (int i = 0; i < this.available.size(); ++i) {
                PooledAsynchChannel pc = (PooledAsynchChannel)this.available.get(i);
                if (pc.lastUsed >= limit) break;
                this.available.remove(i);
                try {
                    pc.closeInternal();
                    continue;
                }
                catch (TransportException e) {
                    log.trace((Object)"Could not close out a channel correctly.", (Throwable)e);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized PooledAsynchChannel getNextAvailable() throws TransportException {
        try {
            do {
                List list = this.available;
                synchronized (list) {
                    if (!this.available.isEmpty()) {
                        return (PooledAsynchChannel)this.available.remove(this.available.size() - 1);
                    }
                }
            } while (!this.maxOpenConnections.attempt(100L));
        }
        catch (InterruptedException e1) {
            throw new TransportException("(" + this.remoteURI + "): " + e1);
        }
        try {
            log.debug((Object)("channel connecting to: " + this.remoteURI));
            PooledAsynchChannel c = new PooledAsynchChannel(TransportFactory.instance.createAsynchChannel());
            c.open(this.remoteURI, this.backConnectURI);
            return c;
        }
        catch (Exception e) {
            this.maxOpenConnections.release();
            log.debug((Object)"Connect Failed: ", (Throwable)e);
            if (log.isDebugEnabled()) {
                log.debug((Object)("channel connection to: " + this.remoteURI + " failed"), (Throwable)e);
            }
            throw new TransportException("(" + this.remoteURI + "): " + e);
        }
    }

    private void dispatch(AsyncMsg message) {
        boolean trace = log.isTraceEnabled();
        try {
            switch (message.type) {
                case 0: {
                    if (trace) {
                        log.trace((Object)"received datagram request data.");
                    }
                    this.dispatchDatagram(new URI(message.to), message, this);
                    return;
                }
                case 1: {
                    if (trace) {
                        log.trace((Object)("received request data for request: " + message.requestId));
                    }
                    this.dispatchRequest(new URI(message.to), message, this);
                    return;
                }
                case 2: {
                    if (trace) {
                        log.trace((Object)("received response data for request: " + message.requestId));
                    }
                    this.responseManager.dispatchResponse(message.requestId, message);
                    return;
                }
            }
            log.warn((Object)("Protocol Error: unknown message type: " + message.type));
            return;
        }
        catch (URISyntaxException e) {
            log.debug((Object)"Bad request: ", (Throwable)e);
            return;
        }
    }

    public void dispatchDatagram(final URI to, final Msg data, ChannelPool source) {
        if (this.dispatcher == null) {
            log.warn((Object)"Received a datagram but the dispatcher has not been registed.");
            return;
        }
        Runnable work = new Runnable(){

            public void run() {
                try {
                    ChannelPool.this.dispatcher.sendDatagram(to, data);
                }
                catch (Throwable e) {
                    log.trace((Object)"Request Failed.", e);
                }
            }
        };
        try {
            this.workManager.execute(work);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void dispatchRequest(final URI to, final AsyncMsg data, final ChannelPool source) {
        if (this.dispatcher == null) {
            log.warn((Object)"Received a request but the dispatcher has not been registed.");
            return;
        }
        Runnable work = new Runnable(){

            public void run() {
                try {
                    Msg result = ChannelPool.this.dispatcher.sendRequest(to, data);
                    source.sendResponse(result, data.requestId);
                }
                catch (Throwable e) {
                    log.trace((Object)"Request failed.", e);
                }
            }
        };
        try {
            this.workManager.execute(work);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void safeClose(PooledAsynchChannel c) {
        if (c == null) {
            return;
        }
        try {
            c.close();
        }
        catch (TransportException transportException) {
            // empty catch block
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendDatagram(URI to, Msg data) throws TransportException {
        AsyncMsg d = (AsyncMsg)data;
        PooledAsynchChannel c = this.getNextAvailable();
        try {
            d.type = 0;
            d.to = to.toString();
            c.send(d);
        }
        finally {
            this.safeClose(c);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Msg sendRequest(URI to, Msg data) throws TransportException {
        AsyncMsg d = (AsyncMsg)data;
        PooledAsynchChannel c = this.getNextAvailable();
        Correlator.FutureResult requestId = this.responseManager.getNextFutureResult();
        try {
            d.type = 1;
            d.to = to.toString();
            d.requestId = requestId.getID();
            if (log.isTraceEnabled()) {
                log.trace((Object)("sending request data for request: " + requestId.getID()));
            }
            c.send(d);
        }
        finally {
            this.safeClose(c);
        }
        try {
            AsyncMsg result = (AsyncMsg)requestId.poll(Registry.REQUEST_TIMEOUT);
            if (log.isTraceEnabled()) {
                log.trace((Object)("response data was corelated for request: " + requestId.getID()));
            }
            if (result == null) {
                throw new TransportException("Request time out.");
            }
            return result;
        }
        catch (InterruptedException e) {
            throw new TransportException(e.getMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendResponse(Msg data, int requestId) throws TransportException {
        AsyncMsg d = (AsyncMsg)data;
        PooledAsynchChannel c = this.getNextAvailable();
        try {
            d.type = (byte)2;
            d.requestId = requestId;
            if (log.isTraceEnabled()) {
                log.trace((Object)("sending response data for request: " + requestId));
            }
            c.send(d);
        }
        finally {
            this.safeClose(c);
        }
    }

    public int getCreatedChannelCount() {
        return this.createdChannelCount;
    }

    public URI getBackConnectURI() {
        return this.backConnectURI;
    }

    public void setBackConnectURI(URI backConnectURI) {
        this.backConnectURI = backConnectURI;
    }

    private class PooledAsynchChannel
    implements ChannelListner {
        private Channel next;
        boolean doCloseInternal;
        long lastUsed = System.currentTimeMillis();

        PooledAsynchChannel(Channel next) {
            this.next = next;
            ChannelPool.this.createdChannelCount++;
        }

        public void open(URI uri, URI localuri) throws TransportException, TransportException {
            try {
                this.next.open(uri, localuri, this);
            }
            catch (TransportException e) {
                this.doCloseInternal = true;
                throw e;
            }
        }

        public void open() throws TransportException {
            try {
                this.next.open(this);
            }
            catch (TransportException e) {
                this.doCloseInternal = true;
                throw e;
            }
        }

        public void close() throws TransportException {
            if (this.doCloseInternal) {
                this.closeInternal();
            } else {
                ChannelPool.this.returnToPool(this);
            }
        }

        public void closeInternal() throws TransportException {
            ChannelPool.this.createdChannelCount--;
            this.next.close();
            ChannelPool.this.maxOpenConnections.release();
        }

        public void send(AsyncMsg data) throws TransportException {
            try {
                this.lastUsed = System.currentTimeMillis();
                this.next.send(data);
            }
            catch (TransportException e) {
                this.doCloseInternal = true;
                throw e;
            }
        }

        protected void finalize() throws Throwable {
            try {
                this.closeInternal();
            }
            catch (TransportException transportException) {
                // empty catch block
            }
            super.finalize();
        }

        public void receiveEvent(AsyncMsg data) {
            this.lastUsed = System.currentTimeMillis();
            ChannelPool.this.dispatch(data);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void closeEvent() {
            this.doCloseInternal = true;
            List list = ChannelPool.this.available;
            synchronized (list) {
                ChannelPool.this.available.remove(this);
            }
            try {
                this.close();
            }
            catch (TransportException transportException) {
                // empty catch block
            }
        }
    }
}

