package io.rtr.conduit.amqp.impl;

import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MetricsCollector;
import io.rtr.conduit.amqp.AMQPConsumerCallback;
import io.rtr.conduit.amqp.AMQPMessageBundle;
import io.rtr.conduit.amqp.AbstractAMQPTransport;
import io.rtr.conduit.amqp.transport.TransportConnectionProperties;
import io.rtr.conduit.amqp.transport.TransportListenProperties;
import io.rtr.conduit.amqp.transport.TransportMessageBundle;
import io.rtr.conduit.amqp.transport.TransportPublishProperties;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/* loaded from: input_file:io/rtr/conduit/amqp/impl/AMQPTransport.class */
public class AMQPTransport extends AbstractAMQPTransport {
    private AMQPConnection conn;
    private boolean hasPrivateConnection;
    private Channel channel;
    static final String POISON = ".poison";

    /* JADX INFO: Access modifiers changed from: protected */
    public AMQPTransport(boolean z, String str, int i, MetricsCollector metricsCollector) {
        this(new AMQPConnection(z, str, i, metricsCollector), true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AMQPTransport(AMQPConnection aMQPConnection) {
        this(aMQPConnection, false);
    }

    private AMQPTransport(AMQPConnection aMQPConnection, boolean z) {
        this.conn = aMQPConnection;
        this.hasPrivateConnection = z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Channel getChannel() {
        return this.channel;
    }

    @Override // io.rtr.conduit.amqp.transport.Transport
    protected boolean isConnectedImpl() {
        return this.conn.isConnected();
    }

    @Override // io.rtr.conduit.amqp.transport.Transport
    protected void connectImpl(TransportConnectionProperties transportConnectionProperties) throws IOException {
        if (this.hasPrivateConnection && !isConnected()) {
            try {
                this.conn.connect((AMQPConnectionProperties) transportConnectionProperties);
            } catch (TimeoutException e) {
                throw new IOException("Timed-out waiting for new connection", e);
            }
        }
        if (this.channel == null) {
            this.channel = this.conn.createChannel();
        }
    }

    @Override // io.rtr.conduit.amqp.transport.Transport
    protected void closeImpl() throws IOException {
        if (!this.hasPrivateConnection) {
            stop();
        } else if (this.conn.isConnected()) {
            this.conn.disconnect();
        }
    }

    @Override // io.rtr.conduit.amqp.AbstractAMQPTransport
    protected AMQPQueueConsumer getConsumer(Object obj, AMQPCommonListenProperties aMQPCommonListenProperties, String str) {
        return new AMQPQueueConsumer(getChannel(), (AMQPConsumerCallback) obj, aMQPCommonListenProperties.getThreshold(), str, aMQPCommonListenProperties.isPoisonQueueEnabled());
    }

    @Override // io.rtr.conduit.amqp.AbstractAMQPTransport
    protected AMQPCommonListenProperties getCommonListenProperties(TransportListenProperties transportListenProperties) {
        return (AMQPListenProperties) transportListenProperties;
    }

    @Override // io.rtr.conduit.amqp.AbstractAMQPTransport
    protected Object getConsumerCallback(TransportListenProperties transportListenProperties) {
        return ((AMQPListenProperties) transportListenProperties).getCallback();
    }

    @Override // io.rtr.conduit.amqp.transport.Transport
    protected void listenImpl(TransportListenProperties transportListenProperties) throws IOException {
        AMQPCommonListenProperties commonListenProperties = getCommonListenProperties(transportListenProperties);
        String queue = commonListenProperties.getQueue();
        String poisonPrefix = commonListenProperties.getPoisonPrefix();
        if (commonListenProperties.isDynamicQueueCreation()) {
            queue = createDynamicQueue(commonListenProperties.getExchange(), commonListenProperties.getDynamicQueueRoutingKey(), commonListenProperties.isPoisonQueueEnabled());
            poisonPrefix = "." + queue;
        } else if (commonListenProperties.isAutoCreateAndBind()) {
            autoCreateAndBind(commonListenProperties.getExchange(), commonListenProperties.getExchangeType(), commonListenProperties.getQueue(), commonListenProperties.getRoutingKey(), commonListenProperties.isPoisonQueueEnabled());
        }
        if (commonListenProperties.shouldPurgeOnConnect()) {
            this.channel.queuePurge(queue);
        }
        AMQPQueueConsumer consumer = getConsumer(getConsumerCallback(transportListenProperties), commonListenProperties, poisonPrefix);
        getChannel().basicQos(commonListenProperties.getPrefetchCount());
        getChannel().basicConsume(queue, false, consumer);
    }

    protected String createDynamicQueue(String str, String str2, boolean z) throws IOException {
        String queue = this.channel.queueDeclare().getQueue();
        this.channel.queueBind(queue, str, str2);
        if (z) {
            String str3 = ".poison." + queue;
            this.channel.queueDeclare(str3, true, true, true, new HashMap());
            this.channel.queueBind(str3, str, str2 + "." + queue + POISON);
        }
        return queue;
    }

    void autoCreateAndBind(String str, String str2, String str3, String str4, boolean z) throws IOException {
        this.channel.exchangeDeclare(str, str2, true);
        this.channel.queueDeclare(str3, true, false, false, (Map) null);
        this.channel.queueBind(str3, str, str4);
        if (z) {
            String str5 = str3 + POISON;
            this.channel.queueDeclare(str5, true, false, false, (Map) null);
            this.channel.queueBind(str5, str, str4 + POISON);
        }
    }

    @Override // io.rtr.conduit.amqp.transport.Transport
    protected void stopImpl() throws IOException {
        if (this.channel != null && this.channel.isOpen()) {
            try {
                this.channel.close();
            } catch (AlreadyClosedException e) {
            } catch (TimeoutException e2) {
                throw new IOException("Timed-out closing connection", e2);
            }
        }
        if (this.hasPrivateConnection) {
            this.conn.stopListening();
        }
    }

    @Override // io.rtr.conduit.amqp.transport.Transport
    protected boolean isStoppedImpl(int i) throws InterruptedException {
        return this.hasPrivateConnection ? this.conn.waitToStopListening(Duration.ofMillis(i)) : this.channel == null || !this.channel.isOpen();
    }

    @Override // io.rtr.conduit.amqp.transport.Transport
    protected boolean publishImpl(TransportMessageBundle transportMessageBundle, TransportPublishProperties transportPublishProperties) throws IOException, TimeoutException, InterruptedException {
        AMQPPublishProperties aMQPPublishProperties = (AMQPPublishProperties) transportPublishProperties;
        AMQPMessageBundle aMQPMessageBundle = (AMQPMessageBundle) transportMessageBundle;
        if (aMQPPublishProperties.isConfirmEnabled()) {
            this.channel.confirmSelect();
        }
        this.channel.basicPublish(aMQPPublishProperties.getExchange(), aMQPPublishProperties.getRoutingKey(), aMQPMessageBundle.getBasicProperties(), aMQPMessageBundle.getBody());
        if (aMQPPublishProperties.isConfirmEnabled()) {
            return this.channel.waitForConfirms(aMQPPublishProperties.getTimeout());
        }
        return true;
    }

    @Override // io.rtr.conduit.amqp.transport.Transport
    protected <E> boolean transactionalPublishImpl(Collection<E> collection, TransportPublishProperties transportPublishProperties) throws IOException, TimeoutException, InterruptedException {
        this.channel.txSelect();
        try {
            Iterator<E> it = collection.iterator();
            while (it.hasNext()) {
                if (!publishImpl((AMQPMessageBundle) it.next(), transportPublishProperties)) {
                    return false;
                }
            }
            if (0 != 0) {
                this.channel.txRollback();
                return true;
            }
            this.channel.txCommit();
            return true;
        } finally {
            if (1 != 0) {
                this.channel.txRollback();
            } else {
                this.channel.txCommit();
            }
        }
    }

    void setChannel(Channel channel) {
        this.channel = channel;
    }

    void setConnection(AMQPConnection aMQPConnection) {
        this.conn = aMQPConnection;
    }
}
