/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.internal.cache.tier.sockets;

import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.client.internal.QueueStateImpl;
import com.gemstone.gemfire.cache.util.BridgeServer;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.BridgeObserver;
import com.gemstone.gemfire.internal.cache.BridgeObserverAdapter;
import com.gemstone.gemfire.internal.cache.BridgeObserverHolder;
import com.gemstone.gemfire.internal.cache.ha.HAHelper;
import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.VM;
import hydra.Log;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

public class ReliableMessagingDUnitTest
extends DistributedTestCase {
    static VM server1 = null;
    static VM server2 = null;
    private static Cache cache = null;
    private static int PORT1;
    private static int PORT2;
    static PoolImpl pool;
    static ThreadIdentifier tid;
    static Long seqid;
    static long creationTime;
    static int CLIENT_ACK_INTERVAL;
    private static final String REGION_NAME = "ReliableMessagingDUnitTest_Region";
    private static BridgeObserver origObserver;

    public ReliableMessagingDUnitTest(String name) {
        super(name);
    }

    public void testPeriodicAckSendByClient() throws Exception {
        ReliableMessagingDUnitTest.createEntries();
        server1.invoke(ReliableMessagingDUnitTest.class, "putOnServer");
        ReliableMessagingDUnitTest.waitForServerUpdate();
        ReliableMessagingDUnitTest.setCreationTimeTidAndSeq();
        ReliableMessagingDUnitTest.waitForClientAck();
        server1.invoke(ReliableMessagingDUnitTest.class, "checkTidAndSeq");
    }

    public void testPeriodicAckSendByClientPrimaryFailover() throws Exception {
        ReliableMessagingDUnitTest.createEntries();
        ReliableMessagingDUnitTest.setBridgeObserverForBeforeSendingClientAck();
        server1.invoke(ReliableMessagingDUnitTest.class, "putOnServer");
        ReliableMessagingDUnitTest.getLogWriter().info("Entering waitForServerUpdate");
        ReliableMessagingDUnitTest.waitForServerUpdate();
        ReliableMessagingDUnitTest.getLogWriter().info("Entering waitForCallback");
        ReliableMessagingDUnitTest.waitForCallback();
        ReliableMessagingDUnitTest.getLogWriter().info("Entering waitForClientAck");
        ReliableMessagingDUnitTest.waitForClientAck();
        server2.invoke(ReliableMessagingDUnitTest.class, "checkTidAndSeq");
    }

    public static void waitForClientAck() throws Exception {
        long maxWaitTime = 30000L;
        long start = System.currentTimeMillis();
        Iterator iter = pool.getThreadIdToSequenceIdMap().entrySet().iterator();
        QueueStateImpl.SequenceIdAndExpirationObject seo = null;
        if (!iter.hasNext()) {
            ReliableMessagingDUnitTest.fail((String)"map is empty");
        }
        Map.Entry entry = iter.next();
        seo = (QueueStateImpl.SequenceIdAndExpirationObject)entry.getValue();
        while (!seo.getAckSend()) {
            ReliableMessagingDUnitTest.assertTrue((String)"Waited over 30000 for client ack ", (System.currentTimeMillis() - start < 30000L ? 1 : 0) != 0);
            ReliableMessagingDUnitTest.sleep(1000);
        }
        ReliableMessagingDUnitTest.getLogWriter().info("seo = " + seo);
        ReliableMessagingDUnitTest.assertTrue((String)("Creation time " + creationTime + " supposed to be same as seo " + seo.getCreationTime()), (creationTime == seo.getCreationTime() ? 1 : 0) != 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void setCreationTimeTidAndSeq() {
        Map.Entry entry;
        final Map map = pool.getThreadIdToSequenceIdMap();
        DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public boolean done() {
                Map map2 = map;
                synchronized (map2) {
                    return map.entrySet().size() > 0;
                }
            }

            @Override
            public String description() {
                return null;
            }
        };
        DistributedTestCase.waitForCriterion(ev, 10000L, 200L, true);
        Map map2 = map;
        synchronized (map2) {
            Iterator iter = map.entrySet().iterator();
            entry = iter.next();
        }
        QueueStateImpl.SequenceIdAndExpirationObject seo = (QueueStateImpl.SequenceIdAndExpirationObject)entry.getValue();
        ReliableMessagingDUnitTest.assertFalse((boolean)seo.getAckSend());
        creationTime = seo.getCreationTime();
        ReliableMessagingDUnitTest.getLogWriter().info("seo is " + seo.toString());
        ReliableMessagingDUnitTest.assertTrue((String)"Creation time not set", (creationTime != 0L ? 1 : 0) != 0);
        Object[] args = new Object[]{((ThreadIdentifier)entry.getKey()).getMembershipID(), new Long(((ThreadIdentifier)entry.getKey()).getThreadID()), new Long(seo.getSequenceId())};
        server1.invoke(ReliableMessagingDUnitTest.class, "setTidAndSeq", args);
        server2.invoke(ReliableMessagingDUnitTest.class, "setTidAndSeq", args);
    }

    public static void checkEmptyDispatchedMsgs() {
        ReliableMessagingDUnitTest.assertEquals((int)0, (int)HARegionQueue.getDispatchedMessagesMapForTesting().size());
    }

    public static void checkTidAndSeq() {
        Map map = HARegionQueue.getDispatchedMessagesMapForTesting();
        ReliableMessagingDUnitTest.assertTrue((map.size() > 0 ? 1 : 0) != 0);
        Iterator iter = map.entrySet().iterator();
        if (!iter.hasNext()) {
            ReliableMessagingDUnitTest.fail((String)"Dispatched messages is empty");
        }
        Map.Entry entry = iter.next();
        Map dispMap = HAHelper.getDispatchMessageMap(entry.getValue());
        ReliableMessagingDUnitTest.assertEquals((Object)seqid, dispMap.get(tid));
    }

    public static void setTidAndSeq(byte[] membershipId, Long threadId, Long sequenceId) {
        tid = new ThreadIdentifier(membershipId, threadId.longValue());
        seqid = sequenceId;
    }

    public static void createEntries() throws Exception {
        creationTime = 0L;
        Region r1 = cache.getRegion("/ReliableMessagingDUnitTest_Region");
        String keyPrefix = "server-";
        for (int i = 0; i < 5; ++i) {
            r1.create((Object)(keyPrefix + i), (Object)"val");
        }
    }

    public static void putOnServer() throws Exception {
        Region r1 = cache.getRegion("/ReliableMessagingDUnitTest_Region");
        String keyPrefix = "server-";
        for (int i = 0; i < 5; ++i) {
            r1.put((Object)(keyPrefix + i), (Object)("val-" + i));
        }
    }

    private static void sleep(int ms) {
        try {
            Thread.sleep(ms);
        }
        catch (InterruptedException e) {
            ReliableMessagingDUnitTest.fail("Interrupted", e);
        }
    }

    public static void checkServerCount(int expectedDeadServers, int expectedLiveServers) {
        long maxWaitTime = 60000L;
        long start = System.currentTimeMillis();
        while (pool.getConnectedServerCount() != expectedLiveServers) {
            ReliableMessagingDUnitTest.assertTrue((String)("Waited over 60000for active servers to become :" + expectedLiveServers), (System.currentTimeMillis() - start < 60000L ? 1 : 0) != 0);
            ReliableMessagingDUnitTest.sleep(2000);
        }
    }

    public static void stopServer() {
        try {
            Iterator iter = cache.getBridgeServers().iterator();
            if (iter.hasNext()) {
                BridgeServer server = (BridgeServer)iter.next();
                server.stop();
            }
        }
        catch (Exception e) {
            ReliableMessagingDUnitTest.fail("failed while stopServer()", e);
        }
    }

    public static void waitForServerUpdate() {
        Region r1 = cache.getRegion("/ReliableMessagingDUnitTest_Region");
        ReliableMessagingDUnitTest.assertNotNull((Object)r1);
        long maxWaitTime = 60000L;
        long start = System.currentTimeMillis();
        while (!r1.getEntry((Object)"server-4").getValue().equals("val-4")) {
            ReliableMessagingDUnitTest.assertTrue((String)"Waited over 60000 ms for entry to be refreshed", (System.currentTimeMillis() - start < 60000L ? 1 : 0) != 0);
            ReliableMessagingDUnitTest.sleep(1000);
        }
    }

    public static void setBridgeObserverForBeforeSendingClientAck() throws Exception {
        PoolImpl.BEFORE_SENDING_CLIENT_ACK_CALLBACK_FLAG = true;
        origObserver = BridgeObserverHolder.setInstance((BridgeObserver)new BridgeObserverAdapter(){

            public void beforeSendingClientAck() {
                DistributedTestCase.getLogWriter().info("beforeSendingClientAck invoked");
                ReliableMessagingDUnitTest.setCreationTimeTidAndSeq();
                server1.invoke(ReliableMessagingDUnitTest.class, "stopServer");
                ReliableMessagingDUnitTest.checkServerCount(1, 1);
                server2.invoke(ReliableMessagingDUnitTest.class, "checkEmptyDispatchedMsgs");
                PoolImpl.BEFORE_SENDING_CLIENT_ACK_CALLBACK_FLAG = false;
                DistributedTestCase.getLogWriter().info("end of beforeSendingClientAck");
            }
        });
    }

    public static void waitForCallback() {
        long maxWaitTime = 60000L;
        long start = System.currentTimeMillis();
        while (PoolImpl.BEFORE_SENDING_CLIENT_ACK_CALLBACK_FLAG) {
            ReliableMessagingDUnitTest.assertTrue((String)"Waited over 60000to send an ack from client : ", (System.currentTimeMillis() - start < 60000L ? 1 : 0) != 0);
            ReliableMessagingDUnitTest.sleep(2000);
        }
    }

    @Override
    public void setUp() throws Exception {
        super.setUp();
        Host host = Host.getHost(0);
        server1 = host.getVM(0);
        server2 = host.getVM(1);
        PORT1 = (Integer)server1.invoke(ReliableMessagingDUnitTest.class, "createServerCache");
        PORT2 = (Integer)server2.invoke(ReliableMessagingDUnitTest.class, "createServerCache");
        CacheServerTestUtil.disableShufflingOfEndpoints();
        ReliableMessagingDUnitTest.createClientCache(PORT1, PORT2);
    }

    private Cache createCache(Properties props) throws Exception {
        InternalDistributedSystem ds = this.getSystem(props);
        ds.disconnect();
        ds = this.getSystem(props);
        Cache result = null;
        result = CacheFactory.create((DistributedSystem)ds);
        if (result == null) {
            throw new Exception("CacheFactory.create() returned null ");
        }
        return result;
    }

    public static Integer createServerCache() throws Exception {
        ReliableMessagingDUnitTest test = new ReliableMessagingDUnitTest("temp");
        Properties props = new Properties();
        cache = test.createCache(props);
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_ACK);
        factory.setDataPolicy(DataPolicy.REPLICATE);
        RegionAttributes attrs = factory.create();
        cache.setMessageSyncInterval(25);
        cache.createRegion(REGION_NAME, attrs);
        BridgeServer server = cache.addBridgeServer();
        int port = AvailablePort.getRandomAvailablePort((int)0);
        server.setPort(port);
        server.setNotifyBySubscription(true);
        server.start();
        Log.getLogWriter().info("Server started at PORT = " + port);
        return new Integer(server.getPort());
    }

    public static void createClientCache(int port1, int port2) throws Exception {
        ReliableMessagingDUnitTest test = new ReliableMessagingDUnitTest("temp");
        Properties props = new Properties();
        props.setProperty("mcast-port", "0");
        props.setProperty("locators", "");
        cache = test.createCache(props);
        String host = ReliableMessagingDUnitTest.getServerHostName(Host.getHost(0));
        PoolImpl p = (PoolImpl)PoolManager.createFactory().addServer(host, PORT1).addServer(host, PORT2).setSubscriptionEnabled(true).setSubscriptionRedundancy(1).setThreadLocalConnections(true).setMinConnections(6).setReadTimeout(20000).setPingInterval(10000L).setRetryAttempts(5).setSubscriptionAckInterval(CLIENT_ACK_INTERVAL).create("ReliableMessagingDUnitTestPool");
        AttributesFactory factory = new AttributesFactory();
        factory.setScope(Scope.DISTRIBUTED_ACK);
        factory.setPoolName(p.getName());
        RegionAttributes attrs = factory.create();
        Region region = cache.createRegion(REGION_NAME, attrs);
        region.registerInterest((Object)"ALL_KEYS");
        pool = p;
    }

    @Override
    public void tearDown2() throws Exception {
        creationTime = 0L;
        super.tearDown2();
        ReliableMessagingDUnitTest.closeCache();
        server1.invoke(ReliableMessagingDUnitTest.class, "closeCache");
        server2.invoke(ReliableMessagingDUnitTest.class, "closeCache");
        CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag();
    }

    public static void closeCache() {
        if (cache != null && !cache.isClosed()) {
            cache.close();
            cache.getDistributedSystem().disconnect();
        }
    }

    public static void resetCallBack() {
        BridgeObserverHolder.setInstance((BridgeObserver)origObserver);
    }

    static {
        pool = null;
        tid = null;
        seqid = null;
        creationTime = 0L;
        CLIENT_ACK_INTERVAL = 5000;
    }
}

