/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.cache30;

import com.gemstone.gemfire.Statistics;
import com.gemstone.gemfire.StatisticsType;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.ServerConnectivityException;
import com.gemstone.gemfire.cache.util.Gateway;
import com.gemstone.gemfire.cache.util.GatewayHub;
import com.gemstone.gemfire.cache.util.GatewayQueueAttributes;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.cache30.TestCacheListener;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.InternalLocator;
import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.GatewayEventCallbackArgument;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientHealthMonitor;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.xmlcache.Declarable2;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableRunnable;
import dunit.VM;
import hydra.ProcessMgr;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import junit.framework.TestCase;

public class GatewayConnectionDUnitTest
extends CacheTestCase {
    private static final String WAN_REGION_NAME = "GatewayDUnitTest_WAN_Root";
    private static final String DS_REGION_NAME = "GatewayDUnitTest_DS_Root";
    protected static InternalLocator locator;
    private static final int NUM_KEYS = 10;
    private VM vm0 = null;
    private VM vm1 = null;
    private VM vm2 = null;
    private VM vm3 = null;
    private VM[] vmsDS0 = new VM[0];
    private VM[] vmsDS1 = new VM[0];
    private String[] vmsNameDS0 = null;
    private String[] vmsNameDS1 = null;

    public GatewayConnectionDUnitTest(String name) {
        super(name);
    }

    @Override
    public void setUp() throws Exception {
        boolean failedSetUp = true;
        try {
            GatewayConnectionDUnitTest.disconnectAllFromDS();
            this.setUpSystems();
            failedSetUp = false;
        }
        finally {
            super.setUp();
            if (failedSetUp) {
                GatewayConnectionDUnitTest.disconnectAllFromDS();
                this.cleanupAllLocators();
            }
        }
    }

    @Override
    public void tearDown2() throws Exception {
        GatewayConnectionDUnitTest.getLogWriter().info("tearDown()");
        try {
            this.closeAllCache();
        }
        finally {
            GatewayConnectionDUnitTest.disconnectAllFromDS();
            this.cleanupAllLocators();
        }
    }

    private void cleanupAllLocators() {
        GatewayConnectionDUnitTest.getLogWriter().info("cleanupAllLocators()");
        Host host = Host.getHost(0);
        for (int i = 0; i < host.getVMCount(); ++i) {
            host.getVM(i).invoke(new SerializableRunnable("Stop locator"){

                @Override
                public void run() {
                    if (locator != null) {
                        locator.stop();
                        locator = null;
                    }
                }
            });
        }
    }

    private void setUpSystems() throws Exception {
        GatewayConnectionDUnitTest.getLogWriter().info("SetUpSystems()");
        Host host = Host.getHost(0);
        this.vm0 = host.getVM(0);
        this.vm1 = host.getVM(1);
        this.vm2 = host.getVM(2);
        this.vm3 = host.getVM(3);
        this.vmsDS0 = new VM[]{this.vm0, this.vm1};
        this.vmsDS1 = new VM[]{this.vm2, this.vm3};
        this.vmsNameDS0 = new String[]{"vm0", "vm1"};
        this.vmsNameDS1 = new String[]{"vm2", "vm3"};
        String hostName = GatewayConnectionDUnitTest.getServerHostName(host);
        int[] freeUDPPorts = AvailablePortHelper.getRandomAvailableUDPPorts(2);
        int[] freeTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(2);
        int dsPortDS0 = freeUDPPorts[0];
        int dsPortDS1 = freeUDPPorts[1];
        int hubPortDS0 = freeTCPPorts[0];
        int hubPortDS1 = freeTCPPorts[1];
        this.setUpDS("ds0", dsPortDS0, this.vmsDS0, hubPortDS0, "ds1", hostName, hubPortDS1, this.vmsNameDS0);
        this.setUpDS("ds1", dsPortDS1, this.vmsDS1, hubPortDS1, "ds0", hostName, hubPortDS0, this.vmsNameDS1);
    }

    private void setUpDS(final String dsName, final int dsPort, VM[] vms, final int hubPortLocal, final String dsNameRemote, final String hostNameRemote, final int hubPortRemote, final String[] vmsName) throws Exception {
        GatewayConnectionDUnitTest.getLogWriter().info("SetUpDS()");
        final Properties propsDS = new Properties();
        propsDS.setProperty("mcast-port", String.valueOf(dsPort));
        propsDS.setProperty("locators", "");
        int i = 0;
        while (i < vms.length) {
            final int whichvm = i++;
            VM vm = vms[whichvm];
            vm.invoke(new CacheSerializableRunnable("Set up " + dsName){

                @Override
                public void run2() throws CacheException {
                    String vmName = "GatewayConnectionDUnitTest_" + dsName + vmsName[whichvm];
                    propsDS.setProperty("name", vmName);
                    GatewayConnectionDUnitTest.this.getSystem(propsDS);
                    DistributedTestCase.getLogWriter().info("[GatewayConnectionDUnitTest] " + vmName + " has joined " + dsName + " with port " + String.valueOf(dsPort));
                    GatewayConnectionDUnitTest.this.getCache();
                    AttributesFactory factory = new AttributesFactory();
                    factory.setScope(Scope.DISTRIBUTED_ACK);
                    factory.setDataPolicy(DataPolicy.REPLICATE);
                    factory.setEnableWAN(true);
                    factory.setCacheListener((CacheListener)new EventListener());
                    GatewayConnectionDUnitTest.this.createRegion(GatewayConnectionDUnitTest.WAN_REGION_NAME, factory.create());
                    factory.setEnableWAN(false);
                    factory.setCacheListener((CacheListener)new EventListener());
                    GatewayConnectionDUnitTest.this.createRegion(GatewayConnectionDUnitTest.DS_REGION_NAME, factory.create());
                    DistributedTestCase.getLogWriter().info("[GatewayConnectionDUnitTest] " + vmName + " has created both regions");
                }
            });
        }
        final int whichvm = vms.length - 1;
        VM vm = vms[whichvm];
        vm.invoke(new CacheSerializableRunnable("Set up gateway in " + dsName){

            @Override
            public void run2() throws CacheException {
                String vmName = "GatewayConnectionDUnitTest_" + dsName + vmsName[whichvm];
                String hubName = "GatewayConnectionDUnitTest_" + dsName;
                String gatewayName = "GatewayConnectionDUnitTest_" + dsNameRemote;
                DistributedTestCase.getLogWriter().info("[GatewayConnectionDUnitTest] " + vmName + " is creating " + hubName + " with gateway to " + gatewayName);
                Cache cache = GatewayConnectionDUnitTest.this.getCache();
                GatewayHub hub = cache.setGatewayHub(hubName, hubPortLocal);
                Gateway gateway = hub.addGateway(gatewayName);
                DistributedTestCase.getLogWriter().info("[GatewayConnectionDUnitTest_] " + vmName + " adding endpoint [" + gatewayName + ", " + hostNameRemote + ", " + hubPortRemote + "] to " + gatewayName);
                gateway.addEndpoint(gatewayName, hostNameRemote, hubPortRemote);
                File d = new File(gatewayName + "_overflow_" + ProcessMgr.getProcessId());
                DistributedTestCase.getLogWriter().info("[GatewayConnectionDUnitTest_] " + vmName + " creating queue in " + d + " for " + gatewayName);
                GatewayQueueAttributes queueAttributes = new GatewayQueueAttributes(d.toString(), 100, 1, 1000, false, false, true, 0);
                gateway.setQueueAttributes(queueAttributes);
                try {
                    hub.start();
                }
                catch (IOException e) {
                    DistributedTestCase.getLogWriter().error("Start of hub " + hubName + " threw IOException", (Throwable)e);
                    TestCase.fail((String)("Start of hub " + hubName + " threw IOException"));
                }
                DistributedTestCase.getLogWriter().info("[GatewayConnectionDUnitTest] " + vmName + " has created " + hubName + " with gateway to " + gatewayName);
            }
        });
    }

    public void testGateway() throws Exception {
        this.doTestGatewayReConnect();
    }

    public void doTestGatewayReConnect() throws CacheException {
        GatewayConnectionDUnitTest.getLogWriter().info("doTestGatewayReConnect()");
        DistributedTestCase.ExpectedException expectedEx = GatewayConnectionDUnitTest.addExpectedException(ServerConnectivityException.class.getName());
        this.vm0.invoke(new CacheSerializableRunnable("Create values"){

            @Override
            public void run2() throws CacheException {
                Region region1 = GatewayConnectionDUnitTest.this.getRootRegion().getSubregion(GatewayConnectionDUnitTest.WAN_REGION_NAME);
                for (int i = 0; i < 10; ++i) {
                    DistributedTestCase.getLogWriter().info("DEBUG: putting key-string-" + i);
                    region1.put((Object)("key-string-" + i), (Object)("value-" + i), (Object)"WAN");
                }
            }
        });
        this.vm3.invoke(new CacheSerializableRunnable("Verify values at server2"){

            @Override
            public void run2() throws CacheException {
                Region region1 = GatewayConnectionDUnitTest.this.getRootRegion().getSubregion(GatewayConnectionDUnitTest.WAN_REGION_NAME);
                HashSet<Object> keys = new HashSet<Object>();
                EventListener el = (EventListener)region1.getAttributes().getCacheListener();
                el.waitForGatewayCallback("key-string-3");
                Iterator iter = ClientHealthMonitor.getInstance().getClientHeartbeats().keySet().iterator();
                while (iter.hasNext()) {
                    ClientHealthMonitor.getInstance().removeAllConnectionsAndUnregisterClient((ClientProxyMembershipID)iter.next());
                }
                el.waitForGatewayCallback("key-string-9");
                Set events = el.getEvents();
                for (EntryEvent event : events) {
                    keys.add(event.getKey());
                }
                if (keys.size() <= 0) {
                    DistributedTestCase.getLogWriter().warning("Haven't received any callback events.");
                }
                for (int i = 0; i < 10; ++i) {
                    TestCase.assertTrue((boolean)keys.contains("key-string-" + i));
                }
                StatisticsType st = InternalDistributedSystem.getAnyInstance().findType("CacheServerStats");
                Statistics[] s = InternalDistributedSystem.getAnyInstance().findStatisticsByType(st);
                int NumberOfOutOfOrderBatchIds = s[0].getInt("outOfOrderGatewayBatchIds");
                DistributedTestCase.getLogWriter().info("NumberOfOutOfOrderBatchIds : " + NumberOfOutOfOrderBatchIds);
                TestCase.assertEquals((int)0, (int)NumberOfOutOfOrderBatchIds);
            }
        });
        expectedEx.remove();
    }

    public class EventListener
    extends TestCacheListener
    implements Declarable2 {
        public final Set destroys = Collections.synchronizedSet(new HashSet());
        public final Set creates = Collections.synchronizedSet(new HashSet());
        public final Set invalidates = Collections.synchronizedSet(new HashSet());
        public final Set updates = Collections.synchronizedSet(new HashSet());
        public final Set events = Collections.synchronizedSet(new HashSet());
        public static final long MAX_TIME = 10000L;

        @Override
        public void afterCreate2(EntryEvent event) {
            DistributedTestCase.getLogWriter().info("DEBUG: afterCreate2 event=" + event);
            this.collectGatewayCallbackEvents(event);
            this.creates.add(event.getKey());
        }

        @Override
        public void afterDestroy2(EntryEvent event) {
            DistributedTestCase.getLogWriter().info("DEBUG: afterDestroy2 event=" + event);
            this.destroys.add(event.getKey());
        }

        @Override
        public void afterInvalidate2(EntryEvent event) {
            DistributedTestCase.getLogWriter().info("DEBUG: afterInvalidate2 event=" + event);
            this.invalidates.add(event.getKey());
        }

        @Override
        public void afterUpdate2(EntryEvent event) {
            DistributedTestCase.getLogWriter().info("DEBUG: afterUpdate2 event=" + event);
            this.collectGatewayCallbackEvents(event);
            this.updates.add(event.getKey());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void collectGatewayCallbackEvents(EntryEvent event) {
            Set set = this.events;
            synchronized (set) {
                if (((EntryEventImpl)event).getRawCallbackArgument() instanceof GatewayEventCallbackArgument) {
                    this.events.add(event);
                }
                this.events.notifyAll();
            }
        }

        public Set getEvents() {
            return this.events;
        }

        public boolean waitForCreated(final Object key) {
            DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

                @Override
                public boolean done() {
                    return EventListener.this.creates.contains(key);
                }

                @Override
                public String description() {
                    return "waiting for key creation: " + key;
                }
            };
            DistributedTestCase.waitForCriterion(ev, 10000L, 200L, true);
            return true;
        }

        public boolean waitForDestroyed(final Object key) {
            DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

                @Override
                public boolean done() {
                    return EventListener.this.destroys.contains(key);
                }

                @Override
                public String description() {
                    return "waiting for key destroy: " + key;
                }
            };
            DistributedTestCase.waitForCriterion(ev, 10000L, 200L, true);
            return true;
        }

        public boolean waitForInvalidated(final Object key) {
            DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

                @Override
                public boolean done() {
                    return EventListener.this.invalidates.contains(key);
                }

                @Override
                public String description() {
                    return "waiting for key invalidation: " + key;
                }
            };
            DistributedTestCase.waitForCriterion(ev, 10000L, 200L, true);
            return true;
        }

        public boolean waitForUpdated(final Object key) {
            DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

                @Override
                public boolean done() {
                    return EventListener.this.updates.contains(key);
                }

                @Override
                public String description() {
                    return "waiting for key update: " + key;
                }
            };
            DistributedTestCase.waitForCriterion(ev, 10000L, 200L, true);
            return true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void waitForGatewayCallback(Object key) {
            boolean foundKey = false;
            long tilt = System.currentTimeMillis() + 60000L;
            Set set = this.events;
            synchronized (set) {
                while (!foundKey) {
                    DistributedTestCase.getLogWriter().info("DEBUG: waitForGatewayCallback events.size=" + this.events.size());
                    for (EntryEvent event : this.events) {
                        if (!key.equals(event.getKey())) continue;
                        foundKey = true;
                        break;
                    }
                    if (foundKey) continue;
                    try {
                        this.events.wait(100L);
                    }
                    catch (InterruptedException ex) {
                        TestCase.fail((String)"interrupted");
                    }
                    if (System.currentTimeMillis() < tilt) continue;
                    TestCase.fail((String)"timed out");
                }
            }
        }

        public Properties getConfig() {
            return null;
        }

        public void init(Properties props) {
        }
    }
}

