/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueTestBase;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.NamedThreadFactory;
import org.junit.Assert;
import org.junit.Test;

public class ThreadedQueueTest
extends ChronicleQueueTestBase {
    public static final int REQUIRED_COUNT = 10;

    @Test(timeout=10000L)
    public void testMultipleThreads() throws InterruptedException, ExecutionException, TimeoutException {
        File path = this.getTmpDir();
        AtomicInteger counter = new AtomicInteger();
        ExecutorService tailerES = Executors.newSingleThreadExecutor((ThreadFactory)new NamedThreadFactory("tailer"));
        Future<?> tf = tailerES.submit(() -> {
            try (SingleChronicleQueue rqueue = ChronicleQueue.singleBuilder((File)path).testBlockSize().build();){
                ExcerptTailer tailer = rqueue.createTailer();
                Bytes bytes = Bytes.elasticByteBuffer();
                while (counter.get() < 10 && !Thread.interrupted()) {
                    bytes.clear();
                    if (!tailer.readBytes(bytes)) continue;
                    counter.incrementAndGet();
                }
                bytes.releaseLast();
            }
            catch (Throwable t) {
                t.printStackTrace();
            }
        });
        ExecutorService appenderES = Executors.newSingleThreadExecutor((ThreadFactory)new NamedThreadFactory("appender"));
        Future<?> af = appenderES.submit(() -> {
            try (SingleChronicleQueue wqueue = ChronicleQueue.singleBuilder((File)path).testBlockSize().build();){
                ExcerptAppender appender = wqueue.acquireAppender();
                Bytes message = Bytes.elasticByteBuffer();
                for (int i = 0; i < 10; ++i) {
                    message.clear();
                    message.append(i);
                    appender.writeBytes(message);
                }
                message.releaseLast();
            }
            catch (Throwable t) {
                t.printStackTrace();
            }
        });
        appenderES.shutdown();
        tailerES.shutdown();
        long end = System.currentTimeMillis() + 9000L;
        af.get(9000L, TimeUnit.MILLISECONDS);
        tf.get(end - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        Assert.assertEquals((long)10L, (long)counter.get());
    }

    @Test
    public void testTailerReadingEmptyQueue() {
        File path = this.getTmpDir();
        try (SingleChronicleQueue rqueue = SingleChronicleQueueBuilder.fieldlessBinary((File)path).testBlockSize().rollCycle((RollCycle)RollCycles.TEST_DAILY).build();){
            ExcerptTailer tailer = rqueue.createTailer();
            try (SingleChronicleQueue wqueue = SingleChronicleQueueBuilder.fieldlessBinary((File)path).testBlockSize().rollCycle((RollCycle)RollCycles.TEST_DAILY).build();){
                Bytes bytes = Bytes.elasticByteBuffer();
                Assert.assertFalse((boolean)tailer.readBytes(bytes));
                ExcerptAppender appender = wqueue.acquireAppender();
                appender.writeBytes(Bytes.wrapForRead((byte[])"Hello World".getBytes(StandardCharsets.ISO_8859_1)));
                bytes.clear();
                boolean condition = tailer.readBytes(bytes);
                if (!condition) {
                    Jvm.pause((long)1L);
                    condition = tailer.readBytes(bytes);
                }
                Assert.assertTrue((boolean)condition);
                Assert.assertEquals((Object)"Hello World", (Object)bytes.toString());
                bytes.releaseLast();
            }
        }
    }
}

