package io.micronaut.configuration.kafka.intercept;

import io.micronaut.aop.InterceptedMethod;
import io.micronaut.aop.InterceptorBean;
import io.micronaut.aop.MethodInterceptor;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaPartition;
import io.micronaut.configuration.kafka.annotation.KafkaPartitionKey;
import io.micronaut.configuration.kafka.annotation.KafkaTimestamp;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.config.AbstractKafkaProducerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaProducerConfiguration;
import io.micronaut.configuration.kafka.config.KafkaProducerConfiguration;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.core.annotation.AnnotationMetadata;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.annotation.Bindable;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.core.type.ReturnType;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.annotation.MessageHeader;
import io.micronaut.messaging.exceptions.MessagingClientException;
import jakarta.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Internal
@InterceptorBean({KafkaClient.class})
/* loaded from: input_file:io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.class */
class KafkaClientIntroductionAdvice implements MethodInterceptor<Object, Object>, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaClientIntroductionAdvice.class);
    private static final ContextSupplier NULL_SUPPLIER = obj -> {
        return null;
    };
    private final BeanContext beanContext;
    private final SerdeRegistry serdeRegistry;
    private final ConversionService conversionService;
    private final Map<ProducerKey, ProducerState> producerMap = new ConcurrentHashMap();

    /* renamed from: io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice$2, reason: invalid class name */
    /* loaded from: input_file:io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$io$micronaut$aop$InterceptedMethod$ResultType = new int[InterceptedMethod.ResultType.values().length];

        static {
            try {
                $SwitchMap$io$micronaut$aop$InterceptedMethod$ResultType[InterceptedMethod.ResultType.COMPLETION_STAGE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$micronaut$aop$InterceptedMethod$ResultType[InterceptedMethod.ResultType.PUBLISHER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$micronaut$aop$InterceptedMethod$ResultType[InterceptedMethod.ResultType.SYNCHRONOUS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice$ContextSupplier.class */
    public interface ContextSupplier<T> extends Function<MethodInvocationContext<?, ?>, T> {
        /* JADX WARN: Multi-variable type inference failed */
        default T get(MethodInvocationContext<?, ?> methodInvocationContext) {
            return apply(methodInvocationContext);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice$ProducerKey.class */
    public static final class ProducerKey {
        private final Object target;
        private final ExecutableMethod<?, ?> method;
        private final int hashCode;

        private ProducerKey(Object obj, ExecutableMethod<?, ?> executableMethod) {
            this.target = obj;
            this.method = executableMethod;
            this.hashCode = Objects.hash(obj, executableMethod);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ProducerKey producerKey = (ProducerKey) obj;
            return Objects.equals(this.target, producerKey.target) && Objects.equals(this.method, producerKey.method);
        }

        public int hashCode() {
            return this.hashCode;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice$ProducerState.class */
    public static final class ProducerState {
        private final Producer<?, ?> kafkaProducer;
        private final ContextSupplier<Object> keySupplier;
        private final ContextSupplier<String> topicSupplier;
        private final ContextSupplier<Object> valueSupplier;
        private final ContextSupplier<Long> timestampSupplier;
        private final ContextSupplier<Integer> partitionSupplier;
        private final ContextSupplier<Collection<Header>> headersSupplier;
        private final boolean transactional;
        private final String transactionalId;

        @Nullable
        private final Duration maxBlock;
        private final boolean isBatchSend;
        private final Argument<?> bodyArgument;

        private ProducerState(Producer<?, ?> producer, ContextSupplier<Object> contextSupplier, ContextSupplier<String> contextSupplier2, ContextSupplier<Object> contextSupplier3, ContextSupplier<Long> contextSupplier4, ContextSupplier<Integer> contextSupplier5, ContextSupplier<Collection<Header>> contextSupplier6, boolean z, @Nullable String str, @Nullable Duration duration, boolean z2, @Nullable Argument<?> argument) {
            this.kafkaProducer = producer;
            this.keySupplier = contextSupplier;
            this.topicSupplier = contextSupplier2;
            this.valueSupplier = contextSupplier3;
            this.timestampSupplier = contextSupplier4;
            this.partitionSupplier = contextSupplier5;
            this.headersSupplier = contextSupplier6;
            this.transactional = z;
            this.transactionalId = str;
            this.maxBlock = duration;
            this.isBatchSend = z2;
            this.bodyArgument = argument;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaClientIntroductionAdvice(BeanContext beanContext, SerdeRegistry serdeRegistry, ConversionService conversionService) {
        this.beanContext = beanContext;
        this.serdeRegistry = serdeRegistry;
        this.conversionService = conversionService;
    }

    public final Object intercept(MethodInvocationContext<Object, Object> methodInvocationContext) {
        if (!methodInvocationContext.hasAnnotation(KafkaClient.class)) {
            return methodInvocationContext.proceed();
        }
        if (!methodInvocationContext.hasAnnotation(KafkaClient.class)) {
            throw new IllegalStateException("No @KafkaClient annotation present on method: " + methodInvocationContext);
        }
        ProducerState producer = getProducer(methodInvocationContext);
        InterceptedMethod of = InterceptedMethod.of(methodInvocationContext, this.beanContext.getConversionService());
        try {
            Argument<?> returnTypeValue = of.returnTypeValue();
            if (Argument.OBJECT_ARGUMENT.equalsType(returnTypeValue)) {
                returnTypeValue = Argument.of(RecordMetadata.class);
            }
            switch (AnonymousClass2.$SwitchMap$io$micronaut$aop$InterceptedMethod$ResultType[of.resultType().ordinal()]) {
                case 1:
                    return of.handleResult(returnCompletableFuture(methodInvocationContext, producer, returnTypeValue));
                case 2:
                    return of.handleResult(returnPublisher(methodInvocationContext, producer, returnTypeValue));
                case 3:
                    return returnSynchronous(methodInvocationContext, producer);
                default:
                    return of.unsupported();
            }
        } catch (Exception e) {
            return of.handleException(e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v54, types: [java.lang.Iterable] */
    private Object returnSynchronous(MethodInvocationContext<Object, Object> methodInvocationContext, ProducerState producerState) {
        Object orElseGet;
        ReturnType returnType = methodInvocationContext.getReturnType();
        Class<?> type = returnType.getType();
        Argument<?> asArgument = returnType.asArgument();
        Object obj = producerState.valueSupplier.get(methodInvocationContext);
        if (obj != null && Publishers.isConvertibleToPublisher(obj.getClass())) {
            Flux<Object> buildSendFluxForReactiveValue = buildSendFluxForReactiveValue(methodInvocationContext, producerState, asArgument, obj);
            return Iterable.class.isAssignableFrom(type) ? this.conversionService.convert(buildSendFluxForReactiveValue.collectList().block(), asArgument).orElse(null) : Void.TYPE.isAssignableFrom(type) ? buildSendFluxForReactiveValue.next().block() : this.conversionService.convert(buildSendFluxForReactiveValue.blockFirst(), asArgument).orElse(null);
        }
        boolean z = producerState.transactional;
        Producer<?, ?> producer = producerState.kafkaProducer;
        if (z) {
            try {
                LOG.trace("Beginning transaction for producer: {}", producerState.transactionalId);
                producer.beginTransaction();
            } catch (Exception e) {
                if (z) {
                    LOG.trace("Aborting transaction for producer: {}", producerState.transactionalId);
                    producer.abortTransaction();
                }
                throw wrapException(methodInvocationContext, e);
            }
        }
        if (producerState.isBatchSend) {
            List singletonList = (obj == null || !obj.getClass().isArray()) ? obj instanceof Iterable ? (Iterable) obj : Collections.singletonList(obj) : Arrays.asList((Object[]) obj);
            ArrayList arrayList = new ArrayList();
            Iterator it = singletonList.iterator();
            while (it.hasNext()) {
                ProducerRecord<?, ?> buildProducerRecord = buildProducerRecord(methodInvocationContext, producerState, it.next());
                if (LOG.isTraceEnabled()) {
                    LOG.trace("@KafkaClient method [" + logMethod(methodInvocationContext) + "] Sending producer record: " + buildProducerRecord);
                }
                arrayList.add(producerState.maxBlock != null ? producer.send(buildProducerRecord).get(producerState.maxBlock.toMillis(), TimeUnit.MILLISECONDS) : producer.send(buildProducerRecord).get());
            }
            orElseGet = this.conversionService.convert(arrayList, asArgument).orElseGet(() -> {
                if (type == producerState.bodyArgument.getType()) {
                    return obj;
                }
                return null;
            });
        } else {
            ProducerRecord<?, ?> buildProducerRecord2 = buildProducerRecord(methodInvocationContext, producerState, obj);
            if (LOG.isTraceEnabled()) {
                LOG.trace("@KafkaClient method [{}] Sending producer record: {}", logMethod(methodInvocationContext), buildProducerRecord2);
            }
            orElseGet = this.conversionService.convert(producerState.maxBlock != null ? producer.send(buildProducerRecord2).get(producerState.maxBlock.toMillis(), TimeUnit.MILLISECONDS) : producer.send(buildProducerRecord2).get(), asArgument).orElseGet(() -> {
                if (type == producerState.bodyArgument.getType()) {
                    return obj;
                }
                return null;
            });
        }
        if (z) {
            LOG.trace("Committing transaction for producer: {}", producerState.transactionalId);
            producer.commitTransaction();
        }
        return orElseGet;
    }

    private Flux<Object> returnPublisher(MethodInvocationContext<Object, Object> methodInvocationContext, ProducerState producerState, Argument<?> argument) {
        Flux<Object> buildSendFlux;
        Object obj = producerState.valueSupplier.get(methodInvocationContext);
        if (obj != null && Publishers.isConvertibleToPublisher(obj.getClass())) {
            buildSendFlux = buildSendFluxForReactiveValue(methodInvocationContext, producerState, argument, obj);
        } else if (producerState.isBatchSend) {
            Object asList = (obj == null || !obj.getClass().isArray()) ? obj : Arrays.asList((Object[]) obj);
            buildSendFlux = (asList instanceof Iterable ? Flux.fromIterable((Iterable) asList) : Flux.just(asList)).flatMap(obj2 -> {
                return buildSendFlux(methodInvocationContext, producerState, obj2, argument);
            });
        } else {
            buildSendFlux = buildSendFlux(methodInvocationContext, producerState, obj, argument);
        }
        return buildSendFlux;
    }

    private CompletableFuture<Object> returnCompletableFuture(final MethodInvocationContext<Object, Object> methodInvocationContext, ProducerState producerState, Argument<?> argument) {
        final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
        Object obj = producerState.valueSupplier.get(methodInvocationContext);
        if (obj != null && Publishers.isConvertibleToPublisher(obj.getClass())) {
            Flux<Object> buildSendFluxForReactiveValue = buildSendFluxForReactiveValue(methodInvocationContext, producerState, argument, obj);
            if (!Publishers.isSingle(obj.getClass())) {
                buildSendFluxForReactiveValue = buildSendFluxForReactiveValue.collectList().flux();
            }
            buildSendFluxForReactiveValue.subscribe(new Subscriber() { // from class: io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice.1
                boolean completed = false;

                public void onSubscribe(Subscription subscription) {
                    subscription.request(1L);
                }

                public void onNext(Object obj2) {
                    completableFuture.complete(obj2);
                    this.completed = true;
                }

                public void onError(Throwable th) {
                    completableFuture.completeExceptionally(KafkaClientIntroductionAdvice.this.wrapException(methodInvocationContext, th));
                }

                public void onComplete() {
                    if (this.completed) {
                        return;
                    }
                    completableFuture.complete(null);
                }
            });
        } else {
            ProducerRecord<?, ?> buildProducerRecord = buildProducerRecord(methodInvocationContext, producerState, obj);
            if (LOG.isTraceEnabled()) {
                LOG.trace("@KafkaClient method [" + logMethod(methodInvocationContext) + "] Sending producer record: " + buildProducerRecord);
            }
            boolean z = producerState.transactional;
            Producer<?, ?> producer = producerState.kafkaProducer;
            if (z) {
                try {
                    LOG.trace("Beginning transaction for producer: {}", producerState.transactionalId);
                    producer.beginTransaction();
                } catch (Exception e) {
                    if (z) {
                        LOG.trace("Aborting transaction for producer: {}", producerState.transactionalId);
                        producer.abortTransaction();
                    }
                    throw e;
                }
            }
            producer.send(buildProducerRecord, (recordMetadata, exc) -> {
                if (exc != null) {
                    completableFuture.completeExceptionally(wrapException(methodInvocationContext, exc));
                    return;
                }
                if (argument.equalsType(Argument.VOID_OBJECT)) {
                    completableFuture.complete(null);
                    return;
                }
                Optional convert = this.conversionService.convert(recordMetadata, argument);
                if (convert.isPresent()) {
                    completableFuture.complete(convert.get());
                } else if (argument.getType() == producerState.bodyArgument.getType()) {
                    completableFuture.complete(obj);
                }
            });
            if (z) {
                LOG.trace("Committing transaction for producer: {}", producerState.transactionalId);
                producer.commitTransaction();
            }
        }
        return completableFuture;
    }

    private Mono<RecordMetadata> producerSend(Producer<?, ?> producer, ProducerRecord producerRecord) {
        return Mono.create(monoSink -> {
            producer.send(producerRecord, (recordMetadata, exc) -> {
                if (exc != null) {
                    monoSink.error(exc);
                } else {
                    monoSink.success(recordMetadata);
                }
            });
        });
    }

    @Override // java.lang.AutoCloseable
    @PreDestroy
    public final void close() {
        try {
            Iterator<ProducerState> it = this.producerMap.values().iterator();
            while (it.hasNext()) {
                try {
                    it.next().kafkaProducer.close();
                } catch (Exception e) {
                    LOG.warn("Error closing Kafka producer: {}", e.getMessage(), e);
                }
            }
        } finally {
            this.producerMap.clear();
        }
    }

    private Flux<Object> buildSendFlux(MethodInvocationContext<Object, Object> methodInvocationContext, ProducerState producerState, Object obj, Argument<?> argument) {
        ProducerRecord<?, ?> buildProducerRecord = buildProducerRecord(methodInvocationContext, producerState, obj);
        return Flux.defer(() -> {
            boolean z = producerState.transactional;
            Producer<?, ?> producer = producerState.kafkaProducer;
            if (z) {
                LOG.trace("Committing transaction for producer: {}", producerState.transactionalId);
                producer.beginTransaction();
            }
            Mono onErrorMap = producerSend(producer, buildProducerRecord).map(recordMetadata -> {
                return convertResult(recordMetadata, argument, obj, producerState.bodyArgument);
            }).onErrorMap(th -> {
                return wrapException(methodInvocationContext, th);
            });
            return z ? addTransactionalProcessing(producerState, onErrorMap.flux()) : onErrorMap;
        });
    }

    private Flux<Object> buildSendFluxForReactiveValue(MethodInvocationContext<Object, Object> methodInvocationContext, ProducerState producerState, Argument<?> argument, Object obj) {
        Flux from = Flux.from((Publisher) Publishers.convertPublisher(this.beanContext.getConversionService(), obj, Publisher.class));
        if (Iterable.class.isAssignableFrom(argument.getType())) {
            argument = (Argument) argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT);
        }
        boolean z = producerState.transactional;
        Producer<?, ?> producer = producerState.kafkaProducer;
        if (z) {
            LOG.trace("Beginning transaction for producer: {}", producerState.transactionalId);
            producer.beginTransaction();
        }
        Argument<?> argument2 = argument;
        Flux<Object> flatMap = from.flatMap(obj2 -> {
            ProducerRecord<?, ?> buildProducerRecord = buildProducerRecord(methodInvocationContext, producerState, obj2);
            if (LOG.isTraceEnabled()) {
                LOG.trace("@KafkaClient method [{}] Sending producer record: {}", logMethod(methodInvocationContext), buildProducerRecord);
            }
            return producerSend(producer, buildProducerRecord).map(recordMetadata -> {
                return convertResult(recordMetadata, argument2, obj2, producerState.bodyArgument);
            }).onErrorMap(th -> {
                return wrapException(methodInvocationContext, th);
            });
        });
        if (z) {
            flatMap = addTransactionalProcessing(producerState, flatMap);
        }
        if (producerState.maxBlock != null) {
            flatMap = flatMap.timeout(producerState.maxBlock);
        }
        return flatMap;
    }

    private Flux<Object> addTransactionalProcessing(ProducerState producerState, Flux<Object> flux) {
        return flux.doOnError(th -> {
            LOG.trace("Aborting transaction for producer: {}", producerState.transactionalId);
            producerState.kafkaProducer.abortTransaction();
        }).doOnComplete(() -> {
            LOG.trace("Committing transaction for producer: {}", producerState.transactionalId);
            producerState.kafkaProducer.commitTransaction();
        });
    }

    private Object convertResult(RecordMetadata recordMetadata, Argument<?> argument, Object obj, Argument<?> argument2) {
        if (!argument.isVoid() && !RecordMetadata.class.isAssignableFrom(argument.getType())) {
            return argument.getType() == argument2.getType() ? obj : this.conversionService.convertRequired(recordMetadata, argument);
        }
        return recordMetadata;
    }

    private MessagingClientException wrapException(MethodInvocationContext<Object, Object> methodInvocationContext, Throwable th) {
        return new MessagingClientException("Exception sending producer record for method [" + methodInvocationContext + "]: " + th.getMessage(), th);
    }

    private ProducerRecord<?, ?> buildProducerRecord(MethodInvocationContext<Object, Object> methodInvocationContext, ProducerState producerState, Object obj) {
        Supplier supplier = () -> {
            return producerState.topicSupplier.get(methodInvocationContext);
        };
        Supplier supplier2 = () -> {
            return producerState.partitionSupplier.get(methodInvocationContext);
        };
        Supplier supplier3 = () -> {
            return producerState.timestampSupplier.get(methodInvocationContext);
        };
        Supplier supplier4 = () -> {
            return producerState.keySupplier.get(methodInvocationContext);
        };
        Collection<Header> collection = producerState.headersSupplier.get(methodInvocationContext);
        Optional ofNullable = Optional.ofNullable(obj);
        Class<ProducerRecord> cls = ProducerRecord.class;
        Objects.requireNonNull(ProducerRecord.class);
        Optional filter = ofNullable.filter(cls::isInstance);
        Class<ProducerRecord> cls2 = ProducerRecord.class;
        Objects.requireNonNull(ProducerRecord.class);
        return (ProducerRecord) filter.map(cls2::cast).map(producerRecord -> {
            return new ProducerRecord((String) Optional.ofNullable(producerRecord.topic()).filter(Predicate.not((v0) -> {
                return v0.isEmpty();
            })).orElseGet(supplier), (Integer) Optional.ofNullable(producerRecord.partition()).orElseGet(supplier2), (Long) Optional.ofNullable(producerRecord.timestamp()).orElseGet(supplier3), Optional.ofNullable(producerRecord.key()).orElseGet(supplier4), producerRecord.value(), collection == null ? producerRecord.headers() : Stream.concat(StreamSupport.stream(collection.spliterator(), false), Stream.of((Object[]) producerRecord.headers().toArray())).toList());
        }).orElseGet(() -> {
            return new ProducerRecord((String) supplier.get(), (Integer) supplier2.get(), (Long) supplier3.get(), supplier4.get(), obj, collection);
        });
    }

    private ProducerState getProducer(MethodInvocationContext<?, ?> methodInvocationContext) {
        return this.producerMap.computeIfAbsent(new ProducerKey(methodInvocationContext.getTarget(), methodInvocationContext.getExecutableMethod()), producerKey -> {
            AbstractKafkaProducerConfiguration abstractKafkaProducerConfiguration;
            String str = (String) methodInvocationContext.stringValue(KafkaClient.class).orElse(null);
            LinkedList linkedList = new LinkedList();
            List<AnnotationValue> annotationValuesByType = methodInvocationContext.getAnnotationValuesByType(MessageHeader.class);
            if (!annotationValuesByType.isEmpty()) {
                ArrayList arrayList = new ArrayList(annotationValuesByType.size());
                for (AnnotationValue annotationValue : annotationValuesByType) {
                    String str2 = (String) annotationValue.stringValue("name").orElse(null);
                    String str3 = (String) annotationValue.stringValue().orElse(null);
                    if (StringUtils.isNotEmpty(str2) && StringUtils.isNotEmpty(str3)) {
                        arrayList.add(new RecordHeader(str2, str3.getBytes(StandardCharsets.UTF_8)));
                    }
                }
                if (!arrayList.isEmpty()) {
                    linkedList.add(methodInvocationContext2 -> {
                        return arrayList;
                    });
                }
            }
            Argument argument = null;
            Argument argument2 = null;
            ContextSupplier[] contextSupplierArr = {methodInvocationContext3 -> {
                return (String) methodInvocationContext3.stringValue(Topic.class).filter((v0) -> {
                    return StringUtils.isNotEmpty(v0);
                }).orElseThrow(() -> {
                    return new MessagingClientException("No topic specified for method: " + methodInvocationContext);
                });
            }};
            ContextSupplier contextSupplier = NULL_SUPPLIER;
            ContextSupplier contextSupplier2 = NULL_SUPPLIER;
            ContextSupplier contextSupplier3 = NULL_SUPPLIER;
            BiFunction biFunction = (methodInvocationContext4, producer) -> {
                return null;
            };
            Argument[] arguments = methodInvocationContext.getArguments();
            for (int i = 0; i < arguments.length; i++) {
                int i2 = i;
                Argument argument3 = arguments[i];
                if (ProducerRecord.class.isAssignableFrom(argument3.getType()) || argument3.isAnnotationPresent(MessageBody.class)) {
                    argument2 = argument3.isAsyncOrReactive() ? (Argument) argument3.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT) : argument3;
                    contextSupplier2 = methodInvocationContext5 -> {
                        return methodInvocationContext5.getParameterValues()[i2];
                    };
                } else if (argument3.isAnnotationPresent(KafkaKey.class)) {
                    argument = argument3;
                    contextSupplier = methodInvocationContext6 -> {
                        return methodInvocationContext6.getParameterValues()[i2];
                    };
                } else if (argument3.isAnnotationPresent(Topic.class)) {
                    ContextSupplier contextSupplier4 = contextSupplierArr[0];
                    contextSupplierArr[0] = methodInvocationContext7 -> {
                        Object obj = methodInvocationContext7.getParameterValues()[i2];
                        if (obj != null) {
                            String obj2 = obj.toString();
                            if (StringUtils.isNotEmpty(obj2)) {
                                return obj2;
                            }
                        }
                        return (String) contextSupplier4.get(methodInvocationContext7);
                    };
                } else if (argument3.isAnnotationPresent(KafkaTimestamp.class)) {
                    contextSupplier3 = methodInvocationContext8 -> {
                        Object obj = methodInvocationContext8.getParameterValues()[i2];
                        if (obj instanceof Long) {
                            return (Long) obj;
                        }
                        return null;
                    };
                } else if (argument3.isAnnotationPresent(KafkaPartition.class)) {
                    biFunction = (methodInvocationContext9, producer2) -> {
                        Object obj = methodInvocationContext9.getParameterValues()[i2];
                        if (obj == null || !Integer.class.isAssignableFrom(obj.getClass())) {
                            return null;
                        }
                        return (Integer) obj;
                    };
                } else if (argument3.isAnnotationPresent(KafkaPartitionKey.class)) {
                    biFunction = (methodInvocationContext10, producer3) -> {
                        Object obj = methodInvocationContext10.getParameterValues()[i2];
                        if (obj == null) {
                            return null;
                        }
                        ByteArraySerializer pickSerializer = this.serdeRegistry.pickSerializer(argument3);
                        if (pickSerializer == null) {
                            pickSerializer = new ByteArraySerializer();
                        }
                        String str4 = (String) contextSupplierArr[0].get(methodInvocationContext10);
                        return Integer.valueOf(Utils.toPositive(Utils.murmur2(pickSerializer.serialize(str4, obj))) % producer3.partitionsFor(str4).size());
                    };
                } else if (argument3.isAnnotationPresent(MessageHeader.class)) {
                    AnnotationMetadata annotationMetadata = argument3.getAnnotationMetadata();
                    String str4 = (String) annotationMetadata.stringValue(MessageHeader.class, "name").orElseGet(() -> {
                        Optional stringValue = annotationMetadata.stringValue(MessageHeader.class);
                        Objects.requireNonNull(argument3);
                        return (String) stringValue.orElseGet(argument3::getName);
                    });
                    linkedList.add(methodInvocationContext11 -> {
                        Serializer pickSerializer;
                        Object obj = methodInvocationContext11.getParameterValues()[i2];
                        if (obj == null || (pickSerializer = this.serdeRegistry.pickSerializer(argument3)) == null) {
                            return Collections.emptySet();
                        }
                        try {
                            return Collections.singleton(new RecordHeader(str4, pickSerializer.serialize((String) null, obj)));
                        } catch (Exception e) {
                            throw new MessagingClientException("Cannot serialize header argument [" + argument3 + "] for method [" + methodInvocationContext11 + "]: " + e.getMessage(), e);
                        }
                    });
                } else if (argument3.isContainerType() && Header.class.isAssignableFrom(((Argument) argument3.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT)).getType())) {
                    linkedList.add(methodInvocationContext12 -> {
                        Collection collection = (Collection) methodInvocationContext12.getParameterValues()[i2];
                        return collection != null ? collection : Collections.emptySet();
                    });
                } else {
                    Class type = argument3.getType();
                    if (type == Headers.class || type == RecordHeaders.class) {
                        linkedList.add(methodInvocationContext13 -> {
                            Headers headers = (Headers) methodInvocationContext13.getParameterValues()[i2];
                            return headers != null ? headers : Collections.emptySet();
                        });
                    }
                }
            }
            if (argument2 == null) {
                int i3 = 0;
                while (true) {
                    if (i3 >= arguments.length) {
                        break;
                    }
                    int i4 = i3;
                    Argument argument4 = arguments[i3];
                    if (argument4.getAnnotationMetadata().hasStereotype(Bindable.class)) {
                        i3++;
                    } else {
                        argument2 = argument4.isAsyncOrReactive() ? (Argument) argument4.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT) : argument4;
                        contextSupplier2 = methodInvocationContext14 -> {
                            return methodInvocationContext14.getParameterValues()[i4];
                        };
                    }
                }
                if (argument2 == null) {
                    throw new MessagingClientException("No valid message body argument found for method: " + methodInvocationContext);
                }
            }
            if (str != null) {
                Optional findBean = this.beanContext.findBean(KafkaProducerConfiguration.class, Qualifiers.byName(str));
                abstractKafkaProducerConfiguration = findBean.isPresent() ? (AbstractKafkaProducerConfiguration) findBean.get() : (AbstractKafkaProducerConfiguration) this.beanContext.getBean(AbstractKafkaProducerConfiguration.class);
            } else {
                abstractKafkaProducerConfiguration = (AbstractKafkaProducerConfiguration) this.beanContext.getBean(AbstractKafkaProducerConfiguration.class);
            }
            DefaultKafkaProducerConfiguration defaultKafkaProducerConfiguration = new DefaultKafkaProducerConfiguration(abstractKafkaProducerConfiguration);
            Properties config = defaultKafkaProducerConfiguration.getConfig();
            String str5 = (String) methodInvocationContext.stringValue(KafkaClient.class, "transactionalId").filter((v0) -> {
                return StringUtils.isNotEmpty(v0);
            }).orElse(null);
            if (str != null) {
                config.putIfAbsent("client.id", str);
            }
            if (str5 != null) {
                config.putIfAbsent("transactional.id", str5);
            }
            methodInvocationContext.getValue(KafkaClient.class, "maxBlock", Duration.class).ifPresent(duration -> {
                config.put("max.block.ms", String.valueOf(duration.toMillis()));
            });
            Integer valueOf = Integer.valueOf(methodInvocationContext.intValue(KafkaClient.class, "acks").orElse(KafkaClient.Acknowledge.DEFAULT));
            if (valueOf.intValue() != Integer.MIN_VALUE) {
                config.put("acks", valueOf.intValue() == -1 ? "all" : String.valueOf(valueOf));
            }
            Optional map = methodInvocationContext.findAnnotation(KafkaClient.class).map(annotationValue2 -> {
                return annotationValue2.getProperties("properties", "name");
            });
            Objects.requireNonNull(config);
            map.ifPresent(config::putAll);
            LOG.debug("Creating new KafkaProducer.");
            if (!config.containsKey("key.serializer") && ((Serializer) defaultKafkaProducerConfiguration.getKeySerializer().orElse(null)) == null) {
                Serializer pickSerializer = argument != null ? this.serdeRegistry.pickSerializer(argument) : new ByteArraySerializer();
                LOG.debug("Using Kafka key serializer: {}", pickSerializer);
                defaultKafkaProducerConfiguration.setKeySerializer(pickSerializer);
            }
            boolean isTrue = methodInvocationContext.isTrue(KafkaClient.class, "batch");
            if (!config.containsKey("value.serializer") && ((Serializer) defaultKafkaProducerConfiguration.getValueSerializer().orElse(null)) == null) {
                Serializer pickSerializer2 = this.serdeRegistry.pickSerializer(isTrue ? (Argument) argument2.getFirstTypeVariable().orElse(argument2) : argument2);
                LOG.debug("Using Kafka value serializer: {}", pickSerializer2);
                defaultKafkaProducerConfiguration.setValueSerializer(pickSerializer2);
            }
            Producer producer4 = (Producer) this.beanContext.createBean(Producer.class, new Object[]{defaultKafkaProducerConfiguration});
            boolean isNotEmpty = StringUtils.isNotEmpty(str5);
            ContextSupplier contextSupplier5 = methodInvocationContext.isTrue(KafkaClient.class, "timestamp") ? methodInvocationContext15 -> {
                return Long.valueOf(System.currentTimeMillis());
            } : contextSupplier3;
            Duration duration2 = (Duration) methodInvocationContext.getValue(KafkaClient.class, "maxBlock", Duration.class).orElse(null);
            if (isNotEmpty) {
                producer4.initTransactions();
            }
            ContextSupplier contextSupplier6 = methodInvocationContext16 -> {
                if (linkedList.isEmpty()) {
                    return null;
                }
                ArrayList arrayList2 = new ArrayList(linkedList.size());
                Iterator it = linkedList.iterator();
                while (it.hasNext()) {
                    Iterator it2 = ((Iterable) ((ContextSupplier) it.next()).get(methodInvocationContext16)).iterator();
                    while (it2.hasNext()) {
                        arrayList2.add((Header) it2.next());
                    }
                }
                if (arrayList2.isEmpty()) {
                    return null;
                }
                return arrayList2;
            };
            BiFunction biFunction2 = biFunction;
            return new ProducerState(producer4, contextSupplier, contextSupplierArr[0], contextSupplier2, contextSupplier5, methodInvocationContext17 -> {
                return (Integer) biFunction2.apply(methodInvocationContext17, producer4);
            }, contextSupplier6, isNotEmpty, str5, duration2, isTrue, argument2);
        });
    }

    private static String logMethod(ExecutableMethod<?, ?> executableMethod) {
        return executableMethod.getDeclaringType().getSimpleName() + "#" + executableMethod.getName();
    }
}
