/*
 * Decompiled with CFR 0.152.
 */
package com.pivotal.gemfirexd.distributed;

import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.execute.ResultCollector;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.DistributionStats;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.execute.InternalRegionFunctionContext;
import com.gemstone.gnu.trove.TIntObjectHashMap;
import com.pivotal.gemfirexd.DistributedSQLTestBase;
import com.pivotal.gemfirexd.internal.engine.GfxdDataSerializable;
import com.pivotal.gemfirexd.internal.engine.Misc;
import com.pivotal.gemfirexd.internal.engine.distributed.GfxdListResultCollector;
import com.pivotal.gemfirexd.internal.engine.distributed.message.RegionExecutorMessage;
import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils;
import com.pivotal.gemfirexd.internal.engine.locks.GfxdDRWLockService;
import com.pivotal.gemfirexd.internal.engine.locks.GfxdLocalLockService;
import io.snappydata.test.dunit.AsyncInvocation;
import io.snappydata.test.dunit.DistributedTestBase;
import io.snappydata.test.dunit.SerializableCallable;
import io.snappydata.test.dunit.SerializableRunnable;
import io.snappydata.test.dunit.VM;
import io.snappydata.test.dunit.standalone.DUnitBB;
import io.snappydata.test.util.TestException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class DistributedLockDUnit
extends DistributedSQLTestBase {
    /** DUnitBB key of the counter that {@link AcquireReleaseLock} increments after each read-lock attempt. */
    private static final String NumReadLocks = "NUM_READLOCKS";
    /** DUnitBB key of the counter that {@link AcquireReleaseLock} increments just before each read unlock. */
    private static final String NumReadUnlocks = "NUM_READUNLOCKS";
    /** 60 seconds in milliseconds; same value as the last argument given to {@code writeLock} in {@link AcquireReleaseLock}. */
    private static final long MAX_LEASETIME = 60000L;

    /**
     * Creates the test case; the name is handed unchanged to the base class
     * constructor.
     *
     * @param name value forwarded to {@code super(name)}
     */
    public DistributedLockDUnit(String name) {
        super(name);
    }

    /**
     * Overrides the base-class hook and always answers {@code "config"}.
     * NOTE(review): presumably the base class uses this as the log level for
     * the test VMs; confirm against {@code DistributedSQLTestBase}.
     *
     * @return the literal string {@code "config"}
     */
    @Override
    protected String reduceLogging() {
        return "config";
    }

    /**
     * Executes one task concurrently on a number of newly created threads and
     * reports how long the whole batch took.
     *
     * <p>All threads are constructed before the clock starts; the measured
     * interval covers starting every thread and joining every thread.
     *
     * @param numThreads number of threads to create
     * @param run2 the task each thread runs
     * @param msg label used in the "Time taken by" log line
     * @return elapsed wall-clock time in milliseconds
     * @throws TestException if the calling thread is interrupted while joining
     *         (the interrupt flag is restored first)
     */
    private long runInThreads(int numThreads, Runnable run2, String msg) {
        final Thread[] workers = new Thread[numThreads];
        int created = 0;
        while (created < numThreads) {
            workers[created++] = new Thread(run2);
        }
        final long startedAt = System.currentTimeMillis();
        for (Thread worker : workers) {
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new TestException("unexpected interrupt", (Throwable)ie);
        }
        final long elapsedMillis = System.currentTimeMillis() - startedAt;
        this.getLogWriter().info((Object)("Time taken by " + msg + ": " + elapsedMillis + "ms"));
        return elapsedMillis;
    }

    /**
     * Verifies that the per-member bucket sets in {@code result} together
     * contain every bucket id of the region exactly once.
     *
     * <p>The test fails as soon as a bucket id is reported a second time, and
     * afterwards for the first bucket id in
     * {@code [0, totalNumBuckets)} that was never reported.
     *
     * @param result the collected function result; must be a
     *        {@code Collection} whose elements are {@code Set<Integer>} of
     *        bucket ids (a {@code ClassCastException} results otherwise)
     * @param pr the partitioned region whose total bucket count defines the
     *        expected id range
     */
    private void checkResults(Object result, PartitionedRegion pr) {
        final int totalBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
        // 0 = not seen yet, 1 = seen exactly once
        final int[] timesSeen = new int[totalBuckets];

        // The collector hands the result back untyped; the element type is
        // fixed by what TestFunction/TestFunctionMessage send (a bucket-id set).
        @SuppressWarnings("unchecked")
        final Collection<Set<Integer>> perMemberBuckets = (Collection<Set<Integer>>)result;

        for (Set<Integer> bucketIds : perMemberBuckets) {
            for (Integer bucketId : bucketIds) {
                final int id = bucketId.intValue();
                if (timesSeen[id] != 0) {
                    DistributedLockDUnit.fail("expected no more than one bucket for ID " + bucketId + " but got " + timesSeen[id]);
                }
                timesSeen[id] = 1;
            }
        }
        for (int index = 0; index < timesSeen.length; ++index) {
            if (timesSeen[index] != 1) {
                DistributedLockDUnit.fail("expected one bucket for ID " + index + " but got " + timesSeen[index]);
            }
        }
    }

    /**
     * Removes both read-lock counters from the DUnitBB instance and then runs
     * the base-class tear-down.
     */
    @Override
    public void vmTearDown() throws Exception {
        final DUnitBB blackboard = DUnitBB.getBB();
        for (String counterKey : new String[] { NumReadLocks, NumReadUnlocks }) {
            blackboard.remove((Object)counterKey);
        }
        super.vmTearDown();
    }

    /**
     * Runs one read-lock task on two VMs and one write-lock task on two other
     * VMs, all against the same lock object, and waits for all four to finish.
     *
     * <p>The read task waits up to 10s for the lock and holds it for 5s. The
     * write task is created with an expected read-unlock count of 2, waits up
     * to 20s for the lock and holds it for 5s.
     */
    public void testLockWaiting() throws Exception {
        this.startVMs(3, 2);
        final String lockName = "Testing";

        // readers
        final AcquireReleaseLock readTask = new AcquireReleaseLock("get and release read lock", 10000L, 5000L, true, 0, lockName);
        final VM readerClient = (VM)this.clientVMs.get(1);
        final VM readerServer = (VM)this.serverVMs.get(1);
        this.getLogWriter().info((Object)("Invoking read lock on client 1 with PID: " + readerClient.getPid()));
        this.getLogWriter().info((Object)("Invoking read lock on server 1 with PID: " + readerServer.getPid()));
        final AsyncInvocation readerClientRun = readerClient.invokeAsync((Runnable)readTask);
        final AsyncInvocation readerServerRun = readerServer.invokeAsync((Runnable)readTask);

        // writers
        final AcquireReleaseLock writeTask = new AcquireReleaseLock("get and release write lock", 20000L, 5000L, false, 2, lockName);
        final VM writerClient = (VM)this.clientVMs.get(2);
        final VM writerServer = (VM)this.serverVMs.get(0);
        this.getLogWriter().info((Object)("Invoking write lock on client 2 with PID: " + writerClient.getPid()));
        this.getLogWriter().info((Object)("Invoking write lock on server 2 with PID: " + writerServer.getPid()));
        final AsyncInvocation writerClientRun = writerClient.invokeAsync((Runnable)writeTask);
        final AsyncInvocation writerServerRun = writerServer.invokeAsync((Runnable)writeTask);

        // join in launch order: readers first, then writers
        final AsyncInvocation[] runs = { readerClientRun, readerServerRun, writerClientRun, writerServerRun };
        final VM[] hosts = { readerClient, readerServer, writerClient, writerServer };
        for (int i = 0; i < runs.length; ++i) {
            this.joinAsyncInvocation(runs[i], hosts[i]);
        }
    }

    /**
     * Same arrangement as a two-reader/two-writer run on a single lock object,
     * but the two readers hold the lock for different durations (15s on the
     * client, 5s on the server) and the writers wait up to 25s for the lock.
     * The write task is created with an expected read-unlock count of 2.
     */
    public void testMultipleLockWaiting() throws Exception {
        this.startVMs(3, 2);
        final String lockName = "Testing";

        // readers: identical wait time, different hold times
        final AcquireReleaseLock slowReadTask = new AcquireReleaseLock("get and release read lock", 10000L, 15000L, true, 0, lockName);
        final AcquireReleaseLock fastReadTask = new AcquireReleaseLock("get and release read lock", 10000L, 5000L, true, 0, lockName);
        final VM readerClient = (VM)this.clientVMs.get(1);
        final VM readerServer = (VM)this.serverVMs.get(1);
        this.getLogWriter().info((Object)("Invoking read lock on client 1 with PID: " + readerClient.getPid()));
        this.getLogWriter().info((Object)("Invoking read lock on server 1 with PID: " + readerServer.getPid()));
        final AsyncInvocation readerClientRun = readerClient.invokeAsync((Runnable)slowReadTask);
        final AsyncInvocation readerServerRun = readerServer.invokeAsync((Runnable)fastReadTask);

        // writers
        final AcquireReleaseLock writeTask = new AcquireReleaseLock("get and release write lock", 25000L, 5000L, false, 2, lockName);
        final VM writerClient = (VM)this.clientVMs.get(2);
        final VM writerServer = (VM)this.serverVMs.get(0);
        this.getLogWriter().info((Object)("Invoking write lock on client 2 with PID: " + writerClient.getPid()));
        this.getLogWriter().info((Object)("Invoking write lock on server 2 with PID: " + writerServer.getPid()));
        final AsyncInvocation writerClientRun = writerClient.invokeAsync((Runnable)writeTask);
        final AsyncInvocation writerServerRun = writerServer.invokeAsync((Runnable)writeTask);

        // join in launch order: readers first, then writers
        final AsyncInvocation[] runs = { readerClientRun, readerServerRun, writerClientRun, writerServerRun };
        final VM[] hosts = { readerClient, readerServer, writerClient, writerServer };
        for (int i = 0; i < runs.length; ++i) {
            this.joinAsyncInvocation(runs[i], hosts[i]);
        }
    }

    /**
     * Two read tasks take the same two lock objects in opposite order, then,
     * once the read-lock counter has reached 2, one write task per lock
     * object is started. All four tasks must complete.
     *
     * <p>Every task here is created with a wait time of {@code -1}.
     */
    public void testDeadlockPrevention() throws Exception {
        this.startVMs(3, 2);
        final String firstLock = "Testing1";
        final String secondLock = "Testing2";

        // readers: same pair of locks, opposite acquisition order
        final AcquireReleaseLock readFirstThenSecond = new AcquireReleaseLock("get and release read lock12", -1L, 5000L, true, 0, firstLock, secondLock);
        final AcquireReleaseLock readSecondThenFirst = new AcquireReleaseLock("get and release read lock21", -1L, 9000L, true, 0, secondLock, firstLock);
        final VM readerClient = (VM)this.clientVMs.get(1);
        final VM readerServer = (VM)this.serverVMs.get(0);
        this.getLogWriter().info((Object)("Invoking read lock 12 on client 1 with PID: " + readerClient.getPid()));
        this.getLogWriter().info((Object)("Invoking read lock 21 on server 1 with PID: " + readerServer.getPid()));
        final AsyncInvocation readerClientRun = readerClient.invokeAsync((Runnable)readFirstThenSecond);
        final AsyncInvocation readerServerRun = readerServer.invokeAsync((Runnable)readSecondThenFirst);

        // writers: one per lock object, started only after two read locks were counted
        final AcquireReleaseLock writeFirst = new AcquireReleaseLock("get and release write lock1", -1L, 10000L, false, 0, firstLock);
        final AcquireReleaseLock writeSecond = new AcquireReleaseLock("get and release write lock2", -1L, 5000L, false, 0, secondLock);
        final VM writerClient = (VM)this.clientVMs.get(2);
        final VM writerServer = (VM)this.serverVMs.get(1);
        DistributedLockDUnit.waitForBBFlag(NumReadLocks, 2, 0L);
        this.getLogWriter().info((Object)("Invoking write lock 1 on client 2 with PID: " + writerClient.getPid()));
        this.getLogWriter().info((Object)("Invoking write lock 2 on server 2 with PID: " + writerServer.getPid()));
        final AsyncInvocation writerClientRun = writerClient.invokeAsync((Runnable)writeFirst);
        final AsyncInvocation writerServerRun = writerServer.invokeAsync((Runnable)writeSecond);

        // join in launch order: readers first, then writers
        final AsyncInvocation[] runs = { readerClientRun, readerServerRun, writerClientRun, writerServerRun };
        final VM[] hosts = { readerClient, readerServer, writerClient, writerServer };
        for (int i = 0; i < runs.length; ++i) {
            this.joinAsyncInvocation(runs[i], hosts[i]);
        }
    }

    /**
     * Generates short UUIDs concurrently in five VMs (this VM plus four remote
     * ones) and checks that no value is handed out twice and that the values
     * seen by any single thread are strictly increasing when read as unsigned
     * 32-bit numbers.
     *
     * <p>Each VM runs 20 threads that each draw 50000 ids via
     * {@code GemFireXDUtils.newShortUUID()}; the threads are released together
     * through a monitor-based start gate so that generation really overlaps.
     */
    public void testShortUUIDGeneration() throws Throwable {
        this.startVMs(2, 3);
        final int numThreads = 20;
        final int numKeysPerThread = 50000;
        final int numKeysPerVM = numThreads * numKeysPerThread;
        // the local VM plus the four remote invocations below
        final int numVMs = 5;
        SerializableCallable genKeys = new SerializableCallable(){

            public Object call() throws Exception {
                final Thread[] threads = new Thread[numThreads];
                final int[][] allArrays = new int[numThreads][];
                final AtomicInteger threadIndex = new AtomicInteger(0);
                for (int index = 0; index < numThreads; ++index) {
                    threads[index] = new Thread(new Runnable(){

                        @Override
                        public void run() {
                            try {
                                // Start gate: block until call() publishes the
                                // marker in slot 0. Loop to survive spurious
                                // wakeups. The monitor is held only for the
                                // gate itself so that key generation below
                                // runs concurrently across threads.
                                synchronized (allArrays) {
                                    while (allArrays[0] == null) {
                                        allArrays.wait();
                                    }
                                }
                                int arrayIndex = threadIndex.getAndIncrement();
                                int[] threadKeys = new int[numKeysPerThread];
                                for (int keyIndex = 0; keyIndex < numKeysPerThread; ++keyIndex) {
                                    threadKeys[keyIndex] = GemFireXDUtils.newShortUUID();
                                }
                                // the thread that drew index 0 replaces the marker
                                allArrays[arrayIndex] = threadKeys;
                            }
                            catch (Throwable t) {
                                DistributedLockDUnit.this.getLogWriter().error((Object)"unexpected exception", t);
                            }
                        }
                    });
                    threads[index].start();
                }
                // Open the gate. The monitor must be released before joining:
                // every worker needs it to get past the gate, so joining while
                // holding it would deadlock.
                synchronized (allArrays) {
                    allArrays[0] = new int[0];
                    allArrays.notifyAll();
                }
                for (int index = 0; index < numThreads; ++index) {
                    threads[index].join();
                }
                return allArrays;
            }
        };
        AsyncInvocation async1 = this.getServerVM(1).invokeAsync((Callable)genKeys);
        AsyncInvocation async2 = this.getServerVM(2).invokeAsync((Callable)genKeys);
        AsyncInvocation async3 = this.getClientVM(2).invokeAsync((Callable)genKeys);
        AsyncInvocation async4 = this.getServerVM(3).invokeAsync((Callable)genKeys);
        // generate in this VM too while the remote invocations are running
        int[][] arrays1 = (int[][])genKeys.call();
        int[][] arrays2 = (int[][])async1.getResult();
        int[][] arrays3 = (int[][])async2.getResult();
        int[][] arrays4 = (int[][])async3.getResult();
        int[][] arrays5 = (int[][])async4.getResult();
        System.gc();
        final int expectedTotalKeys = numKeysPerVM * numVMs;
        TIntObjectHashMap allKeys = new TIntObjectHashMap(expectedTotalKeys);
        DistributedLockDUnit.checkUniqueKeys(allKeys, arrays1, "currentVM");
        DistributedLockDUnit.checkUniqueKeys(allKeys, arrays2, this.getServerVM(1));
        DistributedLockDUnit.checkUniqueKeys(allKeys, arrays3, this.getServerVM(2));
        DistributedLockDUnit.checkUniqueKeys(allKeys, arrays4, this.getClientVM(2));
        DistributedLockDUnit.checkUniqueKeys(allKeys, arrays5, this.getServerVM(3));
        DistributedLockDUnit.assertEquals(expectedTotalKeys, allKeys.size());
    }

    /**
     * Compares the throughput of executing {@link TestFunction} through
     * {@code FunctionService} with executing the equivalent
     * {@link TestFunctionMessage} directly.
     *
     * <p>Starts one client and four servers, registers the function and the
     * message class everywhere, creates and fills table {@code T.Test1} with
     * 200 rows, and then for each mechanism performs a single-threaded
     * warm-up followed by two timed multi-threaded runs. Every invocation's
     * result is validated with {@code checkResults}.
     *
     * <p>The method name does not start with {@code test}.
     * NOTE(review): presumably that keeps the runner from picking it up
     * automatically; confirm.
     */
    public void PERF_testFunctionAndMessageCompare() throws Exception {
        Properties props = new Properties();
        props.setProperty("log-level", "config");
        this.startClientVMs(1, 0, null, props);
        this.startServerVMs(4, 0, null, props);
        SerializableRunnable register = new SerializableRunnable(){

            public void run() {
                FunctionService.registerFunction((Function)new TestFunction());
                GfxdDataSerializable.registerSqlSerializable(TestFunctionMessage.class);
            }
        };
        DistributedLockDUnit.invokeInEveryVM((SerializableRunnable)register);
        // also register in this (controller) VM
        register.run();
        this.clientSQLExecute(1, "create table T.Test1 (id int primary key, addr varchar(20))");
        for (int id = 1; id <= 200; ++id) {
            this.clientSQLExecute(1, "insert into T.Test1 values (" + id + ", 'addr" + id + "')");
        }
        final int numTimesPerThread = 10000;
        final int numThreads = 3;
        final long totalInvocations = (long)numTimesPerThread * numThreads;
        final PartitionedRegion pr = (PartitionedRegion)GemFireCacheImpl.getExisting().getRegion("/T/TEST1");
        // lines the worker threads up so they begin their loops together
        final CyclicBarrier barrier = new CyclicBarrier(numThreads);
        Runnable funcRun = new Runnable(){

            @Override
            public void run() {
                try {
                    barrier.await();
                    GfxdListResultCollector listRC = new GfxdListResultCollector();
                    for (int count = 1; count <= numTimesPerThread; ++count) {
                        listRC.clearResults();
                        ResultCollector rc = FunctionService.onRegion((Region)pr).withCollector((ResultCollector)listRC).execute(TestFunction.ID);
                        DistributedLockDUnit.this.checkResults(rc.getResult(), pr);
                    }
                }
                catch (Exception ex) {
                    DistributedSQLTestBase.fail("failing with exception", ex);
                }
            }
        };
        Runnable msgRun = new Runnable(){

            @Override
            public void run() {
                try {
                    barrier.await();
                    GfxdListResultCollector rc = new GfxdListResultCollector();
                    for (int count = 1; count <= numTimesPerThread; ++count) {
                        rc.clearResults();
                        TestFunctionMessage msg = new TestFunctionMessage((ResultCollector<Object, Object>)rc, (LocalRegion)pr, null);
                        DistributedLockDUnit.this.checkResults(msg.executeFunction(), pr);
                    }
                }
                catch (Exception ex) {
                    DistributedSQLTestBase.fail("failing with exception", ex);
                }
            }
        };

        // warm-up: function execution, single-threaded, fresh collector each time
        for (int count = 1; count <= numTimesPerThread; ++count) {
            GfxdListResultCollector listRC = new GfxdListResultCollector();
            // execute() hands back a ResultCollector, not necessarily the
            // list collector passed in, so it needs its own variable
            ResultCollector<?, ?> rc = FunctionService.onRegion((Region)pr).withCollector((ResultCollector)listRC).execute(TestFunction.ID);
            this.checkResults(rc.getResult(), pr);
        }
        this.logInvocationRate(totalInvocations, this.runInThreads(numThreads, funcRun, "function"));
        this.logInvocationRate(totalInvocations, this.runInThreads(numThreads, funcRun, "function"));

        // warm-up: direct message execution, single-threaded
        for (int count = 1; count <= numTimesPerThread; ++count) {
            GfxdListResultCollector listRC = new GfxdListResultCollector();
            TestFunctionMessage msg = new TestFunctionMessage((ResultCollector<Object, Object>)listRC, (LocalRegion)pr, null);
            this.checkResults(msg.executeFunction(), pr);
        }
        this.logInvocationRate(totalInvocations, this.runInThreads(numThreads, msgRun, "function message"));
        this.logInvocationRate(totalInvocations, this.runInThreads(numThreads, msgRun, "function message"));
    }

    /**
     * Logs how many invocations per second a timed run achieved.
     *
     * @param invocations total number of invocations performed in the run
     * @param elapsedMillis duration of the run in milliseconds; values below
     *        1 are treated as 1 so the division cannot fail
     */
    private void logInvocationRate(long invocations, long elapsedMillis) {
        final long safeElapsed = Math.max(elapsedMillis, 1L);
        this.getLogWriter().info((Object)("Number of invocations per second: " + invocations * 1000L / safeElapsed));
    }

    /**
     * Adds every key of every array to {@code allKeys} (mapped to {@code vm})
     * and fails if a key is already present or if the keys within one array
     * are not strictly increasing when interpreted as unsigned 32-bit values.
     *
     * @param allKeys accumulates all keys seen so far, across calls
     * @param arrays one key array per generating thread
     * @param vm value stored for each key and named in failure messages
     * @throws TestException on the first duplicate or non-increasing key; the
     *         offending array is logged beforehand
     */
    private static void checkUniqueKeys(TIntObjectHashMap allKeys, int[][] arrays, Object vm) {
        for (int arrayPos = 0; arrayPos < arrays.length; ++arrayPos) {
            final int[] threadKeys = arrays[arrayPos];
            // below every unsigned value, so the first key always passes the order check
            long previous = -1L;
            for (int keyPos = 0; keyPos < threadKeys.length; ++keyPos) {
                final int key = threadKeys[keyPos];
                final long unsignedKey = (long)key & 0xFFFFFFFFL;
                if (unsignedKey <= previous) {
                    DistributedLockDUnit.getGlobalLogger().info((Object)("Failed due to decrease in value. Keys: " + Arrays.toString(threadKeys)));
                    throw new TestException("unexpected decrease in value=" + key + " currentValue=" + unsignedKey + " lastValue=" + previous + " VM: " + vm);
                }
                final Object oldVM = allKeys.put(key, vm);
                if (oldVM != null) {
                    DistributedLockDUnit.getGlobalLogger().info((Object)("Failed due to duplicate value. Keys: " + Arrays.toString(threadKeys)));
                    throw new TestException("unexpected duplicate " + key + " for VM: " + vm + ", oldVM: " + oldVM);
                }
                previous = unsignedKey;
            }
        }
    }

    /**
     * Function with id {@code "TestFunction"} whose only result is the local
     * bucket set of region {@code /T/TEST1}, as reported by the
     * {@code InternalRegionFunctionContext} it is executed with.
     */
    static final class TestFunction
    implements Function {
        static final String ID = "TestFunction";

        TestFunction() {
        }

        public String getId() {
            return ID;
        }

        public boolean hasResult() {
            return true;
        }

        public boolean isHA() {
            return false;
        }

        public boolean optimizeForWrite() {
            return true;
        }

        /**
         * Sends the local bucket set of {@code /T/TEST1} as the last (and
         * only) result.
         *
         * @param context must be an {@code InternalRegionFunctionContext}
         */
        public void execute(FunctionContext context) {
            context.getResultSender().lastResult(TestFunction.localBucketsFor(context));
        }

        /** Looks up {@code /T/TEST1} in the existing cache and asks the context for its local buckets. */
        private static Object localBucketsFor(FunctionContext context) {
            final InternalRegionFunctionContext regionContext = (InternalRegionFunctionContext)context;
            return regionContext.getLocalBucketSet(GemFireCacheImpl.getExisting().getRegion("/T/TEST1"));
        }
    }

    /**
     * {@code RegionExecutorMessage} counterpart of {@link TestFunction}: when
     * executed it reports the local bucket set of region {@code /T/TEST1} as
     * its last result.
     */
    public static final class TestFunctionMessage
    extends RegionExecutorMessage<Object> {
        /** No-argument constructor; passes {@code true} to the superclass. */
        public TestFunctionMessage() {
            super(true);
        }

        /**
         * Creates a message for the given collector, region and routing
         * objects; the remaining superclass arguments are {@code null} and
         * {@code DistributionStats.enableClockStats}.
         */
        public TestFunctionMessage(ResultCollector<Object, Object> collector, LocalRegion region, Set<Object> routingObjects) {
            super(collector, region, routingObjects, null, DistributionStats.enableClockStats);
        }

        /** Copy constructor backing {@link #clone()}. */
        private TestFunctionMessage(TestFunctionMessage other) {
            super((RegionExecutorMessage)other);
        }

        protected RegionExecutorMessage<Object> clone() {
            return new TestFunctionMessage(this);
        }

        /** Intentionally empty: this message carries no per-member arguments. */
        protected void setArgsForMember(DistributedMember member, Set<DistributedMember> messageAwareMembers) {
        }

        /** Sends the local bucket set of {@code /T/TEST1} as the last result. */
        protected void execute() {
            this.lastResult(
                this.getLocalBucketSet(
                    GemFireCacheImpl.getExisting().getRegion("/T/TEST1")));
        }

        public byte getGfxdID() {
            return 76;
        }

        public boolean optimizeForWrite() {
            return true;
        }

        public boolean withSecondaries() {
            return false;
        }

        public boolean isHA() {
            return false;
        }
    }

    /**
     * Remote task that acquires a read or write lock on each of its lock
     * objects in order, sleeping for a fixed time after each acquisition, and
     * afterwards releases all of them in the same order.
     *
     * <p>Progress is published through two DUnitBB counters: the read-lock
     * counter is incremented after every read-lock attempt and the
     * read-unlock counter immediately before every read unlock. A write task
     * created with a positive expected read-unlock count first waits for the
     * read-lock counter using that count and, once it holds the write lock,
     * checks the read-unlock counter against it.
     */
    static class AcquireReleaseLock
    extends SerializableRunnable {
        private final Object[] lockObjects;
        private final boolean readLock;
        private final int numReadUnlocks;
        private final long waitTimeMillis;
        private final long sleepTimeMillis;

        /**
         * @param name name handed to the {@code SerializableRunnable} superclass
         * @param waitMillis wait time passed to the lock service calls
         * @param sleepMillis how long to sleep after each acquisition
         * @param isReadLock {@code true} for read locks, {@code false} for write locks
         * @param expectedNumReadUnlocks for write tasks: count used for both
         *        blackboard checks; values &lt;= 0 disable them
         * @param locks the objects to lock, in acquisition order
         * @throws IllegalArgumentException if {@code locks} is {@code null} or empty
         */
        AcquireReleaseLock(String name, long waitMillis, long sleepMillis, boolean isReadLock, int expectedNumReadUnlocks, Object ... locks) {
            super(name);
            // an empty array would make run() a silent no-op
            if (locks == null || locks.length == 0) {
                throw new IllegalArgumentException("Expected at least one object to lock");
            }
            this.lockObjects = locks;
            this.waitTimeMillis = waitMillis;
            this.sleepTimeMillis = sleepMillis;
            this.readLock = isReadLock;
            this.numReadUnlocks = expectedNumReadUnlocks;
        }

        /**
         * Acquires, holds and releases the configured locks.
         *
         * @throws CacheException if a lock could not be obtained
         * @throws TestException if the sleep is interrupted
         */
        public void run() throws CacheException {
            GfxdDRWLockService lockService = Misc.getMemStore().getDDLLockService();
            GfxdLocalLockService.DistributedLockOwner owner = lockService.newCurrentOwner();
            for (Object lockObject : this.lockObjects) {
                boolean gotLock = false;
                if (this.readLock) {
                    gotLock = lockService.readLock(lockObject, (Object)owner, this.waitTimeMillis);
                    // counted even when the attempt failed; the failure is raised below
                    DistributedTestBase.incBBFlag((String)DistributedLockDUnit.NumReadLocks);
                } else {
                    if (this.numReadUnlocks > 0) {
                        // hold back until the readers have counted their read locks
                        DistributedTestBase.waitForBBFlag((String)DistributedLockDUnit.NumReadLocks, (int)this.numReadUnlocks, (long)0L);
                    }
                    gotLock = lockService.writeLock(lockObject, (Object)owner, this.waitTimeMillis, DistributedLockDUnit.MAX_LEASETIME);
                    if (this.numReadUnlocks > 0) {
                        // with the write lock held, the readers must already have counted their unlocks
                        DistributedTestBase.checkBBFlag((String)DistributedLockDUnit.NumReadUnlocks, (int)this.numReadUnlocks);
                    }
                }
                if (!gotLock) {
                    throw new CacheException("failed to obtain the lock"){};
                }
                DistributedTestBase.getGlobalLogger().info((Object)(this.toString() + " acquired the " + (this.readLock ? "read" : "write") + " lock on object " + lockObject + "; sleeping for " + this.sleepTimeMillis + "ms"));
                try {
                    Thread.sleep(this.sleepTimeMillis);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new TestException("unexpected interrupt", (Throwable)ie);
                }
                DistributedTestBase.getGlobalLogger().info((Object)(this.toString() + (this.readLock ? " read" : " write") + " lock on object " + lockObject + "; sleep complete"));
            }
            for (Object lockObject : this.lockObjects) {
                if (this.readLock) {
                    // counter first, unlock second: a writer that gets the lock
                    // right after the unlock then already sees the increment
                    DistributedTestBase.incBBFlag((String)DistributedLockDUnit.NumReadUnlocks);
                    lockService.readUnlock(lockObject);
                    continue;
                }
                lockService.writeUnlock(lockObject, (Object)owner);
            }
        }
    }
}

