/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue;

import java.io.File;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import net.openhft.chronicle.queue.ChronicleQueueTestBase;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.wire.DocumentContext;
import org.junit.Assert;
import org.junit.Test;

public class CycleNotFoundTest
extends ChronicleQueueTestBase {
    private static final int NUMBER_OF_TAILERS = 10;
    private static final long INTERVAL_US = 25L;
    private static final long NUMBER_OF_MSG = 100000L;

    @Test(timeout=50000L)
    public void tailerCycleNotFoundTest() throws InterruptedException, ExecutionException {
        File path = this.getTmpDir();
        ExecutorService executorService = Executors.newFixedThreadPool(100000, (ThreadFactory)new NamedThreadFactory("tailerCycleNotFoundTest"));
        AtomicLong counter = new AtomicLong();
        Runnable reader = () -> {
            long count = 0L;
            try (SingleChronicleQueue rqueue = SingleChronicleQueueBuilder.fieldlessBinary((File)path).testBlockSize().rollCycle((RollCycle)RollCycles.TEST_SECONDLY).build();){
                Throwable throwable;
                DocumentContext dc;
                ExcerptTailer tailer = rqueue.createTailer();
                long last = -1L;
                while (count < 100000L) {
                    dc = tailer.readingDocument();
                    throwable = null;
                    try {
                        if (!dc.isPresent()) continue;
                        Assert.assertTrue((boolean)dc.isData());
                        long l = last + 1L;
                        last = dc.wire().read().int64();
                        Assert.assertEquals((long)l, (long)last);
                        ++count;
                        counter.incrementAndGet();
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                    finally {
                        if (dc == null) continue;
                        if (throwable != null) {
                            try {
                                dc.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                            continue;
                        }
                        dc.close();
                        continue;
                    }
                    if (!executorService.isShutdown()) continue;
                    Assert.fail();
                }
                dc = tailer.readingDocument();
                throwable = null;
                try {
                    Assert.assertFalse((boolean)dc.isPresent());
                }
                catch (Throwable throwable4) {
                    throwable = throwable4;
                    throw throwable4;
                }
                finally {
                    if (dc != null) {
                        if (throwable != null) {
                            try {
                                dc.close();
                            }
                            catch (Throwable throwable5) {
                                throwable.addSuppressed(throwable5);
                            }
                        } else {
                            dc.close();
                        }
                    }
                }
            }
            catch (Throwable throwable) {
                System.out.printf("Read %,d messages, thread=" + Thread.currentThread().getName() + "\n", count);
                throw throwable;
            }
            System.out.printf("Read %,d messages, thread=" + Thread.currentThread().getName() + "\n", count);
        };
        ArrayList tailers = new ArrayList();
        for (int i = 0; i < 10; ++i) {
            tailers.add(executorService.submit(reader));
        }
        ExecutorService executorService1 = Executors.newSingleThreadExecutor((ThreadFactory)new NamedThreadFactory("appender"));
        Future<?> submit = executorService1.submit(() -> {
            SingleChronicleQueue wqueue = SingleChronicleQueueBuilder.fieldlessBinary((File)path).testBlockSize().rollCycle((RollCycle)RollCycles.TEST_SECONDLY).build();
            ExcerptAppender appender = wqueue.acquireAppender();
            long next = System.nanoTime() + 25000L;
            int i = 0;
            while ((long)i < 100000L) {
                while (System.nanoTime() < next) {
                }
                try (DocumentContext dc = appender.writingDocument();){
                    dc.wire().write().int64((long)i);
                }
                next += 25000L;
                if (executorService1.isShutdown()) {
                    return;
                }
                ++i;
            }
            wqueue.close();
        });
        submit.get();
        for (Future future : tailers) {
            future.get();
        }
        executorService.shutdownNow();
        executorService1.shutdownNow();
        executorService.awaitTermination(5L, TimeUnit.SECONDS);
        executorService1.awaitTermination(5L, TimeUnit.SECONDS);
        Assert.assertEquals((long)1000000L, (long)counter.get());
    }
}

