package com.gs.fw.common.mithra.notification;

import com.gs.fw.common.mithra.MithraDataObject;
import com.gs.fw.common.mithra.MithraManagerProvider;
import com.gs.fw.common.mithra.MithraObjectPortal;
import com.gs.fw.common.mithra.MithraTransaction;
import com.gs.fw.common.mithra.MithraTransactionalObject;
import com.gs.fw.common.mithra.attribute.Attribute;
import com.gs.fw.common.mithra.attribute.update.AttributeUpdateWrapper;
import com.gs.fw.common.mithra.finder.Operation;
import com.gs.fw.common.mithra.finder.RelatedFinder;
import com.gs.fw.common.mithra.notification.listener.MithraApplicationClassLevelNotificationListener;
import com.gs.fw.common.mithra.notification.listener.MithraApplicationNotificationListener;
import com.gs.fw.common.mithra.notification.listener.MithraNotificationListener;
import com.gs.fw.common.mithra.transaction.MultiUpdateOperation;
import com.gs.fw.common.mithra.transaction.UpdateOperation;
import com.gs.fw.common.mithra.util.ListFactory;
import com.gs.fw.common.mithra.util.MithraProcessInfo;
import com.gs.fw.common.mithra.util.lz4.LZ4BlockInputStream;
import com.gs.fw.common.mithra.util.lz4.LZ4BlockOutputStream;
import com.gs.reladomo.metadata.ReladomoClassMetaData;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.collections.api.list.FixedSizeList;
import org.eclipse.collections.impl.map.mutable.UnifiedMap;
import org.eclipse.collections.impl.set.mutable.UnifiedSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/gs/fw/common/mithra/notification/MithraNotificationEventManagerImpl.class */
public class MithraNotificationEventManagerImpl implements MithraNotificationEventManager, MithraNotificationMessageHandler {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) MithraNotificationEventManagerImpl.class);
    // Prefix that encodeSubject() prepends to a subject and decodeSubject() strips from it.
    private static final String MESSAGE_PROTOCOL_VERSION = "P0-";
    // Application-level registrations, keyed by a RegistrationKey built from (subject, finder class name).
    private Map<RegistrationKey, RegistrationEntryList> mithraApplicationNotificationSubscriber;
    // Identity-keyed map exposed through getMithraListToNotificationListenerMap(); this class only creates and clears it.
    private Map mithraListToNotificationListenerMap;
    // Identity-keyed map exposed through getMithraListToDatabaseIdentiferMap(); this class only creates it.
    private Map mithraListToDatabaseIdentifierMap;
    // Listeners created from a portal's cache, keyed by a RegistrationKey built from (subject, finder class name).
    private ConcurrentHashMap<RegistrationKey, MithraNotificationListener> mithraNotificationSubscriber;
    // Creates one messaging adapter per subject; shut down together with this manager.
    private MithraMessagingAdapterFactory adapterFactory;
    // Adapters already created, keyed by the un-prefixed subject.
    private Map<String, MithraNotificationMessagingAdapter> subjectToAdapterMap;
    // Events recorded while no transaction was active, grouped by subject; accessed under the monitor of this
    // instance by addNoTxMithraNotificationEvent() and swapped for a fresh map by getNotificationsToSend().
    private Map<String, List<MithraNotificationEvent>> mithraNoTxNotificationEvents;
    // Work queue backing queuedExecutor; drained directly by forceSendNow().
    private LinkedBlockingQueue channel;
    // Executor with exactly one worker thread; every queued notification task runs on it.
    private ExecutorService queuedExecutor;
    // Scheduler that periodically runs the no-transaction batch sender.
    private ScheduledExecutorService clockDaemon;
    // NOTE(review): presumably the batch-send interval of clockDaemon in milliseconds - confirm.
    private static final int PERIOD = 100;
    // Reused compression stream; convertObjectToBytes() synchronizes on it and resets it for every message.
    private LZ4BlockOutputStream lz4BlockOutputStream;
    // Reused decompression stream; convertBytesToObject() synchronizes on it and resets it for every message.
    private LZ4BlockInputStream lz4BlockInputStream;
    // Set to true by shutdown(boolean); the shutdown hook does nothing once it is set.
    private volatile boolean shutdown;
    // JVM shutdown hook installed by setupShutdownHook(); null when none is installed or after shutdown.
    private Thread shutdownHook;

    /* loaded from: input_file:com/gs/fw/common/mithra/notification/MithraNotificationEventManagerImpl$QueueMarker.class */
    /**
     * Sentinel task: once it has been executed by the notification queue, every task that was
     * queued before it has been processed. Callers block in {@link #waitUntilDone()} until then.
     */
    private static class QueueMarker implements Runnable {
        private volatile boolean ran;

        private QueueMarker() {
            this.ran = false;
        }

        /** Marks the marker as executed and wakes up every waiting thread. */
        @Override // java.lang.Runnable
        public void run() {
            synchronized (this) {
                this.ran = true;
                notifyAll();
            }
        }

        /**
         * Blocks until {@link #run()} has been executed.
         * <p>
         * The wait is not abandoned on interruption, but the interrupt is no longer lost:
         * if the waiting thread was interrupted, its interrupt status is restored before returning.
         */
        public synchronized void waitUntilDone() {
            boolean interrupted = false;
            try {
                while (!this.ran) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        // Keep waiting for the marker, but remember the interrupt for the caller.
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/gs/fw/common/mithra/notification/MithraNotificationEventManagerImpl$RegistrationEntryList.class */
    public static class RegistrationEntryList {
        private List<MithraApplicationNotificationRegistrationEntry> registrationList;
        private final ReferenceQueue queue;

        private RegistrationEntryList() {
            this.registrationList = new ArrayList();
            this.queue = new ReferenceQueue();
        }

        public void addRegistrationEntry(List list, MithraApplicationNotificationListener mithraApplicationNotificationListener, Operation operation) {
            this.registrationList.add(operation != null ? new OperationBasedNotificationRegistrationEntry(operation, list, mithraApplicationNotificationListener, this.queue) : new IndexBasedNotificationRegistrationEntry(list, mithraApplicationNotificationListener, this.queue));
            if (MithraNotificationEventManagerImpl.logger.isDebugEnabled()) {
                MithraNotificationEventManagerImpl.logger.debug("*************** Added registration entry to registrationEntryList***************");
            }
        }

        public void addClassLevelRegistrationEntry(MithraApplicationClassLevelNotificationListener mithraApplicationClassLevelNotificationListener) {
            this.registrationList.add(new ClassLevelNotificationRegistrationEntry(mithraApplicationClassLevelNotificationListener));
            if (MithraNotificationEventManagerImpl.logger.isDebugEnabled()) {
                MithraNotificationEventManagerImpl.logger.debug("*************** Added class-level registration entry to registrationEntryList***************");
            }
        }

        public void processNotification(MithraNotificationEvent mithraNotificationEvent) {
            expungeStaleEntries();
            for (int i = 0; i < this.registrationList.size(); i++) {
                MithraApplicationNotificationRegistrationEntry mithraApplicationNotificationRegistrationEntry = this.registrationList.get(i);
                if (mithraApplicationNotificationRegistrationEntry != null) {
                    try {
                        mithraApplicationNotificationRegistrationEntry.processNotification(mithraNotificationEvent);
                    } catch (Throwable th) {
                        MithraNotificationEventManagerImpl.logger.error("Error in application notification listener", th);
                    }
                }
            }
        }

        private void expungeStaleEntries() {
            while (true) {
                Reference poll = this.queue.poll();
                if (poll == null) {
                    return;
                } else {
                    this.registrationList.remove(poll);
                }
            }
        }
    }

    /** Returns the VM id reported by {@code MithraProcessInfo.getVmId()}. */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public long getMithraVmId() {
        final long vmId = MithraProcessInfo.getVmId();
        return vmId;
    }

    /**
     * Creates a manager that also installs a JVM shutdown hook.
     *
     * @param mithraMessagingAdapterFactory factory used to create one messaging adapter per subject
     */
    public MithraNotificationEventManagerImpl(MithraMessagingAdapterFactory mithraMessagingAdapterFactory) {
        // true: install the shutdown hook.
        this(mithraMessagingAdapterFactory, true);
    }

    /**
     * Creates the manager, starts its helper threads and optionally installs a JVM shutdown hook.
     *
     * @param mithraMessagingAdapterFactory factory used to create one messaging adapter per subject
     * @param z when {@code true}, a shutdown hook is registered with the runtime
     */
    public MithraNotificationEventManagerImpl(MithraMessagingAdapterFactory mithraMessagingAdapterFactory, boolean z) {
        // Registration state.
        mithraApplicationNotificationSubscriber = UnifiedMap.newMap();
        mithraListToNotificationListenerMap = new IdentityHashMap();
        mithraListToDatabaseIdentifierMap = new IdentityHashMap();
        mithraNotificationSubscriber = new ConcurrentHashMap<>();

        // Messaging state.
        subjectToAdapterMap = UnifiedMap.newMap();
        mithraNoTxNotificationEvents = UnifiedMap.newMap();

        // The LZ4 streams are created without an underlying stream; one is supplied via reset() per message.
        lz4BlockOutputStream = new LZ4BlockOutputStream(null, false);
        lz4BlockInputStream = new LZ4BlockInputStream(null);

        shutdownHook = null;
        initializeNotificationHelperThreads();
        adapterFactory = mithraMessagingAdapterFactory;
        if (!z) {
            return;
        }
        setupShutdownHook();
    }

    /**
     * Registers a JVM shutdown hook that, unless this manager was already shut down, sends all
     * pending work via {@code forceSendNow()} and then shuts the manager down.
     */
    private void setupShutdownHook() {
        this.shutdownHook = new Thread() {
            @Override
            public void run() {
                if (!MithraNotificationEventManagerImpl.this.shutdown) {
                    MithraNotificationEventManagerImpl.this.forceSendNow();
                    // true: called from the hook itself, so the hook must not be de-registered.
                    MithraNotificationEventManagerImpl.this.shutdown(true);
                }
            }
        };
        Runtime.getRuntime().addShutdownHook(this.shutdownHook);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the messaging adapter for the given subject, creating and caching it on first use.
     * A new adapter is created for the encoded (prefixed) subject and gets this manager as its
     * message processor; the cache is keyed by the un-prefixed subject.
     *
     * @param str the un-prefixed subject
     * @return the cached or newly created adapter
     */
    public synchronized MithraNotificationMessagingAdapter getOrCreateAdapter(String str) {
        final MithraNotificationMessagingAdapter cached = subjectToAdapterMap.get(str);
        if (cached != null) {
            return cached;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Creating messaging adapter for subject: " + str);
        }
        final MithraNotificationMessagingAdapter created = adapterFactory.createMessagingAdapter(encodeSubject(str));
        created.setMessageProcessor(this);
        subjectToAdapterMap.put(str, created);
        return created;
    }

    /** Prepends the message protocol version prefix to the given subject. */
    private String encodeSubject(String str) {
        return new StringBuilder(MESSAGE_PROTOCOL_VERSION).append(str).toString();
    }

    /**
     * Drops as many leading characters as the protocol version prefix is long.
     * The prefix itself is not verified.
     */
    private String decodeSubject(String str) {
        final int prefixLength = MESSAGE_PROTOCOL_VERSION.length();
        return str.substring(prefixLength);
    }

    /**
     * Queues a task that registers an application listener for the given subject and finder.
     * The registration itself happens later, when the notification queue runs the task.
     */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void registerForApplicationNotification(String str, MithraApplicationNotificationListener mithraApplicationNotificationListener, RelatedFinder relatedFinder, List list, Operation operation) {
        final Runnable registrationTask =
                getApplicationNotificationSubscriptionRunnable(str, relatedFinder, mithraApplicationNotificationListener, list, operation);
        if (logger.isDebugEnabled()) {
            logger.debug("***************** Adding List Notification Registration task to queue ***************************");
        }
        addTaskToQueue(registrationTask);
    }

    /**
     * Queues a task that registers a class-level application listener for the given subject and
     * finder. The registration itself happens later, when the notification queue runs the task.
     */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void registerForApplicationClassLevelNotification(String str, MithraApplicationClassLevelNotificationListener mithraApplicationClassLevelNotificationListener, RelatedFinder relatedFinder) {
        final Runnable registrationTask =
                getApplicationClassLevelNotificationSubscriptionRunnable(str, relatedFinder, mithraApplicationClassLevelNotificationListener);
        if (logger.isDebugEnabled()) {
            logger.debug("***************** Adding Class-Level Notification Registration task to queue ***************************");
        }
        addTaskToQueue(registrationTask);
    }

    /**
     * Registers the portal's cache for notifications on the given subject, once per
     * (subject, finder class name). Only the caller that wins the {@code putIfAbsent} race queues
     * the task that creates the messaging adapter for the subject.
     */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void registerForNotification(String str, MithraObjectPortal mithraObjectPortal) {
        final RegistrationKey key = new RegistrationKey(str, mithraObjectPortal.getFinder().getFinderClassName());
        if (mithraNotificationSubscriber.get(key) != null) {
            return; // already registered
        }
        final MithraNotificationListener listener = mithraObjectPortal.getCache().createNotificationListener(mithraObjectPortal);
        if (mithraNotificationSubscriber.putIfAbsent(key, listener) != null) {
            return; // another thread registered first
        }
        final Runnable adapterTask = getNotificationAdapterRunnable(str);
        if (logger.isDebugEnabled()) {
            logger.debug("Registered :" + listener.getFinderClassname() + " for subject: " + str);
            logger.debug("***************** Adding Mithra Notification Registration task to queue ***************************");
        }
        addTaskToQueue(adapterTask);
    }

    /**
     * Takes over the registrations of another manager: copies its registration keys, shuts it
     * down, and re-registers here every key whose class is configured in the current Mithra manager.
     */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void initializeFrom(MithraNotificationEventManager mithraNotificationEventManager) {
        // Copy the keys before shutting the other manager down.
        final UnifiedSet<RegistrationKey> previousRegistrations = UnifiedSet.newSet(mithraNotificationEventManager.getExistingRegistrations());
        mithraNotificationEventManager.shutdown();
        for (RegistrationKey key : previousRegistrations) {
            final ReladomoClassMetaData metaData = ReladomoClassMetaData.fromFinderClassName(key.getClassname());
            final boolean configured = MithraManagerProvider.getMithraManager().getConfigManager().isClassConfigured(metaData.getBusinessOrInterfaceClassName());
            if (!configured) {
                continue;
            }
            registerForNotification(key.getSubject(), metaData.getFinderInstance().getMithraObjectPortal());
        }
    }

    /** Returns the key set of the notification subscriber map (the map's own view, not a copy). */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public Set<RegistrationKey> getExistingRegistrations() {
        final Set<RegistrationKey> registeredKeys = mithraNotificationSubscriber.keySet();
        return registeredKeys;
    }

    /** Returns the internal list-to-notification-listener map itself (not a copy). */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public Map getMithraListToNotificationListenerMap() {
        return mithraListToNotificationListenerMap;
    }

    /** Returns the internal list-to-database-identifier map itself (not a copy). */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public Map getMithraListToDatabaseIdentiferMap() {
        return mithraListToDatabaseIdentifierMap;
    }

    /**
     * Queues a task that broadcasts one message per subject contained in the map.
     *
     * @param map notification events grouped by subject
     * @param j   VM id recorded in each message as the requestor VM id
     */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void broadcastNotificationMessage(Map map, long j) {
        final Runnable broadcastTask = getSendMithraNotificationMessageRunnable(map, j);
        if (logger.isDebugEnabled()) {
            logger.debug("***************** Adding BroadcastNotificationMessage task to queue ***************************");
        }
        addTaskToQueue(broadcastTask);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Serializes the object with Java serialization through the shared LZ4 output stream and
     * returns the resulting bytes.
     *
     * @param obj the object to serialize
     * @return the bytes written to the underlying byte array stream
     * @throws IOException if writing to any of the streams fails
     */
    public byte[] convertObjectToBytes(Object obj) throws IOException {
        byte[] byteArray;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(200);
        // The LZ4 stream is a single shared instance, so only one thread may use it at a time.
        synchronized (this.lz4BlockOutputStream) {
            // Point the shared stream at this call's byte buffer.
            this.lz4BlockOutputStream.reset(byteArrayOutputStream);
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(this.lz4BlockOutputStream);
            objectOutputStream.writeObject(obj);
            objectOutputStream.flush();
            // finish() rather than close(): the shared stream is reused by the next call.
            this.lz4BlockOutputStream.finish();
            byteArrayOutputStream.flush();
            byteArray = byteArrayOutputStream.toByteArray();
            byteArrayOutputStream.close();
        }
        return byteArray;
    }

    /**
     * Reads one Java-serialized object from the given bytes through the shared LZ4 input stream.
     * NOTE(review): the bytes come from the messaging adapter and are deserialized with plain
     * Java serialization - confirm that the transport is trusted.
     *
     * @param bArr the bytes of one message
     * @return the deserialized object
     * @throws IOException            if reading from any of the streams fails
     * @throws ClassNotFoundException if the class of the serialized object cannot be found
     */
    private Object convertBytesToObject(byte[] bArr) throws IOException, ClassNotFoundException {
        Object readObject;
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bArr);
        // The LZ4 stream is a single shared instance, so only one thread may use it at a time.
        synchronized (this.lz4BlockInputStream) {
            // Point the shared stream at this call's input bytes.
            this.lz4BlockInputStream.reset(byteArrayInputStream);
            ObjectInputStream objectInputStream = new ObjectInputStream(this.lz4BlockInputStream);
            readObject = objectInputStream.readObject();
            objectInputStream.close();
            this.lz4BlockInputStream.close();
            byteArrayInputStream.close();
        }
        return readObject;
    }

    /**
     * Handles a raw message received from a messaging adapter. The message is deserialized and,
     * unless its sender VM id or requestor VM id equals this VM's id, a task that processes its
     * events is queued. Deserialization failures are logged and the message is dropped.
     *
     * @param str  the encoded (prefixed) subject the message arrived on
     * @param bArr the serialized message
     */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationMessageHandler
    public void processNotificationMessage(String str, byte[] bArr) {
        final String subject = decodeSubject(str);
        try {
            final ExternalizableMithraNotificationMessage message = (ExternalizableMithraNotificationMessage) convertBytesToObject(bArr);
            final long senderVmId = message.getMithraVmId();
            final long requestorVmId = message.getRequestorVmId();
            if (logger.isDebugEnabled()) {
                logger.debug("***************** Mithra: " + MithraProcessInfo.getVmId() + " received message sent by Mithra: " + senderVmId + " on behalf of Mithra: " + requestorVmId + "  with topic: " + subject);
            }
            // Messages sent by, or on behalf of, this VM are ignored.
            if (MithraProcessInfo.getVmId() == senderVmId || MithraProcessInfo.getVmId() == requestorVmId) {
                return;
            }
            final Runnable processingTask = getProcessIncomingMessagesRunnable(subject, message.getNotificationEvents());
            if (logger.isDebugEnabled()) {
                logger.debug("***************** Mithra: " + MithraProcessInfo.getVmId() + " will process message received from Mithra: " + senderVmId + " with topic: " + subject);
            }
            addTaskToQueue(processingTask);
        } catch (IOException | ClassNotFoundException e) {
            logger.error("Unable to deserialize Mithra notification message for subject " + subject, (Throwable) e);
        }
    }

    /** Records a notification event for a single data object, without updated attributes or operation. */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void addMithraNotificationEvent(String str, String str2, byte b, MithraDataObject mithraDataObject, Object obj) {
        final MithraDataObject[] singleObject = {mithraDataObject};
        createAndAddNotificationEvent(str, str2, b, singleObject, null, null, obj);
    }

    /**
     * Records one notification event covering every object in the list. Each element is cast to
     * {@code MithraTransactionalObject} and contributes the data returned by {@code zGetTxDataForRead()}.
     */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void addMithraNotificationEvent(String str, String str2, byte b, List list, Object obj) {
        final MithraDataObject[] dataObjects = new MithraDataObject[list.size()];
        int idx = 0;
        while (idx < list.size()) {
            final MithraTransactionalObject txObject = (MithraTransactionalObject) list.get(idx);
            dataObjects[idx] = txObject.zGetTxDataForRead();
            idx++;
        }
        createAndAddNotificationEvent(str, str2, b, dataObjects, null, null, obj);
    }

    /**
     * Records one notification event for a batch update.
     *
     * @param list  the {@code UpdateOperation}s of the batch; each contributes its object's data
     * @param list2 the attribute update wrappers describing which attributes changed
     */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void addMithraNotificationEventForBatchUpdate(String str, String str2, byte b, List list, List list2, Object obj) {
        final MithraDataObject[] dataObjects = new MithraDataObject[list.size()];
        int idx = 0;
        while (idx < list.size()) {
            final UpdateOperation updateOperation = (UpdateOperation) list.get(idx);
            dataObjects[idx] = updateOperation.getMithraObject().zGetTxDataForRead();
            idx++;
        }
        createAndAddNotificationEvent(str, str2, b, dataObjects, list2, null, obj);
    }

    /** Records one notification event built from the data objects and updates of a multi-update operation. */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void addMithraNotificationEventForMultiUpdate(String str, String str2, byte b, MultiUpdateOperation multiUpdateOperation, Object obj) {
        final MithraDataObject[] dataObjects = multiUpdateOperation.getDataObjectsForNotification();
        final List updates = multiUpdateOperation.getUpdates();
        createAndAddNotificationEvent(str, str2, b, dataObjects, updates, null, obj);
    }

    /** Records a notification event that carries only the operation (no data objects, attributes or source attribute). */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void addMithraNotificationEventForMassDelete(String str, String str2, byte b, Operation operation) {
        final MithraNotificationEvent event = createNotificationEvent(str2, b, null, null, operation, null);
        addNotificationEvent(str, event);
    }

    /**
     * Records an update notification event for a single data object.
     *
     * @param list the attribute update wrappers describing which attributes changed
     */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void addMithraNotificationEventForUpdate(String str, String str2, byte b, MithraDataObject mithraDataObject, List list, Object obj) {
        final MithraDataObject[] singleObject = {mithraDataObject};
        createAndAddNotificationEvent(str, str2, b, singleObject, list, null, obj);
    }

    /**
     * Records an update notification event for a single data object and at most one attribute
     * update. A {@code null} wrapper results in an event without updated attributes.
     */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void addMithraNotificationEventForUpdate(String str, String str2, byte b, MithraDataObject mithraDataObject, AttributeUpdateWrapper attributeUpdateWrapper, Object obj) {
        final List updates = (attributeUpdateWrapper == null) ? null : ListFactory.create(attributeUpdateWrapper);
        final MithraDataObject[] singleObject = {mithraDataObject};
        createAndAddNotificationEvent(str, str2, b, singleObject, updates, null, obj);
    }

    /** Builds a notification event from the arguments and records it under the given subject. */
    private void createAndAddNotificationEvent(String str, String str2, byte b, MithraDataObject[] mithraDataObjectArr, List list, Operation operation, Object obj) {
        final MithraNotificationEvent event = createNotificationEvent(str2, b, mithraDataObjectArr, list, operation, obj);
        addNotificationEvent(str, event);
    }

    /**
     * Creates a notification event. When a list of attribute update wrappers is given, the
     * attribute of each wrapper is extracted into the event's attribute array; otherwise the
     * event gets a {@code null} attribute array.
     */
    private MithraNotificationEvent createNotificationEvent(String str, byte b, MithraDataObject[] mithraDataObjectArr, List list, Operation operation, Object obj) {
        Attribute[] updatedAttributes = null;
        if (list != null) {
            updatedAttributes = new Attribute[list.size()];
            int idx = 0;
            while (idx < list.size()) {
                final AttributeUpdateWrapper wrapper = (AttributeUpdateWrapper) list.get(idx);
                updatedAttributes[idx] = wrapper.getAttribute();
                idx++;
            }
        }
        return new MithraNotificationEvent(str, b, mithraDataObjectArr, updatedAttributes, operation, obj);
    }

    /**
     * Records the event with the current transaction when there is one; otherwise adds it to the
     * no-transaction buffer of this manager.
     */
    private void addNotificationEvent(String str, MithraNotificationEvent mithraNotificationEvent) {
        final MithraTransaction tx = MithraManagerProvider.getMithraManager().getCurrentTransaction();
        if (tx != null) {
            tx.addMithraNotificationEvent(str, mithraNotificationEvent);
            return;
        }
        addNoTxMithraNotificationEvent(str, mithraNotificationEvent);
    }

    /** Appends the event to the no-transaction buffer of the given subject, creating the buffer on first use. */
    private synchronized void addNoTxMithraNotificationEvent(String str, MithraNotificationEvent mithraNotificationEvent) {
        List<MithraNotificationEvent> pending = mithraNoTxNotificationEvents.get(str);
        if (pending != null) {
            pending.add(mithraNotificationEvent);
            return;
        }
        pending = new ArrayList<MithraNotificationEvent>();
        pending.add(mithraNotificationEvent);
        mithraNoTxNotificationEvents.put(str, pending);
    }

    /**
     * Removes all notification subscribers, buffered no-transaction events, list listeners and
     * application registrations.
     * <p>
     * The no-transaction buffer is cleared while holding this instance's monitor, because every
     * other access to it ({@code addNoTxMithraNotificationEvent}, {@code getNotificationsToSend})
     * is synchronized on this instance and the periodic sender thread swaps the buffer field.
     */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void clearNotificationSubscribers() {
        this.mithraNotificationSubscriber.clear();
        synchronized (this) {
            this.mithraNoTxNotificationEvents.clear();
        }
        this.mithraListToNotificationListenerMap.clear();
        this.mithraApplicationNotificationSubscriber.clear();
    }

    /**
     * Blocks until every task queued so far has run: a marker task is appended to the
     * notification queue and the caller waits until that marker has been executed.
     */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void waitUntilCurrentNotificationTasksAreDone() {
        final QueueMarker marker = new QueueMarker();
        this.queuedExecutor.execute(marker);
        marker.waitUntilDone();
    }

    /** Returns a new list holding the class name of every registration key in the subscriber map. */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public List getNotificationSubscribers() {
        final Set<RegistrationKey> keys = mithraNotificationSubscriber.keySet();
        final List<Object> classNames = new ArrayList<Object>(keys.size());
        for (RegistrationKey key : keys) {
            classNames.add(key.getClassname());
        }
        return classNames;
    }

    /** Creates the task that adds a list-level application registration entry for the subject and finder. */
    private Runnable getApplicationNotificationSubscriptionRunnable(final String str, final RelatedFinder relatedFinder, final MithraApplicationNotificationListener mithraApplicationNotificationListener, final List list, final Operation operation) {
        final Runnable registrationTask = new Runnable() {
            @Override
            public void run() {
                final RegistrationEntryList entries = getRegistrationEntryList(str, relatedFinder);
                entries.addRegistrationEntry(list, mithraApplicationNotificationListener, operation);
            }
        };
        return registrationTask;
    }

    /** Creates the task that adds a class-level application registration entry for the subject and finder. */
    private Runnable getApplicationClassLevelNotificationSubscriptionRunnable(final String str, final RelatedFinder relatedFinder, final MithraApplicationClassLevelNotificationListener mithraApplicationClassLevelNotificationListener) {
        final Runnable registrationTask = new Runnable() {
            @Override
            public void run() {
                final RegistrationEntryList entries = getRegistrationEntryList(str, relatedFinder);
                entries.addClassLevelRegistrationEntry(mithraApplicationClassLevelNotificationListener);
            }
        };
        return registrationTask;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the registration entry list for (subject, finder class name), creating and storing
     * an empty one when none exists yet.
     */
    public RegistrationEntryList getRegistrationEntryList(String str, RelatedFinder relatedFinder) {
        final RegistrationKey key = new RegistrationKey(str, relatedFinder.getFinderClassName());
        if (logger.isDebugEnabled()) {
            logger.debug("**************Adding application notification registration with key:  " + key.toString());
        }
        final RegistrationEntryList existing = mithraApplicationNotificationSubscriber.get(key);
        if (existing != null) {
            return existing;
        }
        final RegistrationEntryList created = new RegistrationEntryList();
        mithraApplicationNotificationSubscriber.put(key, created);
        return created;
    }

    /** Creates the task that makes sure a messaging adapter exists for the given subject. */
    private Runnable getNotificationAdapterRunnable(final String str) {
        final Runnable adapterTask = new Runnable() {
            @Override
            public void run() {
                getOrCreateAdapter(str);
            }
        };
        return adapterTask;
    }

    /**
     * Creates the task that broadcasts one message per subject of the given map.
     * Each message carries this VM's id as sender and {@code j} as requestor VM id.
     *
     * @param map notification events grouped by subject; every value is cast to {@code ArrayList}
     * @param j   requestor VM id stored in each message
     * @return a task that throws {@code RuntimeException} (with the {@code IOException} as its
     *         cause) when a message cannot be serialized
     */
    private Runnable getSendMithraNotificationMessageRunnable(final Map map, final long j) {
        return new Runnable() { // from class: com.gs.fw.common.mithra.notification.MithraNotificationEventManagerImpl.5
            @Override // java.lang.Runnable
            public void run() {
                // Iterate the entries directly instead of looking every key up again.
                for (Object rawEntry : map.entrySet()) {
                    Map.Entry entry = (Map.Entry) rawEntry;
                    String subject = (String) entry.getKey();
                    ExternalizableMithraNotificationMessage message = new ExternalizableMithraNotificationMessage((ArrayList) entry.getValue());
                    message.setMithraVmId(MithraProcessInfo.getVmId());
                    message.setRequestorVmId(j);
                    if (logger.isDebugEnabled()) {
                        logger.debug("***************** Mithra: " + MithraProcessInfo.getVmId() + " sending message with topic: " + subject);
                    }
                    try {
                        getOrCreateAdapter(subject).broadcastMessage(convertObjectToBytes(message));
                    } catch (IOException e) {
                        // Keep the original failure as the cause so it is not lost for diagnosis.
                        throw new RuntimeException("Unable to serialize Mithra notification message", e);
                    }
                }
            }
        };
    }

    /**
     * Hands over the buffered no-transaction events. When the buffer is empty, the shared empty
     * map is returned and the buffer is left in place; otherwise the buffer is returned and
     * replaced by a fresh map.
     */
    public synchronized Map<String, List<MithraNotificationEvent>> getNotificationsToSend() {
        if (mithraNoTxNotificationEvents.isEmpty()) {
            return Collections.emptyMap();
        }
        final Map<String, List<MithraNotificationEvent>> pending = mithraNoTxNotificationEvents;
        mithraNoTxNotificationEvents = UnifiedMap.newMap();
        return pending;
    }

    /**
     * Creates the task that takes the buffered no-transaction events and broadcasts one message
     * per subject. Each message carries this VM's id as both sender and requestor.
     *
     * @return a task that throws {@code RuntimeException} (with the {@code IOException} as its
     *         cause) when a message cannot be serialized
     */
    private Runnable getSendNoTxNotificationMessageBatchRunnable() {
        return new Runnable() { // from class: com.gs.fw.common.mithra.notification.MithraNotificationEventManagerImpl.6
            @Override // java.lang.Runnable
            public void run() {
                Map<String, List<MithraNotificationEvent>> notificationsToSend = getNotificationsToSend();
                if (notificationsToSend.isEmpty()) {
                    return;
                }
                // Iterate the entries directly instead of looking every key up again.
                for (Map.Entry<String, List<MithraNotificationEvent>> entry : notificationsToSend.entrySet()) {
                    String subject = entry.getKey();
                    ExternalizableMithraNotificationMessage message = new ExternalizableMithraNotificationMessage(entry.getValue());
                    message.setMithraVmId(MithraProcessInfo.getVmId());
                    message.setRequestorVmId(MithraProcessInfo.getVmId());
                    try {
                        if (logger.isDebugEnabled()) {
                            logger.debug("***************** Mithra: " + MithraProcessInfo.getVmId() + " sending message with topic: " + subject);
                        }
                        getOrCreateAdapter(subject).broadcastMessage(convertObjectToBytes(message));
                    } catch (IOException e) {
                        // Keep the original failure as the cause so it is not lost for diagnosis.
                        throw new RuntimeException("Unable to serialize Mithra notification message", e);
                    }
                }
            }
        };
    }

    /**
     * Delivers each event first to the listener registered for (subject, event class name), if
     * any, and then to the application-level registrations of the same key.
     * <p>
     * The database operation code selects the listener callback: 10 calls {@code onInsert},
     * 20 {@code onUpdate}, 30 {@code onDelete} and 40 {@code onMassDelete}; any other code calls
     * nothing. A failure in the listener is logged and does not stop processing.
     */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void processNotificationEvents(String str, List<MithraNotificationEvent> list) {
        for (int idx = 0; idx < list.size(); idx++) {
            final MithraNotificationEvent event = list.get(idx);
            final RegistrationKey key = new RegistrationKey(str, event.getClassname());
            final MithraNotificationListener listener = mithraNotificationSubscriber.get(key);
            if (listener == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("There is no listener registered to process this incoming message for class " + event.getClassname());
                }
            } else {
                try {
                    switch (event.getDatabaseOperation()) {
                        case 10:
                            listener.onInsert(event.getDataObjects(), event.getSourceAttribute());
                            break;
                        case 20:
                            listener.onUpdate(event.getDataObjects(), event.getUpdatedAttributes(), event.getSourceAttribute());
                            break;
                        case 30:
                            listener.onDelete(event.getDataObjects());
                            break;
                        case 40:
                            listener.onMassDelete(event.getOperationForMassDelete());
                            break;
                        default:
                            break;
                    }
                } catch (Throwable failure) {
                    logger.error("Error in Mithra notification listener", failure);
                }
            }
            // Application-level registrations are notified whether or not a listener exists.
            processApplicationNotification(key, event);
        }
    }

    /** Forwards the event to the application registrations stored under the key, if there are any. */
    private void processApplicationNotification(RegistrationKey registrationKey, MithraNotificationEvent mithraNotificationEvent) {
        final RegistrationEntryList entries = mithraApplicationNotificationSubscriber.get(registrationKey);
        if (entries == null) {
            return;
        }
        entries.processNotification(mithraNotificationEvent);
    }

    /** Creates the task that processes the events of one incoming message for the given subject. */
    private Runnable getProcessIncomingMessagesRunnable(final String str, final List<MithraNotificationEvent> list) {
        final Runnable processingTask = new Runnable() {
            @Override
            public void run() {
                if (logger.isDebugEnabled()) {
                    logger.debug("Started processing incoming notification message");
                }
                processNotificationEvents(str, list);
            }
        };
        return processingTask;
    }

    /** Submits the task to the single-threaded notification executor. */
    private void addTaskToQueue(Runnable runnable) {
        final ExecutorService executor = this.queuedExecutor;
        executor.execute(runnable);
    }

    /**
     * Creates the two helper executors: a single-threaded executor that runs queued notification
     * tasks, and a scheduler that sends the buffered no-transaction events every {@code PERIOD}
     * milliseconds. Both use daemon threads.
     * <p>
     * The periodic send is wrapped so that a failure is logged instead of escaping: an exception
     * thrown from a task scheduled with {@code scheduleAtFixedRate} suppresses all subsequent
     * executions, which would silently stop no-transaction notifications for good.
     */
    private void initializeNotificationHelperThreads() {
        this.channel = new LinkedBlockingQueue();
        this.queuedExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, this.channel, newDaemonThreadFactory("MithraNotificationThread"));
        this.clockDaemon = Executors.newScheduledThreadPool(1, newDaemonThreadFactory("NoTxMithraNotificationThread"));
        final Runnable sendBatch = getSendNoTxNotificationMessageBatchRunnable();
        Runnable guardedSendBatch = new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                try {
                    sendBatch.run();
                } catch (Throwable th) {
                    // Log and keep the periodic schedule alive.
                    logger.error("Error sending non-transactional Mithra notification messages", th);
                }
            }
        };
        this.clockDaemon.scheduleAtFixedRate(guardedSendBatch, 0L, PERIOD, TimeUnit.MILLISECONDS);
    }

    /** Creates a factory for normal-priority daemon threads that all carry the given name. */
    private static ThreadFactory newDaemonThreadFactory(final String threadName) {
        return new ThreadFactory() {
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                thread.setPriority(Thread.NORM_PRIORITY);
                thread.setName(threadName);
                return thread;
            }
        };
    }

    /** Shuts this manager down from regular code (not from the JVM shutdown hook). */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void shutdown() {
        final boolean calledFromShutdownHook = false;
        shutdown(calledFromShutdownHook);
    }

    /**
     * Stops both helper executors, shuts down every created messaging adapter and the adapter
     * factory, and drops the shutdown hook reference.
     *
     * @param z {@code true} when called from the shutdown hook itself; only when {@code false}
     *          is an installed hook removed from the runtime
     */
    protected void shutdown(boolean z) {
        this.shutdown = true;
        if (queuedExecutor != null) {
            queuedExecutor.shutdownNow();
        }
        if (clockDaemon != null) {
            clockDaemon.shutdownNow();
        }
        for (MithraNotificationMessagingAdapter adapter : subjectToAdapterMap.values()) {
            adapter.shutdown();
        }
        adapterFactory.shutdown();
        final boolean mustRemoveHook = !z && shutdownHook != null;
        if (mustRemoveHook) {
            try {
                Runtime.getRuntime().removeShutdownHook(shutdownHook);
            } catch (IllegalStateException e) {
                logger.info("The virtual machine is already in the process of shutting down.", (Throwable) e);
            }
        }
        shutdownHook = null;
    }

    /** Returns whether the work queue of the notification executor currently holds no tasks. */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public boolean isQueuedExecutorChannelEmpty() {
        final boolean nothingQueued = channel.isEmpty();
        return nothingQueued;
    }

    /**
     * Sends the buffered no-transaction events immediately and then runs, on the calling thread,
     * every task still waiting in the executor's work queue until that queue is empty.
     */
    @Override // com.gs.fw.common.mithra.notification.MithraNotificationEventManager
    public void forceSendNow() {
        getSendNoTxNotificationMessageBatchRunnable().run();
        while (!channel.isEmpty()) {
            final List<Object> drained = new ArrayList<Object>(channel.size());
            channel.drainTo(drained);
            for (Object task : drained) {
                ((Runnable) task).run();
            }
        }
    }
}
