package org.reaktivity.nukleus.tcp.internal.streams.rfc793;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.jboss.byteman.contrib.bmunit.BMScript;
import org.jboss.byteman.contrib.bmunit.BMUnitConfig;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.kaazing.k3po.junit.annotation.Specification;
import org.kaazing.k3po.junit.rules.K3poRule;
import org.reaktivity.nukleus.tcp.internal.SocketChannelHelper;
import org.reaktivity.nukleus.tcp.internal.TcpCountersRule;
import org.reaktivity.reaktor.internal.ReaktorConfiguration;
import org.reaktivity.reaktor.test.ReaktorRule;

/**
 * Integration tests for server-side partial socket writes when the reaktor is configured with
 * a buffer slot capacity of 16 and a buffer pool capacity of 16.
 *
 * <p>Socket write behavior is manipulated through {@link SocketChannelHelper}, whose Byteman
 * rules are loaded from {@code SocketChannelHelper.btm}. NOTE(review): the values supplied to
 * {@code fragmentWrites} are presumably the maximum number of bytes each successive write may
 * accept, with {@code -1} meaning "no limit" - confirm against {@code SocketChannelHelper}.
 */
@RunWith(BMUnitRunner.class)
@BMUnitConfig(loadDirectory = "src/test/resources")
@BMScript("SocketChannelHelper.btm")
public class ServerPartialWriteLimitsIT {
    private final K3poRule k3po = new K3poRule()
            .addScriptRoot("route", "org/reaktivity/specification/nukleus/tcp/control/route")
            .addScriptRoot("client", "org/reaktivity/specification/tcp/rfc793")
            .addScriptRoot("server", "org/reaktivity/specification/nukleus/tcp/streams/rfc793");

    private final TestRule timeout = new DisableOnDebug(new Timeout(5, TimeUnit.SECONDS));

    private final ReaktorRule reaktor;

    private final TcpCountersRule counters;

    /** Rule chain applied to every test, ordered from outermost to innermost rule. */
    @Rule
    public final TestRule chain;

    /**
     * Configures the reaktor for the "tcp" nukleus and assembles the rule chain.
     */
    public ServerPartialWriteLimitsIT() {
        this.reaktor = new ReaktorRule()
                .nukleus("tcp"::equals)
                .directory("target/nukleus-itests")
                .commandBufferCapacity(1024)
                .responseBufferCapacity(1024)
                .counterValuesBufferCapacity(4096)
                .configure(ReaktorConfiguration.REAKTOR_BUFFER_SLOT_CAPACITY, 16)
                .configure(ReaktorConfiguration.REAKTOR_BUFFER_POOL_CAPACITY, 16)
                // Long.MIN_VALUE has only the highest bit set.
                .affinityMask("target#0", Long.MIN_VALUE)
                .clean();
        this.counters = new TcpCountersRule(this.reaktor);
        this.chain = RuleChain.outerRule(SocketChannelHelper.RULE)
                .around(this.reaktor)
                .around(this.counters)
                .around(this.k3po)
                .around(this.timeout);
    }

    /**
     * Runs the multiple-frames scripts with fragmented writes and verifies that no overflow
     * is counted.
     *
     * @throws Exception if the k3po scripts fail or do not complete
     */
    @Test
    @Specification({
        "${route}/server/controller",
        "${server}/server.sent.data.multiple.frames/server",
        "${client}/server.sent.data.multiple.frames/client"})
    public void shouldWriteWhenMoreDataArrivesWhileAwaitingSocketWritableWithoutOverflowingSlot() throws Exception {
        AtomicInteger dataFramesReceived = new AtomicInteger();

        // The first generated value is 5, the second is 6, every later one is -1.
        SocketChannelHelper.OnDataHelper.fragmentWrites(IntStream.generate(() -> {
            if (dataFramesReceived.incrementAndGet() == 1) {
                return 5;
            }
            return dataFramesReceived.get() == 2 ? 6 : -1;
        }));

        // Yields 0 until the generator above has been invoked at least twice, then -1.
        SocketChannelHelper.HandleWriteHelper.fragmentWrites(
                IntStream.generate(() -> dataFramesReceived.get() >= 2 ? -1 : 0));

        this.k3po.finish();

        Assert.assertEquals(0L, this.counters.overflows());
    }

    /**
     * Connects two clients while writes are being held back, then verifies that the first
     * client receives "server data 1", the second client sees end-of-stream without any data,
     * and exactly one overflow is counted.
     *
     * @throws Exception if a socket operation fails or the k3po script does not complete
     */
    @Test
    @Specification({
        "${route}/server/controller",
        "${server}/server.sent.data.multiple.streams.second.was.reset/server"})
    public void shouldResetStreamsExceedingPartialWriteStreamsLimit() throws Exception {
        // First generated value is 1, all subsequent values are 0.
        SocketChannelHelper.OnDataHelper.fragmentWrites(
                IntStream.concat(IntStream.of(1), IntStream.generate(() -> 0)));

        AtomicBoolean resetReceived = new AtomicBoolean(false);

        // Yields 0 until the reset barrier has been observed below, then -1.
        SocketChannelHelper.HandleWriteHelper.fragmentWrites(
                IntStream.generate(() -> resetReceived.get() ? -1 : 0));

        this.k3po.start();
        this.k3po.awaitBarrier("ROUTED_SERVER");

        // Both channels are closed exactly once, in reverse order, even when an assertion fails.
        try (SocketChannel channel1 = SocketChannel.open();
             SocketChannel channel2 = SocketChannel.open()) {
            channel1.connect(new InetSocketAddress("127.0.0.1", 8080));
            channel2.connect(new InetSocketAddress("127.0.0.1", 8080));

            this.k3po.awaitBarrier("SECOND_STREAM_RESET_RECEIVED");
            resetReceived.set(true);

            // 13 is the UTF-8 byte length of "server data 1".
            ByteBuffer buf = ByteBuffer.allocate(256);
            while (buf.position() < 13) {
                if (channel1.read(buf) == -1) {
                    break;
                }
            }
            buf.flip();
            Assert.assertEquals("server data 1", StandardCharsets.UTF_8.decode(buf).toString());

            int len = 0;
            // Reuse the buffer: position returns to 0 and the limit stays where flip() left it.
            buf.rewind();
            while (buf.position() < 13) {
                len = channel2.read(buf);
                if (len == -1) {
                    break;
                }
            }
            buf.flip();
            Assert.assertEquals(0L, buf.remaining());
            Assert.assertEquals(-1L, len);

            this.k3po.finish();
        }

        Assert.assertEquals(1L, this.counters.overflows());
    }
}
