/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue.impl.single;

import java.io.File;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.queue.DirectoryUtils;
import net.openhft.chronicle.queue.impl.single.RollCycleMultiThreadStressTest;
import net.openhft.chronicle.threads.NamedThreadFactory;
import org.junit.Assert;
import org.junit.Test;

/**
 * Variant of {@link RollCycleMultiThreadStressTest} that runs one writer, one reader and a
 * pretoucher task concurrently against the same queue directory.
 */
public class PretoucherStressTest
extends RollCycleMultiThreadStressTest {
    /**
     * Runs the stress scenario on a three-thread pool (pretoucher, reader, writer).
     *
     * <p>While the writer is running, the calling thread:
     * <ul>
     *   <li>advances {@code timeProvider} by 1000ms every {@code ROLL_EVERY_MS} wall-clock
     *       milliseconds (presumably to force roll cycles; confirm against the superclass),</li>
     *   <li>first after 5 seconds and then every 10 seconds, prints progress and fails if a reader
     *       is not making progress or the pretoucher has recorded an exception.</li>
     * </ul>
     * After the writer finishes (or the write deadline passes) it asserts that all expected
     * messages were written, then waits up to 60 seconds for the readers to catch up, dumping all
     * thread stacks during the last 15 seconds of that wait.
     *
     * <p>The executor is always shut down, even when an assertion fails. The queue directory is
     * only deleted when the test completes successfully.
     *
     * @throws AssertionError if the writer does not finish in time, a reader stalls or fails,
     *                        the pretoucher fails, or the readers do not catch up
     */
    @Override
    @Test
    public void stress() {
        // orElseGet: only ask for a temp directory when the system property is not set.
        File path = Optional.ofNullable(System.getProperty("stress.test.dir"))
                .map(dir -> new File(dir, UUID.randomUUID().toString()))
                .orElseGet(() -> DirectoryUtils.tempDir("pretouchStress"));
        System.out.printf("Queue dir: %s at %s%n", path.getAbsolutePath(), Instant.now());

        AtomicInteger wrote = new AtomicInteger();
        int expectedNumberOfMessages = (int)((double)this.TEST_TIME * 1.0E9 / (double)this.SLEEP_PER_WRITE_NANOS);
        System.out.printf("Writing %d messages with %dns interval%n", expectedNumberOfMessages, this.SLEEP_PER_WRITE_NANOS);
        System.out.printf("Should take ~%dsec%n", TimeUnit.NANOSECONDS.toSeconds((long)expectedNumberOfMessages * this.SLEEP_PER_WRITE_NANOS));

        ArrayList<Future<Throwable>> results = new ArrayList<>();
        ArrayList<RollCycleMultiThreadStressTest.Reader> readers = new ArrayList<>();
        ArrayList<RollCycleMultiThreadStressTest.Writer> writers = new ArrayList<>();

        // One thread each for the pretoucher, the reader and the writer.
        ExecutorService executorService = Executors.newFixedThreadPool(3, new NamedThreadFactory("pretouch"));
        try {
            RollCycleMultiThreadStressTest.PretoucherThread pretoucherThread = new RollCycleMultiThreadStressTest.PretoucherThread(path);
            executorService.submit(pretoucherThread);

            RollCycleMultiThreadStressTest.Reader reader = new RollCycleMultiThreadStressTest.Reader(path, expectedNumberOfMessages);
            readers.add(reader);
            results.add(executorService.submit(reader));

            RollCycleMultiThreadStressTest.Writer writer = new RollCycleMultiThreadStressTest.Writer(path, wrote, expectedNumberOfMessages);
            writers.add(writer);
            results.add(executorService.submit(writer));

            // Allow the nominal test time plus one second plus the queue's own timeout.
            long maxWritingTime = TimeUnit.SECONDS.toMillis(this.TEST_TIME + 1) + this.queueBuilder(path).timeoutMS();
            long startTime = System.currentTimeMillis();
            long giveUpWritingAt = startTime + maxWritingTime;
            long nextRollTime = System.currentTimeMillis() + (long)this.ROLL_EVERY_MS;
            long nextCheckTime = System.currentTimeMillis() + 5000L;

            long now;
            while ((now = System.currentTimeMillis()) < giveUpWritingAt && wrote.get() < expectedNumberOfMessages) {
                if (now > nextRollTime) {
                    this.timeProvider.advanceMillis(1000L);
                    nextRollTime += (long)this.ROLL_EVERY_MS;
                }
                if (now > nextCheckTime) {
                    String readersLastRead = readers.stream()
                            .map(r -> Integer.toString(r.lastRead))
                            .collect(Collectors.joining(","));
                    // Report real elapsed wall-clock time rather than an iteration-count estimate.
                    System.out.printf("Writer has written %d of %d messages after %dms. Readers at %s. Waiting...%n", wrote.get() + 1, expectedNumberOfMessages, now - startTime, readersLastRead);
                    readers.stream().filter(r -> !r.isMakingProgress()).findAny().ifPresent(stalled -> {
                        if (stalled.exception != null) {
                            throw new AssertionError("Reader encountered exception, so stopped reading messages", stalled.exception);
                        }
                        throw new AssertionError("Reader is stuck");
                    });
                    if (pretoucherThread.exception != null) {
                        throw new AssertionError("Preloader encountered exception", pretoucherThread.exception);
                    }
                    nextCheckTime = System.currentTimeMillis() + 10000L;
                }
                Jvm.pause(5L);
            }

            double timeToWriteSecs = (double)(System.currentTimeMillis() - startTime) / 1000.0;

            StringBuilder writerExceptions = new StringBuilder();
            writers.stream()
                    .filter(w -> w.exception != null)
                    .forEach(w -> writerExceptions.append("Writer failed due to: ").append(w.exception.getMessage()).append("\n"));
            Assert.assertTrue("Wrote " + wrote.get() + " which is less than " + expectedNumberOfMessages + " within timeout. " + writerExceptions, wrote.get() >= expectedNumberOfMessages);

            readers.stream().filter(r -> r.exception != null).findAny().ifPresent(failed -> {
                throw new AssertionError("Reader encountered exception, so stopped reading messages", failed.exception);
            });

            System.out.println(String.format("All messages written in %,.0fsecs at rate of %,.0f/sec %,.0f/sec per writer (actual writeLatency %,.0fns)", timeToWriteSecs, (double)expectedNumberOfMessages / timeToWriteSecs, (double)expectedNumberOfMessages / timeToWriteSecs, 1.0E9 / ((double)expectedNumberOfMessages / timeToWriteSecs)));

            // Give the readers up to 60 seconds to catch up; dump stacks in the last 15 seconds.
            long giveUpReadingAt = System.currentTimeMillis() + 60000L;
            long dumpThreadsAt = giveUpReadingAt - 15000L;
            while (System.currentTimeMillis() < giveUpReadingAt && !PretoucherStressTest.areAllReadersComplete(expectedNumberOfMessages, readers)) {
                if (dumpThreadsAt < System.currentTimeMillis()) {
                    Thread.getAllStackTraces().forEach((thread, stack) -> {
                        System.out.println("\n\n" + thread + "\n\n");
                        Arrays.stream(stack).forEach(System.out::println);
                    });
                }
                System.out.printf("Not all readers are complete. Waiting...%n");
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
            }
            Assert.assertTrue("Readers did not catch up", PretoucherStressTest.areAllReadersComplete(expectedNumberOfMessages, readers));
        } finally {
            // Always stop the worker threads, including when an assertion above has failed.
            executorService.shutdownNow();
        }

        for (Future<Throwable> result : results) {
            try {
                Throwable exception = result.get();
                if (exception != null) {
                    exception.printStackTrace();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting: further get() calls would fail too.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                break;
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Test complete");
        DirectoryUtils.deleteDir(path);
    }
}

