/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.ExceptionCatchingRunnable;
import com.datastax.driver.core.GuavaCompatibility;
import com.datastax.driver.core.utils.MoreFutures;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Accumulates events and hands them to a {@link DeliveryCallback} in batches.
 *
 * <p>While the debouncer is running, every received event either (re)schedules a delivery after
 * {@link #delayMs()} milliseconds (when fewer than {@link #maxPendingEvents()} events are
 * queued), or triggers an immediate delivery (when the queue holds exactly
 * {@link #maxPendingEvents()} events). Events received before {@link #start()} are queued and
 * delivered once the debouncer starts; events received after {@link #stop()} are not queued.
 *
 * @param <T> the type of the debounced events.
 */
abstract class EventDebouncer<T> {
    private static final Logger logger = LoggerFactory.getLogger(EventDebouncer.class);

    /** Queue capacity used by the three-argument constructor. */
    private static final int DEFAULT_MAX_QUEUED_EVENTS = 10000;

    /** Minimum interval, in nanoseconds, between two queue-overflow warnings. */
    private static final long OVERFLOW_WARNING_INTERVAL = TimeUnit.NANOSECONDS.convert(5L, TimeUnit.SECONDS);

    private final String name;
    /** Most recent attempt submitted for immediate execution, or {@code null}. */
    private final AtomicReference<DeliveryAttempt> immediateDelivery = new AtomicReference<DeliveryAttempt>();
    /** Most recent attempt scheduled to run after {@link #delayMs()}, or {@code null}. */
    private final AtomicReference<DeliveryAttempt> delayedDelivery = new AtomicReference<DeliveryAttempt>();
    private final ScheduledExecutorService executor;
    private final DeliveryCallback<T> callback;
    /** Events received while more than this many are already counted are rejected. */
    private final int maxQueuedEvents;
    private final Queue<Entry<T>> events;
    /** Number of queued events; tracked separately from the queue itself. */
    private final AtomicInteger eventCount;
    private volatile State state;
    /** {@link System#nanoTime()} of the last overflow warning; {@code Long.MIN_VALUE} until the first one. */
    private volatile long lastOverflowWarning = Long.MIN_VALUE;

    /**
     * Creates a debouncer that queues at most {@value #DEFAULT_MAX_QUEUED_EVENTS} events.
     *
     * @param name     label used in log messages.
     * @param executor executor on which deliveries are run or scheduled.
     * @param callback receiver of the batched events.
     */
    EventDebouncer(String name, ScheduledExecutorService executor, DeliveryCallback<T> callback) {
        this(name, executor, callback, DEFAULT_MAX_QUEUED_EVENTS);
    }

    /**
     * Creates a debouncer in the {@code NEW} state; nothing is delivered until {@link #start()}.
     *
     * @param name            label used in log messages.
     * @param executor        executor on which deliveries are run or scheduled.
     * @param callback        receiver of the batched events.
     * @param maxQueuedEvents upper bound on queued events; also the maximum batch size of one
     *                        delivery.
     */
    EventDebouncer(String name, ScheduledExecutorService executor, DeliveryCallback<T> callback, int maxQueuedEvents) {
        this.name = name;
        this.executor = executor;
        this.callback = callback;
        this.maxQueuedEvents = maxQueuedEvents;
        this.events = new ConcurrentLinkedQueue<Entry<T>>();
        this.eventCount = new AtomicInteger();
        this.state = State.NEW;
    }

    /** @return the queue size at which a delivery is triggered immediately instead of after the delay. */
    abstract int maxPendingEvents();

    /** @return the delay, in milliseconds, applied to delayed deliveries. */
    abstract long delayMs();

    /**
     * Switches to the {@code RUNNING} state and, if events were queued beforehand, triggers an
     * immediate delivery.
     */
    void start() {
        logger.trace("Starting {} debouncer...", name);
        state = State.RUNNING;
        if (!events.isEmpty()) {
            logger.trace("{} debouncer: {} events were accumulated before the debouncer started: delivering now", name, eventCount.get());
            scheduleImmediateDelivery();
        }
    }

    /**
     * Switches to the {@code STOPPED} state, cancels and clears any outstanding delivery attempt,
     * and completes the futures of all events still in the queue.
     */
    void stop() {
        logger.trace("Stopping {} debouncer...", name);
        state = State.STOPPED;

        // Retry until the attempt we cancelled is also the one we cleared; a concurrent
        // scheduler may have swapped in a new attempt in between.
        DeliveryAttempt previous;
        do {
            previous = cancelDelayedDelivery();
        } while (!delayedDelivery.compareAndSet(previous, null));
        do {
            previous = cancelImmediateDelivery();
        } while (!immediateDelivery.compareAndSet(previous, null));

        completeAllPendingFutures();
        logger.trace("{} debouncer stopped", name);
    }

    /** Drains the queue, completing each entry's future with {@code null}. */
    private void completeAllPendingFutures() {
        Entry<T> entry;
        while ((entry = events.poll()) != null) {
            entry.future.set(null);
        }
    }

    /**
     * Queues an event for a later batched delivery.
     *
     * <p>If the debouncer is stopped, or if the queue already holds {@code maxQueuedEvents}
     * events, the event is not queued and {@code MoreFutures.VOID_SUCCESS} is returned.
     *
     * @param event the event to queue; must not be {@code null} (only checked when the debouncer
     *              is not stopped).
     * @return a future that is completed once the batch containing this event has been delivered
     *         (or failed), or when the debouncer is stopped.
     */
    ListenableFuture<Void> eventReceived(T event) {
        if (state == State.STOPPED) {
            logger.trace("{} debouncer is stopped, rejecting event: {}", name, event);
            return MoreFutures.VOID_SUCCESS;
        }
        Preconditions.checkNotNull(event);
        logger.trace("{} debouncer: event received {}", name, event);

        // Reserve a slot first; give it back if the queue is over capacity.
        if (eventCount.incrementAndGet() > maxQueuedEvents) {
            long now = System.nanoTime();
            // Rate-limit the warning to one per OVERFLOW_WARNING_INTERVAL.
            if (now > lastOverflowWarning + OVERFLOW_WARNING_INTERVAL) {
                lastOverflowWarning = now;
                logger.warn("{} debouncer enqueued more than {} events, rejecting new events. This should not happen and is likely a sign that something is wrong.", name, maxQueuedEvents);
            }
            eventCount.decrementAndGet();
            return MoreFutures.VOID_SUCCESS;
        }

        Entry<T> entry = new Entry<T>(event);
        try {
            events.add(entry);
        } catch (RuntimeException e) {
            // Keep the counter consistent with the queue contents.
            eventCount.decrementAndGet();
            throw e;
        }

        if (state == State.RUNNING) {
            int count = eventCount.get();
            int maxPendingEvents = maxPendingEvents();
            if (count < maxPendingEvents) {
                scheduleDelayedDelivery();
            } else if (count == maxPendingEvents) {
                scheduleImmediateDelivery();
            }
            // count > maxPendingEvents: nothing is scheduled here.
        } else if (state == State.STOPPED) {
            // stop() may have run since the check at the top of this method; make sure the
            // future does not stay pending.
            entry.future.set(null);
        }
        return entry.future;
    }

    /**
     * Cancels the pending delayed delivery, then replaces the current immediate attempt (cancelling
     * it) with a new one that is submitted to the executor right away. Does nothing more than the
     * cancellation if the debouncer is not running.
     */
    void scheduleImmediateDelivery() {
        cancelDelayedDelivery();
        while (state == State.RUNNING) {
            DeliveryAttempt previous = immediateDelivery.get();
            if (previous != null) {
                previous.cancel();
            }
            DeliveryAttempt current = new DeliveryAttempt();
            if (immediateDelivery.compareAndSet(previous, current)) {
                current.executeNow();
                return;
            }
            // Lost the race against another thread: retry with the attempt it installed.
        }
    }

    /**
     * Replaces the current delayed attempt (cancelling it) with a new one scheduled to run after
     * {@link #delayMs()} milliseconds. Does nothing if the debouncer is not running.
     */
    private void scheduleDelayedDelivery() {
        while (state == State.RUNNING) {
            DeliveryAttempt previous = cancelDelayedDelivery();
            DeliveryAttempt next = new DeliveryAttempt();
            if (delayedDelivery.compareAndSet(previous, next)) {
                next.scheduleAfterDelay();
                return;
            }
            // Lost the race against another thread: retry with the attempt it installed.
        }
    }

    /** Cancels the current delayed attempt, if any, and returns it (possibly {@code null}). */
    private DeliveryAttempt cancelDelayedDelivery() {
        return cancelDelivery(delayedDelivery.get());
    }

    /** Cancels the current immediate attempt, if any, and returns it (possibly {@code null}). */
    private DeliveryAttempt cancelImmediateDelivery() {
        return cancelDelivery(immediateDelivery.get());
    }

    /** Cancels {@code previous} when it is non-null and returns it unchanged. */
    private DeliveryAttempt cancelDelivery(DeliveryAttempt previous) {
        if (previous != null) {
            previous.cancel();
        }
        return previous;
    }

    /**
     * Dequeues up to {@code maxQueuedEvents} events and passes them to the callback as one batch.
     *
     * <p>The futures of the dequeued events are completed when the future returned by the callback
     * completes. If the callback (or the registration of the completion listener) throws, the
     * futures are failed with that exception, which is then rethrown. If events remain queued
     * after a successful hand-off, a delayed delivery is scheduled for them.
     */
    private void deliverEvents() {
        if (state == State.STOPPED) {
            completeAllPendingFutures();
            return;
        }

        final List<T> toDeliver = new ArrayList<T>();
        final List<SettableFuture<Void>> futures = new ArrayList<SettableFuture<Void>>();

        // Dequeue at most maxQueuedEvents entries so that a single run stays bounded even while
        // producers keep enqueuing.
        Entry<T> entry;
        int count = 0;
        while (++count <= maxQueuedEvents && (entry = events.poll()) != null) {
            toDeliver.add(entry.event);
            futures.add(entry.future);
        }
        eventCount.addAndGet(-toDeliver.size());

        if (toDeliver.isEmpty()) {
            logger.trace("{} debouncer: no events to deliver", name);
        } else {
            logger.trace("{} debouncer: delivering {} events", name, toDeliver.size());
            try {
                ListenableFuture<?> delivered = callback.deliver(toDeliver);
                GuavaCompatibility.INSTANCE.addCallback(delivered, new FutureCallback<Object>() {
                    @Override
                    public void onSuccess(Object result) {
                        completeAll(futures);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        failAll(futures, t);
                    }
                });
            } catch (RuntimeException e) {
                // The events have already left the queue: without this, their futures would
                // never complete. Rethrow so the failure propagates exactly as before.
                failAll(futures, e);
                throw e;
            }
        }

        if (eventCount.get() > 0) {
            scheduleDelayedDelivery();
        }
    }

    /** Completes every future in {@code futures} with {@code null}. */
    private static void completeAll(List<SettableFuture<Void>> futures) {
        for (SettableFuture<Void> future : futures) {
            future.set(null);
        }
    }

    /** Fails every future in {@code futures} with {@code cause}. */
    private static void failAll(List<SettableFuture<Void>> futures, Throwable cause) {
        for (SettableFuture<Void> future : futures) {
            future.setException(cause);
        }
    }

    /** A queued event together with the future handed back to whoever submitted it. */
    static class Entry<T> {
        final T event;
        final SettableFuture<Void> future;

        Entry(T event) {
            this.event = event;
            this.future = SettableFuture.create();
        }
    }

    /** Receiver of the batched events. */
    interface DeliveryCallback<T> {
        /**
         * Handles one batch of events.
         *
         * @param events the events dequeued for this delivery, in queue order.
         * @return a future whose completion (or failure) is propagated to the futures returned by
         *         {@link EventDebouncer#eventReceived(Object)} for these events.
         */
        ListenableFuture<?> deliver(List<T> events);
    }

    /**
     * A single run of {@link EventDebouncer#deliverEvents()}, either submitted for immediate
     * execution or scheduled after the debounce delay.
     */
    class DeliveryAttempt
    extends ExceptionCatchingRunnable {
        /** Handle returned by the executor; {@code null} until the attempt has been submitted. */
        volatile Future<?> deliveryFuture;

        DeliveryAttempt() {
        }

        /** @return {@code true} if this attempt was submitted and its executor future is done. */
        boolean isDone() {
            Future<?> future = deliveryFuture;
            return future != null && future.isDone();
        }

        /** Cancels the executor future, interrupting it if running; no-op if not yet submitted. */
        void cancel() {
            Future<?> future = deliveryFuture;
            if (future != null) {
                future.cancel(true);
            }
        }

        /** Submits this attempt to the executor, unless the debouncer is stopped. */
        void executeNow() {
            if (state != State.STOPPED) {
                deliveryFuture = executor.submit(this);
            }
        }

        /** Schedules this attempt after {@link #delayMs()} ms, unless the debouncer is stopped. */
        void scheduleAfterDelay() {
            if (state != State.STOPPED) {
                deliveryFuture = executor.schedule(this, delayMs(), TimeUnit.MILLISECONDS);
            }
        }

        // NOTE(review): presumably invoked from ExceptionCatchingRunnable.run(), which handles
        // anything thrown here; confirm against that class.
        @Override
        public void runMayThrow() throws Exception {
            deliverEvents();
        }
    }

    /** Lifecycle of the debouncer: {@code NEW} until start(), {@code STOPPED} after stop(). */
    private enum State {
        NEW,
        RUNNING,
        STOPPED
    }
}

