/*
 * Decompiled with CFR 0.152.
 */
package hydra.training.dunit;

import dunit.AsyncInvocation;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableRunnable;
import dunit.VM;
import hydra.RmiRegistryHelper;
import hydra.training.RemoteBlockingQueue;
import hydra.training.RemoteBlockingQueueImpl;
import java.rmi.Naming;
import junit.framework.TestCase;

public class RemoteBlockingQueueTest
extends DistributedTestCase {
    private static final int QUEUE_CAPACITY = 10;
    private String queueURL;

    public RemoteBlockingQueueTest(String name) {
        super(name);
    }

    @Override
    public void setUp() throws Exception {
        String queueName = this.getUniqueName();
        Host host = Host.getHost(0);
        this.queueURL = RmiRegistryHelper.getMasterRegistryURL() + queueName;
        RemoteBlockingQueueImpl queue = new RemoteBlockingQueueImpl(10);
        DistributedTestCase.getLogWriter().info("Binding queue named \"" + this.queueURL + "\"");
        Naming.bind(this.queueURL, queue);
    }

    @Override
    public void tearDown2() throws Exception {
        Naming.unbind(this.queueURL);
    }

    protected RemoteBlockingQueue getQueue() throws Exception {
        return (RemoteBlockingQueue)Naming.lookup(this.queueURL);
    }

    public void testDistributedPut() throws Exception {
        Host host = Host.getHost(0);
        VM vm0 = host.getVM(0);
        VM vm1 = host.getVM(1);
        final Integer element = new Integer(123);
        vm0.invoke(new SerializableRunnable("Put element"){

            @Override
            public void run() {
                try {
                    RemoteBlockingQueue queue = RemoteBlockingQueueTest.this.getQueue();
                    boolean offered = queue.offer(element);
                    TestCase.assertTrue((boolean)offered);
                }
                catch (Exception ex) {
                    DistributedTestCase.fail("Unexpected exception", ex);
                }
            }
        });
        vm1.invoke(new SerializableRunnable("Get element"){

            @Override
            public void run() {
                try {
                    RemoteBlockingQueue queue = RemoteBlockingQueueTest.this.getQueue();
                    Object removed = queue.poll();
                    TestCase.assertEquals((Object)element, (Object)removed);
                }
                catch (Exception ex) {
                    DistributedTestCase.fail("Unexpected exception", ex);
                }
            }
        });
    }

    public void testProducersConsumer() throws Exception {
        Host host = Host.getHost(0);
        int maxElements = 100;
        final int vmCount = host.getVMCount();
        VM vm0 = host.getVM(0);
        AsyncInvocation consumer = vm0.invokeAsync(new SerializableRunnable("Consumer"){

            @Override
            public void run() {
                int[] nextCounts = new int[vmCount - 1];
                try {
                    RemoteBlockingQueue queue = RemoteBlockingQueueTest.this.getQueue();
                    for (int i = 0; i < 100 * (vmCount - 1); ++i) {
                        int value = (Integer)queue.take();
                        DistributedTestCase.getLogWriter().info("Consumed " + value);
                        int whichVM = value / 100;
                        int count = value % 100;
                        TestCase.assertEquals((int)nextCounts[whichVM], (int)count);
                        nextCounts[whichVM] = count + 1;
                    }
                }
                catch (Exception ex) {
                    DistributedTestCase.fail("While consuming", ex);
                }
            }
        });
        AsyncInvocation[] producers = new AsyncInvocation[vmCount - 1];
        int i = 1;
        while (i <= vmCount - 1) {
            final int whichVM = i++;
            VM vm = host.getVM(whichVM);
            producers[whichVM - 1] = vm.invokeAsync(new SerializableRunnable("Produce"){

                @Override
                public void run() {
                    try {
                        RemoteBlockingQueue queue = RemoteBlockingQueueTest.this.getQueue();
                        for (int j = 0; j < 100; ++j) {
                            int value = (whichVM - 1) * 100 + j;
                            DistributedTestCase.getLogWriter().info("Produced " + value);
                            queue.put(new Integer(value));
                        }
                    }
                    catch (Exception ex) {
                        DistributedTestCase.fail("While producing", ex);
                    }
                }
            });
        }
        for (i = 0; i < producers.length; ++i) {
            producers[i].join();
            if (!producers[i].exceptionOccurred()) continue;
            RemoteBlockingQueueTest.fail("Producer " + i + " failed", producers[i].getException());
        }
        consumer.join();
        if (consumer.exceptionOccurred()) {
            RemoteBlockingQueueTest.fail("Consumer failed", consumer.getException());
        }
        RemoteBlockingQueueTest.assertNull((Object)this.getQueue().peek());
    }
}

