/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue.impl.single;

import java.io.File;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import net.openhft.chronicle.queue.DirectoryUtils;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.ValueOut;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RollCycleMultiThreadStressTest {
    private static final Logger LOG = LoggerFactory.getLogger(RollCycleMultiThreadStressTest.class);
    private static final long SLEEP_PER_WRITE_NANOS = Long.getLong("writeLatency", 10000L);
    private static final int TEST_TIME = Integer.getInteger("testTime", 90);
    static final int NUMBER_OF_INTS = 18;

    @Ignore(value="long running")
    @Test
    public void stress() throws Exception {
        boolean allReadersComplete;
        int i;
        File path = Optional.ofNullable(System.getProperty("stress.test.dir")).map(s -> new File((String)s, UUID.randomUUID().toString())).orElse(DirectoryUtils.tempDir("rollCycleStress"));
        System.out.printf("Queue dir: %s at %s%n", path.getAbsolutePath(), Instant.now());
        int numThreads = Runtime.getRuntime().availableProcessors();
        int numWriters = numThreads / 4 + 1;
        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
        AtomicInteger wrote = new AtomicInteger();
        int expectedNumberOfMessages = (int)((double)TEST_TIME * 1.0E9 / (double)SLEEP_PER_WRITE_NANOS);
        System.out.printf("Running test with %d writers and %d readers, sleep %dns%n", numWriters, numThreads - numWriters, SLEEP_PER_WRITE_NANOS);
        System.out.printf("Writing %d messages with %dns interval%n", expectedNumberOfMessages, SLEEP_PER_WRITE_NANOS);
        System.out.printf("Should take ~%dsec%n", TimeUnit.NANOSECONDS.toSeconds((long)expectedNumberOfMessages * SLEEP_PER_WRITE_NANOS) / (long)numWriters);
        ArrayList<Future<Throwable>> results = new ArrayList<Future<Throwable>>();
        ArrayList<Reader> readers = new ArrayList<Reader>();
        ArrayList<Writer> writers = new ArrayList<Writer>();
        for (i = 0; i < numWriters; ++i) {
            Writer writer = new Writer(path, wrote, expectedNumberOfMessages);
            writers.add(writer);
            results.add(executorService.submit(writer));
        }
        for (i = 0; i < numThreads - numWriters; ++i) {
            Reader reader2 = new Reader(path, expectedNumberOfMessages);
            readers.add(reader2);
            results.add(executorService.submit(reader2));
        }
        long maxWritingTime = TimeUnit.SECONDS.toMillis(TEST_TIME);
        long giveUpWritingAt = System.currentTimeMillis() + maxWritingTime;
        int i2 = 0;
        while (System.currentTimeMillis() < giveUpWritingAt && wrote.get() < expectedNumberOfMessages) {
            String readersLastRead = readers.stream().map(reader -> Integer.toString(((Reader)reader).lastRead)).collect(Collectors.joining(","));
            System.out.printf("Writer has written %d of %d messages after %ds. Readers at %s. Waiting...%n", wrote.get() + 1, expectedNumberOfMessages, i2 * 10, readersLastRead);
            readers.stream().filter(r -> !r.isMakingProgress()).findAny().ifPresent(reader -> {
                if (((Reader)reader).exception != null) {
                    throw new AssertionError("Reader encountered exception, so stopped reading messages", ((Reader)reader).exception);
                }
                throw new AssertionError((Object)"Reader is stuck");
            });
            long waitUntil = System.currentTimeMillis() + 10000L;
            while (wrote.get() < expectedNumberOfMessages && System.currentTimeMillis() < waitUntil) {
                LockSupport.parkNanos(1L);
            }
            ++i2;
        }
        StringBuilder writerExceptions = new StringBuilder();
        writers.stream().filter(w -> ((Writer)w).exception != null).forEach(w -> writerExceptions.append("Writer failed due to: ").append(((Writer)w).exception.getMessage()).append("\n"));
        Assert.assertTrue((String)("Did not write " + expectedNumberOfMessages + " within timeout. " + writerExceptions), (wrote.get() >= expectedNumberOfMessages ? 1 : 0) != 0);
        readers.stream().filter(r -> ((Reader)r).exception != null).findAny().ifPresent(reader -> {
            throw new AssertionError("Reader encountered exception, so stopped reading messages", ((Reader)reader).exception);
        });
        long giveUpReadingAt = System.currentTimeMillis() + 60000L;
        long dumpThreadsAt = giveUpReadingAt - 15000L;
        while (System.currentTimeMillis() < giveUpReadingAt && !(allReadersComplete = this.areAllReadersComplete(expectedNumberOfMessages, readers))) {
            if (dumpThreadsAt < System.currentTimeMillis()) {
                Thread.getAllStackTraces().forEach((n, st) -> {
                    System.out.println("\n\n" + n + "\n\n");
                    Arrays.stream(st).forEach(System.out::println);
                });
            }
            System.out.printf("Not all readers are complete. Waiting...%n", new Object[0]);
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
        }
        Assert.assertTrue((String)"Readers did not catch up", (boolean)this.areAllReadersComplete(expectedNumberOfMessages, readers));
        executorService.shutdownNow();
        results.forEach(f -> {
            try {
                Throwable exception = (Throwable)f.get();
                if (exception != null) {
                    exception.printStackTrace();
                }
            }
            catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        System.out.println("Test complete");
        DirectoryUtils.deleteDir(path);
    }

    private boolean areAllReadersComplete(int expectedNumberOfMessages, List<Reader> readers) {
        boolean allReadersComplete = true;
        int count = 0;
        for (Reader reader : readers) {
            ++count;
            if (reader.lastRead >= expectedNumberOfMessages - 1) continue;
            allReadersComplete = false;
            System.out.printf("Reader #%d last read: %d%n", count, reader.lastRead);
        }
        return allReadersComplete;
    }

    private static final class Writer
    implements Callable<Throwable> {
        private final File path;
        private final AtomicInteger wrote;
        private final int expectedNumberOfMessages;
        private volatile Throwable exception;

        private Writer(File path, AtomicInteger wrote, int expectedNumberOfMessages) {
            this.path = path;
            this.wrote = wrote;
            this.expectedNumberOfMessages = expectedNumberOfMessages;
        }

        /*
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public Throwable call() {
            try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary((File)this.path).testBlockSize().rollCycle((RollCycle)RollCycles.TEST_SECONDLY).build();){
                ExcerptAppender appender = queue.acquireAppender();
                long startTime = System.nanoTime();
                int loopIteration = 0;
                while (true) {
                    int value;
                    try (DocumentContext writingDocument = appender.writingDocument();){
                        long documentAcquireTimestamp = System.nanoTime();
                        value = this.wrote.getAndIncrement();
                        ValueOut valueOut = writingDocument.wire().getValueOut();
                        valueOut.int64(documentAcquireTimestamp);
                        for (int i = 0; i < 18; ++i) {
                            valueOut.int32(value);
                        }
                        writingDocument.wire().padToCacheAlign();
                    }
                    while (System.nanoTime() < startTime + (long)loopIteration * SLEEP_PER_WRITE_NANOS) {
                    }
                    ++loopIteration;
                    if (value >= this.expectedNumberOfMessages) {
                        Throwable throwable = null;
                        return throwable;
                    }
                    continue;
                    break;
                }
            }
            catch (Exception e) {
                this.exception = e;
                return e;
            }
        }
    }

    private static final class Reader
    implements Callable<Throwable> {
        private final File path;
        private final int expectedNumberOfMessages;
        private volatile int lastRead = -1;
        private volatile Throwable exception;
        private int readSequenceAtLastProgressCheck = -1;

        Reader(File path, int expectedNumberOfMessages) {
            this.path = path;
            this.expectedNumberOfMessages = expectedNumberOfMessages;
        }

        boolean isMakingProgress() {
            if (this.readSequenceAtLastProgressCheck == -1) {
                return true;
            }
            boolean makingProgress = this.lastRead > this.readSequenceAtLastProgressCheck;
            this.readSequenceAtLastProgressCheck = this.lastRead;
            return makingProgress;
        }

        @Override
        public Throwable call() {
            try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary((File)this.path).testBlockSize().rollCycle((RollCycle)RollCycles.TEST_SECONDLY).build();){
                ExcerptTailer tailer = queue.createTailer();
                int lastTailerCycle = -1;
                int lastQueueCycle = -1;
                while (this.lastRead != this.expectedNumberOfMessages - 1) {
                    DocumentContext rd = tailer.readingDocument();
                    Throwable throwable = null;
                    try {
                        if (!rd.isPresent()) continue;
                        int v = -1;
                        ValueIn valueIn = rd.wire().getValueIn();
                        long documentAcquireTimestamp = valueIn.int64();
                        if (documentAcquireTimestamp == 0L) {
                            throw new AssertionError((Object)"No timestamp");
                        }
                        for (int i = 0; i < 18; ++i) {
                            v = valueIn.int32();
                            if (this.lastRead + 1 == v) continue;
                            System.out.println(rd.wire());
                            String failureMessage = "Expected: " + (this.lastRead + 1) + ", actual: " + v + ", pos: " + i + ", index: " + rd.index() + ", cycle: " + tailer.cycle();
                            if (lastTailerCycle != -1) {
                                failureMessage = failureMessage + ". Tailer cycle at last read: " + lastTailerCycle + " (current: " + tailer.cycle() + "), queue cycle at last read: " + lastQueueCycle + " (current: " + queue.cycle() + ")";
                            }
                            throw new AssertionError((Object)failureMessage);
                        }
                        this.lastRead = v;
                        lastTailerCycle = tailer.cycle();
                        lastQueueCycle = queue.cycle();
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                    finally {
                        if (rd == null) continue;
                        if (throwable != null) {
                            try {
                                rd.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                            continue;
                        }
                        rd.close();
                    }
                }
            }
            catch (Throwable e) {
                e.printStackTrace();
                this.exception = e;
                return e;
            }
            return null;
        }
    }
}

