package org.opendaylight.federationmessagequeue.impl;

import akka.osgi.BundleDelegatingClassLoader;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.opendaylight.messagequeue.AbstractFederationMessage;
import org.opendaylight.messagequeue.IGeneralFederationConsumer;
import org.opendaylight.messagequeue.IMessageBusClient;
import org.osgi.framework.FrameworkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opendaylight/federationmessagequeue/impl/RabbitMessageBus.class */
public class RabbitMessageBus implements IMessageBusClient {
    private static final Logger LOG = LoggerFactory.getLogger(RabbitMessageBus.class);
    private final Map<String, MessageBusConnectionData> queueNameToConnectionData = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opendaylight/federationmessagequeue/impl/RabbitMessageBus$MessageBusConnectionData.class */
    public static class MessageBusConnectionData {
        public String brokerIp;
        public Connection conn;
        public Channel channel;

        MessageBusConnectionData(String str, Connection connection, Channel channel) {
            this.brokerIp = str;
            this.conn = connection;
            this.channel = channel;
        }
    }

    public boolean createQueue(String str, String str2) {
        return createQueue(str, str2, 5672, "guest", "guest");
    }

    public boolean createQueue(String str, String str2, int i, String str3, String str4) {
        LOG.info("Creating connection for queue {} on broker {}", str, str2);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(str2);
        connectionFactory.setPort(i);
        connectionFactory.setUsername(str3);
        connectionFactory.setPassword(str4);
        connectionFactory.setAutomaticRecoveryEnabled(true);
        try {
            Connection newConnection = connectionFactory.newConnection();
            LOG.info("Created connection to broker {}:{} for user {} ", new Object[]{str2, Integer.valueOf(i), str3});
            Channel createChannel = newConnection.createChannel();
            createChannel.queueDeclare(str, false, false, false, (Map) null);
            LOG.info("Declared queue {} on broker {}", str, str2);
            this.queueNameToConnectionData.put(str, new MessageBusConnectionData(str2, newConnection, createChannel));
            return true;
        } catch (IOException | TimeoutException e) {
            LOG.warn("Failed creating queue {} on broker {}:{} for user {} because: {}", new Object[]{str, str2, Integer.valueOf(i), str3, e.getMessage()});
            return false;
        }
    }

    public void init() {
        LOG.info("starting {}", getClass().getSimpleName());
    }

    public void close() {
        LOG.info("closing {}", getClass().getSimpleName());
    }

    public void destroyQueue(String str) {
        LOG.info("Started delete of queue {}", str);
        MessageBusConnectionData messageBusConnectionData = this.queueNameToConnectionData.get(str);
        if (messageBusConnectionData == null) {
            LOG.warn("Cancelled deletion of queue {} because queueName not found in queueNameToConnectionData", str);
            return;
        }
        Channel channel = messageBusConnectionData.channel;
        String str2 = messageBusConnectionData.brokerIp;
        Connection connection = messageBusConnectionData.conn;
        try {
            if (channel != null) {
                try {
                    channel.queueDelete(str);
                    LOG.info("Deleted queue {} successfully", str);
                } catch (IOException e) {
                    LOG.warn("Failed to delete queue {} msg: {}", str, e.getMessage());
                }
                channel.close();
            } else {
                LOG.warn("Null channel while deleting queue {} on broker {}", str, str2);
            }
            if (connection != null) {
                connection.close();
            } else {
                LOG.warn("Null connection while deleting queue {} on broker {}", str, str2);
            }
        } catch (IOException | TimeoutException e2) {
            LOG.warn("Failed to close channel while deleting queue {} on broker {}", new Object[]{str, str2, e2});
        }
        this.queueNameToConnectionData.remove(str);
    }

    public String attachHandler(String str, IGeneralFederationConsumer iGeneralFederationConsumer) {
        MessageBusConnectionData messageBusConnectionData = this.queueNameToConnectionData.get(str);
        if (messageBusConnectionData == null) {
            LOG.warn("AttachHandler failed - queue {} not found in the active connection map}", str);
            return null;
        }
        Channel channel = messageBusConnectionData.channel;
        try {
            return channel.basicConsume(str, true, createRabbitConsumer(iGeneralFederationConsumer, channel));
        } catch (IOException e) {
            LOG.warn("Failed to consume from queue {} on broker {}", new Object[]{str, messageBusConnectionData.brokerIp, e});
            return null;
        }
    }

    private Consumer createRabbitConsumer(final IGeneralFederationConsumer iGeneralFederationConsumer, Channel channel) {
        return new DefaultConsumer(channel) { // from class: org.opendaylight.federationmessagequeue.impl.RabbitMessageBus.1
            public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                Kryo kryo = new Kryo();
                kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
                Input input = new Input(new ByteArrayInputStream(bArr));
                try {
                    kryo.setClassLoader(new BundleDelegatingClassLoader(FrameworkUtil.getBundle(RabbitMessageBus.class).getBundleContext().getBundle(), Thread.currentThread().getContextClassLoader()));
                    Object readClassAndObject = kryo.readClassAndObject(input);
                    if (readClassAndObject instanceof AbstractFederationMessage) {
                        RabbitCounters.received_msg.inc();
                        iGeneralFederationConsumer.consumeMsg((AbstractFederationMessage) readClassAndObject);
                    } else {
                        RabbitMessageBus.LOG.error("Received an object not of type AbstractFederationMessage, type was: {}", readClassAndObject.getClass().getName());
                    }
                    RabbitMessageBus.LOG.trace("Deserialized {}", readClassAndObject);
                } catch (Throwable th) {
                    RabbitMessageBus.LOG.error("Failed in readObject: " + th.getMessage(), th);
                }
            }
        };
    }

    public synchronized void sendMsg(AbstractFederationMessage abstractFederationMessage, String str) {
        MessageBusConnectionData messageBusConnectionData = this.queueNameToConnectionData.get(str);
        if (messageBusConnectionData == null) {
            LOG.error("sendMsg - unknown queue name {}", str);
            LOG.trace("Dropped msg {}", abstractFederationMessage);
            return;
        }
        Channel channel = messageBusConnectionData.channel;
        LOG.trace("Sending msg to queue {}, msg {}", str, abstractFederationMessage);
        createQueueIfNeeded(str, messageBusConnectionData, channel);
        try {
            channel.basicPublish("", str, (AMQP.BasicProperties) null, serializeUsingKryo(abstractFederationMessage));
            RabbitCounters.sent_msg.inc();
            LOG.debug("Sent msg to {} on broker {}", str, messageBusConnectionData.brokerIp);
        } catch (IOException e) {
            LOG.error("Failed to send message to queue {} on broker {} because {}", new Object[]{str, messageBusConnectionData.brokerIp, e.getMessage()});
        }
    }

    private void createQueueIfNeeded(String str, MessageBusConnectionData messageBusConnectionData, Channel channel) {
        try {
            channel.queueDeclare(str, false, false, false, (Map) null);
        } catch (IOException e) {
            LOG.warn("Failed to declare queue {} on broker {}", new Object[]{str, messageBusConnectionData.brokerIp, e});
        }
    }

    private byte[] serializeUsingKryo(AbstractFederationMessage abstractFederationMessage) {
        Kryo kryo = new Kryo();
        kryo.getInstantiatorStrategy().setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Output output = new Output(byteArrayOutputStream);
        kryo.writeClassAndObject(output, abstractFederationMessage);
        output.close();
        return byteArrayOutputStream.toByteArray();
    }

    public void detachHandler(String str, String str2) {
        MessageBusConnectionData messageBusConnectionData = this.queueNameToConnectionData.get(str);
        if (messageBusConnectionData == null) {
            LOG.warn("unknown queue name {} couldn't detach handler", str);
            return;
        }
        Channel channel = messageBusConnectionData.channel;
        try {
            LOG.info("Cancelling queue handler {} " + str2);
            channel.basicCancel(str2);
        } catch (IOException e) {
            LOG.error("Detaching queue handler failed", e);
        }
    }
}
