package org.asynchttpclient.reactivestreams;

import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.asynchttpclient.AbstractBasicTest;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseHeaders;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.Response;
import org.asynchttpclient.handler.StreamedAsyncHandler;
import org.asynchttpclient.test.TestUtils;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsTest.class */
public abstract class ReactiveStreamsTest extends AbstractBasicTest {

    /* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsTest$CancellingStreamedAsyncProvider.class */
    /**
     * Streaming handler used by the cancellation test: it attaches a
     * {@link CancellingSubscriber} to the response body publisher so the
     * subscription is cancelled after a fixed number of body parts.
     */
    static class CancellingStreamedAsyncProvider implements StreamedAsyncHandler<CancellingStreamedAsyncProvider> {
        /** Number of body parts to consume before cancelling; {@code 0} cancels as soon as the subscription starts. */
        private final int cancelAfter;

        /**
         * @param i number of body parts the subscriber accepts before cancelling
         */
        public CancellingStreamedAsyncProvider(int i) {
            this.cancelAfter = i;
        }

        /**
         * Subscribes a fresh {@link CancellingSubscriber} to the body publisher.
         *
         * @param publisher publisher of the response body parts
         * @return always {@link AsyncHandler.State#CONTINUE}
         */
        public AsyncHandler.State onStream(Publisher<HttpResponseBodyPart> publisher) {
            publisher.subscribe(new CancellingSubscriber<HttpResponseBodyPart>(this.cancelAfter));
            return AsyncHandler.State.CONTINUE;
        }

        /**
         * Surfaces any reported failure as an {@link AssertionError} carrying the original throwable.
         *
         * @param th the failure reported to this handler
         */
        public void onThrowable(Throwable th) {
            throw new AssertionError(th);
        }

        /**
         * Must never be invoked: the body is expected to arrive through {@link #onStream(Publisher)}.
         *
         * @throws AssertionError always
         */
        public AsyncHandler.State onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) throws Exception {
            throw new AssertionError("Should not have received body part");
        }

        /** Accepts the status line without inspecting it. */
        public AsyncHandler.State onStatusReceived(HttpResponseStatus httpResponseStatus) throws Exception {
            return AsyncHandler.State.CONTINUE;
        }

        /** Accepts the headers without inspecting them. */
        public AsyncHandler.State onHeadersReceived(HttpResponseHeaders httpResponseHeaders) throws Exception {
            return AsyncHandler.State.CONTINUE;
        }

        /**
         * Completion callback of the handler contract. The decompiler had renamed
         * this method, which left the interface method unimplemented; the
         * original name is restored here.
         *
         * @return this handler
         */
        public CancellingStreamedAsyncProvider onCompleted() throws Exception {
            return this;
        }
    }

    /* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsTest$CancellingSubscriber.class */
    /**
     * Subscriber that pulls elements one at a time and cancels its subscription
     * once a configured number of elements has been received.
     *
     * @param <T> element type (elements themselves are ignored, only counted)
     */
    static class CancellingSubscriber<T> implements Subscriber<T> {
        /** Element count at which the subscription is cancelled; {@code 0} cancels in {@link #onSubscribe}. */
        private final int cancelAfter;
        private volatile Subscription subscription;
        /** Number of elements seen so far. */
        private volatile int count;

        /**
         * @param i number of elements to accept before cancelling
         */
        public CancellingSubscriber(int i) {
            cancelAfter = i;
        }

        /**
         * Stores the subscription, then either cancels straight away (limit of
         * zero) or requests the first element.
         */
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            if (cancelAfter != 0) {
                subscription.request(1L);
                return;
            }
            subscription.cancel();
        }

        /**
         * Counts the element; cancels when the limit is reached, otherwise
         * requests exactly one more element.
         */
        public void onNext(T t) {
            count += 1;
            if (count != cancelAfter) {
                subscription.request(1L);
                return;
            }
            subscription.cancel();
        }

        /** Errors are intentionally ignored by this subscriber. */
        public void onError(Throwable th) {
        }

        /** Completion requires no action. */
        public void onComplete() {
        }
    }

    /* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsTest$SimpleStreamedAsyncHandler.class */
    /**
     * Streaming handler that hands the response body publisher to a
     * {@link SimpleSubscriber} and exposes the collected body parts as one
     * byte array via {@link #getBytes()}.
     */
    protected static class SimpleStreamedAsyncHandler implements StreamedAsyncHandler<SimpleStreamedAsyncHandler> {
        private final SimpleSubscriber<HttpResponseBodyPart> subscriber;

        /** Creates a handler backed by a new {@link SimpleSubscriber}. */
        public SimpleStreamedAsyncHandler() {
            this(new SimpleSubscriber<HttpResponseBodyPart>());
        }

        /**
         * @param simpleSubscriber subscriber that will receive the body parts
         */
        public SimpleStreamedAsyncHandler(SimpleSubscriber<HttpResponseBodyPart> simpleSubscriber) {
            this.subscriber = simpleSubscriber;
        }

        /**
         * Subscribes the configured subscriber to the body publisher.
         *
         * @param publisher publisher of the response body parts
         * @return always {@link AsyncHandler.State#CONTINUE}
         */
        public AsyncHandler.State onStream(Publisher<HttpResponseBodyPart> publisher) {
            publisher.subscribe(this.subscriber);
            return AsyncHandler.State.CONTINUE;
        }

        /**
         * Surfaces any reported failure as an {@link AssertionError} carrying the original throwable.
         *
         * @param th the failure reported to this handler
         */
        public void onThrowable(Throwable th) {
            throw new AssertionError(th);
        }

        /**
         * Must never be invoked: the body is expected to arrive through {@link #onStream(Publisher)}.
         *
         * @throws AssertionError always
         */
        public AsyncHandler.State onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) throws Exception {
            throw new AssertionError("Should not have received body part");
        }

        /** Accepts the status line without inspecting it. */
        public AsyncHandler.State onStatusReceived(HttpResponseStatus httpResponseStatus) throws Exception {
            return AsyncHandler.State.CONTINUE;
        }

        /** Accepts the headers without inspecting them. */
        public AsyncHandler.State onHeadersReceived(HttpResponseHeaders httpResponseHeaders) throws Exception {
            return AsyncHandler.State.CONTINUE;
        }

        /**
         * Completion callback of the handler contract. The decompiler had renamed
         * this method, which left the interface method unimplemented; the
         * original name is restored here.
         *
         * @return this handler, so callers can read the body via {@link #getBytes()}
         */
        public SimpleStreamedAsyncHandler onCompleted() throws Exception {
            return this;
        }

        /**
         * Waits for the subscriber to terminate and concatenates every received
         * body part, in arrival order, into a single byte array.
         *
         * @return the bytes of all received body parts
         * @throws Throwable the error signalled to the subscriber, or any failure
         *         while waiting for or writing out the parts
         */
        public byte[] getBytes() throws Throwable {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            // getElements() blocks until onComplete/onError has been signalled.
            for (HttpResponseBodyPart part : this.subscriber.getElements()) {
                part.writeTo(out);
            }
            return out.toByteArray();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/asynchttpclient/reactivestreams/ReactiveStreamsTest$SimpleSubscriber.class */
    /**
     * Subscriber that requests elements one at a time, records each of them,
     * and lets callers block until the stream has terminated.
     *
     * @param <T> type of the collected elements
     */
    public static class SimpleSubscriber<T> implements Subscriber<T> {
        private volatile Subscription subscription;
        /** Error passed to {@link #onError(Throwable)}, or {@code null} if none was signalled. */
        private volatile Throwable error;
        /** Elements in arrival order; wrapped in a synchronized list. */
        private final List<T> elements = Collections.synchronizedList(new ArrayList<T>());
        /** Released once the stream terminates, through either completion or error. */
        private final CountDownLatch latch = new CountDownLatch(1);

        protected SimpleSubscriber() {
        }

        /** Stores the subscription and requests the first element. */
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1L);
        }

        /** Records the element and requests exactly one more. */
        public void onNext(T t) {
            this.elements.add(t);
            this.subscription.request(1L);
        }

        /** Remembers the error and releases any thread waiting in {@link #getElements()}. */
        public void onError(Throwable th) {
            this.error = th;
            this.latch.countDown();
        }

        /** Releases any thread waiting in {@link #getElements()}. */
        public void onComplete() {
            this.latch.countDown();
        }

        /**
         * Blocks until the stream has terminated, then returns what was collected.
         *
         * @return the internal list of received elements (not a copy)
         * @throws Throwable the error passed to {@link #onError(Throwable)}, if any,
         *         or {@link InterruptedException} if the wait is interrupted
         */
        public List<T> getElements() throws Throwable {
            this.latch.await();
            if (this.error != null) {
                throw this.error;
            }
            return this.elements;
        }
    }

    /**
     * Posts a large payload twice, reading each response through a
     * {@link SimpleStreamedAsyncHandler}, and checks that the streamed bytes
     * equal the posted bytes. It then issues a plain request on the same client
     * and checks the body comes back unchanged.
     *
     * @throws Throwable any failure raised by the requests or the assertions
     */
    @Test(groups = {"standalone", "default_provider"})
    public void streamedResponseTest() throws Throwable {
        // try-with-resources closes the client on every path and keeps the
        // primary exception if close() fails as well.
        try (AsyncHttpClient client = getAsyncHttpClient(null)) {
            SimpleStreamedAsyncHandler first = client.preparePost(getTargetUrl())
                    .setBody(TestUtils.LARGE_IMAGE_BYTES)
                    .execute(new SimpleStreamedAsyncHandler())
                    .get();
            Assert.assertEquals(first.getBytes(), TestUtils.LARGE_IMAGE_BYTES);

            // Same streamed request again on the same client (presumably to check
            // the client is still usable after a streamed response).
            SimpleStreamedAsyncHandler second = client.preparePost(getTargetUrl())
                    .setBody(TestUtils.LARGE_IMAGE_BYTES)
                    .execute(new SimpleStreamedAsyncHandler())
                    .get();
            Assert.assertEquals(second.getBytes(), TestUtils.LARGE_IMAGE_BYTES);

            // A regular, non-streamed request on the same client.
            Response plain = client.preparePost(getTargetUrl())
                    .setBody("Hello")
                    .execute()
                    .get();
            Assert.assertEquals(plain.getResponseBody(), "Hello");
        }
    }

    /**
     * Posts a large payload three times, cancelling the body subscription after
     * 0, 1 and 10 body parts respectively. It then issues a plain request on the
     * same client and checks the body comes back unchanged.
     *
     * @throws Throwable any failure raised by the requests or the assertion
     */
    @Test(groups = {"standalone", "default_provider"})
    public void cancelStreamedResponseTest() throws Throwable {
        // try-with-resources closes the client on every path and keeps the
        // primary exception if close() fails as well.
        try (AsyncHttpClient client = getAsyncHttpClient(null)) {
            // 0 cancels as soon as the subscription starts; 1 and 10 cancel after that many parts.
            for (int cancelAfter : new int[] {0, 1, 10}) {
                client.preparePost(getTargetUrl())
                        .setBody(TestUtils.LARGE_IMAGE_BYTES)
                        .execute(new CancellingStreamedAsyncProvider(cancelAfter))
                        .get();
            }

            // A regular, non-streamed request on the same client.
            Response plain = client.preparePost(getTargetUrl())
                    .setBody("Hello")
                    .execute()
                    .get();
            Assert.assertEquals(plain.getResponseBody(), "Hello");
        }
    }
}
