package io.confluent.parallelconsumer.reactor;

import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import io.confluent.parallelconsumer.internal.ExternalEngine;
import io.confluent.parallelconsumer.internal.UserFunctions;
import io.confluent.parallelconsumer.state.WorkContainer;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/confluent/parallelconsumer/reactor/ReactorProcessor.class */
/**
 * {@link ExternalEngine} variant whose user function returns a Reactive Streams {@link Publisher}
 * for every consumed record.
 *
 * <p>Each returned publisher is wrapped in a {@link Flux} and subscribed using a {@link Scheduler}
 * obtained from a configurable supplier. When the publisher completes or errors, the record's
 * {@link WorkContainer} is marked accordingly and handed to {@code addToMailbox}.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
public class ReactorProcessor<K, V> extends ExternalEngine<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ReactorProcessor.class);

    /** Work-type tag set on every work container dispatched by {@link #react(Function)}. */
    private static final String REACTOR_TYPE = "reactor.x-type";

    /** Supplier consulted every time a scheduler is needed; never {@code null} after construction. */
    private final Supplier<Scheduler> schedulerSupplier;

    /** Fallback supplier ({@link Schedulers#boundedElastic()}) used when the caller provides none. */
    private final Supplier<Scheduler> defaultSchedulerSupplier;

    /**
     * Creates a processor with an optional custom scheduler supplier.
     *
     * @param parallelConsumerOptions options forwarded unchanged to the {@link ExternalEngine} constructor
     * @param supplier source of the {@link Scheduler} used for publishing and subscribing; when
     *     {@code null}, {@link Schedulers#boundedElastic()} is used instead
     */
    public ReactorProcessor(ParallelConsumerOptions<K, V> parallelConsumerOptions, Supplier<Scheduler> supplier) {
        super(parallelConsumerOptions);
        this.defaultSchedulerSupplier = Schedulers::boundedElastic;
        this.schedulerSupplier = supplier == null ? this.defaultSchedulerSupplier : supplier;
    }

    /**
     * Creates a processor that uses the default {@link Schedulers#boundedElastic()} scheduler.
     *
     * @param parallelConsumerOptions options forwarded unchanged to the {@link ExternalEngine} constructor
     */
    public ReactorProcessor(ParallelConsumerOptions<K, V> parallelConsumerOptions) {
        this(parallelConsumerOptions, null);
    }

    /**
     * Tells whether a result list produced by the user-function wrapper holds asynchronous work.
     *
     * <p>Only the first element is inspected.
     *
     * @param list result list to inspect
     * @return {@code true} if the list is non-empty and its first element is a {@link Disposable};
     *     {@code false} otherwise, including for an empty list
     */
    protected boolean isAsyncFutureWork(List<?> list) {
        Iterator<?> it = list.iterator();
        return it.hasNext() && it.next() instanceof Disposable;
    }

    /**
     * Closes this processor by delegating directly to the superclass implementation.
     *
     * @param duration passed through to {@code super.close}
     * @param drainingMode passed through to {@code super.close}
     */
    public void close(Duration duration, DrainingCloseable.DrainingMode drainingMode) {
        super.close(duration, drainingMode);
    }

    /**
     * Registers the reactive user function with the supervisor loop.
     *
     * <p>For every record handed over by {@code supervisorLoop}, the user function is invoked via
     * {@link UserFunctions#carefullyRun} and the publisher it returns is subscribed. Completion of
     * the publisher marks the record's work container as succeeded; an error signal marks it as
     * failed. In both cases the container is then passed to {@code addToMailbox}.
     *
     * @param function user function mapping a consumed record to the publisher that processes it
     * @throws IllegalStateException raised from inside the per-record callback when no work container
     *     is registered for the record being processed
     */
    public void react(Function<ConsumerRecord<K, V>, Publisher<?>> function) {
        supervisorLoop(consumerRecord -> {
            log.trace("asyncPoll - Consumed a record ({}), executing void function...", consumerRecord.offset());
            WorkContainer<K, V> workContainerForRecord = this.wm.getSm().getWorkContainerForRecord(consumerRecord);
            if (workContainerForRecord == null) {
                throw new IllegalStateException(StringUtils.msg("WC for record is null! {}", new Object[]{consumerRecord}));
            }
            workContainerForRecord.setWorkType(REACTOR_TYPE);

            // The user function itself runs here, on the calling thread; only the publisher it
            // returns is driven through the scheduler below.
            Publisher<?> publisher = (Publisher<?>) UserFunctions.carefullyRun(function, consumerRecord);

            // NOTE(review): subscribe() is used without an error consumer, so after doOnError has
            // run the error still reaches a subscriber with no error handling. Confirm that
            // Reactor's resulting dropped-error reporting is acceptable here.
            Disposable subscription = Flux.from(publisher)
                    .publishOn(getScheduler())
                    .doOnNext(obj -> log.trace("doOnNext {}", obj))
                    .doOnComplete(() -> {
                        log.debug("Reactor success (doOnComplete)");
                        workContainerForRecord.onUserFunctionSuccess();
                        addToMailbox(workContainerForRecord);
                    })
                    .doOnError(th -> {
                        log.error("Reactor fail signal", th);
                        workContainerForRecord.onUserFunctionFailure();
                        addToMailbox(workContainerForRecord);
                    })
                    .subscribeOn(getScheduler())
                    .subscribe();

            // Logged as soon as the subscription has been set up; the publisher may still be running.
            log.trace("asyncPoll - user function finished ok.");
            return UniLists.of(subscription);
        }, obj -> log.trace("Void callback applied."));
    }

    /**
     * Asks the configured supplier for a scheduler. The supplier is invoked on every call, so it
     * is consulted twice per record (once for {@code publishOn}, once for {@code subscribeOn}).
     *
     * @return the scheduler returned by the supplier
     */
    private Scheduler getScheduler() {
        return this.schedulerSupplier.get();
    }
}
