/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue.impl.single;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.AbstractReferenceCounted;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.IOTools;
import net.openhft.chronicle.core.onoes.ExceptionKey;
import net.openhft.chronicle.core.onoes.LogLevel;
import net.openhft.chronicle.core.threads.ThreadDump;
import net.openhft.chronicle.core.time.SetTimeProvider;
import net.openhft.chronicle.core.time.TimeProvider;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.DirectoryUtils;
import net.openhft.chronicle.queue.DumpQueueMain;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.QueueTestCommon;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.ValueOut;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RollCycleMultiThreadStressTest
extends QueueTestCommon {
    final long SLEEP_PER_WRITE_NANOS;
    final int TEST_TIME;
    final int ROLL_EVERY_MS;
    final int DELAY_READER_RANDOM_MS;
    final int DELAY_WRITER_RANDOM_MS;
    final int WRITE_ONE_THEN_WAIT_MS;
    final int CORES;
    final Random random;
    final int NUMBER_OF_INTS;
    final boolean PRETOUCH;
    final boolean READERS_READ_ONLY;
    final boolean DUMP_QUEUE;
    boolean SHARED_WRITE_QUEUE;
    private ThreadDump threadDump;
    private Map<ExceptionKey, Integer> exceptionKeyIntegerMap;
    final Logger LOG = LoggerFactory.getLogger(this.getClass());
    final SetTimeProvider timeProvider = new SetTimeProvider();
    private ChronicleQueue sharedWriterQueue;
    @Rule
    public RepeatRule repeatRule = new RepeatRule();

    public RollCycleMultiThreadStressTest() {
        this.SLEEP_PER_WRITE_NANOS = Long.getLong("writeLatency", 30000L);
        this.TEST_TIME = Integer.getInteger("testTime", 2);
        this.ROLL_EVERY_MS = Integer.getInteger("rollEvery", 300);
        this.DELAY_READER_RANDOM_MS = Integer.getInteger("delayReader", 1);
        this.DELAY_WRITER_RANDOM_MS = Integer.getInteger("delayWriter", 1);
        this.WRITE_ONE_THEN_WAIT_MS = Integer.getInteger("writeOneThenWait", 0);
        this.CORES = Integer.getInteger("cores", Runtime.getRuntime().availableProcessors());
        this.random = new Random(99L);
        this.NUMBER_OF_INTS = Integer.getInteger("numberInts", 18);
        this.PRETOUCH = Jvm.getBoolean((String)"pretouch");
        this.READERS_READ_ONLY = Jvm.getBoolean((String)"read_only");
        this.DUMP_QUEUE = Jvm.getBoolean((String)"dump_queue");
        this.SHARED_WRITE_QUEUE = Jvm.getBoolean((String)"sharedWriteQ");
        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
        System.setProperty("org.slf4j.simpleLogger.dateTimeFormat", "HH:mm:ss.SSS");
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "WARN");
    }

    static boolean areAllReadersComplete(int expectedNumberOfMessages, List<Reader> readers) {
        boolean allReadersComplete = true;
        int count = 0;
        for (Reader reader : readers) {
            ++count;
            if (reader.lastRead >= expectedNumberOfMessages - 1) continue;
            allReadersComplete = false;
            System.out.printf("Reader #%d last read: %d%n", count, reader.lastRead);
        }
        return allReadersComplete;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void stress() throws InterruptedException, IOException {
        long now;
        int i;
        File file = IOTools.createTempDirectory((String)"queue").toFile();
        System.out.printf("Queue dir: %s at %s%n", file.getAbsolutePath(), Instant.now());
        int numThreads = this.CORES;
        int numWriters = numThreads / 4 + 1;
        ExecutorService executorServicePretouch = Executors.newSingleThreadExecutor((ThreadFactory)new NamedThreadFactory("pretouch"));
        ExecutorService executorServiceWrite = Executors.newFixedThreadPool(numWriters, (ThreadFactory)new NamedThreadFactory("writer"));
        ExecutorService executorServiceRead = Executors.newFixedThreadPool(numThreads - numWriters, (ThreadFactory)new NamedThreadFactory("reader"));
        AtomicInteger wrote = new AtomicInteger();
        int expectedNumberOfMessages = (int)((double)this.TEST_TIME * 1.0E9 / (double)this.SLEEP_PER_WRITE_NANOS) * Math.max(1, numWriters / 2);
        System.out.printf("Running test with %d writers and %d readers, sleep %dns%n", numWriters, numThreads - numWriters, this.SLEEP_PER_WRITE_NANOS);
        System.out.printf("Writing %d messages with %dns interval%n", expectedNumberOfMessages, this.SLEEP_PER_WRITE_NANOS);
        System.out.printf("Should take ~%dms%n", TimeUnit.NANOSECONDS.toMillis((long)expectedNumberOfMessages * this.SLEEP_PER_WRITE_NANOS) / (long)(numWriters / 2));
        ArrayList<Future<Throwable>> results = new ArrayList<Future<Throwable>>();
        ArrayList<Reader> readers = new ArrayList<Reader>();
        ArrayList<Writer> writers = new ArrayList<Writer>();
        if (this.READERS_READ_ONLY) {
            this.createQueue(file);
        }
        if (this.SHARED_WRITE_QUEUE) {
            this.sharedWriterQueue = this.createQueue(file);
        }
        PretoucherThread pretoucherThread = null;
        if (this.PRETOUCH) {
            pretoucherThread = new PretoucherThread(file);
            executorServicePretouch.submit(pretoucherThread);
        }
        if (this.WRITE_ONE_THEN_WAIT_MS > 0) {
            Writer tempWriter = new Writer(file, wrote, expectedNumberOfMessages);
            try (ChronicleQueue queue = this.writerQueue(file);){
                tempWriter.write(queue.acquireAppender());
            }
        }
        for (i = 0; i < numThreads - numWriters; ++i) {
            Reader reader2 = new Reader(file, expectedNumberOfMessages);
            readers.add(reader2);
            results.add(executorServiceRead.submit(reader2));
        }
        if (this.WRITE_ONE_THEN_WAIT_MS > 0) {
            this.LOG.warn("Wrote one now waiting for {}ms", (Object)this.WRITE_ONE_THEN_WAIT_MS);
            Jvm.pause((long)this.WRITE_ONE_THEN_WAIT_MS);
        }
        for (i = 0; i < numWriters; ++i) {
            Writer writer = new Writer(file, wrote, expectedNumberOfMessages);
            writers.add(writer);
            results.add(executorServiceWrite.submit(writer));
        }
        long maxWritingTime = TimeUnit.SECONDS.toMillis(this.TEST_TIME + 5) + this.queueBuilder(file).timeoutMS();
        long startTime = System.currentTimeMillis();
        long giveUpWritingAt = startTime + maxWritingTime;
        long nextRollTime = System.currentTimeMillis() + (long)this.ROLL_EVERY_MS;
        long nextCheckTime = System.currentTimeMillis() + 5000L;
        int i2 = 0;
        while ((now = System.currentTimeMillis()) < giveUpWritingAt && wrote.get() < expectedNumberOfMessages) {
            if (now > nextRollTime) {
                this.timeProvider.advanceMillis(1000L);
                nextRollTime += (long)this.ROLL_EVERY_MS;
            }
            if (now > nextCheckTime) {
                String readersLastRead = readers.stream().map(reader -> Integer.toString(reader.lastRead)).collect(Collectors.joining(","));
                System.out.printf("Writer has written %d of %d messages after %dms. Readers at %s. Waiting...%n", wrote.get() + 1, expectedNumberOfMessages, i2 * 10, readersLastRead);
                readers.stream().filter(r -> !r.isMakingProgress()).findAny().ifPresent(reader -> {
                    if (reader.exception != null) {
                        throw new AssertionError("Reader encountered exception, so stopped reading messages", reader.exception);
                    }
                    throw new AssertionError((Object)"Reader is stuck");
                });
                if (pretoucherThread != null && pretoucherThread.exception != null) {
                    throw new AssertionError("Preloader encountered exception", pretoucherThread.exception);
                }
                nextCheckTime = System.currentTimeMillis() + 10000L;
            }
            ++i2;
            Jvm.pause((long)5L);
        }
        double timeToWriteSecs = (double)(System.currentTimeMillis() - startTime) / 1000.0;
        StringBuilder writerExceptions = new StringBuilder();
        writers.stream().filter(w -> w.exception != null).forEach(w -> writerExceptions.append("Writer failed due to: ").append(w.exception.getMessage()).append("\n"));
        Assert.assertTrue((String)("Wrote " + wrote.get() + " which is less than " + expectedNumberOfMessages + " within timeout. " + writerExceptions), (wrote.get() >= expectedNumberOfMessages ? 1 : 0) != 0);
        readers.stream().filter(r -> r.exception != null).findAny().ifPresent(reader -> {
            throw new AssertionError("Reader encountered exception, so stopped reading messages", reader.exception);
        });
        System.out.println(String.format("All messages written in %,.0fsecs at rate of %,.0f/sec %,.0f/sec per writer (actual writeLatency %,.0fns)", timeToWriteSecs, (double)expectedNumberOfMessages / timeToWriteSecs, (double)expectedNumberOfMessages / timeToWriteSecs / (double)numWriters, 1.0E9 / ((double)expectedNumberOfMessages / timeToWriteSecs / (double)numWriters)));
        long giveUpReadingAt = System.currentTimeMillis() + 20000L;
        long dumpThreadsAt = giveUpReadingAt - 5000L;
        try {
            while (System.currentTimeMillis() < giveUpReadingAt) {
                results.forEach(f -> {
                    try {
                        Throwable exception;
                        if (f.isDone() && (exception = (Throwable)f.get()) != null) {
                            throw Jvm.rethrow((Throwable)exception);
                        }
                    }
                    catch (InterruptedException exception) {
                    }
                    catch (ExecutionException e) {
                        throw Jvm.rethrow((Throwable)e);
                    }
                });
                boolean allReadersComplete = RollCycleMultiThreadStressTest.areAllReadersComplete(expectedNumberOfMessages, readers);
                if (allReadersComplete) break;
                System.out.printf("Not all readers are complete. Waiting...%n", new Object[0]);
                Jvm.pause((long)2000L);
            }
            Assert.assertTrue((String)"Readers did not catch up", (boolean)RollCycleMultiThreadStressTest.areAllReadersComplete(expectedNumberOfMessages, readers));
        }
        finally {
            executorServiceRead.shutdown();
            executorServiceWrite.shutdown();
            executorServicePretouch.shutdown();
            if (!executorServiceRead.awaitTermination(1L, TimeUnit.SECONDS)) {
                executorServiceRead.shutdownNow();
            }
            if (!executorServiceWrite.awaitTermination(1L, TimeUnit.SECONDS)) {
                executorServiceWrite.shutdownNow();
            }
            if (!executorServicePretouch.awaitTermination(1L, TimeUnit.SECONDS)) {
                executorServicePretouch.shutdownNow();
            }
            Closeable.closeQuietly((Object)this.sharedWriterQueue);
            results.forEach(f -> {
                try {
                    Throwable exception = (Throwable)f.get(100L, TimeUnit.MILLISECONDS);
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                }
                catch (InterruptedException | TimeoutException exception) {
                }
                catch (ExecutionException e) {
                    throw Jvm.rethrow((Throwable)e);
                }
            });
            DirectoryUtils.deleteDir(file);
        }
        System.out.println("Test complete");
    }

    @NotNull
    SingleChronicleQueueBuilder queueBuilder(File path) {
        return SingleChronicleQueueBuilder.binary((File)path).testBlockSize().timeProvider((TimeProvider)this.timeProvider).rollCycle((RollCycle)RollCycles.TEST_SECONDLY);
    }

    @NotNull
    private ChronicleQueue createQueue(File path) {
        return this.queueBuilder(path).build();
    }

    @NotNull
    private ChronicleQueue writerQueue(File path) {
        return this.sharedWriterQueue != null ? this.sharedWriterQueue : this.createQueue(path);
    }

    @Before
    public void multiCPU() {
        Assume.assumeTrue((Runtime.getRuntime().availableProcessors() > 1 ? 1 : 0) != 0);
    }

    @Before
    public void before() {
        this.threadDump = new ThreadDump();
        this.exceptionKeyIntegerMap = Jvm.recordExceptions();
    }

    @After
    public void after() {
        this.threadDump.assertNoNewThreads();
        this.exceptionKeyIntegerMap.entrySet().removeIf(entry -> ((ExceptionKey)entry.getKey()).level.equals((Object)LogLevel.WARN));
        if (Jvm.hasException(this.exceptionKeyIntegerMap)) {
            Jvm.dumpException(this.exceptionKeyIntegerMap);
            Assert.fail();
        }
        Jvm.resetExceptionHandlers();
        AbstractReferenceCounted.assertReferencesReleased();
    }

    static /* synthetic */ ChronicleQueue access$200(RollCycleMultiThreadStressTest x0, File x1) {
        return x0.writerQueue(x1);
    }

    static /* synthetic */ ChronicleQueue access$300(RollCycleMultiThreadStressTest x0) {
        return x0.sharedWriterQueue;
    }

    static {
        Jvm.disableDebugHandler();
    }

    class PretoucherThread
    implements Callable<Throwable> {
        final File path;
        volatile Throwable exception;

        PretoucherThread(File path) {
            this.path = path;
        }

        @Override
        public Throwable call() {
            SingleChronicleQueue queue0 = null;
            try (SingleChronicleQueue queue = RollCycleMultiThreadStressTest.this.queueBuilder(this.path).build();){
                queue0 = queue;
                ExcerptAppender appender = queue.acquireAppender();
                System.out.println("Starting pretoucher");
                while (!Thread.currentThread().isInterrupted() && !queue.isClosed()) {
                    Jvm.pause((long)50L);
                    appender.pretouch();
                }
            }
            catch (Throwable e) {
                if (queue0 != null && queue0.isClosed()) {
                    return null;
                }
                this.exception = e;
                return e;
            }
            return null;
        }
    }

    final class Writer
    implements Callable<Throwable> {
        final File path;
        final AtomicInteger wrote;
        final int expectedNumberOfMessages;
        volatile Throwable exception;

        Writer(File path, AtomicInteger wrote, int expectedNumberOfMessages) {
            this.path = path;
            this.wrote = wrote;
            this.expectedNumberOfMessages = expectedNumberOfMessages;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
         * Unable to fully structure code
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public Throwable call() {
            queue = RollCycleMultiThreadStressTest.access$200(RollCycleMultiThreadStressTest.this, this.path);
            try {
                appender = queue.acquireAppender();
                var3_4 = null;
                Jvm.pause((long)RollCycleMultiThreadStressTest.this.random.nextInt(RollCycleMultiThreadStressTest.this.DELAY_WRITER_RANDOM_MS));
                startTime = System.nanoTime();
                loopIteration = 0;
lbl9:
                // 2 sources

                while (true) {
                    value = this.write(appender);
                    while (System.nanoTime() < startTime + (long)loopIteration * RollCycleMultiThreadStressTest.this.SLEEP_PER_WRITE_NANOS) {
                    }
                    ++loopIteration;
                    if (value < this.expectedNumberOfMessages) ** GOTO lbl42
                    var8_9 = null;
                    if (appender == null) return var8_9;
                    if (var3_4 != null) {
                    }
                    ** GOTO lbl40
                    break;
                }
                {
                    catch (Throwable var4_6) {
                        var3_4 = var4_6;
                        throw var4_6;
                    }
                    catch (Throwable var10_11) {
                        if (appender == null) throw var10_11;
                        if (var3_4 == null) {
                            appender.close();
                            throw var10_11;
                        }
                        try {
                            appender.close();
                            throw var10_11;
                        }
                        catch (Throwable var11_12) {
                            var3_4.addSuppressed(var11_12);
                            throw var10_11;
                        }
                    }
                    try {
                        appender.close();
                        return var8_9;
                    }
                    catch (Throwable var9_10) {
                        var3_4.addSuppressed(var9_10);
                        return var8_9;
                    }
lbl40:
                    // 1 sources

                    appender.close();
                    return var8_9;
lbl42:
                    // 1 sources

                    ** try [egrp 4[TRYBLOCK] [7 : 150->200)] { 
lbl-1000:
                    // 1 sources

                    {
                        ** continue;
                    }
                }
            }
lbl44:
            // 2 sources

            catch (Throwable e) {
                this.exception = e;
                var3_4 = e;
                return var3_4;
            }
            finally {
                if (queue != RollCycleMultiThreadStressTest.access$300(RollCycleMultiThreadStressTest.this)) {
                    queue.close();
                }
            }
        }

        private int write(ExcerptAppender appender) {
            int value;
            try (DocumentContext writingDocument = appender.writingDocument();){
                long documentAcquireTimestamp = System.nanoTime();
                value = this.wrote.getAndIncrement();
                ValueOut valueOut = writingDocument.wire().getValueOut();
                valueOut.int64(documentAcquireTimestamp);
                for (int i = 0; i < RollCycleMultiThreadStressTest.this.NUMBER_OF_INTS; ++i) {
                    valueOut.int32(value);
                }
                writingDocument.wire().padToCacheAlign();
            }
            return value;
        }
    }

    final class Reader
    implements Callable<Throwable> {
        final File path;
        final int expectedNumberOfMessages;
        volatile int lastRead = -1;
        volatile Throwable exception;
        int readSequenceAtLastProgressCheck = -1;

        Reader(File path, int expectedNumberOfMessages) {
            this.path = path;
            this.expectedNumberOfMessages = expectedNumberOfMessages;
        }

        boolean isMakingProgress() {
            if (this.readSequenceAtLastProgressCheck == -1) {
                return true;
            }
            boolean makingProgress = this.lastRead > this.readSequenceAtLastProgressCheck;
            this.readSequenceAtLastProgressCheck = this.lastRead;
            return makingProgress;
        }

        @Override
        public Throwable call() {
            SingleChronicleQueueBuilder builder = RollCycleMultiThreadStressTest.this.queueBuilder(this.path);
            if (RollCycleMultiThreadStressTest.this.READERS_READ_ONLY) {
                builder.readOnly(true);
            }
            long last = System.currentTimeMillis();
            try (SingleChronicleQueue queue = builder.build();
                 ExcerptTailer tailer = queue.createTailer();){
                int lastTailerCycle = -1;
                int lastQueueCycle = -1;
                Jvm.pause((long)RollCycleMultiThreadStressTest.this.random.nextInt(RollCycleMultiThreadStressTest.this.DELAY_READER_RANDOM_MS));
                while (this.lastRead != this.expectedNumberOfMessages - 1) {
                    DocumentContext dc = tailer.readingDocument();
                    Throwable throwable = null;
                    try {
                        if (!dc.isPresent()) {
                            long now = System.currentTimeMillis();
                            if (now <= last + 2000L) continue;
                            if (this.lastRead < 0) {
                                throw new AssertionError((Object)"read nothing after 2 seconds");
                            }
                            System.out.println(Thread.currentThread() + " - Last read: " + this.lastRead);
                            last = now;
                            continue;
                        }
                        int v = -1;
                        ValueIn valueIn = dc.wire().getValueIn();
                        long documentAcquireTimestamp = valueIn.int64();
                        if (documentAcquireTimestamp == 0L) {
                            throw new AssertionError((Object)"No timestamp");
                        }
                        for (int i = 0; i < RollCycleMultiThreadStressTest.this.NUMBER_OF_INTS; ++i) {
                            v = valueIn.int32();
                            if (this.lastRead + 1 == v) continue;
                            String failureMessage = "Expected: " + (this.lastRead + 1) + ", actual: " + v + ", pos: " + i + ", index: " + Long.toHexString(dc.index()) + ", cycle: " + tailer.cycle();
                            if (lastTailerCycle != -1) {
                                failureMessage = failureMessage + ". Tailer cycle at last read: " + lastTailerCycle + " (current: " + tailer.cycle() + "), queue cycle at last read: " + lastQueueCycle + " (current: " + queue.cycle() + ")";
                            }
                            if (RollCycleMultiThreadStressTest.this.DUMP_QUEUE) {
                                DumpQueueMain.dump((File)queue.file(), (PrintStream)System.out, (long)Long.MAX_VALUE);
                            }
                            throw new AssertionError((Object)failureMessage);
                        }
                        this.lastRead = v;
                        lastTailerCycle = tailer.cycle();
                        lastQueueCycle = queue.cycle();
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                    finally {
                        if (dc == null) continue;
                        if (throwable != null) {
                            try {
                                dc.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                            continue;
                        }
                        dc.close();
                    }
                }
            }
            catch (Throwable e) {
                this.exception = e;
                return e;
            }
            return null;
        }
    }

    public static class RepeatRule
    implements TestRule {
        public Statement apply(Statement statement, Description description) {
            Statement result = statement;
            Repeat repeat = (Repeat)description.getAnnotation(Repeat.class);
            if (repeat != null) {
                int times = repeat.times();
                result = new RepeatStatement(times, statement);
            }
            return result;
        }

        private static class RepeatStatement
        extends Statement {
            private final int times;
            private final Statement statement;

            private RepeatStatement(int times, Statement statement) {
                this.times = times;
                this.statement = statement;
            }

            public void evaluate() throws Throwable {
                for (int i = 0; i < this.times; ++i) {
                    this.statement.evaluate();
                }
            }
        }

        @Retention(value=RetentionPolicy.RUNTIME)
        @Target(value={ElementType.METHOD})
        public static @interface Repeat {
            public int times();
        }
    }
}

