package com.addc.commons.queue14;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/addc/commons/queue14/PersistingQueueReader.class */
public class PersistingQueueReader implements Runnable {
    private static final Logger LOGGER;
    private final PersistingQueue queue;
    private final PayloadDispatcher dispatcher;
    private final String threadName;
    private Thread runnerThread;
    static Class class$com$addc$commons$queue14$PersistingQueueReader;
    private Object delayMutex = new Object();
    private PersistingQueueReaderState dispatcherState = new PersistingQueueReaderState();
    private final List listeners = new LinkedList();
    private final ReaderDelayGenerator delayGenerator = new ReaderDelayGenerator(this);

    public PersistingQueueReader(PersistingQueue persistingQueue, String str, PayloadDispatcher payloadDispatcher) {
        this.queue = persistingQueue;
        this.threadName = str;
        this.dispatcher = payloadDispatcher;
    }

    public void addListener(PersistingQueueReaderListener persistingQueueReaderListener) {
        synchronized (this.listeners) {
            this.listeners.add(persistingQueueReaderListener);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        LOGGER.log(Level.FINE, "Thread starts...");
        do {
            if (this.dispatcherState.connectionLost && !isShutdown()) {
                delay();
            }
            if (this.dispatcherState.currentPayload == null) {
                this.dispatcherState.currentPayload = this.queue.take();
            }
            if (this.dispatcherState.currentPayload != null) {
                processCurrentPayload();
            }
        } while (!isShutdown());
        LOGGER.log(Level.FINE, "Thread ends...");
    }

    public void shutdown() {
        LOGGER.log(Level.FINE, "Stop the dispatcher thread...");
        setShutdown(true);
        breakDelay();
        this.queue.interruptTake();
        try {
            this.runnerThread.join();
        } catch (InterruptedException e) {
            LOGGER.log(Level.FINE, "Interrupted", (Throwable) e);
        }
        LOGGER.log(Level.FINE, "Recover any outstanding batches and send them...");
        if (this.dispatcherState.currentPayload == null) {
            this.dispatcherState.currentPayload = this.queue.poll();
        }
        if (this.dispatcherState.currentPayload != null) {
            sendPendingPayloads();
        }
        LOGGER.log(Level.FINE, new StringBuffer().append("Dispatcher terminated, returning ").append(this.dispatcherState.currentPayload).toString());
        this.queue.shutdown(this.dispatcherState.currentPayload);
    }

    public void start() {
        this.runnerThread = new Thread(this, this.threadName);
        this.runnerThread.start();
    }

    public boolean isShutdown() {
        boolean z;
        synchronized (this.dispatcherState.shutdownLock) {
            z = this.dispatcherState.shutdown;
        }
        return z;
    }

    private void delay() {
        synchronized (this.delayMutex) {
            try {
                this.delayMutex.wait(this.delayGenerator.getDelay());
            } catch (InterruptedException e) {
                LOGGER.log(Level.FINE, "Interrupted", (Throwable) e);
            }
        }
    }

    private void processCurrentPayload() {
        try {
            this.dispatcher.dispatch(this.dispatcherState.currentPayload);
            notifyForward(null);
            this.dispatcherState.currentPayload = null;
        } catch (DispatcherException e) {
            if (!e.isRecoverable()) {
                this.dispatcherState.connectionLost = true;
                notifyDispatcherError(e);
                setShutdown(true);
            } else {
                this.dispatcherState.connectionLost = true;
                notifyForward(e);
                if (e.getRetryDelay() != null) {
                    delay(e.getRetryDelay().longValue());
                }
            }
        } catch (Exception e2) {
            setShutdown(true);
        }
    }

    private void delay(long j) {
        synchronized (this.delayMutex) {
            try {
                this.delayMutex.wait(j);
            } catch (InterruptedException e) {
                LOGGER.log(Level.FINE, "Interrupted", (Throwable) e);
            }
        }
    }

    private void sendPendingPayloads() {
        boolean z = true;
        do {
            try {
                this.dispatcher.dispatch(this.dispatcherState.currentPayload);
                this.dispatcherState.currentPayload = null;
            } catch (Exception e) {
                this.dispatcherState.connectionLost = true;
                notifyForward(e);
                z = false;
            }
            if (this.dispatcherState.currentPayload == null) {
                this.dispatcherState.currentPayload = this.queue.poll();
            }
            if (this.dispatcherState.currentPayload == null) {
                return;
            }
        } while (z);
    }

    private void notifyForward(Exception exc) {
        synchronized (this.listeners) {
            Iterator it = this.listeners.iterator();
            while (it.hasNext()) {
                ((PersistingQueueReaderListener) it.next()).onProcess(this.dispatcherState.connectionLost, exc);
            }
        }
    }

    private void notifyDispatcherError(DispatcherException dispatcherException) {
        synchronized (this.listeners) {
            Iterator it = this.listeners.iterator();
            while (it.hasNext()) {
                ((PersistingQueueReaderListener) it.next()).onDispatcherError(dispatcherException);
            }
        }
    }

    private void setShutdown(boolean z) {
        synchronized (this.dispatcherState.shutdownLock) {
            this.dispatcherState.shutdown = z;
        }
    }

    private void breakDelay() {
        synchronized (this.delayMutex) {
            this.delayMutex.notifyAll();
        }
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$com$addc$commons$queue14$PersistingQueueReader == null) {
            cls = class$("com.addc.commons.queue14.PersistingQueueReader");
            class$com$addc$commons$queue14$PersistingQueueReader = cls;
        } else {
            cls = class$com$addc$commons$queue14$PersistingQueueReader;
        }
        LOGGER = Logger.getLogger(cls.getName());
    }
}
