/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.internal.cache.persistence;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.DiskStore;
import com.gemstone.gemfire.cache.DiskStoreFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.util.Gateway;
import com.gemstone.gemfire.cache.util.GatewayHub;
import com.gemstone.gemfire.cache.util.GatewayQueueAttributes;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberManager;
import dunit.AsyncInvocation;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableRunnable;
import dunit.VM;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Map;
import java.util.Properties;
import junit.framework.TestCase;

/**
 * Distributed test for gateway hubs whose queues are backed by a disk store.
 *
 * <p>The single scenario starts a gateway in two VMs, writes entries through each of
 * them while shutting them down one after the other, and then restarts them in the
 * same order. The first restarted VM is expected to report waiting regions (via
 * {@link PersistentMemberManager#getWaitingRegions()}) until the second one is back.
 * Finally the receiving side is started in a third VM and must end up with every
 * entry that was written.
 */
public class PersistentGatewayDUnitTest
extends CacheTestCase {
    protected static final String REGION_NAME = "REGION_NAME";

    /**
     * Upper bound handed to {@code AsyncInvocation.join} and {@code waitForCriterion}.
     * NOTE(review): presumably milliseconds - confirm against the dunit API.
     */
    private static final long MAX_WAIT = 70000L;

    /** Interval argument handed to {@code waitForCriterion}. */
    private static final long POLL_INTERVAL = 100L;

    /** Number of entries written by the sending VMs and expected on the receiving side. */
    private static final int TOTAL_ENTRIES = 5;

    public PersistentGatewayDUnitTest(String name) {
        super(name);
    }

    /**
     * Runs the scenario described on the class: vm0 writes keys 0-2 and is closed, vm1
     * writes keys 3-4 and is closed, vm0 is restarted and must still be initializing
     * until vm1 is restarted, and vm2 must finally receive all five entries.
     *
     * @throws Throwable if any remote step fails or the calling thread is interrupted
     */
    public void testWaitForLatestMemberGateway() throws Throwable {
        Host host = Host.getHost(0);
        VM vm0 = host.getVM(0);
        VM vm1 = host.getVM(1);
        VM vm2 = host.getVM(2);
        int destinationPort = AvailablePortHelper.getRandomAvailableTCPPortOnVM(vm2);

        getLogWriter().info("Creating region in VM0");
        createPersistentGateway(vm0, destinationPort);
        getLogWriter().info("Creating region in VM1");
        createPersistentGateway(vm1, destinationPort);

        putEntries(vm0, 0, 3);
        getLogWriter().info("closing region in vm0");
        closeCache(vm0);
        // NOTE(review): fixed pause kept from the original; presumably it gives vm1 time
        // to observe vm0's departure - confirm whether a wait criterion could replace it.
        Thread.sleep(5000L);

        putEntries(vm1, 3, TOTAL_ENTRIES);
        getLogWriter().info("closing region in vm1");
        closeCache(vm1);

        getLogWriter().info("Creating region in VM0");
        AsyncInvocation future = createPersistentGatewayAsync(vm0, destinationPort);
        waitForBlockedInitialization(vm0);
        // vm0 must still be inside its creation runnable at this point.
        assertTrue(future.isAlive());

        getLogWriter().info("Creating region in VM1");
        createPersistentGateway(vm1, destinationPort);
        awaitCreation(future);

        closeCache(vm1);
        verifyDestinationReceivesAllEntries(vm2, destinationPort);
    }

    /**
     * Closes the cache in the given VM.
     *
     * @param vm the VM whose cache is closed
     */
    protected void closeCache(VM vm) {
        vm.invoke(new SerializableRunnable("Close cache") {

            @Override
            public void run() {
                getCache().close();
            }
        });
    }

    /**
     * Creates the gateway region and hub in the given VM and waits for the creation to
     * finish.
     *
     * @param vm the VM in which the gateway is created
     * @param destinationPort port of the remote endpoint the gateway sends to
     * @throws Throwable if the calling thread is interrupted; a failure of the remote
     *         creation is reported as a {@link RuntimeException} wrapping its cause
     */
    protected void createPersistentGateway(VM vm, int destinationPort) throws Throwable {
        awaitCreation(createPersistentGatewayAsync(vm, destinationPort));
    }

    /**
     * Starts creating the gateway region and hub in the given VM without waiting for
     * the result.
     *
     * @param vm the VM in which the gateway is created
     * @param destinationPort port of the remote endpoint the gateway sends to
     * @return the handle of the asynchronous invocation
     */
    protected AsyncInvocation createPersistentGatewayAsync(VM vm, final int destinationPort) {
        SerializableRunnable createRegion = new SerializableRunnable("Create gateway region") {

            @Override
            public void run() {
                Cache cache = getCache();
                createGatewayEnabledRegion();
                GatewayHub hub = cache.addGatewayHub("h1", AvailablePortHelper.getRandomAvailableTCPPort());
                Gateway gateway = hub.addGateway("g1");
                attachDiskBackedQueue(cache, gateway);
                try {
                    gateway.addEndpoint("end1", InetAddress.getLocalHost().getHostAddress(), destinationPort);
                    hub.start();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
        return vm.invokeAsync(createRegion);
    }

    /**
     * Waits up to {@link #MAX_WAIT} for an asynchronous creation to finish and fails
     * the test if it is still running or ended with an exception.
     */
    private static void awaitCreation(AsyncInvocation future) throws InterruptedException {
        future.join(MAX_WAIT);
        if (future.isAlive()) {
            fail("Region not created within " + MAX_WAIT);
        }
        if (future.exceptionOccurred()) {
            throw new RuntimeException(future.getException());
        }
    }

    /** Puts {@code key -> "a" + key} into the test region of the given VM for every key in {@code [from, to)}. */
    private void putEntries(VM vm, final int from, final int to) {
        vm.invoke(new SerializableRunnable("Put entries") {

            @Override
            public void run() {
                Region<Integer, String> region = getCache().getRegion(REGION_NAME);
                for (int key = from; key < to; ++key) {
                    region.put(key, "a" + key);
                }
            }
        });
    }

    /**
     * Starts the receiving hub on {@code destinationPort} in the given VM, waits until
     * its region holds {@link #TOTAL_ENTRIES} entries and checks each value.
     */
    private void verifyDestinationReceivesAllEntries(VM vm, final int destinationPort) {
        vm.invoke(new SerializableRunnable("Create gateway region") {

            @Override
            public void run() {
                Properties props = new Properties();
                props.setProperty("mcast-port", "0");
                props.setProperty("locators", "");
                // Called for its side effect of connecting with these properties;
                // the returned system is not needed here.
                getSystem(props);
                Cache cache = getCache();
                final Region<Integer, String> region = createGatewayEnabledRegion();
                GatewayHub hub = cache.addGatewayHub("h1", destinationPort);
                Gateway gateway = hub.addGateway("g1");
                attachDiskBackedQueue(cache, gateway);
                try {
                    hub.start();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                DistributedTestCase.waitForCriterion(new DistributedTestCase.WaitCriterion() {

                    @Override
                    public boolean done() {
                        return region.size() == TOTAL_ENTRIES;
                    }

                    @Override
                    public String description() {
                        return "Waiting for destination region to contain entries. Current keys " + region.keySet();
                    }
                }, MAX_WAIT, POLL_INTERVAL, true);
                for (int key = 0; key < TOTAL_ENTRIES; ++key) {
                    TestCase.assertEquals("a" + key, region.get(key));
                }
            }
        });
    }

    /**
     * Blocks until the persistent member manager of the given VM reports at least one
     * waiting region.
     */
    private void waitForBlockedInitialization(VM vm) {
        vm.invoke(new SerializableRunnable("Wait for blocked initialization") {

            @Override
            public void run() {
                DistributedTestCase.waitForCriterion(new DistributedTestCase.WaitCriterion() {

                    @Override
                    public String description() {
                        return "Waiting to be blocked waiting for another persistent member to come online";
                    }

                    @Override
                    public boolean done() {
                        GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
                        PersistentMemberManager mm = cache.getPersistentMemberManager();
                        return !mm.getWaitingRegions().isEmpty();
                    }
                }, MAX_WAIT, POLL_INTERVAL, true);
            }
        });
    }

    /**
     * Creates the replicated, distributed-ack, gateway-enabled test region. A cache
     * must already have been obtained in the calling VM, as every caller does first.
     */
    private static Region<Integer, String> createGatewayEnabledRegion() {
        RegionFactory<Integer, String> factory = new RegionFactory<Integer, String>();
        factory.setDataPolicy(DataPolicy.REPLICATE);
        factory.setScope(Scope.DISTRIBUTED_ACK);
        factory.setEnableGateway(true);
        return factory.create(REGION_NAME);
    }

    /**
     * Creates a disk store named {@link #REGION_NAME} in the test disk directories and
     * configures the gateway's queue to use it.
     * NOTE(review): the meaning of the remaining GatewayQueueAttributes constructor
     * arguments is not visible here - see that class's documentation.
     */
    private static void attachDiskBackedQueue(Cache cache, Gateway gateway) {
        DiskStoreFactory storeFactory = cache.createDiskStoreFactory();
        storeFactory.setDiskDirs(CacheTestCase.getDiskDirs());
        storeFactory.setMaxOplogSize(1L);
        DiskStore store = storeFactory.create(REGION_NAME);
        gateway.setQueueAttributes(new GatewayQueueAttributes(store.getName(), 5, 1, 100, false, true, 60000));
    }
}

