package io.confluent.parallelconsumer.vertx;

import io.confluent.csid.utils.Java8StreamUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.internal.UserFunctions;
import io.confluent.parallelconsumer.vertx.VertxParallelEoSStreamProcessor;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Variant of {@link VertxParallelEoSStreamProcessor} that, in addition to dispatching the HTTP work,
 * records one {@link VertxCPResult} per dispatched request and exposes those results as a Java
 * {@link Stream}.
 *
 * <p>Results are appended to an internal {@link ConcurrentLinkedDeque}; the stream handed back to
 * callers is created once, in the constructor, from that deque via
 * {@code Java8StreamUtils.setupStreamFromDeque}. Every {@code *Stream} method returns that same
 * stream instance.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
public class JStreamVertxParallelEoSStreamProcessor<K, V> extends VertxParallelEoSStreamProcessor<K, V> implements JStreamVertxParallelStreamProcessor<K, V> {
    private static final Logger log = LoggerFactory.getLogger(JStreamVertxParallelEoSStreamProcessor.class);

    /** Stream view over {@link #userProcessResultsStream}; shared by all {@code *Stream} methods. */
    private final Stream<VertxCPResult<K, V>> stream;

    /** Backing queue that the send callbacks append finished {@link VertxCPResult}s to. */
    private final ConcurrentLinkedDeque<VertxCPResult<K, V>> userProcessResultsStream;

    /**
     * Immutable description of one dispatched request: the polled input, the future of the HTTP
     * response, and - depending on which entry point was used - the request info or the HTTP
     * request built by the user function.
     *
     * @param <K> key type of the consumed records
     * @param <V> value type of the consumed records
     */
    public static class VertxCPResult<K, V> {
        private final PollContext<K, V> in;
        private final Future<HttpResponse<Buffer>> asr;
        private final Optional<VertxParallelEoSStreamProcessor.RequestInfo> requestInfo;
        private final Optional<HttpRequest<Buffer>> httpReq;

        /**
         * Mutable builder for {@link VertxCPResult}. {@code requestInfo} and {@code httpReq} fall
         * back to {@link Optional#empty()} when their setter was never called; {@code in} and
         * {@code asr} have no default and stay {@code null} if unset.
         *
         * @param <K> key type of the consumed records
         * @param <V> value type of the consumed records
         */
        public static class VertxCPResultBuilder<K, V> {
            private PollContext<K, V> in;
            private Future<HttpResponse<Buffer>> asr;
            // The "$set" flags distinguish "never set" from "explicitly set", so that build()
            // only applies the default when the setter was not invoked at all.
            private boolean requestInfo$set;
            private Optional<VertxParallelEoSStreamProcessor.RequestInfo> requestInfo$value;
            private boolean httpReq$set;
            private Optional<HttpRequest<Buffer>> httpReq$value;

            VertxCPResultBuilder() {
            }

            /** Sets the polled input the result belongs to. */
            public VertxCPResultBuilder<K, V> in(PollContext<K, V> pollContext) {
                this.in = pollContext;
                return this;
            }

            /** Sets the future of the HTTP response. */
            public VertxCPResultBuilder<K, V> asr(Future<HttpResponse<Buffer>> future) {
                this.asr = future;
                return this;
            }

            /** Sets the request info and marks it as explicitly provided. */
            public VertxCPResultBuilder<K, V> requestInfo(Optional<VertxParallelEoSStreamProcessor.RequestInfo> optional) {
                this.requestInfo$value = optional;
                this.requestInfo$set = true;
                return this;
            }

            /** Sets the HTTP request and marks it as explicitly provided. */
            public VertxCPResultBuilder<K, V> httpReq(Optional<HttpRequest<Buffer>> optional) {
                this.httpReq$value = optional;
                this.httpReq$set = true;
                return this;
            }

            /**
             * Creates the result from the current builder state, substituting the defaults for
             * {@code requestInfo} / {@code httpReq} when they were never set.
             */
            public VertxCPResult<K, V> build() {
                Optional<VertxParallelEoSStreamProcessor.RequestInfo> effectiveRequestInfo =
                        this.requestInfo$set ? this.requestInfo$value : VertxCPResult.access$000();
                Optional<HttpRequest<Buffer>> effectiveHttpReq =
                        this.httpReq$set ? this.httpReq$value : VertxCPResult.access$100();
                return new VertxCPResult<>(this.in, this.asr, effectiveRequestInfo, effectiveHttpReq);
            }

            @Override
            public String toString() {
                return "JStreamVertxParallelEoSStreamProcessor.VertxCPResult.VertxCPResultBuilder(in=" + this.in + ", asr=" + this.asr + ", requestInfo$value=" + this.requestInfo$value + ", httpReq$value=" + this.httpReq$value + ")";
            }
        }

        /** Default for {@code requestInfo} when the builder setter was not called. */
        private static <K, V> Optional<VertxParallelEoSStreamProcessor.RequestInfo> $default$requestInfo() {
            return Optional.empty();
        }

        /** Default for {@code httpReq} when the builder setter was not called. */
        private static <K, V> Optional<HttpRequest<Buffer>> $default$httpReq() {
            return Optional.empty();
        }

        VertxCPResult(PollContext<K, V> pollContext, Future<HttpResponse<Buffer>> future, Optional<VertxParallelEoSStreamProcessor.RequestInfo> optional, Optional<HttpRequest<Buffer>> optional2) {
            this.in = pollContext;
            this.asr = future;
            this.requestInfo = optional;
            this.httpReq = optional2;
        }

        /** @return a fresh, empty builder */
        public static <K, V> VertxCPResultBuilder<K, V> builder() {
            return new VertxCPResultBuilder<>();
        }

        /** @return the polled input this result belongs to */
        public PollContext<K, V> getIn() {
            return this.in;
        }

        /** @return the future of the HTTP response */
        public Future<HttpResponse<Buffer>> getAsr() {
            return this.asr;
        }

        /** @return the request info, or empty when none was recorded */
        public Optional<VertxParallelEoSStreamProcessor.RequestInfo> getRequestInfo() {
            return this.requestInfo;
        }

        /** @return the HTTP request, or empty when none was recorded */
        public Optional<HttpRequest<Buffer>> getHttpReq() {
            return this.httpReq;
        }

        /** Package-private bridge that lets the builder reach the private {@code requestInfo} default. */
        static /* synthetic */ Optional<VertxParallelEoSStreamProcessor.RequestInfo> access$000() {
            return $default$requestInfo();
        }

        /** Package-private bridge that lets the builder reach the private {@code httpReq} default. */
        static /* synthetic */ Optional<HttpRequest<Buffer>> access$100() {
            return $default$httpReq();
        }
    }

    /**
     * Creates the processor and the result stream backed by a new, empty deque.
     *
     * @param vertx                   Vert.x instance handed to the superclass
     * @param webClient               web client handed to the superclass
     * @param parallelConsumerOptions consumer options handed to the superclass
     */
    public JStreamVertxParallelEoSStreamProcessor(Vertx vertx, WebClient webClient, ParallelConsumerOptions<K, V> parallelConsumerOptions) {
        super(vertx, webClient, parallelConsumerOptions);
        this.userProcessResultsStream = new ConcurrentLinkedDeque<>();
        this.stream = Java8StreamUtils.setupStreamFromDeque(this.userProcessResultsStream);
    }

    /**
     * Convenience constructor that passes {@code null} for both the Vert.x instance and the web
     * client. NOTE(review): presumably the superclass substitutes defaults for {@code null} -
     * confirm against {@code VertxParallelEoSStreamProcessor}.
     *
     * @param parallelConsumerOptions consumer options handed to the superclass
     */
    public JStreamVertxParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> parallelConsumerOptions) {
        this(null, null, parallelConsumerOptions);
    }

    /**
     * Registers {@code function} through {@code vertxHttpReqInfo} and publishes a result carrying
     * the polled input, the produced request info and the response future.
     *
     * <p>NOTE(review): a single builder is created per call to this method and captured by both
     * callbacks, so it is shared by every record handled through this registration. If the
     * superclass runs these callbacks for several records concurrently, fields belonging to
     * different records could end up in the same result - confirm against the superclass'
     * threading model.
     *
     * @param function user function producing the request info for a polled input
     * @return the shared result stream
     */
    @Override
    public Stream<VertxCPResult<K, V>> vertxHttpReqInfoStream(Function<PollContext<K, V>, VertxParallelEoSStreamProcessor.RequestInfo> function) {
        VertxCPResult.VertxCPResultBuilder<K, V> builder = VertxCPResult.builder();
        super.vertxHttpReqInfo(pollContext -> {
            builder.in(pollContext);
            VertxParallelEoSStreamProcessor.RequestInfo requestInfo = (VertxParallelEoSStreamProcessor.RequestInfo) UserFunctions.carefullyRun(function, pollContext);
            builder.requestInfo(Optional.of(requestInfo));
            return requestInfo;
        }, sent -> publishResult(builder, sent), asyncResult -> {
            // Intentionally empty: completion of the HTTP call is not reported on the stream.
        });
        return this.stream;
    }

    /**
     * Registers {@code biFunction} through {@code vertxHttpRequest} and publishes a result carrying
     * the polled input, the built HTTP request and the response future.
     *
     * <p>NOTE(review): the builder is shared by all records handled through this registration; see
     * the concurrency caveat described on {@code vertxHttpReqInfoStream} - the same applies here.
     * Confirm against the superclass' threading model.
     *
     * @param biFunction user function building the HTTP request for a polled input
     * @return the shared result stream
     */
    @Override
    public Stream<VertxCPResult<K, V>> vertxHttpRequestStream(BiFunction<WebClient, PollContext<K, V>, HttpRequest<Buffer>> biFunction) {
        VertxCPResult.VertxCPResultBuilder<K, V> builder = VertxCPResult.builder();
        super.vertxHttpRequest((webClient, pollContext) -> {
            builder.in(pollContext);
            HttpRequest<Buffer> httpRequest = (HttpRequest<Buffer>) UserFunctions.carefullyRun(biFunction, webClient, pollContext);
            builder.httpReq(Optional.of(httpRequest));
            return httpRequest;
        }, sent -> publishResult(builder, sent), asyncResult -> {
            // Intentionally empty: completion of the HTTP call is not reported on the stream.
        });
        return this.stream;
    }

    /**
     * Registers {@code biFunction} through {@code vertxHttpWebClient} and publishes a result
     * carrying the polled input and the response future. Neither request info nor HTTP request is
     * recorded on this path, so both are empty in the published result.
     *
     * <p>NOTE(review): the builder is created once per call and shared by all records handled
     * through this registration; if the superclass invokes the callbacks concurrently, values from
     * different records could be combined - confirm against the superclass' threading model.
     *
     * @param biFunction user function performing the web-client call for a polled input
     * @return the shared result stream
     */
    @Override
    public Stream<VertxCPResult<K, V>> vertxHttpWebClientStream(BiFunction<WebClient, PollContext<K, V>, Future<HttpResponse<Buffer>>> biFunction) {
        VertxCPResult.VertxCPResultBuilder<K, V> builder = VertxCPResult.builder();
        super.vertxHttpWebClient((webClient, pollContext) -> {
            builder.in(pollContext);
            Future<HttpResponse<Buffer>> future = (Future<HttpResponse<Buffer>>) UserFunctions.carefullyRun(biFunction, webClient, pollContext);
            builder.asr(future);
            return future;
        }, sent -> publishResult(builder, sent));
        return this.stream;
    }

    /** Records the response future on {@code builder}, builds the result and appends it to the result queue. */
    private void publishResult(VertxCPResult.VertxCPResultBuilder<K, V> builder, Future<HttpResponse<Buffer>> sent) {
        builder.asr(sent);
        this.userProcessResultsStream.add(builder.build());
    }
}
