/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.internal.cache;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.control.RebalanceOperation;
import com.gemstone.gemfire.cache.control.RebalanceResults;
import com.gemstone.gemfire.cache.query.Query;
import com.gemstone.gemfire.cache.query.SelectResults;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.DistributionMessageObserver;
import com.gemstone.gemfire.internal.cache.partitioned.QueryMessage;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableRunnable;
import dunit.VM;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;

public class PartitionedRegionQueryDUnitTest
extends CacheTestCase {
    private static final AtomicReference<RebalanceResults> rebalanceResults = new AtomicReference();

    public PartitionedRegionQueryDUnitTest(String name) {
        super(name);
    }

    public void testRebalanceDuringQueryEvaluation() {
        Host host = Host.getHost(0);
        VM vm0 = host.getVM(0);
        VM vm1 = host.getVM(1);
        VM vm2 = host.getVM(2);
        this.createAccessor(vm0);
        this.createPR(vm1);
        this.createBuckets(vm1);
        this.createPR(vm2);
        vm1.invoke(new SerializableRunnable("add listener"){

            @Override
            public void run() {
                DistributionMessageObserver.setInstance((DistributionMessageObserver)new DistributionMessageObserver(){

                    public void beforeProcessMessage(DistributionManager dm, DistributionMessage message) {
                        if (message instanceof QueryMessage) {
                            RebalanceOperation rebalance = PartitionedRegionQueryDUnitTest.this.getCache().getResourceManager().createRebalanceFactory().start();
                            try {
                                rebalanceResults.compareAndSet(null, rebalance.getResults());
                            }
                            catch (CancellationException cancellationException) {
                            }
                            catch (InterruptedException interruptedException) {
                                // empty catch block
                            }
                        }
                    }
                });
            }
        });
        this.executeQuery(vm0);
        vm1.invoke(new SerializableRunnable("check rebalance happened"){

            @Override
            public void run() {
                TestCase.assertNotNull(rebalanceResults.get());
                TestCase.assertEquals((int)5, (int)((RebalanceResults)rebalanceResults.get()).getTotalBucketTransfersCompleted());
            }
        });
    }

    private void executeQuery(VM vm0) {
        vm0.invoke(new SerializableRunnable(){

            @Override
            public void run() {
                Cache cache = PartitionedRegionQueryDUnitTest.this.getCache();
                Region region = cache.getRegion("region");
                Query query = cache.getQueryService().newQuery("select * from /region r where r > 0");
                try {
                    SelectResults results = (SelectResults)query.execute();
                    TestCase.assertEquals(new HashSet<Integer>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9)), (Object)results.asSet());
                }
                catch (Exception e) {
                    DistributedTestCase.fail("Bad query", e);
                }
            }
        });
    }

    private void createBuckets(VM vm) {
        vm.invoke(new SerializableRunnable("create accessor"){

            @Override
            public void run() {
                Cache cache = PartitionedRegionQueryDUnitTest.this.getCache();
                Region region = cache.getRegion("region");
                for (int i = 0; i < 10; ++i) {
                    region.put((Object)i, (Object)i);
                }
            }
        });
    }

    private void createPR(VM vm) {
        vm.invoke(new SerializableRunnable("create accessor"){

            @Override
            public void run() {
                Cache cache = PartitionedRegionQueryDUnitTest.this.getCache();
                PartitionAttributesFactory paf = new PartitionAttributesFactory();
                paf.setTotalNumBuckets(10);
                cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(paf.create()).create("region");
            }
        });
    }

    private void createAccessor(VM vm) {
        vm.invoke(new SerializableRunnable("create accessor"){

            @Override
            public void run() {
                Cache cache = PartitionedRegionQueryDUnitTest.this.getCache();
                PartitionAttributesFactory paf = new PartitionAttributesFactory();
                paf.setTotalNumBuckets(10);
                paf.setLocalMaxMemory(0);
                cache.createRegionFactory(RegionShortcut.PARTITION_PROXY).setPartitionAttributes(paf.create()).create("region");
            }
        });
    }
}

