package org.codehaus.wadi.group;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.wadi.group.impl.AbstractMsgDispatcher;
import org.codehaus.wadi.group.impl.EnvelopeHelper;

/* loaded from: input_file:org/codehaus/wadi/group/AbstractTestGroup.class */
public abstract class AbstractTestGroup extends TestCase {
    protected static Log _log = LogFactory.getLog(AbstractTestGroup.class);
    protected DispatcherFactory _dispatcherFactory;

    /* loaded from: input_file:org/codehaus/wadi/group/AbstractTestGroup$AsyncServiceEndpoint.class */
    /**
     * Endpoint that checks each dispatched envelope against a fixed expected
     * address and then counts down a latch so a waiting thread can proceed.
     */
    class AsyncServiceEndpoint extends AbstractMsgDispatcher {
        /** Address that both the payload and the envelope address must be identical to. */
        protected final Address localAddress;
        /** Counted down once per envelope that passed all checks. */
        protected final CountDownLatch latch;

        /**
         * @param dispatcher     forwarded to the superclass constructor
         * @param cls            forwarded to the superclass constructor
         * @param address        address every dispatched envelope is compared against
         * @param countDownLatch latch released after a successful check
         */
        AsyncServiceEndpoint(Dispatcher dispatcher, Class cls, Address address, CountDownLatch countDownLatch) {
            super(dispatcher, cls);
            localAddress = address;
            latch = countDownLatch;
        }

        /**
         * Asserts that payload, envelope address and the expected local address
         * are all the same instance, then counts the latch down.
         * If an assertion fails the latch is not counted down.
         *
         * @param envelope the envelope being dispatched
         */
        public void dispatch(Envelope envelope) throws Exception {
            Address content = envelope.getPayload();
            Address target = envelope.getAddress();

            // Identity (not equality) checks, in this order.
            Assert.assertSame(localAddress, content);
            Assert.assertSame(localAddress, target);
            Assert.assertSame(content, target);

            latch.countDown();
        }
    }

    /* loaded from: input_file:org/codehaus/wadi/group/AbstractTestGroup$DispatcherFactory.class */
    /**
     * Factory used by the tests in this class to obtain the {@link Dispatcher}
     * instances they exercise.
     */
    protected interface DispatcherFactory {
        /**
         * Creates a new dispatcher.
         *
         * @param str  first string argument; the tests in this class pass the same
         *             randomly suffixed "org.codehaus.wadi.cluster.TEST-..." value to
         *             every dispatcher of one test (presumably the cluster name -
         *             confirm against implementations)
         * @param str2 second string argument; the tests pass "red" or "green"
         *             (presumably the peer name - confirm against implementations)
         * @param j    long argument; the tests pass 5000L (meaning not visible here,
         *             presumably a timeout in milliseconds - TODO confirm)
         * @return the created dispatcher
         * @throws Exception declared for implementation-specific creation failures
         */
        Dispatcher create(String str, String str2, long j) throws Exception;
    }

    /* loaded from: input_file:org/codehaus/wadi/group/AbstractTestGroup$MyClusterListener.class */
    /**
     * Cluster listener that keeps a running count of remote peers based on the
     * sets handed to its callbacks.
     */
    class MyClusterListener implements ClusterListener {
        /** Running peer count: incremented for joiners, decremented for leavers. */
        public int _numRemotePeers = 0;

        MyClusterListener() {
        }

        /**
         * Adds the size of the supplied set to the running count.
         *
         * @param cluster the cluster (unused)
         * @param set     set whose size is added to the count
         */
        public void onListenerRegistration(Cluster cluster, Set set) {
            int alreadyPresent = set.size();
            _numRemotePeers += alreadyPresent;
        }

        /**
         * Adjusts the running count by the joiners and leavers, then logs the change.
         *
         * @param cluster the cluster whose local peer name prefixes the log line
         * @param set     joiners; its size is added to the count
         * @param set2    leavers; its size is subtracted from the count
         */
        public void onMembershipChanged(Cluster cluster, Set set, Set set2) {
            int joined = set.size();
            _numRemotePeers += joined;
            int left = set2.size();
            _numRemotePeers -= left;

            String localName = cluster.getLocalPeer().getName();
            AbstractTestGroup._log.info(localName + " - onMembershipChanged - joiners:" + set + ", leavers:" + set2);
        }
    }

    /* loaded from: input_file:org/codehaus/wadi/group/AbstractTestGroup$RendezVousEndPoint.class */
    /**
     * Service endpoint that accepts reply envelopes and hands them to a
     * dispatcher via {@code addRendezVousEnvelope}.
     */
    protected class RendezVousEndPoint implements ServiceEndpoint {
        private final Dispatcher dispatcher;

        /**
         * @param dispatcher dispatcher that receives the envelopes; must not be null
         * @throws IllegalArgumentException if {@code dispatcher} is null
         */
        public RendezVousEndPoint(Dispatcher dispatcher) {
            if (dispatcher == null) {
                throw new IllegalArgumentException("dispatcher is required.");
            }
            this.dispatcher = dispatcher;
        }

        /**
         * Passes the envelope to the dispatcher's rendez-vous handling.
         *
         * @param envelope the envelope being dispatched
         */
        public void dispatch(Envelope envelope) throws Exception {
            dispatcher.addRendezVousEnvelope(envelope);
        }

        /** Intentionally does nothing. */
        public void dispose(int i, long j) {
        }

        /**
         * @param envelope the envelope to test
         * @return whatever {@code EnvelopeHelper.isReply} reports for the envelope
         */
        public boolean testDispatchEnvelope(Envelope envelope) {
            boolean isReply = EnvelopeHelper.isReply(envelope);
            return isReply;
        }
    }

    /* loaded from: input_file:org/codehaus/wadi/group/AbstractTestGroup$SyncServiceEndpoint.class */
    /**
     * Endpoint that checks each dispatched envelope against a fixed expected
     * address and then replies to the sender with the received payload.
     */
    class SyncServiceEndpoint extends AbstractMsgDispatcher {
        /** Address that both the payload and the envelope address must be identical to. */
        protected final Address _local;

        /**
         * @param dispatcher forwarded to the superclass constructor
         * @param cls        forwarded to the superclass constructor
         * @param address    address every dispatched envelope is compared against
         */
        SyncServiceEndpoint(Dispatcher dispatcher, Class cls, Address address) {
            super(dispatcher, cls);
            _local = address;
        }

        /**
         * Asserts that payload, envelope address and the expected local address
         * are all the same instance, then replies with the payload.
         *
         * @param envelope the envelope being dispatched
         */
        public void dispatch(Envelope envelope) throws Exception {
            Address content = envelope.getPayload();
            Address target = envelope.getAddress();

            // Identity (not equality) checks, in this order.
            Assert.assertSame(_local, content);
            Assert.assertSame(_local, target);
            Assert.assertSame(content, target);

            // Echo the payload back through the inherited dispatcher.
            _dispatcher.reply(envelope, content);
        }
    }

    /**
     * Supplies the factory that {@code setUp()} stores in {@code _dispatcherFactory}
     * and that the tests use to create their dispatchers.
     *
     * @return the dispatcher factory for the implementation under test
     * @throws Exception declared for implementation-specific failures
     */
    public abstract DispatcherFactory getDispatcherFactory() throws Exception;

    /**
     * Runs the superclass set-up, then obtains the dispatcher factory from
     * {@link #getDispatcherFactory()} for use by the tests.
     */
    @Override
    protected void setUp() throws Exception {
        super.setUp();
        _dispatcherFactory = getDispatcherFactory();
    }

    /** Delegates to the superclass tear-down; this class adds no clean-up of its own. */
    @Override
    protected void tearDown() throws Exception {
        super.tearDown();
    }

    /**
     * Starts a "red" peer, then a "green" peer, with the same randomly suffixed
     * first factory argument, and checks that each cluster view and each
     * listener reports exactly one remote peer. Then stops "green" and checks
     * that "red" is back to zero remote peers.
     */
    public void testMembership() throws Exception {
        final String clusterName = "org.codehaus.wadi.cluster.TEST-" + Math.random();

        // First peer comes up alone.
        Dispatcher red = _dispatcherFactory.create(clusterName, "red", 5000L);
        red.start();
        MyClusterListener redListener = new MyClusterListener();
        Cluster redCluster = red.getCluster();
        redCluster.addClusterListener(redListener);
        assertTrue(redCluster.waitOnMembershipCount(1, 10000L));
        _log.info(redCluster);
        _log.info(red);

        // Second peer is created and started with the same first argument.
        Dispatcher green = _dispatcherFactory.create(clusterName, "green", 5000L);
        green.start();
        Cluster greenCluster = green.getCluster();
        MyClusterListener greenListener = new MyClusterListener();
        greenCluster.addClusterListener(greenListener);
        assertTrue(redCluster.waitOnMembershipCount(2, 10000L));
        assertTrue(greenCluster.waitOnMembershipCount(2, 10000L));
        _log.info(greenCluster);
        _log.info(green);

        // Each side must see exactly one remote peer, via listener and via cluster.
        assertEquals(1, redListener._numRemotePeers);
        assertEquals(1, redCluster.getRemotePeers().size());
        assertEquals(1, greenListener._numRemotePeers);
        assertEquals(1, greenCluster.getRemotePeers().size());

        // The remote-peer map must be keyed by the peer's own address.
        Map redView = redCluster.getRemotePeers();
        Peer seenByRed = (Peer) redView.values().iterator().next();
        assertTrue(redView.get(seenByRed.getAddress()) == seenByRed);
        Map greenView = greenCluster.getRemotePeers();
        Peer seenByGreen = (Peer) greenView.values().iterator().next();
        assertTrue(greenView.get(seenByGreen.getAddress()) == seenByGreen);

        // Second peer leaves; first peer must drop back to no remote peers.
        green.stop();
        greenCluster.removeClusterListener(greenListener);
        assertTrue(redCluster.waitOnMembershipCount(1, 10000L));
        assertEquals(0, redListener._numRemotePeers);
        assertEquals(0, redCluster.getRemotePeers().size());

        red.stop();
        redCluster.removeClusterListener(redListener);
    }

    /**
     * Exercises one-way and request/reply messaging between two peers.
     * <ol>
     *   <li>"red" sends "green" its own address as payload; an
     *       {@link AsyncServiceEndpoint} on "green" verifies it and releases a latch.</li>
     *   <li>"red" performs an {@code exchangeSend} to "green"; a
     *       {@link SyncServiceEndpoint} on "green" replies with the payload and a
     *       {@link RendezVousEndPoint} on "red" receives the reply.</li>
     * </ol>
     * The wait for the one-way message is bounded: if the message never arrives, or
     * the endpoint's assertions fail on the dispatching thread (so the latch is never
     * counted down), the test fails instead of blocking forever.
     */
    public void testDispatcher() throws Exception {
        final String clusterName = "org.codehaus.wadi.cluster.TEST-" + Math.random();
        Dispatcher red = _dispatcherFactory.create(clusterName, "red", 5000L);
        Dispatcher green = _dispatcherFactory.create(clusterName, "green", 5000L);
        Cluster redCluster = red.getCluster();
        Cluster greenCluster = green.getCluster();

        red.start();
        _log.info(redCluster);
        _log.info(red);
        _log.info(redCluster.getRemotePeers());
        assertTrue(redCluster.waitOnMembershipCount(1, 10000L));
        _log.info(redCluster.getRemotePeers());

        green.start();
        _log.info(greenCluster);
        _log.info(green);
        assertTrue(redCluster.waitOnMembershipCount(2, 10000L));
        assertTrue(greenCluster.waitOnMembershipCount(2, 10000L));

        // --- one-way send: red -> green ---
        CountDownLatch received = new CountDownLatch(1);
        AsyncServiceEndpoint asyncEndpoint =
                new AsyncServiceEndpoint(green, Address.class, greenCluster.getLocalPeer().getAddress(), received);
        green.register(asyncEndpoint);
        Peer greenPeer = (Peer) redCluster.getRemotePeers().values().iterator().next();
        red.send(greenPeer.getAddress(), greenPeer.getAddress());
        // Bounded wait (same 10s budget as the membership waits) so a lost message or a
        // failed assertion inside the endpoint cannot hang the whole test run.
        assertTrue("one-way message was not dispatched and verified within 10000ms",
                received.await(10000L, TimeUnit.MILLISECONDS));
        green.unregister(asyncEndpoint, 10, 500L);

        // --- request/reply: red -> green -> red ---
        RendezVousEndPoint rendezVous = new RendezVousEndPoint(red);
        red.register(rendezVous);
        SyncServiceEndpoint syncEndpoint =
                new SyncServiceEndpoint(green, Address.class, greenCluster.getLocalPeer().getAddress());
        green.register(syncEndpoint);
        Address target = ((Peer) redCluster.getRemotePeers().values().iterator().next()).getAddress();
        // The reply payload must be the very same Address instance that was sent.
        assertSame(target, (Address) red.exchangeSend(target, target, 5000L).getPayload());
        green.unregister(syncEndpoint, 10, 500L);
        red.unregister(rendezVous, 10, 5000L);

        green.stop();
        assertTrue(redCluster.waitOnMembershipCount(1, 10000L));
        red.stop();
    }
}
