package io.opentelemetry.instrumentation.kafkaclients.v2_6;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaHeadersSetter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;

/* loaded from: input_file:io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.class */
/**
 * Entry point for tracing Kafka producers and consumers with OpenTelemetry.
 *
 * <p>Instances are obtained through {@link #create(OpenTelemetry)} or
 * {@link #builder(OpenTelemetry)}. A producer or consumer can then be decorated with
 * {@link #wrap(Producer)} / {@link #wrap(Consumer)}, which return dynamic proxies that
 * start and end spans around {@code send} and {@code poll} calls.
 */
public final class KafkaTelemetry {
    private static final Logger logger = Logger.getLogger(KafkaTelemetry.class.getName());
    private static final TextMapGetter<ConsumerRecord<?, ?>> GETTER = KafkaConsumerRecordGetter.INSTANCE;
    private static final TextMapSetter<Headers> SETTER = KafkaHeadersSetter.INSTANCE;
    private final OpenTelemetry openTelemetry;
    private final Instrumenter<ProducerRecord<?, ?>, RecordMetadata> producerInstrumenter;
    private final Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter;
    private final boolean producerPropagationEnabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry$ProducerCallback.class */
    /**
     * Send callback that ends the producer span and then delegates to the user's callback
     * (if any) with the parent context made current.
     */
    public class ProducerCallback implements Callback {
        private final Callback callback;
        private final Context parentContext;
        private final Context context;
        private final ProducerRecord<?, ?> request;

        /**
         * @param callback the user's callback to delegate to; may be {@code null}
         * @param parentContext context that is made current while the user's callback runs
         * @param context context holding the producer span that is ended on completion
         * @param producerRecord the record being sent, passed to the instrumenter on end
         */
        public ProducerCallback(Callback callback, Context parentContext, Context context, ProducerRecord<?, ?> producerRecord) {
            this.callback = callback;
            this.parentContext = parentContext;
            this.context = context;
            this.request = producerRecord;
        }

        /**
         * Ends the producer span with the given outcome, then invokes the wrapped callback.
         *
         * @param recordMetadata metadata of the sent record, as supplied by the producer
         * @param exc the send failure, as supplied by the producer
         */
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            // The span is ended before the user's callback runs.
            KafkaTelemetry.this.producerInstrumenter.end(this.context, this.request, recordMetadata, exc);
            if (this.callback == null) {
                return;
            }
            // The user's callback runs with the parent context current, not the producer span's context.
            try (Scope ignored = this.parentContext.makeCurrent()) {
                this.callback.onCompletion(recordMetadata, exc);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a telemetry instance from pre-built instrumenters.
     *
     * @param openTelemetry source of the propagators and of the metrics supplier
     * @param producerInstrumenter instrumenter used for producer (send) spans
     * @param consumerProcessInstrumenter instrumenter used for per-record consumer spans
     * @param producerPropagationEnabled whether span context is injected into record headers
     */
    public KafkaTelemetry(OpenTelemetry openTelemetry, Instrumenter<ProducerRecord<?, ?>, RecordMetadata> producerInstrumenter, Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter, boolean producerPropagationEnabled) {
        this.openTelemetry = openTelemetry;
        this.producerInstrumenter = producerInstrumenter;
        this.consumerProcessInstrumenter = consumerProcessInstrumenter;
        this.producerPropagationEnabled = producerPropagationEnabled;
    }

    /** Returns a {@link KafkaTelemetry} built with the builder's default settings. */
    public static KafkaTelemetry create(OpenTelemetry openTelemetry) {
        return builder(openTelemetry).build();
    }

    /** Returns a new {@link KafkaTelemetryBuilder} for the given {@link OpenTelemetry}. */
    public static KafkaTelemetryBuilder builder(OpenTelemetry openTelemetry) {
        return new KafkaTelemetryBuilder(openTelemetry);
    }

    private TextMapPropagator propagator() {
        return this.openTelemetry.getPropagators().getTextMapPropagator();
    }

    /**
     * Returns a proxy around {@code producer} that traces {@code send} calls.
     *
     * <p>Calls to a method named {@code send} whose first parameter type is
     * {@link ProducerRecord} are routed through
     * {@link #buildAndInjectSpan(ProducerRecord, Callback, BiFunction)}; every other method is
     * forwarded unchanged, with the target's own exception rethrown instead of the reflective
     * wrapper.
     *
     * @param producer the producer to delegate to
     * @return a {@link Producer} proxy backed by {@code producer}
     */
    @SuppressWarnings("unchecked") // the proxy implements Producer and forwards to a Producer<K, V>
    public <K, V> Producer<K, V> wrap(Producer<K, V> producer) {
        return (Producer<K, V>) Proxy.newProxyInstance(KafkaTelemetry.class.getClassLoader(), new Class<?>[]{Producer.class}, (proxy, method, args) -> {
            boolean isSend = "send".equals(method.getName())
                    && method.getParameterCount() >= 1
                    && method.getParameterTypes()[0] == ProducerRecord.class;
            if (isSend) {
                ProducerRecord<K, V> producerRecord = (ProducerRecord<K, V>) args[0];
                // Only the two-argument send overload carries a user callback.
                Callback callback = null;
                if (method.getParameterCount() >= 2 && method.getParameterTypes()[1] == Callback.class) {
                    callback = (Callback) args[1];
                }
                return buildAndInjectSpan(producerRecord, callback, producer::send);
            }
            try {
                return method.invoke(producer, args);
            } catch (InvocationTargetException e) {
                // Surface the delegate's own exception rather than the reflection wrapper.
                throw e.getCause();
            }
        });
    }

    /**
     * Returns a proxy around {@code consumer} that creates a span for each polled record.
     *
     * <p>All methods are forwarded to {@code consumer}. When a method named {@code poll}
     * returns a {@link ConsumerRecords}, the records are passed to
     * {@link #buildAndFinishSpan(ConsumerRecords)} before the result is handed back.
     *
     * @param consumer the consumer to delegate to
     * @return a {@link Consumer} proxy backed by {@code consumer}
     */
    @SuppressWarnings("unchecked") // the proxy implements Consumer and forwards to a Consumer<K, V>
    public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
        return (Consumer<K, V>) Proxy.newProxyInstance(KafkaTelemetry.class.getClassLoader(), new Class<?>[]{Consumer.class}, (proxy, method, args) -> {
            Object result;
            try {
                result = method.invoke(consumer, args);
            } catch (InvocationTargetException e) {
                // Surface the delegate's own exception rather than the reflection wrapper.
                throw e.getCause();
            }
            if ("poll".equals(method.getName()) && result instanceof ConsumerRecords) {
                buildAndFinishSpan((ConsumerRecords<K, V>) result);
            }
            return result;
        });
    }

    /**
     * Returns configuration entries that register {@link OpenTelemetryMetricsReporter} as a
     * metric reporter and hand it this instance's {@link OpenTelemetry}.
     *
     * @return an unmodifiable map of configuration properties
     */
    public Map<String, ?> metricConfigProperties() {
        Map<String, Object> config = new HashMap<>();
        config.put("metric.reporters", OpenTelemetryMetricsReporter.class.getName());
        config.put("opentelemetry.supplier", new OpenTelemetrySupplier(this.openTelemetry));
        config.put("opentelemetry.instrumentation_name", "io.opentelemetry.kafka-clients-2.6");
        return Collections.unmodifiableMap(config);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts a producer span for {@code producerRecord}, injects its context into the record
     * headers when propagation is enabled, and ends the span immediately.
     *
     * <p>Does nothing when the producer instrumenter declines to start a span.
     *
     * @param producerRecord the record about to be sent
     */
    public <K, V> void buildAndInjectSpan(ProducerRecord<K, V> producerRecord) {
        Context parentContext = Context.current();
        if (!this.producerInstrumenter.shouldStart(parentContext, producerRecord)) {
            return;
        }
        Context context = this.producerInstrumenter.start(parentContext, producerRecord);
        injectContext(context, producerRecord);
        this.producerInstrumenter.end(context, producerRecord, null, null);
    }

    /**
     * Sends {@code producerRecord} through {@code biFunction} inside a producer span.
     *
     * <p>When the instrumenter declines to start a span, the record and the original callback
     * are passed to {@code biFunction} untouched. Otherwise a span is started, its context is
     * injected into the record headers when propagation is enabled, and {@code biFunction} is
     * invoked with the span's context current and with a {@link ProducerCallback} that ends the
     * span on completion.
     *
     * @param producerRecord the record to send
     * @param callback the user's callback; may be {@code null}
     * @param biFunction the actual send operation
     * @return whatever {@code biFunction} returns
     */
    <K, V> Future<RecordMetadata> buildAndInjectSpan(ProducerRecord<K, V> producerRecord, Callback callback, BiFunction<ProducerRecord<K, V>, Callback, Future<RecordMetadata>> biFunction) {
        Context parentContext = Context.current();
        if (!this.producerInstrumenter.shouldStart(parentContext, producerRecord)) {
            return biFunction.apply(producerRecord, callback);
        }
        Context context = this.producerInstrumenter.start(parentContext, producerRecord);
        try (Scope ignored = context.makeCurrent()) {
            injectContext(context, producerRecord);
            // NOTE(review): if biFunction throws synchronously without invoking the callback,
            // the span started above is not ended here - confirm whether that needs handling.
            return biFunction.apply(producerRecord, new ProducerCallback(callback, parentContext, context, producerRecord));
        }
    }

    /**
     * Injects {@code context} into the record's headers when producer propagation is enabled.
     * Injection failures are logged at WARNING level and otherwise ignored, so that a header
     * problem does not prevent the record from being sent.
     */
    private void injectContext(Context context, ProducerRecord<?, ?> producerRecord) {
        if (!this.producerPropagationEnabled) {
            return;
        }
        try {
            propagator().inject(context, producerRecord.headers(), SETTER);
        } catch (Throwable th) {
            logger.log(Level.WARNING, "failed to inject span context. sending record second time?", th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts and immediately ends one consumer span per record in {@code consumerRecords}.
     *
     * <p>For each record, the context extracted from the record via the configured propagator
     * supplies the span that is placed into the current context to form the parent context.
     * Records for which the instrumenter declines to start a span are skipped.
     *
     * @param consumerRecords the batch of records to create spans for
     */
    public <K, V> void buildAndFinishSpan(ConsumerRecords<K, V> consumerRecords) {
        Context current = Context.current();
        for (ConsumerRecord<K, V> consumerRecord : consumerRecords) {
            Context extracted = propagator().extract(current, consumerRecord, GETTER);
            // Only the span from the extracted context is carried over; the rest comes from `current`.
            Context parentContext = current.with(Span.fromContext(extracted));
            if (this.consumerProcessInstrumenter.shouldStart(parentContext, consumerRecord)) {
                Context context = this.consumerProcessInstrumenter.start(parentContext, consumerRecord);
                this.consumerProcessInstrumenter.end(context, consumerRecord, null, null);
            }
        }
    }
}
