package org.realityforge.replicant.server.transport;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.PreDestroy;
import javax.json.Json;
import javax.json.JsonObjectBuilder;
import javax.transaction.TransactionSynchronizationRegistry;
import javax.websocket.CloseReason;
import javax.websocket.Session;
import org.realityforge.replicant.server.Change;
import org.realityforge.replicant.server.ChangeSet;
import org.realityforge.replicant.server.ChannelAction;
import org.realityforge.replicant.server.ChannelAddress;
import org.realityforge.replicant.server.ChannelLink;
import org.realityforge.replicant.server.EntityMessage;
import org.realityforge.replicant.server.EntityMessageEndpoint;
import org.realityforge.replicant.server.ServerConstants;
import org.realityforge.replicant.server.ee.EntityMessageCacheUtil;
import org.realityforge.replicant.server.json.TransportConstants;
import org.realityforge.replicant.server.transport.ChannelMetaData;

/* loaded from: input_file:org/realityforge/replicant/server/transport/ReplicantSessionManagerImpl.class */
public abstract class ReplicantSessionManagerImpl implements EntityMessageEndpoint, ReplicantSessionManager {

    /** Logger for this class; assigned in the static initializer of this class. */
    @Nonnull
    private static final Logger LOG;

    /** Guards every access to {@link #_sessions}. */
    @Nonnull
    private final ReadWriteLock _lock = new ReentrantReadWriteLock();

    /** Registered sessions keyed by {@code ReplicantSession.getId()}. */
    @Nonnull
    private final Map<String, ReplicantSession> _sessions = new HashMap<>();

    /** Guards every access to {@link #_cache}. */
    @Nonnull
    private final ReadWriteLock _cacheLock = new ReentrantReadWriteLock();

    /** Cache entries keyed by the address of the channel they hold data for. */
    @Nonnull
    private final Map<ChannelAddress, ChannelCacheEntry> _cache = new HashMap<>();
    /** True when assertions are disabled for this class; assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Supplies the broker that this manager hands outgoing change messages to
     * (see the {@code queueChangeMessage} calls in this class).
     *
     * @return the message broker; never null.
     */
    @Nonnull
    protected abstract ReplicantMessageBroker getReplicantMessageBroker();

    /**
     * Removes the session from the registry and, if it was registered, closes it.
     *
     * @param replicantSession the session to invalidate.
     * @return true if the session was registered (and has now been closed), false otherwise.
     */
    @Override
    public boolean invalidateSession(@Nonnull ReplicantSession replicantSession) {
        this._lock.writeLock().lock();
        try {
            final boolean wasRegistered = null != this._sessions.remove(replicantSession.getId());
            if (wasRegistered) {
                replicantSession.close();
            }
            return wasRegistered;
        } finally {
            this._lock.writeLock().unlock();
        }
    }

    /**
     * Looks up a registered session by id while holding the read lock.
     *
     * @param str the session id.
     * @return the matching session, or null if no session is registered under that id.
     */
    @Override
    @Nullable
    public ReplicantSession getSession(@Nonnull String str) {
        this._lock.readLock().lock();
        try {
            final ReplicantSession match = this._sessions.get(str);
            return match;
        } finally {
            this._lock.readLock().unlock();
        }
    }

    /**
     * Returns a snapshot of the ids of all registered sessions.
     *
     * @return a new set, independent of the internal registry.
     */
    @Override
    @Nonnull
    public Set<String> getSessionIDs() {
        this._lock.readLock().lock();
        try {
            final Set<String> ids = new HashSet<>(this._sessions.keySet());
            return ids;
        } finally {
            this._lock.readLock().unlock();
        }
    }

    /**
     * Returns a snapshot of all registered sessions.
     *
     * @return a new set, independent of the internal registry.
     */
    @Nonnull
    Set<ReplicantSession> getSessions() {
        this._lock.readLock().lock();
        try {
            final Set<ReplicantSession> snapshot = new HashSet<>(this._sessions.values());
            return snapshot;
        } finally {
            this._lock.readLock().unlock();
        }
    }

    /**
     * Wraps the websocket session in a new {@code ReplicantSession} and registers it under its id.
     *
     * @param session the underlying websocket session.
     * @return the newly registered session.
     */
    @Override
    @Nonnull
    public ReplicantSession createSession(@Nonnull Session session) {
        // The wrapper is built before the lock is taken; only the registration is guarded.
        final ReplicantSession created = new ReplicantSession(session);
        this._lock.writeLock().lock();
        try {
            final String id = created.getId();
            this._sessions.put(id, created);
        } finally {
            this._lock.writeLock().unlock();
        }
        return created;
    }

    /**
     * Lifecycle callback (annotated {@code @PreDestroy}) that closes and removes every
     * registered session by delegating to {@link #removeAllSessions()}.
     */
    @PreDestroy
    protected void preDestroy() {
        removeAllSessions();
    }

    /**
     * Invokes {@code pingTransport()} on every registered session while holding the read lock,
     * logging each ping at FINEST level when that level is enabled.
     */
    public void pingSessions() {
        this._lock.readLock().lock();
        try {
            this._sessions.values().forEach(session -> {
                if (LOG.isLoggable(Level.FINEST)) {
                    LOG.finest("Pinging websocket for session " + session.getId());
                }
                session.pingTransport();
            });
        } finally {
            this._lock.readLock().unlock();
        }
    }

    /**
     * Closes every registered session and then empties the registry, all under the write lock.
     */
    public void removeAllSessions() {
        this._lock.writeLock().lock();
        try {
            // A typed copy is required: with a raw ArrayList the element type degrades to Object
            // and close() cannot be resolved. Iterating a copy also keeps the traversal independent
            // of the registry (presumably in case close() calls back into this manager - TODO confirm).
            final List<ReplicantSession> snapshot = new ArrayList<>(this._sessions.values());
            snapshot.forEach(ReplicantSession::close);
            this._sessions.clear();
        } finally {
            this._lock.writeLock().unlock();
        }
    }

    /**
     * Drops every registered session whose underlying websocket session is no longer open.
     * The sessions are only removed from the registry; they are not closed here.
     */
    public void removeClosedSessions() {
        this._lock.writeLock().lock();
        try {
            this._sessions.values().removeIf(session -> !session.getWebSocketSession().isOpen());
        } finally {
            this._lock.writeLock().unlock();
        }
    }

    /**
     * Queues a change set built from cached data for delivery to the session.
     * <p>
     * Before queueing, the transaction registry is updated: {@code REQUEST_COMPLETE_KEY} is set
     * to {@code "0"} and {@code CACHED_RESULT_HANDLED_KEY} to {@code "1"}.
     *
     * @param replicantSession the session to deliver to.
     * @param str              the etag passed to the broker, may be null.
     * @param changeSet        the change set to queue.
     */
    void queueCachedChangeSet(@Nonnull ReplicantSession replicantSession, @Nullable String str, @Nonnull ChangeSet changeSet) {
        final TransactionSynchronizationRegistry txRegistry = getRegistry();
        final Integer requestId = (Integer) txRegistry.getResource(ServerConstants.REQUEST_ID_KEY);
        txRegistry.putResource(ServerConstants.REQUEST_COMPLETE_KEY, "0");
        txRegistry.putResource(ServerConstants.CACHED_RESULT_HANDLED_KEY, "1");
        final ReplicantMessageBroker broker = getReplicantMessageBroker();
        broker.queueChangeMessage(replicantSession, true, requestId, str, Collections.emptyList(), changeSet);
    }

    /**
     * Supplies the transaction synchronization registry whose resources this class reads
     * and writes (request id, request-complete and cached-result flags).
     *
     * @return the registry; never null.
     */
    @Nonnull
    protected abstract TransactionSynchronizationRegistry getRegistry();

    /**
     * Queues the entity messages for every open session.
     * <p>
     * The session whose id equals {@code str} (the initiating session) additionally receives the
     * supplied {@code changeSet} content and the request id; its change set is marked required
     * unless {@code CACHED_RESULT_HANDLED_KEY} is already present in the registry.
     *
     * @param str        id of the initiating session, may be null.
     * @param num        request id forwarded only to the initiating session, may be null.
     * @param collection the entity messages to route.
     * @param changeSet  extra changes for the initiating session, may be null.
     * @return true if an open session with id {@code str} was found.
     */
    @Override
    public boolean saveEntityMessages(@Nullable String str, @Nullable Integer num, @Nonnull Collection<EntityMessage> collection, @Nullable ChangeSet changeSet) {
        boolean initiatorFound = false;
        for (final ReplicantSession session : getSessions()) {
            if (!session.isOpen()) {
                continue;
            }
            final ChangeSet outgoing = new ChangeSet();
            final boolean isInitiator = Objects.equals(session.getId(), str);
            if (isInitiator) {
                if (null != changeSet) {
                    outgoing.setRequired(changeSet.isRequired());
                    outgoing.merge(changeSet.getChanges());
                    outgoing.mergeActions(changeSet.getChannelActions());
                }
                if (null == getRegistry().getResource(ServerConstants.CACHED_RESULT_HANDLED_KEY)) {
                    outgoing.setRequired(true);
                }
                initiatorFound = true;
            }
            final ReplicantMessageBroker broker = getReplicantMessageBroker();
            final boolean subscriptionRequest = null != getRegistry().getResource(ServerConstants.SUBSCRIPTION_REQUEST_KEY);
            final Integer requestId = isInitiator ? num : null;
            broker.queueChangeMessage(session, subscriptionRequest, requestId, null, collection, outgoing);
        }
        return initiatorFound;
    }

    /**
     * Routes the entity messages into the change set for this session and, if the change set
     * ends up with content, expands links and sends it as a packet.
     *
     * @param replicantSession the receiving session.
     * @param num              request id to send with the packet, may be null.
     * @param str              etag to send with the packet, may be null.
     * @param collection       entity messages to process.
     * @param changeSet        change set that accumulates the result and is sent.
     */
    @Override
    public void sendChangeMessage(@Nonnull ReplicantSession replicantSession, @Nullable Integer num, @Nullable String str, @Nonnull Collection<EntityMessage> collection, @Nonnull ChangeSet changeSet) {
        processMessages(collection, replicantSession, changeSet);
        if (!changeSet.hasContent()) {
            // Nothing relevant to this session: no packet is sent.
            return;
        }
        completeMessageProcessing(replicantSession, changeSet);
        replicantSession.sendPacket(num, str, changeSet);
    }

    /**
     * Expands links for the change set; if that fails with an exception the failure is logged
     * at INFO level and the session is closed with {@code UNEXPECTED_CONDITION}.
     */
    private void completeMessageProcessing(@Nonnull ReplicantSession replicantSession, @Nonnull ChangeSet changeSet) {
        try {
            expandLinks(replicantSession, changeSet);
        } catch (Exception failure) {
            if (LOG.isLoggable(Level.INFO)) {
                LOG.log(Level.INFO, "Error invoking expandLinks for session " + replicantSession.getId(), failure);
            }
            final CloseReason reason = new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "Expanding links failed");
            replicantSession.close(reason);
        }
    }

    /**
     * Runs two passes over the messages: delete handling for all of them first, then update
     * handling for all of them.
     */
    private void processMessages(@Nonnull Collection<EntityMessage> collection, @Nonnull ReplicantSession replicantSession, @Nonnull ChangeSet changeSet) {
        for (final EntityMessage message : collection) {
            processDeleteMessages(message, replicantSession, changeSet);
        }
        for (final EntityMessage message : collection) {
            processUpdateMessages(message, replicantSession, changeSet);
        }
    }

    /**
     * Applies a new filter to an existing subscription on a channel with a dynamic filter.
     * <p>
     * If the new filter differs from the current one, the filter is stored, update data is
     * collected, an UPDATE action is recorded, and every channel that the change set marks as
     * REMOVE and that the session is still subscribed to is unsubscribed.
     *
     * @param replicantSession the session owning the subscription.
     * @param channelAddress   the channel being updated.
     * @param obj              the new filter, may be null.
     * @param changeSet        the change set that receives the resulting changes and actions.
     */
    private void updateSubscription(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelAddress channelAddress, @Nullable Object obj, @Nonnull ChangeSet changeSet) {
        final ChannelMetaData metaData = getSystemMetaData().getChannelMetaData(channelAddress);
        if (!$assertionsDisabled && !metaData.hasFilterParameter()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && metaData.getFilterType() != ChannelMetaData.FilterType.DYNAMIC) {
            throw new AssertionError();
        }
        final SubscriptionEntry entry = replicantSession.getSubscriptionEntry(channelAddress);
        final Object previousFilter = entry.getFilter();
        if (!doFiltersNotMatch(obj, previousFilter)) {
            // Filter is unchanged: nothing to do.
            return;
        }
        entry.setFilter(obj);
        collectDataForSubscriptionUpdate(replicantSession, channelAddress, changeSet, previousFilter, obj);
        changeSet.mergeAction(channelAddress, ChannelAction.Action.UPDATE, obj);
        for (final ChannelAction action : changeSet.getChannelActions()) {
            if (ChannelAction.Action.REMOVE != action.getAction()) {
                continue;
            }
            final SubscriptionEntry removed = replicantSession.findSubscriptionEntry(action.getAddress());
            if (null != removed) {
                performUnsubscribe(replicantSession, removed, true, false, changeSet);
            }
        }
    }

    /**
     * Subscribes the session to several instances of one channel while holding the session lock.
     *
     * @param replicantSession the session to subscribe.
     * @param i                the channel id.
     * @param collection       the sub-channel ids to subscribe to.
     * @param obj              the filter to apply, may be null.
     * @throws InterruptedException if the thread is interrupted while waiting for the session lock.
     */
    @Override
    public void bulkSubscribe(@Nonnull ReplicantSession replicantSession, int i, @Nonnull Collection<Integer> collection, @Nullable Object obj) throws InterruptedException {
        final ReentrantLock lock = replicantSession.getLock();
        lock.lockInterruptibly();
        try {
            doBulkSubscribe(replicantSession, i, collection, obj);
        } finally {
            // Single release point: the lock is unlocked exactly once on every exit path.
            lock.unlock();
        }
    }

    /**
     * Performs a bulk subscribe for one instance channel.
     * <p>
     * Addresses the session is not yet subscribed to are loaded in bulk when the channel
     * supports it, otherwise one by one. Addresses already subscribed are grouped by their
     * current filter; groups of more than one address on a dynamically filtered channel that
     * supports bulk loads are updated in bulk, everything else falls back to per-address subscribe.
     * Per-address failures do not stop processing; the last one captured is rethrown at the end.
     *
     * @param replicantSession the session to subscribe.
     * @param i                the channel id.
     * @param collection       the sub-channel ids.
     * @param obj              the filter to apply, may be null.
     * @throws InterruptedException if a per-address subscribe was interrupted.
     * @throws IllegalStateException if a per-address subscribe failed with some other checked throwable.
     */
    private void doBulkSubscribe(@Nonnull ReplicantSession replicantSession, int i, @Nonnull Collection<Integer> collection, @Nullable Object obj) throws InterruptedException {
        final ChannelMetaData channelMetaData = getSystemMetaData().getChannelMetaData(i);
        if (!$assertionsDisabled && !channelMetaData.isInstanceGraph()) {
            throw new AssertionError();
        }
        final List<ChannelAddress> newAddresses = new ArrayList<>();
        // Keyed by the current filter of the existing subscription; a null filter is a valid key.
        final Map<Object, List<ChannelAddress>> existingByFilter = new HashMap<>();
        for (final Integer subChannelId : collection) {
            final ChannelAddress address = new ChannelAddress(i, subChannelId);
            final SubscriptionEntry existing = replicantSession.findSubscriptionEntry(address);
            if (null == existing) {
                newAddresses.add(address);
            } else {
                existingByFilter.computeIfAbsent(existing.getFilter(), filter -> new ArrayList<>()).add(address);
            }
        }

        Throwable failure = null;
        if (!newAddresses.isEmpty() && (!channelMetaData.isInstanceGraph() || !channelMetaData.areBulkLoadsSupported() || !bulkCollectDataForSubscribe(replicantSession, newAddresses, obj))) {
            failure = subscribeToAddresses(replicantSession, newAddresses, obj);
        }
        for (final Map.Entry<Object, List<ChannelAddress>> group : existingByFilter.entrySet()) {
            final Object currentFilter = group.getKey();
            final List<ChannelAddress> addresses = group.getValue();
            boolean handledInBulk = false;
            if (addresses.size() > 1 && channelMetaData.isInstanceGraph() && channelMetaData.areBulkLoadsSupported() && ChannelMetaData.FilterType.DYNAMIC == channelMetaData.getFilterType()) {
                handledInBulk = bulkCollectDataForSubscriptionUpdate(replicantSession, addresses, currentFilter, obj);
            }
            if (!handledInBulk) {
                final Throwable groupFailure = subscribeToAddresses(replicantSession, addresses, obj);
                if (null != groupFailure) {
                    failure = groupFailure;
                }
            }
        }

        // subscribe(...) declares InterruptedException, so the captured failure is not
        // necessarily unchecked; a blind cast to RuntimeException would mask it with a
        // ClassCastException.
        if (failure instanceof Error) {
            throw (Error) failure;
        }
        if (failure instanceof RuntimeException) {
            throw (RuntimeException) failure;
        }
        if (failure instanceof InterruptedException) {
            throw (InterruptedException) failure;
        }
        if (null != failure) {
            throw new IllegalStateException("Bulk subscribe failed for channel " + i, failure);
        }
    }

    /**
     * Subscribes to each address in turn, continuing past failures.
     *
     * @param replicantSession the session to subscribe.
     * @param list             the addresses to subscribe to.
     * @param obj              the filter to apply, may be null.
     * @return the last throwable raised by any subscribe call, or null if all succeeded.
     */
    @Nullable
    private Throwable subscribeToAddresses(@Nonnull ReplicantSession replicantSession, @Nonnull List<ChannelAddress> list, @Nullable Object obj) {
        Throwable lastFailure = null;
        for (final ChannelAddress address : list) {
            try {
                subscribe(replicantSession, address, obj);
            } catch (Throwable failure) {
                // Only the most recent failure is retained; earlier ones are overwritten.
                lastFailure = failure;
            }
        }
        return lastFailure;
    }

    /**
     * Explicitly subscribes the session to a channel while holding the session lock, recording
     * the resulting changes in the change set returned by
     * {@code EntityMessageCacheUtil.getSessionChanges()}.
     *
     * @param replicantSession the session to subscribe.
     * @param channelAddress   the channel to subscribe to.
     * @param obj              the filter to apply, may be null.
     * @throws InterruptedException if the thread is interrupted while waiting for the session lock.
     */
    @Override
    public void subscribe(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelAddress channelAddress, @Nullable Object obj) throws InterruptedException {
        final ReentrantLock lock = replicantSession.getLock();
        lock.lockInterruptibly();
        try {
            subscribe(replicantSession, channelAddress, true, obj, EntityMessageCacheUtil.getSessionChanges());
        } finally {
            // Single release point: the lock is unlocked exactly once on every exit path.
            lock.unlock();
        }
    }

    /**
     * Subscribes the session to a channel, or updates an existing subscription.
     * <p>
     * For a new subscription an entry is created and populated; if populating fails the entry
     * is deleted again and the failure rethrown. For an existing subscription the entry is
     * optionally marked explicit, then: a dynamic filter is updated in place, a static filter
     * must be unchanged, and any other filter type needs no further work.
     *
     * @param replicantSession the session to subscribe.
     * @param channelAddress   the channel to subscribe to.
     * @param z                true if this is an explicit subscription.
     * @param obj              the requested filter, may be null.
     * @param changeSet        the change set that receives the resulting changes.
     * @throws AttemptedToUpdateStaticFilterException if the channel has a static filter and the
     *                                                requested filter differs from the current one.
     */
    void subscribe(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelAddress channelAddress, boolean z, @Nullable Object obj, @Nonnull ChangeSet changeSet) {
        if (replicantSession.isSubscriptionEntryPresent(channelAddress)) {
            final SubscriptionEntry existing = replicantSession.getSubscriptionEntry(channelAddress);
            if (z) {
                existing.setExplicitlySubscribed(true);
            }
            final ChannelMetaData metaData = getSystemMetaData().getChannelMetaData(channelAddress);
            if (ChannelMetaData.FilterType.DYNAMIC == metaData.getFilterType()) {
                updateSubscription(replicantSession, channelAddress, obj, changeSet);
                return;
            }
            if (ChannelMetaData.FilterType.STATIC != metaData.getFilterType()) {
                return;
            }
            final Object currentFilter = existing.getFilter();
            if (doFiltersNotMatch(obj, currentFilter)) {
                throw new AttemptedToUpdateStaticFilterException("Attempted to update filter on channel " + existing.getAddress() + " from " + currentFilter + " to " + obj + " for channel that has a static filter. Unsubscribe and resubscribe to channel.");
            }
            return;
        }

        final SubscriptionEntry created = replicantSession.createSubscriptionEntry(channelAddress);
        try {
            performSubscribe(replicantSession, created, z, obj, changeSet);
        } catch (Throwable failure) {
            // Roll back the half-created subscription before propagating the failure.
            replicantSession.deleteSubscriptionEntry(created);
            throw failure;
        }
    }

    /**
     * Tells whether two filters differ.
     *
     * @param obj  the first filter, may be null.
     * @param obj2 the second filter, may be null; its {@code equals} is used for the comparison.
     * @return false if both are null or {@code obj2.equals(obj)}; true otherwise.
     */
    private boolean doFiltersNotMatch(Object obj, Object obj2) {
        if (null == obj2) {
            return null != obj;
        }
        return !obj2.equals(obj);
    }

    /**
     * Populates a freshly created subscription entry and records the data the client needs.
     * <p>
     * Non-cacheable channels collect their data directly into {@code changeSet}. Cacheable
     * channels go through the cache: the cached change set is queued separately, or, when the
     * session's ETag already matches the cache key, a {@code "use-cache"} message is sent instead.
     *
     * @param replicantSession  the subscribing session.
     * @param subscriptionEntry the entry created for this subscription.
     * @param z                 true if this is an explicit subscription.
     * @param obj               the filter to store on the entry, may be null.
     * @param changeSet         the change set for the current request.
     */
    void performSubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull SubscriptionEntry subscriptionEntry, boolean z, @Nullable Object obj, @Nonnull ChangeSet changeSet) {
        subscriptionEntry.setFilter(obj);
        ChannelAddress address = subscriptionEntry.getAddress();
        if (!getSystemMetaData().getChannelMetaData(address).isCacheable()) {
            // Non-cacheable channel: load the data straight into the request's change set.
            if (collectDataForSubscribe(address, changeSet, obj).isChannelRootDeleted()) {
                // Root of the channel no longer exists: report DELETE instead of ADD.
                changeSet.mergeAction(address, ChannelAction.Action.DELETE, null);
                return;
            }
            changeSet.mergeAction(address, ChannelAction.Action.ADD, obj);
            if (z) {
                subscriptionEntry.setExplicitlySubscribed(true);
                return;
            }
            return;
        }
        // Cacheable channel: obtain (and initialize if necessary) the cache entry.
        ChannelCacheEntry tryGetCacheEntry = tryGetCacheEntry(address);
        if (null == tryGetCacheEntry) {
            // tryGetCacheEntry returns null when the channel root was deleted.
            if (!$assertionsDisabled && !address.hasSubChannelId()) {
                throw new AssertionError();
            }
            ChangeSet changeSet2 = new ChangeSet();
            changeSet2.mergeAction(address, ChannelAction.Action.DELETE, null);
            queueCachedChangeSet(replicantSession, null, changeSet2);
            changeSet.setRequired(false);
            return;
        }
        if (z) {
            subscriptionEntry.setExplicitlySubscribed(true);
        }
        String cacheKey = tryGetCacheEntry.getCacheKey();
        if (!cacheKey.equals(replicantSession.getETag(address))) {
            // Session's ETag does not match the cache: clear it and queue the cached content.
            replicantSession.setETag(address, null);
            ChangeSet changeSet3 = new ChangeSet();
            changeSet3.merge(tryGetCacheEntry.getChangeSet(), true);
            changeSet3.mergeAction(address, ChannelAction.Action.ADD, obj);
            queueCachedChangeSet(replicantSession, cacheKey, changeSet3);
            changeSet.setRequired(false);
            return;
        }
        // ETag matches: tell the client to use its own cached copy, if the socket is still open.
        if (replicantSession.getWebSocketSession().isOpen()) {
            JsonObjectBuilder add = Json.createObjectBuilder().add(TransportConstants.TYPE, "use-cache").add(TransportConstants.CHANNEL, address.toString()).add(TransportConstants.ETAG, cacheKey);
            Integer num = (Integer) getRegistry().getResource(ServerConstants.REQUEST_ID_KEY);
            if (null != num) {
                add.add(TransportConstants.REQUEST_ID, num.intValue());
            }
            WebSocketUtil.sendJsonObject(replicantSession.getWebSocketSession(), add.build());
            changeSet.setRequired(false);
            getRegistry().putResource(ServerConstants.CACHED_RESULT_HANDLED_KEY, "1");
        }
    }

    /**
     * Removes the cache entry for the channel, if any, under the cache write lock.
     *
     * @param channelAddress the channel whose cache entry should be dropped.
     * @return true if an entry existed and was removed.
     */
    protected boolean deleteCacheEntry(@Nonnull ChannelAddress channelAddress) {
        this._cacheLock.writeLock().lock();
        try {
            final ChannelCacheEntry removed = this._cache.remove(channelAddress);
            return null != removed;
        } finally {
            this._cacheLock.writeLock().unlock();
        }
    }

    /**
     * Empties the channel cache while holding the cache write lock.
     */
    void deleteAllCacheEntries() {
        this._cacheLock.writeLock().lock();
        try {
            this._cache.clear();
        } finally {
            this._cacheLock.writeLock().unlock();
        }
    }

    /**
     * Returns the initialized cache entry for a cacheable channel, loading it on first use.
     * <p>
     * The entry is first checked under its read lock. If it is not initialized, the read lock is
     * released, the write lock is taken, and the check is repeated before the data is collected
     * and the entry initialized. Each lock is released exactly once, in its own {@code finally}.
     *
     * @param channelAddress the address of a cacheable channel.
     * @return the initialized entry, or null if collecting the data reported that the channel
     *         root has been deleted.
     */
    @Nullable
    ChannelCacheEntry tryGetCacheEntry(@Nonnull ChannelAddress channelAddress) {
        if (!$assertionsDisabled && !getSystemMetaData().getChannelMetaData(channelAddress).isCacheable()) {
            throw new AssertionError();
        }
        final ChannelCacheEntry cacheEntry = getCacheEntry(channelAddress);

        // Fast path: already initialized.
        cacheEntry.getLock().readLock().lock();
        try {
            if (cacheEntry.isInitialized()) {
                return cacheEntry;
            }
        } finally {
            cacheEntry.getLock().readLock().unlock();
        }

        // Slow path: the read lock must be fully released before taking the write lock.
        cacheEntry.getLock().writeLock().lock();
        try {
            // Re-check: another thread may have initialized the entry in the meantime.
            if (cacheEntry.isInitialized()) {
                return cacheEntry;
            }
            final ChangeSet changeSet = new ChangeSet();
            final SubscribeResult result = collectDataForSubscribe(channelAddress, changeSet, null);
            if (result.isChannelRootDeleted()) {
                return null;
            }
            final String cacheKey = result.getCacheKey();
            if (!$assertionsDisabled && null == cacheKey) {
                throw new AssertionError();
            }
            cacheEntry.init(cacheKey, changeSet);
            return cacheEntry;
        } finally {
            cacheEntry.getLock().writeLock().unlock();
        }
    }

    /**
     * Returns the cache entry registered for the channel, creating and registering an
     * (uninitialized) one if none exists.
     * <p>
     * The lookup is done under the cache read lock; on a miss the read lock is released and
     * the write lock is taken for a second lookup and the insert. Each lock is released
     * exactly once, in its own {@code finally}.
     *
     * @param channelAddress the channel address.
     * @return the existing or newly created entry; never null.
     */
    ChannelCacheEntry getCacheEntry(@Nonnull ChannelAddress channelAddress) {
        this._cacheLock.readLock().lock();
        try {
            final ChannelCacheEntry existing = this._cache.get(channelAddress);
            if (null != existing) {
                return existing;
            }
        } finally {
            this._cacheLock.readLock().unlock();
        }

        this._cacheLock.writeLock().lock();
        try {
            // Re-check under the write lock: another thread may have inserted the entry.
            final ChannelCacheEntry raced = this._cache.get(channelAddress);
            if (null != raced) {
                return raced;
            }
            final ChannelCacheEntry created = new ChannelCacheEntry(channelAddress);
            this._cache.put(channelAddress, created);
            return created;
        } finally {
            this._cacheLock.writeLock().unlock();
        }
    }

    /**
     * Hook for collecting the initial data of a channel subscription into {@code changeSet}.
     * This default implementation always throws; presumably subclasses override it for the
     * channels they support - TODO confirm.
     *
     * @param channelAddress the channel being subscribed to.
     * @param changeSet      the change set to fill.
     * @param obj            the subscription filter, may be null.
     * @return the result of the collection.
     * @throws IllegalStateException always, in this default implementation.
     */
    @Nonnull
    protected SubscribeResult collectDataForSubscribe(@Nonnull ChannelAddress channelAddress, @Nonnull ChangeSet changeSet, @Nullable Object obj) {
        throw new IllegalStateException("collectDataForSubscribe called for unsupported channel " + channelAddress);
    }

    /**
     * Hook for collecting the initial data for several addresses of one channel at once.
     * This default implementation always throws; presumably subclasses override it for channels
     * that support bulk loads - TODO confirm.
     *
     * @param replicantSession the subscribing session.
     * @param list             the addresses to subscribe to; the first one is named in the error.
     * @param obj              the subscription filter, may be null.
     * @return true if the addresses were handled in bulk (the caller falls back to per-address
     *         subscribe when false is returned).
     * @throws IllegalStateException always, in this default implementation.
     */
    protected boolean bulkCollectDataForSubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull List<ChannelAddress> list, @Nullable Object obj) {
        // The message names this method; it previously named collectDataForSubscriptionUpdate.
        throw new IllegalStateException("bulkCollectDataForSubscribe called for unsupported channel " + list.get(0));
    }

    /**
     * Hook for collecting the changes caused by replacing a subscription's filter.
     * This default implementation always throws; presumably subclasses override it for channels
     * with dynamic filters - TODO confirm.
     *
     * @param replicantSession the session owning the subscription.
     * @param channelAddress   the channel being updated.
     * @param changeSet        the change set to fill.
     * @param obj              the previous filter, may be null.
     * @param obj2             the new filter, may be null.
     * @throws IllegalStateException always, in this default implementation.
     */
    protected void collectDataForSubscriptionUpdate(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelAddress channelAddress, @Nonnull ChangeSet changeSet, @Nullable Object obj, @Nullable Object obj2) {
        throw new IllegalStateException("collectDataForSubscriptionUpdate called for unsupported channel " + channelAddress);
    }

    /**
     * Hook for collecting, in bulk, the changes caused by replacing the filter on several
     * subscriptions of one channel. This default implementation always throws; presumably
     * subclasses override it where bulk updates are supported - TODO confirm.
     *
     * @param replicantSession the session owning the subscriptions.
     * @param list             the addresses being updated; the first one is named in the error.
     * @param obj              the previous filter shared by the addresses, may be null.
     * @param obj2             the new filter, may be null.
     * @return true if the addresses were handled in bulk (the caller falls back to per-address
     *         subscribe when false is returned).
     * @throws IllegalStateException always, in this default implementation.
     */
    protected boolean bulkCollectDataForSubscriptionUpdate(@Nonnull ReplicantSession replicantSession, @Nonnull List<ChannelAddress> list, @Nullable Object obj, @Nullable Object obj2) {
        throw new IllegalStateException("bulkCollectDataForSubscriptionUpdate called for unknown channel " + list.get(0));
    }

    /**
     * Unsubscribes the session from a channel while holding the session lock, recording the
     * resulting changes in the change set returned by
     * {@code EntityMessageCacheUtil.getSessionChanges()}.
     *
     * @param replicantSession the session to unsubscribe.
     * @param channelAddress   the channel to unsubscribe from.
     * @throws InterruptedException if the thread is interrupted while waiting for the session lock.
     */
    @Override
    public void unsubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelAddress channelAddress) throws InterruptedException {
        final ReentrantLock lock = replicantSession.getLock();
        lock.lockInterruptibly();
        try {
            unsubscribe(replicantSession, channelAddress, EntityMessageCacheUtil.getSessionChanges());
        } finally {
            // Single release point: the lock is unlocked exactly once on every exit path.
            lock.unlock();
        }
    }

    /**
     * Unsubscribes the session from a channel, recording the result in the given change set.
     * Does not acquire the session lock itself; it simply delegates to
     * {@code performUnsubscribe(session, address, changeSet)}.
     *
     * @param replicantSession the session to unsubscribe.
     * @param channelAddress   the channel to unsubscribe from.
     * @param changeSet        the change set that receives the resulting actions.
     */
    void unsubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelAddress channelAddress, @Nonnull ChangeSet changeSet) {
        performUnsubscribe(replicantSession, channelAddress, changeSet);
    }

    /**
     * Explicitly unsubscribes the session from the channel if it is subscribed; otherwise
     * does nothing.
     */
    private void performUnsubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelAddress channelAddress, @Nonnull ChangeSet changeSet) {
        final SubscriptionEntry entry = replicantSession.findSubscriptionEntry(channelAddress);
        if (null == entry) {
            return;
        }
        performUnsubscribe(replicantSession, entry, true, false, changeSet);
    }

    /**
     * Unsubscribes the session from several instances of one channel while holding the
     * session lock.
     *
     * @param replicantSession the session to unsubscribe.
     * @param i                the channel id.
     * @param collection       the sub-channel ids to unsubscribe from.
     * @throws InterruptedException if the thread is interrupted while waiting for the session lock.
     */
    @Override
    public void bulkUnsubscribe(@Nonnull ReplicantSession replicantSession, int i, @Nonnull Collection<Integer> collection) throws InterruptedException {
        final ReentrantLock lock = replicantSession.getLock();
        lock.lockInterruptibly();
        try {
            doBulkUnsubscribe(replicantSession, i, collection);
        } finally {
            // Single release point: the lock is unlocked exactly once on every exit path.
            lock.unlock();
        }
    }

    /**
     * Unsubscribes the session from each listed instance of the channel, recording the results
     * in the change set returned by {@code EntityMessageCacheUtil.getSessionChanges()}.
     */
    private void doBulkUnsubscribe(@Nonnull ReplicantSession replicantSession, int i, @Nonnull Collection<Integer> collection) {
        final ChangeSet sessionChanges = EntityMessageCacheUtil.getSessionChanges();
        for (final Integer subChannelId : collection) {
            final ChannelAddress address = new ChannelAddress(i, Integer.valueOf(subChannelId.intValue()));
            performUnsubscribe(replicantSession, address, sessionChanges);
        }
    }

    /**
     * Removes a subscription if it is allowed to be removed.
     * <p>
     * When {@code z} is set the entry's explicit-subscription flag is cleared first. If the
     * entry then reports {@code canUnsubscribe()}, a DELETE or REMOVE action is recorded, every
     * outward-linked subscription is delinked (and itself unsubscribed where possible), and the
     * entry is deleted from the session.
     *
     * @param replicantSession  the session owning the subscription.
     * @param subscriptionEntry the subscription to remove.
     * @param z                 true to clear the explicit-subscription flag first.
     * @param z2                true to record a DELETE action, false to record REMOVE.
     * @param changeSet         the change set that receives the actions.
     */
    protected void performUnsubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull SubscriptionEntry subscriptionEntry, boolean z, boolean z2, @Nonnull ChangeSet changeSet) {
        if (z) {
            subscriptionEntry.setExplicitlySubscribed(false);
        }
        if (!subscriptionEntry.canUnsubscribe()) {
            return;
        }
        final ChannelAction.Action action = z2 ? ChannelAction.Action.DELETE : ChannelAction.Action.REMOVE;
        changeSet.mergeAction(subscriptionEntry.getAddress(), action, null);
        // Iterate over a copy: delinking deregisters entries from the outward subscriptions.
        for (final Object downstream : new ArrayList<>(subscriptionEntry.getOutwardSubscriptions())) {
            delinkDownstreamSubscription(replicantSession, subscriptionEntry, (ChannelAddress) downstream, changeSet);
        }
        replicantSession.deleteSubscriptionEntry(subscriptionEntry);
    }

    /**
     * Breaks the link from {@code subscriptionEntry} to the subscription on
     * {@code channelAddress}, if the session has one, and then attempts a non-explicit
     * unsubscribe of that downstream subscription.
     */
    private void delinkDownstreamSubscription(@Nonnull ReplicantSession replicantSession, @Nonnull SubscriptionEntry subscriptionEntry, @Nonnull ChannelAddress channelAddress, @Nonnull ChangeSet changeSet) {
        final SubscriptionEntry downstream = replicantSession.findSubscriptionEntry(channelAddress);
        if (null == downstream) {
            return;
        }
        delinkSubscriptionEntries(subscriptionEntry, downstream);
        performUnsubscribe(replicantSession, downstream, false, false, changeSet);
    }

    /**
     * Delinks the subscription from the target channel of every link carried by the entity
     * message. Does nothing when the message has no links.
     *
     * @param replicantSession  the session owning the subscriptions.
     * @param subscriptionEntry the upstream subscription.
     * @param entityMessage     the message whose links are processed.
     * @param changeSet         the change set that receives any resulting actions.
     */
    protected void delinkDownstreamSubscriptions(@Nonnull ReplicantSession replicantSession, @Nonnull SubscriptionEntry subscriptionEntry, @Nonnull EntityMessage entityMessage, @Nonnull ChangeSet changeSet) {
        final Set<ChannelLink> links = entityMessage.getLinks();
        if (null == links) {
            return;
        }
        for (final ChannelLink link : links) {
            delinkDownstreamSubscription(replicantSession, subscriptionEntry, link.getTargetChannel(), changeSet);
        }
    }

    /**
     * Records a link between two subscriptions: the second entry's address is registered as an
     * outward subscription of the first, and the first entry's address as an inward
     * subscription of the second.
     */
    void linkSubscriptionEntries(@Nonnull SubscriptionEntry subscriptionEntry, @Nonnull SubscriptionEntry subscriptionEntry2) {
        final ChannelAddress targetAddress = subscriptionEntry2.getAddress();
        subscriptionEntry.registerOutwardSubscriptions(targetAddress);
        final ChannelAddress sourceAddress = subscriptionEntry.getAddress();
        subscriptionEntry2.registerInwardSubscriptions(sourceAddress);
    }

    /**
     * Removes a link between two subscriptions: the second entry's address is deregistered
     * from the first entry's outward subscriptions, and the first entry's address from the
     * second entry's inward subscriptions.
     */
    void delinkSubscriptionEntries(@Nonnull SubscriptionEntry subscriptionEntry, @Nonnull SubscriptionEntry subscriptionEntry2) {
        final ChannelAddress targetAddress = subscriptionEntry2.getAddress();
        subscriptionEntry.deregisterOutwardSubscriptions(targetAddress);
        final ChannelAddress sourceAddress = subscriptionEntry.getAddress();
        subscriptionEntry2.deregisterInwardSubscriptions(sourceAddress);
    }

    /**
     * Repeatedly expands one link at a time until {@link #expandLink} reports that nothing
     * more was expanded.
     */
    void expandLinks(@Nonnull ReplicantSession replicantSession, @Nonnull ChangeSet changeSet) {
        boolean expanded = expandLink(replicantSession, changeSet);
        while (expanded) {
            expanded = expandLink(replicantSession, changeSet);
        }
    }

    /**
     * Scans the links of every update message in the change set and expands the first one
     * that needs it.
     *
     * @return true as soon as one link caused a new subscription, false if none did.
     */
    boolean expandLink(@Nonnull ReplicantSession replicantSession, @Nonnull ChangeSet changeSet) {
        for (final Change change : changeSet.getChanges()) {
            final EntityMessage message = change.getEntityMessage();
            if (!message.isUpdate()) {
                continue;
            }
            final Set<ChannelLink> links = message.getLinks();
            if (null == links) {
                continue;
            }
            for (final ChannelLink link : links) {
                if (expandLinkIfRequired(replicantSession, link, changeSet)) {
                    // Stop immediately; the caller rescans from the start after an expansion.
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Follows a single link if the session is subscribed to its source channel.
     * <p>
     * Links to targets that take a filter parameter are only followed when
     * {@link #shouldFollowLink} agrees. If the target is already subscribed the two entries
     * are merely linked; otherwise the target is subscribed non-explicitly (with the source's
     * filter, or null for targets without a filter parameter) and then linked.
     *
     * @return true only if a new subscription to the target channel was created.
     */
    boolean expandLinkIfRequired(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelLink channelLink, @Nonnull ChangeSet changeSet) {
        final SubscriptionEntry source = replicantSession.findSubscriptionEntry(channelLink.getSourceChannel());
        if (null == source) {
            return false;
        }
        final ChannelAddress target = channelLink.getTargetChannel();
        final boolean targetUnfiltered = !getSystemMetaData().getChannelMetaData(target).hasFilterParameter();
        final boolean follow = targetUnfiltered || shouldFollowLink(source, target);
        if (!follow) {
            return false;
        }
        final SubscriptionEntry existingTarget = replicantSession.findSubscriptionEntry(target);
        if (null != existingTarget) {
            linkSubscriptionEntries(source, existingTarget);
            return false;
        }
        final Object targetFilter = targetUnfiltered ? null : source.getFilter();
        subscribe(replicantSession, target, false, targetFilter, changeSet);
        linkSubscriptionEntries(source, replicantSession.getSubscriptionEntry(target));
        return true;
    }

    /**
     * Hook deciding whether a link from an existing subscription to a filtered target channel
     * should be followed. This default implementation always throws; presumably subclasses
     * override it for the links they know about - TODO confirm.
     *
     * @param subscriptionEntry the subscription on the link's source channel.
     * @param channelAddress    the link's target channel.
     * @return true if the link should be followed.
     * @throws IllegalStateException always, in this default implementation.
     */
    protected boolean shouldFollowLink(@Nonnull SubscriptionEntry subscriptionEntry, @Nonnull ChannelAddress channelAddress) {
        throw new IllegalStateException("shouldFollowLink called for link between channel " + subscriptionEntry.getAddress() + " and " + channelAddress + " and the target has no filter or the link is unknown.");
    }

    /**
     * Hook applying a channel's filter to an entity message for a given session. Callers in
     * this class treat a null result as "do not send this message". This default
     * implementation always throws; presumably subclasses override it for filtered
     * channels - TODO confirm.
     *
     * @param replicantSession the session the message would be sent to.
     * @param channelAddress   the channel the message is routed on.
     * @param entityMessage    the message to filter.
     * @return the message to use, or null to drop it.
     * @throws IllegalStateException always, in this default implementation.
     */
    @Nullable
    protected EntityMessage filterEntityMessage(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelAddress channelAddress, @Nonnull EntityMessage entityMessage) {
        throw new IllegalStateException("filterEntityMessage called for unfiltered channel " + channelAddress);
    }

    /**
     * Routes one entity message to every channel it carries a routing key for.
     * <p>
     * For each such channel the cache entry is dropped when the channel's cache type is
     * INTERNAL, and the message is then offered to the session's subscription on that channel,
     * going through {@link #filterEntityMessage} unless the channel's filter type is NONE.
     */
    private void processUpdateMessages(@Nonnull EntityMessage entityMessage, @Nonnull ReplicantSession replicantSession, @Nonnull ChangeSet changeSet) {
        final SystemMetaData metaData = getSystemMetaData();
        final int channelCount = metaData.getChannelCount();
        for (int index = 0; index < channelCount; index++) {
            final ChannelMetaData channel = metaData.getChannelMetaData(index);
            final ChannelAddress address = extractAddressFromMessage(channel, entityMessage);
            if (null == address) {
                continue;
            }
            if (ChannelMetaData.CacheType.INTERNAL == channel.getCacheType()) {
                deleteCacheEntry(address);
            }
            final Function<EntityMessage, EntityMessage> filter;
            if (ChannelMetaData.FilterType.NONE != metaData.getChannelMetaData(index).getFilterType()) {
                filter = message -> filterEntityMessage(replicantSession, address, message);
            } else {
                filter = null;
            }
            processUpdateMessage(address, entityMessage, replicantSession, changeSet, filter);
        }
    }

    /**
     * Derives the channel address an entity message is routed to for the given channel.
     *
     * @param channelMetaData the channel to test against.
     * @param entityMessage   the message whose routing keys are inspected.
     * @return for an instance graph, the address built from the channel id and the routing
     *         value stored under the channel name; for other channels, the address built from
     *         the channel id alone when a routing key with the channel name is present;
     *         null when the message is not routed to the channel.
     */
    @Nullable
    private ChannelAddress extractAddressFromMessage(@Nonnull ChannelMetaData channelMetaData, @Nonnull EntityMessage entityMessage) {
        if (channelMetaData.isInstanceGraph()) {
            final Integer subChannelId = (Integer) entityMessage.getRoutingKeys().get(channelMetaData.getName());
            return null == subChannelId ? null : new ChannelAddress(channelMetaData.getChannelId(), subChannelId);
        }
        final boolean routed = entityMessage.getRoutingKeys().containsKey(channelMetaData.getName());
        return routed ? new ChannelAddress(channelMetaData.getChannelId()) : null;
    }

    /**
     * Adds the message to the change set if the session is subscribed to the channel and the
     * optional filter does not reject it (return null).
     * <p>
     * Note that the change is built from the original {@code entityMessage}; the filter's
     * return value is only tested for null.
     */
    private void processUpdateMessage(@Nonnull ChannelAddress channelAddress, @Nonnull EntityMessage entityMessage, @Nonnull ReplicantSession replicantSession, @Nonnull ChangeSet changeSet, @Nullable Function<EntityMessage, EntityMessage> function) {
        if (null == replicantSession.findSubscriptionEntry(channelAddress)) {
            return;
        }
        final EntityMessage filtered = null == function ? entityMessage : function.apply(entityMessage);
        if (null == filtered) {
            return;
        }
        changeSet.merge(new Change(entityMessage, channelAddress.getChannelId(), channelAddress.getSubChannelId()));
    }

    /**
     * Runs delete handling for one entity message against every instance channel it carries
     * a routing value for, going through {@link #filterEntityMessage} unless the channel's
     * filter type is NONE.
     */
    private void processDeleteMessages(@Nonnull EntityMessage entityMessage, @Nonnull ReplicantSession replicantSession, @Nonnull ChangeSet changeSet) {
        final SystemMetaData metaData = getSystemMetaData();
        final int instanceChannelCount = metaData.getInstanceChannelCount();
        for (int index = 0; index < instanceChannelCount; index++) {
            final ChannelMetaData channel = metaData.getInstanceChannelByIndex(index);
            final Integer subChannelId = (Integer) entityMessage.getRoutingKeys().get(channel.getName());
            if (null == subChannelId) {
                continue;
            }
            final ChannelAddress address = new ChannelAddress(channel.getChannelId(), subChannelId);
            final Function<EntityMessage, EntityMessage> filter;
            if (ChannelMetaData.FilterType.NONE != metaData.getInstanceChannelByIndex(index).getFilterType()) {
                filter = message -> filterEntityMessage(replicantSession, address, message);
            } else {
                filter = null;
            }
            processDeleteMessage(address, entityMessage, replicantSession, changeSet, filter);
        }
    }

    /**
     * Handles a delete message for a channel the session is subscribed to.
     * <p>
     * Nothing happens unless the (optionally filtered) message is a delete. If the deleted
     * entity's type is the root entity type of an instance-graph channel, the subscription
     * itself is removed with a DELETE action. In every delete case the subscriptions reached
     * through the message's links are delinked.
     */
    private void processDeleteMessage(@Nonnull ChannelAddress channelAddress, @Nonnull EntityMessage entityMessage, @Nonnull ReplicantSession replicantSession, @Nonnull ChangeSet changeSet, @Nullable Function<EntityMessage, EntityMessage> function) {
        final SubscriptionEntry entry = replicantSession.findSubscriptionEntry(channelAddress);
        if (null == entry) {
            return;
        }
        final EntityMessage filtered = null == function ? entityMessage : function.apply(entityMessage);
        if (null != filtered && filtered.isDelete()) {
            final ChannelMetaData metaData = getSystemMetaData().getChannelMetaData(channelAddress);
            final boolean rootDeleted = metaData.isInstanceGraph() && metaData.getInstanceRootEntityTypeId().intValue() == filtered.getTypeId();
            if (rootDeleted) {
                performUnsubscribe(replicantSession, entry, true, true, changeSet);
            }
            delinkDownstreamSubscriptions(replicantSession, entry, filtered, changeSet);
        }
    }

    /**
     * Exposes the read/write lock that guards the session registry.
     *
     * @return the lock; never null.
     */
    @Nonnull
    ReadWriteLock getLock() {
        return this._lock;
    }

    // Static initialization: captures whether assertions are disabled for this class (used by
    // the explicit `!$assertionsDisabled && ...` checks) and creates the class logger.
    static {
        $assertionsDisabled = !ReplicantSessionManagerImpl.class.desiredAssertionStatus();
        LOG = Logger.getLogger(ReplicantSessionManagerImpl.class.getName());
    }
}
