package io.bitcoinsv.jcl.net.protocol.handlers.message;

import io.bitcoinsv.jcl.net.network.PeerAddress;
import io.bitcoinsv.jcl.net.network.events.DisablePeerBigMessagesRequest;
import io.bitcoinsv.jcl.net.network.events.DisconnectPeerRequest;
import io.bitcoinsv.jcl.net.network.events.EnablePeerBigMessagesRequest;
import io.bitcoinsv.jcl.net.network.events.NetStartEvent;
import io.bitcoinsv.jcl.net.network.events.NetStopEvent;
import io.bitcoinsv.jcl.net.network.events.PeerDisconnectedEvent;
import io.bitcoinsv.jcl.net.network.events.PeerNIOStreamConnectedEvent;
import io.bitcoinsv.jcl.net.network.streams.StreamDataEvent;
import io.bitcoinsv.jcl.net.network.streams.StreamErrorEvent;
import io.bitcoinsv.jcl.net.protocol.events.control.BroadcastMsgBodyRequest;
import io.bitcoinsv.jcl.net.protocol.events.control.BroadcastMsgRequest;
import io.bitcoinsv.jcl.net.protocol.events.control.PeerMsgReadyEvent;
import io.bitcoinsv.jcl.net.protocol.events.control.SendMsgBodyRequest;
import io.bitcoinsv.jcl.net.protocol.events.control.SendMsgListRequest;
import io.bitcoinsv.jcl.net.protocol.events.control.SendMsgRequest;
import io.bitcoinsv.jcl.net.protocol.events.data.MsgReceivedEvent;
import io.bitcoinsv.jcl.net.protocol.events.data.MsgSentEvent;
import io.bitcoinsv.jcl.net.protocol.messages.common.BitcoinMsg;
import io.bitcoinsv.jcl.net.protocol.messages.common.BitcoinMsgBuilder;
import io.bitcoinsv.jcl.net.protocol.messages.common.Message;
import io.bitcoinsv.jcl.net.protocol.serialization.common.MsgSerializersFactory;
import io.bitcoinsv.jcl.net.protocol.streams.MessageStream;
import io.bitcoinsv.jcl.net.protocol.streams.deserializer.Deserializer;
import io.bitcoinsv.jcl.net.protocol.streams.deserializer.DeserializerStream;
import io.bitcoinsv.jcl.tools.config.RuntimeConfig;
import io.bitcoinsv.jcl.tools.handlers.HandlerImpl;
import io.bitcoinsv.jcl.tools.log.LoggerUtil;
import io.bitcoinsv.jcl.tools.thread.ThreadUtils;
import java.math.BigInteger;
import java.util.concurrent.ExecutorService;

/* loaded from: input_file:io/bitcoinsv/jcl/net/protocol/handlers/message/MessageHandlerImpl.class */
public class MessageHandlerImpl extends HandlerImpl<PeerAddress, MessagePeerInfo> implements MessageHandler {
    private LoggerUtil logger;
    private MessageHandlerConfig config;
    private MessageHandlerState state;
    private Deserializer deserializer;
    private ExecutorService dedicateConnsExecutor;

    public MessageHandlerImpl(String str, RuntimeConfig runtimeConfig, MessageHandlerConfig messageHandlerConfig) {
        super(str, runtimeConfig);
        this.state = MessageHandlerState.builder().build();
        this.config = messageHandlerConfig;
        this.logger = new LoggerUtil(str, MessageHandler.HANDLER_ID, getClass());
        this.deserializer = Deserializer.getInstance(runtimeConfig, messageHandlerConfig.getDeserializerConfig());
        if (messageHandlerConfig.isRawTxsEnabled()) {
            MsgSerializersFactory.enableRawSerializers();
        }
        this.dedicateConnsExecutor = ThreadUtils.getFixedThreadExecutorService("JclDeserializer", messageHandlerConfig.getMaxNumberDedicatedConnections());
    }

    private void registerForEvents() {
        this.eventBus.subscribe(NetStartEvent.class, event -> {
            onNetStart((NetStartEvent) event);
        });
        this.eventBus.subscribe(NetStopEvent.class, event2 -> {
            onNetStop((NetStopEvent) event2);
        });
        this.eventBus.subscribe(SendMsgRequest.class, event3 -> {
            onSendMsgReq((SendMsgRequest) event3);
        });
        this.eventBus.subscribe(SendMsgBodyRequest.class, event4 -> {
            onSendMsgBodyReq((SendMsgBodyRequest) event4);
        });
        this.eventBus.subscribe(SendMsgListRequest.class, event5 -> {
            onSendMsgListReq((SendMsgListRequest) event5);
        });
        this.eventBus.subscribe(BroadcastMsgRequest.class, event6 -> {
            onBroadcastReq((BroadcastMsgRequest) event6);
        });
        this.eventBus.subscribe(BroadcastMsgBodyRequest.class, event7 -> {
            onBroadcastReq((BroadcastMsgBodyRequest) event7);
        });
        this.eventBus.subscribe(PeerNIOStreamConnectedEvent.class, event8 -> {
            onPeerStreamConnected((PeerNIOStreamConnectedEvent) event8);
        });
        this.eventBus.subscribe(PeerDisconnectedEvent.class, event9 -> {
            onPeerDisconnected((PeerDisconnectedEvent) event9);
        });
        this.eventBus.subscribe(EnablePeerBigMessagesRequest.class, event10 -> {
            onEnablePeerBigMessages((EnablePeerBigMessagesRequest) event10);
        });
        this.eventBus.subscribe(DisablePeerBigMessagesRequest.class, event11 -> {
            onDisablePeerBigMessages((DisablePeerBigMessagesRequest) event11);
        });
    }

    private void onNetStart(NetStartEvent netStartEvent) {
        this.logger.debug("Starting...");
    }

    private void onNetStop(NetStopEvent netStopEvent) {
        this.logger.debug("Stop.");
    }

    private void onSendMsgReq(SendMsgRequest sendMsgRequest) {
        send(sendMsgRequest.getPeerAddress(), sendMsgRequest.getBtcMsg());
    }

    private void onSendMsgBodyReq(SendMsgBodyRequest sendMsgBodyRequest) {
        send(sendMsgBodyRequest.getPeerAddress(), sendMsgBodyRequest.getMsgBody());
    }

    private void onSendMsgListReq(SendMsgListRequest sendMsgListRequest) {
        PeerAddress peerAddress = sendMsgListRequest.getPeerAddress();
        sendMsgListRequest.getBtcMsgs().forEach(bitcoinMsg -> {
            send(peerAddress, (BitcoinMsg<?>) bitcoinMsg);
        });
    }

    private void onBroadcastReq(BroadcastMsgRequest broadcastMsgRequest) {
        broadcast(broadcastMsgRequest.getBtcMsg());
    }

    private void onBroadcastReq(BroadcastMsgBodyRequest broadcastMsgBodyRequest) {
        broadcast(broadcastMsgBodyRequest.getMsgBody());
    }

    private void onPeerStreamConnected(PeerNIOStreamConnectedEvent peerNIOStreamConnectedEvent) {
        PeerAddress peerAddress = peerNIOStreamConnectedEvent.getStream().getPeerAddress();
        MessageStream messageStream = new MessageStream(this.eventBus.getExecutor(), this.runtimeConfig, this.config.getBasicConfig(), this.deserializer, peerNIOStreamConnectedEvent.getStream(), this.dedicateConnsExecutor);
        messageStream.init();
        messageStream.input().onData(streamDataEvent -> {
            onStreamMsgReceived(peerAddress, (BitcoinMsg) streamDataEvent.getData());
        });
        messageStream.input().onClose(streamCloseEvent -> {
            onStreamClosed(peerAddress);
        });
        messageStream.input().onError(streamErrorEvent -> {
            onStreamError(peerAddress, streamErrorEvent);
        });
        if (this.config.getPreSerializer() != null) {
            ((DeserializerStream) messageStream.input()).setPreSerializer(this.config.getPreSerializer());
        }
        this.handlerInfo.put(peerNIOStreamConnectedEvent.getStream().getPeerAddress(), new MessagePeerInfo(messageStream));
        this.eventBus.publish(new PeerMsgReadyEvent(messageStream));
        this.logger.trace(peerNIOStreamConnectedEvent.getStream().getPeerAddress(), " Peer Stream Connected");
    }

    private void onPeerDisconnected(PeerDisconnectedEvent peerDisconnectedEvent) {
        this.handlerInfo.remove(peerDisconnectedEvent.getPeerAddress());
    }

    private void onStreamMsgReceived(PeerAddress peerAddress, BitcoinMsg<?> bitcoinMsg) {
        this.logger.trace(peerAddress, bitcoinMsg.getHeader().getCommand().toUpperCase() + " Msg received.");
        String findErrorInMsg = findErrorInMsg(bitcoinMsg);
        if (findErrorInMsg != null) {
            this.logger.trace(peerAddress, " ERROR In incoming msg :: " + findErrorInMsg);
            this.eventBus.publish(new DisconnectPeerRequest(peerAddress, findErrorInMsg));
        } else {
            this.eventBus.publish(EventFactory.buildIncomingEvent(peerAddress, bitcoinMsg));
            this.eventBus.publish(new MsgReceivedEvent(peerAddress, bitcoinMsg));
            updateState(1L, 0L);
        }
    }

    private void onStreamClosed(PeerAddress peerAddress) {
        this.handlerInfo.remove(peerAddress);
    }

    private void onStreamError(PeerAddress peerAddress, StreamErrorEvent streamErrorEvent) {
        this.logger.trace(peerAddress, "Error detected in Stream, requesting disconnection... ");
        this.eventBus.publish(new DisconnectPeerRequest(peerAddress));
    }

    private void onEnablePeerBigMessages(EnablePeerBigMessagesRequest enablePeerBigMessagesRequest) {
        MessagePeerInfo messagePeerInfo = (MessagePeerInfo) this.handlerInfo.get(enablePeerBigMessagesRequest.getPeerAddress());
        if (messagePeerInfo != null) {
            ((DeserializerStream) messagePeerInfo.getStream().input()).upgradeBufferSize();
        }
    }

    private void onDisablePeerBigMessages(DisablePeerBigMessagesRequest disablePeerBigMessagesRequest) {
        MessagePeerInfo messagePeerInfo = (MessagePeerInfo) this.handlerInfo.get(disablePeerBigMessagesRequest.getPeerAddress());
        if (messagePeerInfo != null) {
            ((DeserializerStream) messagePeerInfo.getStream().input()).resetBufferSize();
        }
    }

    @Override // io.bitcoinsv.jcl.tools.handlers.HandlerImpl, io.bitcoinsv.jcl.tools.handlers.Handler
    public void init() {
        registerForEvents();
    }

    @Override // io.bitcoinsv.jcl.net.protocol.handlers.message.MessageHandler
    public void send(PeerAddress peerAddress, BitcoinMsg<?> bitcoinMsg) {
        if (!this.handlerInfo.containsKey(peerAddress)) {
            this.logger.trace(peerAddress, " Request to Send Msg Discarded (unknown Peer)");
            return;
        }
        ((MessagePeerInfo) this.handlerInfo.get(peerAddress)).getStream().output().send(new StreamDataEvent<>(bitcoinMsg));
        this.logger.trace(peerAddress.toString() + " :: " + bitcoinMsg.getHeader().getCommand() + " Msg sent.");
        this.eventBus.publish(EventFactory.buildOutcomingEvent(peerAddress, bitcoinMsg));
        this.eventBus.publish(new MsgSentEvent(peerAddress, bitcoinMsg));
        updateState(0L, 1L);
    }

    @Override // io.bitcoinsv.jcl.net.protocol.handlers.message.MessageHandler
    public void send(PeerAddress peerAddress, Message message) {
        if (this.handlerInfo.containsKey(peerAddress)) {
            send(peerAddress, new BitcoinMsgBuilder(this.config.getBasicConfig(), message).build());
        } else {
            this.logger.trace(peerAddress, " Request to Send Msg Body Discarded (unknown Peer)");
        }
    }

    @Override // io.bitcoinsv.jcl.net.protocol.handlers.message.MessageHandler
    public void broadcast(BitcoinMsg<?> bitcoinMsg) {
        this.handlerInfo.values().forEach(messagePeerInfo -> {
            messagePeerInfo.getStream().output().send(new StreamDataEvent<>(bitcoinMsg));
        });
        updateState(0L, this.handlerInfo.size());
    }

    @Override // io.bitcoinsv.jcl.net.protocol.handlers.message.MessageHandler
    public void broadcast(Message message) {
        BitcoinMsg build = new BitcoinMsgBuilder(this.config.getBasicConfig(), message).build();
        this.handlerInfo.values().forEach(messagePeerInfo -> {
            messagePeerInfo.getStream().output().send(new StreamDataEvent<>(build));
        });
        updateState(0L, this.handlerInfo.size());
    }

    public synchronized void updateState(long j, long j2) {
        this.state = this.state.toBuilder().numMsgsIn(this.state.getNumMsgsIn().add(BigInteger.valueOf(j))).numMsgsOut(this.state.getNumMsgsOut().add(BigInteger.valueOf(j2))).deserializerState(this.deserializer.getState()).build();
    }

    private String findErrorInMsg(BitcoinMsg<?> bitcoinMsg) {
        if (bitcoinMsg == null) {
            return "Msg is Empty";
        }
        if (bitcoinMsg.getHeader().getMagic() != this.config.getBasicConfig().getMagicPackage()) {
            return "Network Id is incorrect";
        }
        return null;
    }

    @Override // io.bitcoinsv.jcl.tools.handlers.Handler
    public MessageHandlerConfig getConfig() {
        return this.config;
    }

    @Override // io.bitcoinsv.jcl.tools.handlers.Handler
    public MessageHandlerState getState() {
        return this.state;
    }
}
