/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.neo4j.concurrent.AsyncEvent;
import org.neo4j.concurrent.AsyncEvents;

/**
 * Tests for {@link AsyncEvents}.
 *
 * <p>Each test hands an {@link AsyncEvents} instance to a cached thread pool so that it runs on a
 * background thread, sends {@link Event}s through it and inspects, via {@link EventConsumer},
 * which thread ended up consuming each event.
 */
public class AsyncEventsTest {
    /** Upper bound, in seconds, for waiting on a single event to show up at the consumer. */
    private static final long POLL_TIMEOUT_SECONDS = 10L;

    /** Per-test timeout, in milliseconds, for the tests that loop until a condition holds. */
    private static final long TEST_TIMEOUT_MILLIS = 10000L;

    private ExecutorService executor;

    @Before
    public void setUp() {
        executor = Executors.newCachedThreadPool();
    }

    @After
    public void tearDown() {
        executor.shutdown();
    }

    /**
     * Sends two events, one after the other, and verifies that the consumer receives exactly those
     * two instances in the order they were sent.
     */
    @Test
    public void eventsMustBeProcessedByBackgroundThread() throws Exception {
        EventConsumer consumer = new EventConsumer();
        AsyncEvents<Event> asyncEvents = startAsyncEvents(consumer);
        try {
            Event firstSentEvent = new Event();
            asyncEvents.send(firstSentEvent);
            Event firstProcessedEvent = consumer.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);

            Event secondSentEvent = new Event();
            asyncEvents.send(secondSentEvent);
            Event secondProcessedEvent = consumer.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);

            Assert.assertThat(firstProcessedEvent, Matchers.is(firstSentEvent));
            Assert.assertThat(secondProcessedEvent, Matchers.is(secondSentEvent));
        } finally {
            // Always stop the background task, also when an assertion above fails.
            asyncEvents.shutdown();
        }
    }

    /**
     * Verifies that an event sent before shutdown is consumed by a thread other than the sender.
     */
    @Test
    public void mustNotProcessEventInSameThreadWhenNotShutDown() throws Exception {
        EventConsumer consumer = new EventConsumer();
        AsyncEvents<Event> asyncEvents = startAsyncEvents(consumer);
        try {
            asyncEvents.send(new Event());
            Thread processingThread = awaitProcessedEvent(consumer).processedBy;

            Assert.assertThat(processingThread, Matchers.is(Matchers.not(Thread.currentThread())));
        } finally {
            asyncEvents.shutdown();
        }
    }

    /**
     * Verifies that, once {@code shutdown()} has been called, events sent from the test thread
     * eventually get consumed by the test thread itself.
     */
    @Test(timeout = TEST_TIMEOUT_MILLIS)
    public void mustProcessEventsDirectlyWhenShutDown() throws Exception {
        EventConsumer consumer = new EventConsumer();
        AsyncEvents<Event> asyncEvents = startAsyncEvents(consumer);

        Thread threadForFirstEvent;
        try {
            asyncEvents.send(new Event());
            threadForFirstEvent = awaitProcessedEvent(consumer).processedBy;
        } finally {
            // Shutdown is part of the scenario; doing it in finally also cleans up if the wait fails.
            asyncEvents.shutdown();
        }
        Thread testThread = Thread.currentThread();
        Assert.assertThat(threadForFirstEvent, Matchers.is(Matchers.not(testThread)));

        // Keep sending until an event is observed to have been consumed by the sending thread.
        // The loop is bounded by the test timeout.
        do {
            asyncEvents.send(new Event());
        } while (awaitProcessedEvent(consumer).processedBy != testThread);
    }

    /**
     * Publishes events from several threads at once and verifies that every one of them reaches
     * the consumer and that none of them was consumed by the test thread.
     */
    @Test(timeout = TEST_TIMEOUT_MILLIS)
    public void concurrentlyPublishedEventsMustAllBeProcessed() throws Exception {
        EventConsumer consumer = new EventConsumer();
        CountDownLatch startLatch = new CountDownLatch(1);
        int threads = 10;
        int iterations = 2000;
        int eventCount = threads * iterations;
        AsyncEvents<Event> asyncEvents = startAsyncEvents(consumer);
        ExecutorService publishers = Executors.newFixedThreadPool(threads);
        try {
            Runnable publisher = () -> {
                try {
                    // Hold all publishers back so that they start sending at the same time.
                    startLatch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                for (int i = 0; i < iterations; i++) {
                    asyncEvents.send(new Event());
                }
            };
            for (int i = 0; i < threads; i++) {
                publishers.submit(publisher);
            }
            startLatch.countDown();

            Thread thisThread = Thread.currentThread();
            int processed = 0;
            while (processed < eventCount) {
                Event event = consumer.poll(1L, TimeUnit.SECONDS);
                if (event == null) {
                    // Nothing arrived within this poll; retry. The test timeout bounds the wait.
                    continue;
                }
                Assert.assertThat(event.processedBy, Matchers.is(Matchers.not(thisThread)));
                processed++;
            }
        } finally {
            // Release the publisher threads as well; they would otherwise outlive the test.
            publishers.shutdownNow();
            asyncEvents.shutdown();
        }
    }

    /**
     * Verifies that a thread blocked in {@code awaitTermination()} is not released until
     * {@code shutdown()} has been called, by checking that the marker event it enqueues on return
     * only appears after the shutdown and after the five previously sent events.
     */
    @Test
    public void awaitingShutdownMustBlockUntilAllMessagesHaveBeenProcessed() throws Exception {
        Event specialShutdownObservedEvent = new Event();
        CountDownLatch awaitStartLatch = new CountDownLatch(1);
        EventConsumer consumer = new EventConsumer();
        AsyncEvents<Event> asyncEvents = startAsyncEvents(consumer);

        Future<?> awaitShutdownFuture;
        try {
            // Send until an event has been consumed by some thread other than this one.
            do {
                asyncEvents.send(new Event());
            } while (awaitProcessedEvent(consumer).processedBy == Thread.currentThread());

            awaitShutdownFuture = executor.submit(() -> {
                awaitStartLatch.countDown();
                try {
                    asyncEvents.awaitTermination();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                // Marker that lets the test observe when awaitTermination() returned.
                consumer.eventsProcessed.offer(specialShutdownObservedEvent);
            });
            Assert.assertTrue(
                    "Timed out waiting for the awaitTermination task to start",
                    awaitStartLatch.await(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS));

            int eventsToSend = 5;
            for (int i = 0; i < eventsToSend; i++) {
                asyncEvents.send(new Event());
            }
            for (int i = 0; i < eventsToSend; i++) {
                awaitProcessedEvent(consumer);
            }

            // Before shutdown the marker event must not have been enqueued.
            Assert.assertThat(
                    consumer.eventsProcessed.poll(20L, TimeUnit.MILLISECONDS),
                    Matchers.is(Matchers.nullValue()));
        } finally {
            asyncEvents.shutdown();
        }

        // Bounded wait, so a regression fails the test instead of hanging it.
        awaitShutdownFuture.get(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Assert.assertThat(
                consumer.eventsProcessed.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS),
                Matchers.sameInstance(specialShutdownObservedEvent));
    }

    /**
     * Creates an {@link AsyncEvents} that feeds the given consumer and submits it to the test's
     * executor.
     *
     * @param consumer receiver of the events
     * @return the submitted instance; the caller is responsible for shutting it down
     */
    private AsyncEvents<Event> startAsyncEvents(EventConsumer consumer) {
        AsyncEvents<Event> asyncEvents = new AsyncEvents<>(consumer, AsyncEvents.Monitor.NONE);
        // The explicit cast pins the Runnable overload of submit.
        executor.submit((Runnable) asyncEvents);
        return asyncEvents;
    }

    /**
     * Waits for the next consumed event and fails the test with a descriptive message if none
     * arrives in time, rather than handing back {@code null}.
     *
     * @param consumer the consumer to poll
     * @return the next consumed event, never {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    private Event awaitProcessedEvent(EventConsumer consumer) throws InterruptedException {
        Event event = consumer.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Assert.assertNotNull(
                "Timed out after " + POLL_TIMEOUT_SECONDS + "s waiting for an event to be processed",
                event);
        return event;
    }

    /**
     * Consumer that stamps each event with the thread that consumed it and then makes the event
     * available to the test through a blocking queue.
     */
    class EventConsumer
    implements Consumer<Event> {
        final BlockingQueue<Event> eventsProcessed = new LinkedBlockingQueue<Event>();

        @Override
        public void accept(Event event) {
            // Stamp before enqueueing, so the stamp is set by the time the event can be dequeued.
            event.processedBy = Thread.currentThread();
            eventsProcessed.offer(event);
        }

        /**
         * Waits up to the given timeout for the next consumed event.
         *
         * @return the next consumed event, or {@code null} if the timeout elapsed first
         * @throws InterruptedException if interrupted while waiting
         */
        public Event poll(long timeout, TimeUnit unit) throws InterruptedException {
            return eventsProcessed.poll(timeout, unit);
        }
    }

    /** Test event that records which thread consumed it. */
    class Event
    extends AsyncEvent {
        /** Set by {@link EventConsumer#accept(Event)}; {@code null} until the event is consumed. */
        Thread processedBy;
    }
}

