package org.opendaylight.controller.cluster.datastore.entityownership;

import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.dispatch.Dispatchers;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.AdditionalMatchers;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.ShardDataTree;
import org.opendaylight.controller.cluster.datastore.ShardTestKit;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy;
import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;

/* loaded from: input_file:org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.class */
public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
    // Entity type used for every Entity constructed by the tests visible in this class.
    private static final String ENTITY_TYPE = "test type";
    // Entity identifiers; each is a YangInstanceIdentifier built from a QName with namespace "test",
    // revision "2015-08-14" and a distinct local name ("entity1" .. "entity5").
    private static final YangInstanceIdentifier ENTITY_ID1 = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
    private static final YangInstanceIdentifier ENTITY_ID2 = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
    private static final YangInstanceIdentifier ENTITY_ID3 = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
    private static final YangInstanceIdentifier ENTITY_ID4 = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
    private static final YangInstanceIdentifier ENTITY_ID5 = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
    // Schema context obtained from SchemaContextHelper.entityOwners(); passed to the shard builder by
    // TestEntityOwnershipShard.
    private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
    // Shared counter. NOTE(review): its consumer is outside the visible part of the file - presumably it
    // makes generated shard ids unique; confirm against newShardId().
    private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
    // Member name the tests use for the shard under test ("the local member").
    private static final String LOCAL_MEMBER_NAME = "member-1";
    // Per-instance DatastoreContext builder; individual tests tune heartbeat, election and commit timeouts on it.
    private final DatastoreContext.Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
    // Factory used to create every test actor; closed in tearDown().
    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());

    /* loaded from: input_file:org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest$MockFollower.class */
    public static class MockFollower extends UntypedActor {
        volatile boolean grantVote;
        volatile boolean dropAppendEntries;
        private final String myId;

        public MockFollower(String str) {
            this(str, true);
        }

        public MockFollower(String str, boolean z) {
            this.myId = str;
            this.grantVote = z;
        }

        public void onReceive(Object obj) {
            if (obj instanceof RequestVote) {
                if (this.grantVote) {
                    getSender().tell(new RequestVoteReply(((RequestVote) obj).getTerm(), true), getSelf());
                }
            } else {
                if (!(obj instanceof AppendEntries) || this.dropAppendEntries) {
                    return;
                }
                AppendEntries appendEntries = (AppendEntries) obj;
                long leaderCommit = appendEntries.getLeaderCommit();
                if (appendEntries.getEntries().size() > 0) {
                    Iterator it = appendEntries.getEntries().iterator();
                    while (it.hasNext()) {
                        leaderCommit = ((ReplicatedLogEntry) it.next()).getIndex();
                    }
                }
                getSender().tell(new AppendEntriesReply(this.myId, appendEntries.getTerm(), true, leaderCommit, appendEntries.getTerm(), (short) 3), getSelf());
            }
        }
    }

    /* loaded from: input_file:org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest$MockLeader.class */
    public static class MockLeader extends UntypedActor {
        volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
        List<Modification> receivedModifications = new ArrayList();
        volatile boolean sendReply = true;
        volatile long delay;

        /* JADX WARN: Multi-variable type inference failed */
        public void onReceive(Object obj) {
            if (obj instanceof BatchedModifications) {
                if (this.delay > 0) {
                    Uninterruptibles.sleepUninterruptibly(this.delay, TimeUnit.MILLISECONDS);
                }
                if (!this.sendReply) {
                    this.sendReply = true;
                    return;
                }
                BatchedModifications batchedModifications = (BatchedModifications) obj;
                synchronized (this.receivedModifications) {
                    for (int i = 0; i < batchedModifications.getModifications().size(); i++) {
                        this.receivedModifications.add(batchedModifications.getModifications().get(i));
                        this.modificationsReceived.countDown();
                    }
                }
                getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
            }
        }

        List<Modification> getAndClearReceivedModifications() {
            ArrayList arrayList;
            synchronized (this.receivedModifications) {
                arrayList = new ArrayList(this.receivedModifications);
                this.receivedModifications.clear();
            }
            return arrayList;
        }
    }

    /* loaded from: input_file:org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest$TestEntityOwnershipShard.class */
    public static class TestEntityOwnershipShard extends EntityOwnershipShard {
        TestEntityOwnershipShard(ShardIdentifier shardIdentifier, Map<String, String> map, DatastoreContext datastoreContext) {
            super(newBuilder().id(shardIdentifier).peerAddresses(map).datastoreContext(datastoreContext).schemaContext(EntityOwnershipShardTest.SCHEMA_CONTEXT).localMemberName(EntityOwnershipShardTest.LOCAL_MEMBER_NAME));
        }

        public void onReceiveCommand(Object obj) throws Exception {
            if (obj instanceof ElectionTimeout) {
                return;
            }
            super.onReceiveCommand(obj);
        }

        public /* bridge */ /* synthetic */ void scheduleOwnerSelection(YangInstanceIdentifier yangInstanceIdentifier, Collection collection, EntityOwnerSelectionStrategy entityOwnerSelectionStrategy) {
            super.scheduleOwnerSelection(yangInstanceIdentifier, collection, entityOwnerSelectionStrategy);
        }
    }

    /**
     * Closes the {@link TestActorFactory} after each test.
     */
    @After
    public void tearDown() {
        actorFactory.close();
    }

    /**
     * Registers a local candidate on a shard that is already leader and verifies that the candidate is
     * committed and that the local member becomes the owner.
     */
    @Test
    public void testOnRegisterCandidateLocal() throws Exception {
        ShardTestKit kit = new ShardTestKit(getSystem());

        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
        ShardTestKit.waitUntilLeader(shard);

        YangInstanceIdentifier entityId = ENTITY_ID1;
        Entity entity = new Entity(ENTITY_TYPE, entityId);

        shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);

        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
        verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
    }

    /**
     * Registers a local candidate while the shard's only peer is created refusing to grant its vote, then lets
     * the peer grant votes and verifies that the candidate is committed and the local member becomes the owner.
     */
    @Test
    public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
        ShardTestKit kit = new ShardTestKit(getSystem());
        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2L);

        // The peer is created with grantVote == false.
        String peerId = newShardId("follower").toString();
        TestActorRef<MockFollower> peer = actorFactory.createTestActor(
                Props.create(MockFollower.class, peerId, false).withDispatcher(Dispatchers.DefaultDispatcherId()),
                peerId);

        Map<String, String> peerAddresses =
                ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build();
        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
                newShardProps(peerAddresses).withDispatcher(Dispatchers.DefaultDispatcherId()));

        YangInstanceIdentifier entityId = ENTITY_ID1;
        shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, entityId)), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);

        // From here on the peer answers vote requests.
        peer.underlyingActor().grantVote = true;

        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
        verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
    }

    /**
     * Registers a local candidate while the only peer drops every {@link AppendEntries}, waits one second
     * (the commit timeout is configured to one second), then lets the peer reply again and verifies that the
     * candidate is committed and the local member becomes the owner.
     */
    @Test
    public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
        ShardTestKit kit = new ShardTestKit(getSystem());
        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2L)
                .shardTransactionCommitTimeoutInSeconds(1);

        String peerId = newShardId("follower").toString();
        TestActorRef<MockFollower> peerRef = actorFactory.createTestActor(
                Props.create(MockFollower.class, peerId).withDispatcher(Dispatchers.DefaultDispatcherId()),
                peerId);
        MockFollower peer = peerRef.underlyingActor();

        // The peer still grants votes but does not answer AppendEntries.
        peer.dropAppendEntries = true;

        Map<String, String> peerAddresses =
                ImmutableMap.<String, String>builder().put(peerId, peerRef.path().toString()).build();
        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
                newShardProps(peerAddresses).withDispatcher(Dispatchers.DefaultDispatcherId()));
        ShardTestKit.waitUntilLeader(shard);

        YangInstanceIdentifier entityId = ENTITY_ID1;
        shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, entityId)), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);

        // Wait as long as the configured commit timeout before letting the peer reply again.
        Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
        peer.dropAppendEntries = false;

        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
        verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
    }

    /**
     * Lets the shard become leader, makes the only peer drop every {@link AppendEntries} for 500 ms (the
     * isolated-leader check interval is configured to 50 ms), registers a local candidate, then lets the peer
     * reply again and verifies that the candidate is committed and the local member becomes the owner.
     */
    @Test
    public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
        ShardTestKit kit = new ShardTestKit(getSystem());
        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2L)
                .shardIsolatedLeaderCheckIntervalInMillis(50);

        String peerId = newShardId("follower").toString();
        TestActorRef<MockFollower> peerRef = actorFactory.createTestActor(
                Props.create(MockFollower.class, peerId).withDispatcher(Dispatchers.DefaultDispatcherId()),
                peerId);
        MockFollower peer = peerRef.underlyingActor();

        Map<String, String> peerAddresses =
                ImmutableMap.<String, String>builder().put(peerId, peerRef.path().toString()).build();
        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
                newShardProps(peerAddresses).withDispatcher(Dispatchers.DefaultDispatcherId()));
        ShardTestKit.waitUntilLeader(shard);

        // Cut the leader off from its peer and give it time to notice.
        peer.dropAppendEntries = true;
        Uninterruptibles.sleepUninterruptibly(500L, TimeUnit.MILLISECONDS);

        YangInstanceIdentifier entityId = ENTITY_ID1;
        shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, entityId)), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);

        // Restore the peer's replies.
        peer.dropAppendEntries = false;

        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
        verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
    }

    /**
     * Verifies that local candidate registrations on a non-leader shard are forwarded to the remote leader as
     * batched modifications.
     *
     * <p>Three phases are exercised against a {@link MockLeader}:
     * <ol>
     *   <li>a single registration that the leader acknowledges;</li>
     *   <li>a single registration whose first batch the leader drops ({@code sendReply = false}), with the
     *       shard's commit timeout lowered to one second;</li>
     *   <li>100 registrations sent back to back while the leader sleeps 4 ms per batch, all of which must
     *       arrive exactly once and in order.</li>
     * </ol>
     */
    @Test
    public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
        ShardTestKit kit = new ShardTestKit(getSystem());
        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2L)
                .shardBatchedModificationCount(5);

        String leaderId = newShardId("leader").toString();
        TestActorRef<MockLeader> leaderRef = actorFactory.createTestActor(
                Props.create(MockLeader.class).withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId);

        TestActorRef<TestEntityOwnershipShard> shard = actorFactory.createTestActor(
                Props.create(TestEntityOwnershipShard.class,
                        newShardId(LOCAL_MEMBER_NAME),
                        ImmutableMap.<String, String>builder().put(leaderId, leaderRef.path().toString()).build(),
                        dataStoreContextBuilder.build())
                    .withDispatcher(Dispatchers.DefaultDispatcherId()));

        // An empty AppendEntries sent on behalf of the mock leader.
        shard.tell(new AppendEntries(1L, leaderId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(),
                -1L, -1L, (short) 3), leaderRef);

        // Phase 1: a single registration, acknowledged by the leader.
        shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);

        MockLeader leader = leaderRef.underlyingActor();
        Assert.assertTrue("Leader received BatchedModifications",
                Uninterruptibles.awaitUninterruptibly(leader.modificationsReceived, 5L, TimeUnit.SECONDS));
        verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
                LOCAL_MEMBER_NAME);

        // Phase 2: the leader drops the next batch; the shard gets an updated context with a 1 s commit timeout.
        leader.modificationsReceived = new CountDownLatch(1);
        leader.sendReply = false;
        shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());

        shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);

        Assert.assertTrue("Leader received BatchedModifications",
                Uninterruptibles.awaitUninterruptibly(leader.modificationsReceived, 5L, TimeUnit.SECONDS));
        verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
                LOCAL_MEMBER_NAME);

        // Phase 3: many registrations against a slow leader.
        final int max = 100;
        leader.delay = 4L;
        leader.modificationsReceived = new CountDownLatch(max);

        List<YangInstanceIdentifier> entityIds = new ArrayList<>();
        for (int i = 1; i <= max; i++) {
            YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
            entityIds.add(id);
            shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, id)), kit.getRef());
        }

        Assert.assertTrue("Leader received BatchedModifications",
                Uninterruptibles.awaitUninterruptibly(leader.modificationsReceived, 10L, TimeUnit.SECONDS));

        // Linger so that any surplus modifications have a chance to arrive before the exact count is checked.
        Uninterruptibles.sleepUninterruptibly(500L, TimeUnit.MILLISECONDS);

        List<Modification> received = leader.getAndClearReceivedModifications();

        // Check the count first so a shortfall fails with this message rather than an IndexOutOfBoundsException.
        Assert.assertEquals("# modifications received", max, received.size());
        for (int i = 0; i < max; i++) {
            verifyBatchedEntityCandidate(received.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
        }
    }

    /**
     * Registers a local candidate, unregisters it and registers it again, checking the owner after each step:
     * the local member, then the empty string, then the local member again.
     */
    @Test
    public void testOnUnregisterCandidateLocal() throws Exception {
        ShardTestKit kit = new ShardTestKit(getSystem());

        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
        ShardTestKit.waitUntilLeader(shard);

        Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);

        // Register.
        shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);
        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);

        // Unregister: the owner is expected to become the empty string.
        shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);
        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");

        // Register again.
        shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);
        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
    }

    /**
     * Walks one entity through a series of candidate additions and removals, written partly straight into the
     * shard's data tree and partly via local (un)registration messages, and checks the owner after each step.
     */
    @Test
    public void testOwnershipChanges() throws Exception {
        final String remoteMember1 = "remoteMember1";
        final String remoteMember2 = "remoteMember2";

        ShardTestKit kit = new ShardTestKit(getSystem());
        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
        ShardTestKit.waitUntilLeader(shard);

        Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
        ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();

        // A remote candidate is written first, then the local candidate registers: remoteMember1 is the owner.
        writeNode(EntityOwnersModel.ENTITY_OWNERS_PATH,
                EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMember1), shardDataTree);

        shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);

        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMember1);
        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMember1);

        // Adding a second remote candidate leaves the owner unchanged.
        writeNode(EntityOwnersModel.ENTITY_OWNERS_PATH,
                EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMember2), shardDataTree);
        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMember2);
        Uninterruptibles.sleepUninterruptibly(500L, TimeUnit.MILLISECONDS);
        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMember1);

        // Removing that second remote candidate also leaves the owner unchanged.
        deleteNode(EntityOwnersModel.candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMember2), shardDataTree);
        verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMember2);
        Uninterruptibles.sleepUninterruptibly(500L, TimeUnit.MILLISECONDS);
        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMember1);

        // Removing the owning candidate hands ownership to the local member.
        deleteNode(EntityOwnersModel.candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMember1), shardDataTree);
        verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMember1);
        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);

        // Re-adding remoteMember2 leaves the local member as owner.
        writeNode(EntityOwnersModel.ENTITY_OWNERS_PATH,
                EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMember2), shardDataTree);
        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMember2);
        Uninterruptibles.sleepUninterruptibly(500L, TimeUnit.MILLISECONDS);
        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);

        // Unregistering the local candidate hands ownership to remoteMember2.
        shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);
        verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
        verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMember2);
    }

    @Test
    public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
        ShardTestKit shardTestKit = new ShardTestKit(getSystem());
        this.dataStoreContextBuilder.shardHeartbeatIntervalInMillis(500).shardElectionTimeoutFactor(10000L);
        ShardIdentifier newShardId = newShardId(LOCAL_MEMBER_NAME);
        ShardIdentifier newShardId2 = newShardId("peerMember1");
        ShardIdentifier newShardId3 = newShardId("peerMember2");
        TestActorRef createTestActor = this.actorFactory.createTestActor(newShardProps(newShardId2, ImmutableMap.builder().put(newShardId.toString(), "").put(newShardId3.toString(), "").build(), "peerMember1", EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), newShardId2.toString());
        TestActorRef createTestActor2 = this.actorFactory.createTestActor(newShardProps(newShardId3, ImmutableMap.builder().put(newShardId.toString(), "").put(newShardId2.toString(), "").build(), "peerMember2", EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), newShardId3.toString());
        TestActorRef<EntityOwnershipShard> createTestActor3 = this.actorFactory.createTestActor(newShardProps(newShardId, ImmutableMap.builder().put(newShardId2.toString(), createTestActor.path().toString()).put(newShardId3.toString(), createTestActor2.path().toString()).build(), LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), newShardId.toString());
        createTestActor3.tell(new ElectionTimeout(), createTestActor3);
        ShardTestKit.waitUntilLeader(createTestActor3);
        createTestActor3.tell(new PeerDown("peerMember2", newShardId3.toString()), ActorRef.noSender());
        createTestActor3.tell(new PeerUp("peerMember2", newShardId3.toString()), ActorRef.noSender());
        createTestActor3.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), shardTestKit.getRef());
        shardTestKit.expectMsgClass(SuccessReply.class);
        verifyCommittedEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
        createTestActor2.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), shardTestKit.getRef());
        shardTestKit.expectMsgClass(SuccessReply.class);
        commitModification(createTestActor3, EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, "peerMember2"), shardTestKit);
        verifyCommittedEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID1, "peerMember2");
        createTestActor.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), shardTestKit.getRef());
        shardTestKit.expectMsgClass(SuccessReply.class);
        commitModification(createTestActor3, EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, "peerMember1"), shardTestKit);
        verifyCommittedEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID1, "peerMember1");
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
        createTestActor2.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID2)), shardTestKit.getRef());
        shardTestKit.expectMsgClass(SuccessReply.class);
        commitModification(createTestActor3, EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, "peerMember2"), shardTestKit);
        verifyCommittedEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID2, "peerMember2");
        createTestActor.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID2)), shardTestKit.getRef());
        shardTestKit.expectMsgClass(SuccessReply.class);
        commitModification(createTestActor3, EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, "peerMember1"), shardTestKit);
        verifyCommittedEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID2, "peerMember1");
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID2, "peerMember2");
        createTestActor2.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID3)), shardTestKit.getRef());
        shardTestKit.expectMsgClass(SuccessReply.class);
        commitModification(createTestActor3, EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, "peerMember2"), shardTestKit);
        verifyCommittedEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID3, "peerMember2");
        createTestActor3.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID3)), shardTestKit.getRef());
        shardTestKit.expectMsgClass(SuccessReply.class);
        verifyCommittedEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
        createTestActor.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID3)), shardTestKit.getRef());
        shardTestKit.expectMsgClass(SuccessReply.class);
        commitModification(createTestActor3, EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, "peerMember1"), shardTestKit);
        verifyCommittedEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID3, "peerMember1");
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID3, "peerMember2");
        createTestActor2.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID4)), shardTestKit.getRef());
        shardTestKit.expectMsgClass(SuccessReply.class);
        commitModification(createTestActor3, EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID4, "peerMember2"), shardTestKit);
        verifyCommittedEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID4, "peerMember2");
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID4, "peerMember2");
        createTestActor.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID5)), shardTestKit.getRef());
        shardTestKit.expectMsgClass(SuccessReply.class);
        commitModification(createTestActor3, EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID5, "peerMember1"), shardTestKit);
        verifyCommittedEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID5, "peerMember1");
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID5, "peerMember1");
        shardTestKit.watch(createTestActor2);
        createTestActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
        shardTestKit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
        shardTestKit.unwatch(createTestActor2);
        createTestActor3.tell(new PeerDown("peerMember2", newShardId3.toString()), ActorRef.noSender());
        createTestActor3.tell(new PeerDown("peerMember2", newShardId3.toString()), ActorRef.noSender());
        createTestActor.tell(new PeerDown("peerMember2", newShardId3.toString()), ActorRef.noSender());
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID4, "");
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID2, "peerMember1");
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
        verifyNoEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID1, "peerMember2");
        verifyNoEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID2, "peerMember2");
        verifyNoEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID3, "peerMember2");
        verifyNoEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID4, "peerMember2");
        TestActorRef createTestActor4 = this.actorFactory.createTestActor(newShardProps(newShardId3, ImmutableMap.builder().put(newShardId.toString(), "").put(newShardId2.toString(), "").build(), "peerMember2", EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), newShardId3.toString());
        createTestActor3.tell(new PeerUp("peerMember2", newShardId3.toString()), ActorRef.noSender());
        createTestActor3.tell(new PeerUp("peerMember2", newShardId3.toString()), ActorRef.noSender());
        createTestActor.tell(new PeerUp("peerMember2", newShardId3.toString()), ActorRef.noSender());
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID2, "peerMember1");
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID4, "");
        createTestActor4.tell(new PeerAddressResolved(newShardId2.toString(), createTestActor.path().toString()), ActorRef.noSender());
        createTestActor4.tell(new PeerAddressResolved(newShardId.toString(), createTestActor3.path().toString()), ActorRef.noSender());
        createTestActor4.tell(new PeerUp(LOCAL_MEMBER_NAME, newShardId.toString()), ActorRef.noSender());
        createTestActor4.tell(new PeerUp("peerMember1", newShardId2.toString()), ActorRef.noSender());
        createTestActor4.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), shardTestKit.getRef());
        shardTestKit.expectMsgClass(SuccessReply.class);
        createTestActor4.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID2)), shardTestKit.getRef());
        shardTestKit.expectMsgClass(SuccessReply.class);
        createTestActor4.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID3)), shardTestKit.getRef());
        shardTestKit.expectMsgClass(SuccessReply.class);
        commitModification(createTestActor3, EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, "peerMember2"), shardTestKit);
        commitModification(createTestActor3, EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, "peerMember2"), shardTestKit);
        commitModification(createTestActor3, EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, "peerMember2"), shardTestKit);
        verifyCommittedEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID1, "peerMember2");
        verifyCommittedEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID2, "peerMember2");
        verifyCommittedEntityCandidate(createTestActor3, ENTITY_TYPE, ENTITY_ID3, "peerMember2");
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID2, "peerMember1");
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
        createTestActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        createTestActor3.tell(new PeerDown("peerMember1", newShardId2.toString()), ActorRef.noSender());
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID2, "peerMember2");
        verifyOwner((TestActorRef<EntityOwnershipShard>) createTestActor4, ENTITY_TYPE, ENTITY_ID4, "");
        verifyOwner((TestActorRef<EntityOwnershipShard>) createTestActor4, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
        verifyOwner((TestActorRef<EntityOwnershipShard>) createTestActor4, ENTITY_TYPE, ENTITY_ID2, "peerMember2");
        verifyOwner((TestActorRef<EntityOwnershipShard>) createTestActor4, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
        TestActorRef createTestActor5 = this.actorFactory.createTestActor(newShardProps(newShardId2, ImmutableMap.builder().put(newShardId.toString(), "").put(newShardId3.toString(), "").build(), "peerMember1", EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), newShardId2.toString());
        createTestActor3.tell(new PeerUp("peerMember1", newShardId2.toString()), ActorRef.noSender());
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID4, "");
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID2, "peerMember2");
        verifyOwner(createTestActor3, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
        verifyOwner((TestActorRef<EntityOwnershipShard>) createTestActor5, ENTITY_TYPE, ENTITY_ID4, "");
        verifyOwner((TestActorRef<EntityOwnershipShard>) createTestActor5, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
        verifyOwner((TestActorRef<EntityOwnershipShard>) createTestActor5, ENTITY_TYPE, ENTITY_ID2, "peerMember2");
        verifyOwner((TestActorRef<EntityOwnershipShard>) createTestActor5, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
        createTestActor3.tell(PoisonPill.getInstance(), ActorRef.noSender());
        createTestActor4.tell(new PeerDown(LOCAL_MEMBER_NAME, newShardId.toString()), ActorRef.noSender());
        createTestActor4.tell(new ElectionTimeout(), createTestActor4);
        ShardTestKit.waitUntilLeader(createTestActor4);
        verifyOwner((TestActorRef<EntityOwnershipShard>) createTestActor4, ENTITY_TYPE, ENTITY_ID4, "");
        verifyOwner((TestActorRef<EntityOwnershipShard>) createTestActor4, ENTITY_TYPE, ENTITY_ID3, "peerMember2");
        verifyOwner((TestActorRef<EntityOwnershipShard>) createTestActor4, ENTITY_TYPE, ENTITY_ID2, "peerMember2");
        verifyOwner((TestActorRef<EntityOwnershipShard>) createTestActor4, ENTITY_TYPE, ENTITY_ID1, "peerMember2");
    }

    /**
     * Starts a three-member cluster (local leader plus "peerMember1" and "peerMember2"), registers
     * local candidates on the leader and on peerMember2, then commits candidate entries for both
     * peers directly to the leader.
     * <p>
     * peerMember1 never sends a {@link RegisterCandidateLocal}; the final checks expect the local
     * member to own the entity and {@code verifyNoEntityCandidate} to pass for "peerMember1".
     */
    @Test
    public void testCandidateRemovedWhenCandidateNotRegisteredLocally() throws Exception {
        final ShardTestKit kit = new ShardTestKit(getSystem());

        // Very large election timeout factor: leadership only changes when the test forces it.
        this.dataStoreContextBuilder.shardHeartbeatIntervalInMillis(500).shardElectionTimeoutFactor(10000L);

        final String peerMember1 = "peerMember1";
        final String peerMember2 = "peerMember2";

        final ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
        final ShardIdentifier peerId1 = newShardId(peerMember1);
        final ShardIdentifier peerId2 = newShardId(peerMember2);

        // Both peers start with empty peer addresses; the real ones are supplied below.
        final TestActorRef<EntityOwnershipShard> peer1 = this.actorFactory.createTestActor(
                newShardProps(peerId1,
                        ImmutableMap.<String, String>builder()
                                .put(leaderId.toString(), "")
                                .put(peerId2.toString(), "")
                                .build(),
                        peerMember1, EntityOwnerSelectionStrategyConfig.newBuilder().build())
                        .withDispatcher(Dispatchers.DefaultDispatcherId()),
                peerId1.toString());

        final TestActorRef<EntityOwnershipShard> peer2 = this.actorFactory.createTestActor(
                newShardProps(peerId2,
                        ImmutableMap.<String, String>builder()
                                .put(leaderId.toString(), "")
                                .put(peerId1.toString(), "")
                                .build(),
                        peerMember2, EntityOwnerSelectionStrategyConfig.newBuilder().build())
                        .withDispatcher(Dispatchers.DefaultDispatcherId()),
                peerId2.toString());

        final TestActorRef<EntityOwnershipShard> leader = this.actorFactory.createTestActor(
                newShardProps(leaderId,
                        ImmutableMap.<String, String>builder()
                                .put(peerId1.toString(), peer1.path().toString())
                                .put(peerId2.toString(), peer2.path().toString())
                                .build(),
                        LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build())
                        .withDispatcher(Dispatchers.DefaultDispatcherId()),
                leaderId.toString());

        // Force an election on the local shard and wait for it to become leader.
        leader.tell(new ElectionTimeout(), leader);
        ShardTestKit.waitUntilLeader(leader);

        // Tell each peer where the other two members live.
        peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
        peer2.tell(new PeerAddressResolved(leaderId.toString(), leader.path().toString()), ActorRef.noSender());
        peer1.tell(new PeerAddressResolved(peerId2.toString(), peer2.path().toString()), ActorRef.noSender());
        peer1.tell(new PeerAddressResolved(leaderId.toString(), leader.path().toString()), ActorRef.noSender());

        // Local candidate on the leader.
        leader.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);
        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);

        // Local candidate on peerMember2, plus the matching entry committed on the leader.
        peer2.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);
        commitModification(leader,
                EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMember2), kit);
        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMember2);

        // Candidate entry for peerMember1, which never registered the candidate on its own shard.
        commitModification(leader,
                EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMember1), kit);

        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
        verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMember1);
    }

    /**
     * Runs a local follower shard ({@code TestEntityOwnershipShard}) against a separate leader shard,
     * registers a listener and a candidate on the local shard, then sends the leader a
     * {@link PeerDown} for the local member.
     * <p>
     * The listener is expected to be notified twice after the peer-down, and the local candidate
     * entry is expected to be committed again. Finally the candidate is unregistered and the test
     * checks, immediately and again after 500 ms, that the entry stays absent.
     */
    @Test
    public void testLocalCandidateRemovedWithCandidateRegistered() throws Exception {
        final ShardTestKit kit = new ShardTestKit(getSystem());

        this.dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10000L);

        final ShardIdentifier leaderId = newShardId("leader");
        final ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);

        // The local shard starts with an empty address for the leader; it is resolved below.
        final TestActorRef<EntityOwnershipShard> localShard = this.actorFactory.createTestActor(
                Props.create(TestEntityOwnershipShard.class,
                        localId,
                        ImmutableMap.<String, String>builder().put(leaderId.toString(), "").build(),
                        this.dataStoreContextBuilder.build())
                        .withDispatcher(Dispatchers.DefaultDispatcherId()));

        final TestActorRef<EntityOwnershipShard> leaderShard = this.actorFactory.createTestActor(
                newShardProps(leaderId,
                        ImmutableMap.<String, String>builder()
                                .put(localId.toString(), localShard.path().toString())
                                .build(),
                        LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build())
                        .withDispatcher(Dispatchers.DefaultDispatcherId()),
                leaderId.toString());

        leaderShard.tell(new ElectionTimeout(), leaderShard);
        ShardTestKit.waitUntilLeader(leaderShard);

        localShard.tell(new PeerAddressResolved(leaderId.toString(), leaderShard.path().toString()),
                ActorRef.noSender());

        final Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
        final EntityOwnershipListener listener = Mockito.mock(EntityOwnershipListener.class);

        localShard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);

        // Register the local candidate and wait for the first ownership notification.
        localShard.tell(new RegisterCandidateLocal(entity), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);
        verifyCommittedEntityCandidate(localShard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
        Mockito.verify(listener, Mockito.timeout(5000))
                .ownershipChanged(ownershipChange(entity, false, true, true));

        // Report the local member as down to the leader.
        // NOTE(review): the three flags are presumably (wasOwner, isOwner, hasOwner) - confirm
        // against the ownershipChange helper.
        Mockito.reset(listener);
        leaderShard.tell(new PeerDown(LOCAL_MEMBER_NAME, localId.toString()), ActorRef.noSender());
        Mockito.verify(listener, Mockito.timeout(5000))
                .ownershipChanged(ownershipChange(entity, true, false, false));

        // The candidate entry is expected to be present again, followed by a second notification.
        verifyCommittedEntityCandidate(localShard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
        Mockito.verify(listener, Mockito.timeout(5000))
                .ownershipChanged(ownershipChange(entity, false, true, true));

        // After unregistering, the candidate must be gone and must stay gone.
        localShard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);
        verifyNoEntityCandidate(localShard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);

        Uninterruptibles.sleepUninterruptibly(500L, TimeUnit.MILLISECONDS);
        verifyNoEntityCandidate(localShard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
    }

    /**
     * Exercises listener registration and unregistration on a single-node shard.
     * <p>
     * Checks that a listener registered for {@code ENTITY_TYPE} is notified for entities of that
     * type, is not notified for an entity of another type, receives nothing while unregistered,
     * and is notified about existing entities when it registers again.
     */
    @Test
    public void testListenerRegistration() throws Exception {
        final ShardTestKit kit = new ShardTestKit(getSystem());
        final TestActorRef<EntityOwnershipShard> shard = this.actorFactory.createTestActor(newShardProps());
        ShardTestKit.waitUntilLeader(shard);
        final ShardDataTree dataStore = shard.underlyingActor().getDataStore();

        final Entity entity1 = new Entity(ENTITY_TYPE, ENTITY_ID1);
        final Entity entity2 = new Entity(ENTITY_TYPE, ENTITY_ID2);
        final Entity entity3 = new Entity(ENTITY_TYPE, ENTITY_ID3);
        final Entity otherTypeEntity = new Entity("otherEntityType", ENTITY_ID3);
        final EntityOwnershipListener listener = Mockito.mock(EntityOwnershipListener.class);

        // Register the listener for ENTITY_TYPE.
        shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);

        // Candidates of the listened-to type each trigger a notification.
        shard.tell(new RegisterCandidateLocal(entity1), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);
        Mockito.verify(listener, Mockito.timeout(5000))
                .ownershipChanged(ownershipChange(entity1, false, true, true));

        shard.tell(new RegisterCandidateLocal(entity2), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);
        Mockito.verify(listener, Mockito.timeout(5000))
                .ownershipChanged(ownershipChange(entity2, false, true, true));
        Mockito.reset(listener);

        // A candidate of a different entity type must not reach the listener.
        shard.tell(new RegisterCandidateLocal(otherTypeEntity), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);
        Uninterruptibles.sleepUninterruptibly(500L, TimeUnit.MILLISECONDS);
        Mockito.verify(listener, Mockito.never()).ownershipChanged(ownershipChange(otherTypeEntity));

        // Add a remote candidate for entity1 straight into the store, then drop the local one.
        writeNode(EntityOwnersModel.ENTITY_OWNERS_PATH,
                EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, entity1.getId(), "remoteMember"),
                dataStore);
        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entity1.getId(), "remoteMember");

        shard.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);
        Mockito.verify(listener, Mockito.timeout(5000))
                .ownershipChanged(ownershipChange(entity1, true, false, true));
        Mockito.reset(listener);

        // While the listener is unregistered, a new candidate must produce no callbacks at all.
        shard.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);

        shard.tell(new RegisterCandidateLocal(entity3), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);
        verifyOwner(shard, ENTITY_TYPE, entity3.getId(), LOCAL_MEMBER_NAME);
        Uninterruptibles.sleepUninterruptibly(500L, TimeUnit.MILLISECONDS);
        Mockito.verify(listener, Mockito.never()).ownershipChanged(Matchers.any(EntityOwnershipChange.class));
        Mockito.reset(listener);

        // Registering again: two notifications matching entity2/entity3 as locally owned,
        // exactly one for entity1, and none for the other entity type.
        shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);
        Mockito.verify(listener, Mockito.timeout(5000).times(2)).ownershipChanged(
                AdditionalMatchers.or(ownershipChange(entity2, false, true, true),
                        ownershipChange(entity3, false, true, true)));
        Uninterruptibles.sleepUninterruptibly(300L, TimeUnit.MILLISECONDS);
        Mockito.verify(listener, Mockito.never()).ownershipChanged(ownershipChange(otherTypeEntity));
        Mockito.verify(listener, Mockito.times(1)).ownershipChanged(ownershipChange(entity1));
    }

    /**
     * Sends the shard a ready-to-commit batch that merges {@code normalizedNode} at
     * {@link EntityOwnersModel#ENTITY_OWNERS_PATH} and waits for the commit reply.
     *
     * @param testActorRef shard that receives the batch
     * @param normalizedNode data to merge at the entity-owners root path
     * @param javaTestKit test kit used as the sender and to await the reply
     */
    private static void commitModification(TestActorRef<EntityOwnershipShard> testActorRef, NormalizedNode<?, ?> normalizedNode, JavaTestKit javaTestKit) {
        final BatchedModifications batch = newBatchedModifications();
        final MergeModification merge = new MergeModification(EntityOwnersModel.ENTITY_OWNERS_PATH, normalizedNode);
        batch.addModification(merge);

        final ActorRef replyTo = javaTestKit.getRef();
        testActorRef.tell(batch, replyTo);
        javaTestKit.expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
    }

    /**
     * Creates an empty {@link BatchedModifications} flagged as ready, set to commit on ready, and
     * with a total message count of one.
     * <p>
     * NOTE(review): the constructor arguments ("tnx", 3, "") are presumably the transaction id,
     * a version, and a transaction chain id - confirm against BatchedModifications.
     *
     * @return the pre-configured batch, with no modifications added yet
     */
    private static BatchedModifications newBatchedModifications() {
        final BatchedModifications batch = new BatchedModifications("tnx", (short) 3, "");

        batch.setDoCommitOnReady(true);
        batch.setReady(true);
        batch.setTotalMessagesSent(1);

        return batch;
    }

    /**
     * Passes the candidate path built from the arguments to {@code verifyNodeRemoved}, together with
     * a reader that looks paths up in the given shard's store.
     *
     * @param testActorRef shard whose store is read
     * @param str first argument to {@link EntityOwnersModel#candidatePath} (presumably the entity type)
     * @param yangInstanceIdentifier second argument to {@code candidatePath} (presumably the entity id)
     * @param str2 third argument to {@code candidatePath} (presumably the candidate member name)
     */
    private void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> testActorRef, String str, YangInstanceIdentifier yangInstanceIdentifier, String str2) {
        final YangInstanceIdentifier candidatePath = EntityOwnersModel.candidatePath(str, yangInstanceIdentifier, str2);

        final Function<YangInstanceIdentifier, NormalizedNode<?, ?>> storeReader =
                new Function<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
                    @Override
                    public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
                        try {
                            return AbstractShardTest.readStore(testActorRef, path);
                        } catch (Exception e) {
                            // A failed read fails the verification outright.
                            throw new AssertionError("Failed to read " + path, e);
                        }
                    }
                };

        verifyNodeRemoved(candidatePath, storeReader);
    }

    /**
     * Delegates to {@code verifyEntityCandidate}, reading nodes from the given shard's store.
     *
     * @param testActorRef shard whose store is read
     * @param str entity type
     * @param yangInstanceIdentifier entity id
     * @param str2 name of the candidate (member) to look for
     */
    private void verifyCommittedEntityCandidate(final TestActorRef<EntityOwnershipShard> testActorRef, String str, YangInstanceIdentifier yangInstanceIdentifier, String str2) {
        final Function<YangInstanceIdentifier, NormalizedNode<?, ?>> storeReader =
                new Function<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
                    @Override
                    public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
                        try {
                            return AbstractShardTest.readStore(testActorRef, path);
                        } catch (Exception e) {
                            // A failed read fails the verification outright.
                            throw new AssertionError("Failed to read " + path, e);
                        }
                    }
                };

        verifyEntityCandidate(str, yangInstanceIdentifier, str2, storeReader);
    }

    /**
     * Delegates to {@code verifyEntityCandidate} with its trailing boolean set to {@code false},
     * reading nodes from the given shard's store.
     * <p>
     * NOTE(review): the {@code false} flag presumably means "expect the candidate to be absent" -
     * confirm against the five-argument verifyEntityCandidate.
     *
     * @param testActorRef shard whose store is read
     * @param str entity type
     * @param yangInstanceIdentifier entity id
     * @param str2 name of the candidate (member) to check
     */
    private void verifyNoEntityCandidate(final TestActorRef<EntityOwnershipShard> testActorRef, String str, YangInstanceIdentifier yangInstanceIdentifier, String str2) {
        final Function<YangInstanceIdentifier, NormalizedNode<?, ?>> storeReader =
                new Function<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
                    @Override
                    public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
                        try {
                            return AbstractShardTest.readStore(testActorRef, path);
                        } catch (Exception e) {
                            // A failed read fails the verification outright.
                            throw new AssertionError("Failed to read " + path, e);
                        }
                    }
                };

        verifyEntityCandidate(str, yangInstanceIdentifier, str2, storeReader, false);
    }

    /**
     * Asserts that the list holds exactly one modification and verifies that single modification
     * with the {@code Modification} overload.
     *
     * @param list modifications taken from a batch
     * @param str entity type
     * @param yangInstanceIdentifier entity id
     * @param str2 candidate (member) name
     * @throws Exception if the delegated verification throws
     */
    private void verifyBatchedEntityCandidate(List<Modification> list, String str, YangInstanceIdentifier yangInstanceIdentifier, String str2) throws Exception {
        final int modificationCount = list.size();
        Assert.assertEquals("BatchedModifications size", 1L, modificationCount);

        final Modification onlyModification = list.get(0);
        verifyBatchedEntityCandidate(onlyModification, str, yangInstanceIdentifier, str2);
    }

    /**
     * Asserts that the modification is exactly a {@link MergeModification} and verifies its data
     * with {@code verifyEntityCandidate}.
     *
     * @param modification modification to inspect
     * @param str entity type
     * @param yangInstanceIdentifier entity id
     * @param str2 candidate (member) name
     * @throws Exception if the delegated verification throws
     */
    private void verifyBatchedEntityCandidate(Modification modification, String str, YangInstanceIdentifier yangInstanceIdentifier, String str2) throws Exception {
        Assert.assertEquals("Modification type", MergeModification.class, modification.getClass());

        final MergeModification merge = (MergeModification) modification;
        verifyEntityCandidate(merge.getData(), str, yangInstanceIdentifier, str2, true);
    }

    /**
     * Delegates to the reader-based {@code verifyOwner} overload, reading nodes from the given
     * shard's store.
     *
     * @param testActorRef shard whose store is read
     * @param str entity type
     * @param yangInstanceIdentifier entity id
     * @param str2 expected owner name
     */
    private static void verifyOwner(final TestActorRef<EntityOwnershipShard> testActorRef, String str, YangInstanceIdentifier yangInstanceIdentifier, String str2) {
        final Function<YangInstanceIdentifier, NormalizedNode<?, ?>> storeReader =
                new Function<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
                    @Override
                    public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
                        try {
                            return AbstractShardTest.readStore(testActorRef, path);
                        } catch (Exception ignored) {
                            // Deliberate: a failed read is reported to the caller as "no node".
                            // NOTE(review): presumably the delegated verifyOwner handles null
                            // (e.g. by retrying) - confirm.
                            return null;
                        }
                    }
                };

        verifyOwner(str2, str, yangInstanceIdentifier, storeReader);
    }

    /**
     * Creates shard {@link Props} with no peers, via the peer-address-map overload.
     *
     * @return the shard props
     */
    private Props newShardProps() {
        final Map<String, String> noPeers = Collections.emptyMap();
        return newShardProps(noPeers);
    }

    /**
     * Creates {@link Props} for a peerless shard of the local member with the given owner
     * selection strategy configuration. A fresh shard id is generated on each call.
     *
     * @param entityOwnerSelectionStrategyConfig strategy configuration passed to the shard
     * @return the shard props
     */
    private Props newShardProps(EntityOwnerSelectionStrategyConfig entityOwnerSelectionStrategyConfig) {
        final ShardIdentifier localShardId = newShardId(LOCAL_MEMBER_NAME);
        final Map<String, String> noPeers = Collections.emptyMap();
        return newShardProps(localShardId, noPeers, LOCAL_MEMBER_NAME, entityOwnerSelectionStrategyConfig);
    }

    /**
     * Creates {@link Props} for a shard of the local member with the given owner selection
     * strategy configuration and peer addresses. A fresh shard id is generated on each call.
     *
     * @param entityOwnerSelectionStrategyConfig strategy configuration passed to the shard
     * @param map peer id to peer address
     * @return the shard props
     */
    private Props newShardProps(EntityOwnerSelectionStrategyConfig entityOwnerSelectionStrategyConfig, Map<String, String> map) {
        final ShardIdentifier localShardId = newShardId(LOCAL_MEMBER_NAME);
        return newShardProps(localShardId, map, LOCAL_MEMBER_NAME, entityOwnerSelectionStrategyConfig);
    }

    /**
     * Creates {@link Props} for a shard of the local member with the given peer addresses and a
     * strategy configuration built straight from an empty builder. A fresh shard id is generated
     * on each call.
     *
     * @param map peer id to peer address
     * @return the shard props
     */
    private Props newShardProps(Map<String, String> map) {
        final ShardIdentifier localShardId = newShardId(LOCAL_MEMBER_NAME);
        final EntityOwnerSelectionStrategyConfig defaultConfig = EntityOwnerSelectionStrategyConfig.newBuilder().build();
        return newShardProps(localShardId, map, LOCAL_MEMBER_NAME, defaultConfig);
    }

    /**
     * Builds {@link Props} for an {@link EntityOwnershipShard} from the given identity, peers and
     * strategy configuration. The datastore context is built from the current state of
     * {@code dataStoreContextBuilder}, and the props are bound to the default dispatcher.
     *
     * @param shardIdentifier id of the shard to create
     * @param map peer id to peer address
     * @param str local member name for the shard
     * @param entityOwnerSelectionStrategyConfig owner selection strategy configuration
     * @return the shard props
     */
    private Props newShardProps(ShardIdentifier shardIdentifier, Map<String, String> map, String str, EntityOwnerSelectionStrategyConfig entityOwnerSelectionStrategyConfig) {
        return EntityOwnershipShard.newBuilder()
                .id(shardIdentifier)
                .peerAddresses(map)
                .datastoreContext(this.dataStoreContextBuilder.build())
                .schemaContext(SCHEMA_CONTEXT)
                .localMemberName(str)
                .ownerSelectionStrategyConfig(entityOwnerSelectionStrategyConfig)
                .props()
                .withDispatcher(Dispatchers.DefaultDispatcherId());
    }

    /**
     * Builds a shard identifier for the given member with shard name "entity-ownership".
     * The type is "operational" followed by the next value of {@code NEXT_SHARD_NUM}, which is
     * advanced on every call.
     *
     * @param str member name
     * @return the new shard identifier
     */
    private static ShardIdentifier newShardId(String str) {
        return ShardIdentifier.builder()
                .memberName(str)
                .shardName("entity-ownership")
                .type("operational" + NEXT_SHARD_NUM.getAndIncrement())
                .build();
    }

    /**
     * Runs a leader shard with a single vote-granting {@code MockFollower} peer and a
     * {@link LastCandidateSelectionStrategy} configured with a value of 500 for {@code ENTITY_TYPE}.
     * <p>
     * A candidate entry for "follower" is written straight into the leader's store, then the local
     * candidate is registered. Both candidates must be committed and the local member must end up
     * as owner.
     */
    @Test
    public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception {
        final ShardTestKit kit = new ShardTestKit(getSystem());

        // NOTE(review): 500L is presumably the selection delay in milliseconds - confirm.
        final EntityOwnerSelectionStrategyConfig.Builder strategyConfig = EntityOwnerSelectionStrategyConfig.newBuilder()
                .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500L);

        final String followerId = newShardId("follower").toString();
        final TestActorRef<MockFollower> follower = this.actorFactory.createTestActor(
                Props.create(MockFollower.class, followerId, false)
                        .withDispatcher(Dispatchers.DefaultDispatcherId()),
                followerId);
        follower.underlyingActor().grantVote = true;

        final TestActorRef<EntityOwnershipShard> leader = this.actorFactory.createTestActor(
                newShardProps(strategyConfig.build(),
                        ImmutableMap.of(followerId, follower.path().toString())));
        ShardTestKit.waitUntilLeader(leader);

        final Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);

        // Add the follower's candidate directly to the leader's data store.
        writeNode(EntityOwnersModel.ENTITY_OWNERS_PATH,
                EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, "follower"),
                leader.underlyingActor().getDataStore());

        // Register the local candidate.
        leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);

        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, "follower");
        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
    }

    /**
     * Runs a leader shard with two vote-granting {@code MockFollower} peers and a
     * {@link LastCandidateSelectionStrategy} configured with a value of 500 for {@code ENTITY_TYPE}.
     * <p>
     * A candidate entry for "follower" is written straight into the leader's store, then the local
     * candidate is registered. Both candidates must be committed and the local member must end up
     * as owner.
     */
    @Test
    public void testDelayedEntityOwnerSelection() throws Exception {
        final ShardTestKit kit = new ShardTestKit(getSystem());

        // NOTE(review): 500L is presumably the selection delay in milliseconds - confirm.
        final EntityOwnerSelectionStrategyConfig.Builder strategyConfig = EntityOwnerSelectionStrategyConfig.newBuilder()
                .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500L);

        final String follower1Id = newShardId("follower1").toString();
        final TestActorRef<MockFollower> follower1 = this.actorFactory.createTestActor(
                Props.create(MockFollower.class, follower1Id, false)
                        .withDispatcher(Dispatchers.DefaultDispatcherId()),
                follower1Id);
        follower1.underlyingActor().grantVote = true;

        final String follower2Id = newShardId("follower").toString();
        final TestActorRef<MockFollower> follower2 = this.actorFactory.createTestActor(
                Props.create(MockFollower.class, follower2Id, false)
                        .withDispatcher(Dispatchers.DefaultDispatcherId()),
                follower2Id);
        follower2.underlyingActor().grantVote = true;

        // Each peer id maps to its own actor. Previously both ids pointed at the second
        // follower's path, so the first follower was never addressed by the leader.
        final TestActorRef<EntityOwnershipShard> leader = this.actorFactory.createTestActor(
                newShardProps(strategyConfig.build(),
                        ImmutableMap.of(follower1Id, follower1.path().toString(),
                                follower2Id, follower2.path().toString())));
        ShardTestKit.waitUntilLeader(leader);

        final Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);

        // Add the follower's candidate directly to the leader's data store.
        writeNode(EntityOwnersModel.ENTITY_OWNERS_PATH,
                EntityOwnersModel.entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, "follower"),
                leader.underlyingActor().getDataStore());

        // Register the local candidate.
        leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
        kit.expectMsgClass(SuccessReply.class);

        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, "follower");
        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
    }
}
