package org.eclipse.hono.service;

import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.noop.NoopTracerFactory;
import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;
import java.util.Objects;
import java.util.Optional;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.tracing.MultiMapExtractAdapter;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.ConfigurationSupportingVerticle;
import org.eclipse.hono.util.EventBusMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/eclipse/hono/service/EventBusService.class */
/**
 * Base class for verticles that serve requests arriving on the Vert.x event bus.
 * <p>
 * On start, a consumer is registered at the address returned by
 * {@link #getEventBusAddress()}. Every incoming message is converted into an
 * {@link EventBusMessage}, enriched with the tracing span context extracted from
 * the message headers and handed to {@link #processRequest(EventBusMessage)}.
 * The outcome is sent back either as a direct reply or to the reply-to address
 * carried by the response.
 *
 * @param <C> The type parameter passed through to {@link ConfigurationSupportingVerticle}.
 */
public abstract class EventBusService<C> extends ConfigurationSupportingVerticle<C> {

    /** Logger named after the concrete subclass. */
    protected final Logger log = LoggerFactory.getLogger(getClass());

    /** Tracer used for extracting span contexts; a no-op tracer until one is set. */
    protected Tracer tracer = NoopTracerFactory.create();

    /** Consumer created by {@link #start(Future)}; {@code null} until then. */
    private MessageConsumer<JsonObject> requestConsumer;

    /**
     * Sets the OpenTracing tracer to use instead of the default no-op tracer.
     *
     * @param tracer The tracer to use.
     * @throws NullPointerException if tracer is {@code null}.
     */
    @Autowired(required = false)
    public final void setTracer(Tracer tracer) {
        // Validate first so that the explicit null check is what rejects a null argument,
        // not the dereference inside the log statement.
        this.tracer = Objects.requireNonNull(tracer);
        this.log.info("using OpenTracing Tracer implementation [{}]", tracer.getClass().getName());
    }

    /**
     * Registers the event bus consumer and then invokes {@link #doStart(Future)}.
     *
     * @param future The future handed on to {@link #doStart(Future)}.
     */
    public final void start(Future<Void> future) {
        registerConsumer();
        doStart(future);
    }

    /**
     * Hook for subclasses to perform additional start-up work.
     * <p>
     * This default implementation simply completes the given future.
     *
     * @param future The future to complete once start-up has finished.
     */
    protected void doStart(Future<Void> future) {
        future.complete();
    }

    /**
     * Gets the event bus address that this service consumes requests from.
     *
     * @return The address.
     */
    protected abstract String getEventBusAddress();

    /**
     * Unregisters the event bus consumer (if one was registered) and then invokes
     * {@link #doStop(Future)}.
     *
     * @param future The future handed on to {@link #doStop(Future)}.
     */
    public final void stop(Future<Void> future) {
        if (this.requestConsumer != null) {
            this.requestConsumer.unregister();
            // This class serves arbitrary addresses, so the message must not name a specific API.
            this.log.info("unregistered request consumer from event bus");
        }
        doStop(future);
    }

    /**
     * Hook for subclasses to perform additional shut-down work.
     * <p>
     * This default implementation simply completes the given future.
     *
     * @param future The future to complete once shut-down has finished.
     */
    protected void doStop(Future<Void> future) {
        future.complete();
    }

    /**
     * Creates the consumer for {@link #getEventBusAddress()} and attaches the request handler.
     */
    private void registerConsumer() {
        final String address = getEventBusAddress();
        this.requestConsumer = this.vertx.eventBus().consumer(address);
        this.requestConsumer.handler(this::processRequestMessage);
        this.log.info("listening on event bus [address: {}] for requests", address);
    }

    /**
     * Handles a single message received from the event bus.
     * <p>
     * A failure of {@link #processRequest(EventBusMessage)} is turned into a response
     * whose status is the code extracted from the failure by
     * {@link ServiceInvocationException#extractStatusCode(Throwable)}.
     *
     * @param message The message containing the request.
     */
    private void processRequestMessage(Message<JsonObject> message) {
        final JsonObject body = message.body();
        if (this.log.isTraceEnabled()) {
            this.log.trace("received request message: {}", body.encodePrettily());
        }
        final EventBusMessage request = EventBusMessage.fromJson(body);
        request.setSpanContext(
                this.tracer.extract(Format.Builtin.TEXT_MAP, new MultiMapExtractAdapter(message.headers())));

        processRequest(request).recover(th -> {
            this.log.debug("cannot process request [operation: {}]: {}", request.getOperation(), th.getMessage());
            final int status = ServiceInvocationException.extractStatusCode(th);
            return Future.succeededFuture(request.getResponse(status));
        }).map(response -> {
            sendResponse(message, request, response);
            return null;
        });
    }

    /**
     * Delivers a response, either as a direct reply or to the response's reply-to address.
     *
     * @param requestMessage The event bus message that carried the request.
     * @param request The parsed request (used for logging the operation).
     * @param response The response to deliver.
     */
    private void sendResponse(Message<JsonObject> requestMessage, EventBusMessage request, EventBusMessage response) {
        // Route on the very same value that is null-checked; the original code checked the
        // response's address but sent to the request's address, which was never validated.
        // NOTE(review): assumes request and response normally carry the same reply-to address.
        final String replyTo = response.getReplyToAddress();
        if (replyTo == null) {
            this.log.debug("sending response as direct reply to request [operation: {}]", request.getOperation());
            requestMessage.reply(response.toJson());
        } else if (response.hasResponseProperties()) {
            this.log.debug("sending response [operation: {}, reply-to: {}]", request.getOperation(), replyTo);
            this.vertx.eventBus().send(replyTo, response.toJson());
        } else {
            this.log.warn("discarding response lacking correlation ID or operation");
        }
    }

    /**
     * Processes a request received via the event bus.
     *
     * @param eventBusMessage The request to process.
     * @return A future holding the response; a failed future is converted into an
     *         error response by this class.
     */
    protected abstract Future<EventBusMessage> processRequest(EventBusMessage eventBusMessage);

    /**
     * Reads a field from a JSON object if its value is of the expected type.
     *
     * @param cls The expected type of the value.
     * @param jsonObject The object to read from.
     * @param str The name of the field.
     * @param <T> The expected type of the value.
     * @return The value, or {@code null} if the field is absent or of another type.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public static final <T> T getTypesafeValueForField(Class<T> cls, JsonObject jsonObject, String str) {
        Objects.requireNonNull(cls);
        Objects.requireNonNull(jsonObject);
        Objects.requireNonNull(str);
        final Object value = jsonObject.getValue(str);
        return cls.isInstance(value) ? cls.cast(value) : null;
    }

    /**
     * Removes a field from a JSON object and returns its value if it is of the expected type.
     * <p>
     * The field is removed regardless of whether its value matches the expected type.
     *
     * @param cls The expected type of the value.
     * @param jsonObject The object to remove the field from.
     * @param str The name of the field.
     * @param <T> The expected type of the value.
     * @return The removed value, or {@code null} if the field was absent or of another type.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public static final <T> T removeTypesafeValueForField(Class<T> cls, JsonObject jsonObject, String str) {
        Objects.requireNonNull(cls);
        Objects.requireNonNull(jsonObject);
        Objects.requireNonNull(str);
        final Object removed = jsonObject.remove(str);
        return cls.isInstance(removed) ? cls.cast(removed) : null;
    }

    /**
     * Returns a payload that is guaranteed to contain a boolean {@code enabled} property.
     *
     * @param jsonObject The original payload, may be {@code null}.
     * @return The given object itself if it already holds a boolean {@code enabled} value,
     *         otherwise a copy with {@code enabled} set to {@code true}, or a new object
     *         containing only {@code enabled=true} if the given payload is {@code null}.
     */
    public final JsonObject getRequestPayload(JsonObject jsonObject) {
        return Optional.ofNullable(jsonObject).map(payload -> {
            if (payload.getValue("enabled") instanceof Boolean) {
                return payload;
            }
            this.log.trace("adding 'enabled=true' property to request payload");
            return payload.copy().put("enabled", Boolean.TRUE);
        // orElseGet defers building the fallback object until it is actually needed
        }).orElseGet(() -> new JsonObject().put("enabled", Boolean.TRUE));
    }

    /**
     * Finishes a span once the given future has completed.
     * <p>
     * On success the response status is set as the span's HTTP status tag and the span is
     * marked as erroneous if the response has an error status. On failure the status code
     * extracted from the failure is set and the failure is logged to the span.
     *
     * @param span The span to finish.
     * @param future The future to wait for.
     * @return A future with the same outcome as the given future.
     */
    public Future<EventBusMessage> finishSpanOnFutureCompletion(Span span, Future<EventBusMessage> future) {
        return future.compose(response -> {
            Tags.HTTP_STATUS.set(span, response.getStatus());
            if (response.hasErrorStatus()) {
                Tags.ERROR.set(span, true);
            }
            span.finish();
            return Future.succeededFuture(response);
        }).recover(th -> {
            Tags.HTTP_STATUS.set(span, ServiceInvocationException.extractStatusCode(th));
            TracingHelper.logError(span, th);
            span.finish();
            return Future.failedFuture(th);
        });
    }
}
