/*
 * Decompiled with CFR 0.152.
 */
package org.fabric3.binding.zeromq.runtime.broker;

import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.fabric3.api.annotation.monitor.Monitor;
import org.fabric3.api.binding.zeromq.model.SocketAddressDefinition;
import org.fabric3.api.binding.zeromq.model.ZeroMQMetadata;
import org.fabric3.api.host.runtime.HostInfo;
import org.fabric3.api.model.type.contract.DataType;
import org.fabric3.binding.zeromq.runtime.BrokerException;
import org.fabric3.binding.zeromq.runtime.MessagingMonitor;
import org.fabric3.binding.zeromq.runtime.ZeroMQWireBroker;
import org.fabric3.binding.zeromq.runtime.broker.SpecifiedPort;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.binding.zeromq.runtime.interceptor.OneWayInterceptor;
import org.fabric3.binding.zeromq.runtime.interceptor.RequestReplyInterceptor;
import org.fabric3.binding.zeromq.runtime.interceptor.UnwrappingInterceptor;
import org.fabric3.binding.zeromq.runtime.interceptor.WrappingInterceptor;
import org.fabric3.binding.zeromq.runtime.management.ZeroMQManagementService;
import org.fabric3.binding.zeromq.runtime.message.AbstractReceiver;
import org.fabric3.binding.zeromq.runtime.message.DelegatingOneWaySender;
import org.fabric3.binding.zeromq.runtime.message.DynamicOneWaySender;
import org.fabric3.binding.zeromq.runtime.message.NonReliableOneWayReceiver;
import org.fabric3.binding.zeromq.runtime.message.NonReliableOneWaySender;
import org.fabric3.binding.zeromq.runtime.message.NonReliableRequestReplyReceiver;
import org.fabric3.binding.zeromq.runtime.message.NonReliableRequestReplySender;
import org.fabric3.binding.zeromq.runtime.message.OneWaySender;
import org.fabric3.binding.zeromq.runtime.message.Receiver;
import org.fabric3.binding.zeromq.runtime.message.RequestReplySender;
import org.fabric3.binding.zeromq.runtime.message.Sender;
import org.fabric3.spi.container.invocation.CallbackReference;
import org.fabric3.spi.container.invocation.WorkContext;
import org.fabric3.spi.container.wire.Interceptor;
import org.fabric3.spi.container.wire.InterceptorCreationException;
import org.fabric3.spi.container.wire.InvocationChain;
import org.fabric3.spi.container.wire.TransformerInterceptorFactory;
import org.fabric3.spi.federation.addressing.AddressAnnouncement;
import org.fabric3.spi.federation.addressing.AddressCache;
import org.fabric3.spi.federation.addressing.AddressEvent;
import org.fabric3.spi.federation.addressing.AddressListener;
import org.fabric3.spi.federation.addressing.SocketAddress;
import org.fabric3.spi.host.Port;
import org.fabric3.spi.host.PortAllocationException;
import org.fabric3.spi.host.PortAllocator;
import org.fabric3.spi.model.physical.ParameterTypeHelper;
import org.fabric3.spi.model.physical.PhysicalOperationDefinition;
import org.fabric3.spi.model.type.java.JavaType;
import org.fabric3.spi.runtime.event.EventService;
import org.fabric3.spi.runtime.event.Fabric3EventListener;
import org.fabric3.spi.runtime.event.RuntimeStop;
import org.oasisopen.sca.annotation.Init;
import org.oasisopen.sca.annotation.Property;
import org.oasisopen.sca.annotation.Reference;
import org.oasisopen.sca.annotation.Service;

/**
 * {@link ZeroMQWireBroker} implementation that creates, caches and tears down the ZeroMQ senders and
 * receivers used by wires.
 *
 * <p>Senders are cached per target URI string and shared by every wire connected to that URI; a sender is
 * stopped once the last wire id attached to it is released. Receivers are cached per service URI string and
 * their socket address is published to the {@link AddressCache} when they are provisioned and withdrawn when
 * they are released.
 *
 * <p>The class also acts as a {@link DynamicOneWaySender}: callback messages are routed to a sender looked up
 * (or created on demand) from the callback reference found on the {@link WorkContext}.
 *
 * <p>NOTE(review): the sender and receiver maps are plain {@link HashMap}s and this class performs no
 * synchronization of its own - confirm that callers serialize access.
 */
@Service(value={ZeroMQWireBroker.class})
public class ZeroMQWireBrokerImpl
implements ZeroMQWireBroker,
DynamicOneWaySender,
Fabric3EventListener<RuntimeStop> {
    private static final JavaType BYTE_TYPE = new JavaType(byte[].class);
    private static final JavaType EMPTY_TYPE = new JavaType(Void.class);
    // Types used on the transport side of the transformer interceptors; holds only BYTE_TYPE.
    List<DataType> TRANSPORT_TYPES;
    private static final String ZMQ = "zmq";
    private static final String LOCALHOST = "localhost";
    private final ContextManager manager;
    private final AddressCache addressCache;
    private final PortAllocator allocator;
    private final EventService eventService;
    private final HostInfo info;
    private final ZeroMQManagementService managementService;
    private final ExecutorService executorService;
    private final MessagingMonitor monitor;
    // Passed as-is to senders and request-reply receivers; see setPollTimeout for the scaling applied.
    private long pollTimeout = 10000000L;
    private final TransformerInterceptorFactory interceptorFactory;
    // Host advertised for receivers on allocated ports; may be overridden through setHost.
    private String host;
    // Address of the local host; substituted wherever a configured socket address says "localhost".
    private final String hostAddress;
    // Senders keyed by target URI string (or callback service URI for dynamically created ones).
    private final Map<String, SenderHolder> senders = new HashMap<String, SenderHolder>();
    // Receivers keyed by service URI string.
    private final Map<String, Receiver> receivers = new HashMap<String, Receiver>();

    /**
     * Creates the broker and records the local host address.
     *
     * @throws UnknownHostException if the local host address cannot be resolved
     */
    public ZeroMQWireBrokerImpl(@Reference ContextManager manager, @Reference AddressCache addressCache, @Reference PortAllocator allocator, @Reference(name="executorService") ExecutorService executorService, @Reference ZeroMQManagementService managementService, @Reference EventService eventService, @Reference TransformerInterceptorFactory interceptorFactory, @Reference HostInfo info, @Monitor MessagingMonitor monitor) throws UnknownHostException {
        this.manager = manager;
        this.addressCache = addressCache;
        this.allocator = allocator;
        this.executorService = executorService;
        this.managementService = managementService;
        this.eventService = eventService;
        this.interceptorFactory = interceptorFactory;
        this.info = info;
        this.monitor = monitor;
        // Resolve once: the same value seeds both the default advertised host and the "localhost" substitute.
        String localAddress = InetAddress.getLocalHost().getHostAddress();
        this.host = localAddress;
        this.hostAddress = localAddress;
        this.TRANSPORT_TYPES = new ArrayList<DataType>();
        this.TRANSPORT_TYPES.add((DataType) BYTE_TYPE);
    }

    /**
     * Sets the poll timeout. The supplied value is multiplied by 1000 before being stored
     * (presumably milliseconds converted to microseconds - TODO confirm against the sender/receiver classes).
     *
     * @param timeout the timeout, before scaling
     */
    @Property(required=false)
    public void setPollTimeout(long timeout) {
        this.pollTimeout = timeout * 1000L;
    }

    /**
     * Overrides the host used in the socket address of receivers that are bound to an allocated port.
     *
     * @param host the host to advertise
     */
    @Property(required=false)
    public void setHost(String host) {
        this.host = host;
    }

    /**
     * Subscribes this broker to {@link RuntimeStop} events so that {@link #onEvent(RuntimeStop)} is invoked.
     */
    @Init
    public void init() {
        this.eventService.subscribe(RuntimeStop.class, this);
    }

    /**
     * Attaches the invocation chains of a wire to a sender for the given target URI.
     *
     * <p>For {@code zmq}-scheme URIs a new {@link DelegatingOneWaySender} that delegates to this broker is
     * used and is not cached. For any other URI the cached sender is reused, or a new one is created, cached
     * and registered with the management service under {@code id}. Each chain then receives, in order, a
     * transformer interceptor, an {@link UnwrappingInterceptor} and the interceptor that dispatches to the
     * sender.
     *
     * @param id       the wire id
     * @param uri      the target URI
     * @param chains   the invocation chains of the wire
     * @param metadata the ZeroMQ binding metadata
     * @param loader   the classloader used to load parameter types
     * @throws BrokerException if a parameter type cannot be loaded or an interceptor cannot be created
     */
    @Override
    public void connectToSender(String id, URI uri, List<InvocationChain> chains, ZeroMQMetadata metadata, ClassLoader loader) throws BrokerException {
        SenderHolder holder;
        if (ZMQ.equals(uri.getScheme())) {
            DelegatingOneWaySender sender = new DelegatingOneWaySender(id, this, metadata);
            holder = new SenderHolder(sender);
        } else {
            holder = this.senders.get(uri.toString());
        }
        if (holder == null) {
            boolean oneWay = this.isOneWay(chains, uri);
            holder = this.createSender(uri.toString(), oneWay, metadata);
            this.managementService.registerSender(id, holder.getSender());
            // Remember the id used for registration so the same id is used when unregistering.
            holder.setRegisteredId(id);
        }
        int chainsSize = chains.size();
        for (int i = 0; i < chainsSize; ++i) {
            InvocationChain chain = chains.get(i);
            try {
                PhysicalOperationDefinition physicalOperation = chain.getPhysicalOperation();
                List<DataType> sourceTypes = this.createTypes(physicalOperation, loader);
                Interceptor transformer = this.interceptorFactory.createInterceptor(physicalOperation, sourceTypes, this.TRANSPORT_TYPES, loader, loader);
                chain.addInterceptor(transformer);
                chain.addInterceptor((Interceptor) new UnwrappingInterceptor());
            }
            catch (InterceptorCreationException e) {
                throw new BrokerException(e);
            }
            // The chain index is handed to the interceptor and forwarded with each message.
            chain.addInterceptor(this.createInterceptor(holder, i));
        }
        holder.getIds().add(id);
    }

    /**
     * Detaches a wire from the sender for the given URI, stopping and unregistering the sender when no wire
     * ids remain attached to it.
     *
     * @param id  the wire id
     * @param uri the target URI
     * @throws BrokerException if no sender is cached for a non-{@code zmq} URI
     */
    @Override
    public void releaseSender(String id, URI uri) throws BrokerException {
        SenderHolder holder = this.senders.get(uri.toString());
        if (holder == null) {
            // connectToSender never caches the holders it builds for zmq-scheme URIs, so nothing to release.
            if (!ZMQ.equals(uri.getScheme())) {
                throw new BrokerException("Sender not found for " + uri);
            }
            return;
        }
        holder.getIds().remove(id);
        if (holder.getIds().isEmpty()) {
            this.senders.remove(uri.toString());
            Sender sender = holder.getSender();
            sender.stop();
            // Unregister with the id the sender was registered under; the id of the last wire released may
            // differ from it when several wires share the sender. Fall back to the supplied id for holders
            // that were never registered by connectToSender.
            String registeredId = holder.getRegisteredId();
            this.managementService.unregisterSender(registeredId != null ? registeredId : id);
        }
    }

    /**
     * Provisions a receiver for the given service URI: reserves or allocates a port, adds the transformer
     * interceptors to the chains, starts the receiver, publishes its address and registers it with the
     * management service.
     *
     * @param uri      the service URI
     * @param chains   the invocation chains of the wire
     * @param metadata the ZeroMQ binding metadata
     * @param loader   the classloader used to load parameter types
     * @throws BrokerException if a receiver already exists for the URI, more than one socket address is
     *                         configured, a port cannot be allocated or an interceptor cannot be created
     */
    @Override
    public void connectToReceiver(URI uri, List<InvocationChain> chains, ZeroMQMetadata metadata, ClassLoader loader) throws BrokerException {
        if (this.receivers.containsKey(uri.toString())) {
            throw new BrokerException("Receiver already defined for " + uri);
        }
        try {
            SocketAddress address;
            String endpointId = uri.toString();
            String runtimeName = this.info.getRuntimeName();
            String zone = this.info.getZoneName();
            if (hasExplicitAddresses(metadata)) {
                if (metadata.getSocketAddresses().size() != 1) {
                    throw new BrokerException("Only one socket address can be specified");
                }
                SocketAddressDefinition addressDefinition = (SocketAddressDefinition) metadata.getSocketAddresses().get(0);
                String specifiedHost = this.resolveHost(addressDefinition.getHost());
                int portNumber = addressDefinition.getPort();
                Port port = this.allocator.reserve(endpointId, ZMQ, portNumber);
                address = new SocketAddress(runtimeName, zone, "tcp", specifiedHost, port);
            } else {
                Port port = this.allocator.allocate(endpointId, ZMQ);
                address = new SocketAddress(runtimeName, zone, "tcp", this.host, port);
            }
            this.addTransformer(chains, loader);
            AbstractReceiver receiver;
            if (this.isOneWay(chains, uri)) {
                receiver = new NonReliableOneWayReceiver(this.manager, address, chains, this.executorService, metadata, this.monitor);
            } else {
                receiver = new NonReliableRequestReplyReceiver(this.manager, address, chains, this.executorService, this.pollTimeout, metadata, this.monitor);
            }
            receiver.start();
            AddressAnnouncement event = new AddressAnnouncement(endpointId, AddressAnnouncement.Type.ACTIVATED, address);
            this.addressCache.publish((AddressEvent) event);
            this.receivers.put(uri.toString(), receiver);
            String id = this.createReceiverId(uri);
            this.managementService.registerReceiver(id, receiver);
            this.monitor.onProvisionEndpoint(id);
        }
        catch (PortAllocationException e) {
            throw new BrokerException("Error allocating port for " + uri, e);
        }
    }

    /**
     * Removes the receiver for the given service URI: withdraws its address from the address cache, stops
     * it, releases its port and unregisters it from the management service.
     *
     * @param uri the service URI
     * @throws BrokerException if no receiver exists for the URI
     */
    @Override
    public void releaseReceiver(URI uri) throws BrokerException {
        Receiver receiver = this.receivers.remove(uri.toString());
        if (receiver == null) {
            throw new BrokerException("Receiver not found for " + uri);
        }
        String endpointId = uri.toString();
        SocketAddress address = receiver.getAddress();
        AddressAnnouncement event = new AddressAnnouncement(endpointId, AddressAnnouncement.Type.REMOVED, address);
        this.addressCache.publish((AddressEvent) event);
        receiver.stop();
        this.allocator.release(endpointId);
        String id = this.createReceiverId(uri);
        this.managementService.unregisterReceiver(id);
        this.monitor.onRemoveEndpoint(id);
    }

    /**
     * Sends a message to the callback service referenced by the work context, creating and caching a
     * one-way sender for that service URI if none exists. Failures are reported through the monitor rather
     * than thrown.
     *
     * @param message  the serialized message
     * @param index    the operation index forwarded to the sender
     * @param context  the work context carrying the callback reference
     * @param metadata the ZeroMQ binding metadata used if a sender has to be created
     */
    @Override
    public void send(byte[] message, int index, WorkContext context, ZeroMQMetadata metadata) {
        CallbackReference callbackReference = context.peekCallbackReference();
        if (callbackReference == null) {
            this.monitor.error("Callback reference not found");
            return;
        }
        String callback = callbackReference.getServiceUri();
        SenderHolder holder = this.senders.get(callback);
        if (holder == null) {
            holder = this.createSender(callback, true, metadata);
        }
        Sender sender = holder.getSender();
        if (sender instanceof OneWaySender) {
            ((OneWaySender) sender).send(message, index, context);
        } else {
            // Report the sender's own type; the holder's class is always SenderHolder and carries no information.
            this.monitor.error("Callback sender is not a one-way type: " + sender.getClass().getName());
        }
    }

    /**
     * Starts every cached receiver, then every cached sender.
     */
    @Override
    public void startAll() {
        for (Receiver receiver : this.receivers.values()) {
            receiver.start();
        }
        for (SenderHolder holder : this.senders.values()) {
            holder.getSender().start();
        }
    }

    /**
     * Stops every cached receiver, then every cached sender. The caches themselves are left intact.
     */
    @Override
    public void stopAll() {
        for (Receiver receiver : this.receivers.values()) {
            receiver.stop();
        }
        for (SenderHolder holder : this.senders.values()) {
            holder.getSender().stop();
        }
    }

    /** No-op; senders and receivers are started individually or through {@link #startAll()}. */
    @Override
    public void start() {
    }

    /** No-op; senders and receivers are stopped individually or through {@link #stopAll()}. */
    @Override
    public void stop() {
    }

    /**
     * @return the fixed identifier {@code "ZeroMQWireBroker"}
     */
    public String getId() {
        return "ZeroMQWireBroker";
    }

    /** No-op; the broker itself does not react to address updates. */
    public void onUpdate(List<SocketAddress> addresses) {
    }

    /**
     * Stops all senders and receivers when the runtime stops.
     *
     * @param event the runtime stop event
     */
    @Override
    public void onEvent(RuntimeStop event) {
        this.stopAll();
    }

    /**
     * Creates, starts and caches a sender for the given endpoint.
     *
     * <p>When the metadata carries socket addresses, those are used verbatim (with "localhost" replaced by
     * the local host address). Otherwise the currently active addresses are taken from the address cache
     * and the sender is subscribed to it for that endpoint.
     *
     * @param endpointId the endpoint id, also used as the cache key
     * @param oneWay     true to create a one-way sender, false for a request-reply sender
     * @param metadata   the ZeroMQ binding metadata
     * @return the holder wrapping the new sender
     */
    private SenderHolder createSender(String endpointId, boolean oneWay, ZeroMQMetadata metadata) {
        // An empty address list is treated as "not configured", matching connectToReceiver; otherwise the
        // sender would be created with no addresses and would never subscribe to address updates.
        boolean explicit = hasExplicitAddresses(metadata);
        List<SocketAddress> addresses;
        if (explicit) {
            addresses = new ArrayList<SocketAddress>();
            for (SocketAddressDefinition addressDefinition : metadata.getSocketAddresses()) {
                SpecifiedPort port = new SpecifiedPort(addressDefinition.getPort());
                String specifiedHost = this.resolveHost(addressDefinition.getHost());
                addresses.add(new SocketAddress("synthetic", "synthetic", "tcp", specifiedHost, (Port) port));
            }
        } else {
            addresses = this.addressCache.getActiveAddresses(endpointId);
        }
        Sender sender;
        if (oneWay) {
            sender = new NonReliableOneWaySender(endpointId, this.manager, addresses, this.pollTimeout, metadata, this.monitor);
        } else {
            sender = new NonReliableRequestReplySender(endpointId, this.manager, addresses, this.pollTimeout, metadata, this.monitor);
        }
        SenderHolder holder = new SenderHolder(sender);
        sender.start();
        if (!explicit) {
            // Only senders whose addresses came from the cache are subscribed to it.
            this.addressCache.subscribe(endpointId, (AddressListener) sender);
        }
        this.senders.put(endpointId, holder);
        return holder;
    }

    /**
     * Returns true if the metadata specifies at least one socket address.
     */
    private static boolean hasExplicitAddresses(ZeroMQMetadata metadata) {
        return metadata.getSocketAddresses() != null && !metadata.getSocketAddresses().isEmpty();
    }

    /**
     * Replaces the literal host name "localhost" with the local host address; any other value is returned
     * unchanged.
     */
    private String resolveHost(String specifiedHost) {
        return LOCALHOST.equals(specifiedHost) ? this.hostAddress : specifiedHost;
    }

    /**
     * Determines whether the wire is one-way by inspecting the first chain's operation.
     *
     * @throws AssertionError if there are no chains
     */
    private boolean isOneWay(List<InvocationChain> chains, URI uri) {
        if (chains.isEmpty()) {
            throw new AssertionError("Contract must have at least one operation: " + uri);
        }
        return chains.get(0).getPhysicalOperation().isOneWay();
    }

    /**
     * Adds a {@link WrappingInterceptor} followed by a transformer interceptor (transport types to the
     * operation's parameter types) to every chain.
     *
     * @throws BrokerException if a parameter type cannot be loaded or an interceptor cannot be created
     */
    private void addTransformer(List<InvocationChain> chains, ClassLoader loader) throws BrokerException {
        for (InvocationChain chain : chains) {
            try {
                PhysicalOperationDefinition physicalOperation = chain.getPhysicalOperation();
                List<DataType> targetTypes = this.createTypes(physicalOperation, loader);
                // Create the transformer first so a creation failure leaves the chain untouched.
                Interceptor transformer = this.interceptorFactory.createInterceptor(physicalOperation, this.TRANSPORT_TYPES, targetTypes, loader, loader);
                chain.addInterceptor((Interceptor) new WrappingInterceptor());
                chain.addInterceptor(transformer);
            }
            catch (InterceptorCreationException e) {
                throw new BrokerException(e);
            }
        }
    }

    /**
     * Builds the list of data types for an operation's input parameters: a single {@code Void} type when
     * the operation declares no source parameter types, otherwise one {@link JavaType} per loaded
     * parameter class.
     *
     * @throws BrokerException if a parameter class cannot be loaded
     */
    private List<DataType> createTypes(PhysicalOperationDefinition physicalOperation, ClassLoader loader) throws BrokerException {
        try {
            List<DataType> dataTypes = new ArrayList<DataType>();
            if (physicalOperation.getSourceParameterTypes().isEmpty()) {
                dataTypes.add((DataType) EMPTY_TYPE);
            } else {
                List<Class<?>> types = ParameterTypeHelper.loadSourceInParameterTypes(physicalOperation, loader);
                for (Class<?> type : types) {
                    dataTypes.add((DataType) new JavaType(type));
                }
            }
            return dataTypes;
        }
        catch (ClassNotFoundException e) {
            throw new BrokerException("Error transforming parameter", e);
        }
    }

    /**
     * Creates the interceptor that dispatches invocations of the chain at index {@code i} to the holder's
     * sender.
     *
     * @throws AssertionError if the sender is neither a request-reply nor a one-way sender
     */
    private Interceptor createInterceptor(SenderHolder holder, int i) {
        Sender sender = holder.getSender();
        if (sender instanceof NonReliableRequestReplySender) {
            return new RequestReplyInterceptor(i, (RequestReplySender) sender);
        }
        if (sender instanceof OneWaySender) {
            return new OneWayInterceptor(i, (OneWaySender) sender);
        }
        throw new AssertionError("Unknown sender type: " + sender.getClass().getName());
    }

    /**
     * Derives the management id of a receiver: the URI authority for {@code zmq}-scheme URIs, otherwise the
     * path without its leading character followed by "/" and the fragment.
     */
    private String createReceiverId(URI uri) {
        if (ZMQ.equals(uri.getScheme())) {
            return uri.getAuthority();
        }
        return uri.getPath().substring(1) + "/" + uri.getFragment();
    }

    /**
     * Pairs a sender with the ids of the wires attached to it and the id it was registered under with the
     * management service, if any.
     */
    private static class SenderHolder {
        private final Sender sender;
        private final List<String> ids;
        // Id passed to registerSender for this sender; null if the sender was never registered.
        private String registeredId;

        private SenderHolder(Sender sender) {
            this.sender = sender;
            this.ids = new ArrayList<String>();
        }

        public Sender getSender() {
            return this.sender;
        }

        public List<String> getIds() {
            return this.ids;
        }

        public String getRegisteredId() {
            return this.registeredId;
        }

        public void setRegisteredId(String registeredId) {
            this.registeredId = registeredId;
        }
    }
}

