/*
 * Decompiled with CFR 0.152.
 */
package io.fabric8.gateway.handlers.detecting.protocol.mqtt;

import io.fabric8.gateway.SocketWrapper;
import io.fabric8.gateway.handlers.detecting.Protocol;
import io.fabric8.gateway.handlers.detecting.protocol.BufferSupport;
import io.fabric8.gateway.handlers.detecting.protocol.mqtt.MqttProtocolDecoder;
import io.fabric8.gateway.handlers.loadbalancer.ConnectionParameters;
import java.net.ProtocolException;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;

/**
 * {@link Protocol} implementation that recognises MQTT connections from the
 * first bytes a client sends and extracts routing information (client id,
 * user, virtual host) from the initial CONNECT frame.
 */
public class MqttProtocol
implements Protocol {
    private static final Logger LOG = LoggerFactory.getLogger(MqttProtocol.class);

    /** First byte of the frame this protocol looks for (0x10). */
    static final Buffer HEAD_MAGIC = new Buffer(new byte[]{0x10});
    /** Length-prefixed protocol name "MQIsdp". */
    static final Buffer MQTT31_TAIL_MAGIC = new Buffer(new byte[]{0, 6, 'M', 'Q', 'I', 's', 'd', 'p'});
    /** Length-prefixed protocol name "MQTT". */
    static final Buffer MQTT311_TAIL_MAGIC = new Buffer(new byte[]{0, 4, 'M', 'Q', 'T', 'T'});

    /** Exclusive upper bound for the offset at which a protocol-name magic may start. */
    private static final int MAX_MAGIC_OFFSET = 6;

    /**
     * 100 MiB. Not read inside this class; presumably consumed by
     * {@link MqttProtocolDecoder} as a frame size limit (confirm against the decoder).
     */
    int maxMessageLength = 100 * 1024 * 1024;
    private static final String[] SCHEMES = new String[]{"mqtt", "mqtt+nio"};

    @Override
    public String getProtocolName() {
        return "mqtt";
    }

    /**
     * @return a fresh copy of the supported schemes, so callers cannot mutate
     *         the shared constant
     */
    @Override
    public String[] getProtocolSchemes() {
        return SCHEMES.clone();
    }

    @Override
    public int getMaxIdentificationLength() {
        return 13;
    }

    /**
     * Decides whether the given initial bytes look like an MQTT CONNECT frame.
     *
     * @param header the first bytes received on the connection
     * @return {@code true} if the header is at least 10 bytes long, starts with
     *         {@link #HEAD_MAGIC} and contains one of the protocol-name magics
     *         starting at an offset in the range 2..5
     */
    @Override
    public boolean matches(Buffer header) {
        if (header.length() < 10) {
            return false;
        }
        return BufferSupport.startsWith(header, HEAD_MAGIC)
                && (hasMagicNearStart(header, MQTT31_TAIL_MAGIC) || hasMagicNearStart(header, MQTT311_TAIL_MAGIC));
    }

    /**
     * Checks that {@code magic} is actually present and starts before
     * {@link #MAX_MAGIC_OFFSET}. A bare {@code indexOf(...) < 6} test would also
     * accept a negative "not found" result, classifying unrelated traffic that
     * merely starts with 0x10 as MQTT.
     */
    private static boolean hasMagicNearStart(Buffer header, Buffer magic) {
        int index = BufferSupport.indexOf(header, 2, magic);
        return index >= 0 && index < MAX_MAGIC_OFFSET;
    }

    /**
     * Serialises {@code value} onto the end of {@code self}: the frame header
     * byte, then the total payload length written 7 bits per byte (low-order
     * group first, high bit set while more groups follow), then every payload
     * buffer in order.
     *
     * @param self  the buffer to append to
     * @param value the frame to serialise
     */
    static void append(Buffer self, MQTTFrame value) {
        self.appendByte(value.header());
        int remaining = 0;
        for (org.fusesource.hawtbuf.Buffer buffer : value.buffers) {
            remaining += buffer.length;
        }
        do {
            byte digit = (byte)(remaining & 0x7F);
            remaining >>>= 7;
            if (remaining > 0) {
                // Continuation flag: more length bytes follow.
                digit = (byte)(digit | 0x80);
            }
            self.appendByte(digit);
        } while (remaining > 0);
        for (org.fusesource.hawtbuf.Buffer buffer : value.buffers) {
            self.appendBytes(buffer.toByteArray());
        }
    }

    /**
     * Decodes the first frame arriving on {@code socket} and, if it is a
     * CONNECT frame, reports the connection parameters found in it to
     * {@code handler}. A user name of the form {@code vhost/user} is split at
     * the first slash: the prefix becomes the virtual host, and the CONNECT
     * frame held in {@code received} is re-encoded with the bare user name.
     * On a decoding error, a non-CONNECT first frame or an invalid frame the
     * socket is closed and {@code handler} is not invoked.
     *
     * @param socket   the client connection being inspected
     * @param received the bytes read so far; may be rewritten in place
     * @param handler  receives the extracted parameters
     */
    @Override
    public void snoopConnectionParameters(final SocketWrapper socket, final Buffer received, final Handler<ConnectionParameters> handler) {
        final MqttProtocolDecoder h = new MqttProtocolDecoder(this);
        h.errorHandler(new Handler<String>(){

            @Override
            public void handle(String error) {
                LOG.info("MQTT protocol decoding error: " + error);
                socket.close();
            }
        });
        h.codecHandler(new Handler<MQTTFrame>(){

            @Override
            public void handle(MQTTFrame event) {
                try {
                    if (event.messageType() != CONNECT.TYPE) {
                        LOG.info("Expected a CONNECT frame");
                        socket.close();
                        return;
                    }
                    CONNECT connect = new CONNECT().decode(event);
                    ConnectionParameters parameters = new ConnectionParameters();
                    if (connect.clientId() != null) {
                        parameters.protocolClientId = connect.clientId().toString();
                    }
                    if (connect.userName() != null) {
                        parameters.protocolUser = connect.userName().toString();
                        if (parameters.protocolUser.contains("/")) {
                            // "vhost/user": limit 2 keeps any further slashes in the user part.
                            String[] parts = parameters.protocolUser.split("/", 2);
                            parameters.protocolVirtualHost = parts[0];
                            parameters.protocolUser = parts[1];
                            connect.userName(new UTF8Buffer(parameters.protocolUser));
                            // Replace the decoded CONNECT bytes with the re-encoded frame,
                            // keeping whatever was received after it.
                            Buffer tail = received.getBuffer((int)h.getBytesDecoded(), received.length());
                            BufferSupport.setLength(received, 0);
                            MqttProtocol.append(received, connect.encode());
                            received.appendBuffer(tail);
                        }
                    }
                    handler.handle(parameters);
                }
                catch (ProtocolException e) {
                    LOG.info("Invalid MQTT frame: " + e, e);
                    socket.close();
                }
            }
        });
        // NOTE(review): raw cast kept from the decompiled output; the decoder's
        // declared Handler type is not visible here, so confirm before removing it.
        socket.readStream().dataHandler((Handler)h);
        h.handle(received);
    }
}

