package org.eclipse.hono.commandrouter.impl;

import io.micrometer.core.instrument.Timer;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.InternalCommandSender;
import org.eclipse.hono.client.registry.DeviceDisabledOrNotRegisteredException;
import org.eclipse.hono.client.registry.TenantClient;
import org.eclipse.hono.client.registry.TenantDisabledOrNotRegisteredException;
import org.eclipse.hono.commandrouter.CommandRouterMetrics;
import org.eclipse.hono.commandrouter.CommandTargetMapper;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.tracing.TenantTraceSamplingHelper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Futures;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.MessagingType;
import org.eclipse.hono.util.TenantObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/commandrouter/impl/AbstractMappingAndDelegatingCommandHandler.class */
public abstract class AbstractMappingAndDelegatingCommandHandler<T extends CommandContext> implements Lifecycle {
    private static final Duration PROCESSING_TIMEOUT = Duration.ofSeconds(8);
    protected final Logger log = LoggerFactory.getLogger(getClass());
    protected final TenantClient tenantClient;
    protected final Tracer tracer;
    private final Vertx vertx;
    private final CommandTargetMapper commandTargetMapper;
    private final InternalCommandSender internalCommandSender;
    private final CommandRouterMetrics metrics;
    private final CommandProcessingQueue<T> commandQueue;

    public AbstractMappingAndDelegatingCommandHandler(Vertx vertx, TenantClient tenantClient, CommandProcessingQueue<T> commandProcessingQueue, CommandTargetMapper commandTargetMapper, InternalCommandSender internalCommandSender, CommandRouterMetrics commandRouterMetrics, Tracer tracer) {
        this.vertx = (Vertx) Objects.requireNonNull(vertx);
        this.tenantClient = (TenantClient) Objects.requireNonNull(tenantClient);
        this.commandQueue = (CommandProcessingQueue) Objects.requireNonNull(commandProcessingQueue);
        this.commandTargetMapper = (CommandTargetMapper) Objects.requireNonNull(commandTargetMapper);
        this.internalCommandSender = (InternalCommandSender) Objects.requireNonNull(internalCommandSender);
        this.metrics = (CommandRouterMetrics) Objects.requireNonNull(commandRouterMetrics);
        this.tracer = (Tracer) Objects.requireNonNull(tracer);
    }

    public Future<Void> start() {
        return this.internalCommandSender.start();
    }

    public Future<Void> stop() {
        this.commandQueue.clear();
        return this.internalCommandSender.stop();
    }

    protected abstract MessagingType getMessagingType();

    /* JADX INFO: Access modifiers changed from: protected */
    public final CommandRouterMetrics getMetrics() {
        return this.metrics;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Future<Void> mapAndDelegateIncomingCommand(T t, Timer.Sample sample) {
        Objects.requireNonNull(t);
        Objects.requireNonNull(sample);
        this.commandQueue.add(t);
        Promise promise = Promise.promise();
        long timer = this.vertx.setTimer(PROCESSING_TIMEOUT.toMillis(), l -> {
            if (this.commandQueue.remove(t) || !t.isCompleted()) {
                this.log.info("command processing timed out after {}s [{}]", Long.valueOf(PROCESSING_TIMEOUT.toSeconds()), t.getCommand());
                TracingHelper.logError(t.getTracingSpan(), String.format("command processing timed out after %ds", Long.valueOf(PROCESSING_TIMEOUT.toSeconds())));
                ServerErrorException serverErrorException = new ServerErrorException(503, "command processing timed out");
                t.release(serverErrorException);
                promise.tryFail(serverErrorException);
            }
        });
        mapAndDelegateIncomingCommandInternal(t, sample).onComplete(asyncResult -> {
            this.vertx.cancelTimer(timer);
            if (asyncResult.failed()) {
                this.commandQueue.remove(t);
            }
            Futures.tryHandleResult(promise, asyncResult);
        });
        return promise.future();
    }

    private Future<Void> mapAndDelegateIncomingCommandInternal(T t, Timer.Sample sample) {
        Command command = t.getCommand();
        if (this.log.isTraceEnabled()) {
            this.log.trace("determine command target gateway/adapter for [{}]", command);
        }
        Future future = this.tenantClient.get(command.getTenant(), t.getTracingContext());
        return future.compose(tenantObject -> {
            TenantTraceSamplingHelper.applyTraceSamplingPriority(tenantObject, (String) null, t.getTracingSpan());
            t.put("tenant-config", tenantObject);
            MessagingType messagingType = (MessagingType) Optional.ofNullable((JsonObject) tenantObject.getProperty("ext", JsonObject.class)).map(jsonObject -> {
                return jsonObject.getString("messaging-type");
            }).map(MessagingType::valueOf).orElse(null);
            if (messagingType != null && getMessagingType() != messagingType) {
                this.log.info("command received via {} but tenant is configured to use {} [{}]", new Object[]{getMessagingType(), messagingType, t.getCommand()});
                t.getTracingSpan().log(String.format("command received via %s but tenant is configured to use %s", getMessagingType(), messagingType));
            }
            return this.commandTargetMapper.getTargetGatewayAndAdapterInstance(command.getTenant(), command.getDeviceId(), t.getTracingContext());
        }).recover(th -> {
            Throwable serverErrorException;
            if (future.failed() && ServiceInvocationException.extractStatusCode(th) == 404) {
                serverErrorException = new TenantDisabledOrNotRegisteredException(command.getTenant(), 404);
            } else if (th instanceof DeviceDisabledOrNotRegisteredException) {
                serverErrorException = th;
            } else if (ServiceInvocationException.extractStatusCode(th) == 404) {
                this.log.debug("no target adapter instance found for command with device id " + command.getDeviceId(), th);
                serverErrorException = new NoConsumerException("no target adapter instance found");
            } else {
                this.log.debug("error getting target gateway and adapter instance for command with device id " + command.getDeviceId(), th);
                serverErrorException = new ServerErrorException(503, "error getting target gateway and adapter instance", th);
            }
            if (serverErrorException instanceof ClientErrorException) {
                t.reject(serverErrorException);
            } else {
                t.release(serverErrorException);
            }
            reportCommandProcessingError(command, (TenantObject) future.result(), serverErrorException, sample);
            return Future.failedFuture(th);
        }).compose(jsonObject -> {
            String string = jsonObject.getString("adapter-instance-id");
            String string2 = jsonObject.getString("device-id");
            String str = string2.equals(command.getDeviceId()) ? null : string2;
            if (Objects.isNull(str)) {
                this.log.trace("determined target adapter instance [{}] for [{}] (command not mapped to gateway)", string, command);
            } else {
                command.setGatewayId(str);
                this.log.trace("determined target gateway [{}] and adapter instance [{}] for [{}]", new Object[]{str, string, command});
                t.getTracingSpan().log("determined target gateway [" + str + "]");
            }
            return this.commandQueue.applySendCommandAction(t, () -> {
                return sendCommandInternal(t, string, (TenantObject) future.result(), sample);
            });
        });
    }

    private Future<Void> sendCommandInternal(T t, String str, TenantObject tenantObject, Timer.Sample sample) {
        return this.internalCommandSender.sendCommand(t, str).onFailure(th -> {
            reportCommandProcessingError(t.getCommand(), tenantObject, th, sample);
        });
    }

    protected void reportCommandProcessingError(Command command, TenantObject tenantObject, Throwable th, Timer.Sample sample) {
        this.metrics.reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, command.getTenant(), tenantObject, MetricsTags.ProcessingOutcome.from(th), command.getPayloadSize(), sample);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void reportInvalidCommand(CommandContext commandContext, Timer.Sample sample) {
        Command command = commandContext.getCommand();
        this.tenantClient.get(command.getTenant(), commandContext.getTracingContext()).recover(th -> {
            return Future.succeededFuture((Object) null);
        }).onSuccess(tenantObject -> {
            this.metrics.reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, command.getTenant(), tenantObject, MetricsTags.ProcessingOutcome.UNPROCESSABLE, command.getPayloadSize(), sample);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Span createSpan(String str, String str2, SpanContext spanContext) {
        return TracingHelper.buildChildSpan(this.tracer, spanContext, "map and delegate command", getClass().getSimpleName()).withTag(Tags.SPAN_KIND.getKey(), "consumer").withTag(TracingHelper.TAG_TENANT_ID, str).withTag(TracingHelper.TAG_DEVICE_ID, str2).start();
    }
}
