package org.kinotic.continuum.gateway.internal.endpoints.mqtt;

import io.vertx.ext.stomp.lite.StompServerConnection;
import io.vertx.ext.stomp.lite.frame.Frame;
import org.kinotic.continuum.core.api.event.Event;
import org.kinotic.continuum.gateway.internal.endpoints.stomp.GatewayUtils;
import org.kinotic.continuum.gateway.internal.endpoints.stomp.StompSubscriptionEventSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.BaseSubscriber;

/* loaded from: input_file:org/kinotic/continuum/gateway/internal/endpoints/mqtt/MqttSubscriptionEventSubscriber.class */
public class MqttSubscriptionEventSubscriber extends BaseSubscriber<Event<byte[]>> {
    private static final Logger log = LoggerFactory.getLogger(StompSubscriptionEventSubscriber.class);
    private final String destination;
    private final String subscriptionId;
    private final StompServerConnection connection;

    public MqttSubscriptionEventSubscriber(String str, String str2, StompServerConnection stompServerConnection) {
        this.destination = str;
        this.subscriptionId = str2;
        this.connection = stompServerConnection;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void hookOnNext(Event<byte[]> event) {
        try {
            Frame eventToStompFrame = GatewayUtils.eventToStompFrame(event);
            eventToStompFrame.getHeaders().put("subscription", this.subscriptionId);
            if (log.isTraceEnabled()) {
                log.trace("Sending Frame\n" + eventToStompFrame.toString());
            }
            this.connection.write(eventToStompFrame);
        } catch (Exception e) {
            log.error("Unexpected Error in Handler " + e.getMessage(), e);
            log.error("Closing connection");
            dispose();
            this.connection.close();
        }
    }

    protected void hookOnError(Throwable th) {
        log.error("Error on event bus subscription for destination " + this.destination, th);
    }
}
