package com.ning.billing.util.notificationq;

import com.ning.billing.config.NotificationConfig;
import com.ning.billing.util.callcontext.CallOrigin;
import com.ning.billing.util.callcontext.InternalCallContext;
import com.ning.billing.util.callcontext.InternalCallContextFactory;
import com.ning.billing.util.callcontext.UserType;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.notificationq.NotificationQueueService;
import com.ning.billing.util.notificationq.dao.NotificationSqlDao;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ning/billing/util/notificationq/DefaultNotificationQueue.class */
/**
 * {@link NotificationQueueBase} implementation that stores and retrieves its notifications through
 * {@link NotificationSqlDao}.
 *
 * <p>Each call to {@link #doProcessEvents()} fetches the notifications that are ready for this queue,
 * claims them for the local host, passes every successfully claimed notification to the queue handler
 * and then clears it.
 */
public class DefaultNotificationQueue extends NotificationQueueBase {
    private static final Logger log = LoggerFactory.getLogger(DefaultNotificationQueue.class);

    /**
     * Offset, in milliseconds (5 minutes), added to the current time to build the date handed to
     * {@code NotificationSqlDao.claimNotification}.
     * NOTE(review): presumably this is the time until which the claim holds, and presumably it is meant
     * to equal {@code NotificationQueueBase.CLAIM_TIME_MS} - confirm against the base class and the DAO.
     */
    private static final long CLAIM_OFFSET_MS = 300000L;

    private final NotificationSqlDao dao;
    private final InternalCallContextFactory internalCallContextFactory;

    /**
     * Creates the queue and obtains an on-demand {@link NotificationSqlDao} from {@code idbi}.
     *
     * @param idbi                       source of the on-demand DAO used for all non-transactional calls
     * @param clock                      forwarded to the base class; read through {@code getClock()}
     * @param str                        forwarded unchanged to the base class (presumably the service name - confirm)
     * @param str2                       forwarded unchanged to the base class (presumably the queue name - confirm)
     * @param notificationQueueHandler   forwarded to the base class; read through {@code getHandler()}
     * @param notificationConfig         forwarded unchanged to the base class
     * @param internalCallContextFactory factory used to build the call contexts of the processing loop
     */
    public DefaultNotificationQueue(IDBI idbi, Clock clock, String str, String str2, NotificationQueueService.NotificationQueueHandler notificationQueueHandler, NotificationConfig notificationConfig, InternalCallContextFactory internalCallContextFactory) {
        super(clock, str, str2, notificationQueueHandler, notificationConfig);
        this.dao = (NotificationSqlDao) idbi.onDemand(NotificationSqlDao.class);
        this.internalCallContextFactory = internalCallContextFactory;
    }

    /**
     * Claims the notifications that are currently ready and dispatches each of them to the handler.
     *
     * <p>A notification is cleared only after the handler returned. There is no exception handling
     * here: if the handler (or the key deserialization) throws, the exception propagates, the current
     * notification is not cleared and the remaining claimed notifications are not handled by this call.
     *
     * @return the number of notifications handled by this call, {@code 0} when none could be claimed
     */
    @Override
    public int doProcessEvents() {
        logDebug("ENTER doProcessEvents");
        // No tenant/account is known yet at this point, hence the two nulls.
        final List<Notification> claimed = getReadyNotifications(createCallContext(null, null));
        if (claimed.isEmpty()) {
            logDebug("EXIT doProcessEvents");
            return 0;
        }
        logDebug("START processing %d events at time %s", claimed.size(), getClock().getUTCNow().toDate());

        int processed = 0;
        for (final Notification notification : claimed) {
            getNbProcessedEvents().incrementAndGet();
            logDebug("handling notification %s, key = %s for time %s", notification.getId(), notification.getNotificationKey(), notification.getEffectiveDate());
            getHandler().handleReadyNotification(
                    (NotificationKey) deserializeEvent(notification.getNotificationKeyClass(), notification.getNotificationKey()),
                    notification.getEffectiveDate(),
                    notification.getAccountRecordId(),
                    notification.getTenantRecordId());
            processed++;
            // The clearing call runs under a context built from the notification's own tenant/account.
            clearNotification(notification, createCallContext(notification.getTenantRecordId(), notification.getAccountRecordId()));
            logDebug("done handling notification %s, key = %s for time %s", notification.getId(), notification.getNotificationKey(), notification.getEffectiveDate());
        }
        return processed;
    }

    /**
     * Inserts a notification for this queue using the queue's own on-demand DAO.
     *
     * @param dateTime            date stored with the notification
     * @param uuid                identifier stored with the notification
     * @param notificationKey     key to persist; its class name and its serialized form are both stored
     * @param internalCallContext supplies the account and tenant record ids and is passed to the DAO
     * @throws IOException declared because the key is serialized with the object mapper
     */
    @Override
    public void recordFutureNotification(DateTime dateTime, UUID uuid, NotificationKey notificationKey, InternalCallContext internalCallContext) throws IOException {
        recordFutureNotificationInternal(dateTime, uuid, notificationKey, this.dao, internalCallContext);
    }

    /**
     * Same as {@link #recordFutureNotification}, except that the insert goes through the
     * {@link NotificationSqlDao} obtained from {@code transmogrifier.become(...)} instead of the queue's
     * own DAO (presumably so that it takes part in the caller's transaction - confirm with callers).
     *
     * @param transmogrifier      object from which the DAO used for the insert is derived
     * @param dateTime            date stored with the notification
     * @param uuid                identifier stored with the notification
     * @param notificationKey     key to persist; its class name and its serialized form are both stored
     * @param internalCallContext supplies the account and tenant record ids and is passed to the DAO
     * @throws IOException declared because the key is serialized with the object mapper
     */
    @Override
    public void recordFutureNotificationFromTransaction(Transmogrifier transmogrifier, DateTime dateTime, UUID uuid, NotificationKey notificationKey, InternalCallContext internalCallContext) throws IOException {
        recordFutureNotificationInternal(dateTime, uuid, notificationKey, (NotificationSqlDao) transmogrifier.become(NotificationSqlDao.class), internalCallContext);
    }

    /**
     * Builds a {@code DefaultNotification} for this queue and host and inserts it through {@code sqlDao}.
     */
    private void recordFutureNotificationInternal(DateTime dateTime, UUID uuid, NotificationKey notificationKey, NotificationSqlDao sqlDao, InternalCallContext internalCallContext) throws IOException {
        sqlDao.insertNotification(
                new DefaultNotification(
                        getFullQName(),
                        getHostname(),
                        notificationKey.getClass().getName(),
                        this.objectMapper.writeValueAsString(notificationKey),
                        uuid,
                        dateTime,
                        internalCallContext.getAccountRecordId(),
                        internalCallContext.getTenantRecordId()),
                internalCallContext);
    }

    /**
     * Asks the DAO to clear the given notification, identified by its id and the local hostname.
     */
    private void clearNotification(Notification notification, InternalCallContext internalCallContext) {
        this.dao.clearNotification(notification.getId().toString(), getHostname(), internalCallContext);
    }

    /**
     * Fetches the notifications the DAO reports as ready for this queue and tries to claim each one.
     *
     * <p>Only notifications for which {@code claimNotification} reports exactly one affected row are
     * returned; for each of those a claimed-history entry is inserted as well. After all claims were
     * attempted, a warning is logged for every claimed notification whose recorded owner is a
     * different host.
     *
     * @return the notifications successfully claimed by this call, possibly empty
     */
    private List<Notification> getReadyNotifications(InternalCallContext internalCallContext) {
        // Read the clock once so that both dates below are derived from the same instant.
        final Date now = getClock().getUTCNow().toDate();
        final Date nowPlusOffset = new Date(now.getTime() + CLAIM_OFFSET_MS);

        // NOTE(review): CLAIM_TIME_MS is passed as the third argument here; its meaning for this
        // query is not visible from this class - confirm against NotificationSqlDao.
        final List<Notification> candidates = this.dao.getReadyNotifications(now, getHostname(), NotificationQueueBase.CLAIM_TIME_MS, getFullQName(), internalCallContext);

        final List<Notification> claimed = new ArrayList<Notification>();
        for (final Notification candidate : candidates) {
            logDebug("about to claim notification %s,  key = %s for time %s", candidate.getId(), candidate.getNotificationKey(), candidate.getEffectiveDate());
            final boolean claimSucceeded = this.dao.claimNotification(getHostname(), nowPlusOffset, candidate.getId().toString(), now, internalCallContext) == 1;
            logDebug("claimed notification %s, key = %s for time %s result = %s", candidate.getId(), candidate.getNotificationKey(), candidate.getEffectiveDate(), claimSucceeded);
            if (claimSucceeded) {
                claimed.add(candidate);
                this.dao.insertClaimedHistory(getHostname(), now, candidate.getId().toString(), internalCallContext);
            }
        }

        // The owner inspected here is the one carried by the object fetched before the claim.
        for (final Notification notification : claimed) {
            if (notification.getOwner() != null && !notification.getOwner().equals(getHostname())) {
                log.warn("NotificationQueue {} stealing notification {} from {}", new Object[]{getFullQName(), notification, notification.getOwner()});
            }
        }
        return claimed;
    }

    /**
     * Logs a {@link String#format(String, Object...)}-style message at debug level, prefixed with the
     * current thread id and the full queue name. Nothing is formatted when debug logging is disabled.
     */
    private void logDebug(String format, Object... args) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Thread %d [queue = %s] %s", Thread.currentThread().getId(), getFullQName(), String.format(format, args)));
        }
    }

    /**
     * Asks the DAO to remove the notifications matching the given key.
     *
     * @param notificationKey     key whose {@code toString()} value is passed to the DAO
     * @param internalCallContext context passed to the DAO
     */
    @Override
    public void removeNotificationsByKey(NotificationKey notificationKey, InternalCallContext internalCallContext) {
        // NOTE(review): notifications are inserted with the key serialized by the object mapper, while
        // this lookup uses toString() - confirm that both produce the same text for every key type.
        this.dao.removeNotificationsByKey(notificationKey.toString(), internalCallContext);
    }

    /**
     * Returns what the DAO finds for the given identifier and date.
     *
     * @param uuid                identifier passed to the DAO in its string form
     * @param dateTime            date passed to the DAO as a {@link Date}
     * @param internalCallContext context passed to the DAO
     * @return the DAO's result, unmodified
     */
    @Override
    public List<Notification> getNotificationForAccountAndDate(UUID uuid, DateTime dateTime, InternalCallContext internalCallContext) {
        return this.dao.getNotificationForAccountAndDate(uuid.toString(), dateTime.toDate(), internalCallContext);
    }

    /**
     * Asks the DAO to remove the notification with the given identifier.
     *
     * @param uuid                identifier passed to the DAO in its string form
     * @param internalCallContext context passed to the DAO
     */
    @Override
    public void removeNotification(UUID uuid, InternalCallContext internalCallContext) {
        this.dao.removeNotification(uuid.toString(), internalCallContext);
    }

    /**
     * Builds the internal, system-user call context used by the processing loop.
     *
     * @param tenantRecordId  tenant record id, or {@code null} when not known
     * @param accountRecordId account record id, or {@code null} when not known
     */
    private InternalCallContext createCallContext(@Nullable Long tenantRecordId, @Nullable Long accountRecordId) {
        return this.internalCallContextFactory.createInternalCallContext(tenantRecordId, accountRecordId, "NotificationQueue", CallOrigin.INTERNAL, UserType.SYSTEM, (UUID) null);
    }
}
