/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.internal.cache.wan.misc;

import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.RegionQueue;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderImpl;
import dunit.AsyncInvocation;
import dunit.DistributedTestCase;
import java.util.Set;

public class CommonParallelGatewaySenderDUnitTest
extends WANTestBase {
    public CommonParallelGatewaySenderDUnitTest(String name) {
        super(name);
    }

    public void testParallelPropagation() throws Exception {
        Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, "createFirstLocatorWithDSId", new Object[]{1});
        Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, "createFirstRemoteLocator", new Object[]{2, lnPort});
        vm2.invoke(WANTestBase.class, "createReceiver", new Object[]{nyPort});
        vm3.invoke(WANTestBase.class, "createReceiver", new Object[]{nyPort});
        vm4.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        vm5.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        vm6.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        vm7.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        vm4.invoke(WANTestBase.class, "createSender", new Object[]{"ln", 2, true, 100, 10, false, false, null, true});
        vm5.invoke(WANTestBase.class, "createSender", new Object[]{"ln", 2, true, 100, 10, false, false, null, true});
        vm6.invoke(WANTestBase.class, "createSender", new Object[]{"ln", 2, true, 100, 10, false, false, null, true});
        vm7.invoke(WANTestBase.class, "createSender", new Object[]{"ln", 2, true, 100, 10, false, false, null, true});
        vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR1", "ln", 1, 100, this.isOffHeap()});
        vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR1", "ln", 1, 100, this.isOffHeap()});
        vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR1", "ln", 1, 100, this.isOffHeap()});
        vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR1", "ln", 1, 100, this.isOffHeap()});
        vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR2", "ln", 1, 100, this.isOffHeap()});
        vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR2", "ln", 1, 100, this.isOffHeap()});
        vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR2", "ln", 1, 100, this.isOffHeap()});
        vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR2", "ln", 1, 100, this.isOffHeap()});
        vm4.invoke(WANTestBase.class, "startSender", new Object[]{"ln"});
        vm5.invoke(WANTestBase.class, "startSender", new Object[]{"ln"});
        vm6.invoke(WANTestBase.class, "startSender", new Object[]{"ln"});
        vm7.invoke(WANTestBase.class, "startSender", new Object[]{"ln"});
        vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR1", null, 1, 100, this.isOffHeap()});
        vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR1", null, 1, 100, this.isOffHeap()});
        vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR2", null, 1, 100, this.isOffHeap()});
        vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR2", null, 1, 100, this.isOffHeap()});
        vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        vm4.invoke(WANTestBase.class, "doPuts", new Object[]{testName + "_PR1", 1000});
        vm4.invoke(WANTestBase.class, "doPuts", new Object[]{testName + "_PR2", 1000});
        vm4.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[]{"ln"});
        vm5.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[]{"ln"});
        vm6.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[]{"ln"});
        vm7.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[]{"ln"});
        vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[]{testName + "_PR1", 1000});
        vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[]{testName + "_PR2", 1000});
    }

    public void testParallelPropagationPersistenceEnabled() throws Exception {
        Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, "createFirstLocatorWithDSId", new Object[]{1});
        Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, "createFirstRemoteLocator", new Object[]{2, lnPort});
        vm2.invoke(WANTestBase.class, "createReceiver", new Object[]{nyPort});
        vm3.invoke(WANTestBase.class, "createReceiver", new Object[]{nyPort});
        vm4.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        vm5.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        vm6.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        vm7.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        vm4.invoke(WANTestBase.class, "createSender", new Object[]{"ln", 2, true, 100, 10, false, true, null, true});
        vm5.invoke(WANTestBase.class, "createSender", new Object[]{"ln", 2, true, 100, 10, false, true, null, true});
        vm6.invoke(WANTestBase.class, "createSender", new Object[]{"ln", 2, true, 100, 10, false, true, null, true});
        vm7.invoke(WANTestBase.class, "createSender", new Object[]{"ln", 2, true, 100, 10, false, true, null, true});
        vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR1", "ln", 1, 100, this.isOffHeap()});
        vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR1", "ln", 1, 100, this.isOffHeap()});
        vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR1", "ln", 1, 100, this.isOffHeap()});
        vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR1", "ln", 1, 100, this.isOffHeap()});
        vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR2", "ln", 1, 100, this.isOffHeap()});
        vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR2", "ln", 1, 100, this.isOffHeap()});
        vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR2", "ln", 1, 100, this.isOffHeap()});
        vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR2", "ln", 1, 100, this.isOffHeap()});
        vm4.invoke(WANTestBase.class, "startSender", new Object[]{"ln"});
        vm5.invoke(WANTestBase.class, "startSender", new Object[]{"ln"});
        vm6.invoke(WANTestBase.class, "startSender", new Object[]{"ln"});
        vm7.invoke(WANTestBase.class, "startSender", new Object[]{"ln"});
        vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR1", null, 1, 100, this.isOffHeap()});
        vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR1", null, 1, 100, this.isOffHeap()});
        vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR2", null, 1, 100, this.isOffHeap()});
        vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "_PR2", null, 1, 100, this.isOffHeap()});
        vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        vm4.invoke(WANTestBase.class, "doPuts", new Object[]{testName + "_PR1", 1000});
        vm4.invoke(WANTestBase.class, "doPuts", new Object[]{testName + "_PR2", 1000});
        vm4.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[]{"ln"});
        vm5.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[]{"ln"});
        vm6.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[]{"ln"});
        vm7.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[]{"ln"});
        vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[]{testName + "_PR1", 1000});
        vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[]{testName + "_PR2", 1000});
    }

    public void testPRWithGatewaySenderPersistenceEnabled_Restart() {
        Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, "createFirstLocatorWithDSId", new Object[]{1});
        Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, "createFirstRemoteLocator", new Object[]{2, lnPort});
        vm2.invoke(WANTestBase.class, "createReceiver", new Object[]{nyPort});
        vm3.invoke(WANTestBase.class, "createReceiver", new Object[]{nyPort});
        vm4.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        vm5.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        vm6.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        vm7.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        String diskStore1 = (String)vm4.invoke(WANTestBase.class, "createSenderWithDiskStore", new Object[]{"ln", 2, true, 100, 10, false, true, null, null, true});
        String diskStore2 = (String)vm5.invoke(WANTestBase.class, "createSenderWithDiskStore", new Object[]{"ln", 2, true, 100, 10, false, true, null, null, true});
        String diskStore3 = (String)vm6.invoke(WANTestBase.class, "createSenderWithDiskStore", new Object[]{"ln", 2, true, 100, 10, false, true, null, null, true});
        String diskStore4 = (String)vm7.invoke(WANTestBase.class, "createSenderWithDiskStore", new Object[]{"ln", 2, true, 100, 10, false, true, null, null, true});
        CommonParallelGatewaySenderDUnitTest.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
        vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR1", null, 1, 100, this.isOffHeap()});
        vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR1", null, 1, 100, this.isOffHeap()});
        vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR2", null, 1, 100, this.isOffHeap()});
        vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR2", null, 1, 100, this.isOffHeap()});
        vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR1", "ln", 1, 100, this.isOffHeap()});
        vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR1", "ln", 1, 100, this.isOffHeap()});
        vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR1", "ln", 1, 100, this.isOffHeap()});
        vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR1", "ln", 1, 100, this.isOffHeap()});
        vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR2", "ln", 1, 100, this.isOffHeap()});
        vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR2", "ln", 1, 100, this.isOffHeap()});
        vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR2", "ln", 1, 100, this.isOffHeap()});
        vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR2", "ln", 1, 100, this.isOffHeap()});
        vm4.invoke(WANTestBase.class, "startSender", new Object[]{"ln"});
        vm5.invoke(WANTestBase.class, "startSender", new Object[]{"ln"});
        vm6.invoke(WANTestBase.class, "startSender", new Object[]{"ln"});
        vm7.invoke(WANTestBase.class, "startSender", new Object[]{"ln"});
        vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        vm4.invoke(WANTestBase.class, "pauseSender", new Object[]{"ln"});
        vm5.invoke(WANTestBase.class, "pauseSender", new Object[]{"ln"});
        vm6.invoke(WANTestBase.class, "pauseSender", new Object[]{"ln"});
        vm7.invoke(WANTestBase.class, "pauseSender", new Object[]{"ln"});
        vm4.invoke(WANTestBase.class, "doPuts", new Object[]{testName + "PR1", 3000});
        vm4.invoke(WANTestBase.class, "doPuts", new Object[]{testName + "PR2", 5000});
        CommonParallelGatewaySenderDUnitTest.getLogWriter().info("Completed puts in the region");
        vm4.invoke(WANTestBase.class, "killSender", new Object[0]);
        vm5.invoke(WANTestBase.class, "killSender", new Object[0]);
        vm6.invoke(WANTestBase.class, "killSender", new Object[0]);
        vm7.invoke(WANTestBase.class, "killSender", new Object[0]);
        CommonParallelGatewaySenderDUnitTest.getLogWriter().info("Killed all the senders.");
        vm4.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        vm5.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        vm6.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        vm7.invoke(WANTestBase.class, "createCache", new Object[]{lnPort});
        CommonParallelGatewaySenderDUnitTest.getLogWriter().info("Created back the cache");
        vm4.invoke(WANTestBase.class, "createSenderWithDiskStore", new Object[]{"ln", 2, true, 100, 10, false, true, null, diskStore1, true});
        vm5.invoke(WANTestBase.class, "createSenderWithDiskStore", new Object[]{"ln", 2, true, 100, 10, false, true, null, diskStore2, true});
        vm6.invoke(WANTestBase.class, "createSenderWithDiskStore", new Object[]{"ln", 2, true, 100, 10, false, true, null, diskStore3, true});
        vm7.invoke(WANTestBase.class, "createSenderWithDiskStore", new Object[]{"ln", 2, true, 100, 10, false, true, null, diskStore4, true});
        CommonParallelGatewaySenderDUnitTest.getLogWriter().info("Created the senders back from the disk store.");
        AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR1", "ln", 1, 100, this.isOffHeap()});
        AsyncInvocation inv2 = vm5.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR1", "ln", 1, 100, this.isOffHeap()});
        AsyncInvocation inv3 = vm6.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR1", "ln", 1, 100, this.isOffHeap()});
        AsyncInvocation inv4 = vm7.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR1", "ln", 1, 100, this.isOffHeap()});
        try {
            inv1.join();
            inv2.join();
            inv3.join();
            inv4.join();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
            CommonParallelGatewaySenderDUnitTest.fail();
        }
        inv1 = vm4.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR2", "ln", 1, 100, this.isOffHeap()});
        inv2 = vm5.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR2", "ln", 1, 100, this.isOffHeap()});
        inv3 = vm6.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR2", "ln", 1, 100, this.isOffHeap()});
        inv4 = vm7.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[]{testName + "PR2", "ln", 1, 100, this.isOffHeap()});
        try {
            inv1.join();
            inv2.join();
            inv3.join();
            inv4.join();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
            CommonParallelGatewaySenderDUnitTest.fail();
        }
        CommonParallelGatewaySenderDUnitTest.getLogWriter().info("Created back the partitioned regions");
        vm4.invokeAsync(WANTestBase.class, "startSender", new Object[]{"ln"});
        vm5.invokeAsync(WANTestBase.class, "startSender", new Object[]{"ln"});
        vm6.invokeAsync(WANTestBase.class, "startSender", new Object[]{"ln"});
        vm7.invokeAsync(WANTestBase.class, "startSender", new Object[]{"ln"});
        CommonParallelGatewaySenderDUnitTest.getLogWriter().info("Waiting for senders running.");
        vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[]{"ln"});
        CommonParallelGatewaySenderDUnitTest.getLogWriter().info("All the senders are now running...");
        vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[]{testName + "PR1", 3000});
        vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[]{testName + "PR1", 3000});
        vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[]{testName + "PR2", 5000});
        vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[]{testName + "PR2", 5000});
    }

    public static void validateParallelSenderQueueAllBucketsDrained(String senderId) {
        Set senders = cache.getGatewaySenders();
        GatewaySender sender = null;
        for (GatewaySender s : senders) {
            if (!s.getId().equals(senderId)) continue;
            sender = s;
            break;
        }
        ConcurrentParallelGatewaySenderQueue regionQueue = (ConcurrentParallelGatewaySenderQueue)((ParallelGatewaySenderImpl)sender).getQueues().toArray(new RegionQueue[1])[0];
        Set shadowPRs = regionQueue.getRegions();
        for (PartitionedRegion shadowPR : shadowPRs) {
            Set buckets = shadowPR.getDataStore().getAllLocalBucketRegions();
            for (final BucketRegion bucket : buckets) {
                DistributedTestCase.WaitCriterion wc = new DistributedTestCase.WaitCriterion(){

                    @Override
                    public boolean done() {
                        if (bucket.keySet().size() == 0) {
                            DistributedTestCase.getLogWriter().info("Bucket " + bucket.getId() + " is empty");
                            return true;
                        }
                        return false;
                    }

                    @Override
                    public String description() {
                        return "Expected bucket entries for bucket: " + bucket.getId() + " is: 0 but actual entries: " + bucket.keySet().size() + " This bucket isPrimary: " + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet();
                    }
                };
                DistributedTestCase.waitForCriterion(wc, 180000L, 50L, true);
            }
        }
    }
}

