/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue.impl.single;

import java.io.File;
import java.io.PrintStream;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.time.SetTimeProvider;
import net.openhft.chronicle.core.time.TimeProvider;
import net.openhft.chronicle.queue.DirectoryUtils;
import net.openhft.chronicle.queue.DumpQueueMain;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.ValueOut;
import org.jetbrains.annotations.NotNull;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RollCycleMultiThreadStressTest {
    // Per-instance logger named after the runtime class of the test.
    final Logger LOG = LoggerFactory.getLogger(this.getClass());
    // Pacing interval between writes of a single writer, in nanoseconds (-DwriteLatency, default 40000).
    static final long SLEEP_PER_WRITE_NANOS = Long.getLong("writeLatency", 40000L);
    // Nominal test duration in seconds (-DtestTime, default 2); used to derive the expected message count.
    static final int TEST_TIME = Integer.getInteger("testTime", 2);
    // Wall-clock interval in milliseconds at which stress() advances the queue's time provider (-DrollEvery, default 100).
    static final int ROLL_EVERY_MS = Integer.getInteger("rollEvery", 100);
    // Exclusive upper bound, in milliseconds, of the random pause a reader takes before it starts reading (-DdelayReader).
    static final int DELAY_READER_RANDOM_MS = Integer.getInteger("delayReader", 1);
    // Exclusive upper bound, in milliseconds, of the random pause a writer takes before it starts writing (-DdelayWriter).
    static final int DELAY_WRITER_RANDOM_MS = Integer.getInteger("delayWriter", 1);
    // When > 0, stress() writes one message up front and pauses this many milliseconds before starting the writers (-DwriteOneThenWait).
    static final int WRITE_ONE_THEN_WAIT_MS = Integer.getInteger("writeOneThenWait", 0);
    // Total number of worker threads to split between writers and readers (-Dcores, default: available processors).
    static final int CORES = Integer.getInteger("cores", Runtime.getRuntime().availableProcessors());
    // Shared random source with a fixed seed (99); used for the start-up delays of readers and writers.
    static final Random random = new Random(99L);
    // Number of int32 copies of the sequence number written into every message (-DnumberInts, default 18).
    static final int NUMBER_OF_INTS = Integer.getInteger("numberInts", 18);
    // When true (-Dpretouch=true), stress() also runs a PretoucherThread against the queue.
    static final boolean PRETOUCH = Boolean.getBoolean("pretouch");
    // When true (-Dread_only=true), readers build their queue with readOnly(true).
    static final boolean READERS_READ_ONLY = Boolean.getBoolean("read_only");
    // When true (-Ddump_queue=true), a reader dumps the queue to System.out when it detects a sequence mismatch.
    static final boolean DUMP_QUEUE = Boolean.getBoolean("dump_queue");
    // Manually driven time source handed to every queue built by queueBuilder(); stress() advances it to force rolls.
    final SetTimeProvider timeProvider = new SetTimeProvider();

    /**
     * Runs {@link #stress()} one hundred times back to back.
     * Ignored by default; intended to be started manually when hunting intermittent failures.
     *
     * @throws InterruptedException if a stress run is interrupted
     */
    @Ignore("run manually")
    @Test
    public void repeatStress() throws InterruptedException {
        int remainingRuns = 100;
        while (remainingRuns > 0) {
            this.stress();
            remainingRuns--;
        }
    }

    /**
     * Stress test: several writers append sequentially numbered messages to one queue while
     * several readers tail it, and the main thread keeps advancing the time provider so the
     * queue rolls to a new cycle repeatedly.
     *
     * <p>The test fails when the writers do not produce the expected number of messages within
     * the timeout, when a reader records an exception or reports no progress, when the
     * pretoucher records an exception, or when the readers do not catch up within 60 seconds
     * after writing has finished.
     *
     * <p>The executors are always shut down, even when an assertion fails. The queue directory
     * is deleted only after a successful run.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for an
     *                              executor to terminate
     */
    @Test
    public void stress() throws InterruptedException {
        File path = Optional.ofNullable(System.getProperty("stress.test.dir"))
                .map(dir -> new File(dir, UUID.randomUUID().toString()))
                .orElse(DirectoryUtils.tempDir("rollCycleStress"));
        System.out.printf("Queue dir: %s at %s%n", path.getAbsolutePath(), Instant.now());

        int numThreads = CORES;
        int numWriters = Math.max(1, numThreads / 4 + 1);
        // Executors.newFixedThreadPool rejects a pool size of zero, which is what
        // "numThreads - numWriters" yields on a single-core machine (or with -Dcores=1).
        int numReaders = Math.max(1, numThreads - numWriters);

        ExecutorService executorServicePretouch = Executors.newSingleThreadExecutor(new NamedThreadFactory("pretouch"));
        ExecutorService executorServiceWrite = Executors.newFixedThreadPool(numWriters, new NamedThreadFactory("writer"));
        ExecutorService executorServiceRead = Executors.newFixedThreadPool(numReaders, new NamedThreadFactory("reader"));

        AtomicInteger wrote = new AtomicInteger();
        int expectedNumberOfMessages = (int) ((double) TEST_TIME * 1.0E9 / (double) SLEEP_PER_WRITE_NANOS);
        System.out.printf("Running test with %d writers and %d readers, sleep %dns%n", numWriters, numReaders, SLEEP_PER_WRITE_NANOS);
        System.out.printf("Writing %d messages with %dns interval%n", expectedNumberOfMessages, SLEEP_PER_WRITE_NANOS);
        System.out.printf("Should take ~%dsec%n", TimeUnit.NANOSECONDS.toSeconds((long) expectedNumberOfMessages * SLEEP_PER_WRITE_NANOS) / (long) numWriters);

        List<Future<Throwable>> results = new ArrayList<>();
        List<Reader> readers = new ArrayList<>();
        List<Writer> writers = new ArrayList<>();
        final PretoucherThread pretoucherThread = PRETOUCH ? new PretoucherThread(path) : null;

        try {
            if (pretoucherThread != null) {
                executorServicePretouch.submit(pretoucherThread);
            }
            if (WRITE_ONE_THEN_WAIT_MS > 0) {
                // Seed the queue with a single message before any reader is started.
                Writer tempWriter = new Writer(path, wrote, expectedNumberOfMessages);
                try (SingleChronicleQueue queue = tempWriter.queue()) {
                    tempWriter.write(queue.acquireAppender());
                }
            }
            for (int i = 0; i < numReaders; ++i) {
                Reader reader = new Reader(path, expectedNumberOfMessages);
                readers.add(reader);
                results.add(executorServiceRead.submit(reader));
            }
            if (WRITE_ONE_THEN_WAIT_MS > 0) {
                this.LOG.warn("Wrote one now waiting for {}ms", (Object) WRITE_ONE_THEN_WAIT_MS);
                Jvm.pause((long) WRITE_ONE_THEN_WAIT_MS);
            }
            for (int i = 0; i < numWriters; ++i) {
                Writer writer = new Writer(path, wrote, expectedNumberOfMessages);
                writers.add(writer);
                results.add(executorServiceWrite.submit(writer));
            }

            long maxWritingTime = TimeUnit.SECONDS.toMillis(TEST_TIME + 5) + this.queueBuilder(path).timeoutMS();
            long startTime = System.currentTimeMillis();
            long giveUpWritingAt = startTime + maxWritingTime;
            long nextRollTime = startTime + (long) ROLL_EVERY_MS;
            long nextCheckTime = startTime + 5000L;
            long now;
            while ((now = System.currentTimeMillis()) < giveUpWritingAt && wrote.get() < expectedNumberOfMessages) {
                if (now > nextRollTime) {
                    // Jump the queue's clock forward by one second to force a roll.
                    this.timeProvider.advanceMillis(1000L);
                    nextRollTime += (long) ROLL_EVERY_MS;
                }
                if (now > nextCheckTime) {
                    String readersLastRead = readers.stream()
                            .map(r -> Integer.toString(r.lastRead))
                            .collect(Collectors.joining(","));
                    // Report the real elapsed time instead of an estimate derived from the iteration count.
                    System.out.printf("Writer has written %d of %d messages after %dms. Readers at %s. Waiting...%n",
                            wrote.get() + 1, expectedNumberOfMessages, now - startTime, readersLastRead);
                    failIfStalled(readers, pretoucherThread);
                    nextCheckTime = System.currentTimeMillis() + 10000L;
                }
                Jvm.pause(5L);
            }

            double timeToWriteSecs = (double) (System.currentTimeMillis() - startTime) / 1000.0;
            StringBuilder writerExceptions = new StringBuilder();
            for (Writer writer : writers) {
                if (writer.exception != null) {
                    // Append the throwable itself so the exception type survives a null message.
                    writerExceptions.append("Writer failed due to: ").append(writer.exception).append("\n");
                }
            }
            Assert.assertTrue("Wrote " + wrote.get() + " which is less than " + expectedNumberOfMessages
                    + " within timeout. " + writerExceptions, wrote.get() >= expectedNumberOfMessages);
            for (Reader reader : readers) {
                if (reader.exception != null) {
                    throw new AssertionError("Reader encountered exception, so stopped reading messages", reader.exception);
                }
            }

            double messagesPerSec = (double) expectedNumberOfMessages / timeToWriteSecs;
            double messagesPerSecPerWriter = messagesPerSec / (double) numWriters;
            System.out.println(String.format(
                    "All messages written in %,.0fsecs at rate of %,.0f/sec %,.0f/sec per writer (actual writeLatency %,.0fns)",
                    timeToWriteSecs, messagesPerSec, messagesPerSecPerWriter, 1.0E9 / messagesPerSecPerWriter));

            awaitReaders(expectedNumberOfMessages, readers);
            Assert.assertTrue("Readers did not catch up",
                    RollCycleMultiThreadStressTest.areAllReadersComplete(expectedNumberOfMessages, readers));
        } finally {
            // Runs on failure as well, so a failed assertion does not leave worker threads behind.
            shutdownExecutors(executorServiceRead, executorServiceWrite, executorServicePretouch);
        }

        printTaskOutcomes(results);
        System.out.println("Test complete");
        DirectoryUtils.deleteDir(path);
    }

    /** Throws an {@link AssertionError} if a reader reports no progress or the pretoucher recorded an exception. */
    private static void failIfStalled(List<Reader> readers, PretoucherThread pretoucherThread) {
        for (Reader reader : readers) {
            if (reader.isMakingProgress()) {
                continue;
            }
            if (reader.exception != null) {
                throw new AssertionError("Reader encountered exception, so stopped reading messages", reader.exception);
            }
            throw new AssertionError("Reader is stuck");
        }
        if (pretoucherThread != null && pretoucherThread.exception != null) {
            throw new AssertionError("Preloader encountered exception", pretoucherThread.exception);
        }
    }

    /** Polls once a second, for at most 60 seconds, until every reader has read the last expected message. */
    private static void awaitReaders(int expectedNumberOfMessages, List<Reader> readers) {
        long giveUpReadingAt = System.currentTimeMillis() + 60000L;
        long dumpThreadsAt = giveUpReadingAt - 15000L;
        while (System.currentTimeMillis() < giveUpReadingAt
                && !RollCycleMultiThreadStressTest.areAllReadersComplete(expectedNumberOfMessages, readers)) {
            if (dumpThreadsAt < System.currentTimeMillis()) {
                // During the last 15 seconds, dump every thread's stack to help diagnose the hang.
                Thread.getAllStackTraces().forEach((thread, stack) -> {
                    System.out.println("\n\n" + thread + "\n\n");
                    Arrays.stream(stack).forEach(System.out::println);
                });
            }
            System.out.printf("Not all readers are complete. Waiting...%n");
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
        }
    }

    /** Requests an orderly shutdown of all executors, then forces any that has not terminated within one second. */
    private static void shutdownExecutors(ExecutorService... executors) {
        for (ExecutorService executor : executors) {
            executor.shutdown();
        }
        for (ExecutorService executor : executors) {
            try {
                if (!executor.awaitTermination(1L, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                // Preserve the interrupt for the caller instead of masking an in-flight test failure.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Prints the stack trace of every throwable returned or thrown by a submitted reader or writer task. */
    private static void printTaskOutcomes(List<Future<Throwable>> results) {
        for (Future<Throwable> result : results) {
            try {
                Throwable exception = result.get();
                if (exception != null) {
                    exception.printStackTrace();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
                return;
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Creates the builder that every writer, reader and pretoucher in this test uses:
     * a binary queue at {@code path} with the test block size, this test's
     * {@code timeProvider} and the {@code TEST_SECONDLY} roll cycle.
     *
     * @param path directory of the queue
     * @return a freshly configured builder; the caller decides when to {@code build()} it
     */
    @NotNull
    SingleChronicleQueueBuilder queueBuilder(File path) {
        return SingleChronicleQueueBuilder
                .binary((File)path)
                .testBlockSize()
                .timeProvider((TimeProvider)this.timeProvider)
                .rollCycle((RollCycle)RollCycles.TEST_SECONDLY);
    }

    /**
     * Checks whether every reader has reached the last expected message, printing the
     * 1-based position and current sequence of each reader that has not.
     *
     * @param expectedNumberOfMessages total number of messages the readers must consume
     * @param readers                  readers to inspect
     * @return {@code true} if no reader's {@code lastRead} is below {@code expectedNumberOfMessages - 1}
     */
    static boolean areAllReadersComplete(int expectedNumberOfMessages, List<Reader> readers) {
        final int lastExpectedSequence = expectedNumberOfMessages - 1;
        boolean everyoneDone = true;
        int position = 1;
        for (Reader candidate : readers) {
            if (candidate.lastRead < lastExpectedSequence) {
                everyoneDone = false;
                System.out.printf("Reader #%d last read: %d%n", position, candidate.lastRead);
            }
            position++;
        }
        return everyoneDone;
    }

    static {
        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
        System.setProperty("disableFastForwardHeaderNumber", "true");
        System.setProperty("org.slf4j.simpleLogger.dateTimeFormat", "HH:mm:ss.SSS");
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "WARN");
    }

    /**
     * Background task that opens its own queue instance on {@code path} and calls
     * {@code pretouch()} on its appender roughly every 50 ms until the thread is
     * interrupted or the queue reports that it is closed.
     */
    class PretoucherThread
    implements Callable<Throwable> {
        final File path;
        /** Failure that stopped the pretoucher, or {@code null}; polled by the main test thread. */
        volatile Throwable exception;

        PretoucherThread(File path) {
            this.path = path;
        }

        /**
         * Runs the pretouch loop.
         *
         * @return {@code null} on a normal stop, otherwise the throwable that ended the loop
         *         (also stored in {@link #exception})
         */
        @Override
        public Throwable call() throws Exception {
            try (SingleChronicleQueue queue = RollCycleMultiThreadStressTest.this.queueBuilder(this.path).build()) {
                try {
                    ExcerptAppender appender = queue.acquireAppender();
                    System.out.println("Starting pretoucher");
                    while (!Thread.currentThread().isInterrupted() && !queue.isClosed()) {
                        Jvm.pause(50L);
                        appender.pretouch();
                    }
                }
                catch (Throwable e) {
                    // Handle the failure while the queue resource is still open. A catch clause
                    // attached to the try-with-resources itself runs only after the queue has
                    // been closed, so an isClosed() test there would presumably always pass and
                    // hide every pretouch failure (assumes close() makes isClosed() true - TODO confirm).
                    if (queue.isClosed()) {
                        // The queue was closed underneath us; treat that as a normal stop.
                        return null;
                    }
                    this.exception = e;
                    return e;
                }
            }
            catch (Throwable e) {
                // Building the queue (or closing it) failed.
                this.exception = e;
                return e;
            }
            return null;
        }
    }

    /**
     * Writer task: appends sequentially numbered messages to the queue at a fixed pace until
     * the shared counter reaches the expected number of messages.
     */
    final class Writer
    implements Callable<Throwable> {
        final File path;
        /** Counter shared by all writers; supplies the sequence number of each message. */
        final AtomicInteger wrote;
        final int expectedNumberOfMessages;
        /** Failure that stopped this writer, or {@code null}; read by the main test thread. */
        volatile Throwable exception;

        Writer(File path, AtomicInteger wrote, int expectedNumberOfMessages) {
            this.path = path;
            this.wrote = wrote;
            this.expectedNumberOfMessages = expectedNumberOfMessages;
        }

        /**
         * Writes messages until one of them carries a sequence number of at least
         * {@code expectedNumberOfMessages}, spinning after each write so that write number
         * {@code n} is not followed by the next one before {@code n * SLEEP_PER_WRITE_NANOS}
         * nanoseconds have elapsed since the start.
         *
         * @return {@code null} on success, otherwise the throwable that stopped the writer
         *         (also stored in {@link #exception})
         */
        @Override
        public Throwable call() {
            try (SingleChronicleQueue queue = this.queue()) {
                ExcerptAppender appender = queue.acquireAppender();
                // Random.nextInt(bound) rejects a non-positive bound, so -DdelayWriter=0 means "no delay".
                Jvm.pause(DELAY_WRITER_RANDOM_MS > 0 ? (long) random.nextInt(DELAY_WRITER_RANDOM_MS) : 0L);
                long startTime = System.nanoTime();
                for (int loopIteration = 0; ; ++loopIteration) {
                    int value = this.write(appender);
                    long earliestNextWrite = (long) loopIteration * SLEEP_PER_WRITE_NANOS;
                    // Busy-spin to pace the writes; compare elapsed time rather than absolute
                    // nanoTime values so the check stays correct if nanoTime wraps.
                    while (System.nanoTime() - startTime < earliestNextWrite) {
                        // spin
                    }
                    if (value >= this.expectedNumberOfMessages) {
                        return null;
                    }
                }
            }
            catch (Throwable e) {
                this.exception = e;
                return e;
            }
        }

        /** Builds a new queue instance for this writer's path. */
        private SingleChronicleQueue queue() {
            return RollCycleMultiThreadStressTest.this.queueBuilder(this.path).build();
        }

        /**
         * Writes one message: the acquisition timestamp as an int64 followed by
         * {@code NUMBER_OF_INTS} int32 copies of the next sequence number.
         *
         * @param appender appender to write with
         * @return the sequence number stored in the message
         */
        private int write(ExcerptAppender appender) {
            int value;
            try (DocumentContext writingDocument = appender.writingDocument()) {
                long documentAcquireTimestamp = System.nanoTime();
                // Take the sequence number only after the document has been acquired.
                value = this.wrote.getAndIncrement();
                ValueOut valueOut = writingDocument.wire().getValueOut();
                valueOut.int64(documentAcquireTimestamp);
                for (int i = 0; i < NUMBER_OF_INTS; ++i) {
                    valueOut.int32(value);
                }
                writingDocument.wire().padToCacheAlign();
            }
            return value;
        }
    }

    /**
     * Reader task: tails the queue and verifies that messages arrive with strictly consecutive
     * sequence numbers, stopping once the last expected message has been read.
     */
    final class Reader
    implements Callable<Throwable> {
        final File path;
        final int expectedNumberOfMessages;
        /** Sequence number of the last message read, or -1 if none yet; read by the main test thread. */
        volatile int lastRead = -1;
        /** Failure that stopped this reader, or {@code null}; read by the main test thread. */
        volatile Throwable exception;
        /** Value of {@link #lastRead} at the previous progress check, or -1 if nothing had been read then. */
        int readSequenceAtLastProgressCheck = -1;

        Reader(File path, int expectedNumberOfMessages) {
            this.path = path;
            this.expectedNumberOfMessages = expectedNumberOfMessages;
        }

        /**
         * Reports whether this reader has advanced since the previous call.
         *
         * <p>A reader that has already read the last expected message counts as making progress.
         * While no message had been read at the previous check there is no baseline to compare
         * against, so the current position is recorded and {@code true} is returned.
         *
         * @return {@code false} only if a baseline exists and {@code lastRead} has not moved past it
         */
        boolean isMakingProgress() {
            int current = this.lastRead;
            if (current >= this.expectedNumberOfMessages - 1) {
                return true;
            }
            int previous = this.readSequenceAtLastProgressCheck;
            // Always record the baseline; otherwise the -1 sentinel is never left and the
            // stall check can never fire.
            this.readSequenceAtLastProgressCheck = current;
            if (previous == -1) {
                return true;
            }
            return current > previous;
        }

        /**
         * Reads messages until the one numbered {@code expectedNumberOfMessages - 1} is seen.
         * Each message must carry a non-zero timestamp followed by {@code NUMBER_OF_INTS}
         * copies of {@code lastRead + 1}.
         *
         * @return {@code null} on success, otherwise the throwable that stopped the reader
         *         (also stored in {@link #exception})
         */
        @Override
        public Throwable call() {
            SingleChronicleQueueBuilder builder = RollCycleMultiThreadStressTest.this.queueBuilder(this.path);
            if (READERS_READ_ONLY) {
                builder.readOnly(true);
            }
            try (SingleChronicleQueue queue = builder.build()) {
                ExcerptTailer tailer = queue.createTailer();
                int lastTailerCycle = -1;
                int lastQueueCycle = -1;
                // Random.nextInt(bound) rejects a non-positive bound, so -DdelayReader=0 means "no delay".
                Jvm.pause(DELAY_READER_RANDOM_MS > 0 ? (long) random.nextInt(DELAY_READER_RANDOM_MS) : 0L);
                while (this.lastRead != this.expectedNumberOfMessages - 1) {
                    // A real try-with-resources: the document is closed on every path and an
                    // AssertionError raised below propagates instead of being discarded by a
                    // 'continue' inside a finally block.
                    try (DocumentContext dc = tailer.readingDocument()) {
                        if (!dc.isPresent()) {
                            continue;
                        }
                        int v = -1;
                        ValueIn valueIn = dc.wire().getValueIn();
                        long documentAcquireTimestamp = valueIn.int64();
                        if (documentAcquireTimestamp == 0L) {
                            throw new AssertionError("No timestamp");
                        }
                        for (int i = 0; i < NUMBER_OF_INTS; ++i) {
                            v = valueIn.int32();
                            if (this.lastRead + 1 == v) {
                                continue;
                            }
                            System.out.println(dc.wire());
                            String failureMessage = "Expected: " + (this.lastRead + 1) + ", actual: " + v + ", pos: " + i + ", index: " + Long.toHexString(dc.index()) + ", cycle: " + tailer.cycle();
                            if (lastTailerCycle != -1) {
                                failureMessage = failureMessage + ". Tailer cycle at last read: " + lastTailerCycle + " (current: " + tailer.cycle() + "), queue cycle at last read: " + lastQueueCycle + " (current: " + queue.cycle() + ")";
                            }
                            if (DUMP_QUEUE) {
                                DumpQueueMain.dump(queue.file(), System.out, Long.MAX_VALUE);
                            }
                            throw new AssertionError(failureMessage);
                        }
                        this.lastRead = v;
                        lastTailerCycle = tailer.cycle();
                        lastQueueCycle = queue.cycle();
                    }
                }
            }
            catch (Throwable e) {
                e.printStackTrace();
                this.exception = e;
                return e;
            }
            return null;
        }
    }
}

