/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.org.jgroups.protocols;

import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.Locator;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.jgroup.JGroupMembershipManager;
import com.gemstone.gemfire.distributed.internal.membership.jgroup.MembershipManagerHelper;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.org.jgroups.Address;
import com.gemstone.org.jgroups.JChannel;
import com.gemstone.org.jgroups.protocols.GemFireTimeSync;
import com.gemstone.org.jgroups.protocols.pbcast.GMS;
import com.gemstone.org.jgroups.stack.IpAddress;
import com.gemstone.org.jgroups.stack.Protocol;
import com.gemstone.org.jgroups.stack.ProtocolStack;
import dunit.AsyncInvocation;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableCallable;
import dunit.VM;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import junit.framework.TestCase;

public class GemFireTimeSyncProtocolDUnitTest
extends DistributedTestCase {
    public GemFireTimeSyncProtocolDUnitTest(String name) {
        super(name);
    }

    public void testGemfireTimeSyncJgroupsProtocol() {
        Host host = Host.getHost(0);
        VM vm0 = host.getVM(0);
        VM vm1 = host.getVM(1);
        VM vm2 = host.getVM(2);
        ArrayList<Object[]> vmJoinTimeOffsets = new ArrayList<Object[]>();
        Object[] vmjointime0 = (Object[])this.getIDAndTimeOffset(vm0);
        vmJoinTimeOffsets.add(vmjointime0);
        Object[] vmjointime1 = (Object[])this.getIDAndTimeOffset(vm1);
        vmJoinTimeOffsets.add(vmjointime1);
        Object[] vmjointime2 = (Object[])this.getIDAndTimeOffset(vm2);
        vmJoinTimeOffsets.add(vmjointime2);
        VM locator = Host.getLocator();
        Map offsets = (Map)locator.invoke(new SerializableCallable(){

            public Object call() throws Exception {
                InternalDistributedSystem system = InternalDistributedSystem.getAnyInstance();
                DistributedMember coord = MembershipManagerHelper.getCoordinator((DistributedSystem)system);
                InternalDistributedMember self = system.getDistributedMember();
                Map offs = null;
                if (self.getId().equalsIgnoreCase(coord.getId())) {
                    JGroupMembershipManager jgmm = MembershipManagerHelper.getMembershipManager((DistributedSystem)system);
                    JChannel jchannel = MembershipManagerHelper.getJChannel((DistributedSystem)system);
                    final UnitTestHook gftsTestHook = new UnitTestHook();
                    if (jchannel != null && jchannel.isConnected()) {
                        ProtocolStack pstack = jchannel.getProtocolStack();
                        GemFireTimeSync gts = null;
                        boolean found = false;
                        Protocol prot = pstack.findProtocol("GemFireTimeSync");
                        if (prot != null) {
                            Protocol up_prot = prot.getUpProtocol();
                            Protocol down_prot = prot.getDownProtocol();
                            TestCase.assertEquals((String)("UP protocol of GemFireTimeSync protocol is: " + up_prot), (String)"AUTH", (String)up_prot.getName());
                            TestCase.assertEquals((String)("Down protocol of GemFireTimeSync protocol is: " + down_prot), (String)"FRAG3", (String)down_prot.getName());
                            found = true;
                            gts = (GemFireTimeSync)prot;
                            gts.setTestHook((GemFireTimeSync.TestHook)gftsTestHook);
                            DistributedTestCase.getLogWriter().fine("Invoking sync-therad in GemFireTimeSync protocol");
                            gts.invokeServiceThreadForTest();
                        }
                        if (!found) {
                            TestCase.fail((String)"GemFireTimeSync protocol is not found in protocol stack");
                        }
                        DistributedTestCase.waitForCriterion(new DistributedTestCase.WaitCriterion(){

                            @Override
                            public boolean done() {
                                return gftsTestHook.getBarrier() == 0;
                            }

                            @Override
                            public String description() {
                                return "Waiting for all nodes to get time offsets from co-ordinator";
                            }
                        }, 500L, 50L, false);
                        Map<Address, GemFireTimeSync.GFTimeSyncHeader> respons = gftsTestHook.getResponses();
                        if (gts != null) {
                            gts.setTestHook(null);
                        }
                        offs = GemFireTimeSyncProtocolDUnitTest.this.calculateOffsets(respons, gftsTestHook.getCurTime());
                    }
                }
                return offs;
            }
        });
        ArrayList<Object[]> vmtimeOffsets = new ArrayList<Object[]>();
        Object[] vmtime0 = (Object[])this.getIDAndTimeOffset(vm0);
        Object[] vmtime1 = (Object[])this.getIDAndTimeOffset(vm1);
        Object[] vmtime2 = (Object[])this.getIDAndTimeOffset(vm2);
        vmtimeOffsets.add(vmtime0);
        vmtimeOffsets.add(vmtime1);
        vmtimeOffsets.add(vmtime2);
        for (int i = 0; i < 3; ++i) {
            String address = (String)((Object[])vmtimeOffsets.get(i))[0];
            long offsetTime = (Long)offsets.get(address);
            if (offsetTime == (Long)((Object[])vmtimeOffsets.get(i))[1] || ((Long)((Object[])vmtimeOffsets.get(i))[1]).longValue() == ((Long)((Object[])vmJoinTimeOffsets.get(i))[1]).longValue()) continue;
            GemFireTimeSyncProtocolDUnitTest.fail((String)("Offset calculated locally: " + offsetTime + " not equals to returned by GemFireTimeSync protocol: " + ((Object[])vmtimeOffsets.get(i))[1] + " and join time is: " + (Long)((Object[])vmJoinTimeOffsets.get(i))[1] + " for address: " + ((Object[])vmtimeOffsets.get(i))[0]));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testGemfireTimeSyncSlowDownCacheTime() {
        Host host = Host.getHost(0);
        VM newLocator = host.getVM(0);
        VM vm1 = host.getVM(1);
        ArrayList vmJoinTimeOffsets = new ArrayList();
        final int locatorPort = AvailablePort.getRandomAvailablePort((int)0);
        String host0 = GemFireTimeSyncProtocolDUnitTest.getServerHostName(host);
        final Properties props = new Properties();
        props.setProperty("locators", host0 + "[" + locatorPort + "]");
        props.setProperty("mcast-port", "0");
        props.setProperty("jmx-manager", "false");
        props.setProperty("log-level", "fine");
        props.put("member-timeout", "2000");
        try {
            newLocator.invoke(new CacheSerializableRunnable("Start locator"){

                @Override
                public void run2() throws CacheException {
                    try {
                        DistributedTestCase.system.disconnect();
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                    File myLocatorLogFile = new File("locator-" + locatorPort + ".log");
                    try {
                        Locator.startLocatorAndDS((int)locatorPort, (File)myLocatorLogFile, (Properties)props);
                    }
                    catch (IOException e) {
                        DistributedTestCase.fail("New locator startup failed on port: " + locatorPort, e);
                    }
                }
            });
            vm1.invoke(new CacheSerializableRunnable("Starting vm1"){

                @Override
                public void run2() {
                    DistributedTestCase.disconnectFromDS();
                    DistributedSystem.connect((Properties)props);
                }
            });
            AsyncInvocation slowDownCacheTimeTask = newLocator.invokeAsync(new CacheSerializableRunnable("Slow down locator cache time"){

                @Override
                public void run2() throws CacheException {
                    InternalDistributedSystem system = InternalDistributedSystem.getAnyInstance();
                    final DistributionManager dm = (DistributionManager)system.getDistributionManager();
                    long currOffset = dm.getCacheTimeOffset();
                    final long newOffset = currOffset - 10L;
                    dm.setCacheTimeOffset(null, newOffset, false);
                    DistributedTestCase.waitForCriterion(new DistributedTestCase.WaitCriterion(){

                        @Override
                        public boolean done() {
                            return dm.getCacheTimeOffset() == newOffset;
                        }

                        @Override
                        public String description() {
                            return "Waiting for locator cache time to slowdown";
                        }
                    }, 30L, 2L, true);
                }
            });
            DistributedTestCase.join(slowDownCacheTimeTask, 100L, GemFireTimeSyncProtocolDUnitTest.getLogWriter());
            if (slowDownCacheTimeTask.exceptionOccurred()) {
                GemFireTimeSyncProtocolDUnitTest.fail("Slow down locator cache time task failed with exception", slowDownCacheTimeTask.getException());
            }
            newLocator.invoke(new CacheSerializableRunnable("Shutdown locator"){

                @Override
                public void run2() throws CacheException {
                    try {
                        InternalDistributedSystem.getConnectedInstance().disconnect();
                    }
                    catch (Exception e) {
                        DistributedTestCase.fail("Stoping locator failed", e);
                    }
                }
            });
            vm1.invoke(new CacheSerializableRunnable("Shutdown vm1"){

                @Override
                public void run2() throws CacheException {
                    try {
                        InternalDistributedSystem.getConnectedInstance().disconnect();
                    }
                    catch (Exception e) {
                        DistributedTestCase.fail("Stoping vm1 failed", e);
                    }
                }
            });
        }
        catch (Throwable throwable) {
            newLocator.invoke(new /* invalid duplicate definition of identical inner class */);
            vm1.invoke(new /* invalid duplicate definition of identical inner class */);
            throw throwable;
        }
    }

    public Object getIDAndTimeOffset(VM vm) {
        return vm.invoke(new SerializableCallable(){

            public Object call() throws Exception {
                InternalDistributedSystem system = GemFireTimeSyncProtocolDUnitTest.this.getSystem();
                JChannel jchannel = MembershipManagerHelper.getJChannel((DistributedSystem)system);
                final UnitTestHook gftsTestHook = new UnitTestHook();
                Protocol prot = jchannel.getProtocolStack().findProtocol("GemFireTimeSync");
                GemFireTimeSync gts = (GemFireTimeSync)prot;
                gts.setTestHook((GemFireTimeSync.TestHook)gftsTestHook);
                DistributedTestCase.waitForCriterion(new DistributedTestCase.WaitCriterion(){

                    @Override
                    public boolean done() {
                        return gftsTestHook.getBarrier() == 1;
                    }

                    @Override
                    public String description() {
                        return "Waiting for this node to get time offsets from co-ordinator";
                    }
                }, 500L, 50L, false);
                DistributionManager dm = (DistributionManager)system.getDistributionManager();
                long timeOffset = dm.getCacheTimeOffset();
                gts.setTestHook(null);
                prot = jchannel.getProtocolStack().findProtocol("GMS");
                GMS gms = (GMS)prot;
                String address = gms.getLocalAddress();
                return new Object[]{address, timeOffset};
            }
        });
    }

    private Map calculateOffsets(Map responses, long currentTime) {
        HashMap<String, Long> offsets = new HashMap<String, Long>();
        if (responses.size() > 1) {
            long stddev;
            long averageTime;
            long newAverageTime;
            long rTTStddev;
            long averageRTT = this.getMeanRTT(responses, 0L, Long.MAX_VALUE);
            long newAverageRTT = this.getMeanRTT(responses, averageRTT, rTTStddev = this.getRTTStdDev(responses, averageRTT));
            if (newAverageRTT > 0L) {
                averageRTT = newAverageRTT;
            }
            if ((newAverageTime = this.getMeanClock(responses, averageTime = this.getMeanClock(responses, 0L, Long.MAX_VALUE), stddev = this.getClockStdDev(responses, averageTime))) > 0L) {
                averageTime = newAverageTime;
            }
            long averageTransmitTime = averageRTT / 2L;
            long adjustedAverageTime = averageTime + averageTransmitTime;
            for (Map.Entry entry : responses.entrySet()) {
                IpAddress mbr = (IpAddress)entry.getKey();
                GemFireTimeSync.GFTimeSyncHeader response = (GemFireTimeSync.GFTimeSyncHeader)entry.getValue();
                long responseTransmitTime = (response.timeReceived - currentTime) / 2L;
                long offset = adjustedAverageTime - (response.time + responseTransmitTime);
                offsets.put(mbr.toString(), offset);
            }
        }
        return offsets;
    }

    private long getMeanRTT(Map<Address, GemFireTimeSync.GFTimeSyncHeader> values, long previousMean, long stddev) {
        long totalTime = 0L;
        long numSamples = 0L;
        long upperLimit = previousMean + stddev;
        for (GemFireTimeSync.GFTimeSyncHeader response : values.values()) {
            long rtt = response.timeReceived - response.time;
            if (rtt > upperLimit) continue;
            ++numSamples;
            totalTime += rtt;
        }
        long averageTime = totalTime / numSamples;
        return averageTime;
    }

    private long getRTTStdDev(Map<Address, GemFireTimeSync.GFTimeSyncHeader> values, long average) {
        long sqDiffs = 0L;
        for (GemFireTimeSync.GFTimeSyncHeader response : values.values()) {
            long diff = average - (response.timeReceived - response.time);
            sqDiffs += diff * diff;
        }
        return Math.round(Math.sqrt(sqDiffs));
    }

    private long getMeanClock(Map<Address, GemFireTimeSync.GFTimeSyncHeader> values, long previousMean, long stddev) {
        long totalTime = 0L;
        long numSamples = 0L;
        long upperLimit = previousMean + stddev;
        long lowerLimit = previousMean - stddev;
        for (GemFireTimeSync.GFTimeSyncHeader response : values.values()) {
            if (lowerLimit > response.time || response.time > upperLimit) continue;
            ++numSamples;
            totalTime += response.time;
        }
        long averageTime = totalTime / numSamples;
        return averageTime;
    }

    private long getClockStdDev(Map<Address, GemFireTimeSync.GFTimeSyncHeader> values, long average) {
        long sqDiffs = 0L;
        for (GemFireTimeSync.GFTimeSyncHeader response : values.values()) {
            long diff = average - response.time;
            sqDiffs += diff * diff;
        }
        return Math.round(Math.sqrt(sqDiffs));
    }

    public class UnitTestHook
    implements GemFireTimeSync.TestHook {
        private Map<Address, GemFireTimeSync.GFTimeSyncHeader> respons;
        private long curTime;
        private int barrier = -1;

        public void hook(int barr) {
            this.barrier = barr;
            if (this.barrier == 0) {
                DistributedTestCase.pause(200);
            }
        }

        public void setResponses(Map<Address, GemFireTimeSync.GFTimeSyncHeader> responses, long currentTime) {
            this.respons = responses;
            this.curTime = currentTime;
        }

        public Map<Address, GemFireTimeSync.GFTimeSyncHeader> getResponses() {
            return this.respons;
        }

        public long getCurTime() {
            return this.curTime;
        }

        public int getBarrier() {
            return this.barrier;
        }
    }
}

