package io.servicetalk.http.router.jersey;

import io.servicetalk.buffer.netty.BufferAllocators;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.ExecutorExtension;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.http.api.DefaultHttpHeadersFactory;
import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpHeaderNames;
import io.servicetalk.http.api.HttpHeaderValues;
import io.servicetalk.http.api.HttpProtocolVersion;
import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.http.router.jersey.resources.CancellableResources;
import io.servicetalk.transport.api.IoExecutor;
import jakarta.ws.rs.core.Application;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.LENIENT)
/* loaded from: input_file:io/servicetalk/http/router/jersey/CancellationTest.class */
class CancellationTest {
    private static final StreamingHttpRequestResponseFactory HTTP_REQ_RES_FACTORY = new DefaultStreamingHttpRequestResponseFactory(BufferAllocators.DEFAULT_ALLOCATOR, DefaultHttpHeadersFactory.INSTANCE, HttpProtocolVersion.HTTP_1_1);
    private static final CharSequence TEST_DATA = TestUtils.newLargePayload();

    @RegisterExtension
    static final ExecutorExtension<Executor> execRule = ExecutorExtension.withCachedExecutor().setClassLevel(true);

    @Mock
    private HttpServiceContext ctx;

    @Mock
    private Executor execMock;
    private CancellableResources cancellableResources;
    private StreamingHttpService jerseyRouter;

    /* loaded from: input_file:io/servicetalk/http/router/jersey/CancellationTest$TestCancellable.class */
    private static class TestCancellable implements Cancellable {
        private boolean cancelled;

        private TestCancellable() {
        }

        public void cancel() {
            this.cancelled = true;
        }
    }

    CancellationTest() {
    }

    @BeforeEach
    void setup() {
        HttpExecutionContext httpExecutionContext = (HttpExecutionContext) Mockito.mock(HttpExecutionContext.class);
        Mockito.when(this.ctx.executionContext()).thenReturn(httpExecutionContext);
        Mockito.when(this.ctx.localAddress()).thenReturn(InetSocketAddress.createUnresolved("localhost", 8080));
        Mockito.when(httpExecutionContext.bufferAllocator()).thenReturn(BufferAllocators.DEFAULT_ALLOCATOR);
        Mockito.when(httpExecutionContext.executor()).thenReturn(this.execMock);
        Mockito.when(httpExecutionContext.ioExecutor()).thenReturn((IoExecutor) Mockito.mock(IoExecutor.class));
        Mockito.when(httpExecutionContext.executionStrategy()).thenReturn(HttpExecutionStrategies.defaultStrategy());
        this.cancellableResources = new CancellableResources();
        this.jerseyRouter = new HttpJerseyRouterBuilder().routeExecutionStrategyFactory(ExecutionStrategyTest.asFactory(Collections.singletonMap("test", HttpExecutionStrategies.defaultStrategy()))).buildStreaming(new Application() { // from class: io.servicetalk.http.router.jersey.CancellationTest.1
            public Set<Object> getSingletons() {
                return Collections.singleton(CancellationTest.this.cancellableResources);
            }
        });
    }

    @Test
    void cancelSuspended() throws Exception {
        TestCancellable testCancellable = new TestCancellable();
        Mockito.when(this.execMock.schedule((Runnable) ArgumentMatchers.any(Runnable.class), ArgumentMatchers.eq(7L), (TimeUnit) ArgumentMatchers.eq(TimeUnit.DAYS))).thenReturn(testCancellable);
        testCancelResponseSingle(get("/suspended"), false);
        MatcherAssert.assertThat(Boolean.valueOf(testCancellable.cancelled), Matchers.is(true));
    }

    @Test
    void cancelSingle() throws Exception {
        testCancelResponseSingle(get("/single"));
    }

    @Test
    void cancelOffload() throws Exception {
        testCancelResponseSingle(get("/offload"));
    }

    @Test
    void cancelOioStreams() throws Exception {
        testCancelResponsePayload(post("/oio-streams"));
        testCancelResponseSingle(post("/offload-oio-streams"));
        testCancelResponseSingle(post("/no-offloads-oio-streams"));
    }

    @Test
    void cancelRsStreams() throws Exception {
        testCancelResponsePayload(post("/rs-streams"));
        testCancelResponsePayload(post("/rs-streams?subscribe=true"));
        testCancelResponseSingle(post("/offload-rs-streams"));
        testCancelResponseSingle(post("/offload-rs-streams?subscribe=true"));
        testCancelResponseSingle(post("/no-offloads-rs-streams"));
        testCancelResponseSingle(post("/no-offloads-rs-streams?subscribe=true"));
    }

    @Test
    void cancelSse() throws Exception {
        ((Executor) Mockito.doAnswer(invocationOnMock -> {
            Object[] arguments = invocationOnMock.getArguments();
            return execRule.executor().schedule((Runnable) arguments[0], ((Long) arguments[1]).longValue(), (TimeUnit) arguments[2]);
        }).when(this.execMock)).schedule((Runnable) ArgumentMatchers.any(Runnable.class), ArgumentMatchers.anyLong(), (TimeUnit) ArgumentMatchers.any(TimeUnit.class));
        testCancelResponsePayload(get("/sse"));
        this.cancellableResources.sseSinkClosedLatch.await();
    }

    private void testCancelResponsePayload(StreamingHttpRequest streamingHttpRequest) throws Exception {
        Future future = execRule.executor().submit(() -> {
            return this.jerseyRouter.handle(this.ctx, streamingHttpRequest, HTTP_REQ_RES_FACTORY);
        }).flatMap(Function.identity()).toFuture();
        StreamingHttpResponse streamingHttpResponse = (StreamingHttpResponse) future.get();
        future.cancel(true);
        MatcherAssert.assertThat(streamingHttpResponse.status(), Matchers.is(HttpResponseStatus.OK));
        AtomicReference atomicReference = new AtomicReference();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Publisher payloadBody = streamingHttpResponse.payloadBody();
        Objects.requireNonNull(atomicReference);
        Publisher beforeOnError = payloadBody.beforeOnError((v1) -> {
            r1.set(v1);
        });
        Objects.requireNonNull(countDownLatch);
        beforeOnError.beforeCancel(countDownLatch::countDown).ignoreElements().subscribe().cancel();
        countDownLatch.await();
        Throwable th = (Throwable) atomicReference.get();
        if (th != null) {
            throw new AssertionError(th);
        }
    }

    private void testCancelResponseSingle(StreamingHttpRequest streamingHttpRequest) throws Exception {
        testCancelResponseSingle(streamingHttpRequest, true);
    }

    private void testCancelResponseSingle(StreamingHttpRequest streamingHttpRequest, boolean z) throws Exception {
        final AtomicReference atomicReference = new AtomicReference();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        if (z) {
            Single beforeOnError = execRule.executor().submit(() -> {
                return this.jerseyRouter.handle(this.ctx, streamingHttpRequest, HTTP_REQ_RES_FACTORY);
            }).flatMap(Function.identity()).beforeOnError(th -> {
                if ((th instanceof IllegalStateException) || (th instanceof InterruptedException)) {
                    return;
                }
                atomicReference.compareAndSet(null, th);
            });
            Objects.requireNonNull(countDownLatch);
            SourceAdapters.toSource(beforeOnError.afterCancel(countDownLatch::countDown)).subscribe(new SingleSource.Subscriber<StreamingHttpResponse>() { // from class: io.servicetalk.http.router.jersey.CancellationTest.2
                public void onSubscribe(Cancellable cancellable) {
                    cancellable.cancel();
                }

                public void onSuccess(@Nullable StreamingHttpResponse streamingHttpResponse) {
                    if (streamingHttpResponse == null) {
                        atomicReference.compareAndSet(null, new NullPointerException("result == null not expected."));
                        countDownLatch.countDown();
                    } else {
                        Completable ignoreElements = streamingHttpResponse.messageBody().ignoreElements();
                        CountDownLatch countDownLatch2 = countDownLatch;
                        Objects.requireNonNull(countDownLatch2);
                        ignoreElements.afterFinally(countDownLatch2::countDown).subscribe();
                    }
                }

                public void onError(Throwable th2) {
                    if (!(th2 instanceof IllegalStateException) && !(th2 instanceof InterruptedException)) {
                        atomicReference.compareAndSet(null, th2);
                    }
                    countDownLatch.countDown();
                }
            });
        } else {
            Single handle = this.jerseyRouter.handle(this.ctx, streamingHttpRequest, HTTP_REQ_RES_FACTORY);
            Objects.requireNonNull(atomicReference);
            Single beforeOnError2 = handle.beforeOnError((v1) -> {
                r1.set(v1);
            });
            Objects.requireNonNull(countDownLatch);
            beforeOnError2.afterCancel(countDownLatch::countDown).ignoreElement().subscribe().cancel();
        }
        countDownLatch.await();
        Throwable th2 = (Throwable) atomicReference.get();
        if (th2 != null) {
            throw new AssertionError(th2);
        }
    }

    private static StreamingHttpRequest get(String str) {
        return HTTP_REQ_RES_FACTORY.get("/cancel" + str);
    }

    private static StreamingHttpRequest post(String str) {
        StreamingHttpRequest payloadBody = HTTP_REQ_RES_FACTORY.post("/cancel" + str).payloadBody(Publisher.from(BufferAllocators.DEFAULT_ALLOCATOR.fromAscii(TEST_DATA)));
        payloadBody.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN);
        return payloadBody;
    }
}
