package org.objectweb.proactive.extensions.amqp.remoteobject;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import org.apache.log4j.Logger;
import org.objectweb.proactive.core.ProActiveException;
import org.objectweb.proactive.core.body.reply.Reply;
import org.objectweb.proactive.core.body.request.Request;
import org.objectweb.proactive.core.remoteobject.RemoteRemoteObject;
import org.objectweb.proactive.core.util.converter.ByteToObjectConverter;
import org.objectweb.proactive.core.util.converter.ObjectToByteConverter;
import org.objectweb.proactive.core.util.log.ProActiveLogger;
import org.objectweb.proactive.extensions.amqp.AMQPConfig;

/* loaded from: input_file:org/objectweb/proactive/extensions/amqp/remoteobject/AbstractAMQPRemoteObject.class */
public abstract class AbstractAMQPRemoteObject implements RemoteRemoteObject, Serializable {
    private static final Logger logger = ProActiveLogger.getLogger(AMQPConfig.Loggers.AMQP_REMOTE_OBJECT);
    private final String rpcExchangeName;
    private final long replyTimeout;
    protected final URI remoteObjectURL;
    protected final String queueName;

    public AbstractAMQPRemoteObject(URI uri, String str, long j) throws ProActiveException, IOException {
        this.remoteObjectURL = uri;
        this.queueName = AMQPUtils.computeQueueNameFromURI(uri);
        this.rpcExchangeName = str;
        this.replyTimeout = j;
    }

    protected abstract RpcReusableChannel getRpcReusableChannel() throws IOException;

    protected abstract void checkTargetObjectExists() throws IOException;

    @Override // org.objectweb.proactive.core.remoteobject.RemoteRemoteObject
    public final Reply receiveMessage(Request request) throws IOException, ProActiveException {
        QueueingConsumer.Delivery nextDelivery;
        RpcReusableChannel rpcReusableChannel = getRpcReusableChannel();
        try {
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("AMQP RO sending %s to %s, on exchange %s, queue %s", request.getMethodName(), this.remoteObjectURL, this.rpcExchangeName, this.queueName));
            }
            rpcReusableChannel.getChannel().basicPublish(this.rpcExchangeName, this.queueName, new AMQP.BasicProperties.Builder().replyTo(rpcReusableChannel.getReplyQueue()).build(), ObjectToByteConverter.ProActiveObjectStream.convert(request));
            while (true) {
                nextDelivery = rpcReusableChannel.getReplyQueueConsumer().nextDelivery(this.replyTimeout);
                if (nextDelivery != null) {
                    break;
                }
                checkTargetObjectExists();
            }
            Reply reply = (Reply) ByteToObjectConverter.ProActiveObjectStream.convert(nextDelivery.getBody());
            if (logger.isDebugEnabled()) {
                logger.debug(String.format("AMQP RO received response of message %s to %s, on exchange %s, queue %s", request.getMethodName(), this.remoteObjectURL, this.rpcExchangeName, this.queueName));
            }
            rpcReusableChannel.returnChannel();
            return reply;
        } catch (Throwable th) {
            rpcReusableChannel.close();
            throw new IOException(String.format("AMQP cannot send %s to %s, on exchange %s, queue %s", request.getMethodName(), this.remoteObjectURL, this.rpcExchangeName, this.queueName), th);
        }
    }
}
