/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue.channel;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.queue.channel.DummyData;
import net.openhft.chronicle.queue.channel.DummyDataSmall;
import net.openhft.chronicle.queue.channel.Echoing;
import net.openhft.chronicle.queue.channel.EchoingSmall;
import net.openhft.chronicle.queue.channel.PipeHandler;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.channel.ChannelHandler;
import net.openhft.chronicle.wire.channel.ChronicleChannelSupplier;
import net.openhft.chronicle.wire.channel.ChronicleContext;
import net.openhft.chronicle.wire.channel.InternalChronicleChannel;

/**
 * Throughput benchmark for Chronicle channels.
 *
 * <p>For each of {@link #CLIENTS} channels a {@link PipeHandler} is configured to publish to
 * and subscribe from the same, freshly named queue, so every message a client writes is
 * expected to come back on the same channel. The benchmark is run twice (buffered and
 * unbuffered) and, within each run, once per message size, halving the size from
 * {@link #MAX_MESSAGE_SIZE} down to {@link #MIN_MESSAGE_SIZE}.
 *
 * <p>Configuration is taken from system properties:
 * <ul>
 *   <li>{@code url} - URL handed to {@code ChronicleContext.newContext} (default {@code tcp://:1248})</li>
 *   <li>{@code path} - directory prefix for the queue names (default {@code .})</li>
 *   <li>{@code runTime} - seconds spent sending for each message size (default 5)</li>
 *   <li>{@code clients} - number of channels driven in parallel (default 2, must be &gt;= 1)</li>
 *   <li>{@code methods} - when set, send via method writers instead of raw bytes</li>
 * </ul>
 */
public class PerfThroughputMain {
    static final String URL = System.getProperty("url", "tcp://:1248");
    static final int RUN_TIME = Integer.getInteger("runTime", 5);
    static final int CLIENTS = Integer.getInteger("clients", 2);
    static final boolean METHODS = Jvm.getBoolean("methods");
    static final String PATH = System.getProperty("path", ".");

    /** Largest message size exercised, in bytes (128 KiB). */
    private static final int MAX_MESSAGE_SIZE = 131072;
    /** Smallest message size exercised, in bytes. */
    private static final int MIN_MESSAGE_SIZE = 8;
    /** Bytes allowed in flight across all clients; divided by size and client count to get the window. */
    private static final int IN_FLIGHT_BYTES = 0x400000;
    /** In method mode, sizes below this use {@link DummyDataSmall}/{@link EchoingSmall}. */
    private static final int SMALL_MESSAGE_LIMIT = 256;

    /**
     * Prints the effective configuration and runs the benchmark buffered, then unbuffered.
     *
     * @param args ignored; all configuration comes from system properties
     */
    public static void main(String[] args) {
        System.out.println("-Durl=" + URL + " -Dpath=" + PATH + " -DrunTime=" + RUN_TIME + " -Dclients=" + CLIENTS + " -Dmethods=" + METHODS);
        PerfThroughputMain.doTest("buffered", true);
        PerfThroughputMain.doTest("unbuffered", false);
    }

    /**
     * Opens {@link #CLIENTS} channels and measures throughput for every message size.
     *
     * @param desc     label printed with each result line
     * @param buffered value passed to {@code PipeHandler.buffered}
     * @throws IllegalArgumentException if {@code clients} is configured below 1
     */
    private static void doTest(String desc, boolean buffered) {
        if (CLIENTS < 1) {
            // Fail clearly instead of with NegativeArraySizeException or "/ by zero" further down.
            throw new IllegalArgumentException("-Dclients must be at least 1 but was " + CLIENTS);
        }
        // A time-derived prefix gives every run its own queue names.
        String prefix = PATH + "/q" + Long.toString(System.nanoTime(), 36) + "-";
        try (ChronicleContext context = ChronicleContext.newContext(URL)) {
            InternalChronicleChannel[] clients = new InternalChronicleChannel[CLIENTS];
            for (int i = 0; i < CLIENTS; ++i) {
                // Publish and subscribe use the same name, so each client reads back its own messages.
                PipeHandler handler = (PipeHandler) new PipeHandler()
                        .publish(prefix + i)
                        .subscribe(prefix + i)
                        .buffered(Boolean.valueOf(buffered));
                ChronicleChannelSupplier supplier = context.newChannelSupplier((ChannelHandler) handler);
                clients[i] = (InternalChronicleChannel) supplier.get();
            }
            for (int size = MAX_MESSAGE_SIZE; size >= MIN_MESSAGE_SIZE; size /= 2) {
                runRound(desc, clients, size);
            }
        }
    }

    /**
     * Runs one timed round at a single message size on all clients in parallel and prints the
     * aggregate result.
     *
     * @param desc    label printed with the result line
     * @param clients channels to drive, one task per channel
     * @param size    nominal message size in bytes
     */
    private static void runRound(String desc, InternalChronicleChannel[] clients, int size) {
        long start = System.currentTimeMillis();
        long end = start + RUN_TIME * 1000L;
        // Maximum number of messages a client may have written but not yet read back.
        int window = IN_FLIGHT_BYTES / size / CLIENTS;
        AtomicLong totalRead = new AtomicLong();

        Consumer<InternalChronicleChannel> sendAndReceive = newClientTask(size, window, end, totalRead);
        Stream.of(clients).parallel().forEach(sendAndReceive);

        long count = totalRead.get();
        // Clamp to 1 ms so a very short round (e.g. -DrunTime=0) cannot divide by zero.
        long time = Math.max(1L, System.currentTimeMillis() - start);
        long totalBytes = (long) size * count;
        // bytes per millisecond / 1000 == millions of bytes per second
        long MBps = totalBytes / time / 1000L;
        long rate = count * 1000L / time;
        System.out.printf("desc: %s, size: %,d, MBps: %,d, mps: %,d%n", desc, size, MBps, rate);
    }

    /**
     * Builds the per-channel task for one round. Depending on {@link #METHODS} and the size it
     * sends either raw bytes, {@link DummyDataSmall} via {@link EchoingSmall}, or
     * {@link DummyData} via {@link Echoing}.
     *
     * @param size      nominal message size in bytes
     * @param window    maximum number of written-but-unread messages per client
     * @param end       wall-clock time in milliseconds after which sending stops
     * @param totalRead accumulator each client adds its read count to
     * @return the task to run once per channel
     */
    private static Consumer<InternalChronicleChannel> newClientTask(int size, int window, long end, AtomicLong totalRead) {
        if (!METHODS) {
            return icc -> pump(icc, () -> writeRawMessage(icc, size), window, end, totalRead);
        }
        if (size < SMALL_MESSAGE_LIMIT) {
            // The payload object is created once and shared by all clients of this round.
            DummyDataSmall dd = new DummyDataSmall();
            dd.data(new byte[size - 8]);
            return icc -> {
                EchoingSmall echoing = (EchoingSmall) icc.methodWriter(EchoingSmall.class, new Class<?>[0]);
                pump(icc, () -> echoing.echo(dd), window, end, totalRead);
            };
        }
        DummyData dd = new DummyData();
        dd.data(new byte[size - 8]);
        return icc -> {
            Echoing echoing = (Echoing) icc.methodWriter(Echoing.class, new Class<?>[0]);
            pump(icc, () -> echoing.echo(dd), window, end, totalRead);
        };
    }

    /**
     * Writes one raw message: a 4-byte int holding {@code size} followed by zero-valued longs
     * until at least {@code size} payload bytes have been written.
     *
     * @param icc  channel to write to
     * @param size nominal payload size in bytes
     */
    private static void writeRawMessage(InternalChronicleChannel icc, int size) {
        Bytes<?> bytes = icc.acquireProducer().bytes();
        try {
            bytes.writeInt(size);
            for (int i = 0; i < size; i += 8) {
                bytes.writeLong(0L);
            }
        } finally {
            // Keep acquire/release balanced even if a write fails.
            icc.releaseProducer();
        }
    }

    /**
     * Sends messages on one channel until {@code end}, never letting more than {@code window}
     * messages be outstanding, then reads back everything still outstanding and adds the
     * number of messages read to {@code totalRead}.
     *
     * @param icc       channel to send on and read from
     * @param sendOne   action that writes exactly one message to {@code icc}
     * @param window    maximum number of written-but-unread messages
     * @param end       wall-clock time in milliseconds after which sending stops
     * @param totalRead accumulator for the number of messages read back
     */
    private static void pump(InternalChronicleChannel icc, Runnable sendOne, int window, long end, AtomicLong totalRead) {
        int written = 0;
        int read = 0;
        do {
            sendOne.run();
            read = readUpto(window, icc, ++written, read);
        } while (System.currentTimeMillis() < end);
        // Drain the remainder and keep the updated count: the drain happens inside the
        // measured interval, so the drained messages must be counted as well.
        read = readUpto(0, icc, written, read);
        totalRead.addAndGet(read);
    }

    /**
     * Reads from the channel until at most {@code window} written messages remain unread.
     * At least one read is always attempted, and {@code Jvm.nanoPause()} is called after
     * every attempt. This method does not time out: it keeps polling until enough messages
     * have been read.
     *
     * @param window  maximum allowed value of {@code written - read} on return
     * @param icc     channel to read from
     * @param written number of messages written so far
     * @param read    number of messages read so far
     * @return the updated number of messages read
     */
    private static int readUpto(int window, InternalChronicleChannel icc, int written, int read) {
        do {
            try (DocumentContext dc = icc.readingDocument()) {
                if (dc.isPresent()) {
                    ++read;
                }
            }
            Jvm.nanoPause();
        } while (written - read > window);
        return read;
    }
}

