package org.realityforge.replicant.server.transport;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.transaction.TransactionSynchronizationRegistry;
import org.realityforge.replicant.server.Change;
import org.realityforge.replicant.server.ChangeAccumulator;
import org.realityforge.replicant.server.ChangeSet;
import org.realityforge.replicant.server.ChannelAction;
import org.realityforge.replicant.server.ChannelAddress;
import org.realityforge.replicant.server.ChannelLink;
import org.realityforge.replicant.server.EntityMessage;
import org.realityforge.replicant.server.EntityMessageEndpoint;
import org.realityforge.replicant.server.ServerConstants;
import org.realityforge.replicant.server.json.JsonEncoder;
import org.realityforge.replicant.server.transport.ChannelMetaData;
import org.realityforge.replicant.server.transport.ReplicantSessionManager;

/* loaded from: input_file:org/realityforge/replicant/server/transport/ReplicantSessionManagerImpl.class */
public abstract class ReplicantSessionManagerImpl implements EntityMessageEndpoint, ReplicantSessionManager {
    private final ReadWriteLock _lock = new ReentrantReadWriteLock();
    private final Map<String, ReplicantSession> _sessions = new HashMap();
    private final Map<String, ReplicantSession> _roSessions = Collections.unmodifiableMap(this._sessions);
    private final ReadWriteLock _cacheLock = new ReentrantReadWriteLock();
    private final HashMap<ChannelAddress, ChannelCacheEntry> _cache = new HashMap<>();
    static final /* synthetic */ boolean $assertionsDisabled;

    @Override // org.realityforge.replicant.server.transport.ReplicantSessionManager
    @Nonnull
    public String getSessionKey() {
        return "sid";
    }

    @Override // org.realityforge.replicant.server.transport.ReplicantSessionManager
    public boolean invalidateSession(@Nonnull String str) {
        return null != removeSession(str);
    }

    protected ReplicantSession removeSession(String str) {
        this._lock.writeLock().lock();
        try {
            return this._sessions.remove(str);
        } finally {
            this._lock.writeLock().unlock();
        }
    }

    @Override // org.realityforge.replicant.server.transport.ReplicantSessionManager
    @Nullable
    public ReplicantSession getSession(@Nonnull String str) {
        this._lock.readLock().lock();
        try {
            ReplicantSession replicantSession = this._sessions.get(str);
            if (null != replicantSession) {
                replicantSession.updateAccessTime();
            }
            return replicantSession;
        } finally {
            this._lock.readLock().unlock();
        }
    }

    @Override // org.realityforge.replicant.server.transport.ReplicantSessionManager
    @Nonnull
    public Set<String> getSessionIDs() {
        this._lock.readLock().lock();
        try {
            return new HashSet(this._sessions.keySet());
        } finally {
            this._lock.readLock().unlock();
        }
    }

    @Override // org.realityforge.replicant.server.transport.ReplicantSessionManager
    @Nonnull
    public ReplicantSession createSession() {
        ReplicantSession newReplicantSession = newReplicantSession();
        this._lock.writeLock().lock();
        try {
            this._sessions.put(newReplicantSession.getSessionID(), newReplicantSession);
            return newReplicantSession;
        } finally {
            this._lock.writeLock().unlock();
        }
    }

    @Nonnull
    protected Map<String, ReplicantSession> getSessions() {
        return this._roSessions;
    }

    @Nonnull
    protected ReadWriteLock getLock() {
        return this._lock;
    }

    protected int removeIdleSessions(long j) {
        int i = 0;
        long currentTimeMillis = System.currentTimeMillis();
        this._lock.writeLock().lock();
        try {
            Iterator<Map.Entry<String, ReplicantSession>> it = this._sessions.entrySet().iterator();
            while (it.hasNext()) {
                if (currentTimeMillis - it.next().getValue().getLastAccessedAt() > j) {
                    it.remove();
                    i++;
                }
            }
            return i;
        } finally {
            this._lock.writeLock().unlock();
        }
    }

    @Nullable
    protected String pollJsonData(@Nonnull ReplicantSession replicantSession, int i) {
        Packet pollPacket = pollPacket(replicantSession, i);
        if (null != pollPacket) {
            return JsonEncoder.encodeChangeSet(pollPacket.getSequence(), pollPacket.getRequestId(), pollPacket.getETag(), pollPacket.getChangeSet());
        }
        return null;
    }

    protected Packet sendPacket(@Nonnull ReplicantSession replicantSession, @Nullable String str, @Nonnull ChangeSet changeSet) {
        Integer num = (Integer) getRegistry().getResource(ServerConstants.REQUEST_ID_KEY);
        getRegistry().putResource(ServerConstants.REQUEST_COMPLETE_KEY, Boolean.FALSE);
        return replicantSession.getQueue().addPacket(num, str, changeSet);
    }

    @Nonnull
    protected abstract TransactionSynchronizationRegistry getRegistry();

    @Nonnull
    protected ReplicantSession newReplicantSession() {
        return new ReplicantSession(null, UUID.randomUUID().toString());
    }

    @Nullable
    protected Packet pollPacket(@Nonnull ReplicantSession replicantSession, int i) {
        PacketQueue queue = replicantSession.getQueue();
        queue.ack(i);
        return queue.nextPacketToProcess();
    }

    @Nonnull
    protected ReplicantSession ensureSession(@Nonnull String str) {
        ReplicantSession session = getSession(str);
        if (null == session) {
            throw newBadSessionException(str);
        }
        return session;
    }

    @Nonnull
    protected abstract RuntimeException newBadSessionException(@Nonnull String str);

    @Override // org.realityforge.replicant.server.EntityMessageEndpoint
    public boolean saveEntityMessages(@Nullable String str, @Nullable Integer num, @Nonnull Collection<EntityMessage> collection, @Nullable ChangeSet changeSet) {
        getLock().readLock().lock();
        ChangeAccumulator changeAccumulator = new ChangeAccumulator();
        try {
            Collection<ReplicantSession> values = getSessions().values();
            Iterator<EntityMessage> it = collection.iterator();
            while (it.hasNext()) {
                processDeleteMessages(it.next(), values, changeAccumulator);
            }
            Iterator<EntityMessage> it2 = collection.iterator();
            while (it2.hasNext()) {
                processUpdateMessages(it2.next(), values, changeAccumulator);
            }
            ReplicantSession session = null != str ? getSession(str) : null;
            if (null != session && null != changeSet) {
                changeAccumulator.getChangeSet(session).setPingResponse(changeSet.isPingResponse());
                changeAccumulator.addChanges(session, changeSet.getChanges());
                changeAccumulator.addActions(session, changeSet.getChannelActions());
            }
            for (ReplicantSession replicantSession : getSessions().values()) {
                expandLinks(replicantSession, changeAccumulator.getChangeSet(replicantSession));
            }
            return changeAccumulator.complete(str, num);
        } finally {
            getLock().readLock().unlock();
        }
    }

    protected abstract void processUpdateMessages(@Nonnull EntityMessage entityMessage, @Nonnull Collection<ReplicantSession> collection, @Nonnull ChangeAccumulator changeAccumulator);

    protected abstract void processDeleteMessages(@Nonnull EntityMessage entityMessage, @Nonnull Collection<ReplicantSession> collection, @Nonnull ChangeAccumulator changeAccumulator);

    @Override // org.realityforge.replicant.server.transport.ReplicantSessionManager
    public void delinkSubscription(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelAddress channelAddress, @Nonnull ChannelAddress channelAddress2, @Nonnull ChangeSet changeSet) {
        SubscriptionEntry findSubscriptionEntry = replicantSession.findSubscriptionEntry(channelAddress);
        SubscriptionEntry findSubscriptionEntry2 = replicantSession.findSubscriptionEntry(channelAddress2);
        if (null == findSubscriptionEntry || null == findSubscriptionEntry2) {
            return;
        }
        delinkSubscriptionEntries(findSubscriptionEntry, findSubscriptionEntry2);
        if (findSubscriptionEntry2.canUnsubscribe()) {
            performUnsubscribe(replicantSession, findSubscriptionEntry2, false, changeSet);
        }
    }

    @Override // org.realityforge.replicant.server.transport.ReplicantSessionManager
    public void bulkDelinkSubscription(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelAddress channelAddress, int i, @Nonnull Collection<Integer> collection, @Nonnull ChangeSet changeSet) {
        Iterator<Integer> it = collection.iterator();
        while (it.hasNext()) {
            delinkSubscription(replicantSession, channelAddress, new ChannelAddress(i, it.next()), changeSet);
        }
    }

    @Nonnull
    protected ReplicantSessionManager.CacheStatus subscribe(@Nonnull String str, @Nonnull ChannelAddress channelAddress, @Nullable Object obj, @Nullable String str2, @Nonnull ChangeSet changeSet) {
        setupRegistryContext(str);
        ReplicantSession ensureSession = ensureSession(str);
        ensureSession.setETag(channelAddress, str2);
        ReplicantSessionManager.CacheStatus subscribe = subscribe(ensureSession, channelAddress, true, obj, changeSet);
        if (subscribe != ReplicantSessionManager.CacheStatus.USE) {
            ensureSession.setETag(channelAddress, null);
            expandLinks(ensureSession, changeSet);
        }
        return subscribe;
    }

    protected void bulkSubscribe(@Nonnull String str, int i, @Nonnull Collection<Integer> collection, @Nullable Object obj, boolean z, @Nonnull ChangeSet changeSet) {
        setupRegistryContext(str);
        bulkSubscribe(ensureSession(str), i, collection, obj, z, changeSet);
    }

    protected void updateSubscription(@Nonnull String str, @Nonnull ChannelAddress channelAddress, @Nullable Object obj, @Nonnull ChangeSet changeSet) {
        setupRegistryContext(str);
        ReplicantSession ensureSession = ensureSession(str);
        updateSubscription(ensureSession, channelAddress, obj, changeSet);
        expandLinks(ensureSession, changeSet);
    }

    protected void bulkUpdateSubscription(@Nonnull String str, int i, @Nonnull Collection<Integer> collection, @Nullable Object obj, @Nonnull ChangeSet changeSet) {
        setupRegistryContext(str);
        ReplicantSession ensureSession = ensureSession(str);
        bulkUpdateSubscription(ensureSession, i, collection, obj, changeSet);
        expandLinks(ensureSession, changeSet);
    }

    protected void unsubscribe(@Nonnull String str, @Nonnull ChannelAddress channelAddress, @Nonnull ChangeSet changeSet) {
        setupRegistryContext(str);
        unsubscribe(ensureSession(str), channelAddress, true, changeSet);
    }

    @Override // org.realityforge.replicant.server.transport.ReplicantSessionManager
    public void updateSubscription(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelAddress channelAddress, @Nullable Object obj, @Nonnull ChangeSet changeSet) {
        if (!$assertionsDisabled && getSystemMetaData().getChannelMetaData(channelAddress).getFilterType() != ChannelMetaData.FilterType.DYNAMIC) {
            throw new AssertionError();
        }
        SubscriptionEntry subscriptionEntry = replicantSession.getSubscriptionEntry(channelAddress);
        Object filter = subscriptionEntry.getFilter();
        if (doFiltersMatch(obj, filter)) {
            return;
        }
        performUpdateSubscription(replicantSession, subscriptionEntry, filter, obj, changeSet);
    }

    @Override // org.realityforge.replicant.server.transport.ReplicantSessionManager
    public void bulkUpdateSubscription(@Nonnull ReplicantSession replicantSession, int i, @Nonnull Collection<Integer> collection, @Nullable Object obj, @Nonnull ChangeSet changeSet) {
        ChannelMetaData channelMetaData = getSystemMetaData().getChannelMetaData(i);
        if (!$assertionsDisabled && channelMetaData.getFilterType() != ChannelMetaData.FilterType.DYNAMIC) {
            throw new AssertionError();
        }
        ArrayList<ChannelAddress> arrayList = new ArrayList<>();
        Iterator<Integer> it = collection.iterator();
        while (it.hasNext()) {
            ChannelAddress channelAddress = new ChannelAddress(i, it.next());
            if (!doFiltersMatch(obj, replicantSession.getSubscriptionEntry(channelAddress).getFilter())) {
                arrayList.add(channelAddress);
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        if (1 == arrayList.size()) {
            updateSubscription(replicantSession, arrayList.get(0), obj, changeSet);
        } else {
            if (bulkCollectDataForSubscriptionUpdate(replicantSession, arrayList, changeSet, replicantSession.getSubscriptionEntry(arrayList.get(0)).getFilter(), obj)) {
                return;
            }
            Iterator<ChannelAddress> it2 = arrayList.iterator();
            while (it2.hasNext()) {
                updateSubscription(replicantSession, it2.next(), obj, changeSet);
            }
        }
    }

    @Override // org.realityforge.replicant.server.transport.ReplicantSessionManager
    public void bulkSubscribe(@Nonnull ReplicantSession replicantSession, int i, @Nonnull Collection<Integer> collection, @Nullable Object obj, boolean z, @Nonnull ChangeSet changeSet) {
        if (!$assertionsDisabled && !getSystemMetaData().getChannelMetaData(i).isInstanceGraph()) {
            throw new AssertionError();
        }
        ArrayList<ChannelAddress> arrayList = new ArrayList<>();
        HashMap hashMap = new HashMap();
        Iterator<Integer> it = collection.iterator();
        while (it.hasNext()) {
            ChannelAddress channelAddress = new ChannelAddress(i, it.next());
            SubscriptionEntry findSubscriptionEntry = replicantSession.findSubscriptionEntry(channelAddress);
            if (null == findSubscriptionEntry) {
                arrayList.add(channelAddress);
            } else {
                ((ArrayList) hashMap.computeIfAbsent(findSubscriptionEntry.getFilter(), obj2 -> {
                    return new ArrayList();
                })).add(channelAddress);
            }
        }
        Object obj3 = null;
        if (!arrayList.isEmpty() && !bulkCollectDataForSubscribe(replicantSession, arrayList, changeSet, obj, z)) {
            Iterator<ChannelAddress> it2 = arrayList.iterator();
            while (it2.hasNext()) {
                try {
                    subscribe(replicantSession, it2.next(), true, obj, changeSet);
                } catch (Throwable th) {
                    obj3 = th;
                }
            }
        }
        if (!hashMap.isEmpty()) {
            for (Map.Entry entry : hashMap.entrySet()) {
                Object key = entry.getKey();
                ArrayList<ChannelAddress> arrayList2 = (ArrayList) entry.getValue();
                if (!(arrayList2.size() > 1 ? bulkCollectDataForSubscriptionUpdate(replicantSession, arrayList2, changeSet, key, obj) : false)) {
                    Iterator<ChannelAddress> it3 = arrayList2.iterator();
                    while (it3.hasNext()) {
                        try {
                            subscribe(replicantSession, it3.next(), true, obj, changeSet);
                        } catch (Throwable th2) {
                            obj3 = th2;
                        }
                    }
                }
            }
        }
        if (obj3 instanceof Error) {
            throw ((Error) obj3);
        }
        if (obj3 != null) {
            throw ((RuntimeException) obj3);
        }
    }

    @Override // org.realityforge.replicant.server.transport.ReplicantSessionManager
    @Nonnull
    public ReplicantSessionManager.CacheStatus subscribe(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelAddress channelAddress, boolean z, @Nullable Object obj, @Nonnull ChangeSet changeSet) {
        if (!replicantSession.isSubscriptionEntryPresent(channelAddress)) {
            SubscriptionEntry createSubscriptionEntry = replicantSession.createSubscriptionEntry(channelAddress);
            try {
                return performSubscribe(replicantSession, createSubscriptionEntry, z, obj, changeSet);
            } catch (Throwable th) {
                replicantSession.deleteSubscriptionEntry(createSubscriptionEntry);
                throw th;
            }
        }
        SubscriptionEntry subscriptionEntry = replicantSession.getSubscriptionEntry(channelAddress);
        if (z) {
            subscriptionEntry.setExplicitlySubscribed(true);
        }
        ChannelMetaData channelMetaData = getSystemMetaData().getChannelMetaData(channelAddress);
        if (channelMetaData.getFilterType() == ChannelMetaData.FilterType.DYNAMIC) {
            updateSubscription(replicantSession, channelAddress, obj, changeSet);
        } else if (channelMetaData.getFilterType() == ChannelMetaData.FilterType.STATIC) {
            Object filter = subscriptionEntry.getFilter();
            if (!doFiltersMatch(obj, filter)) {
                throw new AttemptedToUpdateStaticFilterException("Attempted to update filter on channel " + subscriptionEntry.getDescriptor() + " from " + filter + " to " + obj + " for channel that has a static filter. Unsubscribe and resubscribe to channel.");
            }
        }
        return ReplicantSessionManager.CacheStatus.IGNORE;
    }

    private boolean doFiltersMatch(Object obj, Object obj2) {
        return (null == obj2 && null == obj) || (null != obj2 && obj2.equals(obj));
    }

    @Nonnull
    ReplicantSessionManager.CacheStatus performSubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull SubscriptionEntry subscriptionEntry, boolean z, @Nullable Object obj, @Nonnull ChangeSet changeSet) {
        if (z) {
            subscriptionEntry.setExplicitlySubscribed(true);
        }
        subscriptionEntry.setFilter(obj);
        ChannelAddress descriptor = subscriptionEntry.getDescriptor();
        if (!getSystemMetaData().getChannelMetaData(descriptor).isCacheable()) {
            collectDataForSubscribe(descriptor, changeSet, obj);
            changeSet.mergeAction(descriptor, ChannelAction.Action.ADD, obj);
            return ReplicantSessionManager.CacheStatus.REFRESH;
        }
        ChannelCacheEntry ensureCacheEntry = ensureCacheEntry(descriptor);
        String cacheKey = ensureCacheEntry.getCacheKey();
        if (cacheKey.equals(replicantSession.getETag(descriptor))) {
            return ReplicantSessionManager.CacheStatus.USE;
        }
        ChangeSet changeSet2 = new ChangeSet();
        changeSet2.merge(ensureCacheEntry.getChangeSet(), true);
        changeSet2.mergeAction(descriptor, ChannelAction.Action.ADD, obj);
        sendPacket(replicantSession, cacheKey, changeSet2);
        return ReplicantSessionManager.CacheStatus.REFRESH;
    }

    protected boolean deleteCacheEntry(@Nonnull ChannelAddress channelAddress) {
        this._cacheLock.writeLock().lock();
        try {
            return null != this._cache.remove(channelAddress);
        } finally {
            this._cacheLock.writeLock().unlock();
        }
    }

    @Nonnull
    protected ChannelCacheEntry ensureCacheEntry(@Nonnull ChannelAddress channelAddress) {
        if (!$assertionsDisabled && !getSystemMetaData().getChannelMetaData(channelAddress).isCacheable()) {
            throw new AssertionError();
        }
        ChannelCacheEntry cacheEntry = getCacheEntry(channelAddress);
        cacheEntry.getLock().readLock().lock();
        try {
            if (cacheEntry.isInitialized()) {
                return cacheEntry;
            }
            cacheEntry.getLock().readLock().unlock();
            cacheEntry.getLock().writeLock().lock();
            try {
                if (cacheEntry.isInitialized()) {
                    return cacheEntry;
                }
                ChangeSet changeSet = new ChangeSet();
                String collectDataForSubscribe = collectDataForSubscribe(channelAddress, changeSet, null);
                if (!$assertionsDisabled && null == collectDataForSubscribe) {
                    throw new AssertionError();
                }
                cacheEntry.init(collectDataForSubscribe, changeSet);
                cacheEntry.getLock().writeLock().unlock();
                return cacheEntry;
            } finally {
                cacheEntry.getLock().writeLock().unlock();
            }
        } finally {
            cacheEntry.getLock().readLock().unlock();
        }
    }

    ChannelCacheEntry getCacheEntry(@Nonnull ChannelAddress channelAddress) {
        this._cacheLock.readLock().lock();
        try {
            ChannelCacheEntry channelCacheEntry = this._cache.get(channelAddress);
            if (null != channelCacheEntry) {
                return channelCacheEntry;
            }
            this._cacheLock.readLock().unlock();
            this._cacheLock.writeLock().lock();
            try {
                ChannelCacheEntry channelCacheEntry2 = this._cache.get(channelAddress);
                if (null != channelCacheEntry2) {
                    return channelCacheEntry2;
                }
                ChannelCacheEntry channelCacheEntry3 = new ChannelCacheEntry(channelAddress);
                this._cache.put(channelAddress, channelCacheEntry3);
                this._cacheLock.writeLock().unlock();
                return channelCacheEntry3;
            } finally {
                this._cacheLock.writeLock().unlock();
            }
        } finally {
            this._cacheLock.readLock().unlock();
        }
    }

    void performUpdateSubscription(@Nonnull ReplicantSession replicantSession, @Nonnull SubscriptionEntry subscriptionEntry, @Nullable Object obj, @Nullable Object obj2, @Nonnull ChangeSet changeSet) {
        if (!$assertionsDisabled && getSystemMetaData().getChannelMetaData(subscriptionEntry.getDescriptor()).getFilterType() == ChannelMetaData.FilterType.NONE) {
            throw new AssertionError();
        }
        subscriptionEntry.setFilter(obj2);
        ChannelAddress descriptor = subscriptionEntry.getDescriptor();
        collectDataForSubscriptionUpdate(subscriptionEntry.getDescriptor(), changeSet, obj, obj2);
        changeSet.mergeAction(descriptor, ChannelAction.Action.UPDATE, obj2);
    }

    @Nullable
    protected abstract String collectDataForSubscribe(@Nonnull ChannelAddress channelAddress, @Nonnull ChangeSet changeSet, @Nullable Object obj);

    protected abstract boolean bulkCollectDataForSubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull ArrayList<ChannelAddress> arrayList, @Nonnull ChangeSet changeSet, @Nullable Object obj, boolean z);

    protected abstract void collectDataForSubscriptionUpdate(@Nonnull ChannelAddress channelAddress, @Nonnull ChangeSet changeSet, @Nullable Object obj, @Nullable Object obj2);

    protected abstract boolean bulkCollectDataForSubscriptionUpdate(@Nonnull ReplicantSession replicantSession, @Nonnull ArrayList<ChannelAddress> arrayList, @Nonnull ChangeSet changeSet, @Nullable Object obj, @Nullable Object obj2);

    protected void bulkUnsubscribe(@Nonnull String str, int i, @Nonnull Collection<Integer> collection, boolean z, @Nonnull ChangeSet changeSet) {
        setupRegistryContext(str);
        bulkUnsubscribe(ensureSession(str), i, collection, z, changeSet);
    }

    @Override // org.realityforge.replicant.server.transport.ReplicantSessionManager
    public void unsubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelAddress channelAddress, boolean z, @Nonnull ChangeSet changeSet) {
        SubscriptionEntry findSubscriptionEntry = replicantSession.findSubscriptionEntry(channelAddress);
        if (null != findSubscriptionEntry) {
            performUnsubscribe(replicantSession, findSubscriptionEntry, z, changeSet);
        }
    }

    @Override // org.realityforge.replicant.server.transport.ReplicantSessionManager
    public void bulkUnsubscribe(@Nonnull ReplicantSession replicantSession, int i, @Nonnull Collection<Integer> collection, boolean z, @Nonnull ChangeSet changeSet) {
        Iterator<Integer> it = collection.iterator();
        while (it.hasNext()) {
            unsubscribe(replicantSession, new ChannelAddress(i, it.next()), z, changeSet);
        }
    }

    protected void performUnsubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull SubscriptionEntry subscriptionEntry, boolean z, @Nonnull ChangeSet changeSet) {
        if (z) {
            subscriptionEntry.setExplicitlySubscribed(false);
        }
        if (subscriptionEntry.canUnsubscribe()) {
            changeSet.mergeAction(subscriptionEntry.getDescriptor(), ChannelAction.Action.REMOVE, null);
            Iterator it = new ArrayList(subscriptionEntry.getOutwardSubscriptions()).iterator();
            while (it.hasNext()) {
                delinkDownstreamSubscription(replicantSession, subscriptionEntry, (ChannelAddress) it.next(), changeSet);
            }
            replicantSession.deleteSubscriptionEntry(subscriptionEntry);
        }
    }

    private void delinkDownstreamSubscription(@Nonnull ReplicantSession replicantSession, @Nonnull SubscriptionEntry subscriptionEntry, @Nonnull ChannelAddress channelAddress, @Nonnull ChangeSet changeSet) {
        SubscriptionEntry findSubscriptionEntry = replicantSession.findSubscriptionEntry(channelAddress);
        if (null != findSubscriptionEntry) {
            delinkSubscriptionEntries(subscriptionEntry, findSubscriptionEntry);
            performUnsubscribe(replicantSession, findSubscriptionEntry, false, changeSet);
        }
    }

    protected void delinkDownstreamSubscriptions(@Nonnull ReplicantSession replicantSession, @Nonnull SubscriptionEntry subscriptionEntry, @Nonnull EntityMessage entityMessage, @Nonnull ChangeAccumulator changeAccumulator) {
        Set<ChannelLink> links = entityMessage.getLinks();
        if (null != links) {
            Iterator<ChannelLink> it = links.iterator();
            while (it.hasNext()) {
                delinkDownstreamSubscription(replicantSession, subscriptionEntry, it.next().getTargetChannel(), changeAccumulator.getChangeSet(replicantSession));
            }
        }
    }

    protected void linkSubscriptionEntries(@Nonnull SubscriptionEntry subscriptionEntry, @Nonnull SubscriptionEntry subscriptionEntry2) {
        subscriptionEntry.registerOutwardSubscriptions(subscriptionEntry2.getDescriptor());
        subscriptionEntry2.registerInwardSubscriptions(subscriptionEntry.getDescriptor());
    }

    protected void delinkSubscriptionEntries(@Nonnull SubscriptionEntry subscriptionEntry, @Nonnull SubscriptionEntry subscriptionEntry2) {
        subscriptionEntry.deregisterOutwardSubscriptions(subscriptionEntry2.getDescriptor());
        subscriptionEntry2.deregisterInwardSubscriptions(subscriptionEntry.getDescriptor());
    }

    protected void expandLinks(@Nonnull ReplicantSession replicantSession, @Nonnull ChangeSet changeSet) {
        do {
        } while (expandLink(replicantSession, changeSet));
    }

    protected boolean expandLink(@Nonnull ReplicantSession replicantSession, @Nonnull ChangeSet changeSet) {
        Set<ChannelLink> links;
        Iterator<Change> it = changeSet.getChanges().iterator();
        while (it.hasNext()) {
            EntityMessage entityMessage = it.next().getEntityMessage();
            if (entityMessage.isUpdate() && null != (links = entityMessage.getLinks())) {
                Iterator<ChannelLink> it2 = links.iterator();
                while (it2.hasNext()) {
                    if (expandLinkIfRequired(replicantSession, it2.next(), changeSet)) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

    boolean expandLinkIfRequired(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelLink channelLink, @Nonnull ChangeSet changeSet) {
        SubscriptionEntry findSubscriptionEntry = replicantSession.findSubscriptionEntry(channelLink.getSourceChannel());
        if (null == findSubscriptionEntry) {
            return false;
        }
        ChannelAddress targetChannel = channelLink.getTargetChannel();
        boolean z = getSystemMetaData().getChannelMetaData(targetChannel).getFilterType() == ChannelMetaData.FilterType.NONE;
        if (!z && !shouldFollowLink(findSubscriptionEntry, targetChannel)) {
            return false;
        }
        SubscriptionEntry findSubscriptionEntry2 = replicantSession.findSubscriptionEntry(targetChannel);
        if (null != findSubscriptionEntry2) {
            linkSubscriptionEntries(findSubscriptionEntry, findSubscriptionEntry2);
            return false;
        }
        subscribe(replicantSession, targetChannel, false, z ? null : findSubscriptionEntry.getFilter(), changeSet);
        linkSubscriptionEntries(findSubscriptionEntry, replicantSession.getSubscriptionEntry(targetChannel));
        return true;
    }

    protected abstract boolean shouldFollowLink(@Nonnull SubscriptionEntry subscriptionEntry, @Nonnull ChannelAddress channelAddress);

    private void setupRegistryContext(@Nonnull String str) {
        getRegistry().putResource(ServerConstants.SESSION_ID_KEY, str);
    }

    static {
        $assertionsDisabled = !ReplicantSessionManagerImpl.class.desiredAssertionStatus();
    }
}
