/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.internal.cache.partitioned;

import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.PartitionAttributes;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.Token;
import com.gemstone.gemfire.internal.cache.partitioned.StreamingPartitionOperation;
import dunit.Host;
import dunit.SerializableRunnable;
import dunit.VM;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import junit.framework.TestCase;

public class StreamingPartitionOperationManyDUnitTest
extends CacheTestCase {
    CacheSerializableRunnable createPrRegionWithDS_DACK = new CacheSerializableRunnable("createPrRegionWithDS"){

        @Override
        public void run2() throws CacheException {
            Cache cache = StreamingPartitionOperationManyDUnitTest.this.getCache();
            AttributesFactory attr = new AttributesFactory();
            PartitionAttributesFactory paf = new PartitionAttributesFactory();
            paf.setTotalNumBuckets(5);
            PartitionAttributes prAttr = paf.create();
            attr.setPartitionAttributes(prAttr);
            RegionAttributes regionAttribs = attr.create();
            cache.createRegion("PR1", regionAttribs);
        }
    };
    protected static final int NUM_INTEGERS = 819200;

    public StreamingPartitionOperationManyDUnitTest(String name) {
        super(name);
    }

    public void testStreamingManyProvidersNoExceptions() throws Exception {
        Host host = Host.getHost(0);
        for (int i = 0; i < 4; ++i) {
            VM vm = host.getVM(i);
            vm.invoke(new SerializableRunnable("connect to system"){

                @Override
                public void run() {
                    TestCase.assertTrue((StreamingPartitionOperationManyDUnitTest.this.getSystem() != null ? 1 : 0) != 0);
                }
            });
            vm.invoke(this.createPrRegionWithDS_DACK);
        }
        this.createPrRegionWithDS_DACK.run2();
        int regionId = ((PartitionedRegion)this.getCache().getRegion("PR1")).getPRId();
        Set setOfIds = this.getSystem().getDistributionManager().getOtherNormalDistributionManagerIds();
        StreamingPartitionOperationManyDUnitTest.assertEquals((int)4, (int)setOfIds.size());
        TestStreamingPartitionOperationManyProviderNoExceptions streamOp = new TestStreamingPartitionOperationManyProviderNoExceptions(this.getSystem(), regionId);
        streamOp.getPartitionedDataFrom(setOfIds);
        StreamingPartitionOperationManyDUnitTest.assertTrue((String)"data did not validate correctly: see log for severe message", (boolean)streamOp.dataValidated);
    }

    public static final class TestStreamingPartitionMessageManyProviderNoExceptions
    extends StreamingPartitionOperation.StreamingPartitionMessage {
        private int nextInt = -10;
        private int count = 0;

        public TestStreamingPartitionMessageManyProviderNoExceptions() {
        }

        public TestStreamingPartitionMessageManyProviderNoExceptions(Set recipients, int regionId, ReplyProcessor21 processor) {
            super(recipients, regionId, processor);
        }

        protected Object getNextReplyObject(PartitionedRegion pr) throws ReplyException {
            if (++this.count > 819200) {
                return Token.END_OF_STREAM;
            }
            this.nextInt += 10;
            return new Integer(this.nextInt);
        }

        public int getDSFID() {
            return Integer.MAX_VALUE;
        }
    }

    public static class TestStreamingPartitionOperationManyProviderNoExceptions
    extends StreamingPartitionOperation {
        volatile boolean dataValidated = false;
        ConcurrentMap senderMap = new ConcurrentHashMap();
        ConcurrentMap senderNumChunksMap = new ConcurrentHashMap();

        public TestStreamingPartitionOperationManyProviderNoExceptions(InternalDistributedSystem sys, int regionId) {
            super(sys, regionId);
        }

        protected DistributionMessage createRequestMessage(Set recipients, ReplyProcessor21 processor) {
            TestStreamingPartitionMessageManyProviderNoExceptions msg = new TestStreamingPartitionMessageManyProviderNoExceptions(recipients, this.regionId, processor);
            return msg;
        }

        protected synchronized boolean processData(List objects, InternalDistributedMember sender, int sequenceNum, boolean lastInSequence) {
            Integer numChunksI;
            Object prevValue;
            ConcurrentMap chunkMap2;
            LogWriter logger = this.sys.getLogWriter();
            int numChunks = -1;
            ConcurrentMap chunkMap = (ConcurrentHashMap<Integer, List>)this.senderMap.get(sender);
            if (chunkMap == null && (chunkMap2 = (ConcurrentMap)this.senderMap.putIfAbsent(sender, chunkMap = new ConcurrentHashMap<Integer, List>())) != null) {
                chunkMap = chunkMap2;
            }
            if ((prevValue = chunkMap.putIfAbsent(new Integer(sequenceNum), objects)) != null) {
                logger.severe("prevValue != null");
            }
            if (lastInSequence && (prevValue = this.senderNumChunksMap.putIfAbsent(sender, new Integer(sequenceNum + 1))) != null) {
                logger.severe("prevValue != null");
            }
            if ((numChunksI = (Integer)this.senderNumChunksMap.get(sender)) != null) {
                numChunks = numChunksI;
            }
            if (chunkMap.size() == numChunks && this.senderMap.size() == 4) {
                boolean completelyDone = true;
                for (Map.Entry entry : this.senderMap.entrySet()) {
                    InternalDistributedMember senderV = (InternalDistributedMember)entry.getKey();
                    ConcurrentMap chunkMapV = (ConcurrentMap)entry.getValue();
                    Integer numChunksV = (Integer)this.senderNumChunksMap.get(senderV);
                    if (chunkMapV != null && numChunksV != null && chunkMapV.size() == numChunksV.intValue()) continue;
                    completelyDone = false;
                }
                if (completelyDone) {
                    this.validateData();
                }
            }
            return true;
        }

        private void validateData() {
            LogWriter logger = this.sys.getLogWriter();
            logger.info("Validating data...");
            try {
                for (Map.Entry entry : this.senderMap.entrySet()) {
                    ConcurrentMap chunkMap = (ConcurrentMap)entry.getValue();
                    InternalDistributedMember sender = (InternalDistributedMember)entry.getKey();
                    ArrayList[] arrayOfLists = new ArrayList[chunkMap.size()];
                    int expectedInt = 0;
                    for (Map.Entry entry2 : chunkMap.entrySet()) {
                        int seqNum = (Integer)entry2.getKey();
                        List objList = (List)entry2.getValue();
                        arrayOfLists[seqNum] = objList;
                    }
                    int count = 0;
                    for (int i = 0; i < chunkMap.size(); ++i) {
                        for (Integer nextInteger : arrayOfLists[i]) {
                            if (nextInteger != expectedInt) {
                                logger.severe("nextInteger.intValue() != expectedInt");
                                return;
                            }
                            expectedInt += 10;
                            ++count;
                        }
                    }
                    if (count != 819200) {
                        logger.severe("found " + count + " integers from " + sender + " , expected " + 819200);
                        return;
                    }
                    logger.info("Received " + count + " integers from " + sender + " in " + chunkMap.size() + " chunks");
                }
            }
            catch (Exception e) {
                logger.severe("Validation exception", (Throwable)e);
            }
            logger.info("Successful validation");
            this.dataValidated = true;
        }
    }
}

