/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.internal.cache;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.internal.DMStats;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.DistributionMessageObserver;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.cache.InitialImageOperation;
import dunit.AsyncInvocation;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableRunnable;
import dunit.VM;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestCase;

public class GIIFlowControlDUnitTest
extends CacheTestCase {
    /** Name of the region that the helper runnables in this class create and look up. */
    protected static final String REGION_NAME = "region";
    /**
     * Timeout value in milliseconds (10 seconds).
     * NOTE(review): presumably the upper bound for the waits performed by these tests - confirm.
     */
    private static final long MAX_WAIT = 10000L;
    /**
     * Value of {@code InitialImageOperation.CHUNK_SIZE_IN_BYTES} captured when this class is
     * initialized; {@code tearDown2()} writes it back.
     */
    private static int origChunkSize = InitialImageOperation.CHUNK_SIZE_IN_BYTES;
    /**
     * Value of {@code InitialImageOperation.CHUNK_PERMITS} captured when this class is
     * initialized; {@code tearDown2()} writes it back.
     */
    private static int origNumChunks = InitialImageOperation.CHUNK_PERMITS;
    /**
     * Observer created by the "Add flow control observer" runnables. It is a static, so each
     * JVM has its own copy and it is only non-null in a JVM where such a runnable has run.
     */
    protected static FlowControlObserver observer;

    /**
     * Creates the test case.
     *
     * @param name the test name, passed unchanged to the superclass constructor
     */
    public GIIFlowControlDUnitTest(String name) {
        super(name);
    }

    /**
     * Undoes the static state changed by the tests in this class, then delegates to the
     * superclass tear-down.
     *
     * <p>The reset is run in every dunit VM and also in the JVM executing this method, because
     * {@code doCloseTest} assigns the chunk statics directly in its own JVM as well. Resetting
     * here is harmless if that JVM is already covered by {@code invokeInEveryVM}.
     *
     * <p>{@code super.tearDown2()} is called from a {@code finally} block so the superclass
     * clean-up still runs when the reset invocation fails.
     *
     * @throws Exception if the reset invocation or the superclass tear-down fails
     */
    @Override
    public void tearDown2() throws Exception {
        try {
            GIIFlowControlDUnitTest.invokeInEveryVM(new SerializableRunnable("reset chunk size"){

                @Override
                public void run() {
                    GIIFlowControlDUnitTest.resetFlowControlState();
                }
            });
            GIIFlowControlDUnitTest.resetFlowControlState();
        }
        finally {
            super.tearDown2();
        }
    }

    /**
     * Restores the chunk statics captured at class initialization and, if this JVM installed a
     * {@link FlowControlObserver}, releases its latch and uninstalls it so that a test which
     * failed before releasing the latch cannot leave message processing blocked for later tests.
     */
    private static void resetFlowControlState() {
        InitialImageOperation.CHUNK_SIZE_IN_BYTES = origChunkSize;
        InitialImageOperation.CHUNK_PERMITS = origNumChunks;
        FlowControlObserver installed = observer;
        if (installed != null) {
            // Counting down an already-released latch is a no-op, so this is safe after a
            // test that completed normally.
            installed.allowMessages.countDown();
            DistributionMessageObserver.setInstance(null);
            observer = null;
        }
    }

    /**
     * Creates the region with 50 entries in VM 0, then creates the same region in VM 1 while
     * {@code CHUNK_SIZE_IN_BYTES} is 10 and {@code CHUNK_PERMITS} is 2 in every VM, and checks
     * that VM 1 ends up with every entry.
     *
     * <p>The data check runs before VM 0's cache is closed.
     *
     * @throws Throwable if any remote invocation fails
     */
    public void testLotsOfChunks() throws Throwable {
        Host host = Host.getHost(0);
        VM vm0 = host.getVM(0);
        VM vm1 = host.getVM(1);
        // This runnable sets the reduced values; the restore happens in tearDown2().
        GIIFlowControlDUnitTest.invokeInEveryVM(new SerializableRunnable("set chunk size"){

            @Override
            public void run() {
                InitialImageOperation.CHUNK_SIZE_IN_BYTES = 10;
                InitialImageOperation.CHUNK_PERMITS = 2;
            }
        });
        this.createRegion(vm0);
        this.createData(vm0, 0, 50, "1234567890");
        this.createRegion(vm1);
        // Without this check the test only proves that region creation returns; verify the
        // image actually transferred all 50 values.
        this.checkData(vm1, 0, 50, "1234567890");
        this.closeCache(vm0);
    }

    /**
     * Checks that region creation in VM 1 stalls while the observer holds image reply messages,
     * and completes once they are released.
     *
     * <p>Sequence:
     * <ol>
     * <li>Set {@code CHUNK_SIZE_IN_BYTES} to 10 and {@code CHUNK_PERMITS} to 2 in every VM.</li>
     * <li>Install and start a {@link FlowControlObserver} in VM 1.</li>
     * <li>Create the region and 50 entries in VM 0, then start creating the region in VM 1
     * asynchronously and assert the creation is still running after 100 ms.</li>
     * <li>In VM 1, wait until the observer has counted at least 2 messages, sleep 500 ms, assert
     * the count is still exactly 2, then release the latch.</li>
     * <li>Wait for the region creation to finish and assert the count has grown past 2.</li>
     * </ol>
     *
     * @throws Throwable if any remote invocation fails or an assertion does not hold
     */
    public void testFlowControlHappening() throws Throwable {
        Host host = Host.getHost(0);
        VM vm0 = host.getVM(0);
        VM vm1 = host.getVM(1);
        invokeInEveryVM(new SerializableRunnable("set chunk size"){

            @Override
            public void run() {
                InitialImageOperation.CHUNK_SIZE_IN_BYTES = 10;
                InitialImageOperation.CHUNK_PERMITS = 2;
            }
        });
        vm1.invoke(new SerializableRunnable("Add flow control observer"){

            @Override
            public void run() {
                observer = new FlowControlObserver();
                DistributionMessageObserver.setInstance(observer);
                getCache();
                // Counting starts only now, after the cache has been obtained.
                observer.start();
            }
        });
        createRegion(vm0);
        createData(vm0, 0, 50, "1234567890");
        AsyncInvocation async1 = createRegionAsync(vm1);
        // The creation must still be blocked shortly after it was started.
        async1.join(100L);
        assertTrue(async1.isAlive());
        vm1.invoke(new SerializableRunnable("Wait for chunks"){

            @Override
            public void run() {
                DistributedTestCase.waitForCriterion(new DistributedTestCase.WaitCriterion(){

                    @Override
                    public String description() {
                        return "Waiting for messages to be at least 2: " + observer.messageCount.get();
                    }

                    @Override
                    public boolean done() {
                        return observer.messageCount.get() >= 2;
                    }
                }, MAX_WAIT, 100L, true);
                // Give any additional message time to arrive before asserting none did.
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException e) {
                    DistributedTestCase.fail("interrupted", e);
                }
                TestCase.assertEquals(2, observer.messageCount.get());
                observer.allowMessages.countDown();
            }
        });
        async1.getResult(MAX_WAIT);
        // Label describes what this runnable does; it previously reused the name of the
        // observer-installing runnable above.
        vm1.invoke(new SerializableRunnable("Check message count"){

            @Override
            public void run() {
                TestCase.assertTrue("Message count should be greater than 2 now", observer.messageCount.get() > 2);
            }
        });
        closeCache(vm0);
    }

    /**
     * Closes the cache of the VM that holds the data while VM 1 is still creating the region
     * with its image reply messages held by the observer, then releases the observer and waits
     * for the region creation in VM 1 to return.
     *
     * @throws Throwable if any remote invocation fails or an assertion does not hold
     */
    public void testKillSenderNoHang() throws Throwable {
        final Host dunitHost = Host.getHost(0);
        final VM sender = dunitHost.getVM(0);
        final VM receiver = dunitHost.getVM(1);

        invokeInEveryVM(new SerializableRunnable("set chunk size"){

            @Override
            public void run() {
                InitialImageOperation.CHUNK_SIZE_IN_BYTES = 10;
                InitialImageOperation.CHUNK_PERMITS = 2;
            }
        });

        receiver.invoke(new SerializableRunnable("Add flow control observer"){

            @Override
            public void run() {
                observer = new FlowControlObserver();
                DistributionMessageObserver.setInstance(observer);
                getCache();
                observer.start();
            }
        });

        createRegion(sender);
        createData(sender, 0, 50, "1234567890");
        final AsyncInvocation pendingCreate = createRegionAsync(receiver);

        receiver.invoke(new SerializableRunnable("Wait to flow control messages"){

            @Override
            public void run() {
                DistributedTestCase.WaitCriterion twoMessagesSeen = new DistributedTestCase.WaitCriterion(){

                    @Override
                    public String description() {
                        return "Waiting for messages to be at least 2: " + observer.messageCount.get();
                    }

                    @Override
                    public boolean done() {
                        return observer.messageCount.get() >= 2;
                    }
                };
                DistributedTestCase.waitForCriterion(twoMessagesSeen, MAX_WAIT, 100L, true);
                // Pause so that a third message, if one were going to arrive, would be counted.
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException interrupted) {
                    DistributedTestCase.fail("interrupted", interrupted);
                }
                TestCase.assertEquals(2, observer.messageCount.get());
            }
        });

        // The sender goes away while the receiver still has its replies held.
        closeCache(sender);

        receiver.invoke(new SerializableRunnable("release flow control"){

            @Override
            public void run() {
                observer.allowMessages.countDown();
            }
        });

        pendingCreate.getResult(MAX_WAIT);
    }

    /**
     * Runs {@link #doCloseTest(boolean)} with {@code disconnect == false}, i.e. the variant in
     * which VM 1's cache is closed.
     *
     * @throws Throwable whatever {@code doCloseTest} throws
     */
    public void testCloseReceiverCacheNoHang() throws Throwable {
        this.doCloseTest(false);
    }

    /**
     * Runs {@link #doCloseTest(boolean)} with {@code disconnect == true}, i.e. the variant in
     * which VM 1 is disconnected from the distributed system.
     *
     * @throws Throwable whatever {@code doCloseTest} throws
     */
    public void testDisconnectReceiverNoHang() throws Throwable {
        this.doCloseTest(true);
    }

    /**
     * Shared body of the "receiver goes away" tests: VM 1 starts creating the region while its
     * image reply messages are held by the observer, VM 1 is then closed or disconnected, and
     * VM 0 must eventually report zero initial image messages in flight.
     *
     * <p>Decompiler note (CFR): "WARNING - Removed try catching itself - possible behaviour
     * change."
     *
     * @param disconnect {@code true} to call {@link #disconnect(VM)} on VM 1, {@code false} to
     *                   call {@link #closeCache(VM)} on it
     * @throws Throwable if any remote invocation fails or an assertion does not hold
     */
    public void doCloseTest(boolean disconnect) throws Throwable {
        Host host = Host.getHost(0);
        VM vm0 = host.getVM(0);
        VM vm1 = host.getVM(1);
        GIIFlowControlDUnitTest.invokeInEveryVM(new SerializableRunnable("set chunk size"){

            @Override
            public void run() {
                InitialImageOperation.CHUNK_SIZE_IN_BYTES = 10;
                InitialImageOperation.CHUNK_PERMITS = 2;
            }
        });
        // NOTE(review): these two assignments change the statics in the JVM executing this
        // method, in addition to the remote invocation above - confirm whether that is needed
        // and that something restores them in this JVM afterwards.
        InitialImageOperation.CHUNK_SIZE_IN_BYTES = 10;
        InitialImageOperation.CHUNK_PERMITS = 2;
        vm1.invoke(new SerializableRunnable("Add flow control observer"){

            @Override
            public void run() {
                observer = new FlowControlObserver();
                DistributionMessageObserver.setInstance((DistributionMessageObserver)observer);
                GIIFlowControlDUnitTest.this.getCache();
                // The observer only counts and holds messages after start() is called.
                observer.start();
            }
        });
        // Registered later, just before VM 1 is closed; removed in the finally block.
        DistributedTestCase.ExpectedException expectedEx = null;
        try {
            this.createRegion(vm0);
            this.createData(vm0, 0, 50, "1234567890");
            // The returned AsyncInvocation is intentionally not kept: this test never waits
            // for the region creation in VM 1 to return.
            this.createRegionAsync(vm1);
            vm1.invoke(new SerializableRunnable("Wait to flow control messages"){

                @Override
                public void run() {
                    DistributedTestCase.waitForCriterion(new DistributedTestCase.WaitCriterion(){

                        @Override
                        public String description() {
                            return "Waiting for messages to be at least 2: " + GIIFlowControlDUnitTest.observer.messageCount.get();
                        }

                        @Override
                        public boolean done() {
                            return GIIFlowControlDUnitTest.observer.messageCount.get() >= 2;
                        }
                    }, 10000L, 100L, true);
                    // Pause, then require that the count has not moved beyond 2.
                    try {
                        Thread.sleep(500L);
                    }
                    catch (InterruptedException e) {
                        DistributedTestCase.fail("interrupted", e);
                    }
                    TestCase.assertEquals((int)2, (int)GIIFlowControlDUnitTest.observer.messageCount.get());
                }
            });
            // On the sending side the statistics must show exactly 2 messages in flight.
            vm0.invoke(new SerializableRunnable("check for in progress messages"){

                @Override
                public void run() {
                    GIIFlowControlDUnitTest.this.getSystem();
                    DMStats stats = InternalDistributedSystem.getDMStats();
                    TestCase.assertEquals((int)2, (int)stats.getInitialImageMessagesInFlight());
                }
            });
            // InterruptedException is registered as expected for VM 1 before it is shut down.
            expectedEx = GIIFlowControlDUnitTest.addExpectedException(InterruptedException.class.getName(), vm1);
            if (disconnect) {
                this.disconnect(vm1);
            } else {
                this.closeCache(vm1);
            }
            vm1.invoke(new SerializableRunnable("release flow control"){

                @Override
                public void run() {
                    GIIFlowControlDUnitTest.observer.allowMessages.countDown();
                }
            });
            // After the receiver is gone, the sender's in-flight count must drop to 0 within
            // 10 seconds (polled every 100 ms).
            vm0.invoke(new SerializableRunnable("check for in progress messages"){

                @Override
                public void run() {
                    GIIFlowControlDUnitTest.this.getSystem();
                    final DMStats stats = InternalDistributedSystem.getDMStats();
                    DistributedTestCase.waitForCriterion(new DistributedTestCase.WaitCriterion(){

                        @Override
                        public boolean done() {
                            return stats.getInitialImageMessagesInFlight() == 0;
                        }

                        @Override
                        public String description() {
                            return "Timeout waiting for all initial image messages to be processed: " + stats.getInitialImageMessagesInFlight();
                        }
                    }, 10000L, 100L, true);
                }
            });
        }
        finally {
            // Only remove the expected-exception registration if it was actually added.
            if (expectedEx != null) {
                expectedEx.remove();
            }
        }
    }

    /**
     * Obtains the cache inside the given VM and closes it.
     *
     * @param vm the VM in which to close the cache
     */
    protected void closeCache(VM vm) {
        vm.invoke(new SerializableRunnable("close cache"){

            @Override
            public void run() {
                getCache().close();
            }
        });
    }

    /**
     * Calls {@code CacheTestCase.disconnectFromDS()} inside the given VM.
     *
     * @param vm the VM to disconnect
     */
    protected void disconnect(VM vm) {
        // The runnable was previously labelled "close cache", the same label closeCache(VM)
        // uses; the label now says what this runnable actually does.
        SerializableRunnable disconnectFromSystem = new SerializableRunnable("disconnect from distributed system"){

            @Override
            public void run() {
                CacheTestCase.disconnectFromDS();
            }
        };
        vm.invoke(disconnectFromSystem);
    }

    /**
     * Runs the region-creating runnable in the given VM and waits for it to return.
     *
     * @param vm the VM in which to create the region
     * @throws Throwable if the remote invocation fails
     */
    private void createRegion(VM vm) throws Throwable {
        vm.invoke(getCreateRegionRunnable());
    }

    /**
     * Builds the runnable used to create the test region.
     *
     * @return a runnable that obtains the cache and creates {@code REGION_NAME} with data
     *         policy {@code REPLICATE} and scope {@code DISTRIBUTED_ACK}
     */
    private SerializableRunnable getCreateRegionRunnable() {
        return new SerializableRunnable("Create non persistent region"){

            @Override
            public void run() {
                // The cache is obtained before the factory is constructed, as in every other
                // runnable of this class.
                getCache();
                RegionFactory factory = new RegionFactory();
                factory.setDataPolicy(DataPolicy.REPLICATE);
                factory.setScope(Scope.DISTRIBUTED_ACK);
                factory.create(REGION_NAME);
            }
        };
    }

    /**
     * Starts the region-creating runnable in the given VM without waiting for it.
     *
     * @param vm the VM in which to create the region
     * @return the handle of the asynchronous invocation
     * @throws Throwable if the invocation cannot be started
     */
    private AsyncInvocation createRegionAsync(VM vm) throws Throwable {
        return vm.invokeAsync(getCreateRegionRunnable());
    }

    /**
     * Puts {@code value} under every integer key in {@code [startKey, endKey)} into the test
     * region, inside the given VM.
     *
     * @param vm       the VM in which to perform the puts
     * @param startKey first key, inclusive
     * @param endKey   last key, exclusive
     * @param value    the value stored under each key
     */
    protected void createData(VM vm, final int startKey, final int endKey, final Object value) {
        vm.invoke(new SerializableRunnable(){

            @Override
            public void run() {
                Region target = getCache().getRegion(REGION_NAME);
                int key = startKey;
                while (key < endKey) {
                    target.put(key, value);
                    key++;
                }
            }
        });
    }

    /**
     * Asserts, inside the given VM, that the test region returns {@code value} for every
     * integer key in {@code [startKey, endKey)}.
     *
     * @param vm0      the VM in which to perform the check
     * @param startKey first key, inclusive
     * @param endKey   last key, exclusive
     * @param value    the value expected under each key
     */
    protected void checkData(VM vm0, final int startKey, final int endKey, final String value) {
        vm0.invoke(new SerializableRunnable(){

            @Override
            public void run() {
                Region target = getCache().getRegion(REGION_NAME);
                for (int key = startKey; key < endKey; key++) {
                    Object actual = target.get(key);
                    TestCase.assertEquals("On key " + key, value, actual);
                }
            }
        });
    }

    /**
     * Message observer that, once started, counts every
     * {@code InitialImageOperation.ImageReplyMessage} passed to
     * {@link #beforeProcessMessage} and blocks the calling thread until
     * {@link #allowMessages} is counted down.
     *
     * <p>The latch and the counter are {@code final}: the instance is handed to other threads
     * through a plain static field, and nothing in this file ever reassigns them.
     */
    private static class FlowControlObserver
    extends DistributionMessageObserver {
        /** Counted down by the test to let held messages continue. */
        final CountDownLatch allowMessages = new CountDownLatch(1);
        /** Number of image reply messages seen after {@link #start()} was called. */
        final AtomicInteger messageCount = new AtomicInteger();
        /** Messages are ignored until this is set by {@link #start()}. */
        private volatile boolean started = false;

        private FlowControlObserver() {
        }

        /**
         * Counts the message and waits on the latch if the observer is started and the message
         * is an image reply; does nothing otherwise.
         *
         * @param dm      the distribution manager (unused here)
         * @param message the message about to be processed
         */
        public void beforeProcessMessage(DistributionManager dm, DistributionMessage message) {
            if (!this.started || !(message instanceof InitialImageOperation.ImageReplyMessage)) {
                return;
            }
            // Count first, then block: the tests read the counter while the caller is held.
            this.messageCount.incrementAndGet();
            try {
                this.allowMessages.await();
            }
            catch (InterruptedException e) {
                DistributedTestCase.fail("Interrupted", e);
            }
        }

        /** Enables counting and holding of image reply messages. */
        public void start() {
            this.started = true;
        }
    }
}

