package org.fabric3.binding.zeromq.runtime.broker;

import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.fabric3.api.annotation.monitor.Monitor;
import org.fabric3.binding.zeromq.common.ZeroMQMetadata;
import org.fabric3.binding.zeromq.runtime.BrokerException;
import org.fabric3.binding.zeromq.runtime.MessagingMonitor;
import org.fabric3.binding.zeromq.runtime.SocketAddress;
import org.fabric3.binding.zeromq.runtime.ZeroMQPubSubBroker;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.binding.zeromq.runtime.federation.AddressAnnouncement;
import org.fabric3.binding.zeromq.runtime.federation.AddressCache;
import org.fabric3.binding.zeromq.runtime.handler.AsyncFanOutHandler;
import org.fabric3.binding.zeromq.runtime.handler.DeserializingEventStreamHandler;
import org.fabric3.binding.zeromq.runtime.handler.PublisherHandler;
import org.fabric3.binding.zeromq.runtime.handler.SerializingEventStreamHandler;
import org.fabric3.binding.zeromq.runtime.management.ZeroMQManagementService;
import org.fabric3.binding.zeromq.runtime.message.NonReliablePublisher;
import org.fabric3.binding.zeromq.runtime.message.NonReliableSubscriber;
import org.fabric3.binding.zeromq.runtime.message.Publisher;
import org.fabric3.binding.zeromq.runtime.message.Subscriber;
import org.fabric3.host.runtime.HostInfo;
import org.fabric3.spi.channel.ChannelConnection;
import org.fabric3.spi.channel.EventStream;
import org.fabric3.spi.event.EventService;
import org.fabric3.spi.event.Fabric3EventListener;
import org.fabric3.spi.event.RuntimeStop;
import org.fabric3.spi.host.PortAllocationException;
import org.fabric3.spi.host.PortAllocator;
import org.oasisopen.sca.annotation.Init;
import org.oasisopen.sca.annotation.Property;
import org.oasisopen.sca.annotation.Reference;
import org.oasisopen.sca.annotation.Service;

@Service(ZeroMQPubSubBroker.class)
/* loaded from: input_file:org/fabric3/binding/zeromq/runtime/broker/ZeroMQPubSubBrokerImpl.class */
public class ZeroMQPubSubBrokerImpl implements ZeroMQPubSubBroker, Fabric3EventListener<RuntimeStop> {
    private static final String ZMQ = "zmq";
    private ContextManager manager;
    private AddressCache addressCache;
    private ExecutorService executorService;
    private PortAllocator allocator;
    private ZeroMQManagementService managementService;
    private EventService eventService;
    private HostInfo info;
    private MessagingMonitor monitor;
    private long pollTimeout = 10000;
    private Map<String, Subscriber> subscribers = new HashMap();
    private Map<String, PublisherHolder> publishers = new HashMap();
    private String host = InetAddress.getLocalHost().getHostAddress();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/fabric3/binding/zeromq/runtime/broker/ZeroMQPubSubBrokerImpl$PublisherHolder.class */
    public class PublisherHolder {
        private List<String> connectionIds;
        private Publisher publisher;
        private SocketAddress address;

        private PublisherHolder(Publisher publisher, SocketAddress socketAddress) {
            this.connectionIds = new ArrayList();
            this.publisher = publisher;
            this.address = socketAddress;
        }

        public List<String> getConnectionIds() {
            return this.connectionIds;
        }

        public Publisher getPublisher() {
            return this.publisher;
        }

        public SocketAddress getAddress() {
            return this.address;
        }
    }

    public ZeroMQPubSubBrokerImpl(@Reference ContextManager contextManager, @Reference AddressCache addressCache, @Reference ExecutorService executorService, @Reference PortAllocator portAllocator, @Reference ZeroMQManagementService zeroMQManagementService, @Reference EventService eventService, @Reference HostInfo hostInfo, @Monitor MessagingMonitor messagingMonitor) throws UnknownHostException {
        this.manager = contextManager;
        this.addressCache = addressCache;
        this.executorService = executorService;
        this.allocator = portAllocator;
        this.managementService = zeroMQManagementService;
        this.eventService = eventService;
        this.info = hostInfo;
        this.monitor = messagingMonitor;
    }

    @Property(required = false)
    public void setPollTimeout(long j) {
        this.pollTimeout = j;
    }

    @Property(required = false)
    public void setHost(String str) {
        this.host = str;
    }

    @Init
    public void init() {
        this.eventService.subscribe(RuntimeStop.class, this);
    }

    @Override // org.fabric3.binding.zeromq.runtime.ZeroMQPubSubBroker
    public void subscribe(URI uri, ZeroMQMetadata zeroMQMetadata, ChannelConnection channelConnection, ClassLoader classLoader) {
        String channelName = zeroMQMetadata.getChannelName();
        Subscriber subscriber = this.subscribers.get(channelName);
        if (subscriber == null) {
            AsyncFanOutHandler asyncFanOutHandler = new AsyncFanOutHandler(this.executorService);
            DeserializingEventStreamHandler deserializingEventStreamHandler = new DeserializingEventStreamHandler(classLoader);
            deserializingEventStreamHandler.setNext(asyncFanOutHandler);
            NonReliableSubscriber nonReliableSubscriber = new NonReliableSubscriber(uri.toString(), this.manager, this.addressCache.getActiveAddresses(channelName), deserializingEventStreamHandler, zeroMQMetadata, this.pollTimeout, this.monitor);
            nonReliableSubscriber.addConnection(uri, channelConnection);
            nonReliableSubscriber.start();
            this.addressCache.subscribe(channelName, nonReliableSubscriber);
            this.subscribers.put(channelName, nonReliableSubscriber);
            this.managementService.register(channelName, uri, nonReliableSubscriber);
        } else {
            subscriber.addConnection(uri, channelConnection);
        }
        this.monitor.onSubscribe(uri.getPath().substring(1) + "/" + uri.getFragment());
    }

    @Override // org.fabric3.binding.zeromq.runtime.ZeroMQPubSubBroker
    public void unsubscribe(URI uri, ZeroMQMetadata zeroMQMetadata) {
        String channelName = zeroMQMetadata.getChannelName();
        Subscriber subscriber = this.subscribers.get(channelName);
        if (subscriber == null) {
            throw new IllegalStateException("Subscriber not found: " + uri);
        }
        subscriber.removeConnection(uri);
        if (!subscriber.hasConnections()) {
            this.subscribers.remove(channelName);
            subscriber.stop();
        }
        this.managementService.unregister(channelName, uri);
        this.monitor.onUnsubscribe(uri.getPath().substring(1) + "/" + uri.getFragment());
    }

    @Override // org.fabric3.binding.zeromq.runtime.ZeroMQPubSubBroker
    public void connect(String str, ChannelConnection channelConnection, ZeroMQMetadata zeroMQMetadata) throws BrokerException {
        String channelName = zeroMQMetadata.getChannelName();
        PublisherHolder publisherHolder = this.publishers.get(channelName);
        if (publisherHolder != null) {
            attachConnection(channelConnection, publisherHolder.getPublisher());
            publisherHolder.getConnectionIds().add(str);
            return;
        }
        try {
            SocketAddress socketAddress = new SocketAddress(this.info.getRuntimeName(), "tcp", this.host, this.allocator.allocate(channelName, ZMQ));
            NonReliablePublisher nonReliablePublisher = new NonReliablePublisher(this.manager, socketAddress, zeroMQMetadata, this.pollTimeout, this.monitor);
            attachConnection(channelConnection, nonReliablePublisher);
            this.addressCache.publish(new AddressAnnouncement(channelName, AddressAnnouncement.Type.ACTIVATED, socketAddress));
            nonReliablePublisher.start();
            PublisherHolder publisherHolder2 = new PublisherHolder(nonReliablePublisher, socketAddress);
            publisherHolder2.getConnectionIds().add(str);
            this.publishers.put(channelName, publisherHolder2);
            this.managementService.register(channelName, nonReliablePublisher);
        } catch (PortAllocationException e) {
            throw new BrokerException("Error creating connection to " + channelName, e);
        }
    }

    @Override // org.fabric3.binding.zeromq.runtime.ZeroMQPubSubBroker
    public void release(String str, ZeroMQMetadata zeroMQMetadata) throws BrokerException {
        String channelName = zeroMQMetadata.getChannelName();
        PublisherHolder publisherHolder = this.publishers.get(channelName);
        if (publisherHolder == null) {
            throw new BrokerException("Publisher not found for " + channelName);
        }
        Publisher publisher = publisherHolder.getPublisher();
        publisherHolder.getConnectionIds().remove(str);
        if (publisherHolder.getConnectionIds().isEmpty()) {
            this.publishers.remove(channelName);
            publisher.stop();
            this.addressCache.publish(new AddressAnnouncement(channelName, AddressAnnouncement.Type.REMOVED, publisherHolder.getAddress()));
            this.managementService.unregister(channelName);
        }
        this.allocator.release(channelName);
    }

    @Override // org.fabric3.binding.zeromq.runtime.ZeroMQPubSubBroker
    public void startAll() {
        Iterator<Subscriber> it = this.subscribers.values().iterator();
        while (it.hasNext()) {
            it.next().start();
        }
        Iterator<PublisherHolder> it2 = this.publishers.values().iterator();
        while (it2.hasNext()) {
            it2.next().getPublisher().start();
        }
    }

    @Override // org.fabric3.binding.zeromq.runtime.ZeroMQPubSubBroker
    public void stopAll() {
        Iterator<Subscriber> it = this.subscribers.values().iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
        Iterator<PublisherHolder> it2 = this.publishers.values().iterator();
        while (it2.hasNext()) {
            it2.next().getPublisher().stop();
        }
    }

    public void onEvent(RuntimeStop runtimeStop) {
        stopAll();
    }

    private void attachConnection(ChannelConnection channelConnection, Publisher publisher) {
        for (EventStream eventStream : channelConnection.getEventStreams()) {
            eventStream.addHandler(new SerializingEventStreamHandler());
            eventStream.addHandler(new PublisherHandler(publisher));
        }
    }
}
