/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConnectionFactoryConfigurator;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.Header;
import io.debezium.server.BaseChangeConsumer;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Named;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named(value="rabbitmq")
@Dependent
public class RabbitMqStreamChangeConsumer
extends BaseChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqStreamChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.rabbitmq.";
    private static final String PROP_CONNECTION_PREFIX = "debezium.sink.rabbitmq.connection.";
    @ConfigProperty(name="debezium.sink.rabbitmq.exchange", defaultValue="")
    Optional<String> exchange;
    @ConfigProperty(name="debezium.sink.rabbitmq.routingKey", defaultValue="")
    Optional<String> routingKey;
    @ConfigProperty(name="debezium.sink.rabbitmq.autoCreateRoutingKey", defaultValue="false")
    Boolean autoCreateRoutingKey;
    @ConfigProperty(name="debezium.sink.rabbitmq.routingKeyDurable", defaultValue="true")
    Boolean routingKeyDurable;
    @ConfigProperty(name="debezium.sink.rabbitmq.routingKeyFromTopicName", defaultValue="false")
    Boolean routingKeyFromTopicName;
    @ConfigProperty(name="debezium.sink.rabbitmq.deliveryMode", defaultValue="2")
    int deliveryMode;
    @ConfigProperty(name="debezium.sink.rabbitmq.ackTimeout", defaultValue="30000")
    int ackTimeout;
    @ConfigProperty(name="debezium.sink.rabbitmq.null.value", defaultValue="default")
    String nullValue;
    Connection connection;
    Channel channel;

    @PostConstruct
    void connect() {
        Config config = ConfigProvider.getConfig();
        ConnectionFactory factory = new ConnectionFactory();
        Map<String, String> configProperties = this.getConfigSubset(config, PROP_CONNECTION_PREFIX).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue() == null ? null : entry.getValue().toString()));
        ConnectionFactoryConfigurator.load((ConnectionFactory)factory, configProperties, (String)"");
        LOGGER.info("Using connection to {}:{}", (Object)factory.getHost(), (Object)factory.getPort());
        try {
            this.connection = factory.newConnection();
            this.channel = this.connection.createChannel();
            this.channel.confirmSelect();
            if (!this.routingKeyFromTopicName.booleanValue() && this.autoCreateRoutingKey.booleanValue()) {
                String routingKeyName = this.routingKey.orElse("");
                LOGGER.info("Creating queue for routing key named '{}'", (Object)routingKeyName);
                this.channel.queueDeclare(routingKeyName, this.routingKeyDurable.booleanValue(), false, false, null);
            }
        }
        catch (IOException | TimeoutException e) {
            throw new DebeziumException((Throwable)e);
        }
    }

    @PreDestroy
    void close() {
        try {
            if (this.channel != null) {
                this.channel.close();
            }
            if (this.connection != null) {
                this.connection.close();
            }
        }
        catch (IOException | TimeoutException e) {
            throw new DebeziumException((Throwable)e);
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer) throws InterruptedException {
        for (ChangeEvent<Object, Object> record : records) {
            LOGGER.trace("Received event '{}'", record);
            String routingKeyName = this.routingKey.orElse(this.routingKeyFromTopicName != false ? this.streamNameMapper.map(record.destination()) : "");
            String exchangeName = this.exchange.orElse(this.streamNameMapper.map(record.destination()));
            try {
                if (this.routingKeyFromTopicName.booleanValue() && this.autoCreateRoutingKey.booleanValue()) {
                    LOGGER.trace("Creating queue for routing key named '{}'", (Object)routingKeyName);
                    this.channel.queueDeclare(routingKeyName, this.routingKeyDurable.booleanValue(), false, false, null);
                }
                Object value = record.value() != null ? record.value() : this.nullValue;
                this.channel.basicPublish(exchangeName, routingKeyName, new AMQP.BasicProperties.Builder().deliveryMode(Integer.valueOf(this.deliveryMode)).headers(this.convertRabbitMqHeaders(record)).build(), this.getBytes(value));
            }
            catch (IOException e) {
                throw new DebeziumException((Throwable)e);
            }
            committer.markProcessed(record);
        }
        try {
            this.channel.waitForConfirmsOrDie((long)this.ackTimeout);
        }
        catch (IOException | TimeoutException e) {
            throw new DebeziumException((Throwable)e);
        }
        LOGGER.trace("Sent messages");
        committer.markBatchFinished();
    }

    public boolean supportsTombstoneEvents() {
        return false;
    }

    private Map<String, Object> convertRabbitMqHeaders(ChangeEvent<Object, Object> record) {
        List headers = record.headers();
        HashMap<String, Object> rabbitMqHeaders = new HashMap<String, Object>();
        for (Header header : headers) {
            rabbitMqHeaders.put(header.getKey(), header.getValue());
        }
        return rabbitMqHeaders;
    }
}

