package org.eclipse.hono.service;

import io.vertx.core.Handler;
import java.util.Objects;
import org.eclipse.hono.client.amqp.config.ClientConfigProperties;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.notification.amqp.ProtonBasedNotificationReceiver;
import org.eclipse.hono.client.notification.kafka.KafkaBasedNotificationReceiver;
import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.notification.NotificationConstants;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.NotificationReceiver;
import org.eclipse.hono.service.util.ServiceClientAdapter;

/* loaded from: input_file:org/eclipse/hono/service/NotificationSupportingServiceApplication.class */
public abstract class NotificationSupportingServiceApplication extends AbstractServiceApplication {
    protected NotificationReceiver notificationReceiver(NotificationKafkaConsumerConfigProperties notificationKafkaConsumerConfigProperties, ClientConfigProperties clientConfigProperties) {
        KafkaBasedNotificationReceiver protonBasedNotificationReceiver;
        if (!this.appConfig.isKafkaMessagingDisabled() && notificationKafkaConsumerConfigProperties.isConfigured()) {
            protonBasedNotificationReceiver = new KafkaBasedNotificationReceiver(this.vertx, notificationKafkaConsumerConfigProperties);
        } else {
            if (this.appConfig.isAmqpMessagingDisabled() || !clientConfigProperties.isHostConfigured()) {
                throw new IllegalStateException("at least one of Kafka or AMQP messaging must be configured");
            }
            ClientConfigProperties clientConfigProperties2 = new ClientConfigProperties(clientConfigProperties);
            clientConfigProperties2.setServerRole("Notification");
            protonBasedNotificationReceiver = new ProtonBasedNotificationReceiver(HonoConnection.newConnection(this.vertx, clientConfigProperties2, this.tracer));
        }
        if (protonBasedNotificationReceiver instanceof ServiceClient) {
            this.healthCheckServer.registerHealthCheckResources(ServiceClientAdapter.forClient((ServiceClient) protonBasedNotificationReceiver));
        }
        Handler notificationSender = NotificationEventBusSupport.getNotificationSender(this.vertx);
        KafkaBasedNotificationReceiver kafkaBasedNotificationReceiver = protonBasedNotificationReceiver;
        NotificationConstants.DEVICE_REGISTRY_NOTIFICATION_TYPES.forEach(notificationType -> {
            Objects.requireNonNull(notificationSender);
            kafkaBasedNotificationReceiver.registerConsumer(notificationType, (v1) -> {
                r2.handle(v1);
            });
        });
        return protonBasedNotificationReceiver;
    }
}
