/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue.impl.single.stress;

import java.io.File;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import net.openhft.chronicle.bytes.PageUtil;
import net.openhft.chronicle.core.time.SetTimeProvider;
import net.openhft.chronicle.core.time.TimeProvider;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.QueueTestCommon;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.queue.rollcycles.TestRollCycles;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.wire.DocumentContext;
import org.jetbrains.annotations.NotNull;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;

public class RollCycleMultiThreadTest
extends QueueTestCommon {
    public static final RollCycle ROLL_CYCLE = TestRollCycles.TEST_DAILY;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRead1() throws ExecutionException, InterruptedException {
        this.finishedNormally = false;
        File path = this.getTmpDir();
        SetTimeProvider timeProvider = new SetTimeProvider();
        ExecutorService scheduledExecutorService = Executors.newSingleThreadExecutor((ThreadFactory)new NamedThreadFactory("testRead1"));
        try (SingleChronicleQueue queue0 = SingleChronicleQueueBuilder.binary((File)path).testBlockSize().rollCycle(ROLL_CYCLE).timeProvider((TimeProvider)timeProvider).build();){
            ParallelQueueObserver observer = new ParallelQueueObserver((ChronicleQueue)queue0);
            try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary((File)path).testBlockSize().rollCycle(ROLL_CYCLE).timeProvider((TimeProvider)timeProvider).build();
                 ExcerptAppender appender = queue.createAppender();){
                Assert.assertEquals((long)-2L, (long)scheduledExecutorService.submit(observer::call).get().intValue());
                timeProvider.advanceMillis(TimeUnit.DAYS.toMillis(2L));
                try (DocumentContext dc = appender.writingDocument();){
                    dc.wire().write((CharSequence)"say").text("Day 3 data");
                }
                Assert.assertEquals((long)1L, (long)scheduledExecutorService.submit(observer::call).get().intValue());
                Assert.assertEquals((long)1L, (long)observer.documentsRead);
            }
        }
        finally {
            scheduledExecutorService.shutdown();
            scheduledExecutorService.awaitTermination(1L, TimeUnit.SECONDS);
        }
        this.finishedNormally = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRead2() throws ExecutionException, InterruptedException {
        this.finishedNormally = false;
        File path = this.getTmpDir();
        Assume.assumeFalse((String)"Ignored on hugetlbfs as byte offsets will be different due to page size", (boolean)PageUtil.isHugePage((String)path.getAbsolutePath()));
        SetTimeProvider timeProvider = new SetTimeProvider();
        ExecutorService es = Executors.newSingleThreadExecutor((ThreadFactory)new NamedThreadFactory("testRead2"));
        try (SingleChronicleQueue queue0 = SingleChronicleQueueBuilder.binary((File)path).testBlockSize().rollCycle(ROLL_CYCLE).timeProvider((TimeProvider)timeProvider).build();){
            ParallelQueueObserver observer = new ParallelQueueObserver((ChronicleQueue)queue0);
            try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary((File)path).blockSize(65536).rollCycle(ROLL_CYCLE).timeProvider((TimeProvider)timeProvider).build();
                 ExcerptAppender appender = queue.createAppender();){
                try (DocumentContext dc = appender.writingDocument();){
                    dc.wire().write((CharSequence)"say").text("Day 1 data");
                }
                Assert.assertEquals((long)1L, (long)es.submit(observer).get().intValue());
                Assert.assertEquals((Object)"--- !!meta-data #binary\nheader: !STStore {\n  wireType: !WireType BINARY_LIGHT,\n  metadata: !SCQMeta {\n    roll: !SCQSRoll { length: 86400000, format: yyyyMMdd'T1', epoch: 0 },\n    sourceId: 0\n  }\n}\n# position: 152, header: 0\n--- !!data #binary\nlisting.highestCycle: 0\n# position: 192, header: 1\n--- !!data #binary\nlisting.lowestCycle: 0\n# position: 232, header: 2\n--- !!data #binary\nlisting.modCount: 5\n# position: 264, header: 3\n--- !!data #binary\nchronicle.write.lock: -9223372036854775808\n# position: 304, header: 4\n--- !!data #binary\nchronicle.append.lock: -9223372036854775808\n# position: 344, header: 5\n--- !!data #binary\nchronicle.lastIndexReplicated: -1\n# position: 392, header: 6\n--- !!data #binary\nchronicle.lastAcknowledgedIndexReplicated: -1\n# position: 448, header: 7\n--- !!data #binary\nchronicle.lastIndexMSynced: -1\n...\n# 130572 bytes remaining\n--- !!meta-data #binary\nheader: !SCQStore {\n  writePosition: [\n    400,\n    1717986918400\n  ],\n  indexing: !SCQSIndexing {\n    indexCount: 8,\n    indexSpacing: 1,\n    index2Index: 200,\n    lastIndex: 1\n  },\n  dataFormat: 1\n}\n# position: 200, header: -1\n--- !!meta-data #binary\nindex2index: [\n  # length: 8, used: 1\n  304,\n  0, 0, 0, 0, 0, 0, 0\n]\n# position: 304, header: -1\n--- !!meta-data #binary\nindex: [\n  # length: 8, used: 1\n  400,\n  0, 0, 0, 0, 0, 0, 0\n]\n# position: 400, header: 0\n--- !!data #binary\nsay: Day 1 data\n...\n# 130648 bytes remaining\n", (Object)queue.dump());
                timeProvider.advanceMillis(TimeUnit.DAYS.toMillis(2L));
                dc = appender.writingDocument();
                var12_18 = null;
                try {
                    dc.wire().write((CharSequence)"say").text("Day 3 data");
                }
                catch (Throwable throwable) {
                    var12_18 = throwable;
                    throw throwable;
                }
                finally {
                    if (dc != null) {
                        if (var12_18 != null) {
                            try {
                                dc.close();
                            }
                            catch (Throwable throwable) {
                                var12_18.addSuppressed(throwable);
                            }
                        } else {
                            dc.close();
                        }
                    }
                }
                Assert.assertEquals((Object)"--- !!meta-data #binary\nheader: !STStore {\n  wireType: !WireType BINARY_LIGHT,\n  metadata: !SCQMeta {\n    roll: !SCQSRoll { length: 86400000, format: yyyyMMdd'T1', epoch: 0 },\n    sourceId: 0\n  }\n}\n# position: 152, header: 0\n--- !!data #binary\nlisting.highestCycle: 2\n# position: 192, header: 1\n--- !!data #binary\nlisting.lowestCycle: 0\n# position: 232, header: 2\n--- !!data #binary\nlisting.modCount: 8\n# position: 264, header: 3\n--- !!data #binary\nchronicle.write.lock: -9223372036854775808\n# position: 304, header: 4\n--- !!data #binary\nchronicle.append.lock: -9223372036854775808\n# position: 344, header: 5\n--- !!data #binary\nchronicle.lastIndexReplicated: -1\n# position: 392, header: 6\n--- !!data #binary\nchronicle.lastAcknowledgedIndexReplicated: -1\n# position: 448, header: 7\n--- !!data #binary\nchronicle.lastIndexMSynced: -1\n...\n# 130572 bytes remaining\n--- !!meta-data #binary\nheader: !SCQStore {\n  writePosition: [\n    400,\n    1717986918400\n  ],\n  indexing: !SCQSIndexing {\n    indexCount: 8,\n    indexSpacing: 1,\n    index2Index: 200,\n    lastIndex: 1\n  },\n  dataFormat: 1\n}\n# position: 200, header: -1\n--- !!meta-data #binary\nindex2index: [\n  # length: 8, used: 1\n  304,\n  0, 0, 0, 0, 0, 0, 0\n]\n# position: 304, header: -1\n--- !!meta-data #binary\nindex: [\n  # length: 8, used: 1\n  400,\n  0, 0, 0, 0, 0, 0, 0\n]\n# position: 400, header: 0\n--- !!data #binary\nsay: Day 1 data\n# position: 420, header: 0 EOF\n--- !!not-ready-meta-data #binary\n...\n# 130648 bytes remaining\n--- !!meta-data #binary\nheader: !SCQStore {\n  writePosition: [\n    400,\n    1717986918400\n  ],\n  indexing: !SCQSIndexing {\n    indexCount: 8,\n    indexSpacing: 1,\n    index2Index: 200,\n    lastIndex: 1\n  },\n  dataFormat: 1\n}\n# position: 200, header: -1\n--- !!meta-data #binary\nindex2index: [\n  # length: 8, used: 1\n  304,\n  0, 0, 0, 0, 0, 0, 0\n]\n# position: 304, header: -1\n--- !!meta-data #binary\nindex: [\n  # length: 8, used: 1\n  400,\n  0, 0, 0, 0, 0, 0, 0\n]\n# position: 400, header: 0\n--- !!data #binary\nsay: Day 3 data\n...\n# 130648 bytes remaining\n", (Object)queue.dump());
                Assert.assertEquals((long)2L, (long)es.submit(observer).get().intValue());
                Assert.assertEquals((long)2L, (long)observer.documentsRead);
            }
        }
        finally {
            es.shutdown();
            es.awaitTermination(1L, TimeUnit.SECONDS);
        }
        this.finishedNormally = true;
    }

    private static class ParallelQueueObserver
    implements Callable<Integer> {
        @NotNull
        private final ExcerptTailer tailer;
        private final StringBuilder sb = new StringBuilder();
        volatile int documentsRead = 0;

        ParallelQueueObserver(@NotNull ChronicleQueue queue) {
            this.tailer = queue.createTailer();
        }

        @Override
        public synchronized Integer call() {
            System.out.println("index=" + Long.toHexString(this.tailer.index()));
            try (DocumentContext dc = this.tailer.readingDocument();){
                System.out.println("... index=" + Long.toHexString(this.tailer.index()));
                if (!dc.isPresent()) {
                    Integer n = -2;
                    return n;
                }
                this.sb.setLength(0);
                dc.wire().read("say").text(this.sb);
                if (this.sb.length() == 0) {
                    Integer n = -1;
                    return n;
                }
            }
            System.out.println("+++ index=" + Long.toHexString(this.tailer.index()));
            return ++this.documentsRead;
        }
    }
}

