/*
 * Decompiled with CFR 0.152.
 */
package io.appform.dropwizard.actors.base;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import io.appform.dropwizard.actors.actor.ActorConfig;
import io.appform.dropwizard.actors.actor.DelayType;
import io.appform.dropwizard.actors.base.utils.NamingUtils;
import io.appform.dropwizard.actors.connectivity.RMQConnection;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnmanagedPublisher<Message> {
    private static final Logger log = LoggerFactory.getLogger(UnmanagedPublisher.class);
    private final String name;
    private final ActorConfig config;
    private final RMQConnection connection;
    private final ObjectMapper mapper;
    private final String queueName;
    private Channel publishChannel;

    public UnmanagedPublisher(String name, ActorConfig config, RMQConnection connection, ObjectMapper mapper) {
        this.name = NamingUtils.prefixWithNamespace(name);
        this.config = config;
        this.connection = connection;
        this.mapper = mapper;
        this.queueName = NamingUtils.queueName(config.getPrefix(), name);
    }

    public final void publishWithDelay(Message message, long delayMilliseconds) throws Exception {
        log.info("Publishing message to exchange with delay: {}", (Object)delayMilliseconds);
        if (!this.config.isDelayed()) {
            log.warn("Publishing delayed message to non-delayed queue queue:{}", (Object)this.queueName);
        }
        if (this.config.getDelayType() == DelayType.TTL) {
            this.publishChannel.basicPublish(this.ttlExchange(this.config), this.queueName, new AMQP.BasicProperties.Builder().expiration(String.valueOf(delayMilliseconds)).deliveryMode(Integer.valueOf(2)).build(), this.mapper().writeValueAsBytes(message));
        } else {
            this.publish(message, new AMQP.BasicProperties.Builder().headers(Collections.singletonMap("x-delay", delayMilliseconds)).deliveryMode(Integer.valueOf(2)).build());
        }
    }

    public final void publish(Message message) throws Exception {
        this.publish(message, MessageProperties.MINIMAL_PERSISTENT_BASIC);
    }

    public final void publish(Message message, AMQP.BasicProperties properties) throws Exception {
        String routingKey = this.config.isSharded() ? NamingUtils.getShardedQueueName(this.queueName, this.getShardId()) : this.queueName;
        this.publishChannel.basicPublish(this.config.getExchange(), routingKey, properties, this.mapper().writeValueAsBytes(message));
    }

    private final int getShardId() {
        return RandomUtils.nextInt((int)0, (int)this.config.getShardCount());
    }

    public final long pendingMessagesCount() {
        try {
            if (this.config.isSharded()) {
                long messageCount = 0L;
                for (int i = 0; i < this.config.getShardCount(); ++i) {
                    String shardedQueueName = NamingUtils.getShardedQueueName(this.queueName, i);
                    messageCount += this.publishChannel.messageCount(shardedQueueName);
                }
                return messageCount;
            }
            return this.publishChannel.messageCount(this.queueName);
        }
        catch (IOException e) {
            log.error("Issue getting message count. Will return max", (Throwable)e);
            return Long.MAX_VALUE;
        }
    }

    public final long pendingSidelineMessagesCount() {
        try {
            return this.publishChannel.messageCount(this.queueName + "_SIDELINE");
        }
        catch (IOException e) {
            log.error("Issue getting message count. Will return max", (Throwable)e);
            return Long.MAX_VALUE;
        }
    }

    public void start() throws Exception {
        String exchange = this.config.getExchange();
        String dlx = this.config.getExchange() + "_SIDELINE";
        if (this.config.isDelayed()) {
            this.ensureDelayedExchange(exchange);
        } else {
            this.ensureExchange(exchange);
        }
        this.ensureExchange(dlx);
        this.publishChannel = this.connection.newChannel();
        this.connection.ensure(this.queueName + "_SIDELINE", this.queueName, dlx, this.connection.rmqOpts(this.config));
        if (this.config.isSharded()) {
            int bound = this.config.getShardCount();
            for (int shardId = 0; shardId < bound; ++shardId) {
                this.connection.ensure(NamingUtils.getShardedQueueName(this.queueName, shardId), this.config.getExchange(), this.connection.rmqOpts(dlx, this.config));
            }
        } else {
            this.connection.ensure(this.queueName, this.config.getExchange(), this.connection.rmqOpts(dlx, this.config));
        }
        if (this.config.getDelayType() == DelayType.TTL) {
            this.connection.ensure(this.ttlQueue(this.queueName), this.queueName, this.ttlExchange(this.config), this.connection.rmqOpts(exchange, this.config));
        }
    }

    private void ensureExchange(String exchange) throws IOException {
        this.connection.channel().exchangeDeclare(exchange, "direct", true, false, (Map)ImmutableMap.builder().put((Object)"x-ha-policy", (Object)"all").put((Object)"ha-mode", (Object)"all").build());
        log.info("Created exchange: {}", (Object)exchange);
    }

    private void ensureDelayedExchange(String exchange) throws IOException {
        if (this.config.getDelayType() == DelayType.TTL) {
            this.ensureExchange(this.ttlExchange(this.config));
        } else {
            this.connection.channel().exchangeDeclare(exchange, "x-delayed-message", true, false, (Map)ImmutableMap.builder().put((Object)"x-ha-policy", (Object)"all").put((Object)"ha-mode", (Object)"all").put((Object)"x-delayed-type", (Object)"direct").build());
            log.info("Created delayed exchange: {}", (Object)exchange);
        }
    }

    private String ttlExchange(ActorConfig actorConfig) {
        return String.format("%s_TTL", actorConfig.getExchange());
    }

    private String ttlQueue(String queueName) {
        return String.format("%s_TTL", queueName);
    }

    public void stop() throws Exception {
        try {
            this.publishChannel.close();
            log.info("Publisher channel closed for [{}] with prefix [{}]", (Object)this.name, (Object)this.config.getPrefix());
        }
        catch (Exception e) {
            log.error(String.format("Error closing publisher channel for [%s] with prefix [%s]", this.name, this.config.getPrefix()), (Throwable)e);
            throw e;
        }
    }

    protected final RMQConnection connection() {
        return this.connection;
    }

    protected final ObjectMapper mapper() {
        return this.mapper;
    }
}

