package org.eclipse.hono.commandrouter.impl.kafka;

import io.micrometer.core.instrument.Timer;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.Objects;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommand;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommandContext;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommandResponseSender;
import org.eclipse.hono.client.command.kafka.KafkaBasedInternalCommandSender;
import org.eclipse.hono.client.kafka.tracing.KafkaTracingHelper;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.commandrouter.CommandRouterMetrics;
import org.eclipse.hono.commandrouter.CommandTargetMapper;
import org.eclipse.hono.commandrouter.impl.AbstractMappingAndDelegatingCommandHandler;
import org.eclipse.hono.util.MessagingType;

/* loaded from: input_file:org/eclipse/hono/commandrouter/impl/kafka/KafkaBasedMappingAndDelegatingCommandHandler.class */
/**
 * Handler for command records received via Kafka.
 * <p>
 * Each record is turned into a {@link KafkaBasedCommand}; valid commands are passed to the
 * mapping-and-delegating logic inherited from the parent class, invalid ones are rejected and
 * reported. The handler also manages the life cycle of the {@link KafkaBasedCommandResponseSender}
 * that is handed to every command context it creates.
 */
public class KafkaBasedMappingAndDelegatingCommandHandler extends AbstractMappingAndDelegatingCommandHandler<KafkaBasedCommandContext> {

    private final KafkaBasedCommandResponseSender kafkaBasedCommandResponseSender;

    /**
     * Creates a new handler.
     *
     * @param vertx passed on to the parent class.
     * @param tenantClient passed on to the parent class.
     * @param kafkaCommandProcessingQueue passed on to the parent class.
     * @param commandTargetMapper passed on to the parent class.
     * @param kafkaBasedInternalCommandSender passed on to the parent class.
     * @param kafkaBasedCommandResponseSender the sender given to each created command context;
     *                                        started and stopped together with this handler.
     * @param commandRouterMetrics passed on to the parent class.
     * @param tracer passed on to the parent class.
     * @throws NullPointerException if {@code kafkaBasedCommandResponseSender} is {@code null}
     *                              (the parent constructor may reject further {@code null} values).
     */
    public KafkaBasedMappingAndDelegatingCommandHandler(Vertx vertx, TenantClient tenantClient, KafkaCommandProcessingQueue kafkaCommandProcessingQueue, CommandTargetMapper commandTargetMapper, KafkaBasedInternalCommandSender kafkaBasedInternalCommandSender, KafkaBasedCommandResponseSender kafkaBasedCommandResponseSender, CommandRouterMetrics commandRouterMetrics, Tracer tracer) {
        super(vertx, tenantClient, kafkaCommandProcessingQueue, commandTargetMapper, kafkaBasedInternalCommandSender, commandRouterMetrics, tracer);
        // requireNonNull already returns the argument's type, no cast needed
        this.kafkaBasedCommandResponseSender = Objects.requireNonNull(kafkaBasedCommandResponseSender);
    }

    /**
     * {@inheritDoc}
     *
     * @return Always {@link MessagingType#kafka}.
     */
    @Override
    protected final MessagingType getMessagingType() {
        return MessagingType.kafka;
    }

    /**
     * Starts the parent handler and the command response sender.
     *
     * @return A future that succeeds once both have started; {@link CompositeFuture#all} is
     *         used, so the result fails if either start operation fails.
     */
    @Override
    public Future<Void> start() {
        return CompositeFuture.all(super.start(), this.kafkaBasedCommandResponseSender.start()).mapEmpty();
    }

    /**
     * Stops the parent handler and the command response sender.
     *
     * @return A future combining both stop operations; {@link CompositeFuture#join} is used,
     *         so the outcome of both operations is awaited even if one of them fails.
     */
    @Override
    public Future<Void> stop() {
        return CompositeFuture.join(super.stop(), this.kafkaBasedCommandResponseSender.stop()).mapEmpty();
    }

    /**
     * Processes a command record received from Kafka.
     * <p>
     * A record from which no command can be created yields a failed future without any further
     * processing. A command that is created but not valid is rejected and reported as invalid;
     * the returned future is failed in that case as well. A valid command is handed to
     * {@code mapAndDelegateIncomingCommand} and that method's future is returned.
     *
     * @param kafkaConsumerRecord the record containing the command.
     * @return A future indicating the outcome of the operation.
     * @throws NullPointerException if the record is {@code null}.
     */
    public Future<Void> mapAndDelegateIncomingCommandMessage(KafkaConsumerRecord<String, Buffer> kafkaConsumerRecord) {
        Objects.requireNonNull(kafkaConsumerRecord);
        final Timer.Sample startTimer = getMetrics().startTimer();

        // Only the conversion of the record is guarded: an IllegalArgumentException raised by any
        // later step is not a sign of a malformed record and must not be swallowed as one.
        final KafkaBasedCommand command;
        try {
            command = KafkaBasedCommand.from(kafkaConsumerRecord);
        } catch (IllegalArgumentException e) {
            this.log.debug("command record is invalid", e);
            return Future.failedFuture("command record is invalid");
        }

        final Span span = createSpan(command.getTenant(), command.getDeviceId(), KafkaTracingHelper.extractSpanContext(this.tracer, kafkaConsumerRecord));
        KafkaTracingHelper.setRecordTags(span, kafkaConsumerRecord);
        final KafkaBasedCommandContext commandContext = new KafkaBasedCommandContext(command, this.kafkaBasedCommandResponseSender, span);
        command.logToSpan(span);

        if (!command.isValid()) {
            this.log.debug("received invalid command record [{}]", command);
            // The tenant is looked up only to make its configuration available in the context;
            // the composed future always fails, and the command is rejected and reported
            // whether or not the lookup succeeded.
            return this.tenantClient.get(command.getTenant(), span.context()).compose(tenantObject -> {
                commandContext.put("tenant-config", tenantObject);
                return Future.failedFuture("command is invalid");
            }).onComplete(asyncResult -> {
                commandContext.reject("malformed command message");
                reportInvalidCommand(commandContext, startTimer);
            }).mapEmpty();
        }

        this.log.trace("received valid command record [{}]", command);
        return mapAndDelegateIncomingCommand(commandContext, startTimer);
    }
}
