package org.marketcetera.modules.async;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.marketcetera.module.BlockingSinkDataListener;
import org.marketcetera.module.CopierModule;
import org.marketcetera.module.CopierModuleFactory;
import org.marketcetera.module.DataFlowID;
import org.marketcetera.module.DataFlowInfo;
import org.marketcetera.module.DataRequest;
import org.marketcetera.module.ExpectedFailure;
import org.marketcetera.module.IllegalRequestParameterValue;
import org.marketcetera.module.Messages;
import org.marketcetera.module.ModuleManager;
import org.marketcetera.module.ModuleState;
import org.marketcetera.module.ModuleTestBase;
import org.marketcetera.module.ModuleURN;
import org.marketcetera.module.SinkDataListener;
import org.marketcetera.module.SinkModuleFactory;
import org.marketcetera.util.misc.ClassVersion;

@ClassVersion("$Id$")
/* loaded from: input_file:org/marketcetera/modules/async/AsyncModuleTest.class */
public class AsyncModuleTest extends ModuleTestBase {
    private ModuleManager mManager;

    /* loaded from: input_file:org/marketcetera/modules/async/AsyncModuleTest$FlowSpecificListener.class */
    private static class FlowSpecificListener implements SinkDataListener {
        private final Map<DataFlowID, BlockingQueue<Object>> mFlowData = new ConcurrentHashMap();
        private final Map<DataFlowID, String> mThreadNames = new HashMap();

        private FlowSpecificListener() {
        }

        public void receivedData(DataFlowID dataFlowID, Object obj) {
            BlockingQueue<Object> blockingQueue = this.mFlowData.get(dataFlowID);
            if (blockingQueue == null) {
                synchronized (this) {
                    blockingQueue = this.mFlowData.get(dataFlowID);
                    if (blockingQueue == null) {
                        blockingQueue = new LinkedBlockingQueue();
                        this.mFlowData.put(dataFlowID, blockingQueue);
                        this.mThreadNames.put(dataFlowID, Thread.currentThread().getName());
                    }
                }
            }
            blockingQueue.add(obj);
        }

        public Object getNextDataFor(DataFlowID dataFlowID) throws InterruptedException {
            BlockingQueue<Object> blockingQueue = this.mFlowData.get(dataFlowID);
            if (blockingQueue == null) {
                return null;
            }
            return blockingQueue.take();
        }

        public String getThreadNameFor(DataFlowID dataFlowID) {
            String str;
            synchronized (this) {
                str = this.mThreadNames.get(dataFlowID);
            }
            return str;
        }

        public Set<DataFlowID> getFlows() {
            return this.mFlowData.keySet();
        }
    }

    /**
     * Checks the provider info of the async processor factory, then creates a
     * module instance by hand and checks its module info and that it exposes
     * no JMX attributes before stopping and deleting it again.
     */
    @Test
    public void info() throws Exception {
        String[] parameterTypeNames = {ModuleURN.class.getName()};
        Class[] parameterTypes = {ModuleURN.class};
        assertProviderInfo(mManager, SimpleAsyncProcessorFactory.PROVIDER_URN,
                parameterTypeNames, parameterTypes,
                Messages.PROVIDER_DESCRIPTION.getText(), true, true);

        ModuleURN instanceURN = new ModuleURN(SimpleAsyncProcessorFactory.PROVIDER_URN, "mymodule");
        ModuleURN createdURN = mManager.createModule(
                SimpleAsyncProcessorFactory.PROVIDER_URN, new Object[]{instanceURN});
        Assert.assertEquals(instanceURN, createdURN);
        assertModuleInfo(mManager, instanceURN, ModuleState.STARTED,
                null, null, false, true, true, true, false);

        // No flows are running through the module, so no attributes are expected.
        List<String> attributeNames = getAttributes(instanceURN);
        Assert.assertTrue(attributeNames.isEmpty());

        mManager.stop(instanceURN);
        mManager.deleteModule(instanceURN);
    }

    /**
     * Verifies that a data flow whose request to the async module carries a
     * non-null parameter is rejected with {@link IllegalRequestParameterValue},
     * and that no instance of the module exists for the provider afterwards.
     */
    @Test
    public void requestParameters() throws Exception {
        final ModuleURN asyncURN = new ModuleURN(SimpleAsyncProcessorFactory.PROVIDER_URN, "mymodule");
        Object[] expectedMessageParams = {asyncURN.getValue(), "not null value"};
        new ExpectedFailure<IllegalRequestParameterValue>(Messages.ILLEGAL_REQ_PARM_VALUE, expectedMessageParams) {
            public void run() throws Exception {
                DataRequest copierRequest = new DataRequest(CopierModuleFactory.INSTANCE_URN, "doesnt matter");
                DataRequest asyncRequest = new DataRequest(asyncURN, "not null value");
                mManager.createDataFlow(new DataRequest[]{copierRequest, asyncRequest});
            }
        };
        List<?> instances = mManager.getModuleInstances(SimpleAsyncProcessorFactory.PROVIDER_URN);
        Assert.assertTrue(instances.toString(), instances.isEmpty());
    }

    /**
     * Pushes three objects through a copier -> async module flow, verifies they
     * arrive at the sink in order, and then inspects the MBean registered for
     * the async module: its metadata, its single read-only per-flow attribute,
     * the failures expected for unknown attributes and for writes, and the
     * absence of invocable operations. Finally cancels the flow and checks that
     * no module instance is left for the provider.
     */
    @Test
    public void simpleFlowAndJMX() throws Exception {
        ModuleURN moduleURN = new ModuleURN(SimpleAsyncProcessorFactory.PROVIDER_URN, "mymodule");
        Object[] data = {BigDecimal.ONE, 2, "three"};
        CopierModule.SynchronousRequest request = new CopierModule.SynchronousRequest(data);
        // Acquire once before the flow starts so that the second acquire() below blocks.
        // NOTE(review): presumably the copier releases the semaphore once it has emitted
        // the data - confirm against CopierModule.
        request.semaphore.acquire();
        BlockingSinkDataListener sink = new BlockingSinkDataListener();
        mManager.addSinkListener(sink);
        DataFlowID flowID = mManager.createDataFlow(new DataRequest[]{
                new DataRequest(CopierModuleFactory.INSTANCE_URN, request),
                new DataRequest(moduleURN, (Object) null)});
        request.semaphore.acquire();
        for (Object expected : data) {
            Assert.assertEquals(expected, sink.getNextData());
        }

        // Flow and module bookkeeping after all three objects went through.
        DataFlowInfo flowInfo = mManager.getDataFlowInfo(flowID);
        assertFlowInfo(flowInfo, flowID, 3, true, false, null, null);
        assertFlowStep(flowInfo.getFlowSteps()[1], moduleURN, true, 3, 0, null, true, 3, 0, null, moduleURN, null);
        assertFlowStep(flowInfo.getFlowSteps()[2], SinkModuleFactory.INSTANCE_URN, false, 0, 0, null, true, 3, 0, null, SinkModuleFactory.INSTANCE_URN, null);
        assertModuleInfo(mManager, moduleURN, ModuleState.STARTED, null, new DataFlowID[]{flowID}, true, true, true, true, false);

        // MBean metadata.
        final ObjectName objectName = moduleURN.toObjectName();
        final MBeanServer mBeanServer = getMBeanServer();
        Assert.assertTrue(mBeanServer.isRegistered(objectName));
        MBeanInfo mBeanInfo = mBeanServer.getMBeanInfo(objectName);
        Assert.assertEquals(SimpleAsyncProcessor.class.getName(), mBeanInfo.getClassName());
        Assert.assertEquals(Messages.JMX_MXBEAN_DESCRIPTION.getText(), mBeanInfo.getDescription());
        Assert.assertEquals(0L, mBeanInfo.getOperations().length);
        Assert.assertEquals(0L, mBeanInfo.getConstructors().length);
        Assert.assertEquals(0L, mBeanInfo.getNotifications().length);
        Assert.assertEquals(0L, mBeanInfo.getDescriptor().getFieldNames().length);

        // Exactly one attribute is expected: the one named after the running flow.
        MBeanAttributeInfo[] attributeInfos = mBeanInfo.getAttributes();
        Assert.assertEquals(1L, attributeInfos.length);
        final String flowAttribute = "Flow" + flowID;
        // An attribute name the checks below expect the MBean NOT to expose.
        final String unknownFlowAttribute = "Flow1";
        MBeanAttributeInfo flowAttributeInfo = attributeInfos[0];
        Assert.assertEquals(flowAttribute, flowAttributeInfo.getName());
        Assert.assertEquals(Integer.class.getName(), flowAttributeInfo.getType());
        Assert.assertEquals(Messages.JMX_ATTRIBUTE_FLOW_CNT_DESCRIPTION.getText(flowID), flowAttributeInfo.getDescription());
        Assert.assertEquals(0L, flowAttributeInfo.getDescriptor().getFieldNames().length);
        Assert.assertFalse(flowAttributeInfo.isIs());
        Assert.assertFalse(flowAttributeInfo.isWritable());
        Assert.assertTrue(flowAttributeInfo.isReadable());

        // Reading attributes. Compare as objects: assertEquals(int, Integer) would be an
        // ambiguous call between the (long, long) and (Object, Object) overloads.
        Assert.assertEquals(Integer.valueOf(0), mBeanServer.getAttribute(objectName, flowAttribute));
        new ExpectedFailure<AttributeNotFoundException>(unknownFlowAttribute) {
            protected void run() throws Exception {
                mBeanServer.getAttribute(objectName, unknownFlowAttribute);
            }
        };
        new ExpectedFailure<AttributeNotFoundException>("blah") {
            protected void run() throws Exception {
                mBeanServer.getAttribute(objectName, "blah");
            }
        };
        // Bulk read returns only the attribute that exists.
        AttributeList readAttributes = mBeanServer.getAttributes(objectName, new String[]{flowAttribute, unknownFlowAttribute});
        Assert.assertEquals(1L, readAttributes.size());
        Assert.assertEquals(new Attribute(flowAttribute, 0), readAttributes.get(0));

        // Writing attributes is expected to fail, for existing and unknown names alike.
        new ExpectedFailure<AttributeNotFoundException>() {
            protected void run() throws Exception {
                mBeanServer.setAttribute(objectName, new Attribute(flowAttribute, 34));
            }
        };
        new ExpectedFailure<AttributeNotFoundException>() {
            protected void run() throws Exception {
                mBeanServer.setAttribute(objectName, new Attribute(unknownFlowAttribute, 34));
            }
        };
        AttributeList writtenAttributes = mBeanServer.setAttributes(objectName, new AttributeList(
                Arrays.asList(new Attribute(flowAttribute, 12), new Attribute(unknownFlowAttribute, 13))));
        Assert.assertEquals(0L, writtenAttributes.size());

        // Invoking an operation is expected to fail with a NoSuchMethodException cause.
        ReflectionException invokeFailure = new ExpectedFailure<ReflectionException>() {
            protected void run() throws Exception {
                mBeanServer.invoke(objectName, "getQueueSizes", (Object[]) null, (String[]) null);
            }
        }.getException();
        Assert.assertTrue(invokeFailure.toString(), invokeFailure.getCause() instanceof NoSuchMethodException);

        // After cancelling the only flow no module instance should be left for the provider.
        mManager.cancel(flowID);
        List<?> instances = mManager.getModuleInstances(SimpleAsyncProcessorFactory.PROVIDER_URN);
        Assert.assertTrue(instances.toString(), instances.isEmpty());
        mManager.removeSinkListener(sink);
    }

    /**
     * Runs three flows through the same async module instance - one with no
     * data source and two fed by the copier - and verifies that each sink
     * delivery carries only the data of its own flow, that the source-less flow
     * never delivers anything, that the per-flow JMX attributes read 0 once the
     * data has been consumed, and that the two data-carrying flows are delivered
     * by differently named "SimpleAsyncProc-" threads. Finally checks that
     * cancelling flows removes their JMX attributes and, once all flows are
     * cancelled, that no module instance is left for the provider.
     *
     * <p>The timeout turns a lost delivery into a test failure; without it the
     * polling loops and the blocking queue reads would wait forever.
     */
    @Test(timeout = 30000)
    public void noEmitFromOtherFlows() throws Exception {
        ModuleURN moduleURN = new ModuleURN(SimpleAsyncProcessorFactory.PROVIDER_URN, "mymodule");
        FlowSpecificListener listener = new FlowSpecificListener();
        mManager.addSinkListener(listener);

        // A flow through the module that has no data source in front of it.
        DataFlowID emptyFlow = mManager.createDataFlow(new DataRequest[]{new DataRequest(moduleURN)});
        Object[] firstData = {"firstOne", BigDecimal.TEN, "uno"};
        Object[] secondData = {"secondOne", BigDecimal.ZERO, "dos"};
        DataFlowID firstFlow = mManager.createDataFlow(new DataRequest[]{
                new DataRequest(CopierModuleFactory.INSTANCE_URN, firstData), new DataRequest(moduleURN)});
        DataFlowID secondFlow = mManager.createDataFlow(new DataRequest[]{
                new DataRequest(CopierModuleFactory.INSTANCE_URN, secondData), new DataRequest(moduleURN)});

        // getNextDataFor() returns null until a flow has delivered something, so wait
        // for the first delivery before reading each flow's data.
        awaitFirstDelivery(listener, secondFlow);
        for (Object expected : secondData) {
            Assert.assertEquals(expected, listener.getNextDataFor(secondFlow));
        }
        awaitFirstDelivery(listener, firstFlow);
        for (Object expected : firstData) {
            Assert.assertEquals(expected, listener.getNextDataFor(firstFlow));
        }
        Assert.assertThat(listener.getFlows(), Matchers.not(Matchers.hasItem(emptyFlow)));

        // Everything has been consumed, so every per-flow attribute should read 0.
        Assert.assertEquals(0, getMBeanServer().getAttribute(moduleURN.toObjectName(), "Flow" + emptyFlow));
        Assert.assertEquals(0, getMBeanServer().getAttribute(moduleURN.toObjectName(), "Flow" + firstFlow));
        Assert.assertEquals(0, getMBeanServer().getAttribute(moduleURN.toObjectName(), "Flow" + secondFlow));

        // Delivering threads: none for the empty flow, distinct module threads for the others.
        String threadPrefix = "SimpleAsyncProc-" + moduleURN.instanceName();
        Assert.assertNull(listener.getThreadNameFor(emptyFlow));
        Assert.assertThat(listener.getThreadNameFor(firstFlow), Matchers.startsWith(threadPrefix));
        Assert.assertThat(listener.getThreadNameFor(secondFlow), Matchers.startsWith(threadPrefix));
        Assert.assertThat(listener.getThreadNameFor(firstFlow),
                Matchers.not(Matchers.equalTo(listener.getThreadNameFor(secondFlow))));
        Assert.assertThat(listener.getThreadNameFor(firstFlow),
                Matchers.not(Matchers.equalTo(Thread.currentThread().getName())));

        // Cancelling two flows leaves only the attribute of the remaining one.
        mManager.cancel(emptyFlow);
        mManager.cancel(firstFlow);
        List<String> attributeNames = getAttributes(moduleURN);
        Assert.assertEquals(1L, attributeNames.size());
        Assert.assertThat(attributeNames, Matchers.hasItem("Flow" + secondFlow));

        mManager.cancel(secondFlow);
        List<?> instances = mManager.getModuleInstances(SimpleAsyncProcessorFactory.PROVIDER_URN);
        Assert.assertTrue(instances.toString(), instances.isEmpty());
        mManager.removeSinkListener(listener);
    }

    /** Polls every 100 ms until the listener has received at least one object for the flow. */
    private static void awaitFirstDelivery(FlowSpecificListener listener, DataFlowID flowID)
            throws InterruptedException {
        while (!listener.getFlows().contains(flowID)) {
            Thread.sleep(100L);
        }
    }

    /**
     * Feeds four items through copier -> async module -> blocking module and
     * lets the blocking module's data through one item at a time, checking
     * after each step how many items the blocking module has received and what
     * the async module's per-flow JMX attribute reads, ending at 0.
     */
    @Test(timeout = 10000)
    public void slowConsumer() throws Exception {
        ModuleURN asyncURN = new ModuleURN(SimpleAsyncProcessorFactory.PROVIDER_URN, "mymodule");
        Object[] items = {"item1", "item2", "item3", "item4"};
        DataRequest[] requests = {
                new DataRequest(CopierModuleFactory.INSTANCE_URN, items),
                new DataRequest(asyncURN),
                new DataRequest(BlockingModuleFactory.INSTANCE_URN)};
        DataFlowID flowID = mManager.createDataFlow(requests);
        String flowAttribute = "Flow" + flowID;

        // Sleep first, then check: wait until the first step has emitted all 4 items.
        while (true) {
            Thread.sleep(100L);
            if (mManager.getDataFlowInfo(flowID).getFlowSteps()[0].getNumEmitted() >= 4) {
                break;
            }
        }

        int lastIndex = items.length - 1;
        for (int index = 0; index <= lastIndex; index++) {
            BlockingModuleFactory.getLastInstance().getSemaphore().acquire();
            DataFlowInfo flowInfo = mManager.getDataFlowInfo(flowID);
            assertFlowStep(flowInfo.getFlowSteps()[2], BlockingModuleFactory.INSTANCE_URN, false, 0, 0, null,
                    true, index + 1, 0, null, BlockingModuleFactory.INSTANCE_URN, null);
            if (index < lastIndex) {
                // The attribute is only checked against the remaining count before the last item.
                Integer expectedValue = Integer.valueOf(lastIndex - index);
                Assert.assertEquals(expectedValue, getMBeanServer().getAttribute(asyncURN.toObjectName(), flowAttribute));
            }
            Assert.assertEquals(items[index], BlockingModuleFactory.getLastInstance().getNextData());
        }

        Assert.assertEquals(0, getMBeanServer().getAttribute(asyncURN.toObjectName(), flowAttribute));
        mManager.cancel(flowID);
    }

    /**
     * Lists the names of the JMX attributes exposed by the MBean of a module.
     *
     * @param moduleURN the module whose MBean is inspected
     * @return the attribute names, in the order reported by the MBean info
     * @throws Exception if the MBean info cannot be obtained
     */
    private List<String> getAttributes(ModuleURN moduleURN) throws Exception {
        MBeanAttributeInfo[] attributeInfos =
                getMBeanServer().getMBeanInfo(moduleURN.toObjectName()).getAttributes();
        List<String> names = new ArrayList<String>(attributeInfos.length);
        for (MBeanAttributeInfo attributeInfo : attributeInfos) {
            names.add(attributeInfo.getName());
        }
        return names;
    }

    /**
     * Creates and initializes a fresh module manager before every test.
     *
     * @throws Exception if the manager cannot be initialized
     */
    @Before
    public void setup() throws Exception {
        // Assign the field before init() so the manager is reachable even if init() fails.
        mManager = new ModuleManager();
        mManager.init();
    }

    /**
     * Stops the module manager created for the test and drops the reference to it.
     *
     * <p>JUnit runs this even when the setup method failed, in which case there
     * may be no manager to stop; that case is skipped instead of raising a
     * NullPointerException on top of the setup failure.
     *
     * @throws Exception if stopping the manager fails
     */
    @After
    public void cleanup() throws Exception {
        ModuleManager manager = mManager;
        // Clear the field first so it is reset even if stop() throws.
        mManager = null;
        if (manager != null) {
            manager.stop();
        }
    }
}
