package org.reaktivity.nukleus.sse.internal.bench;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.MutableInteger;
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Control;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.reaktivity.nukleus.Configuration;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.function.MessageFunction;
import org.reaktivity.nukleus.function.MessagePredicate;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.sse.internal.stream.ServerStreamFactoryBuilder;
import org.reaktivity.nukleus.sse.internal.types.control.Role;
import org.reaktivity.nukleus.sse.internal.types.control.RouteFW;
import org.reaktivity.nukleus.sse.internal.types.control.SseRouteExFW;
import org.reaktivity.nukleus.sse.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.sse.internal.types.stream.DataFW;
import org.reaktivity.nukleus.sse.internal.types.stream.FrameFW;
import org.reaktivity.nukleus.sse.internal.types.stream.HttpBeginExFW;
import org.reaktivity.nukleus.sse.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.stream.StreamFactory;
import org.reaktivity.reaktor.internal.buffer.DefaultBufferPool;
import org.reaktivity.reaktor.internal.router.ReferenceKind;

@Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
@State(Scope.Benchmark)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(3)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode({Mode.Throughput})
/* loaded from: input_file:org/reaktivity/nukleus/sse/internal/bench/ServerStreamBM.class */
public class ServerStreamBM {
    private DataFW dataRO;
    private Long2ObjectHashMap<MessageConsumer> streams;
    private MessageConsumer stream;
    private OneToOneRingBuffer source;
    private OneToOneRingBuffer nukleus;
    private OneToOneRingBuffer target;
    private final MessageHandler readHandler = (v1, v2, v3, v4) -> {
        handleRead(v1, v2, v3, v4);
    };
    private final FrameFW frameRO = new FrameFW();

    /* loaded from: input_file:org/reaktivity/nukleus/sse/internal/bench/ServerStreamBM$Router.class */
    private static final class Router implements RouteManager {
        private MutableDirectBuffer writeBuffer;
        private AtomicLong routeRefs;
        private SseRouteExFW.Builder sseRouteExRW;
        private Map<String, MessageConsumer> throttles;
        private Map<String, MessageConsumer> targets;

        private Router() {
            this.writeBuffer = new UnsafeBuffer(new byte[1024]);
            this.routeRefs = new AtomicLong(0L);
            this.sseRouteExRW = new SseRouteExFW.Builder();
            this.throttles = new HashMap();
            this.targets = new HashMap();
        }

        public <R> R resolve(long j, MessagePredicate messagePredicate, MessageFunction<R> messageFunction) {
            RouteFW build = new RouteFW.Builder().wrap(this.writeBuffer, 0, this.writeBuffer.capacity()).correlationId(1L).role(builder -> {
                builder.set(Role.SERVER);
            }).source("source").sourceRef(ReferenceKind.SERVER.nextRef(this.routeRefs)).target("target").targetRef(ReferenceKind.SERVER.nextRef(this.routeRefs)).extension(builder2 -> {
                builder2.set((mutableDirectBuffer, i, i2) -> {
                    return this.sseRouteExRW.wrap(mutableDirectBuffer, i, i2).pathInfo("/").limit();
                });
            }).build();
            return (R) messageFunction.apply(build.typeId(), build.buffer(), build.offset(), build.sizeof());
        }

        public void forEach(MessageConsumer messageConsumer) {
        }

        public MessageConsumer supplyTarget(String str) {
            return this.targets.get(str);
        }

        public void setThrottle(String str, long j, MessageConsumer messageConsumer) {
            this.throttles.put(str, messageConsumer);
        }

        public void setTarget(String str, MessageConsumer messageConsumer) {
            this.targets.put(str, messageConsumer);
        }

        public MessageConsumer getSource(String str) {
            return this.throttles.get(str);
        }
    }

    @Setup(Level.Trial)
    public void init() {
        this.streams = new Long2ObjectHashMap<>();
        this.source = new OneToOneRingBuffer(new UnsafeBuffer(ByteBuffer.allocateDirect(67108864 + RingBufferDescriptor.TRAILER_LENGTH)));
        this.nukleus = new OneToOneRingBuffer(new UnsafeBuffer(ByteBuffer.allocateDirect(67108864 + RingBufferDescriptor.TRAILER_LENGTH)));
        this.target = new OneToOneRingBuffer(new UnsafeBuffer(ByteBuffer.allocateDirect(67108864 + RingBufferDescriptor.TRAILER_LENGTH)));
        Configuration configuration = new Configuration();
        DefaultBufferPool defaultBufferPool = new DefaultBufferPool(0, 0);
        MutableInteger mutableInteger = new MutableInteger();
        MutableInteger mutableInteger2 = new MutableInteger();
        MutableInteger mutableInteger3 = new MutableInteger();
        MutableInteger mutableInteger4 = new MutableInteger();
        UnsafeBuffer unsafeBuffer = new UnsafeBuffer(new byte[1024]);
        Router router = new Router();
        StreamFactory build = new ServerStreamFactoryBuilder(configuration).setAccumulatorSupplier(str -> {
            return j -> {
            };
        }).setCounterSupplier(str2 -> {
            return () -> {
                return 0L;
            };
        }).setBufferPoolSupplier(() -> {
            return defaultBufferPool;
        }).setTargetCorrelationIdSupplier(() -> {
            int i = mutableInteger.value + 1;
            mutableInteger.value = i;
            return i;
        }).setGroupBudgetClaimer(j -> {
            return i -> {
                return i;
            };
        }).setGroupBudgetReleaser(j2 -> {
            return i -> {
                return i;
            };
        }).setGroupIdSupplier(() -> {
            int i = mutableInteger2.value + 1;
            mutableInteger2.value = i;
            return i;
        }).setRouteManager(router).setStreamIdSupplier(() -> {
            int i = mutableInteger3.value + 1;
            mutableInteger3.value = i;
            return i;
        }).setTraceSupplier(() -> {
            int i = mutableInteger4.value + 1;
            mutableInteger4.value = i;
            return i;
        }).setWriteBuffer(unsafeBuffer).build();
        BeginFW beginFW = new BeginFW();
        DataFW dataFW = new DataFW();
        WindowFW.Builder builder = new WindowFW.Builder();
        HttpBeginExFW.Builder builder2 = new HttpBeginExFW.Builder();
        UnsafeBuffer unsafeBuffer2 = new UnsafeBuffer(new byte[1024]);
        MessageConsumer[] messageConsumerArr = new MessageConsumer[1];
        router.setTarget("source", (i, directBuffer, i2, i3) -> {
            ((MutableDirectBuffer) directBuffer).putLong(i2 + 8, System.nanoTime());
            this.source.write(i, directBuffer, i2, i3);
            if (i != 1) {
                if (i == 2) {
                    DataFW wrap = dataFW.wrap(directBuffer, i2, i3);
                    WindowFW build2 = builder.wrap(unsafeBuffer2, 0, unsafeBuffer2.capacity()).streamId(wrap.streamId()).credit(wrap.length()).padding(wrap.padding()).groupId(wrap.groupId()).build();
                    messageConsumerArr[0].accept(build2.typeId(), build2.buffer(), build2.offset(), build2.sizeof());
                    return;
                }
                return;
            }
            MessageConsumer source = router.getSource("source");
            Objects.requireNonNull(source);
            WindowFW build3 = builder.wrap(unsafeBuffer2, 0, unsafeBuffer2.capacity()).streamId(beginFW.wrap(directBuffer, i2, i3).streamId()).credit(Integer.MAX_VALUE).padding(0).groupId(0L).build();
            source.accept(build3.typeId(), build3.buffer(), build3.offset(), build3.sizeof());
            messageConsumerArr[0] = source;
        });
        UnsafeBuffer unsafeBuffer3 = new UnsafeBuffer(new byte[1024]);
        router.setTarget("target", (i4, directBuffer2, i5, i6) -> {
            ((MutableDirectBuffer) directBuffer2).putLong(i5 + 8, System.nanoTime());
            this.target.write(i4, directBuffer2, i5, i6);
            if (i4 == 1) {
                MessageConsumer source = router.getSource("target");
                Objects.requireNonNull(source);
                BeginFW wrap = beginFW.wrap(directBuffer2, i5, i6);
                WindowFW build2 = builder.wrap(unsafeBuffer3, 0, unsafeBuffer3.capacity()).streamId(wrap.streamId()).credit(Integer.MAX_VALUE).padding(0).groupId(0L).build();
                source.accept(build2.typeId(), build2.buffer(), build2.offset(), build2.sizeof());
                BeginFW build3 = new BeginFW.Builder().wrap(unsafeBuffer3, 0, unsafeBuffer3.capacity()).streamId(1L).source("target").sourceRef(0L).correlationId(wrap.correlationId()).build();
                this.stream = build.newStream(build3.typeId(), build3.buffer(), build3.offset(), build3.sizeof(), (i4, directBuffer2, i5, i6) -> {
                });
                this.stream.accept(build3.typeId(), build3.buffer(), build3.offset(), build3.sizeof());
                this.streams.put(1L, this.stream);
            }
        });
        UnsafeBuffer unsafeBuffer4 = new UnsafeBuffer(new byte[256]);
        BeginFW build2 = new BeginFW.Builder().wrap(unsafeBuffer4, 0, unsafeBuffer4.capacity()).streamId(1L).source("source").sourceRef(1L).correlationId(2L).extension(builder3 -> {
            builder3.set((mutableDirectBuffer, i7, i8) -> {
                return builder2.wrap(mutableDirectBuffer, i7, i8).headersItem(builder3 -> {
                    builder3.name("accept").value("text/event-stream");
                }).headersItem(builder4 -> {
                    builder4.name(":path").value("/");
                }).limit();
            });
        }).build();
        build.newStream(build2.typeId(), build2.buffer(), build2.offset(), build2.sizeof(), (i7, directBuffer3, i8, i9) -> {
        }).accept(build2.typeId(), build2.buffer(), build2.offset(), build2.sizeof());
        this.dataRO = new DataFW.Builder().wrap(unsafeBuffer4, 0, unsafeBuffer4.capacity()).streamId(1L).groupId(0L).padding(0).payload(builder4 -> {
            builder4.set(new byte[128]);
        }).build();
    }

    @Setup(Level.Iteration)
    public void reset() {
        AtomicBuffer buffer = this.nukleus.buffer();
        buffer.setMemory(buffer.capacity() - RingBufferDescriptor.TRAILER_LENGTH, RingBufferDescriptor.TRAILER_LENGTH, (byte) 0);
        buffer.putLongOrdered(0, 0L);
    }

    @Benchmark
    @Group("data")
    public void write(Control control) {
        while (!control.stopMeasurement && !this.nukleus.write(this.dataRO.typeId(), this.dataRO.buffer(), this.dataRO.offset(), this.dataRO.sizeof())) {
            Thread.yield();
        }
    }

    @Benchmark
    @Group("data")
    public void read() {
        this.nukleus.read(this.readHandler);
    }

    private void handleRead(int i, DirectBuffer directBuffer, int i2, int i3) {
        ((MessageConsumer) this.streams.get(this.frameRO.wrap(directBuffer, i2, i2 + i3).streamId())).accept(i, directBuffer, i2, i3);
    }

    public static void main(String[] strArr) throws RunnerException {
        new Runner(new OptionsBuilder().include(ServerStreamBM.class.getSimpleName()).forks(0).build()).run();
    }
}
