/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue.impl.single;

import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import net.openhft.chronicle.queue.DirectoryUtils;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.ValueOut;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

public final class DocumentOrderingTest {
    private static final RollCycles ROLL_CYCLE = RollCycles.TEST_SECONDLY;
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private final AtomicLong clock = new AtomicLong(System.currentTimeMillis());
    private final AtomicInteger counter = new AtomicInteger(0);

    private static void expectValue(int expectedValue, ExcerptTailer tailer) {
        try (DocumentContext documentContext = tailer.readingDocument();){
            Assert.assertTrue((boolean)documentContext.isPresent());
            Assert.assertEquals((long)expectedValue, (long)documentContext.wire().getValueIn().int32());
        }
    }

    @Test
    public void queuedWriteInPreviousCycleShouldRespectTotalOrdering() throws Exception {
        try (SingleChronicleQueue queue = this.builder(DirectoryUtils.tempDir("document-ordering"), 1000L).build();){
            ExcerptAppender excerptAppender = queue.acquireAppender();
            excerptAppender.writeDocument((Object)"foo", ValueOut::text);
            DocumentContext firstOpenDocument = excerptAppender.writingDocument();
            firstOpenDocument.wire().getValueOut().int32(this.counter.getAndIncrement());
            Future<RecordInfo> secondDocumentInFirstCycle = this.attemptToWriteDocument(queue);
            this.clock.addAndGet(TimeUnit.SECONDS.toMillis(2L));
            Future<RecordInfo> otherDocumentWriter = this.attemptToWriteDocument(queue);
            firstOpenDocument.close();
            secondDocumentInFirstCycle.get(5L, TimeUnit.SECONDS);
            ExcerptTailer tailer = queue.createTailer();
            tailer.readingDocument().close();
            DocumentOrderingTest.expectValue(0, tailer);
            DocumentOrderingTest.expectValue(1, tailer);
            DocumentOrderingTest.expectValue(2, tailer);
            Assert.assertThat((Object)tailer.readingDocument().isPresent(), (Matcher)CoreMatchers.is((Object)false));
        }
    }

    @Test
    @Ignore(value="write lock prevents writing")
    public void shouldRecoverFromUnfinishedFirstMessageInPreviousQueue() throws Exception {
        try (SingleChronicleQueue queue = this.builder(DirectoryUtils.tempDir("document-ordering"), 1000L).build();){
            ExcerptAppender excerptAppender = queue.acquireAppender();
            DocumentContext documentContext = excerptAppender.writingDocument();
            documentContext.wire().getValueOut().int32(this.counter.getAndIncrement());
            this.clock.addAndGet(TimeUnit.SECONDS.toMillis(2L));
            Future<RecordInfo> otherDocumentWriter = this.attemptToWriteDocument(queue);
            Assert.assertEquals((long)1L, (long)otherDocumentWriter.get(5L, TimeUnit.SECONDS).counterValue);
            ExcerptTailer tailer = queue.createTailer();
            DocumentOrderingTest.expectValue(1, tailer);
            Assert.assertThat((Object)tailer.readingDocument().isPresent(), (Matcher)CoreMatchers.is((Object)false));
        }
    }

    @Test
    public void multipleThreadsMustWaitUntilPreviousCycleFileIsCompleted() throws Exception {
        File dir = DirectoryUtils.tempDir("document-ordering");
        try (SingleChronicleQueue queue = this.builder(dir, 5000L).build();
             SingleChronicleQueue queue2 = this.builder(dir, 5000L).build();
             SingleChronicleQueue queue3 = this.builder(dir, 5000L).build();
             SingleChronicleQueue queue4 = this.builder(dir, 5000L).build();){
            Future<RecordInfo> thirdWriter;
            Future<RecordInfo> secondWriter;
            Future<RecordInfo> firstWriter;
            ExcerptAppender excerptAppender = queue.acquireAppender();
            try (DocumentContext documentContext = excerptAppender.writingDocument();){
                this.clock.addAndGet(TimeUnit.SECONDS.toMillis(2L));
                firstWriter = this.attemptToWriteDocument(queue2);
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
                secondWriter = this.attemptToWriteDocument(queue3);
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
                thirdWriter = this.attemptToWriteDocument(queue4);
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2L));
                documentContext.wire().getValueOut().int32(this.counter.getAndIncrement());
            }
            firstWriter.get(5L, TimeUnit.SECONDS);
            secondWriter.get(5L, TimeUnit.SECONDS);
            thirdWriter.get(5L, TimeUnit.SECONDS);
            ExcerptTailer tailer = queue.createTailer();
            DocumentOrderingTest.expectValue(0, tailer);
            DocumentOrderingTest.expectValue(1, tailer);
            DocumentOrderingTest.expectValue(2, tailer);
            DocumentOrderingTest.expectValue(3, tailer);
        }
    }

    @Test
    public void codeWithinPriorDocumentMustExecuteBeforeSubsequentDocumentWhenQueueIsEmpty() throws Exception {
        try (SingleChronicleQueue queue = this.builder(DirectoryUtils.tempDir("document-ordering"), 3000L).build();){
            Future<RecordInfo> otherDocumentWriter;
            ExcerptAppender excerptAppender = queue.acquireAppender();
            try (DocumentContext documentContext = excerptAppender.writingDocument();){
                this.clock.addAndGet(TimeUnit.SECONDS.toMillis(2L));
                otherDocumentWriter = this.attemptToWriteDocument(queue);
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2L));
                documentContext.wire().getValueOut().int32(this.counter.getAndIncrement());
            }
            Assert.assertEquals((long)1L, (long)otherDocumentWriter.get(5L, TimeUnit.SECONDS).counterValue);
            ExcerptTailer tailer = queue.createTailer();
            DocumentOrderingTest.expectValue(0, tailer);
            DocumentOrderingTest.expectValue(1, tailer);
        }
    }

    @Test
    public void codeWithinPriorDocumentMustExecuteBeforeSubsequentDocumentWhenQueueIsNotEmpty() throws Exception {
        try (SingleChronicleQueue queue = this.builder(DirectoryUtils.tempDir("document-ordering"), 3000L).build();){
            Future<RecordInfo> otherDocumentWriter;
            ExcerptAppender excerptAppender = queue.acquireAppender();
            excerptAppender.writeDocument((Object)"foo", ValueOut::text);
            try (DocumentContext documentContext = excerptAppender.writingDocument();){
                this.clock.addAndGet(TimeUnit.SECONDS.toMillis(2L));
                otherDocumentWriter = this.attemptToWriteDocument(queue);
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2L));
                documentContext.wire().getValueOut().int32(this.counter.getAndIncrement());
            }
            Assert.assertEquals((long)1L, (long)otherDocumentWriter.get(5L, TimeUnit.SECONDS).counterValue);
            ExcerptTailer tailer = queue.createTailer();
            DocumentContext documentContext = tailer.readingDocument();
            Assert.assertTrue((boolean)documentContext.isPresent());
            documentContext.close();
            DocumentOrderingTest.expectValue(0, tailer);
            DocumentOrderingTest.expectValue(1, tailer);
        }
    }

    @After
    public void tearDown() {
        this.executorService.shutdownNow();
    }

    private Future<RecordInfo> attemptToWriteDocument(SingleChronicleQueue queue) throws InterruptedException {
        CountDownLatch startedLatch = new CountDownLatch(1);
        Future<RecordInfo> future = this.executorService.submit(() -> {
            int counterValue;
            startedLatch.countDown();
            try (DocumentContext documentContext = queue.acquireAppender().writingDocument();){
                counterValue = this.counter.getAndIncrement();
                documentContext.wire().getValueOut().int32(counterValue);
            }
            return new RecordInfo(counterValue);
        });
        Assert.assertTrue((String)"Task did not start", (boolean)startedLatch.await(1L, TimeUnit.MINUTES));
        return future;
    }

    private SingleChronicleQueueBuilder builder(File dir, long timeoutMS) {
        return SingleChronicleQueueBuilder.binary((File)dir).testBlockSize().rollCycle((RollCycle)ROLL_CYCLE).timeProvider(this.clock::get).timeoutMS(timeoutMS);
    }

    private static final class RecordInfo {
        private final int counterValue;

        RecordInfo(int counterValue) {
            this.counterValue = counterValue;
        }
    }
}

