package io.micronaut.configuration.kafka.processor;

import io.micronaut.configuration.kafka.Acknowledgement;
import io.micronaut.configuration.kafka.ConsumerAware;
import io.micronaut.configuration.kafka.ConsumerRegistry;
import io.micronaut.configuration.kafka.ProducerRegistry;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.bind.ConsumerRecordBinderRegistry;
import io.micronaut.configuration.kafka.bind.batch.BatchConsumerRecordsBinderRegistry;
import io.micronaut.configuration.kafka.config.AbstractKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaConsumerConfiguration;
import io.micronaut.configuration.kafka.config.KafkaDefaultConfiguration;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Blocking;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.DefaultExecutableBinder;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.core.naming.NameUtils;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.ArgumentUtils;
import io.micronaut.core.util.ArrayUtils;
import io.micronaut.core.util.CollectionUtils;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.BeanDefinition;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.annotation.Body;
import io.micronaut.messaging.annotation.SendTo;
import io.micronaut.messaging.exceptions.MessagingSystemException;
import io.micronaut.runtime.ApplicationConfiguration;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import javax.annotation.PreDestroy;
import javax.inject.Named;
import javax.inject.Singleton;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Requires(beans = {KafkaDefaultConfiguration.class})
/* loaded from: input_file:io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.class */
public class KafkaConsumerProcessor implements ExecutableMethodProcessor<KafkaListener>, AutoCloseable, ConsumerRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProcessor.class);
    private final ExecutorService executorService;
    private final ApplicationConfiguration applicationConfiguration;
    private final BeanContext beanContext;
    private final AbstractKafkaConsumerConfiguration defaultConsumerConfiguration;
    private final ConsumerRecordBinderRegistry binderRegistry;
    private final SerdeRegistry serdeRegistry;
    private final Scheduler executorScheduler;
    private final KafkaListenerExceptionHandler exceptionHandler;
    private final ProducerRegistry producerRegistry;
    private final BatchConsumerRecordsBinderRegistry batchBinderRegistry;
    private final Map<String, Consumer> consumers = new ConcurrentHashMap();
    private final Map<String, Consumer> pausedConsumers = new ConcurrentHashMap();
    private final Set<String> paused = new ConcurrentSkipListSet();
    private final AtomicInteger clientIdGenerator = new AtomicInteger(10);

    public KafkaConsumerProcessor(@Named("consumer") ExecutorService executorService, ApplicationConfiguration applicationConfiguration, BeanContext beanContext, AbstractKafkaConsumerConfiguration abstractKafkaConsumerConfiguration, ConsumerRecordBinderRegistry consumerRecordBinderRegistry, BatchConsumerRecordsBinderRegistry batchConsumerRecordsBinderRegistry, SerdeRegistry serdeRegistry, ProducerRegistry producerRegistry, KafkaListenerExceptionHandler kafkaListenerExceptionHandler) {
        this.executorService = executorService;
        this.applicationConfiguration = applicationConfiguration;
        this.beanContext = beanContext;
        this.defaultConsumerConfiguration = abstractKafkaConsumerConfiguration;
        this.binderRegistry = consumerRecordBinderRegistry;
        this.batchBinderRegistry = batchConsumerRecordsBinderRegistry;
        this.serdeRegistry = serdeRegistry;
        this.executorScheduler = Schedulers.from(executorService);
        this.producerRegistry = producerRegistry;
        this.exceptionHandler = kafkaListenerExceptionHandler;
        this.beanContext.getBeanDefinitions(Qualifiers.byType(new Class[]{KafkaListener.class})).forEach(beanDefinition -> {
            if (beanDefinition.isSingleton()) {
                try {
                    beanContext.getBean(beanDefinition.getBeanType());
                } catch (Exception e) {
                    throw new MessagingSystemException("Error creating bean for @KafkaListener of type [" + beanDefinition.getBeanType() + "]: " + e.getMessage(), e);
                }
            }
        });
    }

    @Override // io.micronaut.configuration.kafka.ConsumerRegistry
    @Nonnull
    public <K, V> Consumer<K, V> getConsumer(@Nonnull String str) {
        ArgumentUtils.requireNonNull("id", str);
        Consumer<K, V> consumer = this.consumers.get(str);
        if (consumer == null) {
            throw new IllegalArgumentException("No consumer found for ID: " + str);
        }
        return consumer;
    }

    @Override // io.micronaut.configuration.kafka.ConsumerRegistry
    @Nonnull
    public Set<String> getConsumerIds() {
        return Collections.unmodifiableSet(this.consumers.keySet());
    }

    @Override // io.micronaut.configuration.kafka.ConsumerRegistry
    public boolean isPaused(@Nonnull String str) {
        return StringUtils.isNotEmpty(str) && this.consumers.containsKey(str) && this.paused.contains(str) && this.pausedConsumers.containsKey(str);
    }

    @Override // io.micronaut.configuration.kafka.ConsumerRegistry
    public void pause(@Nonnull String str) {
        if (!StringUtils.isNotEmpty(str) || !this.consumers.containsKey(str)) {
            throw new IllegalArgumentException("No consumer found for ID: " + str);
        }
        this.paused.add(str);
    }

    @Override // io.micronaut.configuration.kafka.ConsumerRegistry
    public void resume(@Nonnull String str) {
        if (!StringUtils.isNotEmpty(str) || !this.consumers.containsKey(str)) {
            throw new IllegalArgumentException("No consumer found for ID: " + str);
        }
        this.paused.remove(str);
    }

    public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> executableMethod) {
        String str;
        List<AnnotationValue> declaredAnnotationValuesByType = executableMethod.getDeclaredAnnotationValuesByType(Topic.class);
        AnnotationValue annotation = executableMethod.getAnnotation(KafkaListener.class);
        if (CollectionUtils.isEmpty(declaredAnnotationValuesByType)) {
            declaredAnnotationValuesByType = beanDefinition.getDeclaredAnnotationValuesByType(Topic.class);
        }
        if (annotation == null || CollectionUtils.isEmpty(declaredAnnotationValuesByType)) {
            return;
        }
        Duration duration = (Duration) executableMethod.getValue(KafkaListener.class, "pollTimeout", Duration.class).orElse(Duration.ofMillis(100L));
        Duration duration2 = (Duration) executableMethod.getValue(KafkaListener.class, "sessionTimeout", Duration.class).orElse(null);
        Duration duration3 = (Duration) executableMethod.getValue(KafkaListener.class, "heartbeatInterval", Duration.class).orElse(null);
        boolean isTrue = executableMethod.isTrue(KafkaListener.class, "batch");
        Optional findFirst = Arrays.stream(executableMethod.getArguments()).filter(argument -> {
            return Consumer.class.isAssignableFrom(argument.getType());
        }).findFirst();
        Optional findFirst2 = Arrays.stream(executableMethod.getArguments()).filter(argument2 -> {
            return Acknowledgement.class.isAssignableFrom(argument2.getType()) || io.micronaut.messaging.Acknowledgement.class.isAssignableFrom(argument2.getType());
        }).findFirst();
        String str2 = (String) annotation.stringValue("groupId").orElse(null);
        Class beanType = beanDefinition.getBeanType();
        if (StringUtils.isEmpty(str2)) {
            str2 = (String) this.applicationConfiguration.getName().orElse(beanType.getName());
        }
        boolean isTrue2 = annotation.isTrue("uniqueGroupId");
        String str3 = str2;
        if (isTrue2) {
            str3 = str3 + "_" + UUID.randomUUID().toString();
        }
        String str4 = (String) annotation.stringValue("clientId").orElse(null);
        if (StringUtils.isEmpty(str4)) {
            str4 = (String) this.applicationConfiguration.getName().map(str5 -> {
                return str5 + '-' + NameUtils.hyphenate(beanType.getSimpleName());
            }).orElse(null);
        }
        OffsetStrategy offsetStrategy = (OffsetStrategy) annotation.enumValue("offsetStrategy", OffsetStrategy.class).orElse(OffsetStrategy.AUTO);
        int orElse = annotation.intValue("threads").orElse(1);
        DefaultKafkaConsumerConfiguration defaultKafkaConsumerConfiguration = new DefaultKafkaConsumerConfiguration((AbstractKafkaConsumerConfiguration) this.beanContext.findBean(AbstractKafkaConsumerConfiguration.class, Qualifiers.byName(str2)).orElse(this.defaultConsumerConfiguration));
        Properties config = defaultKafkaConsumerConfiguration.getConfig();
        if (annotation.getRequiredValue("offsetReset", OffsetReset.class) == OffsetReset.EARLIEST) {
            config.putIfAbsent("auto.offset.reset", OffsetReset.EARLIEST.name().toLowerCase());
        }
        config.putIfAbsent("enable.auto.commit", String.valueOf(offsetStrategy == OffsetStrategy.AUTO));
        if (duration3 != null) {
            config.putIfAbsent("heartbeat.interval.ms", String.valueOf(duration3.toMillis()));
        }
        if (duration2 != null) {
            config.putIfAbsent("session.timeout.ms", String.valueOf(duration2.toMillis()));
        }
        config.put("group.id", isTrue2 ? str3 : str2);
        if (str4 != null) {
            config.put("client.id", str4);
        }
        config.putAll(annotation.getProperties("properties", "name"));
        configureDeserializers(executableMethod, defaultKafkaConsumerConfiguration);
        if (LOG.isDebugEnabled()) {
            Optional<Deserializer<K>> keyDeserializer = defaultKafkaConsumerConfiguration.getKeyDeserializer();
            if (keyDeserializer.isPresent()) {
                LOG.debug("Using key deserializer [{}] for Kafka listener: {}", keyDeserializer.get(), executableMethod);
            } else {
                LOG.debug("Using key deserializer [{}] for Kafka listener: {}", config.getProperty("key.deserializer"), executableMethod);
            }
            Optional<Deserializer<V>> valueDeserializer = defaultKafkaConsumerConfiguration.getValueDeserializer();
            if (valueDeserializer.isPresent()) {
                LOG.debug("Using value deserializer [{}] for Kafka listener: {}", valueDeserializer.get(), executableMethod);
            } else {
                LOG.debug("Using value deserializer [{}] for Kafka listener: {}", config.getProperty("value.deserializer"), executableMethod);
            }
        }
        for (int i = 0; i < orElse; i++) {
            if (str4 != null) {
                str = orElse > 1 ? str4 + '-' + this.clientIdGenerator.incrementAndGet() : str4;
                config.put("client.id", str);
            } else {
                str = "kafka-consumer-" + this.clientIdGenerator.incrementAndGet();
            }
            Consumer consumer = (Consumer) this.beanContext.createBean(Consumer.class, new Object[]{defaultKafkaConsumerConfiguration});
            this.consumers.put(str, consumer);
            Object bean = this.beanContext.getBean(beanType);
            if (bean instanceof ConsumerAware) {
                ((ConsumerAware) bean).setKafkaConsumer(consumer);
            }
            for (AnnotationValue annotationValue : declaredAnnotationValuesByType) {
                String[] stringValues = annotationValue.stringValues();
                String[] stringValues2 = annotationValue.stringValues("patterns");
                boolean isNotEmpty = ArrayUtils.isNotEmpty(stringValues);
                boolean isNotEmpty2 = ArrayUtils.isNotEmpty(stringValues2);
                if (!isNotEmpty && !isNotEmpty2) {
                    throw new MessagingSystemException("Either a topic or a topic must be specified for method: " + executableMethod);
                }
                if (isNotEmpty) {
                    List asList = Arrays.asList(stringValues);
                    if (bean instanceof ConsumerRebalanceListener) {
                        consumer.subscribe(asList, (ConsumerRebalanceListener) bean);
                    } else {
                        consumer.subscribe(asList);
                    }
                    LOG.info("Kafka listener [{}] subscribed to topics: {}", executableMethod, asList);
                }
                if (isNotEmpty2) {
                    for (String str6 : stringValues2) {
                        try {
                            Pattern compile = Pattern.compile(str6);
                            if (bean instanceof ConsumerRebalanceListener) {
                                consumer.subscribe(compile, (ConsumerRebalanceListener) bean);
                            } else {
                                consumer.subscribe(compile);
                            }
                            LOG.info("Kafka listener [{}] subscribed to topics pattern: {}", executableMethod, str6);
                        } catch (Exception e) {
                            throw new MessagingSystemException("Invalid topic pattern [" + str6 + "] for method [" + executableMethod + "]: " + e.getMessage(), e);
                        }
                    }
                }
            }
            String str7 = str;
            this.executorService.submit(() -> {
                Flowable<?> just;
                boolean z;
                try {
                    boolean z2 = findFirst2.isPresent() || offsetStrategy == OffsetStrategy.SYNC_PER_RECORD || offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD;
                    HashMap hashMap = new HashMap(2);
                    findFirst.ifPresent(argument3 -> {
                        hashMap.put(argument3, consumer);
                    });
                    boolean z3 = false;
                    while (true) {
                        if (!z3) {
                            try {
                                if (this.paused.contains(str7)) {
                                    z3 = true;
                                    LOG.debug("Pausing Kafka consumption for Consumer [{}] from topic partition: {}", str7, consumer.paused());
                                    consumer.pause(consumer.assignment());
                                    this.pausedConsumers.put(str7, consumer);
                                }
                            } catch (WakeupException e2) {
                                throw e2;
                            } catch (Throwable th) {
                                handleException(consumer, bean, null, th);
                            }
                        }
                        ConsumerRecords poll = consumer.poll(duration);
                        boolean z4 = false;
                        if (z3 && !this.paused.contains(str7)) {
                            LOG.debug("Resuming Kafka consumption for Consumer [{}] from topic partition: {}", str7, consumer.paused());
                            consumer.resume(consumer.paused());
                            this.pausedConsumers.remove(str7);
                            z3 = false;
                        }
                        HashMap hashMap2 = z2 ? new HashMap() : null;
                        if (poll != null && poll.count() > 0) {
                            if (isTrue) {
                                Object invoke = new DefaultExecutableBinder(hashMap).bind(executableMethod, this.batchBinderRegistry, poll).invoke(bean);
                                if (invoke != null) {
                                    if (invoke.getClass().isArray()) {
                                        invoke = Arrays.asList((Object[]) invoke);
                                    }
                                    boolean isConvertibleToPublisher = Publishers.isConvertibleToPublisher(invoke);
                                    Flowable fromIterable = invoke instanceof Iterable ? Flowable.fromIterable((Iterable) invoke) : isConvertibleToPublisher ? (Flowable) Publishers.convertPublisher(invoke, Flowable.class) : Flowable.just(invoke);
                                    Iterator it = poll.iterator();
                                    boolean z5 = !isConvertibleToPublisher || executableMethod.hasAnnotation(Blocking.class);
                                    if (z5) {
                                        fromIterable.blockingSubscribe(obj -> {
                                            if (it.hasNext()) {
                                                handleResultFlowable(annotation, bean, executableMethod, consumer, (ConsumerRecord) it.next(), Flowable.just(obj), z5);
                                            }
                                        });
                                    } else {
                                        fromIterable.forEach(obj2 -> {
                                            if (it.hasNext()) {
                                                handleResultFlowable(annotation, bean, executableMethod, consumer, (ConsumerRecord) it.next(), Flowable.just(obj2), z5);
                                            }
                                        });
                                    }
                                }
                            } else {
                                DefaultExecutableBinder defaultExecutableBinder = new DefaultExecutableBinder(hashMap);
                                Iterator it2 = poll.iterator();
                                while (it2.hasNext()) {
                                    ConsumerRecord<?, ?> consumerRecord = (ConsumerRecord) it2.next();
                                    LOG.trace("Kafka consumer [{}] received record: {}", executableMethod, consumerRecord);
                                    if (z2) {
                                        hashMap2.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1, (String) null));
                                    }
                                    if (findFirst2.isPresent()) {
                                        hashMap.put(findFirst2.get(), () -> {
                                            consumer.commitSync(hashMap2);
                                        });
                                    }
                                    try {
                                        Object invoke2 = defaultExecutableBinder.bind(executableMethod, this.binderRegistry, consumerRecord).invoke(bean);
                                        if (invoke2 != null) {
                                            if (Publishers.isConvertibleToPublisher(invoke2)) {
                                                just = (Flowable) Publishers.convertPublisher(invoke2, Flowable.class);
                                                z = executableMethod.hasAnnotation(Blocking.class);
                                            } else {
                                                just = Flowable.just(invoke2);
                                                z = true;
                                            }
                                            handleResultFlowable(annotation, bean, executableMethod, consumer, consumerRecord, just, z);
                                        }
                                        if (offsetStrategy == OffsetStrategy.SYNC_PER_RECORD) {
                                            try {
                                                consumer.commitSync(hashMap2);
                                            } catch (CommitFailedException e3) {
                                                handleException(consumer, bean, consumerRecord, e3);
                                            }
                                        } else if (offsetStrategy == OffsetStrategy.ASYNC_PER_RECORD) {
                                            consumer.commitAsync(hashMap2, resolveCommitCallback(bean));
                                        }
                                    } catch (Throwable th2) {
                                        handleException(consumer, bean, consumerRecord, th2);
                                        z4 = true;
                                    }
                                }
                            }
                            if (!z4) {
                                if (offsetStrategy == OffsetStrategy.SYNC) {
                                    try {
                                        consumer.commitSync();
                                    } catch (CommitFailedException e4) {
                                        handleException(consumer, bean, null, e4);
                                    }
                                } else if (offsetStrategy == OffsetStrategy.ASYNC) {
                                    consumer.commitAsync(resolveCommitCallback(bean));
                                }
                            }
                        }
                    }
                } catch (WakeupException e5) {
                    try {
                        try {
                            if (offsetStrategy != OffsetStrategy.DISABLED) {
                                consumer.commitSync();
                            }
                        } catch (Throwable th3) {
                            LOG.warn("Error committing Kafka offsets on shutdown: {}", th3.getMessage(), th3);
                            consumer.close();
                        }
                    } catch (Throwable th4) {
                        consumer.close();
                        throw th4;
                    }
                } catch (Throwable th5) {
                    try {
                        try {
                            if (offsetStrategy != OffsetStrategy.DISABLED) {
                                consumer.commitSync();
                            }
                            consumer.close();
                        } catch (Throwable th6) {
                            LOG.warn("Error committing Kafka offsets on shutdown: {}", th6.getMessage(), th6);
                            consumer.close();
                            throw th5;
                        }
                        throw th5;
                    } finally {
                        consumer.close();
                    }
                }
            });
        }
    }

    @Override // java.lang.AutoCloseable
    @PreDestroy
    public void close() {
        Iterator<Consumer> it = this.consumers.values().iterator();
        while (it.hasNext()) {
            it.next().wakeup();
        }
        this.consumers.clear();
    }

    private void handleException(Consumer consumer, Object obj, ConsumerRecord<?, ?> consumerRecord, Throwable th) {
        handleException(obj, new KafkaListenerException(th, obj, (Consumer<?, ?>) consumer, consumerRecord));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void handleException(Object obj, KafkaListenerException kafkaListenerException) {
        if (obj instanceof KafkaListenerExceptionHandler) {
            ((KafkaListenerExceptionHandler) obj).handle(kafkaListenerException);
        } else {
            this.exceptionHandler.handle(kafkaListenerException);
        }
    }

    private void handleResultFlowable(AnnotationValue<KafkaListener> annotationValue, Object obj, ExecutableMethod<?, ?> executableMethod, Consumer consumer, ConsumerRecord<?, ?> consumerRecord, Flowable<?> flowable, boolean z) {
        Flowable onErrorResumeNext = flowable.subscribeOn(this.executorScheduler).flatMap(obj2 -> {
            String[] stringValues = executableMethod.stringValues(SendTo.class);
            if (!ArrayUtils.isNotEmpty(stringValues)) {
                return Flowable.empty();
            }
            Object key = consumerRecord.key();
            if (obj2 == null) {
                return Flowable.empty();
            }
            String str = (String) annotationValue.stringValue("groupId").orElse(null);
            Producer mo7getProducer = this.producerRegistry.mo7getProducer(StringUtils.isNotEmpty(str) ? str : null, Argument.of(key != null ? key.getClass() : byte[].class), Argument.of(obj2.getClass()));
            return Flowable.create(flowableEmitter -> {
                for (String str2 : stringValues) {
                    mo7getProducer.send(new ProducerRecord(str2, (Integer) null, key, obj2, consumerRecord.headers()), (recordMetadata, exc) -> {
                        if (exc != null) {
                            flowableEmitter.onError(exc);
                        } else {
                            flowableEmitter.onNext(recordMetadata);
                        }
                    });
                }
                flowableEmitter.onComplete();
            }, BackpressureStrategy.ERROR);
        }).onErrorResumeNext(th -> {
            handleException(obj, new KafkaListenerException("Error occurred processing record [" + consumerRecord + "] with Kafka reactive consumer [" + executableMethod + "]: " + th.getMessage(), th, obj, consumer, consumerRecord));
            if (annotationValue.isTrue("redelivery")) {
                LOG.debug("Attempting redelivery of record [{}] following error", consumerRecord);
                Object key = consumerRecord.key();
                Object value = consumerRecord.value();
                if (key != null && value != null) {
                    String str = (String) annotationValue.stringValue("groupId").orElse(null);
                    Producer mo7getProducer = this.producerRegistry.mo7getProducer(StringUtils.isNotEmpty(str) ? str : null, Argument.of(key.getClass()), Argument.of(value.getClass()));
                    ProducerRecord producerRecord = new ProducerRecord(consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), key, value, consumerRecord.headers());
                    return Flowable.create(flowableEmitter -> {
                        mo7getProducer.send(producerRecord, (recordMetadata, exc) -> {
                            if (exc != null) {
                                handleException(obj, new KafkaListenerException("Redelivery failed for record [" + consumerRecord + "] with Kafka reactive consumer [" + executableMethod + "]: " + th.getMessage(), th, obj, consumer, consumerRecord));
                                flowableEmitter.onComplete();
                            } else {
                                flowableEmitter.onNext(recordMetadata);
                                flowableEmitter.onComplete();
                            }
                        });
                    }, BackpressureStrategy.ERROR);
                }
            }
            return Flowable.empty();
        });
        if (z) {
            onErrorResumeNext.blockingSubscribe(recordMetadata -> {
                LOG.trace("Method [{}] produced record metadata: {}", executableMethod, recordMetadata);
            });
        } else {
            onErrorResumeNext.subscribe(recordMetadata2 -> {
                LOG.trace("Method [{}] produced record metadata: {}", executableMethod, recordMetadata2);
            });
        }
    }

    private Argument findBodyArgument(ExecutableMethod<?, ?> executableMethod) {
        return (Argument) Arrays.stream(executableMethod.getArguments()).filter(argument -> {
            return argument.getType() == ConsumerRecord.class || argument.getAnnotationMetadata().hasAnnotation(Body.class);
        }).findFirst().orElseGet(() -> {
            return (Argument) Arrays.stream(executableMethod.getArguments()).filter(argument2 -> {
                return !argument2.getAnnotationMetadata().hasStereotype(Bindable.class);
            }).findFirst().orElse(null);
        });
    }

    private void configureDeserializers(ExecutableMethod<?, ?> executableMethod, DefaultKafkaConsumerConfiguration defaultKafkaConsumerConfiguration) {
        Properties config = defaultKafkaConsumerConfiguration.getConfig();
        Argument findBodyArgument = findBodyArgument(executableMethod);
        if (!config.containsKey("key.deserializer") && !defaultKafkaConsumerConfiguration.getKeyDeserializer().isPresent()) {
            Optional findFirst = Arrays.stream(executableMethod.getArguments()).filter(argument -> {
                return argument.isAnnotationPresent(KafkaKey.class);
            }).findFirst();
            if (findFirst.isPresent()) {
                defaultKafkaConsumerConfiguration.setKeyDeserializer(this.serdeRegistry.pickDeserializer((Argument) findFirst.get()));
            } else if (findBodyArgument == null || !ConsumerRecord.class.isAssignableFrom(findBodyArgument.getType())) {
                defaultKafkaConsumerConfiguration.setKeyDeserializer(new ByteArrayDeserializer());
            } else {
                Optional typeVariable = findBodyArgument.getTypeVariable("K");
                if (typeVariable.isPresent()) {
                    defaultKafkaConsumerConfiguration.setKeyDeserializer(this.serdeRegistry.pickDeserializer((Argument) typeVariable.get()));
                } else {
                    defaultKafkaConsumerConfiguration.setKeyDeserializer(new ByteArrayDeserializer());
                }
            }
        }
        if (config.containsKey("value.deserializer") || defaultKafkaConsumerConfiguration.getValueDeserializer().isPresent()) {
            return;
        }
        if (findBodyArgument == null) {
            defaultKafkaConsumerConfiguration.setValueDeserializer(new StringDeserializer());
            return;
        }
        if (!ConsumerRecord.class.isAssignableFrom(findBodyArgument.getType())) {
            defaultKafkaConsumerConfiguration.setValueDeserializer(this.serdeRegistry.pickDeserializer(executableMethod.isTrue(KafkaListener.class, "batch") ? (Argument) findBodyArgument.getFirstTypeVariable().orElse(findBodyArgument) : findBodyArgument));
            return;
        }
        Optional typeVariable2 = findBodyArgument.getTypeVariable("V");
        if (typeVariable2.isPresent()) {
            defaultKafkaConsumerConfiguration.setValueDeserializer(this.serdeRegistry.pickDeserializer((Argument) typeVariable2.get()));
        } else {
            defaultKafkaConsumerConfiguration.setValueDeserializer(new StringDeserializer());
        }
    }

    private OffsetCommitCallback resolveCommitCallback(Object obj) {
        return (map, exc) -> {
            if (obj instanceof OffsetCommitCallback) {
                ((OffsetCommitCallback) obj).onComplete(map, exc);
            } else if (exc != null) {
                LOG.error("Error asynchronously committing Kafka offsets [{}]: {}", new Object[]{map, exc.getMessage(), exc});
            }
        };
    }

    public /* bridge */ /* synthetic */ void process(BeanDefinition beanDefinition, Object obj) {
        process((BeanDefinition<?>) beanDefinition, (ExecutableMethod<?, ?>) obj);
    }
}
