package org.apache.asterix.messaging;

import io.netty.util.collection.LongObjectHashMap;
import io.netty.util.collection.LongObjectMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.MessagingProperties;
import org.apache.asterix.common.memory.ConcurrentFramePool;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.common.messaging.api.MessageFuture;
import org.apache.hyracks.api.comm.IChannelControlBlock;
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/asterix/messaging/NCMessageBroker.class */
public class NCMessageBroker implements INCMessageBroker {
    private static final Logger LOGGER = LogManager.getLogger();
    private final NodeControllerService ncs;
    private final INcApplicationContext appContext;
    private final ConcurrentFramePool messagingFramePool;
    private final int maxMsgSize;
    private final LinkedBlockingQueue<INcAddressedMessage> receivedMsgsQ = new LinkedBlockingQueue<>();
    private final AtomicLong futureIdGenerator = new AtomicLong();
    private final LongObjectMap<MessageFuture> futureMap = new LongObjectHashMap();

    /* loaded from: input_file:org/apache/asterix/messaging/NCMessageBroker$MessageDeliveryService.class */
    private class MessageDeliveryService implements Runnable {
        private MessageDeliveryService() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                IMessage iMessage = null;
                try {
                    iMessage = (INcAddressedMessage) NCMessageBroker.this.receivedMsgsQ.take();
                    NCMessageBroker.this.receivedMessage(iMessage, null);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception e2) {
                    if (NCMessageBroker.LOGGER.isWarnEnabled() && iMessage != null) {
                        NCMessageBroker.LOGGER.log(Level.WARN, "Could not process message : " + iMessage, e2);
                    } else if (NCMessageBroker.LOGGER.isWarnEnabled()) {
                        NCMessageBroker.LOGGER.log(Level.WARN, "Could not process message", e2);
                    }
                }
            }
        }
    }

    public NCMessageBroker(NodeControllerService nodeControllerService, MessagingProperties messagingProperties) {
        this.ncs = nodeControllerService;
        this.appContext = (INcApplicationContext) nodeControllerService.getApplicationContext();
        this.maxMsgSize = messagingProperties.getFrameSize();
        this.messagingFramePool = new ConcurrentFramePool(nodeControllerService.getId(), messagingProperties.getFrameSize() * messagingProperties.getFrameCount(), messagingProperties.getFrameSize());
        this.appContext.getThreadExecutor().execute(new MessageDeliveryService());
    }

    public void sendMessageToCC(CcId ccId, ICcAddressedMessage iCcAddressedMessage) throws Exception {
        this.ncs.sendApplicationMessageToCC(ccId, JavaSerializationUtils.serialize(iCcAddressedMessage), (DeploymentId) null);
    }

    public void sendMessageToPrimaryCC(ICcAddressedMessage iCcAddressedMessage) throws Exception {
        sendMessageToCC(this.ncs.getPrimaryCcId(), iCcAddressedMessage);
    }

    public void sendMessageToNC(String str, INcAddressedMessage iNcAddressedMessage) throws Exception {
        sendMessageToChannel(this.ncs.getMessagingNetworkManager().getMessagingChannel(str), iNcAddressedMessage);
    }

    public void queueReceivedMessage(INcAddressedMessage iNcAddressedMessage) {
        this.receivedMsgsQ.offer(iNcAddressedMessage);
    }

    public void receivedMessage(IMessage iMessage, String str) throws Exception {
        INcAddressedMessage iNcAddressedMessage = (INcAddressedMessage) iMessage;
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Received message: " + iNcAddressedMessage);
        }
        this.ncs.getExecutor().submit(() -> {
            try {
                iNcAddressedMessage.handle(this.appContext);
            } catch (Exception e) {
                LOGGER.log(Level.WARN, "Could not process message: {}", iMessage, e);
            }
        });
    }

    public ConcurrentFramePool getMessagingFramePool() {
        return this.messagingFramePool;
    }

    private void sendMessageToChannel(IChannelControlBlock iChannelControlBlock, INcAddressedMessage iNcAddressedMessage) throws IOException {
        byte[] serialize = JavaSerializationUtils.serialize(iNcAddressedMessage);
        if (serialize.length > this.maxMsgSize) {
            throw new HyracksDataException("Message exceeded maximum size");
        }
        ByteBuffer byteBuffer = this.messagingFramePool.get();
        if (byteBuffer == null) {
            throw new HyracksDataException("Could not get an empty buffer");
        }
        byteBuffer.clear();
        byteBuffer.put(serialize);
        byteBuffer.flip();
        iChannelControlBlock.getWriteInterface().getFullBufferAcceptor().accept(byteBuffer);
    }

    public MessageFuture registerMessageFuture() {
        long incrementAndGet = this.futureIdGenerator.incrementAndGet();
        MessageFuture messageFuture = new MessageFuture(incrementAndGet);
        synchronized (this.futureMap) {
            if (this.futureMap.containsKey(incrementAndGet)) {
                throw new IllegalStateException();
            }
            this.futureMap.put(incrementAndGet, messageFuture);
        }
        return messageFuture;
    }

    public MessageFuture deregisterMessageFuture(long j) {
        MessageFuture messageFuture;
        synchronized (this.futureMap) {
            messageFuture = (MessageFuture) this.futureMap.remove(j);
        }
        return messageFuture;
    }
}
