package org.fabric3.binding.zeromq.runtime.broker;

import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.fabric3.api.annotation.monitor.Monitor;
import org.fabric3.binding.zeromq.common.SocketAddressDefinition;
import org.fabric3.binding.zeromq.common.ZeroMQMetadata;
import org.fabric3.binding.zeromq.runtime.BrokerException;
import org.fabric3.binding.zeromq.runtime.MessagingMonitor;
import org.fabric3.binding.zeromq.runtime.SocketAddress;
import org.fabric3.binding.zeromq.runtime.ZeroMQWireBroker;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.binding.zeromq.runtime.federation.AddressAnnouncement;
import org.fabric3.binding.zeromq.runtime.federation.AddressCache;
import org.fabric3.binding.zeromq.runtime.interceptor.OneWayInterceptor;
import org.fabric3.binding.zeromq.runtime.interceptor.RequestReplyInterceptor;
import org.fabric3.binding.zeromq.runtime.interceptor.UnwrappingInterceptor;
import org.fabric3.binding.zeromq.runtime.interceptor.WrappingInterceptor;
import org.fabric3.binding.zeromq.runtime.management.ZeroMQManagementService;
import org.fabric3.binding.zeromq.runtime.message.DelegatingOneWaySender;
import org.fabric3.binding.zeromq.runtime.message.DynamicOneWaySender;
import org.fabric3.binding.zeromq.runtime.message.NonReliableOneWayReceiver;
import org.fabric3.binding.zeromq.runtime.message.NonReliableOneWaySender;
import org.fabric3.binding.zeromq.runtime.message.NonReliableRequestReplyReceiver;
import org.fabric3.binding.zeromq.runtime.message.NonReliableRequestReplySender;
import org.fabric3.binding.zeromq.runtime.message.OneWaySender;
import org.fabric3.binding.zeromq.runtime.message.Receiver;
import org.fabric3.binding.zeromq.runtime.message.RequestReplySender;
import org.fabric3.binding.zeromq.runtime.message.Sender;
import org.fabric3.host.runtime.HostInfo;
import org.fabric3.model.type.contract.DataType;
import org.fabric3.spi.event.EventService;
import org.fabric3.spi.event.Fabric3EventListener;
import org.fabric3.spi.event.RuntimeStop;
import org.fabric3.spi.host.PortAllocationException;
import org.fabric3.spi.host.PortAllocator;
import org.fabric3.spi.invocation.CallbackReference;
import org.fabric3.spi.invocation.WorkContext;
import org.fabric3.spi.model.physical.ParameterTypeHelper;
import org.fabric3.spi.model.physical.PhysicalOperationDefinition;
import org.fabric3.spi.model.type.java.JavaClass;
import org.fabric3.spi.wire.Interceptor;
import org.fabric3.spi.wire.InterceptorCreationException;
import org.fabric3.spi.wire.InvocationChain;
import org.fabric3.spi.wire.TransformerInterceptorFactory;
import org.oasisopen.sca.annotation.Init;
import org.oasisopen.sca.annotation.Property;
import org.oasisopen.sca.annotation.Reference;
import org.oasisopen.sca.annotation.Service;

/**
 * Default {@link ZeroMQWireBroker} implementation.
 * <p>
 * The broker keeps one {@link Sender} per target endpoint URI and one {@link Receiver} per service URI, wires the
 * required interceptors into the invocation chains handed to it, publishes receiver addresses through the
 * {@link AddressCache}, and registers senders and receivers with the {@link ZeroMQManagementService}. It also acts as
 * a {@link DynamicOneWaySender} that routes messages to the endpoint named by the current callback reference, and
 * stops all senders and receivers when a {@link RuntimeStop} event is received.
 * <p>
 * NOTE(review): the sender and receiver registries are plain {@code HashMap}s with no synchronization in this class;
 * confirm that callers serialize access.
 */
@Service(ZeroMQWireBroker.class)
public class ZeroMQWireBrokerImpl implements ZeroMQWireBroker, DynamicOneWaySender, Fabric3EventListener<RuntimeStop> {
    private static final DataType<?> BYTE_TYPE = new JavaClass(byte[].class);
    private static final DataType<?> EMPTY_TYPE = new JavaClass(Void.class);
    private static final String ZMQ = "zmq";

    private final ContextManager manager;
    private final AddressCache addressCache;
    private final PortAllocator allocator;
    private final EventService eventService;
    private final HostInfo info;
    private final ZeroMQManagementService managementService;
    private final ExecutorService executorService;
    private final MessagingMonitor monitor;
    private final TransformerInterceptorFactory interceptorFactory;

    /** Address of the local host as resolved at construction time; substituted for a configured "localhost". */
    private final String hostAddress;

    /**
     * Timeout passed to senders and request-reply receivers. {@link #setPollTimeout(long)} stores its argument
     * multiplied by 1000 (presumably milliseconds converted to microseconds; TODO confirm against the senders).
     */
    private long pollTimeout = 10000000;

    /** Host used for receivers whose port is allocated dynamically; defaults to the local host address. */
    private String host;

    /** Active senders keyed by endpoint URI string. */
    private final Map<String, SenderHolder> senders = new HashMap<String, SenderHolder>();

    /** Active receivers keyed by service URI string. */
    private final Map<String, Receiver> receivers = new HashMap<String, Receiver>();

    /** Types used on the transport side of the transformer interceptors; holds the byte-array type only. */
    List<DataType<?>> TRANSPORT_TYPES = new ArrayList<DataType<?>>();

    /**
     * Pairs a sender with the ids of the wires currently connected to it, so the sender can be stopped once the last
     * wire is released.
     */
    public static class SenderHolder {
        private final Sender sender;
        private final List<String> ids;

        private SenderHolder(Sender sender) {
            this.sender = sender;
            this.ids = new ArrayList<String>();
        }

        /**
         * @return the wrapped sender
         */
        public Sender getSender() {
            return this.sender;
        }

        /**
         * @return the live, mutable list of ids connected to the sender
         */
        public List<String> getIds() {
            return this.ids;
        }
    }

    /**
     * Creates the broker and resolves the local host address, which is used both as the default receiver host and as
     * the replacement for a configured "localhost".
     *
     * @throws UnknownHostException if the local host address cannot be resolved
     */
    public ZeroMQWireBrokerImpl(@Reference ContextManager contextManager, @Reference AddressCache addressCache, @Reference PortAllocator portAllocator, @Reference ExecutorService executorService, @Reference ZeroMQManagementService zeroMQManagementService, @Reference EventService eventService, @Reference TransformerInterceptorFactory transformerInterceptorFactory, @Reference HostInfo hostInfo, @Monitor MessagingMonitor messagingMonitor) throws UnknownHostException {
        this.manager = contextManager;
        this.addressCache = addressCache;
        this.allocator = portAllocator;
        this.executorService = executorService;
        this.managementService = zeroMQManagementService;
        this.eventService = eventService;
        this.interceptorFactory = transformerInterceptorFactory;
        this.info = hostInfo;
        this.monitor = messagingMonitor;
        this.TRANSPORT_TYPES.add(BYTE_TYPE);
        // Resolve once and share: a single lookup keeps the default host and the "localhost" replacement identical.
        String localAddress = InetAddress.getLocalHost().getHostAddress();
        this.hostAddress = localAddress;
        this.host = localAddress;
    }

    /**
     * Sets the poll timeout. The value is multiplied by 1000 before it is stored.
     *
     * @param j the timeout in the externally configured unit
     */
    @Property(required = false)
    public void setPollTimeout(long j) {
        this.pollTimeout = j * 1000;
    }

    /**
     * Overrides the host used for receivers whose port is allocated dynamically.
     *
     * @param str the host to use
     */
    @Property(required = false)
    public void setHost(String str) {
        this.host = str;
    }

    /**
     * Subscribes this broker to {@link RuntimeStop} events.
     */
    @Init
    public void init() {
        this.eventService.subscribe(RuntimeStop.class, this);
    }

    /**
     * Attaches the invocation chains of a wire to a sender for the target URI.
     * <p>
     * A URI with the {@code zmq} scheme always gets a fresh {@link DelegatingOneWaySender} that delegates to this
     * broker. For any other URI an existing sender is reused; if none exists one is created and registered with the
     * management service under the given id. Each chain receives, in order, a transformer interceptor, an
     * {@link UnwrappingInterceptor} and a sender interceptor that is given the chain's position in the list.
     *
     * @param str            the id of the connecting wire
     * @param uri            the target URI
     * @param list           the invocation chains to wire; must not be empty when a sender has to be created
     * @param zeroMQMetadata the binding metadata
     * @param classLoader    the classloader used to load parameter types and create transformers
     * @throws BrokerException if a parameter type cannot be loaded or an interceptor cannot be created
     */
    @Override
    public void connectToSender(String str, URI uri, List<InvocationChain> list, ZeroMQMetadata zeroMQMetadata, ClassLoader classLoader) throws BrokerException {
        String endpointId = uri.toString();
        SenderHolder holder;
        if (ZMQ.equals(uri.getScheme())) {
            holder = new SenderHolder(new DelegatingOneWaySender(str, this, zeroMQMetadata));
        } else {
            holder = this.senders.get(endpointId);
        }
        if (holder == null) {
            holder = createSender(endpointId, isOneWay(list, uri), zeroMQMetadata);
            this.managementService.registerSender(str, holder.getSender());
        }
        int chainCount = list.size();
        for (int index = 0; index < chainCount; index++) {
            InvocationChain chain = list.get(index);
            try {
                PhysicalOperationDefinition operation = chain.getPhysicalOperation();
                List<DataType<?>> sourceTypes = createTypes(operation, classLoader);
                chain.addInterceptor(this.interceptorFactory.createInterceptor(operation, sourceTypes, this.TRANSPORT_TYPES, classLoader, classLoader));
                chain.addInterceptor(new UnwrappingInterceptor());
                chain.addInterceptor(createInterceptor(holder, index));
            } catch (InterceptorCreationException e) {
                throw new BrokerException(e);
            }
        }
        holder.getIds().add(str);
    }

    /**
     * Detaches a wire from the sender for the given URI. When the last wire is released the sender is removed,
     * stopped and unregistered from the management service.
     * <p>
     * A missing sender is silently ignored for {@code zmq} URIs, whose delegating senders are never stored in the
     * registry by {@link #connectToSender}.
     *
     * @param str the id of the wire being released
     * @param uri the target URI
     * @throws BrokerException if no sender exists for a URI whose scheme is not {@code zmq}
     */
    @Override
    public void releaseSender(String str, URI uri) throws BrokerException {
        String endpointId = uri.toString();
        SenderHolder holder = this.senders.get(endpointId);
        if (holder == null) {
            if (!ZMQ.equals(uri.getScheme())) {
                throw new BrokerException("Sender not found for " + uri);
            }
            return;
        }
        List<String> connectedIds = holder.getIds();
        connectedIds.remove(str);
        if (connectedIds.isEmpty()) {
            this.senders.remove(endpointId);
            holder.getSender().stop();
            this.managementService.unregisterSender(str);
        }
    }

    /**
     * Creates, starts and registers a receiver for the given service URI and announces its address as activated.
     * <p>
     * If the metadata specifies no socket address a port is allocated on the configured host; otherwise the single
     * specified address is reserved. The receiver is one-way or request-reply depending on the first chain's
     * operation.
     *
     * @param uri            the service URI
     * @param list           the invocation chains to dispatch to; must not be empty
     * @param zeroMQMetadata the binding metadata
     * @param classLoader    the classloader used to load parameter types and create transformers
     * @throws BrokerException if a receiver already exists for the URI, more than one socket address is specified, a
     *                         port cannot be allocated, or an interceptor cannot be created
     */
    @Override
    public void connectToReceiver(URI uri, List<InvocationChain> list, ZeroMQMetadata zeroMQMetadata, ClassLoader classLoader) throws BrokerException {
        String endpointId = uri.toString();
        if (this.receivers.containsKey(endpointId)) {
            throw new BrokerException("Receiver already defined for " + uri);
        }
        try {
            SocketAddress address = createReceiverAddress(endpointId, zeroMQMetadata);
            addTransformer(list, classLoader);
            Receiver receiver;
            if (isOneWay(list, uri)) {
                receiver = new NonReliableOneWayReceiver(this.manager, address, list, this.executorService, zeroMQMetadata, this.monitor);
            } else {
                receiver = new NonReliableRequestReplyReceiver(this.manager, address, list, this.executorService, this.pollTimeout, zeroMQMetadata, this.monitor);
            }
            receiver.start();
            this.addressCache.publish(new AddressAnnouncement(endpointId, AddressAnnouncement.Type.ACTIVATED, address));
            this.receivers.put(endpointId, receiver);
            String receiverId = createReceiverId(uri);
            this.managementService.registerReceiver(receiverId, receiver);
            this.monitor.onProvisionEndpoint(receiverId);
        } catch (PortAllocationException e) {
            throw new BrokerException("Error allocating port for " + uri, e);
        }
    }

    /**
     * Removes the receiver for the given service URI: announces its address as removed, stops it, releases its port
     * and unregisters it from the management service.
     *
     * @param uri the service URI
     * @throws BrokerException if no receiver exists for the URI
     */
    @Override
    public void releaseReceiver(URI uri) throws BrokerException {
        String endpointId = uri.toString();
        Receiver receiver = this.receivers.remove(endpointId);
        if (receiver == null) {
            throw new BrokerException("Receiver not found for " + uri);
        }
        this.addressCache.publish(new AddressAnnouncement(endpointId, AddressAnnouncement.Type.REMOVED, receiver.getAddress()));
        receiver.stop();
        this.allocator.release(endpointId);
        String receiverId = createReceiverId(uri);
        this.managementService.unregisterReceiver(receiverId);
        this.monitor.onRemoveEndpoint(receiverId);
    }

    /**
     * Sends a one-way message to the endpoint named by the callback reference at the top of the work context,
     * creating a one-way sender for that endpoint on first use. Failures are reported to the monitor rather than
     * thrown.
     *
     * @param bArr           the serialized message
     * @param i              the operation index forwarded to the sender
     * @param workContext    the current work context
     * @param zeroMQMetadata the binding metadata used if a sender has to be created
     */
    @Override
    public void send(byte[] bArr, int i, WorkContext workContext, ZeroMQMetadata zeroMQMetadata) {
        CallbackReference callbackReference = workContext.peekCallbackReference();
        if (callbackReference == null) {
            this.monitor.error("Callback reference not found");
            return;
        }
        String callbackUri = callbackReference.getServiceUri();
        SenderHolder holder = this.senders.get(callbackUri);
        if (holder == null) {
            holder = createSender(callbackUri, true, zeroMQMetadata);
        }
        Sender sender = holder.getSender();
        if (sender instanceof OneWaySender) {
            ((OneWaySender) sender).send(bArr, i, workContext);
        } else {
            // Report the sender's type; the holder's own class is always SenderHolder and says nothing useful.
            this.monitor.error("Callback sender is not a one-way type: " + sender.getClass().getName());
        }
    }

    /**
     * Starts every registered receiver, then every registered sender.
     */
    @Override
    public void startAll() {
        for (Receiver receiver : this.receivers.values()) {
            receiver.start();
        }
        for (SenderHolder holder : this.senders.values()) {
            holder.getSender().start();
        }
    }

    /**
     * Stops every registered receiver, then every registered sender. The registries are left untouched.
     */
    @Override
    public void stopAll() {
        for (Receiver receiver : this.receivers.values()) {
            receiver.stop();
        }
        for (SenderHolder holder : this.senders.values()) {
            holder.getSender().stop();
        }
    }

    /**
     * No-op: the broker itself has nothing to start when used as a sender.
     */
    @Override
    public void start() {
    }

    /**
     * No-op: the broker itself has nothing to stop when used as a sender.
     */
    @Override
    public void stop() {
    }

    /**
     * @return the fixed id {@code "ZeroMQWireBroker"}
     */
    @Override
    public String getId() {
        return "ZeroMQWireBroker";
    }

    /**
     * No-op: address updates are ignored by the broker itself.
     *
     * @param list the updated addresses (ignored)
     */
    @Override
    public void onUpdate(List<SocketAddress> list) {
    }

    /**
     * Stops all senders and receivers when the runtime stops.
     *
     * @param runtimeStop the stop event
     */
    @Override
    public void onEvent(RuntimeStop runtimeStop) {
        stopAll();
    }

    /**
     * Returns whether the metadata carries at least one explicitly configured socket address. A {@code null} and an
     * empty list are treated alike so that senders and receivers agree on when addresses are resolved dynamically.
     */
    private boolean hasExplicitAddresses(ZeroMQMetadata metadata) {
        return metadata.getSocketAddresses() != null && !metadata.getSocketAddresses().isEmpty();
    }

    /**
     * Replaces a configured "localhost" with the local host address resolved at construction time.
     */
    private String resolveHost(String configuredHost) {
        return "localhost".equals(configuredHost) ? this.hostAddress : configuredHost;
    }

    /**
     * Determines the socket address a receiver binds to: a dynamically allocated port on the configured host when no
     * address is specified, otherwise the single specified address with its port reserved.
     *
     * @throws BrokerException         if more than one socket address is specified
     * @throws PortAllocationException if the port cannot be allocated or reserved
     */
    private SocketAddress createReceiverAddress(String endpointId, ZeroMQMetadata metadata) throws BrokerException, PortAllocationException {
        String runtimeName = this.info.getRuntimeName();
        if (!hasExplicitAddresses(metadata)) {
            return new SocketAddress(runtimeName, "tcp", this.host, this.allocator.allocate(endpointId, ZMQ));
        }
        if (metadata.getSocketAddresses().size() != 1) {
            throw new BrokerException("Only one socket address can be specified");
        }
        SocketAddressDefinition definition = metadata.getSocketAddresses().get(0);
        String bindHost = resolveHost(definition.getHost());
        return new SocketAddress(runtimeName, "tcp", bindHost, this.allocator.reserve(endpointId, ZMQ, definition.getPort()));
    }

    /**
     * Creates and starts a sender for the endpoint and stores it in the registry.
     * <p>
     * Explicitly configured socket addresses are used as-is. Otherwise the currently active addresses are taken from
     * the address cache and the sender is subscribed to the cache for that endpoint.
     *
     * @param endpointId the endpoint URI string used as registry key
     * @param oneWay     true to create a one-way sender, false for a request-reply sender
     * @param metadata   the binding metadata
     * @return the holder wrapping the new sender
     */
    private SenderHolder createSender(String endpointId, boolean oneWay, ZeroMQMetadata metadata) {
        boolean dynamicAddresses = !hasExplicitAddresses(metadata);
        List<SocketAddress> addresses;
        if (dynamicAddresses) {
            addresses = this.addressCache.getActiveAddresses(endpointId);
        } else {
            addresses = new ArrayList<SocketAddress>();
            for (SocketAddressDefinition definition : metadata.getSocketAddresses()) {
                SpecifiedPort port = new SpecifiedPort(definition.getPort());
                addresses.add(new SocketAddress("synthetic", "tcp", resolveHost(definition.getHost()), port));
            }
        }
        Sender sender;
        if (oneWay) {
            sender = new NonReliableOneWaySender(endpointId, this.manager, addresses, this.pollTimeout, metadata, this.monitor);
        } else {
            sender = new NonReliableRequestReplySender(endpointId, this.manager, addresses, this.pollTimeout, metadata, this.monitor);
        }
        SenderHolder holder = new SenderHolder(sender);
        sender.start();
        if (dynamicAddresses) {
            this.addressCache.subscribe(endpointId, sender);
        }
        this.senders.put(endpointId, holder);
        return holder;
    }

    /**
     * Returns whether the first chain's operation is one-way; that single operation decides the type for the whole
     * wire.
     *
     * @throws AssertionError if the list of chains is empty
     */
    private boolean isOneWay(List<InvocationChain> chains, URI uri) {
        if (chains.isEmpty()) {
            throw new AssertionError("Contract must have at least one operation: " + uri);
        }
        return chains.get(0).getPhysicalOperation().isOneWay();
    }

    /**
     * Adds the receiving-side interceptors to every chain: a {@link WrappingInterceptor} followed by a transformer
     * interceptor created with the transport types as source and the operation's parameter types as target.
     *
     * @throws BrokerException if a parameter type cannot be loaded or an interceptor cannot be created
     */
    private void addTransformer(List<InvocationChain> chains, ClassLoader classLoader) throws BrokerException {
        for (InvocationChain chain : chains) {
            try {
                PhysicalOperationDefinition operation = chain.getPhysicalOperation();
                List<DataType<?>> targetTypes = createTypes(operation, classLoader);
                Interceptor transformer = this.interceptorFactory.createInterceptor(operation, this.TRANSPORT_TYPES, targetTypes, classLoader, classLoader);
                chain.addInterceptor(new WrappingInterceptor());
                chain.addInterceptor(transformer);
            } catch (InterceptorCreationException e) {
                throw new BrokerException(e);
            }
        }
    }

    /**
     * Builds the list of data types for an operation's input parameters. An operation without source parameter types
     * yields a single {@code Void} type.
     *
     * @throws BrokerException if a parameter class cannot be loaded
     */
    private List<DataType<?>> createTypes(PhysicalOperationDefinition operation, ClassLoader classLoader) throws BrokerException {
        try {
            List<DataType<?>> types = new ArrayList<DataType<?>>();
            if (operation.getSourceParameterTypes().isEmpty()) {
                types.add(EMPTY_TYPE);
            } else {
                for (Object parameterType : ParameterTypeHelper.loadSourceInParameterTypes(operation, classLoader)) {
                    types.add(new JavaClass((Class) parameterType));
                }
            }
            return types;
        } catch (ClassNotFoundException e) {
            throw new BrokerException("Error transforming parameter", e);
        }
    }

    /**
     * Creates the interceptor that hands invocations to the holder's sender.
     *
     * @param holder the holder of the sender to dispatch to
     * @param index  the operation index passed to the interceptor
     * @throws AssertionError if the sender is neither a request-reply nor a one-way sender
     */
    private Interceptor createInterceptor(SenderHolder holder, int index) {
        Sender sender = holder.getSender();
        if (sender instanceof NonReliableRequestReplySender) {
            return new RequestReplyInterceptor(index, (RequestReplySender) sender);
        }
        if (sender instanceof OneWaySender) {
            return new OneWayInterceptor(index, (OneWaySender) sender);
        }
        throw new AssertionError("Unknown sender type: " + sender.getClass().getName());
    }

    /**
     * Derives the management id of a receiver: the authority for {@code zmq} URIs, otherwise the path without its
     * first character, a slash, and the fragment.
     * <p>
     * NOTE(review): a non-zmq URI without a fragment yields an id ending in "/null", and one without a path fails in
     * {@code substring}; confirm that callers always supply both.
     */
    private String createReceiverId(URI uri) {
        if (ZMQ.equals(uri.getScheme())) {
            return uri.getAuthority();
        }
        return uri.getPath().substring(1) + "/" + uri.getFragment();
    }
}
