package org.eclipse.hono.cli;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.proton.ProtonConnection;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.PostConstruct;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.HonoClient;
import org.eclipse.hono.util.MessageHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

@Profile({"receiver"})
@Component
/* loaded from: input_file:org/eclipse/hono/cli/Receiver.class */
public class Receiver extends AbstractClient {
    private static final String TYPE_TELEMETRY = "telemetry";
    private static final String TYPE_EVENT = "event";
    private static final String TYPE_ALL = "all";

    @Value("${message.type}")
    protected String messageType;

    @PostConstruct
    Future<CompositeFuture> start() {
        return this.client.connect(this::onDisconnect).compose(this::createConsumer).setHandler(this::handleCreateConsumerStatus);
    }

    private CompositeFuture createConsumer(HonoClient honoClient) {
        Handler handler = r8 -> {
            this.LOG.info("close handler of consumer is called");
            this.vertx.setTimer(this.connectionRetryInterval, l -> {
                this.LOG.info("attempting to re-open the consumer link ...");
                createConsumer(honoClient);
            });
        };
        ArrayList arrayList = new ArrayList();
        if (this.messageType.equals(TYPE_EVENT) || this.messageType.equals(TYPE_ALL)) {
            arrayList.add(honoClient.createEventConsumer(this.tenantId, message -> {
                handleMessage(TYPE_EVENT, message);
            }, handler));
        }
        if (this.messageType.equals(TYPE_TELEMETRY) || this.messageType.equals(TYPE_ALL)) {
            arrayList.add(honoClient.createTelemetryConsumer(this.tenantId, message2 -> {
                handleMessage(TYPE_TELEMETRY, message2);
            }, handler));
        }
        if (arrayList.isEmpty()) {
            arrayList.add(Future.failedFuture("Invalid message type. Valid types are telemetry, event or all"));
        }
        return CompositeFuture.all(arrayList);
    }

    private void onDisconnect(ProtonConnection protonConnection) {
        this.vertx.setTimer(this.connectionRetryInterval, l -> {
            this.LOG.info("attempting to re-connect to Hono ...");
            this.client.connect(this::onDisconnect).compose(this::createConsumer);
        });
    }

    private void handleMessage(String str, Message message) {
        this.LOG.info("received {} message [device: {}, content-type: {}]: {}", new Object[]{str, MessageHelper.getDeviceId(message), message.getContentType(), MessageHelper.getPayload(message)});
        if (message.getApplicationProperties() != null) {
            this.LOG.info("... with application properties: {}", message.getApplicationProperties().getValue());
        }
    }

    private void handleCreateConsumerStatus(AsyncResult<CompositeFuture> asyncResult) {
        if (asyncResult.succeeded()) {
            this.LOG.info("Receiver [tenant: {}, mode: {}] created successfully, hit ctrl-c to exit", this.tenantId, this.messageType);
        } else {
            this.LOG.error("Error occurred during initialization of receiver: {}", asyncResult.cause().getMessage());
            this.vertx.close();
        }
    }
}
