package io.opentelemetry.instrumentation.reactor;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.instrumentation.api.tracer.async.AsyncSpanEndStrategy;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/opentelemetry/instrumentation/reactor/ReactorAsyncSpanEndStrategy.class */
/**
 * {@link AsyncSpanEndStrategy} for Reactive Streams / Reactor return values.
 *
 * <p>Instead of ending the span when the instrumented method returns, {@link #end} decorates the
 * returned publisher so that the span is ended when the publisher terminates: with an error, with
 * a normal completion, or because the subscriber cancelled the subscription.
 */
public enum ReactorAsyncSpanEndStrategy implements AsyncSpanEndStrategy {
    INSTANCE;

    /**
     * Callback that ends the span at most once, no matter how many signals it receives.
     *
     * <p>The inherited {@link AtomicBoolean} state is the "already ended" flag: only the caller
     * that flips it from {@code false} to {@code true} ends the span. One instance is created per
     * {@link ReactorAsyncSpanEndStrategy#end} call and is shared by every subscription to the
     * decorated publisher, so the first terminal signal of any subscription ends the span.
     */
    private static final class EndOnFirstNotificationConsumer extends AtomicBoolean implements Runnable, Consumer<Throwable> {
        private final BaseTracer tracer;
        private final Context context;

        public EndOnFirstNotificationConsumer(BaseTracer baseTracer, Context context) {
            super(false);
            this.tracer = baseTracer;
            this.context = context;
        }

        /**
         * Success callback in the shape expected by {@code Mono.doOnSuccess}; the emitted value
         * is ignored and the span is ended normally.
         */
        public <T> void onSuccess(T t) {
            accept((Throwable) null);
        }

        /** Completion / cancellation callback: ends the span normally. */
        @Override // java.lang.Runnable
        public void run() {
            accept((Throwable) null);
        }

        /**
         * Ends the span if it has not been ended yet.
         *
         * @param th the failure to end the span with, or {@code null} to end it normally
         */
        @Override // java.util.function.Consumer
        public void accept(Throwable th) {
            // Losing the compare-and-set means another signal already ended the span.
            if (!compareAndSet(false, true)) {
                return;
            }
            if (th != null) {
                this.tracer.endExceptionally(this.context, th);
            } else {
                this.tracer.end(this.context);
            }
        }
    }

    /**
     * Reports whether this strategy handles the given declared return type.
     *
     * @param cls the return type to check
     * @return {@code true} only for exactly {@link Publisher}, {@link Mono} or {@link Flux};
     *     subtypes are not matched because the comparison is by identity
     */
    public boolean supports(Class<?> cls) {
        return cls == Publisher.class || cls == Mono.class || cls == Flux.class;
    }

    /**
     * Decorates the returned publisher so that the span is ended on its first terminal signal.
     *
     * <p>A {@link Mono} stays a {@code Mono}; any other {@link Publisher} is adapted with
     * {@link Flux#from}. The span is ended exceptionally on error and normally on
     * success/completion. It is also ended normally when the subscription is cancelled, since
     * otherwise a cancelled subscription would leave the span open forever.
     *
     * @param baseTracer tracer used to end the span
     * @param context context holding the span to end
     * @param obj the value returned by the instrumented method; cast to {@link Publisher}, so a
     *     non-null value of any other type fails with {@link ClassCastException}
     * @return the decorated {@code Mono} or {@code Flux}, to be handed to the caller in place of
     *     {@code obj}
     */
    public Object end(BaseTracer baseTracer, Context context, Object obj) {
        EndOnFirstNotificationConsumer notificationConsumer = new EndOnFirstNotificationConsumer(baseTracer, context);
        if (obj instanceof Mono) {
            Mono<?> mono = (Mono<?>) obj;
            return mono.doOnError(notificationConsumer)
                    .doOnSuccess(notificationConsumer::onSuccess)
                    .doOnCancel(notificationConsumer);
        }
        Flux<?> flux = Flux.from((Publisher<?>) obj);
        return flux.doOnError(notificationConsumer)
                .doOnComplete(notificationConsumer)
                .doOnCancel(notificationConsumer);
    }
}
