/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue.impl.single;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import net.openhft.chronicle.queue.DirectoryUtils;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.ValueOut;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Long-running stress test in which several writer and reader tasks share one queue directory.
 *
 * <p>Every task opens its own {@link SingleChronicleQueue} on the same path, configured with
 * {@code testBlockSize()} and the {@code RollCycles.TEST_SECONDLY} roll cycle. Writers append
 * consecutive integers taken from a shared counter; each reader asserts that it observes the
 * integers in strictly increasing order without gaps.
 */
public class RollCycleMultiThreadStressTest {
    private static final Logger LOG = LoggerFactory.getLogger(RollCycleMultiThreadStressTest.class);

    /** Pacing interval between writes of a single writer, in nanoseconds ({@code -DwriteLatency}). */
    private static final long SLEEP_PER_WRITE_NANOS = Long.getLong("writeLatency", 10000L);

    /**
     * Starts the writers and readers, waits (with timeouts) until the expected number of messages
     * has been written and read, and finally logs anything the tasks reported.
     *
     * @throws Exception if setting up the temporary directory or the executor fails
     */
    @Ignore("long running")
    @Test
    public void stress() throws Exception {
        File path = DirectoryUtils.tempDir("rollCycleStress");
        LOG.warn("using path {}", path);

        int numThreads = Runtime.getRuntime().availableProcessors();
        int numWriters = numThreads / 4 + 1;
        int numReaders = numThreads - numWriters;
        AtomicInteger wrote = new AtomicInteger();
        // Number of writes that fit into 90 seconds at the configured pacing interval.
        int expectedNumberOfMessages =
                (int) (TimeUnit.SECONDS.toNanos(90L) / (double) SLEEP_PER_WRITE_NANOS);

        System.out.printf("Running test with %d writers and %d readers%n", numWriters, numReaders);
        System.out.printf("Writing %d messages with %dns interval%n", expectedNumberOfMessages, SLEEP_PER_WRITE_NANOS);
        System.out.printf("Should take ~%dsec%n", TimeUnit.NANOSECONDS.toSeconds((long) expectedNumberOfMessages * SLEEP_PER_WRITE_NANOS) / (long) numWriters);

        List<Future<Throwable>> results = new ArrayList<>();
        List<Reader> readers = new ArrayList<>();
        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
        try {
            for (int i = 0; i < numWriters; i++) {
                results.add(executorService.submit(new Writer(path, wrote, expectedNumberOfMessages)));
            }
            for (int i = 0; i < numReaders; i++) {
                Reader reader = new Reader(path, expectedNumberOfMessages);
                readers.add(reader);
                results.add(executorService.submit(reader));
            }

            // Phase 1: give the writers up to three minutes, reporting progress every 10 seconds.
            long giveUpWritingAt = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3L);
            int waitRounds = 0;
            while (System.currentTimeMillis() < giveUpWritingAt && wrote.get() < expectedNumberOfMessages) {
                System.out.printf("Writer has written %d of %d messages after %ds. Waiting...%n", wrote.get() + 1, expectedNumberOfMessages, waitRounds * 10);
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10L));
                waitRounds++;
            }
            Assert.assertTrue("Did not write " + expectedNumberOfMessages + " within timeout",
                    wrote.get() >= expectedNumberOfMessages);

            // Phase 2: give the readers one more minute to catch up.
            long giveUpReadingAt = System.currentTimeMillis() + 60000L;
            while (System.currentTimeMillis() < giveUpReadingAt
                    && !areAllReadersComplete(expectedNumberOfMessages, readers)) {
                System.out.printf("Not all readers are complete. Waiting...%n");
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10L));
            }
            Assert.assertTrue("Readers did not catch up",
                    areAllReadersComplete(expectedNumberOfMessages, readers));
        } finally {
            // Always stop the pool, so a failed assertion does not leave worker threads behind.
            executorService.shutdownNow();
        }

        // Only reached when both assertions passed; report whatever the tasks returned.
        for (Future<Throwable> result : results) {
            try {
                Throwable exception = result.get();
                if (exception != null) {
                    LOG.error("Task reported a failure", exception);
                }
            } catch (ExecutionException e) {
                LOG.error("Task terminated abnormally", e);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop waiting on the remaining tasks.
                Thread.currentThread().interrupt();
                LOG.error("Interrupted while collecting task results", e);
                break;
            }
        }
        System.out.println("Test complete");
    }

    /**
     * Checks whether every reader has seen the last expected message, printing the position of
     * each reader that has not. All readers are inspected; the loop does not stop at the first
     * lagging one.
     *
     * @param expectedNumberOfMessages number of messages every reader has to consume
     * @param readers readers to inspect
     * @return {@code true} if every reader's last value is at least {@code expectedNumberOfMessages - 1}
     */
    private boolean areAllReadersComplete(int expectedNumberOfMessages, List<Reader> readers) {
        boolean allReadersComplete = true;
        for (Reader reader : readers) {
            // Read the volatile once so the comparison and the printed value agree.
            int lastRead = reader.lastRead;
            if (lastRead < expectedNumberOfMessages - 1) {
                allReadersComplete = false;
                System.out.printf("Reader last read: %d%n", lastRead);
            }
        }
        return allReadersComplete;
    }

    /**
     * Appends consecutive values from the shared counter to the queue at a paced rate until the
     * value it claimed reaches {@code expectedNumberOfMessages}.
     */
    private static final class Writer
    implements Callable<Throwable> {
        private final File path;
        private final AtomicInteger wrote;
        private final int expectedNumberOfMessages;

        private Writer(File path, AtomicInteger wrote, int expectedNumberOfMessages) {
            this.path = path;
            this.wrote = wrote;
            this.expectedNumberOfMessages = expectedNumberOfMessages;
        }

        /**
         * Runs the write loop.
         *
         * @return {@code null} on normal completion, otherwise the exception that stopped the writer
         */
        @Override
        public Throwable call() {
            try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(path)
                    .testBlockSize()
                    .rollCycle(RollCycles.TEST_SECONDLY)
                    .build()) {
                ExcerptAppender appender = queue.acquireAppender();
                long nextTime = System.nanoTime();
                int value;
                do {
                    try (DocumentContext writingDocument = appender.writingDocument()) {
                        // The counter is advanced while the document is open.
                        value = wrote.getAndIncrement();
                        ValueOut valueOut = writingDocument.wire().getValueOut();
                        valueOut.int32(value);
                        writingDocument.wire().addPadding(60);
                        // The pacing delay is also spent while the document is still open.
                        long delay = nextTime - System.nanoTime();
                        if (delay > 0L) {
                            LockSupport.parkNanos(delay);
                        }
                        // Schedule the next write slightly (1%) earlier than the nominal interval.
                        nextTime = (long) (nextTime + SLEEP_PER_WRITE_NANOS * 0.99);
                    }
                    // The value that reaches the target is still written before the loop ends.
                } while (value < expectedNumberOfMessages);
                return null;
            } catch (Exception e) {
                return e;
            }
        }
    }

    /**
     * Tails the queue from a fresh tailer and verifies that the values arrive as 0, 1, 2, ...
     * until {@code expectedNumberOfMessages - 1} has been read.
     */
    private static final class Reader
    implements Callable<Throwable> {
        private final File path;
        private final int expectedNumberOfMessages;
        /** Last value read so far; {@code -1} before the first read. Polled by the test thread. */
        private volatile int lastRead = -1;

        Reader(File path, int expectedNumberOfMessages) {
            this.path = path;
            this.expectedNumberOfMessages = expectedNumberOfMessages;
        }

        /**
         * Runs the read loop.
         *
         * @return {@code null} on normal completion or interruption, otherwise the throwable
         *         (including a failed assertion) that stopped the reader
         */
        @Override
        public Throwable call() {
            try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(path)
                    .testBlockSize()
                    .rollCycle(RollCycles.TEST_SECONDLY)
                    .build()) {
                ExcerptTailer tailer = queue.createTailer();
                // The interrupt check lets ExecutorService.shutdownNow() end this polling loop.
                while (lastRead != expectedNumberOfMessages - 1
                        && !Thread.currentThread().isInterrupted()) {
                    // try-with-resources closes the document and still propagates any failure
                    // raised in the body.
                    try (DocumentContext rd = tailer.readingDocument()) {
                        if (!rd.isPresent()) {
                            continue;
                        }
                        int v = rd.wire().getValueIn().int32();
                        Assert.assertEquals(lastRead + 1, v);
                        lastRead = v;
                    }
                }
            } catch (Throwable e) {
                LOG.error("Reader failed after reading {}", lastRead, e);
                return e;
            }
            return null;
        }
    }
}

