package org.asynchttpclient.request.body.generator;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.kafka.shade.org.tukaani.xz.common.Util;
import org.asynchttpclient.request.body.Body;
import org.asynchttpclient.util.Assertions;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/async-http-client-2.12.1.jar:org/asynchttpclient/request/body/generator/ReactiveStreamsBodyGenerator.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.2.jar:META-INF/bundled-dependencies/async-http-client-2.12.1.jar:org/asynchttpclient/request/body/generator/ReactiveStreamsBodyGenerator.class */
/**
 * A {@link FeedableBodyGenerator} whose content is supplied by a Reactive Streams
 * {@link Publisher} of {@link ByteBuf} chunks.
 *
 * <p>All feeding and body creation is delegated to an internal
 * {@link UnboundedQueueFeedableBodyGenerator}. The publisher is subscribed to lazily, on the
 * first call to {@link Body#transferTo(ByteBuf)} of a body returned by {@link #createBody()}.
 */
public class ReactiveStreamsBodyGenerator implements FeedableBodyGenerator {
    private final Publisher<ByteBuf> publisher;
    private final FeedableBodyGenerator feedableBodyGenerator = new UnboundedQueueFeedableBodyGenerator();
    private final long contentLength;
    // Kept here as well as on the delegate so that SimpleSubscriber#onError can report
    // publisher failures to the listener directly.
    private volatile FeedListener feedListener;

    /**
     * Creates a generator backed by the given publisher.
     *
     * @param publisher the source of body chunks
     * @param j the content length reported by bodies created from this generator
     */
    public ReactiveStreamsBodyGenerator(Publisher<ByteBuf> publisher, long j) {
        this.publisher = publisher;
        this.contentLength = j;
    }

    /**
     * Returns the publisher this generator was constructed with.
     *
     * @return the publisher supplying the body chunks
     */
    public Publisher<ByteBuf> getPublisher() {
        return this.publisher;
    }

    /**
     * Forwards a chunk to the delegate generator.
     *
     * @param byteBuf the chunk to feed
     * @param z the flag passed through unchanged to the delegate; the subscriber in this class
     *          passes {@code true} only when the stream completes
     * @return whatever the delegate's {@code feed} returns
     * @throws Exception if the delegate's {@code feed} throws
     */
    @Override
    public boolean feed(ByteBuf byteBuf, boolean z) throws Exception {
        return this.feedableBodyGenerator.feed(byteBuf, z);
    }

    /**
     * Stores the listener locally and registers it with the delegate generator.
     *
     * @param feedListener the listener to notify
     */
    @Override
    public void setListener(FeedListener feedListener) {
        this.feedListener = feedListener;
        this.feedableBodyGenerator.setListener(feedListener);
    }

    /**
     * Returns the content length given at construction time.
     *
     * @return the configured content length
     */
    public long getContentLength() {
        return this.contentLength;
    }

    /**
     * Creates a body that subscribes to the publisher on first transfer and reads from the
     * delegate generator.
     *
     * @return a new streamed body
     */
    @Override
    public Body createBody() {
        return new StreamedBody(this.feedableBodyGenerator, this.contentLength);
    }

    /**
     * Subscriber that pushes every published chunk into a {@link FeedableBodyGenerator}.
     *
     * <p>Deliberately a non-static inner class: {@link #onError(Throwable)} reads the enclosing
     * generator's current {@code feedListener}.
     */
    private class SimpleSubscriber implements Subscriber<ByteBuf> {
        private final Logger logger = LoggerFactory.getLogger(SimpleSubscriber.class);
        private final FeedableBodyGenerator feeder;
        private volatile Subscription subscription;

        /**
         * @param feedableBodyGenerator the generator that receives the published chunks
         */
        public SimpleSubscriber(FeedableBodyGenerator feedableBodyGenerator) {
            this.feeder = feedableBodyGenerator;
        }

        /**
         * Accepts the first subscription and requests {@link Long#MAX_VALUE} elements from it;
         * any later subscription is cancelled immediately.
         *
         * @param subscription the subscription offered by the publisher
         */
        @Override
        public void onSubscribe(Subscription subscription) {
            Assertions.assertNotNull(subscription, "subscription");
            if (this.subscription != null) {
                // Already subscribed: reject the additional subscription.
                subscription.cancel();
                return;
            }
            this.subscription = subscription;
            // Long.MAX_VALUE is the Reactive Streams way of signalling effectively unbounded
            // demand; use the JDK constant instead of an equal-valued constant borrowed from an
            // unrelated shaded library.
            subscription.request(Long.MAX_VALUE);
        }

        /**
         * Feeds the chunk to the generator; on failure, logs and cancels the subscription.
         *
         * @param byteBuf the published chunk
         */
        @Override
        public void onNext(ByteBuf byteBuf) {
            Assertions.assertNotNull(byteBuf, "bytebuf");
            try {
                this.feeder.feed(byteBuf, false);
            } catch (Exception e) {
                this.logger.error("Exception occurred while processing element in stream.", e);
                cancelSubscription();
            }
        }

        /**
         * Logs the failure and forwards it to the enclosing generator's listener, if one is set.
         *
         * @param th the error signalled by the publisher
         */
        @Override
        public void onError(Throwable th) {
            Assertions.assertNotNull(th, "throwable");
            this.logger.debug("Error occurred while consuming body stream.", th);
            // Read the volatile field once so the null check and the call see the same listener.
            FeedListener listener = ReactiveStreamsBodyGenerator.this.feedListener;
            if (listener != null) {
                listener.onError(th);
            }
        }

        /**
         * Feeds a final empty buffer flagged as last; on failure, logs and cancels the
         * subscription.
         */
        @Override
        public void onComplete() {
            try {
                this.feeder.feed(Unpooled.EMPTY_BUFFER, true);
            } catch (Exception e) {
                this.logger.info("Ignoring exception occurred while completing stream processing.", e);
                cancelSubscription();
            }
        }

        /**
         * Cancels the current subscription if one has been accepted. The null check keeps a
         * publisher that signals before {@code onSubscribe} from turning an already-logged
         * failure into a {@link NullPointerException} thrown out of the callback.
         */
        private void cancelSubscription() {
            Subscription current = this.subscription;
            if (current != null) {
                current.cancel();
            }
        }
    }

    /**
     * Body that wraps a body created by the given generator and subscribes to the enclosing
     * generator's publisher the first time data is transferred.
     */
    private class StreamedBody implements Body {
        // Ensures the publisher is subscribed to at most once per body.
        private final AtomicBoolean initialized = new AtomicBoolean(false);
        private final SimpleSubscriber subscriber;
        private final Body body;
        private final long contentLength;

        /**
         * @param feedableBodyGenerator the generator that creates the wrapped body and receives
         *                              the published chunks
         * @param j the content length to report
         */
        public StreamedBody(FeedableBodyGenerator feedableBodyGenerator, long j) {
            this.body = feedableBodyGenerator.createBody();
            this.subscriber = new SimpleSubscriber(feedableBodyGenerator);
            this.contentLength = j;
        }

        /**
         * Closes the wrapped body.
         *
         * @throws IOException if closing the wrapped body fails
         */
        @Override
        public void close() throws IOException {
            this.body.close();
        }

        /**
         * @return the content length given at construction time
         */
        @Override
        public long getContentLength() {
            return this.contentLength;
        }

        /**
         * Subscribes to the publisher on the first invocation, then delegates the transfer to
         * the wrapped body.
         *
         * @param byteBuf the buffer to transfer into
         * @return the state reported by the wrapped body
         * @throws IOException if the wrapped body fails to transfer
         */
        @Override
        public Body.BodyState transferTo(ByteBuf byteBuf) throws IOException {
            if (this.initialized.compareAndSet(false, true)) {
                ReactiveStreamsBodyGenerator.this.publisher.subscribe(this.subscriber);
            }
            return this.body.transferTo(byteBuf);
        }
    }
}
