package io.elastic.sailor.impl;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import io.elastic.sailor.AmqpService;
import io.elastic.sailor.Constants;
import io.elastic.sailor.MessagePublisher;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:io/elastic/sailor/impl/MessagePublisherImpl.class */
public class MessagePublisherImpl implements MessagePublisher {
    private static final Logger logger = LoggerFactory.getLogger(MessagePublisherImpl.class);
    private static final int WAIT_FOR_CONFIRM_DURATION = 5000;
    private String publishExchangeName;
    private AmqpService amqpService;
    private int publishMaxRetries;
    private long publishRetryDelay;
    private long publishMaxRetryDelay;
    private Channel publishChannel;

    @Inject
    public MessagePublisherImpl(@Named("ELASTICIO_PUBLISH_MESSAGES_TO") String str, @Named("ELASTICIO_AMQP_PUBLISH_RETRY_ATTEMPTS") int i, @Named("ELASTICIO_AMQP_PUBLISH_RETRY_DELAY") long j, @Named("ELASTICIO_AMQP_PUBLISH_MAX_RETRY_DELAY") long j2, AmqpService amqpService) {
        this.publishExchangeName = str;
        this.publishMaxRetries = i;
        this.publishRetryDelay = j;
        this.publishMaxRetryDelay = j2;
        this.amqpService = amqpService;
    }

    @Override // io.elastic.sailor.MessagePublisher
    public void publish(String str, byte[] bArr, AMQP.BasicProperties basicProperties) {
        logger.info("Pushing to exchange={}, routingKey={}", this.publishExchangeName, str);
        boolean z = true;
        int i = 0;
        while (z) {
            Channel publishChannel = getPublishChannel();
            try {
                publishChannel.basicPublish(this.publishExchangeName, str, getRetryProperties(basicProperties, i), bArr);
                z = !waitForConfirms(publishChannel);
                if (z) {
                    sleep(i + 1);
                    if (i >= this.publishMaxRetries) {
                        throw new IllegalStateException(String.format("Failed to publish the message to a queue after %s retries. The limit of %s retries reached.", Integer.valueOf(i), Integer.valueOf(this.publishMaxRetries)));
                    }
                    i++;
                }
                logger.info("Successfully published data to {}", this.publishExchangeName);
            } catch (IOException e) {
                throw new RuntimeException(String.format("Failed to publish message to exchange %s", this.publishExchangeName), e);
            }
        }
    }

    private AMQP.BasicProperties getRetryProperties(AMQP.BasicProperties basicProperties, int i) {
        if (i < 1) {
            return basicProperties;
        }
        HashMap hashMap = new HashMap();
        hashMap.putAll(basicProperties.getHeaders());
        hashMap.put(Constants.AMQP_HEADER_RETRY, Integer.valueOf(i));
        return basicProperties.builder().headers(hashMap).build();
    }

    private boolean waitForConfirms(Channel channel) {
        logger.info("Waiting for publish confirm");
        try {
            return channel.waitForConfirms(5000L);
        } catch (IllegalStateException e) {
            logger.info("Looks like publisher confirmation was asked on a non-Confirm channel. Please check if the publisher channel was created with publisher confirms enabled.");
            throw e;
        } catch (InterruptedException e2) {
            logger.error("Thread was interrupted while waiting for publisher confirmation");
            throw new RuntimeException(e2);
        } catch (TimeoutException e3) {
            throw new RuntimeException("Waiting for publisher confirmation timed out", e3);
        }
    }

    private void sleep(int i) {
        long calculateSleepDuration = calculateSleepDuration(i);
        logger.info("Published message to {} was not confirmed. Trying again in {} millis.", this.publishExchangeName, Long.valueOf(calculateSleepDuration));
        try {
            Thread.sleep(calculateSleepDuration);
        } catch (InterruptedException e) {
            logger.error("Thread was interrupted while sleeping");
            throw new RuntimeException(e);
        }
    }

    private long calculateSleepDuration(int i) {
        double pow = Math.pow(2.0d, i - 1) * this.publishRetryDelay;
        return pow > ((double) this.publishMaxRetryDelay) ? this.publishMaxRetryDelay : new Double(pow).longValue();
    }

    private Channel getPublishChannel() {
        if (this.publishChannel == null) {
            this.publishChannel = createPublishChannel();
        }
        return this.publishChannel;
    }

    private synchronized Channel createPublishChannel() {
        try {
            if (this.publishChannel == null) {
                this.publishChannel = this.amqpService.getConnection().createChannel();
                this.publishChannel.confirmSelect();
                logger.info("Opened publish channel");
            }
            return this.publishChannel;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
