package org.realityforge.replicant.server.ee;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.json.JsonString;
import javax.persistence.EntityManager;
import javax.transaction.TransactionSynchronizationRegistry;
import javax.transaction.Transactional;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import org.realityforge.replicant.server.ChannelAddress;
import org.realityforge.replicant.server.EntityMessageEndpoint;
import org.realityforge.replicant.server.json.TransportConstants;
import org.realityforge.replicant.server.transport.ChannelMetaData;
import org.realityforge.replicant.server.transport.ReplicantSession;
import org.realityforge.replicant.server.transport.ReplicantSessionManager;
import org.realityforge.replicant.server.transport.WebSocketUtil;

/* loaded from: input_file:org/realityforge/replicant/server/ee/AbstractReplicantEndpoint.class */
public abstract class AbstractReplicantEndpoint {

    /**
     * Logger used by this endpoint; {@code protected} so subclasses can log through it too.
     */
    // NOTE(review): the logger is named after AbstractEeReplicantEndpoint rather than this class - confirm this is intentional.
    @Nonnull
    protected static final Logger LOG = Logger.getLogger(AbstractEeReplicantEndpoint.class.getName());

    /**
     * Jackson mapper used by {@code parseFilter} to bind a filter JSON object to the
     * channel's filter parameter type.
     */
    @Nonnull
    private final transient ObjectMapper _jsonMapper = new ObjectMapper();

    /**
     * Returns the session manager this endpoint uses to create, look up, subscribe,
     * unsubscribe and invalidate replicant sessions, and to obtain channel metadata.
     *
     * @return the session manager, never null.
     */
    @Nonnull
    protected abstract ReplicantSessionManager getSessionManager();

    /**
     * Returns the transaction synchronization registry that is handed to
     * {@code ReplicationRequestUtil} when a session request is executed.
     *
     * @return the registry, never null.
     */
    @Nonnull
    protected abstract TransactionSynchronizationRegistry getRegistry();

    /**
     * Returns the entity manager that is handed to {@code ReplicationRequestUtil}
     * when a session request is executed.
     *
     * @return the entity manager, never null.
     */
    @Nonnull
    protected abstract EntityManager getEntityManager();

    /**
     * Returns the entity message endpoint that is handed to {@code ReplicationRequestUtil}
     * when a session request is executed.
     *
     * @return the endpoint, never null.
     */
    @Nonnull
    protected abstract EntityMessageEndpoint getEndpoint();

    /**
     * Creates a replicant session for a newly opened WebSocket session and tells the
     * client the id of the session that was created.
     *
     * @param session the WebSocket session that has just been opened.
     */
    @OnOpen
    public void onOpen(@Nonnull Session session) {
        final ReplicantSession replicantSession = getSessionManager().createSession(session);
        if (LOG.isLoggable(Level.FINE)) {
            // Log the id of the session we just created rather than looking it up again:
            // a second lookup could throw and would make behaviour depend on the log level.
            LOG.log(Level.FINE, "Opening WebSocket Session " + session.getId() + " for replicant session " + replicantSession.getId());
        }
        WebSocketUtil.sendJsonObject(session,
                                     Json.createObjectBuilder()
                                         .add(TransportConstants.TYPE, "session-created")
                                         .add("sessionId", replicantSession.getId())
                                         .build());
    }

    /**
     * Handles a text message received from the client.
     *
     * <p>The message is parsed as a JSON object that must carry a type
     * ({@code TransportConstants.TYPE}) and a request id ({@code TransportConstants.REQUEST_ID}).
     * Every type other than {@code "auth"} requires {@link #isAuthorized(ReplicantSession)} to
     * return true. Recognised types are {@code "etags"}, {@code "ping"}, {@code "sub"},
     * {@code "bulk-sub"}, {@code "unsub"}, {@code "bulk-unsub"} and {@code "auth"}; anything else
     * is reported as an unknown command.</p>
     *
     * <p>If no replicant session is associated with the WebSocket session an error is sent and
     * the WebSocket session is closed. Any failure while parsing or processing the message is
     * logged and then reported to the client as a malformed message, which closes the session.
     * A failure while sending that report is not caught here and propagates to the caller.</p>
     *
     * @param session the WebSocket session the message arrived on.
     * @param str     the raw message text.
     * @throws IOException if reporting a missing replicant session to the client fails.
     */
    @OnMessage
    @Transactional
    public void command(@Nonnull Session session, @Nonnull String str) throws IOException {
        final ReplicantSession replicantSession;
        try {
            replicantSession = getReplicantSession(session);
        } catch (Throwable th) {
            // Only a failed lookup is reported as a missing session.
            LOG.log(Level.FINE, "Unable to locate replicant session for incoming WebSocket message", th);
            sendErrorAndClose(session, "Unable to locate associated replicant session");
            return;
        }
        if (LOG.isLoggable(Level.FINE)) {
            LOG.log(Level.FINE, "Message on WebSocket Session " + session.getId() + " for replicant session " + replicantSession.getId() + ". Message:\n" + str);
        }
        try {
            final JsonObject message = Json.createReader(new StringReader(str)).readObject();
            final String type = message.getString(TransportConstants.TYPE);
            final int requestId = message.getInt(TransportConstants.REQUEST_ID);
            if (!"auth".equals(type) && !isAuthorized(replicantSession)) {
                // NOTE(review): the message text contains a typo ("authroized"); it is left
                // unchanged here in case clients match on the exact text.
                sendErrorAndClose(session, "Replicant session not authroized");
                return;
            }
            beforeCommand(replicantSession, type, message);
            try {
                switch (type) {
                    case "etags":
                        onETags(replicantSession, message);
                        break;
                    case "ping":
                        sendOk(session, requestId);
                        break;
                    case "sub":
                        onSubscribe(replicantSession, message);
                        break;
                    case "bulk-sub":
                        onBulkSubscribe(replicantSession, message);
                        break;
                    case "unsub":
                        onUnsubscribe(replicantSession, message);
                        break;
                    case "bulk-unsub":
                        onBulkUnsubscribe(replicantSession, message);
                        break;
                    case "auth":
                        onAuthorize(replicantSession, message);
                        break;
                    default:
                        onUnknownCommand(replicantSession, message);
                        break;
                }
            } catch (InterruptedException ignored) {
                // An interrupted handler closes the session; afterCommand still runs below.
                replicantSession.closeDueToInterrupt();
            }
            afterCommand(replicantSession, type, message);
        } catch (Throwable th) {
            // Keep the cause visible in the log before reporting the failure to the client.
            LOG.log(Level.INFO, "Failed to process message on WebSocket Session", th);
            onMalformedMessage(replicantSession, str);
        }
    }

    /**
     * Hook invoked by {@code closeReplicantSession} immediately before the session is
     * invalidated with the session manager. The default implementation does nothing.
     *
     * @param replicantSession the session that is about to be invalidated.
     */
    protected void onSessionClose(@Nonnull ReplicantSession replicantSession) {
    }

    /**
     * Hook invoked by {@code command} after the message has been parsed and the
     * authorization check has passed, but before the command is dispatched.
     * The default implementation does nothing.
     *
     * @param replicantSession the session the message belongs to.
     * @param str              the message type read from the message.
     * @param jsonObject       the parsed message.
     */
    protected void beforeCommand(@Nonnull ReplicantSession replicantSession, @Nonnull String str, @Nonnull JsonObject jsonObject) {
    }

    /**
     * Hook invoked by {@code command} once the command handler has returned, including
     * the case where the handler was interrupted. It is not invoked when the message is
     * rejected as unauthorized or when the handler throws another exception.
     * The default implementation does nothing.
     *
     * @param replicantSession the session the message belongs to.
     * @param str              the message type read from the message.
     * @param jsonObject       the parsed message.
     */
    protected void afterCommand(@Nonnull ReplicantSession replicantSession, @Nonnull String str, @Nonnull JsonObject jsonObject) {
    }

    /**
     * Decides whether the session may issue commands. {@code command} consults this for
     * every message type other than {@code "auth"}; a {@code false} result makes it send
     * an error and close the session. The default implementation authorizes every session.
     *
     * @param replicantSession the session issuing the command.
     * @return true if the session is authorized.
     */
    protected boolean isAuthorized(@Nonnull ReplicantSession replicantSession) {
        return true;
    }

    /**
     * Sends an {@code "ok"} acknowledgement carrying the given request id to the client.
     *
     * @param session the WebSocket session to send on.
     * @param i       the id of the request being acknowledged.
     */
    private void sendOk(@Nonnull Session session, int i) {
        final JsonObject acknowledgement =
            Json.createObjectBuilder()
                .add(TransportConstants.TYPE, "ok")
                .add(TransportConstants.REQUEST_ID, i)
                .build();
        WebSocketUtil.sendJsonObject(session, acknowledgement);
    }

    /**
     * Handles an {@code "etags"} command: reads the {@code "etags"} object from the message,
     * whose keys are channel addresses and whose values are JSON strings, stores the
     * resulting map on the session inside a session-locking request and acknowledges
     * the request.
     *
     * @param replicantSession the session the etags apply to.
     * @param jsonObject       the parsed message.
     * @throws InterruptedException if the session-locking request is interrupted.
     */
    private void onETags(@Nonnull ReplicantSession replicantSession, @Nonnull JsonObject jsonObject) throws InterruptedException {
        final JsonObject eTagsJson = jsonObject.getJsonObject("etags");
        final Map<ChannelAddress, String> eTags = new HashMap<>();
        for (final String address : eTagsJson.keySet()) {
            // getString fails with ClassCastException for a non-string value, as the explicit cast did.
            eTags.put(ChannelAddress.parse(address), eTagsJson.getString(address));
        }
        ReplicationRequestUtil.sessionLockingRequest(getRegistry(), getEntityManager(), getEndpoint(), "setEtags()", replicantSession, null, () -> {
            replicantSession.setETags(eTags);
        });
        sendOk(replicantSession.getWebSocketSession(), jsonObject.getInt(TransportConstants.REQUEST_ID));
    }

    /**
     * Reports a message that could not be processed: sends a {@code "malformed-message"}
     * payload echoing the raw text and closes the session.
     *
     * @param replicantSession the session the message arrived on.
     * @param str              the raw message text.
     */
    private void onMalformedMessage(@Nonnull ReplicantSession replicantSession, @Nonnull String str) {
        final JsonObject payload =
            Json.createObjectBuilder()
                .add(TransportConstants.TYPE, "malformed-message")
                .add("message", str)
                .build();
        closeWithError(replicantSession, "Malformed message", payload);
    }

    /**
     * Reports a message whose type is not recognised: sends an {@code "unknown-command"}
     * payload echoing the parsed message and closes the session.
     *
     * @param replicantSession the session the message arrived on.
     * @param jsonObject       the parsed message.
     */
    private void onUnknownCommand(@Nonnull ReplicantSession replicantSession, @Nonnull JsonObject jsonObject) {
        final JsonObject payload =
            Json.createObjectBuilder()
                .add(TransportConstants.TYPE, "unknown-command")
                .add("command", jsonObject)
                .build();
        closeWithError(replicantSession, "Unknown command", payload);
    }

    /**
     * Handles an {@code "auth"} command: stores the {@code "token"} value from the message
     * on the session and acknowledges the request.
     *
     * @param replicantSession the session being authorized.
     * @param jsonObject       the parsed message.
     */
    private void onAuthorize(@Nonnull ReplicantSession replicantSession, @Nonnull JsonObject jsonObject) {
        final String token = jsonObject.getString("token");
        replicantSession.setAuthToken(token);

        final Session webSocketSession = replicantSession.getWebSocketSession();
        final int requestId = jsonObject.getInt(TransportConstants.REQUEST_ID);
        sendOk(webSocketSession, requestId);
    }

    /**
     * Handles a {@code "sub"} command: parses the channel address from the message,
     * validates the request against the channel's metadata and, if valid, subscribes
     * the session using the optional filter in the message.
     *
     * @param replicantSession the session requesting the subscription.
     * @param jsonObject       the parsed message.
     * @throws IOException          if reporting an invalid request to the client fails.
     * @throws InterruptedException if the subscribe request is interrupted.
     */
    private void onSubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull JsonObject jsonObject) throws IOException, InterruptedException {
        final String channel = jsonObject.getString(TransportConstants.CHANNEL);
        final ChannelAddress address = ChannelAddress.parse(channel);
        final ChannelMetaData metaData = getChannelMetaData(address.getChannelId());
        if (!checkSubscribeRequest(replicantSession, metaData, address)) {
            // The check has already reported the problem and closed the session.
            return;
        }
        final int requestId = jsonObject.getInt(TransportConstants.REQUEST_ID);
        final Object filter = extractFilter(metaData, jsonObject);
        subscribe(replicantSession, requestId, address, filter);
    }

    /**
     * Validates a subscribe request against the channel's metadata. When the request is
     * invalid an error is sent to the client and the session is closed.
     *
     * @param replicantSession the session making the request.
     * @param channelMetaData  metadata of the channel being subscribed to.
     * @param channelAddress   the address being subscribed to.
     * @return true if the request may proceed, false if it was rejected.
     * @throws IOException if sending the error or closing the session fails.
     */
    private boolean checkSubscribeRequest(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelMetaData channelMetaData, @Nonnull ChannelAddress channelAddress) throws IOException {
        final String problem;
        if (!channelMetaData.isExternal()) {
            problem = "Attempted to subscribe to internal-only channel";
        } else if (channelAddress.hasSubChannelId() && channelMetaData.isTypeGraph()) {
            problem = "Attempted to subscribe to type channel with instance data";
        } else if (!channelAddress.hasSubChannelId() && channelMetaData.isInstanceGraph()) {
            problem = "Attempted to subscribe to instance channel without instance data";
        } else {
            problem = null;
        }

        if (null == problem) {
            return true;
        }
        sendErrorAndClose(replicantSession, problem);
        return false;
    }

    /**
     * Runs {@code doSubscribe} for the given address inside a session update request
     * labelled {@code "Subscribe(<address>)"}.
     *
     * @param replicantSession the session to subscribe.
     * @param i                the id of the client request.
     * @param channelAddress   the address to subscribe to.
     * @param obj              the filter for the subscription, or null.
     * @throws InterruptedException if the session update request is interrupted.
     */
    private void subscribe(@Nonnull ReplicantSession replicantSession, int i, @Nonnull ChannelAddress channelAddress, @Nullable Object obj) throws InterruptedException {
        final TransactionSynchronizationRegistry registry = getRegistry();
        final EntityManager entityManager = getEntityManager();
        final EntityMessageEndpoint endpoint = getEndpoint();
        final String requestKey = "Subscribe(" + channelAddress + ")";
        ReplicationRequestUtil.sessionUpdateRequest(registry,
                                                    entityManager,
                                                    endpoint,
                                                    requestKey,
                                                    replicantSession,
                                                    i,
                                                    () -> doSubscribe(replicantSession, channelAddress, obj));
    }

    /**
     * Marks the current session changes as required and asks the session manager to
     * subscribe the session to the address. An interrupt closes the session instead of
     * propagating.
     *
     * @param replicantSession the session to subscribe.
     * @param channelAddress   the address to subscribe to.
     * @param obj              the filter for the subscription, or null.
     */
    private void doSubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelAddress channelAddress, @Nullable Object obj) {
        EntityMessageCacheUtil.getSessionChanges().setRequired(true);
        try {
            final ReplicantSessionManager sessionManager = getSessionManager();
            sessionManager.subscribe(replicantSession, channelAddress, obj);
        } catch (InterruptedException ignored) {
            // Interrupt is handled by closing the session.
            replicantSession.closeDueToInterrupt();
        }
    }

    /**
     * Handles a {@code "bulk-sub"} command: subscribes the session to several addresses
     * of the same channel in one request.
     *
     * <p>Every address is validated with {@code checkSubscribeRequest} against the metadata
     * of the first address's channel, must belong to that same channel and must carry a
     * sub-channel id; any violation sends an error and closes the session. A single
     * address is handled as a plain subscribe. An empty address list returns without
     * sending any response.</p>
     *
     * @param replicantSession the session requesting the subscriptions.
     * @param jsonObject       the parsed message.
     * @throws IOException          if reporting an invalid request to the client fails.
     * @throws InterruptedException if the session update request is interrupted.
     */
    private void onBulkSubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull JsonObject jsonObject) throws IOException, InterruptedException {
        final ChannelAddress[] addresses = extractChannels(jsonObject);
        if (0 == addresses.length) {
            return;
        }
        // The first address determines the channel that all other addresses must match.
        final int channelId = addresses[0].getChannelId();
        final ChannelMetaData channelMetaData = getChannelMetaData(channelId);
        final List<Integer> subChannelIds = new ArrayList<>();
        for (final ChannelAddress address : addresses) {
            if (!checkSubscribeRequest(replicantSession, channelMetaData, address)) {
                return;
            }
            if (address.getChannelId() != channelId) {
                sendErrorAndClose(replicantSession, "Bulk channel subscribe included addresses from multiple channels");
                return;
            }
            if (!address.hasSubChannelId()) {
                sendErrorAndClose(replicantSession, "Bulk channel subscribe included addresses channel without sub-channel ids");
                return;
            }
            subChannelIds.add(address.getSubChannelId());
        }
        final int requestId = jsonObject.getInt(TransportConstants.REQUEST_ID);
        final Object filter = extractFilter(channelMetaData, jsonObject);
        if (1 == addresses.length) {
            subscribe(replicantSession, requestId, addresses[0], filter);
        } else {
            ReplicationRequestUtil.sessionUpdateRequest(getRegistry(), getEntityManager(), getEndpoint(), "BulkSubscribe(" + channelMetaData.getChannelId() + ")", replicantSession, requestId, () -> {
                doBulkSubscribe(replicantSession, channelId, subChannelIds, filter);
            });
        }
    }

    /**
     * Marks the current session changes as required and asks the session manager to
     * subscribe the session to the listed sub-channels of a channel. An interrupt closes
     * the session instead of propagating.
     *
     * @param replicantSession the session to subscribe.
     * @param i                the id of the channel.
     * @param list             the sub-channel ids to subscribe to.
     * @param obj              the filter for the subscriptions, or null.
     */
    private void doBulkSubscribe(@Nonnull ReplicantSession replicantSession, int i, @Nullable List<Integer> list, @Nullable Object obj) {
        EntityMessageCacheUtil.getSessionChanges().setRequired(true);
        try {
            final ReplicantSessionManager sessionManager = getSessionManager();
            sessionManager.bulkSubscribe(replicantSession, i, list, obj);
        } catch (InterruptedException ignored) {
            // Interrupt is handled by closing the session.
            replicantSession.closeDueToInterrupt();
        }
    }

    /**
     * Parses every entry of the message's {@code "channels"} array into a channel address.
     *
     * @param jsonObject the parsed message.
     * @return the parsed addresses, in the order they appear in the message.
     */
    @Nonnull
    private ChannelAddress[] extractChannels(@Nonnull JsonObject jsonObject) {
        final JsonArray channels = jsonObject.getJsonArray("channels");
        final ChannelAddress[] addresses = new ChannelAddress[channels.size()];
        for (int index = 0; index < addresses.length; index++) {
            final String channel = channels.getString(index);
            addresses[index] = ChannelAddress.parse(channel);
        }
        return addresses;
    }

    /**
     * Extracts the subscription filter from the message, if one is supplied.
     *
     * @param channelMetaData metadata of the channel the filter applies to.
     * @param jsonObject      the parsed message.
     * @return the filter produced by {@code toFilter}, or null when the message has no
     *         filter entry or the entry is JSON null.
     */
    @Nullable
    private Object extractFilter(ChannelMetaData channelMetaData, @Nonnull JsonObject jsonObject) {
        final String filterKey = TransportConstants.CHANNEL_FILTER;
        final boolean filterSupplied = jsonObject.containsKey(filterKey) && !jsonObject.isNull(filterKey);
        return filterSupplied ? toFilter(channelMetaData, jsonObject.getJsonObject(filterKey)) : null;
    }

    /**
     * Handles an {@code "unsub"} command: parses the channel address from the message,
     * validates the request against the channel's metadata and, if valid, unsubscribes
     * the session.
     *
     * @param replicantSession the session requesting the unsubscribe.
     * @param jsonObject       the parsed message.
     * @throws IOException          if reporting an invalid request to the client fails.
     * @throws InterruptedException if the unsubscribe request is interrupted.
     */
    private void onUnsubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull JsonObject jsonObject) throws IOException, InterruptedException {
        final String channel = jsonObject.getString(TransportConstants.CHANNEL);
        final ChannelAddress address = ChannelAddress.parse(channel);
        final ChannelMetaData metaData = getChannelMetaData(address.getChannelId());
        if (!checkUnsubscribeRequest(replicantSession, metaData, address)) {
            // The check has already reported the problem and closed the session.
            return;
        }
        final int requestId = jsonObject.getInt(TransportConstants.REQUEST_ID);
        unsubscribe(replicantSession, requestId, address);
    }

    /**
     * Handles a {@code "bulk-unsub"} command: unsubscribes the session from several
     * addresses of the same channel in one request.
     *
     * <p>Every address is validated with {@code checkUnsubscribeRequest} against the metadata
     * of the first address's channel, must belong to that same channel and must carry a
     * sub-channel id; any violation sends an error and closes the session. A single
     * address is handled as a plain unsubscribe. An empty address list returns without
     * sending any response.</p>
     *
     * @param replicantSession the session requesting the unsubscribes.
     * @param jsonObject       the parsed message.
     * @throws IOException          if reporting an invalid request to the client fails.
     * @throws InterruptedException if the session update request is interrupted.
     */
    private void onBulkUnsubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull JsonObject jsonObject) throws IOException, InterruptedException {
        final ChannelAddress[] addresses = extractChannels(jsonObject);
        if (0 == addresses.length) {
            return;
        }
        // The first address determines the channel that all other addresses must match.
        final int channelId = addresses[0].getChannelId();
        final ChannelMetaData channelMetaData = getChannelMetaData(channelId);
        final List<Integer> subChannelIds = new ArrayList<>();
        for (final ChannelAddress address : addresses) {
            if (!checkUnsubscribeRequest(replicantSession, channelMetaData, address)) {
                return;
            }
            if (address.getChannelId() != channelId) {
                sendErrorAndClose(replicantSession, "Bulk channel unsubscribe included addresses from multiple channels");
                return;
            }
            if (!address.hasSubChannelId()) {
                sendErrorAndClose(replicantSession, "Bulk channel unsubscribe included addresses channel without sub-channel ids");
                return;
            }
            subChannelIds.add(address.getSubChannelId());
        }
        final int requestId = jsonObject.getInt(TransportConstants.REQUEST_ID);
        if (1 == addresses.length) {
            unsubscribe(replicantSession, requestId, addresses[0]);
        } else {
            ReplicationRequestUtil.sessionUpdateRequest(getRegistry(), getEntityManager(), getEndpoint(), "BulkUnsubscribe(" + channelMetaData.getChannelId() + ")", replicantSession, requestId, () -> {
                doBulkUnsubscribe(replicantSession, channelId, subChannelIds);
            });
        }
    }

    /**
     * Marks the current session changes as required and asks the session manager to
     * unsubscribe the session from the listed sub-channels of a channel. An interrupt
     * closes the session instead of propagating.
     *
     * @param replicantSession the session to unsubscribe.
     * @param i                the id of the channel.
     * @param list             the sub-channel ids to unsubscribe from.
     */
    private void doBulkUnsubscribe(@Nonnull ReplicantSession replicantSession, int i, List<Integer> list) {
        EntityMessageCacheUtil.getSessionChanges().setRequired(true);
        try {
            final ReplicantSessionManager sessionManager = getSessionManager();
            sessionManager.bulkUnsubscribe(replicantSession, i, list);
        } catch (InterruptedException ignored) {
            // Interrupt is handled by closing the session.
            replicantSession.closeDueToInterrupt();
        }
    }

    /**
     * Runs {@code doUnsubscribe} for the given address inside a session update request
     * labelled {@code "Unsubscribe(<address>)"}.
     *
     * @param replicantSession the session to unsubscribe.
     * @param i                the id of the client request.
     * @param channelAddress   the address to unsubscribe from.
     * @throws InterruptedException if the session update request is interrupted.
     */
    private void unsubscribe(@Nonnull ReplicantSession replicantSession, int i, @Nonnull ChannelAddress channelAddress) throws InterruptedException {
        final TransactionSynchronizationRegistry registry = getRegistry();
        final EntityManager entityManager = getEntityManager();
        final EntityMessageEndpoint endpoint = getEndpoint();
        final String requestKey = "Unsubscribe(" + channelAddress + ")";
        ReplicationRequestUtil.sessionUpdateRequest(registry,
                                                    entityManager,
                                                    endpoint,
                                                    requestKey,
                                                    replicantSession,
                                                    i,
                                                    () -> doUnsubscribe(replicantSession, channelAddress));
    }

    /**
     * Marks the current session changes as required and asks the session manager to
     * unsubscribe the session from the address. An interrupt closes the session instead
     * of propagating.
     *
     * @param replicantSession the session to unsubscribe.
     * @param channelAddress   the address to unsubscribe from.
     */
    private void doUnsubscribe(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelAddress channelAddress) {
        EntityMessageCacheUtil.getSessionChanges().setRequired(true);
        try {
            final ReplicantSessionManager sessionManager = getSessionManager();
            sessionManager.unsubscribe(replicantSession, channelAddress);
        } catch (InterruptedException ignored) {
            // Interrupt is handled by closing the session.
            replicantSession.closeDueToInterrupt();
        }
    }

    /**
     * Validates an unsubscribe request against the channel's metadata. When the request
     * is invalid an error is sent to the client and the session is closed.
     *
     * @param replicantSession the session making the request.
     * @param channelMetaData  metadata of the channel being unsubscribed from.
     * @param channelAddress   the address being unsubscribed from.
     * @return true if the request may proceed, false if it was rejected.
     * @throws IOException if sending the error or closing the session fails.
     */
    private boolean checkUnsubscribeRequest(@Nonnull ReplicantSession replicantSession, @Nonnull ChannelMetaData channelMetaData, @Nonnull ChannelAddress channelAddress) throws IOException {
        final String problem;
        if (!channelMetaData.isExternal()) {
            problem = "Attempted to unsubscribe from internal-only channel";
        } else if (channelAddress.hasSubChannelId() && channelMetaData.isTypeGraph()) {
            problem = "Attempted to unsubscribe from type channel with instance data";
        } else if (!channelAddress.hasSubChannelId() && channelMetaData.isInstanceGraph()) {
            problem = "Attempted to unsubscribe from instance channel without instance data";
        } else {
            problem = null;
        }

        if (null == problem) {
            return true;
        }
        sendErrorAndClose(replicantSession, problem);
        return false;
    }

    /**
     * Looks up the replicant session registered under the WebSocket session's id.
     * Any failure during the lookup is treated the same as "not found".
     *
     * @param session the WebSocket session.
     * @return the replicant session, or null if the lookup fails.
     */
    @Nullable
    private ReplicantSession findReplicantSession(@Nonnull Session session) {
        ReplicantSession located;
        try {
            final String webSocketSessionId = session.getId();
            located = getSessionManager().getSession(webSocketSessionId);
        } catch (Throwable ignored) {
            // Best-effort lookup: callers only need to know that no session is available.
            located = null;
        }
        return located;
    }

    /**
     * Returns the replicant session associated with the WebSocket session.
     *
     * @param session the WebSocket session.
     * @return the associated replicant session, never null.
     * @throws IllegalStateException if no replicant session can be located.
     */
    @Nonnull
    private ReplicantSession getReplicantSession(@Nonnull Session session) {
        final ReplicantSession replicantSession = findReplicantSession(session);
        if (null == replicantSession) {
            throw new IllegalStateException("Unable to locate ReplicantSession for WebSocket session " + session.getId());
        }
        return replicantSession;
    }

    /**
     * Handles an error raised on the WebSocket session: logs it at INFO level, then
     * sends the error's string form to the client and closes the session.
     *
     * @param session the WebSocket session the error occurred on.
     * @param th      the error.
     * @throws IOException if sending the error or closing the session fails.
     */
    @OnError
    public void onError(@Nonnull Session session, @Nonnull Throwable th) throws IOException {
        if (LOG.isLoggable(Level.INFO)) {
            final String logMessage = "Error on WebSocket Session " + session.getId();
            LOG.log(Level.INFO, logMessage, th);
        }
        final String description = th.toString();
        sendErrorAndClose(session, description);
    }

    /**
     * Sends an error to the client of the given replicant session and closes it, by
     * delegating to the WebSocket-session overload.
     *
     * @param replicantSession the session to report to and close.
     * @param str              the error message to send.
     * @throws IOException if sending the error or closing the session fails.
     */
    private void sendErrorAndClose(@Nonnull ReplicantSession replicantSession, @Nonnull String str) throws IOException {
        final Session webSocketSession = replicantSession.getWebSocketSession();
        sendErrorAndClose(webSocketSession, str);
    }

    /**
     * Sends an {@code "error"} message to the client and closes the WebSocket session if
     * it is still open, then closes and invalidates the associated replicant session if
     * one can be found.
     *
     * <p>The replicant session cleanup runs even when sending the message or closing the
     * WebSocket session fails, so a failed close cannot leave a stale replicant session
     * registered.</p>
     *
     * @param session the WebSocket session to report to and close.
     * @param str     the error message to send.
     * @throws IOException if closing the WebSocket session fails.
     */
    private void sendErrorAndClose(@Nonnull Session session, @Nonnull String str) throws IOException {
        try {
            if (session.isOpen()) {
                WebSocketUtil.sendJsonObject(session, Json.createObjectBuilder().add(TransportConstants.TYPE, "error").add("message", str).build());
                session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "Unexpected error"));
            }
        } finally {
            // Always release the replicant session, even if the send/close above threw.
            final ReplicantSession replicantSession = findReplicantSession(session);
            if (null != replicantSession) {
                closeReplicantSession(replicantSession);
            }
        }
    }

    /**
     * Handles the WebSocket session closing: if a replicant session is still associated
     * with it, that session is closed and invalidated; otherwise the situation is only
     * logged at FINE level.
     *
     * @param session the WebSocket session that is closing.
     */
    @OnClose
    public void onClose(@Nonnull Session session) {
        final ReplicantSession replicantSession = findReplicantSession(session);
        if (null != replicantSession) {
            LOG.log(Level.FINE, () -> "Closing WebSocket Session " + session.getId() + " for replicant session " + replicantSession.getId());
            closeReplicantSession(replicantSession);
            return;
        }
        LOG.log(Level.FINE, () -> "Closing WebSocket Session " + session.getId() + " but no replicant session found. This can occur except during application undeploy or when the session has errored.");
    }

    /**
     * Converts the filter JSON into a filter object when the channel declares a filter
     * parameter.
     *
     * @param channelMetaData metadata of the channel the filter applies to.
     * @param jsonObject      the filter JSON supplied by the client.
     * @return the parsed filter, or null if the channel has no filter parameter.
     */
    @Nullable
    private Object toFilter(@Nonnull ChannelMetaData channelMetaData, @Nonnull JsonObject jsonObject) {
        return channelMetaData.hasFilterParameter() ? parseFilter(channelMetaData, jsonObject) : null;
    }

    /**
     * Binds the filter JSON to the channel's filter parameter type using the Jackson mapper.
     *
     * @param channelMetaData metadata supplying the filter parameter type.
     * @param jsonObject      the filter JSON supplied by the client.
     * @return the parsed filter.
     * @throws IllegalArgumentException if the JSON cannot be bound to the filter type.
     */
    @Nonnull
    private Object parseFilter(@Nonnull ChannelMetaData channelMetaData, @Nonnull JsonObject jsonObject) {
        // Serialise the JSON-P object to text so that Jackson can read it.
        final String filterJson = jsonObject.toString();
        try {
            return _jsonMapper.readValue(filterJson, channelMetaData.getFilterParameterType());
        } catch (IOException cause) {
            throw new IllegalArgumentException("Unable to parse filter: " + jsonObject, cause);
        }
    }

    /**
     * Looks up the metadata of a channel via the session manager's system metadata.
     *
     * @param i the id of the channel.
     * @return the channel's metadata.
     */
    @Nonnull
    private ChannelMetaData getChannelMetaData(final int i) {
        return getSessionManager()
            .getSystemMetaData()
            .getChannelMetaData(i);
    }

    /**
     * Sends the given payload to the client, closes the replicant session with an
     * {@code UNEXPECTED_CONDITION} close reason and then invalidates it.
     *
     * @param replicantSession the session to close.
     * @param str              the reason phrase used for the close reason.
     * @param jsonObject       the payload to send to the client before closing.
     */
    private void closeWithError(@Nonnull ReplicantSession replicantSession, @Nonnull String str, @Nonnull JsonObject jsonObject) {
        final Session webSocketSession = replicantSession.getWebSocketSession();
        WebSocketUtil.sendJsonObject(webSocketSession, jsonObject);

        final CloseReason reason = new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, str);
        replicantSession.close(reason);

        closeReplicantSession(replicantSession);
    }

    /**
     * Notifies the {@link #onSessionClose(ReplicantSession)} hook and then invalidates the
     * session with the session manager.
     *
     * <p>The invalidation runs even if the overridable hook throws, so a failing subclass
     * hook cannot leave the session registered; the hook's exception still propagates.</p>
     *
     * @param replicantSession the session to close.
     */
    private void closeReplicantSession(@Nonnull ReplicantSession replicantSession) {
        try {
            onSessionClose(replicantSession);
        } finally {
            getSessionManager().invalidateSession(replicantSession);
        }
    }
}
