package org.apache.asterix.messaging;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.config.MessagingProperties;
import org.apache.asterix.common.memory.ConcurrentFramePool;
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.hyracks.api.comm.IChannelControlBlock;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.nc.NodeControllerService;

/* loaded from: input_file:org/apache/asterix/messaging/NCMessageBroker.class */
public class NCMessageBroker implements INCMessageBroker {
    private static final Logger LOGGER = Logger.getLogger(NCMessageBroker.class.getName());
    private final NodeControllerService ncs;
    private final IAppRuntimeContext appContext;
    private final LinkedBlockingQueue<IApplicationMessage> receivedMsgsQ = new LinkedBlockingQueue<>();
    private final ConcurrentFramePool messagingFramePool;
    private final int maxMsgSize;

    /* loaded from: input_file:org/apache/asterix/messaging/NCMessageBroker$MessageDeliveryService.class */
    private class MessageDeliveryService implements Runnable {
        private MessageDeliveryService() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                IMessage iMessage = null;
                try {
                    iMessage = (IApplicationMessage) NCMessageBroker.this.receivedMsgsQ.take();
                    NCMessageBroker.this.receivedMessage(iMessage, null);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception e2) {
                    if (NCMessageBroker.LOGGER.isLoggable(Level.WARNING) && iMessage != null) {
                        NCMessageBroker.LOGGER.log(Level.WARNING, "Could not process message : " + iMessage, (Throwable) e2);
                    } else if (NCMessageBroker.LOGGER.isLoggable(Level.WARNING)) {
                        NCMessageBroker.LOGGER.log(Level.WARNING, "Could not process message", (Throwable) e2);
                    }
                }
            }
        }
    }

    public NCMessageBroker(NodeControllerService nodeControllerService, MessagingProperties messagingProperties) {
        this.ncs = nodeControllerService;
        this.appContext = (IAppRuntimeContext) nodeControllerService.getApplicationContext();
        this.maxMsgSize = messagingProperties.getFrameSize();
        this.messagingFramePool = new ConcurrentFramePool(nodeControllerService.getId(), messagingProperties.getFrameSize() * messagingProperties.getFrameCount(), messagingProperties.getFrameSize());
        this.appContext.getThreadExecutor().execute(new MessageDeliveryService());
    }

    public void sendMessageToCC(IApplicationMessage iApplicationMessage) throws Exception {
        this.ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(iApplicationMessage), (DeploymentId) null);
    }

    public void sendMessageToNC(String str, IApplicationMessage iApplicationMessage) throws Exception {
        sendMessageToChannel(this.ncs.getMessagingNetworkManager().getMessagingChannel(str), iApplicationMessage);
    }

    public void queueReceivedMessage(IApplicationMessage iApplicationMessage) {
        this.receivedMsgsQ.offer(iApplicationMessage);
    }

    public void receivedMessage(IMessage iMessage, String str) throws Exception {
        IApplicationMessage iApplicationMessage = (IApplicationMessage) iMessage;
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Received message: " + iApplicationMessage);
        }
        iApplicationMessage.handle(this.ncs);
    }

    public ConcurrentFramePool getMessagingFramePool() {
        return this.messagingFramePool;
    }

    private void sendMessageToChannel(IChannelControlBlock iChannelControlBlock, IApplicationMessage iApplicationMessage) throws IOException {
        byte[] serialize = JavaSerializationUtils.serialize(iApplicationMessage);
        if (serialize.length > this.maxMsgSize) {
            throw new HyracksDataException("Message exceded maximum size");
        }
        ByteBuffer byteBuffer = this.messagingFramePool.get();
        if (byteBuffer == null) {
            throw new HyracksDataException("Could not get an empty buffer");
        }
        byteBuffer.clear();
        byteBuffer.put(serialize);
        byteBuffer.flip();
        iChannelControlBlock.getWriteInterface().getFullBufferAcceptor().accept(byteBuffer);
    }
}
