/*
 * Decompiled with CFR 0.152.
 */
package com.gemstone.gemfire.distributed.internal;

import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.MembershipListener;
import com.gemstone.gemfire.distributed.internal.MessageWithReply;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.distributed.internal.SerialDistributionMessage;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import dunit.AsyncInvocation;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableRunnable;
import dunit.VM;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import junit.framework.TestCase;

public class LocalDistributionManagerTest
extends DistributedTestCase {
    /**
     * Listener shared between the runnables of
     * {@code testMemberJoinedAndDeparted()}: assigned by the "Install listener"
     * runnable and polled by the two verification runnables that follow it.
     */
    protected static TestMembershipListener listener = null;

    /**
     * Creates the test case, forwarding {@code name} to the
     * {@code DistributedTestCase} constructor.
     *
     * @param name value passed unchanged to the superclass constructor
     */
    public LocalDistributionManagerTest(String name) {
        super(name);
    }

    /**
     * Invokes a "Connect to distributed system" task in the given VM. The task
     * calls {@code getSystem} with an empty {@link Properties} instance.
     *
     * @param vm the VM in which the task is run
     */
    protected void createSystem(VM vm) {
        SerializableRunnable connect = new SerializableRunnable("Connect to distributed system"){

            @Override
            public void run() {
                // No properties are set: an empty instance is handed to getSystem.
                LocalDistributionManagerTest.this.getSystem(new Properties());
            }
        };
        vm.invoke(connect);
    }

    /**
     * Checks the size of {@code getNormalDistributionManagerIds()} as two VMs
     * connect one after the other.
     *
     * <p>After VM 0 connects it must report {@code systemCount + 1} ids; after
     * VM 1 connects, VM 1 must report {@code systemCount + 2} ids.
     */
    public void testCountDMs() {
        Host host = Host.getHost(0);
        VM vm0 = host.getVM(0);
        VM vm1 = host.getVM(1);
        // NOTE(review): presumably the number of members that already exist
        // before vm0 connects -- confirm against the dunit launcher setup.
        // Declared final so the anonymous runnables below can reference it; as a
        // compile-time constant it is inlined rather than captured.
        final int systemCount = 2;
        this.createSystem(vm0);
        vm0.invoke(new SerializableRunnable("Count DMs"){

            @Override
            public void run() {
                DM dm = LocalDistributionManagerTest.this.getSystem().getDistributionManager();
                TestCase.assertEquals(systemCount + 1, dm.getNormalDistributionManagerIds().size());
            }
        });
        this.createSystem(vm1);
        vm1.invoke(new SerializableRunnable("Count DMs Again"){

            @Override
            public void run() {
                DM dm = LocalDistributionManagerTest.this.getSystem().getDistributionManager();
                TestCase.assertEquals(systemCount + 2, dm.getNormalDistributionManagerIds().size());
            }
        });
    }

    /**
     * Sends a {@link FirstMessage} from VM 0 and verifies in VM 1 that
     * {@link FirstMessage#received} becomes {@code true}.
     *
     * <p>Before sending, VM 0 asserts that
     * {@code getOtherNormalDistributionManagerIds()} has exactly 3 entries.
     * VM 1 polls the flag through {@code waitForCriterion} and resets it to
     * {@code false} afterwards.
     *
     * @throws InterruptedException if one of the fixed sleeps is interrupted
     */
    public void testSendMessage() throws InterruptedException {
        Host host = Host.getHost(0);
        VM vm0 = host.getVM(0);
        VM vm1 = host.getVM(1);
        this.createSystem(vm0);
        this.createSystem(vm1);
        // Fixed pause after both VMs have connected, before the message is sent.
        Thread.sleep(5000L);
        vm0.invoke(new SerializableRunnable("Send message"){

            @Override
            public void run() {
                DM dm = LocalDistributionManagerTest.this.getSystem().getDistributionManager();
                TestCase.assertEquals("For DM " + dm.getId(), 3, dm.getOtherNormalDistributionManagerIds().size());
                FirstMessage message = new FirstMessage();
                dm.putOutgoing(message);
            }
        });
        // Fixed pause between sending and the start of polling in VM 1.
        Thread.sleep(3000L);
        vm1.invoke(new SerializableRunnable("Was message received?"){

            @Override
            public void run() {
                DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

                    @Override
                    public boolean done() {
                        return FirstMessage.received;
                    }

                    @Override
                    public String description() {
                        // Non-null so a timeout can say what was being awaited.
                        // NOTE(review): presumably included in the failure raised
                        // by waitForCriterion -- confirm.
                        return "waiting for FirstMessage to be received";
                    }
                };
                DistributedTestCase.waitForCriterion(ev, 3000L, 200L, true);
                // Clear the flag so it does not carry over within this VM.
                FirstMessage.received = false;
            }
        });
    }

    /**
     * Sends a {@link Request} from VM 0 and waits on a {@link ReplyProcessor21}
     * for the replies, then compares {@code Response.totalResponses} with the
     * number of other normal distribution managers (asserted to be 3).
     *
     * @throws InterruptedException if the fixed sleep is interrupted
     */
    public void testReplyProcessor() throws InterruptedException {
        Host host = Host.getHost(0);
        VM requester = host.getVM(0);
        VM responder = host.getVM(1);
        this.createSystem(requester);
        this.createSystem(responder);
        Thread.sleep(2000L);

        SerializableRunnable sendRequest = new SerializableRunnable("Send request"){

            @Override
            public void run() {
                DM dm = LocalDistributionManagerTest.this.getSystem().getDistributionManager();
                int expected = dm.getOtherNormalDistributionManagerIds().size();
                TestCase.assertEquals("For DM " + dm.getId(), 3, expected);

                // Start counting from zero for this run.
                Response.totalResponses = 0;

                Request req = new Request();
                ReplyProcessor21 replies = new ReplyProcessor21(LocalDistributionManagerTest.this.getSystem(), dm.getOtherNormalDistributionManagerIds());
                // The request carries the processor id so Response can find it.
                req.processorId = replies.getProcessorId();
                dm.putOutgoing(req);
                try {
                    replies.waitForReplies();
                }
                catch (Exception e) {
                    DistributedTestCase.fail("While waiting for replies", e);
                }
                TestCase.assertEquals(expected, Response.totalResponses);
            }
        };
        requester.invoke(sendRequest);
    }

    /**
     * Installs a {@link TestMembershipListener} in VM 0 and verifies that it
     * records a join when VM 1 connects and a departure when VM 1 disconnects.
     *
     * <p>Each verification polls the listener through
     * {@code waitForCriterion}; the listener's no-argument accessors clear the
     * flag they return.
     *
     * @throws InterruptedException declared by the test signature
     */
    public void testMemberJoinedAndDeparted() throws InterruptedException {
        Host host = Host.getHost(0);
        VM vm0 = host.getVM(0);
        VM vm1 = host.getVM(1);
        this.createSystem(vm0);
        vm0.invoke(new SerializableRunnable("Install listener"){

            @Override
            public void run() {
                DM dm = LocalDistributionManagerTest.this.getSystem().getDistributionManager();
                // Kept in the static field so later runnables in this VM can poll it.
                listener = new TestMembershipListener();
                dm.addMembershipListener(listener);
            }
        });
        this.createSystem(vm1);
        vm0.invoke(new SerializableRunnable("Verify member joining"){

            @Override
            public void run() {
                DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

                    @Override
                    public boolean done() {
                        return listener.memberJoined();
                    }

                    @Override
                    public String description() {
                        // Non-null so a timeout can say what was being awaited.
                        // NOTE(review): presumably included in the failure raised
                        // by waitForCriterion -- confirm.
                        return "waiting for the listener to observe a member joining";
                    }
                };
                DistributedTestCase.waitForCriterion(ev, 3000L, 200L, true);
            }
        });
        vm1.invoke(new SerializableRunnable("Disconnect from system"){

            @Override
            public void run() {
                LocalDistributionManagerTest.this.getSystem().disconnect();
            }
        });
        vm0.invoke(new SerializableRunnable("Verify member departing"){

            @Override
            public void run() {
                DistributedTestCase.WaitCriterion ev = new DistributedTestCase.WaitCriterion(){

                    @Override
                    public boolean done() {
                        return listener.memberDeparted();
                    }

                    @Override
                    public String description() {
                        // Non-null for the same reason as the join criterion.
                        return "waiting for the listener to observe a member departing";
                    }
                };
                DistributedTestCase.waitForCriterion(ev, 3000L, 200L, true);
            }
        });
    }

    /**
     * Has VM 0 send an {@link OnlyGFDMReply} and block in
     * {@code waitForReplies()} while VM 1 disconnects, then requires that the
     * asynchronous invocation recorded no exception.
     *
     * @throws InterruptedException if one of the fixed sleeps is interrupted
     */
    public void testMembersDepartWhileWaiting() throws InterruptedException {
        Host host = Host.getHost(0);
        VM waiter = host.getVM(0);
        VM leaver = host.getVM(1);
        this.createSystem(waiter);
        this.createSystem(leaver);
        Thread.sleep(3000L);

        SerializableRunnable sendAndWait = new SerializableRunnable("Send message and wait"){

            @Override
            public void run() {
                DM dm = LocalDistributionManagerTest.this.getSystem().getDistributionManager();
                OnlyGFDMReply msg = new OnlyGFDMReply();
                ReplyProcessor21 replies = new ReplyProcessor21(LocalDistributionManagerTest.this.getSystem(), dm.getOtherNormalDistributionManagerIds());
                msg.processorId = replies.getProcessorId();
                dm.putOutgoing(msg);
                try {
                    replies.waitForReplies();
                }
                catch (Exception e) {
                    DistributedTestCase.fail("While waiting for replies", e);
                }
            }
        };
        AsyncInvocation pending = waiter.invokeAsync(sendAndWait);

        // Fixed pause before VM 1 is disconnected.
        Thread.sleep(3000L);

        SerializableRunnable disconnect = new SerializableRunnable("Disconnect from system"){

            @Override
            public void run() {
                LocalDistributionManagerTest.this.getSystem().disconnect();
            }
        };
        leaver.invoke(disconnect);

        DistributedTestCase.join(pending, 30000L, LocalDistributionManagerTest.getLogWriter());
        if (pending.exceptionOccurred()) {
            LocalDistributionManagerTest.fail("got exception", pending.getException());
        }
    }

    /**
     * Message used by {@code testMembersDepartWhileWaiting()}. Its
     * {@link #process} does nothing, so processing it sends no reply; it only
     * carries the id of the sender's reply processor.
     */
    public static class OnlyGFDMReply
    extends SerialDistributionMessage
    implements MessageWithReply {
        /** Id of the reply processor; written to and read from the wire form. */
        protected int processorId;

        /** Returns {@code Integer.MAX_VALUE} as this message's DSFID. */
        public int getDSFID() {
            return Integer.MAX_VALUE;
        }

        /** Returns the id stored in {@link #processorId}. */
        public int getProcessorId() {
            return processorId;
        }

        /** Intentionally empty: nothing is done when the message is processed. */
        public void process(DistributionManager dm) {
        }

        /** Reads the superclass state, then the processor id as an int. */
        public void fromData(DataInput in) throws ClassNotFoundException, IOException {
            super.fromData(in);
            processorId = in.readInt();
        }

        /** Writes the superclass state, then the processor id as an int. */
        public void toData(DataOutput out) throws IOException {
            super.toData(out);
            out.writeInt(processorId);
        }

        public String toString() {
            return "Only GFDM replies with processor " + processorId;
        }
    }

    /**
     * Membership listener that records whether a join or a departure has been
     * reported since the corresponding flag was last queried.
     *
     * <p>All methods that touch the flags are {@code synchronized}: the flags
     * are written by the listener callbacks and read-and-cleared by test code
     * that polls them, and read-then-clear must not interleave with a write.
     * NOTE(review): the callbacks presumably run on a distribution-manager
     * thread rather than the polling thread -- confirm.
     */
    static class TestMembershipListener
    implements MembershipListener {
        /** Set by {@link #memberJoined(InternalDistributedMember)}; cleared by {@link #memberJoined()}. */
        private boolean joined = false;
        /** Set by {@link #memberDeparted(InternalDistributedMember, boolean)}; cleared by {@link #memberDeparted()}. */
        private boolean departed = false;

        TestMembershipListener() {
        }

        /** Records that a join was reported; the member id is not kept. */
        public synchronized void memberJoined(InternalDistributedMember id) {
            this.joined = true;
        }

        /** Records that a departure was reported; the arguments are not kept. */
        public synchronized void memberDeparted(InternalDistributedMember id, boolean crashed) {
            this.departed = true;
        }

        /**
         * Returns whether a join has been recorded and clears the record.
         *
         * @return {@code true} if a join was recorded since the last call
         */
        public synchronized boolean memberJoined() {
            boolean b = this.joined;
            this.joined = false;
            return b;
        }

        /** Ignored by this listener. */
        public void quorumLost(Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {
        }

        /** Ignored by this listener. */
        public void memberSuspect(InternalDistributedMember id, InternalDistributedMember whoSuspected) {
        }

        /**
         * Returns whether a departure has been recorded and clears the record.
         *
         * @return {@code true} if a departure was recorded since the last call
         */
        public synchronized boolean memberDeparted() {
            boolean b = this.departed;
            this.departed = false;
            return b;
        }
    }

    /**
     * Reply created by {@link Request#process}. It carries the id of the
     * {@link ReplyProcessor21} that should receive it.
     */
    public static class Response
    extends SerialDistributionMessage {
        /** Count of processed responses; incremented under the {@code Response.class} lock. */
        static int totalResponses = 0;
        /** Id used to look up the target reply processor. */
        int processorId;

        /**
         * Looks up the reply processor for {@link #processorId}, counts this
         * response, and hands the message to the processor.
         *
         * @param dm not used by this implementation
         */
        public void process(DistributionManager dm) {
            ReplyProcessor21 processor = ReplyProcessor21.getProcessor(this.processorId);
            TestCase.assertNotNull("Null processor!", processor);
            // Hold the lock only for the counter update; the hand-off to the
            // processor runs outside it so no foreign code executes under the lock.
            synchronized (Response.class) {
                ++totalResponses;
            }
            processor.process(this);
        }

        /** Returns {@code Integer.MAX_VALUE} as this message's DSFID. */
        public int getDSFID() {
            return Integer.MAX_VALUE;
        }

        /** Writes the superclass state, then the processor id as an int. */
        public void toData(DataOutput out) throws IOException {
            super.toData(out);
            out.writeInt(this.processorId);
        }

        /** Reads the superclass state, then the processor id as an int. */
        public void fromData(DataInput in) throws ClassNotFoundException, IOException {
            super.fromData(in);
            this.processorId = in.readInt();
        }

        public String toString() {
            return "Response with processor " + this.processorId;
        }
    }

    /**
     * Message whose processing answers the sender with a {@link Response}
     * carrying the same processor id.
     */
    public static class Request
    extends SerialDistributionMessage
    implements MessageWithReply {
        /** Id of the sender's reply processor; copied into the response. */
        int processorId;

        /** Returns {@code Integer.MAX_VALUE} as this message's DSFID. */
        public int getDSFID() {
            return Integer.MAX_VALUE;
        }

        /** Returns the id stored in {@link #processorId}. */
        public int getProcessorId() {
            return processorId;
        }

        /**
         * Builds a {@link Response} addressed to this message's sender and
         * queues it on the given distribution manager.
         */
        public void process(DistributionManager dm) {
            Response reply = new Response();
            reply.processorId = processorId;
            reply.setRecipient(getSender());
            dm.putOutgoing(reply);
        }

        /** Reads the superclass state, then the processor id as an int. */
        public void fromData(DataInput in) throws ClassNotFoundException, IOException {
            super.fromData(in);
            processorId = in.readInt();
        }

        /** Writes the superclass state, then the processor id as an int. */
        public void toData(DataOutput out) throws IOException {
            super.toData(out);
            out.writeInt(processorId);
        }

        public String toString() {
            return "Request with processor " + processorId;
        }
    }

    /**
     * Message whose only effect when processed is to set {@link #received}.
     */
    public static class FirstMessage
    extends SerialDistributionMessage {
        /**
         * Set to {@code true} by {@link #process}; polled and reset by
         * {@code testSendMessage()}. Declared {@code volatile} so a write made
         * while processing the message is visible to the polling code.
         * NOTE(review): processing presumably happens on a different thread
         * from the one that polls -- confirm.
         */
        public static volatile boolean received = false;

        /** Marks the message as received; {@code dm} is not used. */
        public void process(DistributionManager dm) {
            received = true;
        }

        /** Returns {@code Integer.MAX_VALUE} as this message's DSFID. */
        public int getDSFID() {
            return Integer.MAX_VALUE;
        }
    }
}

