package org.fabric3.binding.zeromq.runtime.message;

import java.lang.Thread;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.fabric3.api.annotation.management.Management;
import org.fabric3.api.annotation.management.ManagementOperation;
import org.fabric3.api.annotation.management.OperationType;
import org.fabric3.binding.zeromq.common.ZeroMQMetadata;
import org.fabric3.binding.zeromq.runtime.MessagingMonitor;
import org.fabric3.binding.zeromq.runtime.SocketAddress;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.binding.zeromq.runtime.federation.AddressListener;
import org.fabric3.binding.zeromq.runtime.handler.AsyncFanOutHandler;
import org.fabric3.spi.channel.ChannelConnection;
import org.fabric3.spi.channel.EventStreamHandler;
import org.zeromq.ZMQ;

@Management
/* loaded from: input_file:org/fabric3/binding/zeromq/runtime/message/NonReliableSubscriber.class */
public class NonReliableSubscriber extends AbstractStatistics implements Subscriber, AddressListener, Thread.UncaughtExceptionHandler {
    private static final byte[] EMPTY_BYTES = new byte[0];
    private String id;
    private ContextManager manager;
    private List<SocketAddress> addresses;
    private EventStreamHandler handler;
    private long pollTimeout;
    private ZeroMQMetadata metadata;
    private MessagingMonitor monitor;
    private AsyncFanOutHandler fanOutHandler;
    private SocketReceiver receiver;
    private String socketId = getClass().getName() + ":" + UUID.randomUUID();
    private AtomicInteger connectionCount = new AtomicInteger();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/fabric3/binding/zeromq/runtime/message/NonReliableSubscriber$SocketReceiver.class */
    public class SocketReceiver implements Runnable {
        private ZMQ.Socket socket;
        private ZMQ.Poller poller;
        private AtomicBoolean active;
        private AtomicBoolean doRefresh;

        private SocketReceiver() {
            this.active = new AtomicBoolean(true);
            this.doRefresh = new AtomicBoolean(true);
        }

        public void refresh() {
            this.doRefresh.set(true);
        }

        public synchronized void stop() {
            this.active.set(false);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                NonReliableSubscriber.this.startStatistics();
                while (this.active.get()) {
                    reconnect();
                    if (this.poller.poll(NonReliableSubscriber.this.pollTimeout) > 0) {
                        NonReliableSubscriber.this.handler.handle(this.socket.recv(0));
                        NonReliableSubscriber.this.messagesProcessed.incrementAndGet();
                    }
                }
                NonReliableSubscriber.this.startTime = 0L;
                closeSocket();
            } catch (RuntimeException e) {
                NonReliableSubscriber.this.schedule();
                throw e;
            }
        }

        private synchronized void reconnect() {
            if (this.doRefresh.getAndSet(false)) {
                closeSocket();
                NonReliableSubscriber.this.manager.reserve(NonReliableSubscriber.this.socketId);
                ZMQ.Context context = NonReliableSubscriber.this.manager.getContext();
                this.socket = context.socket(2);
                SocketHelper.configure(this.socket, NonReliableSubscriber.this.metadata);
                this.socket.subscribe(NonReliableSubscriber.EMPTY_BYTES);
                Iterator it = NonReliableSubscriber.this.addresses.iterator();
                while (it.hasNext()) {
                    this.socket.connect(((SocketAddress) it.next()).toProtocolString());
                }
                this.poller = context.poller();
                this.poller.register(this.socket, 1);
            }
        }

        private void closeSocket() {
            if (this.socket != null) {
                try {
                    this.socket.close();
                    NonReliableSubscriber.this.manager.release(NonReliableSubscriber.this.socketId);
                } catch (Throwable th) {
                    NonReliableSubscriber.this.manager.release(NonReliableSubscriber.this.socketId);
                    throw th;
                }
            }
        }
    }

    public NonReliableSubscriber(String str, ContextManager contextManager, List<SocketAddress> list, EventStreamHandler eventStreamHandler, ZeroMQMetadata zeroMQMetadata, long j, MessagingMonitor messagingMonitor) {
        this.id = str;
        this.manager = contextManager;
        this.addresses = list;
        this.handler = eventStreamHandler;
        this.metadata = zeroMQMetadata;
        this.pollTimeout = j * 1000;
        this.monitor = messagingMonitor;
        setFanOutHandler(this.handler);
    }

    @Override // org.fabric3.binding.zeromq.runtime.message.Subscriber
    @ManagementOperation(type = OperationType.POST)
    public void start() {
        if (this.receiver == null) {
            this.receiver = new SocketReceiver();
            schedule();
        }
    }

    @Override // org.fabric3.binding.zeromq.runtime.message.Subscriber
    @ManagementOperation(type = OperationType.POST)
    public void stop() {
        try {
            this.receiver.stop();
            this.receiver = null;
        } catch (Throwable th) {
            this.receiver = null;
            throw th;
        }
    }

    @ManagementOperation
    public List<String> getAddresses() {
        ArrayList arrayList = new ArrayList();
        Iterator<SocketAddress> it = this.addresses.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().toString());
        }
        return arrayList;
    }

    @Override // org.fabric3.binding.zeromq.runtime.message.Subscriber
    public void addConnection(URI uri, ChannelConnection channelConnection) {
        this.fanOutHandler.addConnection(uri, channelConnection);
        this.connectionCount.incrementAndGet();
    }

    @Override // org.fabric3.binding.zeromq.runtime.message.Subscriber
    public void removeConnection(URI uri) {
        this.fanOutHandler.removeConnection(uri);
        this.connectionCount.decrementAndGet();
    }

    @Override // org.fabric3.binding.zeromq.runtime.message.Subscriber
    public boolean hasConnections() {
        return this.connectionCount.get() > 0;
    }

    @Override // java.lang.Thread.UncaughtExceptionHandler
    public void uncaughtException(Thread thread, Throwable th) {
        this.monitor.error(th);
    }

    @Override // org.fabric3.binding.zeromq.runtime.federation.AddressListener
    public String getId() {
        return this.id;
    }

    @Override // org.fabric3.binding.zeromq.runtime.federation.AddressListener
    public void onUpdate(List<SocketAddress> list) {
        this.addresses = list;
        if (this.receiver != null) {
            this.receiver.refresh();
        }
    }

    private void setFanOutHandler(EventStreamHandler eventStreamHandler) {
        while (true) {
            if (eventStreamHandler == null) {
                break;
            }
            if (eventStreamHandler instanceof AsyncFanOutHandler) {
                this.fanOutHandler = (AsyncFanOutHandler) eventStreamHandler;
                break;
            }
            eventStreamHandler = eventStreamHandler.getNext();
        }
        if (this.fanOutHandler == null) {
            throw new AssertionError("Fanout handler not added to subscriber");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void schedule() {
        Thread thread = new Thread(this.receiver);
        thread.setUncaughtExceptionHandler(this);
        thread.start();
    }
}
