/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue.internal.reader;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.queue.ChronicleQueueTestBase;
import net.openhft.chronicle.queue.DirectoryUtils;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.GcControls;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.queue.internal.reader.Say;
import net.openhft.chronicle.queue.reader.ChronicleReader;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.VanillaMethodWriterBuilder;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

public class ChronicleReaderTest
extends ChronicleQueueTestBase {
    private static final byte[] ONE_KILOBYTE = new byte[1024];
    private final Queue<String> capturedOutput = new ConcurrentLinkedQueue<String>();
    private Path dataDir;

    private static long getCurrentQueueFileLength(Path dataDir) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(Files.list(dataDir).filter(p -> p.toString().endsWith("cq4")).findFirst().orElseThrow(AssertionError::new).toFile(), "r");){
            long l = file.length();
            return l;
        }
    }

    @Before
    public void before() {
        this.dataDir = this.getTmpDir().toPath();
        try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary((Path)this.dataDir).sourceId(1).testBlockSize().build();){
            ExcerptAppender excerptAppender = queue.acquireAppender();
            VanillaMethodWriterBuilder methodWriterBuilder = excerptAppender.methodWriterBuilder(Say.class);
            methodWriterBuilder.recordHistory(true);
            Say events = (Say)methodWriterBuilder.build();
            for (int i = 0; i < 24; ++i) {
                events.say(i % 2 == 0 ? "hello" : "goodbye");
            }
        }
    }

    @Test(timeout=10000L)
    public void shouldReadQueueWithNonDefaultRollCycle() {
        if (OS.isWindows()) {
            return;
        }
        this.expectException("Overriding roll length from existing metadata");
        this.expectException("Overriding roll cycle from");
        Path path = this.getTmpDir().toPath();
        path.toFile().mkdirs();
        try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary((Path)path).rollCycle((RollCycle)RollCycles.MINUTELY).testBlockSize().sourceId(1).build();){
            ExcerptAppender excerptAppender = queue.acquireAppender();
            VanillaMethodWriterBuilder methodWriterBuilder = excerptAppender.methodWriterBuilder(Say.class);
            methodWriterBuilder.recordHistory(true);
            Say events = (Say)methodWriterBuilder.build();
            for (int i = 0; i < 24; ++i) {
                events.say(i % 2 == 0 ? "hello" : "goodbye");
            }
        }
        new ChronicleReader().withBasePath(path).withMessageSink(this.capturedOutput::add).execute();
        Assert.assertFalse((boolean)this.capturedOutput.isEmpty());
    }

    @Test(timeout=10000L)
    public void shouldReadQueueWithNonDefaultRollCycleWhenMetadataDeleted() throws IOException {
        Assume.assumeFalse((String)"Read-only mode is not supported on Windows", (boolean)OS.isWindows());
        this.expectException("Failback to readonly tablestore");
        Path path = this.getTmpDir().toPath();
        path.toFile().mkdirs();
        try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary((Path)path).rollCycle((RollCycle)RollCycles.MINUTELY).testBlockSize().sourceId(1).build();){
            ExcerptAppender excerptAppender = queue.acquireAppender();
            VanillaMethodWriterBuilder methodWriterBuilder = excerptAppender.methodWriterBuilder(Say.class);
            methodWriterBuilder.recordHistory(true);
            Say events = (Say)methodWriterBuilder.build();
            for (int i = 0; i < 24; ++i) {
                events.say(i % 2 == 0 ? "hello" : "goodbye");
            }
        }
        Files.list(path).filter(f -> f.getFileName().toString().endsWith(".cq4t")).findFirst().ifPresent(p -> p.toFile().delete());
        GcControls.waitForGcCycle();
        new ChronicleReader().withBasePath(path).withMessageSink(this.capturedOutput::add).execute();
        Assert.assertFalse((boolean)this.capturedOutput.isEmpty());
    }

    @Test
    public void shouldNotFailOnEmptyQueue() {
        Path path = this.getTmpDir().toPath();
        path.toFile().mkdirs();
        if (OS.isWindows()) {
            this.expectException("Read-only mode is not supported on Windows");
        } else {
            this.expectException("Failback to readonly tablestore");
        }
        new ChronicleReader().withBasePath(path).withMessageSink(this.capturedOutput::add).execute();
        Assert.assertTrue((boolean)this.capturedOutput.isEmpty());
    }

    @Test
    public void shouldNotFailWhenNoMetadata() throws IOException {
        this.expectException("Failback to readonly tablestore");
        Files.list(this.dataDir).filter(f -> f.getFileName().toString().endsWith(".cq4t")).findFirst().ifPresent(path -> path.toFile().delete());
        this.basicReader().execute();
        Assert.assertTrue((boolean)this.capturedOutput.stream().anyMatch(msg -> msg.contains("history:")));
    }

    @Test
    public void shouldIncludeMessageHistoryByDefault() {
        this.basicReader().execute();
        Assert.assertTrue((boolean)this.capturedOutput.stream().anyMatch(msg -> msg.contains("history:")));
    }

    @Test
    public void shouldApplyIncludeRegexToHistoryMessagesAndBusinessMessages() {
        this.basicReader().withInclusionRegex("goodbye").asMethodReader(null).execute();
        Assert.assertFalse((boolean)this.capturedOutput.stream().anyMatch(msg -> msg.contains("history:")));
    }

    @Test(timeout=5000L)
    public void readOnlyQueueTailerShouldObserveChangesAfterInitiallyObservedReadLimit() throws IOException, InterruptedException, TimeoutException, ExecutionException {
        DirectoryUtils.deleteDir(this.dataDir.toFile());
        this.dataDir.toFile().mkdirs();
        try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary((Path)this.dataDir).testBlockSize().build();){
            Say events = (Say)queue.acquireAppender().methodWriterBuilder(Say.class).build();
            events.say("hello");
            long readerCapacity = ChronicleReaderTest.getCurrentQueueFileLength(this.dataDir);
            RecordCounter recordCounter = new RecordCounter();
            ChronicleReader chronicleReader = this.basicReader().withMessageSink((Consumer)recordCounter);
            ExecutorService executorService = Executors.newSingleThreadExecutor((ThreadFactory)new NamedThreadFactory("executor"));
            Future<?> submit = executorService.submit(() -> ((ChronicleReader)chronicleReader).execute());
            long expectedReadingDocumentCount = readerCapacity / (long)ONE_KILOBYTE.length + 1L;
            int i = 0;
            while ((long)i < expectedReadingDocumentCount) {
                events.say(new String(ONE_KILOBYTE));
                ++i;
            }
            recordCounter.latch.countDown();
            executorService.shutdown();
            executorService.awaitTermination(Jvm.isDebug() ? 50L : 5L, TimeUnit.SECONDS);
            submit.get(1L, TimeUnit.SECONDS);
            if (!OS.isWindows()) {
                Assert.assertEquals((long)expectedReadingDocumentCount, (long)(recordCounter.recordCount.get() - 1L));
            }
        }
    }

    @Test
    public void shouldBeAbleToReadFromReadOnlyFile() throws IOException {
        Assume.assumeFalse((String)"#460 read-only not supported on Windows", (boolean)OS.isWindows());
        Path queueFile = Files.list(this.dataDir).filter(f -> f.getFileName().toString().endsWith(".cq4")).findFirst().orElseThrow(() -> new AssertionError((Object)("Could not find queue file in directory " + this.dataDir)));
        Assert.assertTrue((boolean)queueFile.toFile().setWritable(false));
        this.basicReader().execute();
    }

    @Test
    public void shouldConvertEntriesToText() {
        this.basicReader().execute();
        Assert.assertEquals((long)48L, (long)this.capturedOutput.size());
        Assert.assertTrue((boolean)this.capturedOutput.stream().anyMatch(msg -> msg.contains("hello")));
    }

    @Test
    public void shouldFilterByInclusionRegex() {
        this.basicReader().withInclusionRegex(".*good.*").execute();
        Assert.assertEquals((long)24L, (long)this.capturedOutput.size());
        this.capturedOutput.stream().filter(msg -> !msg.startsWith("0x")).forEach(msg -> Assert.assertThat((Object)msg, (Matcher)CoreMatchers.containsString((String)"goodbye")));
    }

    @Test
    public void shouldFilterByMultipleInclusionRegex() {
        this.basicReader().withInclusionRegex(".*bye$").withInclusionRegex(".*o.*").execute();
        Assert.assertEquals((long)24L, (long)this.capturedOutput.size());
        this.capturedOutput.stream().filter(msg -> !msg.startsWith("0x")).forEach(msg -> Assert.assertThat((Object)msg, (Matcher)CoreMatchers.containsString((String)"goodbye")));
        this.capturedOutput.stream().filter(msg -> !msg.startsWith("0x")).forEach(msg -> Assert.assertThat((Object)msg, (Matcher)CoreMatchers.not((Matcher)CoreMatchers.containsString((String)"hello"))));
    }

    @Test(expected=IllegalArgumentException.class)
    public void shouldThrowExceptionIfInputDirectoryDoesNotExist() {
        this.basicReader().withBasePath(Paths.get("/does/not/exist", new String[0])).execute();
    }

    @Test
    public void shouldFilterByExclusionRegex() {
        this.basicReader().withExclusionRegex(".*good.*").execute();
        Assert.assertEquals((long)24L, (long)this.capturedOutput.size());
        this.capturedOutput.forEach(msg -> Assert.assertThat((Object)msg, (Matcher)CoreMatchers.not((Matcher)CoreMatchers.containsString((String)"goodbye"))));
    }

    @Test
    public void shouldFilterByMultipleExclusionRegex() {
        this.basicReader().withExclusionRegex(".*bye$").withExclusionRegex(".*ell.*").execute();
        Assert.assertEquals((long)0L, (long)this.capturedOutput.stream().filter(msg -> !msg.startsWith("0x")).count());
    }

    @Test
    public void shouldReturnNoMoreThanTheSpecifiedNumberOfMaxRecords() {
        this.basicReader().historyRecords(5L).execute();
        Assert.assertThat((Object)this.capturedOutput.stream().filter(msg -> !msg.startsWith("0x")).count(), (Matcher)CoreMatchers.is((Object)5L));
    }

    @Test
    public void shouldForwardToSpecifiedIndex() {
        long knownIndex = Long.decode(this.findAnExistingIndex());
        this.basicReader().withStartIndex(knownIndex).execute();
        Assert.assertEquals((long)24L, (long)this.capturedOutput.size());
        Assert.assertTrue((boolean)this.capturedOutput.poll().contains(Long.toHexString(knownIndex)));
    }

    @Test(expected=IllegalArgumentException.class)
    public void shouldFailIfSpecifiedIndexIsBeforeFirstIndex() {
        this.basicReader().withStartIndex(1L).execute();
    }

    @Test
    public void shouldNotRewindPastStartOfQueueWhenDisplayingHistory() {
        this.basicReader().historyRecords(Long.MAX_VALUE).execute();
        Assert.assertThat((Object)this.capturedOutput.stream().filter(msg -> !msg.startsWith("0x")).count(), (Matcher)CoreMatchers.is((Object)24L));
    }

    @Test
    public void shouldContinueToPollQueueWhenTailModeIsEnabled() {
        int expectedPollCountWhenDocumentIsEmpty = 3;
        FiniteDocumentPollMethod pollMethod = new FiniteDocumentPollMethod(3);
        try {
            this.basicReader().withDocumentPollMethod((Function)pollMethod).tail().execute();
        }
        catch (ArithmeticException arithmeticException) {
            // empty catch block
        }
        Assert.assertEquals((long)3L, (long)pollMethod.invocationCount);
    }

    private String findAnExistingIndex() {
        this.basicReader().execute();
        List indicies = this.capturedOutput.stream().filter(s -> s.startsWith("0x")).collect(Collectors.toList());
        this.capturedOutput.clear();
        return ((String)indicies.get(indicies.size() / 2)).trim().replaceAll(":", "");
    }

    private ChronicleReader basicReader() {
        if (OS.isWindows()) {
            this.expectException("Read-only mode is not supported on Windows");
        }
        return new ChronicleReader().withBasePath(this.dataDir).withMessageSink(this.capturedOutput::add);
    }

    @After
    public void clearInterrupt() {
        Thread.interrupted();
    }

    static {
        Arrays.fill(ONE_KILOBYTE, (byte)7);
    }

    private static final class FiniteDocumentPollMethod
    implements Function<ExcerptTailer, DocumentContext> {
        private final int maxPollsReturningEmptyDocument;
        private int invocationCount;

        private FiniteDocumentPollMethod(int maxPollsReturningEmptyDocument) {
            this.maxPollsReturningEmptyDocument = maxPollsReturningEmptyDocument;
        }

        @Override
        public DocumentContext apply(ExcerptTailer excerptTailer) {
            DocumentContext documentContext = excerptTailer.readingDocument();
            if (!documentContext.isPresent()) {
                ++this.invocationCount;
                if (this.invocationCount >= this.maxPollsReturningEmptyDocument) {
                    throw new ArithmeticException("For testing purposes");
                }
            }
            return documentContext;
        }
    }

    private static final class RecordCounter
    implements Consumer<String> {
        private final AtomicLong recordCount = new AtomicLong();
        private final CountDownLatch latch = new CountDownLatch(1);

        private RecordCounter() {
        }

        @Override
        public void accept(String msg) {
            try {
                this.latch.await();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            if (!msg.startsWith("0x")) {
                this.recordCount.incrementAndGet();
            }
        }
    }
}

