package org.realityforge.replicant.server.transport;

import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.realityforge.replicant.server.ChangeSet;
import org.realityforge.replicant.server.EntityMessage;

/* loaded from: input_file:org/realityforge/replicant/server/transport/ReplicantMessageBrokerImpl.class */
public abstract class ReplicantMessageBrokerImpl implements ReplicantMessageBroker {

    @Nonnull
    private static final Logger LOG = Logger.getLogger(ReplicantSessionManagerImpl.class.getName());
    private static final long QUEUE_TIMEOUT = 10;

    @Nonnull
    private final Lock _lock = new ReentrantLock();

    @Nonnull
    private final BlockingQueue<ReplicantSession> _queue = new LinkedBlockingDeque();

    @Nonnull
    protected abstract ReplicantSessionManager getReplicantSessionManager();

    @Override // org.realityforge.replicant.server.transport.ReplicantMessageBroker
    public void queueChangeMessage(@Nonnull ReplicantSession replicantSession, boolean z, @Nullable Integer num, @Nullable String str, @Nonnull Collection<EntityMessage> collection, @Nonnull ChangeSet changeSet) {
        replicantSession.queuePacket(new Packet(z, num, str, collection, changeSet));
        this._queue.add(replicantSession);
    }

    @Override // org.realityforge.replicant.server.transport.ReplicantMessageBroker
    public void processPendingSessions() {
        if (!this._lock.tryLock()) {
            return;
        }
        while (true) {
            try {
                ReplicantSession poll = this._queue.poll(QUEUE_TIMEOUT, TimeUnit.MILLISECONDS);
                if (null == poll) {
                    return;
                } else {
                    processPendingSession(poll);
                }
            } catch (InterruptedException e) {
                return;
            } finally {
                this._lock.unlock();
            }
        }
    }

    private void processPendingSession(@Nonnull ReplicantSession replicantSession) {
        LOG.log(Level.FINEST, () -> {
            return "Processing pending ChangeSets for session " + replicantSession.getId();
        });
        if (!replicantSession.isOpen()) {
            return;
        }
        ReentrantLock lock = replicantSession.getLock();
        try {
            try {
                lock.lockInterruptibly();
                while (true) {
                    Packet popPendingPacket = replicantSession.popPendingPacket();
                    if (null == popPendingPacket) {
                        lock.unlock();
                        return;
                    }
                    getReplicantSessionManager().sendChangeMessage(replicantSession, popPendingPacket.getRequestId(), popPendingPacket.getEtag(), popPendingPacket.getMessages(), popPendingPacket.getChangeSet());
                }
            } catch (InterruptedException e) {
                LOG.log(Level.FINEST, () -> {
                    return "Error completing send of packet " + replicantSession.getId();
                });
                replicantSession.closeDueToInterrupt();
                lock.unlock();
            }
        } catch (Throwable th) {
            lock.unlock();
            throw th;
        }
    }
}
