/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator;
import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TopologyConfigurationImpl;
import io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.LoggedRecord;
import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapGetter;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.Generated;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Processor decorator that runs the delegate's {@code process} call inside an OpenTelemetry span.
 *
 * <p>If the incoming record carries any of the header keys declared by the configured
 * {@link TextMapPropagator}, the context extracted from those headers becomes the parent of the new
 * span and the propagation headers are removed from the record.
 */
@Priority(100)
@Dependent
public class TracingDecorator extends AbstractProcessorDecorator {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TracingDecorator.class);

    private final OpenTelemetry openTelemetry;
    private final KafkaTextMapGetter textMapGetter;
    private final Tracer tracer;
    /** Name given to every span built by {@link #process(Record)}. */
    private final String applicationName;
    private final JsonFormat.Printer jsonPrinter;
    /**
     * Context received in {@link #init(ProcessorContext)}. Declared with wildcards rather than raw so
     * that {@code recordMetadata()} keeps its parameterized {@code Optional} return type.
     */
    private ProcessorContext<?, ?> context;

    /**
     * Injection constructor. The span name is taken from the name of the configured processor
     * payload type, and the default protobuf JSON printer is used for logging.
     *
     * @param openTelemetry source of the configured context propagators
     * @param textMapGetter getter used to read propagation fields from the record headers
     * @param tracer tracer used to build spans
     * @param configuration topology configuration providing the processor payload type
     */
    @Inject
    public TracingDecorator(OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter, Tracer tracer,
            TopologyConfigurationImpl configuration) {
        this(openTelemetry, textMapGetter, tracer, configuration.getProcessorPayloadType().getName(),
                JsonFormat.printer());
    }

    /**
     * Full constructor, with every collaborator supplied explicitly.
     *
     * @param openTelemetry source of the configured context propagators
     * @param textMapGetter getter used to read propagation fields from the record headers
     * @param tracer tracer used to build spans
     * @param applicationName name given to the spans created by this decorator
     * @param jsonPrinter printer used to render protobuf payloads as JSON in debug logs
     */
    public TracingDecorator(OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter, Tracer tracer,
            String applicationName, JsonFormat.Printer jsonPrinter) {
        this.openTelemetry = openTelemetry;
        this.textMapGetter = textMapGetter;
        this.tracer = tracer;
        this.applicationName = applicationName;
        this.jsonPrinter = jsonPrinter;
    }

    /**
     * Initializes the delegate, then keeps the context for later metadata logging.
     *
     * @param context processor context handed to the delegate and stored by this decorator
     */
    public void init(ProcessorContext context) {
        this.getDelegate().init(context);
        this.context = context;
    }

    /**
     * Processes the record through the delegate inside a new span.
     *
     * <p>A {@link KafkaException} is recorded on the span and rethrown. Any other
     * {@link RuntimeException} is logged and recorded on the span but not rethrown. In both cases the
     * input record's metadata is logged at debug level. The span is always ended, and the parent
     * scope (if one was opened) is always closed.
     *
     * @param record the incoming record; its propagation headers are removed when present
     */
    public void process(Record record) {
        SpanBuilder spanBuilder = this.tracer.spanBuilder(this.applicationName);
        TextMapPropagator propagator = this.openTelemetry.getPropagators().getTextMapPropagator();
        Headers headers = record.headers();
        // Plain local rather than a try-with-resources variable: it is assigned conditionally
        // below, and resource variables cannot be reassigned.
        Scope parentScope = null;
        try {
            boolean hasPropagationHeader = propagator.fields().stream()
                    .map(headers::lastHeader)
                    .anyMatch(Objects::nonNull);
            if (hasPropagationHeader) {
                Context extractedContext = propagator.extract(Context.current(), headers, this.textMapGetter);
                spanBuilder.setParent(extractedContext);
                // Strip the propagation headers from the record once they have been consumed.
                propagator.fields().forEach(headers::remove);
                parentScope = extractedContext.makeCurrent();
            }
            Span span = spanBuilder.startSpan();
            try (Scope ignored = span.makeCurrent()) {
                try {
                    this.getDelegate().process(record);
                    span.setStatus(StatusCode.OK);
                } catch (KafkaException e) {
                    span.recordException(e);
                    span.setStatus(StatusCode.ERROR, e.getMessage());
                    this.logInputMessageMetadata(record);
                    throw e;
                } catch (RuntimeException e) {
                    // Deliberately not rethrown: the failure is logged and recorded on the span only.
                    log.error("Runtime error caught while processing the message", e);
                    span.recordException(e);
                    span.setStatus(StatusCode.ERROR, e.getMessage());
                    this.logInputMessageMetadata(record);
                }
            } finally {
                span.end();
            }
        } finally {
            if (parentScope != null) {
                parentScope.close();
            }
        }
    }

    /**
     * Logs, at debug level only, a {@link LoggedRecord} built from the record's headers, its
     * {@code uuid} header, its timestamp (UTC), the application id, its value and, when available,
     * the topic and partition it came from.
     *
     * @param record the record whose metadata is logged
     */
    void logInputMessageMetadata(Record record) {
        if (!log.isDebugEnabled()) {
            return;
        }
        Map<String, String> headers = TracingDecorator.toMap(record.headers());
        ZonedDateTime time = ZonedDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneOffset.UTC);
        LoggedRecord.LoggedRecordBuilder builder = LoggedRecord.builder()
                .headers(headers)
                .id(headers.get("uuid"))
                .time(time)
                .appId(this.context.applicationId());
        this.marshallValue(record, builder);
        this.extractMetadata(builder);
        log.debug("Input message is {}", builder.build());
    }

    /**
     * Converts record headers into a map of key to UTF-8 decoded value.
     *
     * <p>When a key occurs several times the last occurrence wins; a header with a {@code null}
     * value maps to {@code null}. Neither case throws, so this is safe to call from error handling.
     *
     * @param headers the headers to convert
     * @return a map of header keys to decoded values, in header iteration order
     */
    private static Map<String, String> toMap(Headers headers) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Header header : headers) {
            byte[] raw = header.value();
            result.put(header.key(), raw == null ? null : new String(raw, StandardCharsets.UTF_8));
        }
        return result;
    }

    /**
     * Sets the builder's value from the record's value: protobuf messages are printed as JSON,
     * anything else uses {@code toString()}. A {@code null} value leaves the builder untouched.
     *
     * @param record the record whose value is rendered
     * @param builder the builder receiving the rendered value
     */
    private void marshallValue(Record<?, ?> record, LoggedRecord.LoggedRecordBuilder builder) {
        Object value = record.value();
        if (value == null) {
            // Null-valued records have nothing to render; avoid an NPE on toString().
            return;
        }
        if (value instanceof MessageOrBuilder) {
            try {
                builder.value(this.jsonPrinter.print((MessageOrBuilder) value));
            } catch (InvalidProtocolBufferException e) {
                log.error("Could not unmarshal to JSON", e);
            }
        } else {
            builder.value(value.toString());
        }
    }

    /**
     * Copies the topic and partition of the current record into the builder, when the stored
     * processor context exposes record metadata.
     *
     * @param builder the builder receiving topic and partition
     */
    private void extractMetadata(LoggedRecord.LoggedRecordBuilder builder) {
        this.context.recordMetadata()
                .ifPresent(metadata -> builder.topic(metadata.topic()).partition(metadata.partition()));
    }

    /** Declares the {@code init} and {@code process} signatures; not referenced elsewhere in this class. */
    private interface Excludes {
        <KOut, VOut> void init(ProcessorContext<KOut, VOut> context);

        <KIn, VIn> void process(Record<KIn, VIn> record);
    }
}

