/*
 * Decompiled with CFR 0.152.
 */
package cq;

import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.query.CqAttributes;
import com.gemstone.gemfire.cache.query.CqAttributesFactory;
import com.gemstone.gemfire.cache.query.CqClosedException;
import com.gemstone.gemfire.cache.query.CqQuery;
import com.gemstone.gemfire.cache.query.CqResults;
import com.gemstone.gemfire.cache.query.Query;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.cache.query.SelectResults;
import com.gemstone.gemfire.internal.NanoTimer;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import cq.CQUtil;
import cq.CQUtilBB;
import cq.CQUtilPrms;
import cq.ConcCQBB;
import cq.EntryEventBB;
import cq.EntryEventListener;
import hct.HctPrms;
import hydra.BridgeHelper;
import hydra.CacheHelper;
import hydra.ClientVmInfo;
import hydra.ClientVmMgr;
import hydra.ClientVmNotFoundException;
import hydra.ConfigPrms;
import hydra.GsRandom;
import hydra.Log;
import hydra.RegionHelper;
import hydra.TestConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import mapregion.MapBB;
import mapregion.MapPrms;
import mapregion.MapRegionTest;
import util.PRObserver;
import util.TestException;
import util.TestHelper;

public class ConcCQTest
extends MapRegionTest {
    /** Name of the region created by initServerRegion(); read by monitorServerRegion(). */
    private static String bridgeRegionName;
    /**
     * Region name used to build the query in performQuery().
     * NOTE(review): no assignment to this field is visible in this file - confirm where it is set.
     */
    private static String edgeRegionName;
    /**
     * Query service used to create and look up CQs.
     * NOTE(review): not assigned in this file; presumably set by a subclass - confirm.
     */
    protected static QueryService cqService;
    /** Not referenced anywhere in this file. */
    private static final int MAX_PUT = 50;
    /**
     * Minimum gap between two server kills, read from HctPrms.killInterval in the static
     * initializer; killServer() compares it against a difference of millisecond timestamps.
     */
    private static long killInterval;
    /** Blackboard holding the "lastKillTime" and "lastEventReceived" entries and the counters read by verifyEvents(). */
    private static CQUtilBB cqBB;
    /** Random source used by killServer() to choose between its two stop modes. */
    private static GsRandom rand;
    /**
     * Instance that the static task methods (doCQOperations(), verifyCQListener()) delegate to.
     * NOTE(review): not assigned in this file - confirm where it is initialised.
     */
    protected static ConcCQTest testInstance;
    /** Set to true at the end of waitForEvents(); lets Close_Task() skip a second wait. */
    protected static boolean waitForEventsAlreadyPerformed;
    /** Value of CQUtilPrms.CQsOn (default true); selects which verification verifyCQListener() runs. */
    protected static boolean cqsOn;
    /** Value-constraint class loaded in initServerWithMultRegions() when CQUtilPrms.valueConstraint is configured. */
    private static Class valueConstraint;
    /** Bridge endpoints, fetched lazily by getNextServerToKill(). */
    static List endpoints;
    /** Index into endpoints of the next server to kill; wraps back to 0 at the end of the list. */
    static volatile int endpoint_cntr;
    /*
     * Operation codes dispatched by performCQOperations(). The "EXCEUTE" spelling is
     * part of the protected interface and is therefore left unchanged.
     */
    protected static final int EXCEUTE_CQ = 1;
    protected static final int EXCEUTE_CQ_WITH_INITIAL_RESULTS = 2;
    protected static final int STOP_CQ = 3;
    protected static final int CLOSE_CQ = 4;
    /** Code used by performCQOperations() when no CQ could be claimed. */
    protected static final int DO_NOTHING = 99;
    /** Operation codes a thread picks from at random; populated in the static initializer. */
    protected static final int[] cqOperations;
    /** CQs currently claimed by a thread (see getCQForOp()); created as a synchronized list. */
    protected static List cqList;

    /**
     * Installs the PR observer hook, creates the cache and the server region,
     * remembers the region's name and finally starts the bridge server.
     *
     * NOTE(review): the region is created from {@code ConfigPrms.getBridgeConfig()},
     * the same value that is passed to {@code startBridgeServer} - confirm this is intended.
     */
    public static void initServerRegion() {
        PRObserver.installObserverHook();
        CacheHelper.createCache(ConfigPrms.getCacheConfig());

        // Keep the name so monitorServerRegion() can look the region up later.
        bridgeRegionName = RegionHelper.createRegion(ConfigPrms.getBridgeConfig()).getName();
        Log.getLogWriter().info("created cache and region in bridge");

        BridgeHelper.startBridgeServer(ConfigPrms.getBridgeConfig());
        Log.getLogWriter().info("started bridge server");
    }

    /**
     * Creates the cache plus one region per name in {@code MapPrms.getRegionNames()},
     * publishes the first region's scope on the CQ blackboard under "Scope", and
     * starts the bridge server.
     *
     * When CQs are switched off ({@code CQUtilPrms.CQsOn} is false) the configured
     * cache listeners of every region are replaced by a single {@link EntryEventListener}.
     *
     * @throws TestException if the configured value-constraint class cannot be loaded
     */
    public static void initServerWithMultRegions() {
        PRObserver.installObserverHook();
        cqsOn = TestConfig.tab().booleanAt(CQUtilPrms.CQsOn, true);
        CacheHelper.createCache(ConfigPrms.getCacheConfig());

        RegionAttributes configuredAttrs = RegionHelper.getRegionAttributes(ConfigPrms.getRegionConfig());
        CacheListener[] configuredListeners = configuredAttrs.getCacheListeners();
        AttributesFactory attrFactory = RegionHelper.getAttributesFactory(ConfigPrms.getRegionConfig());

        // Optional value constraint, given as a class name in the test configuration.
        String constraintClassName = TestConfig.tab().stringAt(CQUtilPrms.valueConstraint, null);
        if (constraintClassName != null) {
            try {
                valueConstraint = Class.forName(constraintClassName);
            }
            catch (ClassNotFoundException e) {
                throw new TestException("Could not find specified class: " + constraintClassName + "\n" + TestHelper.getStackTrace(e));
            }
            attrFactory.setValueConstraint(valueConstraint);
        }

        String[] regionNames = MapPrms.getRegionNames();
        for (String regionName : regionNames) {
            RegionHelper.createRegion(regionName, attrFactory);
        }

        // All regions share one attributes factory, so the first region's scope is published.
        Scope firstScope = RegionHelper.getRegion(regionNames[0]).getAttributes().getScope();
        CQUtilBB.getBB().getSharedMap().put("Scope", firstScope);

        for (String regionName : regionNames) {
            Region region = RegionHelper.getRegion(regionName);
            // Constructed for every region, exactly as before, even when it ends up unused.
            EntryEventListener entryListener = new EntryEventListener();
            if (!cqsOn && region != null) {
                for (CacheListener configured : configuredListeners) {
                    region.getAttributesMutator().removeCacheListener(configured);
                }
                region.getAttributesMutator().addCacheListener((CacheListener)entryListener);
            }
        }

        Log.getLogWriter().info("created cache and region in bridge");
        BridgeHelper.startBridgeServer(ConfigPrms.getBridgeConfig());
        Log.getLogWriter().info("started bridge server");
    }

    /**
     * Blocks until the HA queue of every client proxy known to this server has
     * either drained to zero or stopped changing between two consecutive polls.
     *
     * Proxies are polled every 5 seconds. A proxy is dropped from the wait set as
     * soon as its queue size statistic is zero, or is unchanged since the previous
     * poll (logged as a warning). If proxies are still pending after two minutes a
     * {@link TestException} is thrown.
     *
     * An interrupt received while sleeping does not shorten the wait; the thread's
     * interrupt status is restored before this method returns or throws.
     *
     * @throws TestException if queue sizes are still changing after two minutes
     */
    public static void waitForServerHAQueuesToDrain() {
        final long timeoutNanos = 120000000000L; // two minutes
        HashMap<ClientProxyMembershipID, Integer> lastSeenSize = new HashMap<ClientProxyMembershipID, Integer>();
        Log.getLogWriter().info("Making use of Queue Size statistics on server side to wait for CQ events getting drained");
        CacheClientNotifier ccnInstance = CacheClientNotifier.getInstance();
        List<CacheClientProxy> pending = new ArrayList<CacheClientProxy>(ccnInstance.getClientProxies());
        Log.getLogWriter().info("Server has " + pending.size() + " client proxies");

        boolean timedOut = false;
        boolean interrupted = false;
        long start = NanoTimer.getTime();
        try {
            while (!pending.isEmpty()) {
                // Iterate over a snapshot so resolved proxies can be removed from 'pending'.
                for (CacheClientProxy clientProxy : new ArrayList<CacheClientProxy>(pending)) {
                    int size = clientProxy.getQueueSizeStat();
                    if (size == 0) {
                        pending.remove(clientProxy);
                        Log.getLogWriter().info("Queue size reached zero for ClientProxy:: " + clientProxy.getProxyID());
                        continue;
                    }
                    Integer previous = lastSeenSize.get(clientProxy.getProxyID());
                    if (previous != null && previous.intValue() == size) {
                        pending.remove(clientProxy);
                        Log.getLogWriter().warning("Size of the HA Queue for client proxy did not reach zero, instead it remained constant at: " + previous + " for a 5 second period... this could be because of Bug 47390. Continuing with the test...");
                        continue;
                    }
                    if (previous != null) {
                        Log.getLogWriter().info("For client proxy, " + clientProxy.getProxyID() + " HA queue size reduced from  " + previous + " to " + size);
                    }
                    lastSeenSize.put(clientProxy.getProxyID(), size);
                }
                // Everything resolved in this pass: no reason to time out or sleep again.
                if (pending.isEmpty()) {
                    break;
                }
                if (NanoTimer.getTime() - start > timeoutNanos) {
                    timedOut = true;
                    break;
                }
                try {
                    Log.getLogWriter().info("Sleeping for 5 seconds to allow queues to drain for all client proxies");
                    Thread.sleep(5000L);
                }
                catch (InterruptedException e) {
                    Log.getLogWriter().warning("Caught interrupted exception" + e);
                    // Keep waiting, but remember the interrupt so it can be restored below.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (timedOut) {
            throw new TestException("Wait for all HA Queue sizes to zero or a constant value timedout. It took more than 2 minutes but queue size was still changing");
        }
    }

    /**
     * Runs a "select distinct" over the edge region for all entries with
     * {@code ID > -1} and logs when the result size differs from the
     * {@code MapBB.NUM_PUT} counter. A mismatch is only logged, never failed.
     *
     * @throws TestException wrapping any exception raised while querying
     */
    public static void performQuery() {
        try {
            QueryService queryService = CacheHelper.getCache().getQueryService();
            int expectedSize = (int)MapBB.getBB().getSharedCounters().read(MapBB.NUM_PUT);
            String regionPath = RegionHelper.getRegion(edgeRegionName).getFullPath();
            String queryStr = "select distinct * from " + regionPath + " where ID > " + String.valueOf(-1);

            Object results = queryService.newQuery(queryStr).execute();
            Log.getLogWriter().info("executed query " + queryStr);

            if (!(results instanceof SelectResults)) {
                Log.getLogWriter().info("Result set not instanceof SelectResults");
                return;
            }
            int actualSize = ((SelectResults)results).size();
            if (actualSize != expectedSize) {
                Log.getLogWriter().info("Result set should have been " + expectedSize + " but is " + actualSize);
            }
        }
        catch (Exception e) {
            throw new TestException("Caught exception during query execution" + TestHelper.getStackTrace(e));
        }
    }

    /**
     * Logs the current size of the region created by initServerRegion().
     */
    public static void monitorServerRegion() {
        int regionSize = RegionHelper.getRegion(bridgeRegionName).size();
        Log.getLogWriter().info("The size of the region on server is: " + regionSize);
    }

    /**
     * Logs the current size of every region named in {@code MapPrms.getRegionNames()};
     * names for which no region is found are skipped.
     */
    public static void monitorServerRegions() {
        for (String regionName : MapPrms.getRegionNames()) {
            Region region = RegionHelper.getRegion(regionName);
            if (region != null) {
                Log.getLogWriter().info("The size of the region- " + regionName + " on server is: " + region.size());
            }
        }
    }

    /**
     * Close task: waits for outstanding events unless waitForEvents() has already
     * run, prints the MapBB blackboard and closes the cache.
     */
    public static void Close_Task() {
        boolean stillNeedsWait = !waitForEventsAlreadyPerformed;
        if (stillNeedsWait) {
            ConcCQTest.waitForEvents();
        }

        Log.getLogWriter().info("Printing Counters");
        MapBB.getBB().print();

        Log.getLogWriter().info("closing cache");
        CacheHelper.closeCache();
    }

    /**
     * Stops and restarts the next bridge server, unless a server was already
     * killed less than {@code killInterval} milliseconds ago.
     *
     * The time of the last kill is kept on the CQ blackboard under "lastKillTime".
     * A missing entry is treated as "never killed" instead of failing with a
     * NullPointerException. After the restart, if the "expectRecovery" blackboard
     * entry is {@code Boolean.TRUE}, the method waits for recovery of the number
     * of PRs stored under "numPRs".
     *
     * A {@link ClientVmNotFoundException} is logged and otherwise ignored.
     */
    public static synchronized void killServer() {
        PRObserver.initialize();
        try {
            long now = System.currentTimeMillis();
            Long lastKill = (Long)cqBB.getSharedMap().get("lastKillTime");
            // initBlackboard() may not have run; treat a missing timestamp as time zero.
            long lastKillMillis = lastKill == null ? 0L : lastKill.longValue();
            if (now - lastKillMillis < killInterval) {
                Log.getLogWriter().info("No kill executed");
                return;
            }
            cqBB.getSharedMap().put("lastKillTime", Long.valueOf(now));

            BridgeHelper.Endpoint endpointToKill = ConcCQTest.getNextServerToKill();
            Log.getLogWriter().info("killing:- " + endpointToKill.toString());

            // Pick one of the two stop modes at random.
            // NOTE(review): assumes nextInt(0, 1) can return both 0 and 1 - confirm against GsRandom.
            boolean killServer = rand.nextInt(0, 1) != 0;
            ClientVmInfo target = new ClientVmInfo(endpointToKill);
            // NOTE(review): -21/-22/-31 are stop/start mode codes whose symbolic names
            // are not visible in this file.
            if (killServer) {
                ClientVmMgr.stop("Killing cache server", -21, -31, target);
            } else {
                ClientVmMgr.stop("Stopping cache server", -22, -31, target);
            }
            ClientVmMgr.start("Test is restarting server", target);

            Object expectRecovery = ConcCQBB.getBB().getSharedMap().get("expectRecovery");
            if (expectRecovery instanceof Boolean && ((Boolean)expectRecovery).booleanValue()) {
                int numPRs = (Integer)ConcCQBB.getBB().getSharedMap().get("numPRs");
                PRObserver.waitForRebalRecov(target, 1, numPRs, null, null, false);
            }
        }
        catch (ClientVmNotFoundException e) {
            Log.getLogWriter().info("Caught expected ClientVmNotFoundException, continuing test");
        }
    }

    /**
     * Resets the "lastKillTime" blackboard entry to zero and reads it back.
     *
     * @return {@code true} if the value read back is a non-null zero
     */
    public static boolean initBlackboard() {
        cqBB.getSharedMap().put("lastKillTime", Long.valueOf(0L));
        Long stored = (Long)cqBB.getSharedMap().get("lastKillTime");
        if (stored == null) {
            return false;
        }
        return stored.longValue() == 0L;
    }

    /**
     * Returns the endpoint at the current round-robin position and advances the
     * position, wrapping to the start of the list. The endpoint list is fetched
     * from {@code BridgeHelper.getEndpoints()} on first use.
     *
     * @return the endpoint selected for this kill
     */
    private static BridgeHelper.Endpoint getNextServerToKill() {
        if (endpoints == null) {
            endpoints = BridgeHelper.getEndpoints();
        }
        int current = endpoint_cntr;
        BridgeHelper.Endpoint chosen = (BridgeHelper.Endpoint)endpoints.get(current);

        // Advance, wrapping back to the first endpoint after the last one.
        int next = current + 1;
        endpoint_cntr = next >= endpoints.size() ? 0 : next;

        Log.getLogWriter().info("returning endpoint " + chosen.toString() + " to kill");
        return chosen;
    }

    /**
     * Task entry point: performs one random CQ operation through the shared test instance.
     */
    public static void doCQOperations() {
        ConcCQTest instance = testInstance;
        instance.performCQOperations();
    }

    /**
     * Claims a randomly chosen CQ for the calling thread.
     *
     * Random CQs are drawn until one is found that is not in {@code cqList}, or
     * until roughly 10 seconds have passed. The final check-and-add runs while
     * holding the lock on {@code cqList}, so a CQ is handed to one thread only.
     *
     * @return the claimed CQ, or {@code null} if the last candidate was already claimed
     */
    protected CqQuery getCQForOp() {
        final long startTime = System.currentTimeMillis();
        CqQuery candidate = this.getNextRandonCQ();
        for (;;) {
            boolean claimedByOther = cqList.contains(candidate);
            if (!claimedByOther || System.currentTimeMillis() - startTime >= 10000L) {
                break;
            }
            candidate = this.getNextRandonCQ();
        }
        synchronized (cqList) {
            if (cqList.contains(candidate)) {
                return null;
            }
            cqList.add(candidate);
            return candidate;
        }
    }

    /**
     * Picks one of the CQs currently known to {@code cqService} at random.
     *
     * @return the CQ at a random index of {@code cqService.getCqs()}
     */
    protected CqQuery getNextRandonCQ() {
        CqQuery[] registered = cqService.getCqs();
        int lastIndex = registered.length - 1;
        int pick = TestConfig.tab().getRandGen().nextInt(0, lastIndex);
        return registered[pick];
    }

    /**
     * Claims a random CQ and applies one randomly chosen operation to it
     * (execute, execute with initial results, stop or close).
     *
     * If no CQ could be claimed the method only logs that fact. The claimed CQ is
     * always released from {@code cqList} again, including when the operation
     * throws, so a failed operation cannot leave the CQ blocked for other threads.
     *
     * @throws TestException if the selected operation code is not recognised, or
     *         if the operation itself fails
     */
    protected void performCQOperations() {
        CqQuery cq = this.getCQForOp();
        int whichOp = DO_NOTHING;
        if (cq != null) {
            int randInt = TestConfig.tab().getRandGen().nextInt(0, cqOperations.length - 1);
            whichOp = cqOperations[randInt];
        }
        try {
            switch (whichOp) {
                case EXCEUTE_CQ: {
                    Log.getLogWriter().info("executing:- " + cq.getName());
                    this.executeCQ(cq);
                    break;
                }
                case EXCEUTE_CQ_WITH_INITIAL_RESULTS: {
                    if (MapPrms.getDoExecuteWithInitialResultsCQ()) {
                        Log.getLogWriter().info("executing with initial results:- " + cq.getName());
                        this.executeWithInitialResultsCQ(cq);
                        break;
                    }
                    Log.getLogWriter().info("NOT doing executeWithInitial results:- " + cq.getName());
                    break;
                }
                case STOP_CQ: {
                    Log.getLogWriter().info("stoping cq:- " + cq.getName());
                    this.stopCQ(cq);
                    break;
                }
                case CLOSE_CQ: {
                    Log.getLogWriter().info("closing cq:- " + cq.getName());
                    this.closeCQ(cq);
                    break;
                }
                case DO_NOTHING: {
                    Log.getLogWriter().info("The thread could not find any CQ for operation within 10 seconds. cq:-" + cq);
                    break;
                }
                default: {
                    // Report the operation code itself, not the index it was drawn from.
                    throw new TestException("Unrecognized cqOperation (" + whichOp + ")");
                }
            }
        }
        finally {
            // Release the claim taken in getCQForOp(); removing null is a no-op.
            synchronized (cqList) {
                cqList.remove(cq);
            }
        }
    }

    /**
     * Closes the given CQ and then re-registers it.
     *
     * A running or stopped CQ is closed and the call is timed against a 20 second
     * limit. If the CQ is closed at that point, close() is invoked once more - it
     * must not throw CqClosedException - and the CQ is registered again.
     *
     * @param cq the CQ to close
     * @throws TestException wrapping any exception raised along the way
     */
    protected void closeCQ(CqQuery cq) {
        try {
            if (cq.isRunning() || cq.isStopped()) {
                long before = System.currentTimeMillis();
                cq.close();
                long after = System.currentTimeMillis();
                Log.getLogWriter().info("closed cq:- " + cq.getName());
                ConcCQTest.checkTime(before, after, 20000L, "close");
            }
            if (cq.isClosed()) {
                try {
                    // Closing an already closed CQ is expected to succeed quietly.
                    cq.close();
                    Log.getLogWriter().info("CQ:- " + cq.getName() + " is closed hence registering it before execution");
                    this.reRegisterCQ(cq);
                }
                catch (CqClosedException cle) {
                    throw new TestException("Should not have thrown CQClosedException. close() on CLOSED query is not successful");
                }
            }
        }
        catch (Exception e) {
            throw new TestException(TestHelper.getStackTrace(e));
        }
    }

    /**
     * Exercises {@code execute()} according to the CQ's current state.
     *
     * A stopped CQ is executed and timed against a 40 second limit; a running CQ
     * must reject the call with IllegalStateException. Afterwards, a closed CQ
     * must reject the call with CqClosedException and is then re-registered.
     *
     * @param cq the CQ to execute
     * @throws TestException wrapping any unexpected outcome or exception
     */
    protected void executeCQ(CqQuery cq) {
        try {
            if (cq.isStopped()) {
                long before = System.currentTimeMillis();
                cq.execute();
                long after = System.currentTimeMillis();
                Log.getLogWriter().info("executed query:- " + cq.getName());
                ConcCQTest.checkTime(before, after, 40000L, "execute");
            } else if (cq.isRunning()) {
                try {
                    cq.execute();
                    throw new TestException("Should have thrown IllegalStateException. Execute on RUNNING query is successful");
                }
                catch (IllegalStateException expected) {
                    // Expected: a running CQ cannot be executed again.
                }
            }

            if (cq.isClosed()) {
                try {
                    cq.execute();
                    throw new TestException("Should have thrown CQClosedException. execute() on CLOSED query is successful");
                }
                catch (CqClosedException expected) {
                    Log.getLogWriter().info("CQ:- " + cq.getName() + " is closed hence registering it before execution");
                    this.reRegisterCQ(cq);
                }
            }
        }
        catch (Exception e) {
            throw new TestException(TestHelper.getStackTrace(e));
        }
    }

    /**
     * Exercises {@code executeWithInitialResults()} according to the CQ's current state.
     *
     * A stopped CQ is executed with initial results (the results are passed
     * through {@code CQUtil.getSelectResults}) and timed against a 60 second
     * limit. A running CQ must reject {@code executeWithInitialResults()} with
     * IllegalStateException; previously this branch called {@code execute()} and
     * so never tested the method it reports on. Afterwards, a closed CQ must
     * reject the call with CqClosedException and is then re-registered.
     *
     * @param cq the CQ to execute
     * @throws TestException wrapping any unexpected outcome or exception
     */
    protected void executeWithInitialResultsCQ(CqQuery cq) {
        try {
            if (cq.isStopped()) {
                long startTime = System.currentTimeMillis();
                CqResults rs = cq.executeWithInitialResults();
                // The converted results are not inspected here; the call is kept so
                // the conversion still runs inside the timed section.
                CQUtil.getSelectResults(rs);
                long endTime = System.currentTimeMillis();
                Log.getLogWriter().info("executed query:- " + cq.getName() + " with initial results");
                ConcCQTest.checkTime(startTime, endTime, 60000L, "executeWithInitialResults");
            } else if (cq.isRunning()) {
                try {
                    cq.executeWithInitialResults();
                    throw new TestException("Should have thrown IllegalStateException. executeWithInitialResults on RUNNING query is successful");
                }
                catch (IllegalStateException expected) {
                    // Expected: a running CQ cannot be executed again.
                }
            }
            if (cq.isClosed()) {
                try {
                    cq.executeWithInitialResults();
                    throw new TestException("Should have thrown CQClosedException. executeWithInitialResults() on CLOSED query is succussful");
                }
                catch (CqClosedException cle) {
                    Log.getLogWriter().info("CQ:- " + cq.getName() + " is closed hence registering it before executeWithInitialResults");
                    this.reRegisterCQ(cq);
                }
            }
        }
        catch (Exception e) {
            throw new TestException(TestHelper.getStackTrace(e));
        }
    }

    /**
     * Exercises {@code stop()} according to the CQ's current state.
     *
     * A running CQ is stopped and timed against a 20 second limit; a stopped CQ
     * must reject the call with IllegalStateException. Afterwards, a closed CQ
     * must reject the call with CqClosedException and is then re-registered.
     *
     * @param cq the CQ to stop
     * @throws TestException wrapping any unexpected outcome or exception
     */
    protected void stopCQ(CqQuery cq) {
        try {
            if (cq.isRunning()) {
                long before = System.currentTimeMillis();
                cq.stop();
                long after = System.currentTimeMillis();
                Log.getLogWriter().info("stopped CQ:- " + cq.getName());
                ConcCQTest.checkTime(before, after, 20000L, "stop");
            } else if (cq.isStopped()) {
                try {
                    cq.stop();
                    throw new TestException("should have thrown IllegalStateException. executed stop() successfully on STOPPED CQ");
                }
                catch (IllegalStateException expected) {
                    // Expected: a stopped CQ cannot be stopped again.
                }
            }

            if (cq.isClosed()) {
                try {
                    cq.stop();
                    throw new TestException("should have thrown CQClosedException. executed stop() successfully on CLOSED CQ");
                }
                catch (CqClosedException expected) {
                    this.reRegisterCQ(cq);
                }
            }
        }
        catch (Exception e) {
            throw new TestException(TestHelper.getStackTrace(e));
        }
    }

    /**
     * Creates one CQ per query string in {@code MapPrms.getQueryStrs()}, all
     * sharing one attributes object with the configured CQ listener, and logs the
     * created CQs. The CQs are not executed.
     *
     * @throws TestException wrapping any exception raised during registration
     */
    protected void registerCQs() {
        try {
            String[] queryStrings = MapPrms.getQueryStrs();

            CqAttributesFactory attrFactory = new CqAttributesFactory();
            attrFactory.addCqListener(CQUtilPrms.getCQListener());
            CqAttributes sharedAttrs = attrFactory.create();

            CqQuery[] created = new CqQuery[queryStrings.length];
            for (int idx = 0; idx < queryStrings.length; idx++) {
                created[idx] = cqService.newCq(queryStrings[idx], sharedAttrs);
            }

            StringBuilder summary = new StringBuilder("Registered CQs (size " + created.length + ") = \n");
            for (CqQuery query : created) {
                summary.append(query.toString()).append("\n");
            }
            Log.getLogWriter().info(summary.toString());
            Log.getLogWriter().info("Done with registering CQs");
        }
        catch (Exception e) {
            throw new TestException(TestHelper.getStackTrace(e));
        }
    }

    /**
     * Creates one CQ per query string in {@code MapPrms.getQueryStrs()}, executes
     * each one immediately after creating it, and logs the resulting CQs. All CQs
     * share one attributes object with the configured CQ listener.
     *
     * @throws TestException wrapping any exception raised during registration or execution
     */
    protected void registerAndExecuteCQs() {
        try {
            String[] queryStrings = MapPrms.getQueryStrs();

            CqAttributesFactory attrFactory = new CqAttributesFactory();
            attrFactory.addCqListener(CQUtilPrms.getCQListener());
            CqAttributes sharedAttrs = attrFactory.create();

            CqQuery[] created = new CqQuery[queryStrings.length];
            for (int idx = 0; idx < queryStrings.length; idx++) {
                CqQuery query = cqService.newCq(queryStrings[idx], sharedAttrs);
                created[idx] = query;
                query.execute();
            }

            StringBuilder summary = new StringBuilder("Registered and executed CQs (size " + created.length + ") = \n");
            for (CqQuery query : created) {
                summary.append(query.toString()).append("\n");
            }
            Log.getLogWriter().info(summary.toString());
            Log.getLogWriter().info("Done with registering and started executions of CQs");
        }
        catch (Exception e) {
            throw new TestException(TestHelper.getStackTrace(e));
        }
    }

    /**
     * Registers a new CQ using the query string and attributes of the given one.
     * The newly created CQ is not returned and not executed here.
     *
     * @param cq the CQ whose query string and attributes are reused
     * @throws TestException wrapping any exception raised during registration
     */
    protected void reRegisterCQ(CqQuery cq) {
        try {
            Log.getLogWriter().info("re-registering CQ:- " + cq.getName());
            String queryString = cq.getQueryString();
            CqAttributes originalAttrs = cq.getCqAttributes();
            cqService.newCq(queryString, originalAttrs);
            Log.getLogWriter().info("CQ re-registered:- " + cq.getName());
        }
        catch (Exception e) {
            throw new TestException(TestHelper.getStackTrace(e));
        }
    }

    /**
     * Task entry point for event verification. Re-reads {@code CQUtilPrms.CQsOn}
     * and verifies CQ events when it is true, bridge cache-listener events otherwise.
     */
    public static void verifyCQListener() {
        cqsOn = TestConfig.tab().booleanAt(CQUtilPrms.CQsOn, true);
        if (!cqsOn) {
            testInstance.verifyBridgeCacheEvent();
            return;
        }
        testInstance.verifyEvents();
    }

    /**
     * Waits for events to settle, then compares the operation counters on MapBB
     * with the listener counters on EntryEventBB, using
     * {@code CQUtilPrms.numBridges} as the number of listeners.
     *
     * @throws TestException if verifyFromBBs() detects a mismatch
     */
    protected void verifyBridgeCacheEvent() {
        ConcCQTest.waitForEvents();
        MapBB opsBB = MapBB.getBB();
        EntryEventBB listenerBB = EntryEventBB.getBB();

        // Operations performed by the test.
        int puts = (int)opsBB.getSharedCounters().read(MapBB.NUM_PUT);
        int destroys = (int)opsBB.getSharedCounters().read(MapBB.NUM_REMOVE);
        int invalidates = (int)Math.abs(opsBB.getSharedCounters().read(MapBB.NUM_INVALIDATE));
        int closes = (int)opsBB.getSharedCounters().read(MapBB.NUM_CLOSE);

        // Events counted on the listener blackboard.
        int createEvents = (int)listenerBB.getSharedCounters().read(EntryEventBB.NUM_CREATE);
        int destroyEvents = (int)listenerBB.getSharedCounters().read(EntryEventBB.NUM_DESTROY);
        int invalidateEvents = (int)listenerBB.getSharedCounters().read(EntryEventBB.NUM_INVALIDATE);
        int updateEvents = (int)listenerBB.getSharedCounters().read(EntryEventBB.NUM_UPDATE);
        int closeEvents = (int)listenerBB.getSharedCounters().read(EntryEventBB.NUM_CLOSE);

        int listenerCount = TestConfig.tab().intAt(CQUtilPrms.numBridges);
        this.verifyFromBBs(puts, destroys, invalidates, closes,
                createEvents, destroyEvents, invalidateEvents, updateEvents, closeEvents,
                listenerCount);
    }

    /**
     * Compares operation counts with per-listener event counts and throws a
     * single TestException listing every mismatch found.
     *
     * Event counts are divided by {@code numListeners} (integer division) before
     * comparison. Puts are compared with creates plus updates. When the scope
     * stored on the CQ blackboard under "Scope" is distributed-ack, destroys and
     * invalidates are checked with checkDackEvents(), which tolerates a limited
     * number of missed events; otherwise they must match exactly.
     *
     * @param numPuts          puts performed
     * @param numDestroys      destroys performed
     * @param numInvalidates   invalidates performed
     * @param numCloses        closes performed
     * @param afterCreates     create events seen by listeners
     * @param afterDestroys    destroy events seen by listeners
     * @param afterInvalidates invalidate events seen by listeners
     * @param afterUpdates     update events seen by listeners
     * @param afterCloses      close events seen by listeners
     * @param numListeners     divisor applied to every event count
     * @throws TestException if any comparison fails
     */
    protected void verifyFromBBs(int numPuts, int numDestroys, int numInvalidates, int numCloses, int afterCreates, int afterDestroys, int afterInvalidates, int afterUpdates, int afterCloses, int numListeners) {
        Scope scope = (Scope)CQUtilBB.getBB().getSharedMap().get("Scope");
        boolean isDack = scope.isDistributedAck();
        StringBuilder failures = new StringBuilder();

        Log.getLogWriter().info("numPuts:- " + numPuts);
        Log.getLogWriter().info("numDestroys:- " + numDestroys);
        Log.getLogWriter().info("numInvalidates:- " + numInvalidates);
        Log.getLogWriter().info("numCloses: - " + numCloses);
        Log.getLogWriter().info("afterCreates:- " + afterCreates);
        Log.getLogWriter().info("afterUpdates:- " + afterUpdates);
        Log.getLogWriter().info("afterInvalidates:- " + afterInvalidates);
        Log.getLogWriter().info("afterDestroys:- " + afterDestroys);
        Log.getLogWriter().info("afterCloses:- " + afterCloses);
        Log.getLogWriter().info("numListeners:- " + numListeners);

        // Each check runs on its own so that one failure does not hide the others.
        try {
            int putEventsPerListener = (afterCreates + afterUpdates) / numListeners;
            ConcCQTest.checkNumberOfEvents(numPuts, putEventsPerListener, "numPuts", "put", "afterCreates");
        }
        catch (TestException failure) {
            failures.append(failure.getMessage() + "\n");
        }

        try {
            if (isDack) {
                ConcCQTest.checkDackEvents(numDestroys, afterDestroys / numListeners, "numDestroys", "destroy", "afterDestroys");
            } else {
                ConcCQTest.checkNumberOfEvents(numDestroys, afterDestroys / numListeners, "numDestroys", "destroy", "afterDestroys");
            }
        }
        catch (TestException failure) {
            failures.append(failure.getMessage() + "\n");
        }

        try {
            ConcCQTest.checkNumberOfEvents(numCloses, afterCloses / numListeners, "numCloses", "close", "afterCloses");
        }
        catch (TestException failure) {
            failures.append(failure.getMessage() + "\n");
        }

        try {
            if (isDack) {
                ConcCQTest.checkDackEvents(numInvalidates, afterInvalidates / numListeners, "numInvalidates", "invalidate", "afterInvalidates");
            } else {
                ConcCQTest.checkNumberOfEvents(numInvalidates, afterInvalidates / numListeners, "numInvalidates", "invalidate", "afterInvalidates");
            }
        }
        catch (TestException failure) {
            failures.append(failure.getMessage() + "\n");
        }

        if (failures.length() > 0) {
            throw new TestException(failures.toString());
        }
    }

    /**
     * Runs the entry-level verification of verifyFromBBs() and additionally
     * compares clear-region and invalidate-region counts with their per-listener
     * event counts. All mismatches are reported together in one TestException.
     *
     * @param numClearRegions        clear-region operations performed
     * @param numInvalidateRegions   invalidate-region operations performed
     * @param afterClearRegions      clear-region events seen by listeners
     * @param afterInvalidateRegions invalidate-region events seen by listeners
     * @param numListeners           divisor applied to every event count
     * @throws TestException if any comparison fails
     */
    protected void verifyWithRegionOpsFromBB(int numPuts, int numDestroys, int numInvalidates, int numCloses, int numClearRegions, int numInvalidateRegions, int afterCreates, int afterDestroys, int afterInvalidates, int afterUpdates, int afterCloses, int afterClearRegions, int afterInvalidateRegions, int numListeners) {
        StringBuilder failures = new StringBuilder();

        Log.getLogWriter().info("numClearRegions:- " + numClearRegions);
        Log.getLogWriter().info("numInvalidateRegions:- " + numInvalidateRegions);
        Log.getLogWriter().info("afterClearRegions:- " + afterClearRegions);
        Log.getLogWriter().info("afterInvalidateRegions:- " + afterInvalidateRegions);

        // Entry-level checks first; their combined message becomes one entry here.
        try {
            this.verifyFromBBs(numPuts, numDestroys, numInvalidates, numCloses, afterCreates, afterDestroys, afterInvalidates, afterUpdates, afterCloses, numListeners);
        }
        catch (TestException failure) {
            failures.append(failure.getMessage() + "\n");
        }

        try {
            int clearEventsPerListener = afterClearRegions / numListeners;
            ConcCQTest.checkNumberOfEvents(numClearRegions, clearEventsPerListener, "numClearRegions", "clearRegions", "afterClearRegions");
        }
        catch (TestException failure) {
            failures.append(failure.getMessage() + "\n");
        }

        try {
            int invalidateEventsPerListener = afterInvalidateRegions / numListeners;
            ConcCQTest.checkNumberOfEvents(numInvalidateRegions, invalidateEventsPerListener, "numInvalidateRegions", "InvalidateRegion", "afterInvalidateRegions");
        }
        catch (TestException failure) {
            failures.append(failure.getMessage() + "\n");
        }

        if (failures.length() > 0) {
            throw new TestException(failures.toString());
        }
    }

    /**
     * Waits for events to settle, then compares the operation counters on MapBB
     * with the event counters on the CQ blackboard, using
     * {@code MapPrms.getNumEdges()} as the number of listeners. Region-level
     * counts are verified as well when any region destroy or region invalidate
     * was recorded.
     *
     * @throws TestException if a mismatch is detected
     */
    protected void verifyEvents() {
        ConcCQTest.waitForEvents();
        MapBB opsBB = MapBB.getBB();

        // Operations performed by the test.
        int puts = (int)opsBB.getSharedCounters().read(MapBB.NUM_PUT);
        int destroys = (int)opsBB.getSharedCounters().read(MapBB.NUM_REMOVE);
        int invalidates = (int)Math.abs(opsBB.getSharedCounters().read(MapBB.NUM_INVALIDATE));
        int closes = (int)opsBB.getSharedCounters().read(MapBB.NUM_CLOSE);
        int clearRegions = (int)opsBB.getSharedCounters().read(MapBB.NUM_REGION_DESTROY);
        int invalidateRegions = (int)opsBB.getSharedCounters().read(MapBB.NUM_REGION_INVALIDATE);

        // Events counted on the CQ blackboard.
        int createEvents = (int)cqBB.getSharedCounters().read(CQUtilBB.NUM_CREATE);
        int destroyEvents = (int)cqBB.getSharedCounters().read(CQUtilBB.NUM_DESTROY);
        int invalidateEvents = (int)cqBB.getSharedCounters().read(CQUtilBB.NUM_INVALIDATE);
        int updateEvents = (int)cqBB.getSharedCounters().read(CQUtilBB.NUM_UPDATE);
        int closeEvents = (int)cqBB.getSharedCounters().read(CQUtilBB.NUM_CLOSE);
        int clearRegionEvents = (int)cqBB.getSharedCounters().read(CQUtilBB.NUM_CLEARREGION);
        int invalidateRegionEvents = (int)cqBB.getSharedCounters().read(CQUtilBB.NUM_INVALIDATEREGION);

        int listenerCount = MapPrms.getNumEdges();
        boolean noRegionOps = clearRegions == 0 && invalidateRegions == 0;
        if (noRegionOps) {
            this.verifyFromBBs(puts, destroys, invalidates, closes,
                    createEvents, destroyEvents, invalidateEvents, updateEvents, closeEvents,
                    listenerCount);
        } else {
            this.verifyWithRegionOpsFromBB(puts, destroys, invalidates, closes, clearRegions, invalidateRegions,
                    createEvents, destroyEvents, invalidateEvents, updateEvents, closeEvents,
                    clearRegionEvents, invalidateRegionEvents, listenerCount);
        }
    }

    /**
     * Requires the expected and actual counts to be equal.
     *
     * @param expected      value of the operation counter
     * @param actual        value of the listener event counter
     * @param cntrName      name of the operation counter, used in the message
     * @param op            operation name, used in the message
     * @param listenerEvent name of the listener counter, used in the message
     * @throws TestException if the two counts differ
     */
    public static void checkNumberOfEvents(int expected, int actual, String cntrName, String op, String listenerEvent) {
        if (expected == actual) {
            return;
        }
        String message = cntrName + "(incremented after every " + op + ") (= " + expected + " ) are not equal " + listenerEvent + " (CQListener) ( = " + actual + " )";
        throw new TestException(message);
    }

    /**
     * Lenient count check used for distributed-ack scope: up to 10 missed events
     * are tolerated, but more events than operations is always a failure.
     *
     * @param expected      value of the operation counter
     * @param actual        value of the listener event counter
     * @param cntrName      name of the operation counter, used in the message
     * @param op            operation name, used in the message
     * @param listenerEvent name of the listener counter, used in the message
     * @throws TestException if more than 10 events are missing or if
     *         {@code actual} exceeds {@code expected}
     */
    public static void checkDackEvents(int expected, int actual, String cntrName, String op, String listenerEvent) {
        final int allowedMisses = 10;
        int misses = expected - actual;
        if (misses > allowedMisses) {
            throw new TestException("For replicated distributed ack, misses for  " + op + " events is " + misses + " and greater than expected misses of " + allowedMisses + ". Need to check logs to see if there are other causes. \n" + cntrName + "(incremented after every " + op + ") (= " + expected + " ) and " + listenerEvent + " (CQListener) ( = " + actual + " )");
        }
        if (actual > expected) {
            throw new TestException(cntrName + "(incremented after every " + op + ") (= " + expected + " ) are not equal " + listenerEvent + " (CQListener) ( = " + actual + " )");
        }
    }

    /**
     * Logs a warning when an operation took longer than its limit; it never fails the test.
     *
     * @param startTime       start timestamp in milliseconds
     * @param endTime         end timestamp in milliseconds
     * @param limit           allowed duration in milliseconds
     * @param cqOperationName operation name used in the warning
     */
    public static void checkTime(long startTime, long endTime, long limit, String cqOperationName) {
        long elapsed = endTime - startTime;
        if (elapsed <= limit) {
            return;
        }
        Log.getLogWriter().warning("WARNING:- CQ operation:- " + cqOperationName + " took more than " + limit / 1000L + " seconds. Actual time taken is:- " + elapsed / 1000L + " seconds.");
    }

    /**
     * Blocks until no event has been recorded for
     * {@code MapPrms.getTimeToWaitForEvents()} milliseconds.
     *
     * The time of the last event is read from the CQ blackboard entry
     * "lastEventReceived", which is initialised to zero when absent. The entry is
     * polled every 5 seconds. An interrupt received while sleeping does not
     * shorten the wait; the thread's interrupt status is restored before returning
     * instead of being silently discarded.
     */
    public static void waitForEvents() {
        long limit = MapPrms.getTimeToWaitForEvents();
        Long lastEventReceived = (Long)cqBB.getSharedMap().get("lastEventReceived");
        if (lastEventReceived == null) {
            cqBB.getSharedMap().put("lastEventReceived", Long.valueOf(0L));
            Log.getLogWriter().info("Initialized lastEventReceived in waitForEvents");
        }

        boolean interrupted = false;
        try {
            while (System.currentTimeMillis() - ((Long)cqBB.getSharedMap().get("lastEventReceived")).longValue() < limit) {
                try {
                    Thread.sleep(5000L);
                }
                catch (InterruptedException e) {
                    // Keep waiting for the quiet period, but remember the interrupt.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        waitForEventsAlreadyPerformed = true;
        Log.getLogWriter().info("Waited for " + limit / 1000L + " seconds for events to arrive. Since no events seen during that time test is proceeding for validations");
    }

    static {
        // Plain defaults that do not depend on the test configuration.
        waitForEventsAlreadyPerformed = false;
        cqsOn = true;
        valueConstraint = null;
        endpoint_cntr = 0;
        cqOperations = new int[]{EXCEUTE_CQ, EXCEUTE_CQ_WITH_INITIAL_RESULTS, STOP_CQ, CLOSE_CQ};
        cqList = Collections.synchronizedList(new ArrayList());

        // Values obtained from the test configuration and the blackboard.
        killInterval = TestConfig.tab().longAt(HctPrms.killInterval);
        cqBB = CQUtilBB.getBB();
        rand = new GsRandom();
    }
}

