/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.rabbitmq.RabbitMQSinkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name="rabbitmq", type=IOType.SINK, help="A sink connector is used for moving messages from Pulsar to RabbitMQ.", configClass=RabbitMQSinkConfig.class)
public class RabbitMQSink
implements Sink<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQSink.class);
    private Connection rabbitMQConnection;
    private Channel rabbitMQChannel;
    private RabbitMQSinkConfig rabbitMQSinkConfig;
    private String exchangeName;
    private String defaultRoutingKey;

    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.rabbitMQSinkConfig = RabbitMQSinkConfig.load(config);
        this.rabbitMQSinkConfig.validate();
        ConnectionFactory connectionFactory = this.rabbitMQSinkConfig.createConnectionFactory();
        this.rabbitMQConnection = connectionFactory.newConnection(this.rabbitMQSinkConfig.getConnectionName());
        log.info("A new connection to {}:{} has been opened successfully.", (Object)this.rabbitMQConnection.getAddress().getCanonicalHostName(), (Object)this.rabbitMQConnection.getPort());
        this.exchangeName = this.rabbitMQSinkConfig.getExchangeName();
        this.defaultRoutingKey = this.rabbitMQSinkConfig.getRoutingKey();
        String exchangeType = this.rabbitMQSinkConfig.getExchangeType();
        this.rabbitMQChannel = this.rabbitMQConnection.createChannel();
        String queueName = this.rabbitMQSinkConfig.getQueueName();
        if (StringUtils.isNotEmpty((CharSequence)queueName)) {
            this.rabbitMQChannel.exchangeDeclare(this.exchangeName, BuiltinExchangeType.DIRECT, true);
            this.rabbitMQChannel.queueDeclare(this.rabbitMQSinkConfig.getQueueName(), true, false, false, null);
            this.rabbitMQChannel.queueBind(this.rabbitMQSinkConfig.getQueueName(), this.exchangeName, this.defaultRoutingKey);
        } else {
            this.rabbitMQChannel.exchangeDeclare(this.exchangeName, exchangeType, true);
        }
    }

    public void write(Record<byte[]> record) {
        byte[] value = (byte[])record.getValue();
        try {
            String routingKey = (String)record.getProperties().get("routingKey");
            this.rabbitMQChannel.basicPublish(this.exchangeName, StringUtils.isEmpty((CharSequence)routingKey) ? this.defaultRoutingKey : routingKey, null, value);
            record.ack();
        }
        catch (IOException e) {
            record.fail();
            log.warn("Failed to publish the message to RabbitMQ ", (Throwable)e);
        }
    }

    public void close() throws Exception {
        if (this.rabbitMQChannel != null) {
            this.rabbitMQChannel.close();
        }
        if (this.rabbitMQConnection != null) {
            this.rabbitMQConnection.close();
        }
    }
}

