package com.ning.billing.util.bus;

import com.google.common.eventbus.EventBus;
import com.google.inject.Inject;
import com.ning.billing.util.Hostname;
import com.ning.billing.util.bus.Bus;
import com.ning.billing.util.bus.dao.BusEventEntry;
import com.ning.billing.util.bus.dao.PersistentBusSqlDao;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.queue.PersistentQueueBase;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Transaction;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.sqlobject.mixins.Transmogrifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ning/billing/util/bus/PersistentBus.class */
/**
 * {@link Bus} implementation backed by a database table.
 *
 * <p>Posting an event serializes it to JSON and inserts it through
 * {@link PersistentBusSqlDao}; it is not delivered to subscribers at that point.
 * Delivery happens in {@link #doProcessEvents()}, which claims a stored entry,
 * deserializes it and hands it to an in-memory Guava {@link EventBus} on which
 * the subscribers were registered.
 */
public class PersistentBus extends PersistentQueueBase implements Bus {
    /**
     * Offset added to the current time and passed to the DAO when claiming an entry.
     * NOTE(review): presumably the time after which a claimed-but-unprocessed entry
     * becomes available again; confirm against the SQL behind claimBusEvent.
     */
    private static final long DELTA_IN_PROCESSING_TIME_MS = 300000;

    /**
     * Limit handed to {@code getNextBusEventEntry}.
     * NOTE(review): presumably the maximum number of rows fetched per poll; confirm.
     */
    private static final int MAX_BUS_EVENTS = 1;

    private static final Logger log = LoggerFactory.getLogger(PersistentBus.class);

    private final PersistentBusSqlDao dao;
    private final EventBusDelegate eventBusDelegate;
    private final Clock clock;
    private final String hostname;

    /** Thin named subclass of the Guava {@link EventBus} used for in-memory dispatch. */
    private static final class EventBusDelegate extends EventBus {
        public EventBusDelegate(String str) {
            super(str);
        }
    }

    /**
     * Builds the bus on top of a fixed-size thread pool sized from the configuration.
     *
     * @param idbi                used to obtain an on-demand {@link PersistentBusSqlDao}
     * @param clock               source of the current UTC time used when claiming entries
     * @param persistentBusConfig supplies the number of threads and is forwarded to the base class
     */
    @Inject
    public PersistentBus(IDBI idbi, Clock clock, PersistentBusConfig persistentBusConfig) {
        super("Bus", Executors.newFixedThreadPool(persistentBusConfig.getNbThreads(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                // NOTE(review): a new ThreadGroup instance is created for every thread, so the
                // threads only share the group *name*; kept as-is to preserve behavior.
                return new Thread(new ThreadGroup(DefaultBusService.EVENT_BUS_GROUP_NAME), runnable, DefaultBusService.EVENT_BUS_TH_NAME);
            }
        }), persistentBusConfig.getNbThreads(), persistentBusConfig);
        this.dao = (PersistentBusSqlDao) idbi.onDemand(PersistentBusSqlDao.class);
        this.clock = clock;
        this.eventBusDelegate = new EventBusDelegate("Killbill EventBus");
        this.hostname = Hostname.get();
    }

    /** Starts the underlying queue by delegating to {@code startQueue()}. */
    public void start() {
        startQueue();
    }

    /** Stops the underlying queue by delegating to {@code stopQueue()}. */
    public void stop() {
        stopQueue();
    }

    /**
     * Claims the next available stored entries, dispatches each one to the in-memory
     * bus and then clears it in the DAO.
     *
     * @return the number of entries dispatched during this call (0 when nothing was claimed)
     */
    @Override // com.ning.billing.util.queue.PersistentQueueBase
    public int doProcessEvents() {
        int processed = 0;
        for (BusEventEntry entry : getNextBusEvent()) {
            BusEvent event = (BusEvent) deserializeEvent(entry.getBusEventClass(), entry.getBusEventJson());
            // Count entries explicitly rather than adding the fetch-limit constant.
            processed++;
            this.eventBusDelegate.post(event);
            // The entry is cleared only after post() has returned.
            this.dao.clearBusEvent(Long.valueOf(entry.getId()), this.hostname);
        }
        return processed;
    }

    /**
     * Looks up the next candidate entry and tries to claim it for this host.
     *
     * @return a singleton list with the claimed entry, or an empty list when there is
     *         no candidate or the claim did not succeed
     */
    private List<BusEventEntry> getNextBusEvent() {
        // Read the clock once so that the lookup time and the claim deadline are
        // derived from the same instant.
        Date now = this.clock.getUTCNow().toDate();
        Date nextAvailable = new Date(now.getTime() + DELTA_IN_PROCESSING_TIME_MS);

        BusEventEntry candidate = this.dao.getNextBusEventEntry(MAX_BUS_EVENTS, this.hostname, now);
        if (candidate == null) {
            return Collections.emptyList();
        }

        // The claim is treated as successful only when the DAO reports exactly 1
        // (presumably the number of rows updated; another node may have claimed it first).
        int claimResult = this.dao.claimBusEvent(this.hostname, nextAvailable, Long.valueOf(candidate.getId()), now);
        if (claimResult != 1) {
            return Collections.emptyList();
        }

        this.dao.insertClaimedHistory(this.hostname, now, candidate.getId());
        return Collections.singletonList(candidate);
    }

    /**
     * Registers a subscriber on the in-memory bus.
     *
     * @param obj the subscriber object
     */
    public void register(Object obj) throws Bus.EventBusException {
        this.eventBusDelegate.register(obj);
    }

    /**
     * Removes a subscriber from the in-memory bus.
     *
     * @param obj the subscriber object previously registered
     */
    public void unregister(Object obj) throws Bus.EventBusException {
        this.eventBusDelegate.unregister(obj);
    }

    /**
     * Stores the event inside a new DAO transaction. The event is persisted here,
     * not dispatched; dispatch happens later in {@link #doProcessEvents()}.
     *
     * @param busEvent the event to store
     */
    public void post(final BusEvent busEvent) throws Bus.EventBusException {
        this.dao.inTransaction(new Transaction<Void, PersistentBusSqlDao>() {
            public Void inTransaction(PersistentBusSqlDao persistentBusSqlDao, TransactionStatus transactionStatus) throws Exception {
                PersistentBus.this.postFromTransaction(busEvent, persistentBusSqlDao);
                return null;
            }
        });
    }

    /**
     * Stores the event using the caller's transactional object, converted to a
     * {@link PersistentBusSqlDao} via {@code become}.
     *
     * @param busEvent       the event to store
     * @param transmogrifier the caller's transactional DAO
     */
    public void postFromTransaction(BusEvent busEvent, Transmogrifier transmogrifier) throws Bus.EventBusException {
        postFromTransaction(busEvent, (PersistentBusSqlDao) transmogrifier.become(PersistentBusSqlDao.class));
    }

    /**
     * Serializes the event to JSON and inserts it with the given DAO.
     *
     * <p>Any exception raised while serializing or inserting is logged and not
     * rethrown, so the caller is not told about the failure.
     *
     * @param busEvent            the event to store
     * @param persistentBusSqlDao the DAO used for the insert
     */
    public void postFromTransaction(BusEvent busEvent, PersistentBusSqlDao persistentBusSqlDao) {
        try {
            String json = this.objectMapper.writeValueAsString(busEvent);
            BusEventEntry entry = new BusEventEntry(this.hostname, busEvent.getClass().getName(), json);
            persistentBusSqlDao.insertBusEvent(entry);
        } catch (Exception e) {
            log.error("Failed to post BusEvent " + busEvent, e);
        }
    }
}
