package com.ning.billing.util.notificationq;

import com.ning.billing.util.Hostname;
import com.ning.billing.util.clock.Clock;
import com.ning.billing.util.notificationq.NotificationQueueService;
import java.lang.Thread;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ning/billing/util/notificationq/NotificationQueueBase.class */
public abstract class NotificationQueueBase implements NotificationQueue {
    private static final long MAX_NOTIFICATION_THREAD_WAIT_MS = 10000;
    private static final long NOTIFICATION_THREAD_WAIT_INCREMENT_MS = 1000;
    private static final long NANO_TO_MS = 1000000;
    protected static final String NOTIFICATION_THREAD_PREFIX = "Notification-";
    protected final String svcName;
    protected final String queueName;
    protected final NotificationQueueService.NotificationQueueHandler handler;
    protected final NotificationConfig config;
    protected final Executor executor;
    protected final Clock clock;
    protected AtomicLong nbProcessedEvents;
    protected boolean isProcessingEvents;
    protected static final Logger log = LoggerFactory.getLogger(NotificationQueueBase.class);
    protected static final AtomicInteger sequenceId = new AtomicInteger();
    protected final long STOP_WAIT_TIMEOUT_MS = 60000;
    private boolean startedComplete = false;
    private boolean stoppedComplete = false;
    protected final String hostname = Hostname.get();

    /* JADX INFO: Access modifiers changed from: package-private */
    public NotificationQueueBase(Clock clock, final String str, final String str2, NotificationQueueService.NotificationQueueHandler notificationQueueHandler, NotificationConfig notificationConfig) {
        this.clock = clock;
        this.svcName = str;
        this.queueName = str2;
        this.handler = notificationQueueHandler;
        this.config = notificationConfig;
        this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() { // from class: com.ning.billing.util.notificationq.NotificationQueueBase.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable);
                thread.setName(NotificationQueueBase.NOTIFICATION_THREAD_PREFIX + str + "-" + str2);
                thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: com.ning.billing.util.notificationq.NotificationQueueBase.1.1
                    @Override // java.lang.Thread.UncaughtExceptionHandler
                    public void uncaughtException(Thread thread2, Throwable th) {
                        NotificationQueueBase.log.error("Uncaught exception for thread " + thread2.getName(), th);
                    }
                });
                return thread;
            }
        });
    }

    @Override // com.ning.billing.util.notificationq.NotificationQueue
    public int processReadyNotification() {
        return doProcessEvents(sequenceId.incrementAndGet());
    }

    @Override // com.ning.billing.util.notificationq.NotificationQueue
    public void stopQueue() {
        if (this.config.isNotificationProcessingOff()) {
            completedQueueStop();
            return;
        }
        synchronized (this) {
            this.isProcessingEvents = false;
            try {
                log.info("NotificationQueue requested to stop");
                wait(60000L);
                log.info("NotificationQueue requested should have exited");
            } catch (InterruptedException e) {
                log.warn("NotificationQueue got interrupted exception when stopping notifications", e);
            }
        }
        waitForNotificationStopCompletion();
    }

    @Override // com.ning.billing.util.notificationq.NotificationQueue
    public void startQueue() {
        this.isProcessingEvents = true;
        this.nbProcessedEvents = new AtomicLong();
        if (this.config.isNotificationProcessingOff()) {
            log.warn(String.format("KILLBILL NOTIFICATION PROCESSING FOR SVC %s IS OFF !!!", getFullQName()));
            completedQueueStart();
        } else {
            this.executor.execute(new Runnable() { // from class: com.ning.billing.util.notificationq.NotificationQueueBase.2
                @Override // java.lang.Runnable
                public void run() {
                    NotificationQueueBase.log.info(String.format("NotificationQueue thread %s [%d] started", Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())));
                    NotificationQueueBase.this.completedQueueStart();
                    while (true) {
                        try {
                            try {
                                try {
                                    synchronized (this) {
                                        if (!NotificationQueueBase.this.isProcessingEvents) {
                                            NotificationQueueBase.log.info(String.format("NotificationQueue has been requested to stop, thread  %s  [%d] stopping...", Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())));
                                            this.notify();
                                            NotificationQueueBase.this.completedQueueStop();
                                            NotificationQueueBase.log.info(String.format("NotificationQueue thread  %s  [%d] exited...", Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())));
                                            return;
                                        }
                                    }
                                    try {
                                        NotificationQueueBase.this.doProcessEvents(NotificationQueueBase.sequenceId.getAndIncrement());
                                    } catch (Exception e) {
                                        NotificationQueueBase.log.error(String.format("NotificationQueue thread  %s  [%d] got an exception..", Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())), e);
                                    }
                                    sleepALittle();
                                } catch (Throwable th) {
                                    NotificationQueueBase.log.error(Thread.currentThread().getName() + " got an exception exiting...", th);
                                    th.printStackTrace();
                                    NotificationQueueBase.this.completedQueueStop();
                                    NotificationQueueBase.log.info(String.format("NotificationQueue thread  %s  [%d] exited...", Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())));
                                    return;
                                }
                            } catch (InterruptedException e2) {
                                NotificationQueueBase.log.warn(Thread.currentThread().getName() + " got interrupted ", e2);
                                NotificationQueueBase.this.completedQueueStop();
                                NotificationQueueBase.log.info(String.format("NotificationQueue thread  %s  [%d] exited...", Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())));
                                return;
                            }
                        } catch (Throwable th2) {
                            NotificationQueueBase.this.completedQueueStop();
                            NotificationQueueBase.log.info(String.format("NotificationQueue thread  %s  [%d] exited...", Thread.currentThread().getName(), Long.valueOf(Thread.currentThread().getId())));
                            throw th2;
                        }
                    }
                }

                private void sleepALittle() throws InterruptedException {
                    Thread.sleep(NotificationQueueBase.this.config.getNotificationSleepTimeMs());
                }
            });
            waitForNotificationStartCompletion();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void completedQueueStop() {
        synchronized (this) {
            this.stoppedComplete = true;
            notifyAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void completedQueueStart() {
        synchronized (this) {
            this.startedComplete = true;
            notifyAll();
        }
    }

    private void waitForNotificationStartCompletion() {
        waitForNotificationEventCompletion(true);
    }

    private void waitForNotificationStopCompletion() {
        waitForNotificationEventCompletion(false);
    }

    /* JADX WARN: Code restructure failed: missing block: B:37:0x0074, code lost:
    
        if (r8.stoppedComplete == false) goto L32;
     */
    /* JADX WARN: Removed duplicated region for block: B:11:0x0041 A[Catch: all -> 0x00c3, TryCatch #0 {, blocks: (B:56:0x000d, B:18:0x0066, B:21:0x009c, B:24:0x00ac, B:25:0x00bf, B:30:0x0077, B:33:0x0087, B:34:0x009b, B:36:0x0070, B:8:0x0021, B:11:0x0041, B:13:0x0052, B:41:0x004b, B:46:0x002d, B:47:0x003c, B:5:0x0017), top: B:55:0x000d, inners: #1 }] */
    /* JADX WARN: Removed duplicated region for block: B:41:0x004b A[Catch: all -> 0x00c3, TryCatch #0 {, blocks: (B:56:0x000d, B:18:0x0066, B:21:0x009c, B:24:0x00ac, B:25:0x00bf, B:30:0x0077, B:33:0x0087, B:34:0x009b, B:36:0x0070, B:8:0x0021, B:11:0x0041, B:13:0x0052, B:41:0x004b, B:46:0x002d, B:47:0x003c, B:5:0x0017), top: B:55:0x000d, inners: #1 }] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void waitForNotificationEventCompletion(boolean r9) {
        /*
            r8 = this;
            long r0 = java.lang.System.nanoTime()
            r10 = r0
            r0 = r8
            r1 = r0
            r12 = r1
            monitor-enter(r0)
        L9:
            r0 = r9
            if (r0 == 0) goto L17
            r0 = r8
            boolean r0 = r0.startedComplete     // Catch: java.lang.Throwable -> Lc3
            if (r0 == 0) goto L21
            goto L62
        L17:
            r0 = r8
            boolean r0 = r0.stoppedComplete     // Catch: java.lang.Throwable -> Lc3
            if (r0 == 0) goto L21
            goto L62
        L21:
            r0 = r8
            r1 = 1000(0x3e8, double:4.94E-321)
            r0.wait(r1)     // Catch: java.lang.InterruptedException -> L2b java.lang.Throwable -> Lc3
            goto L3d
        L2b:
            r13 = move-exception
            java.lang.Thread r0 = java.lang.Thread.currentThread()     // Catch: java.lang.Throwable -> Lc3
            r0.interrupt()     // Catch: java.lang.Throwable -> Lc3
            com.ning.billing.util.notificationq.NotificationError r0 = new com.ning.billing.util.notificationq.NotificationError     // Catch: java.lang.Throwable -> Lc3
            r1 = r0
            r2 = r13
            r1.<init>(r2)     // Catch: java.lang.Throwable -> Lc3
            throw r0     // Catch: java.lang.Throwable -> Lc3
        L3d:
            r0 = r9
            if (r0 == 0) goto L4b
            r0 = r8
            boolean r0 = r0.startedComplete     // Catch: java.lang.Throwable -> Lc3
            if (r0 == 0) goto L52
            goto L62
        L4b:
            r0 = r8
            boolean r0 = r0.stoppedComplete     // Catch: java.lang.Throwable -> Lc3
            if (r0 != 0) goto L62
        L52:
            long r0 = java.lang.System.nanoTime()     // Catch: java.lang.Throwable -> Lc3
            r1 = r10
            long r0 = r0 - r1
            r1 = 1000000(0xf4240, double:4.940656E-318)
            long r0 = r0 / r1
            r1 = 10000(0x2710, double:4.9407E-320)
            int r0 = (r0 > r1 ? 1 : (r0 == r1 ? 0 : -1))
            if (r0 < 0) goto L9
        L62:
            r0 = r9
            if (r0 == 0) goto L70
            r0 = r8
            boolean r0 = r0.startedComplete     // Catch: java.lang.Throwable -> Lc3
            if (r0 == 0) goto L77
            goto L9c
        L70:
            r0 = r8
            boolean r0 = r0.stoppedComplete     // Catch: java.lang.Throwable -> Lc3
            if (r0 != 0) goto L9c
        L77:
            org.slf4j.Logger r0 = com.ning.billing.util.notificationq.NotificationQueueBase.log     // Catch: java.lang.Throwable -> Lc3
            java.lang.String r1 = "Could not {} notification thread in {} msec !!!"
            r2 = r9
            if (r2 == 0) goto L85
            java.lang.String r2 = "start"
            goto L87
        L85:
            java.lang.String r2 = "stop"
        L87:
            r3 = 10000(0x2710, double:4.9407E-320)
            java.lang.Long r3 = java.lang.Long.valueOf(r3)     // Catch: java.lang.Throwable -> Lc3
            r0.error(r1, r2, r3)     // Catch: java.lang.Throwable -> Lc3
            com.ning.billing.util.notificationq.NotificationError r0 = new com.ning.billing.util.notificationq.NotificationError     // Catch: java.lang.Throwable -> Lc3
            r1 = r0
            java.lang.String r2 = "Failed to start service!!"
            r1.<init>(r2)     // Catch: java.lang.Throwable -> Lc3
            throw r0     // Catch: java.lang.Throwable -> Lc3
        L9c:
            org.slf4j.Logger r0 = com.ning.billing.util.notificationq.NotificationQueueBase.log     // Catch: java.lang.Throwable -> Lc3
            java.lang.String r1 = "Notification thread has been {} in {} ms"
            r2 = r9
            if (r2 == 0) goto Laa
            java.lang.String r2 = "started"
            goto Lac
        Laa:
            java.lang.String r2 = "stopped"
        Lac:
            long r3 = java.lang.System.nanoTime()     // Catch: java.lang.Throwable -> Lc3
            r4 = r10
            long r3 = r3 - r4
            r4 = 1000000(0xf4240, double:4.940656E-318)
            long r3 = r3 / r4
            java.lang.Long r3 = java.lang.Long.valueOf(r3)     // Catch: java.lang.Throwable -> Lc3
            r0.info(r1, r2, r3)     // Catch: java.lang.Throwable -> Lc3
            r0 = r12
            monitor-exit(r0)     // Catch: java.lang.Throwable -> Lc3
            goto Lcb
        Lc3:
            r14 = move-exception
            r0 = r12
            monitor-exit(r0)     // Catch: java.lang.Throwable -> Lc3
            r0 = r14
            throw r0
        Lcb:
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: com.ning.billing.util.notificationq.NotificationQueueBase.waitForNotificationEventCompletion(boolean):void");
    }

    @Override // com.ning.billing.util.notificationq.NotificationQueue
    public String getFullQName() {
        return this.svcName + ":" + this.queueName;
    }

    protected abstract int doProcessEvents(int i);
}
