package io.confluent.parallelconsumer;

import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.internal.UserFunctions;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/ParallelEoSStreamProcessor.class */
public class ParallelEoSStreamProcessor<K, V> extends AbstractParallelEoSStreamProcessor<K, V> implements ParallelStreamProcessor<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ParallelEoSStreamProcessor.class);

    public ParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> parallelConsumerOptions) {
        super(parallelConsumerOptions);
    }

    @Override // io.confluent.parallelconsumer.ParallelStreamProcessor
    public void poll(Consumer<PollContext<K, V>> consumer) {
        supervisorLoop(pollContextInternal -> {
            log.trace("asyncPoll - Consumed a consumerRecord ({}), executing void function...", pollContextInternal);
            UserFunctions.carefullyRun((Consumer<PollContext<K, V>>) consumer, pollContextInternal.getPollContext());
            log.trace("asyncPoll - user function finished ok.");
            return UniLists.of();
        }, obj -> {
            log.trace("Void callback applied.");
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // io.confluent.parallelconsumer.ParallelStreamProcessor
    public void pollAndProduceMany(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> function, Consumer<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> consumer) {
        if (!getOptions().isProducerSupplied()) {
            throw new IllegalArgumentException("To use the produce flows you must supply a Producer in the options");
        }
        supervisorLoop(pollContextInternal -> {
            List<ProducerRecord<K, V>> list = (List) UserFunctions.carefullyRun((Function<PollContext<K, V>, RESULT>) function, pollContextInternal.getPollContext());
            if (list.isEmpty()) {
                log.debug("No result returned from function to send.");
            }
            log.trace("asyncPoll and Stream - Consumed a record ({}), and returning a derivative result record to be produced: {}", pollContextInternal, list);
            ArrayList arrayList = new ArrayList();
            log.trace("Producing {} messages in result...", Integer.valueOf(list.size()));
            try {
                for (ParallelConsumer.Tuple<ProducerRecord<K, V>, Future<RecordMetadata>> tuple : super.getProducerManager().get().produceMessages(list)) {
                    arrayList.add(new ParallelStreamProcessor.ConsumeProduceResult(pollContextInternal.getPollContext(), tuple.getLeft(), (RecordMetadata) TimeUtils.time(() -> {
                        return (RecordMetadata) ((Future) tuple.getRight()).get(this.options.getSendTimeout().toMillis(), TimeUnit.MILLISECONDS);
                    })));
                }
                return arrayList;
            } catch (Exception e) {
                throw new InternalRuntimeError("Error while waiting for produce results", e);
            }
        }, consumer);
    }

    @Override // io.confluent.parallelconsumer.ParallelStreamProcessor
    public void pollAndProduceMany(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> function) {
        pollAndProduceMany(function, consumeProduceResult -> {
            log.trace("No-op user callback");
        });
    }

    @Override // io.confluent.parallelconsumer.ParallelStreamProcessor
    public void pollAndProduce(Function<PollContext<K, V>, ProducerRecord<K, V>> function) {
        pollAndProduce(function, consumeProduceResult -> {
            log.trace("No-op user callback");
        });
    }

    @Override // io.confluent.parallelconsumer.ParallelStreamProcessor
    public void pollAndProduce(Function<PollContext<K, V>, ProducerRecord<K, V>> function, Consumer<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> consumer) {
        pollAndProduceMany(pollContext -> {
            return UniLists.of((ProducerRecord) function.apply(pollContext));
        }, consumer);
    }
}
