package org.eclipse.hono.adapter.mqtt.impl;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.mqtt.messages.MqttPublishMessage;
import java.util.Map;
import java.util.Objects;
import org.eclipse.hono.adapter.client.command.Command;
import org.eclipse.hono.adapter.client.command.CommandContext;
import org.eclipse.hono.adapter.mqtt.AbstractVertxBasedMqttProtocolAdapter;
import org.eclipse.hono.adapter.mqtt.MappedMessage;
import org.eclipse.hono.adapter.mqtt.MessageMapping;
import org.eclipse.hono.adapter.mqtt.MqttContext;
import org.eclipse.hono.adapter.mqtt.MqttProtocolAdapterProperties;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.util.ResourceIdentifier;

/* loaded from: input_file:org/eclipse/hono/adapter/mqtt/impl/VertxBasedMqttProtocolAdapter.class */
public final class VertxBasedMqttProtocolAdapter extends AbstractVertxBasedMqttProtocolAdapter<MqttProtocolAdapterProperties> {
    private static final String MAPPER_DATA = "mapper_data";
    private MessageMapping<MqttContext> messageMapping;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.eclipse.hono.adapter.mqtt.impl.VertxBasedMqttProtocolAdapter$1, reason: invalid class name */
    /* loaded from: input_file:org/eclipse/hono/adapter/mqtt/impl/VertxBasedMqttProtocolAdapter$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType = new int[MetricsTags.EndpointType.values().length];

        static {
            try {
                $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[MetricsTags.EndpointType.TELEMETRY.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[MetricsTags.EndpointType.EVENT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[MetricsTags.EndpointType.COMMAND.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public String getTypeName() {
        return "hono-mqtt";
    }

    public void setMessageMapping(MessageMapping<MqttContext> messageMapping) {
        Objects.requireNonNull(messageMapping);
        this.messageMapping = messageMapping;
    }

    @Override // org.eclipse.hono.adapter.mqtt.AbstractVertxBasedMqttProtocolAdapter
    protected Future<Void> onPublishedMessage(MqttContext mqttContext) {
        return mapTopic(mqttContext).compose(resourceIdentifier -> {
            return validateAddress(resourceIdentifier, mqttContext.authenticatedDevice());
        }).compose(resourceIdentifier2 -> {
            return mapMessage(mqttContext, resourceIdentifier2);
        }).compose(mappedMessage -> {
            return uploadMessage(mqttContext, mappedMessage.getTargetAddress(), MqttPublishMessage.create(mqttContext.message().messageId(), mqttContext.message().qosLevel(), mqttContext.message().isDup(), mqttContext.message().isRetain(), mqttContext.message().topicName(), mappedMessage.getPayload().getByteBuf()));
        }).recover(th -> {
            this.log.debug("discarding message [topic: {}] from {}", new Object[]{mqttContext.message().topicName(), mqttContext.authenticatedDevice(), th});
            return Future.failedFuture(th);
        });
    }

    private Future<MappedMessage> mapMessage(MqttContext mqttContext, ResourceIdentifier resourceIdentifier) {
        return getRegistrationAssertion(resourceIdentifier.getTenantId(), resourceIdentifier.getResourceId(), mqttContext.authenticatedDevice(), mqttContext.getTracingContext()).compose(registrationAssertion -> {
            return this.messageMapping.mapDownstreamMessage(mqttContext, resourceIdentifier, registrationAssertion);
        }).map(mappedMessage -> {
            mqttContext.put(MAPPER_DATA, mappedMessage.getAdditionalProperties());
            return mappedMessage;
        });
    }

    @Override // org.eclipse.hono.adapter.mqtt.AbstractVertxBasedMqttProtocolAdapter
    protected void customizeDownstreamMessageProperties(Map<String, Object> map, MqttContext mqttContext) {
        Object obj = mqttContext.get(MAPPER_DATA);
        if (obj instanceof Map) {
            ((Map) obj).entrySet().stream().filter(entry -> {
                return entry.getKey() instanceof String;
            }).forEach(entry2 -> {
                String str = (String) entry2.getKey();
                Object value = entry2.getValue();
                if (value instanceof String) {
                    map.put(str, value);
                } else {
                    map.put(str, Json.encode(value));
                }
            });
        }
    }

    Future<ResourceIdentifier> mapTopic(MqttContext mqttContext) {
        Promise promise = Promise.promise();
        ResourceIdentifier resourceIdentifier = mqttContext.topic();
        MqttQoS qosLevel = mqttContext.message().qosLevel();
        switch (AnonymousClass1.$SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[MetricsTags.EndpointType.fromString(resourceIdentifier.getEndpoint()).ordinal()]) {
            case 1:
                if (!MqttQoS.EXACTLY_ONCE.equals(qosLevel)) {
                    promise.complete(resourceIdentifier);
                    break;
                } else {
                    promise.fail(new ClientErrorException(400, "QoS 2 not supported for telemetry messages"));
                    break;
                }
            case 2:
                if (!MqttQoS.AT_LEAST_ONCE.equals(qosLevel)) {
                    promise.fail(new ClientErrorException(400, "Only QoS 1 supported for event messages"));
                    break;
                } else {
                    promise.complete(resourceIdentifier);
                    break;
                }
            case 3:
                if (!MqttQoS.EXACTLY_ONCE.equals(qosLevel)) {
                    promise.complete(resourceIdentifier);
                    break;
                } else {
                    promise.fail(new ClientErrorException(400, "QoS 2 not supported for command response messages"));
                    break;
                }
            default:
                this.log.debug("no such endpoint [{}]", resourceIdentifier.getEndpoint());
                promise.fail(new ClientErrorException(404, "no such endpoint"));
                break;
        }
        return promise.future();
    }

    @Override // org.eclipse.hono.adapter.mqtt.AbstractVertxBasedMqttProtocolAdapter
    protected Future<Buffer> getCommandPayload(CommandContext commandContext) {
        Command command = commandContext.getCommand();
        return getRegistrationClient().assertRegistration(command.getTenant(), command.getGatewayOrDeviceId(), (String) null, commandContext.getTracingContext()).compose(registrationAssertion -> {
            return this.messageMapping.mapUpstreamMessage(registrationAssertion, command);
        });
    }
}
