/*
 * Decompiled with CFR 0.152.
 */
package org.fabric3.binding.zeromq.runtime.broker;

import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.fabric3.api.annotation.monitor.Monitor;
import org.fabric3.api.binding.zeromq.model.SocketAddressDefinition;
import org.fabric3.api.binding.zeromq.model.ZeroMQMetadata;
import org.fabric3.api.host.runtime.HostInfo;
import org.fabric3.api.model.type.contract.DataType;
import org.fabric3.binding.zeromq.runtime.BrokerException;
import org.fabric3.binding.zeromq.runtime.MessagingMonitor;
import org.fabric3.binding.zeromq.runtime.ZeroMQPubSubBroker;
import org.fabric3.binding.zeromq.runtime.broker.SpecifiedPort;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.binding.zeromq.runtime.handler.PublisherHandler;
import org.fabric3.binding.zeromq.runtime.management.ZeroMQManagementService;
import org.fabric3.binding.zeromq.runtime.message.NonReliableQueuedPublisher;
import org.fabric3.binding.zeromq.runtime.message.NonReliableSingleThreadPublisher;
import org.fabric3.binding.zeromq.runtime.message.NonReliableSubscriber;
import org.fabric3.binding.zeromq.runtime.message.Publisher;
import org.fabric3.binding.zeromq.runtime.message.Subscriber;
import org.fabric3.spi.container.channel.ChannelConnection;
import org.fabric3.spi.container.channel.EventStream;
import org.fabric3.spi.container.channel.EventStreamHandler;
import org.fabric3.spi.container.channel.HandlerCreationException;
import org.fabric3.spi.container.channel.TransformerHandlerFactory;
import org.fabric3.spi.federation.addressing.AddressAnnouncement;
import org.fabric3.spi.federation.addressing.AddressCache;
import org.fabric3.spi.federation.addressing.AddressEvent;
import org.fabric3.spi.federation.addressing.AddressListener;
import org.fabric3.spi.federation.addressing.SocketAddress;
import org.fabric3.spi.host.Port;
import org.fabric3.spi.host.PortAllocationException;
import org.fabric3.spi.host.PortAllocator;
import org.fabric3.spi.model.physical.ParameterTypeHelper;
import org.fabric3.spi.model.physical.PhysicalEventStreamDefinition;
import org.fabric3.spi.model.type.java.JavaType;
import org.fabric3.spi.runtime.event.EventService;
import org.fabric3.spi.runtime.event.Fabric3EventListener;
import org.fabric3.spi.runtime.event.RuntimeStop;
import org.oasisopen.sca.annotation.Init;
import org.oasisopen.sca.annotation.Property;
import org.oasisopen.sca.annotation.Reference;
import org.oasisopen.sca.annotation.Service;

/**
 * Default {@link ZeroMQPubSubBroker} implementation.
 * <p>
 * The broker keeps at most one {@link Subscriber} and one {@link Publisher} per channel name and shares
 * them between all connections to that channel. A subscriber is stopped when its connection count drops
 * to zero; a publisher is stopped, its address announced as removed and its port released when its last
 * connection id is released.
 * <p>
 * The broker registers itself for {@link RuntimeStop} events and stops every publisher and subscriber
 * when one is received.
 * <p>
 * The internal maps are plain {@link HashMap}s and no method of this class synchronizes access to them.
 */
@Service(ZeroMQPubSubBroker.class)
public class ZeroMQPubSubBrokerImpl implements ZeroMQPubSubBroker, Fabric3EventListener<RuntimeStop> {
    private static final JavaType BYTES = new JavaType(byte[].class);
    private static final JavaType TWO_DIMENSIONAL_BYTES = new JavaType(byte[][].class);
    private static final String ZMQ = "zmq";
    private static final String LOCALHOST = "localhost";
    private static final String TCP = "tcp";
    private static final String SYNTHETIC = "synthetic";

    private final ContextManager manager;
    private final AddressCache addressCache;
    private final PortAllocator allocator;
    private final TransformerHandlerFactory handlerFactory;
    private final ZeroMQManagementService managementService;
    private final EventService eventService;
    private final ExecutorService executorService;
    private final HostInfo info;
    private final MessagingMonitor monitor;

    // Configurable through the optional properties below.
    private String hostAddress;
    private long pollTimeout = 10000L;

    // Both maps are keyed by channel name.
    private final Map<String, Subscriber> subscribers = new HashMap<String, Subscriber>();
    private final Map<String, PublisherHolder> publishers = new HashMap<String, PublisherHolder>();

    /**
     * Creates the broker. The host address defaults to the address reported by
     * {@link InetAddress#getLocalHost()} and can be overridden with {@link #setHost(String)}.
     *
     * @throws UnknownHostException if the local host address cannot be resolved
     */
    public ZeroMQPubSubBrokerImpl(@Reference ContextManager manager, @Reference AddressCache addressCache, @Reference PortAllocator allocator, @Reference TransformerHandlerFactory handlerFactory, @Reference ZeroMQManagementService managementService, @Reference EventService eventService, @Reference(name="executorService") ExecutorService executorService, @Reference HostInfo info, @Monitor MessagingMonitor monitor) throws UnknownHostException {
        this.manager = manager;
        this.addressCache = addressCache;
        this.allocator = allocator;
        this.handlerFactory = handlerFactory;
        this.managementService = managementService;
        this.eventService = eventService;
        this.executorService = executorService;
        this.info = info;
        this.monitor = monitor;
        this.hostAddress = InetAddress.getLocalHost().getHostAddress();
    }

    /**
     * Sets the poll timeout handed to queued (non dedicated-thread) publishers created by
     * {@link #connect}. The unit is whatever {@link NonReliableQueuedPublisher} expects.
     *
     * @param timeout the poll timeout
     */
    @Property(required=false)
    public void setPollTimeout(long timeout) {
        this.pollTimeout = timeout;
    }

    /**
     * Overrides the host address used for publisher socket addresses and substituted for
     * {@code "localhost"} in explicitly configured socket addresses.
     *
     * @param host the host address to use
     */
    @Property(required=false)
    public void setHost(String host) {
        this.hostAddress = host;
    }

    /**
     * Registers this broker as a listener for {@link RuntimeStop} events.
     */
    @Init
    public void init() {
        eventService.subscribe(RuntimeStop.class, this);
    }

    /**
     * Subscribes a connection to the channel named in the metadata.
     * <p>
     * If no subscriber exists for the channel, one is created, started, stored and registered with the
     * management service. Its socket addresses come from the metadata when the metadata specifies them;
     * otherwise they are read from the address cache and the subscriber is additionally registered with
     * the cache as an address listener. If a subscriber already exists only its connection count is
     * incremented.
     *
     * @param subscriberId the subscriber URI; its path and fragment are used to build the monitor id
     * @param metadata     the ZeroMQ metadata supplying the channel name and optional socket addresses
     * @param connection   the channel connection whose event stream receives messages
     * @param loader       the classloader used to load the event type
     * @throws BrokerException if the transformer handler cannot be created or the event type cannot be loaded
     */
    @Override
    public void subscribe(URI subscriberId, ZeroMQMetadata metadata, ChannelConnection connection, ClassLoader loader) throws BrokerException {
        String channelName = metadata.getChannelName();
        Subscriber subscriber = subscribers.get(channelName);
        if (subscriber == null) {
            EventStreamHandler head = createSubscriberHandlers(connection, loader);
            head.setNext(connection.getEventStream().getHeadHandler());

            List<SocketAddressDefinition> definitions = metadata.getSocketAddresses();
            // Only track address changes when the addresses were not fixed by configuration.
            boolean refresh = definitions == null;
            List<SocketAddress> addresses;
            if (refresh) {
                addresses = addressCache.getActiveAddresses(channelName);
            } else {
                addresses = new ArrayList<SocketAddress>(definitions.size());
                for (SocketAddressDefinition definition : definitions) {
                    SpecifiedPort port = new SpecifiedPort(definition.getPort());
                    String host = resolveHost(definition.getHost());
                    addresses.add(new SocketAddress(SYNTHETIC, SYNTHETIC, TCP, host, (Port)port));
                }
            }

            subscriber = new NonReliableSubscriber(subscriberId.toString(), manager, addresses, head, metadata, executorService);
            subscriber.incrementConnectionCount();
            subscriber.start();
            if (refresh) {
                addressCache.subscribe(channelName, (AddressListener)subscriber);
            }
            subscribers.put(channelName, subscriber);
            managementService.register(channelName, subscriberId, subscriber);
        } else {
            subscriber.incrementConnectionCount();
        }
        monitor.onSubscribe(monitorId(subscriberId));
    }

    /**
     * Removes one connection from the channel's subscriber, stopping and discarding the subscriber when
     * no connections remain, then unregisters the subscriber id from the management service.
     *
     * @param subscriberId the subscriber URI
     * @param metadata     the ZeroMQ metadata supplying the channel name
     * @throws IllegalStateException if no subscriber exists for the channel
     */
    @Override
    public void unsubscribe(URI subscriberId, ZeroMQMetadata metadata) {
        String channelName = metadata.getChannelName();
        Subscriber subscriber = subscribers.get(channelName);
        if (subscriber == null) {
            throw new IllegalStateException("Subscriber not found: " + subscriberId);
        }
        subscriber.decrementConnectionCount();
        if (!subscriber.hasConnections()) {
            subscribers.remove(channelName);
            subscriber.stop();
            // NOTE(review): subscribe() may have registered this subscriber with the address cache as a
            // listener; nothing here removes that registration - confirm whether AddressCache needs it.
        }
        managementService.unregister(channelName, subscriberId);
        monitor.onUnsubscribe(monitorId(subscriberId));
    }

    /**
     * Connects a producer-side channel connection to the publisher for the channel named in the metadata.
     * <p>
     * If no publisher exists, a port is reserved (when the metadata specifies exactly one socket address)
     * or allocated, a publisher is created and started, its address is announced as activated through the
     * address cache, and it is registered with the management service. If attaching the connection to the
     * new publisher fails, the port obtained for it is released again before the exception propagates.
     * If a publisher already exists the connection is simply attached to it.
     *
     * @param connectionId    the id recorded for this connection; pass the same id to {@link #release}
     * @param metadata        the ZeroMQ metadata supplying the channel name and optional socket address
     * @param dedicatedThread {@code true} to create a {@link NonReliableSingleThreadPublisher},
     *                        {@code false} to create a {@link NonReliableQueuedPublisher}
     * @param connection      the channel connection whose event stream is extended with publishing handlers
     * @param loader          the classloader used to load the event type
     * @throws BrokerException if more than one socket address is configured, port allocation fails, or the
     *                         handlers cannot be created
     */
    @Override
    public void connect(String connectionId, ZeroMQMetadata metadata, boolean dedicatedThread, ChannelConnection connection, ClassLoader loader) throws BrokerException {
        String channelName = metadata.getChannelName();
        PublisherHolder holder = publishers.get(channelName);
        if (holder != null) {
            attachConnection(connection, holder.getPublisher(), loader);
            holder.getConnectionIds().add(connectionId);
            return;
        }

        SocketAddress address;
        try {
            address = allocateAddress(channelName, metadata);
        } catch (PortAllocationException e) {
            throw new BrokerException("Error creating connection to " + channelName, e);
        }

        Publisher publisher;
        if (dedicatedThread) {
            publisher = new NonReliableSingleThreadPublisher(manager, address, metadata);
        } else {
            publisher = new NonReliableQueuedPublisher(manager, address, metadata, pollTimeout, monitor);
        }

        boolean attached = false;
        try {
            attachConnection(connection, publisher, loader);
            attached = true;
        } finally {
            if (!attached) {
                // No holder has been stored yet, so release() could never free this port later.
                allocator.release(channelName);
            }
        }

        AddressAnnouncement event = new AddressAnnouncement(channelName, AddressAnnouncement.Type.ACTIVATED, address);
        addressCache.publish((AddressEvent)event);
        publisher.start();
        holder = new PublisherHolder(publisher, address);
        holder.getConnectionIds().add(connectionId);
        publishers.put(channelName, holder);
        managementService.register(channelName, publisher);
    }

    /**
     * Releases one connection from the channel's publisher. When the last connection id is removed the
     * publisher is stopped and discarded, its address is announced as removed, it is unregistered from the
     * management service and the port held under the channel name is released.
     *
     * @param connectionId the id that was passed to {@link #connect}
     * @param metadata     the ZeroMQ metadata supplying the channel name
     * @throws BrokerException if no publisher exists for the channel
     */
    @Override
    public void release(String connectionId, ZeroMQMetadata metadata) throws BrokerException {
        String channelName = metadata.getChannelName();
        PublisherHolder holder = publishers.get(channelName);
        if (holder == null) {
            throw new BrokerException("Publisher not found for " + channelName);
        }
        List<String> connectionIds = holder.getConnectionIds();
        connectionIds.remove(connectionId);
        if (connectionIds.isEmpty()) {
            publishers.remove(channelName);
            holder.getPublisher().stop();
            AddressAnnouncement event = new AddressAnnouncement(channelName, AddressAnnouncement.Type.REMOVED, holder.getAddress());
            addressCache.publish((AddressEvent)event);
            managementService.unregister(channelName);
            // The port is shared by every connection on the channel, so only give it back once the
            // publisher bound to it has been stopped.
            allocator.release(channelName);
        }
    }

    /**
     * Starts every known subscriber, then every known publisher.
     */
    @Override
    public void startAll() {
        for (Subscriber subscriber : subscribers.values()) {
            subscriber.start();
        }
        for (PublisherHolder holder : publishers.values()) {
            holder.getPublisher().start();
        }
    }

    /**
     * Stops every known subscriber, then every known publisher. Nothing is removed from the broker.
     */
    @Override
    public void stopAll() {
        for (Subscriber subscriber : subscribers.values()) {
            subscriber.stop();
        }
        for (PublisherHolder holder : publishers.values()) {
            holder.getPublisher().stop();
        }
    }

    /**
     * Stops all publishers and subscribers when the runtime stops.
     *
     * @param event the runtime stop event
     */
    @Override
    public void onEvent(RuntimeStop event) {
        stopAll();
    }

    /**
     * Substitutes the configured host address for the literal {@code "localhost"}; any other value
     * (including {@code null}) is returned unchanged.
     */
    private String resolveHost(String specifiedHost) {
        return LOCALHOST.equals(specifiedHost) ? hostAddress : specifiedHost;
    }

    /**
     * Builds the id reported to the monitor: the URI path without its leading character, a slash, and
     * the URI fragment.
     */
    private String monitorId(URI subscriberId) {
        return subscriberId.getPath().substring(1) + "/" + subscriberId.getFragment();
    }

    /**
     * Obtains the port for a new publisher and wraps it in a socket address for this runtime and zone.
     * A port is reserved when the metadata specifies exactly one socket address and allocated otherwise.
     *
     * @throws BrokerException         if the metadata specifies more than one socket address
     * @throws PortAllocationException if the allocator cannot provide the port
     */
    private SocketAddress allocateAddress(String channelName, ZeroMQMetadata metadata) throws BrokerException, PortAllocationException {
        String runtimeName = info.getRuntimeName();
        String zone = info.getZoneName();
        List<SocketAddressDefinition> definitions = metadata.getSocketAddresses();
        if (definitions == null || definitions.isEmpty()) {
            Port port = allocator.allocate(channelName, ZMQ);
            return new SocketAddress(runtimeName, zone, TCP, hostAddress, port);
        }
        if (definitions.size() != 1) {
            throw new BrokerException("Invalid number of socket addresses: " + definitions.size());
        }
        SocketAddressDefinition definition = definitions.get(0);
        Port port = allocator.reserve(channelName, ZMQ, definition.getPort());
        return new SocketAddress(runtimeName, zone, TCP, resolveHost(definition.getHost()), port);
    }

    /**
     * Appends two handlers to the connection's event stream: a transformer from the stream's event type
     * to {@code byte[]} (or {@code byte[][]} when the event type is already {@code byte[][]}), followed
     * by a {@link PublisherHandler} wrapping the publisher.
     *
     * @throws BrokerException if the event type cannot be loaded or the transformer cannot be created
     */
    private void attachConnection(ChannelConnection connection, Publisher publisher, ClassLoader loader) throws BrokerException {
        EventStream stream = connection.getEventStream();
        try {
            DataType dataType = getEventType(stream, loader);
            EventStreamHandler transformer;
            if (dataType.getType().equals(byte[][].class)) {
                transformer = handlerFactory.createHandler(dataType, (DataType)TWO_DIMENSIONAL_BYTES, Collections.emptyList(), loader);
            } else {
                transformer = handlerFactory.createHandler(dataType, (DataType)BYTES, Collections.emptyList(), loader);
            }
            stream.addHandler(transformer);
        } catch (ClassNotFoundException e) {
            throw new BrokerException("Error loading event type", e);
        } catch (HandlerCreationException e) {
            throw new BrokerException(e);
        }
        stream.addHandler((EventStreamHandler)new PublisherHandler(publisher));
    }

    /**
     * Creates the head handler for a subscriber: a transformer from {@code byte[]} (or {@code byte[][]}
     * when the event type is {@code byte[][]}) to the connection's event type.
     *
     * @throws BrokerException if the event type cannot be loaded or the transformer cannot be created
     */
    private EventStreamHandler createSubscriberHandlers(ChannelConnection connection, ClassLoader loader) throws BrokerException {
        try {
            DataType dataType = getEventTypeForConnection(connection, loader);
            if (dataType.getType().equals(byte[][].class)) {
                return handlerFactory.createHandler((DataType)TWO_DIMENSIONAL_BYTES, dataType, Collections.emptyList(), loader);
            }
            return handlerFactory.createHandler((DataType)BYTES, dataType, Collections.emptyList(), loader);
        } catch (HandlerCreationException e) {
            throw new BrokerException(e);
        }
    }

    /**
     * Returns the data type of the first event type declared by the stream's definition, or a type for
     * {@code Object} when the definition declares none.
     *
     * @throws ClassNotFoundException if the declared event type cannot be loaded
     */
    private DataType getEventType(EventStream stream, ClassLoader loader) throws ClassNotFoundException {
        List<?> eventTypes = stream.getDefinition().getEventTypes();
        if (eventTypes.isEmpty()) {
            return new JavaType(Object.class);
        }
        Class<?> type = ParameterTypeHelper.loadClass((String)eventTypes.get(0), loader);
        return new JavaType(type);
    }

    /**
     * Same lookup as {@link #getEventType(EventStream, ClassLoader)} for the connection's event stream,
     * with a class loading failure wrapped in a {@link BrokerException}.
     *
     * @throws BrokerException if the declared event type cannot be loaded
     */
    private DataType getEventTypeForConnection(ChannelConnection connection, ClassLoader loader) throws BrokerException {
        try {
            return getEventType(connection.getEventStream(), loader);
        } catch (ClassNotFoundException e) {
            throw new BrokerException(e);
        }
    }

    /**
     * Pairs a publisher with its socket address and the ids of the connections currently using it.
     */
    private static final class PublisherHolder {
        private final List<String> connectionIds = new ArrayList<String>();
        private final Publisher publisher;
        private final SocketAddress address;

        private PublisherHolder(Publisher publisher, SocketAddress address) {
            this.publisher = publisher;
            this.address = address;
        }

        /** Returns the live, mutable list of connection ids attached to the publisher. */
        public List<String> getConnectionIds() {
            return connectionIds;
        }

        public Publisher getPublisher() {
            return publisher;
        }

        public SocketAddress getAddress() {
            return address;
        }
    }
}

