/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.distributed.internal.streaming;

import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.streaming.StreamingOperation;
import com.gemstone.gemfire.internal.cache.Token;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableRunnable;
import dunit.VM;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import junit.framework.TestCase;

public class StreamingOperationManyDUnitTest
extends DistributedTestCase {
    protected static final int NUM_INTEGERS = 819200;

    public StreamingOperationManyDUnitTest(String name) {
        super(name);
    }

    public void testStreamingManyProvidersNoExceptions() throws Exception {
        Host host = Host.getHost(0);
        for (int i = 0; i < 4; ++i) {
            VM vm = host.getVM(i);
            vm.invoke(new SerializableRunnable("connect to system"){

                @Override
                public void run() {
                    TestCase.assertTrue((StreamingOperationManyDUnitTest.this.getSystem() != null ? 1 : 0) != 0);
                }
            });
        }
        Set setOfIds = this.getSystem().getDistributionManager().getOtherNormalDistributionManagerIds();
        StreamingOperationManyDUnitTest.assertEquals((int)4, (int)setOfIds.size());
        TestStreamingOperationManyProviderNoExceptions streamOp = new TestStreamingOperationManyProviderNoExceptions(this.getSystem());
        streamOp.getDataFromAll(setOfIds);
        StreamingOperationManyDUnitTest.assertTrue((boolean)streamOp.dataValidated);
    }

    public static final class TestRequestStreamingMessageManyProviderNoExceptions
    extends StreamingOperation.RequestStreamingMessage {
        private int nextInt = -10;
        private int count = 0;

        protected Object getNextReplyObject() throws ReplyException {
            if (++this.count > 819200) {
                return Token.END_OF_STREAM;
            }
            this.nextInt += 10;
            return new Integer(this.nextInt);
        }

        public int getDSFID() {
            return Integer.MAX_VALUE;
        }
    }

    public static class TestStreamingOperationManyProviderNoExceptions
    extends StreamingOperation {
        volatile boolean dataValidated = false;
        ConcurrentMap senderMap = new ConcurrentHashMap();
        ConcurrentMap senderNumChunksMap = new ConcurrentHashMap();
        private int numChunks = -1;

        public TestStreamingOperationManyProviderNoExceptions(InternalDistributedSystem sys) {
            super(sys);
        }

        protected DistributionMessage createRequestMessage(Set recipients, ReplyProcessor21 processor) {
            TestRequestStreamingMessageManyProviderNoExceptions msg = new TestRequestStreamingMessageManyProviderNoExceptions();
            msg.setRecipients(recipients);
            msg.processorId = processor == null ? 0 : processor.getProcessorId();
            return msg;
        }

        protected synchronized boolean processData(List objects, InternalDistributedMember sender, int sequenceNum, boolean lastInSequence) {
            Object prevValue;
            ConcurrentMap chunkMap2;
            LogWriter logger = this.sys.getLogWriter();
            ConcurrentMap<Integer, List<Object>> chunkMap = (ConcurrentHashMap<Integer, List>)this.senderMap.get(sender);
            if (chunkMap == null && (chunkMap2 = (ConcurrentMap)this.senderMap.putIfAbsent(sender, chunkMap = new ConcurrentHashMap<Integer, List>())) != null) {
                chunkMap = chunkMap2;
            }
            if ((prevValue = chunkMap.putIfAbsent(new Integer(sequenceNum), objects)) != null) {
                logger.severe("prevValue != null");
            }
            if (lastInSequence) {
                this.numChunks = sequenceNum + 1;
                prevValue = this.senderNumChunksMap.putIfAbsent(sender, new Integer(sequenceNum + 1));
                if (prevValue != null) {
                    logger.severe("prevValue != null");
                }
            }
            if (chunkMap.size() == this.numChunks && this.senderMap.size() == 4) {
                boolean completelyDone = true;
                for (Map.Entry entry : this.senderMap.entrySet()) {
                    InternalDistributedMember senderV = (InternalDistributedMember)entry.getKey();
                    ConcurrentMap chunkMapV = (ConcurrentMap)entry.getValue();
                    Integer numChunksV = (Integer)this.senderNumChunksMap.get(senderV);
                    if (chunkMapV == null) {
                        completelyDone = false;
                        break;
                    }
                    if (numChunksV == null) {
                        completelyDone = false;
                        break;
                    }
                    if (chunkMapV.size() == numChunksV.intValue()) continue;
                    completelyDone = false;
                    break;
                }
                if (completelyDone) {
                    this.validateData();
                }
            }
            return true;
        }

        private void validateData() {
            LogWriter logger = this.sys.getLogWriter();
            for (Map.Entry entry : this.senderMap.entrySet()) {
                ConcurrentMap chunkMap = (ConcurrentMap)entry.getValue();
                InternalDistributedMember sender = (InternalDistributedMember)entry.getKey();
                ArrayList[] arrayOfLists = new ArrayList[chunkMap.size()];
                int expectedInt = 0;
                for (Map.Entry entry2 : chunkMap.entrySet()) {
                    int seqNum = (Integer)entry2.getKey();
                    List objList = (List)entry2.getValue();
                    arrayOfLists[seqNum] = objList;
                }
                int count = 0;
                for (int i = 0; i < chunkMap.size(); ++i) {
                    for (Integer nextInteger : arrayOfLists[i]) {
                        if (nextInteger != expectedInt) {
                            logger.severe("nextInteger.intValue() != expectedInt");
                            return;
                        }
                        expectedInt += 10;
                        ++count;
                    }
                }
                if (count != 819200) {
                    logger.severe("found " + count + " integers from " + sender + " , expected " + 819200);
                    return;
                }
                logger.info("Received " + count + " integers from " + sender + " in " + chunkMap.size() + " chunks");
            }
            this.dataValidated = true;
        }
    }
}

