package org.fabric3.binding.zeromq.runtime.message;

import java.lang.Thread;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.fabric3.api.annotation.management.Management;
import org.fabric3.api.annotation.management.ManagementOperation;
import org.fabric3.api.annotation.management.OperationType;
import org.fabric3.binding.zeromq.common.ZeroMQMetadata;
import org.fabric3.binding.zeromq.runtime.MessagingMonitor;
import org.fabric3.binding.zeromq.runtime.SocketAddress;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.spi.host.Port;
import org.zeromq.ZMQ;

/**
 * A {@link Publisher} that hands messages to a background dispatcher thread, which sends them over a
 * ZeroMQ socket of type {@code ZMQ.PUB} bound to the configured {@link SocketAddress}.
 * <p>
 * {@link #publish(byte[])} only enqueues the payload; all socket access happens on the dispatcher
 * thread. Uncaught dispatcher failures are reported to the {@link MessagingMonitor}.
 */
@Management
public class NonReliablePublisher extends AbstractStatistics implements Publisher, Thread.UncaughtExceptionHandler {
    private final ContextManager manager;
    private final SocketAddress address;
    private final long pollTimeout;
    private final ZeroMQMetadata metadata;
    private final MessagingMonitor monitor;
    // Most recently opened socket; written by the dispatcher thread only.
    private ZMQ.Socket socket;
    private Dispatcher dispatcher;
    private final LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();

    /**
     * Drains the publish queue and writes each payload to the socket until stopped or interrupted.
     */
    public class Dispatcher implements Runnable {
        private final AtomicBoolean active = new AtomicBoolean(true);

        private Dispatcher() {
        }

        /**
         * Asks the dispatch loop to finish; the loop notices on its next iteration (at the latest
         * after one poll timeout).
         */
        public void stop() {
            this.active.set(false);
        }

        /**
         * Reserves the ZeroMQ context, opens and binds the socket, then forwards queued messages
         * until {@link #stop()} is called or the thread is interrupted.
         * <p>
         * The socket opened by this run is always closed and the context reservation released,
         * including when setup or sending fails. If sending fails with a {@link RuntimeException},
         * a replacement dispatcher thread is scheduled before the exception is rethrown.
         */
        @Override // java.lang.Runnable
        public void run() {
            String id = getClass().getName() + ":" + UUID.randomUUID().toString();
            NonReliablePublisher.this.manager.reserve(id);
            // Keep the socket in a local so this run closes the socket it opened.
            ZMQ.Socket pubSocket = null;
            boolean cleanedUp = false;
            try {
                pubSocket = NonReliablePublisher.this.manager.getContext().socket(ZMQ.PUB);
                NonReliablePublisher.this.socket = pubSocket;
                SocketHelper.configure(pubSocket, NonReliablePublisher.this.metadata);
                NonReliablePublisher.this.address.getPort().bind(Port.TYPE.TCP);
                pubSocket.bind(NonReliablePublisher.this.address.toProtocolString());
                NonReliablePublisher.this.startStatistics();
                while (this.active.get()) {
                    try {
                        byte[] first = NonReliablePublisher.this.queue.poll(NonReliablePublisher.this.pollTimeout, TimeUnit.MILLISECONDS);
                        if (first == null) {
                            continue;
                        }
                        // Send the polled message together with everything else already queued.
                        ArrayList<byte[]> batch = new ArrayList<>();
                        batch.add(first);
                        NonReliablePublisher.this.queue.drainTo(batch);
                        for (byte[] message : batch) {
                            pubSocket.send(message, 0);
                        }
                        // NOTE(review): incremented once per drained batch, not per message — confirm intended.
                        NonReliablePublisher.this.messagesProcessed.incrementAndGet();
                    } catch (InterruptedException e) {
                        // Restore the flag and leave the loop: with the flag set every further poll
                        // would throw immediately, turning the loop into a busy spin.
                        Thread.currentThread().interrupt();
                        break;
                    } catch (RuntimeException e) {
                        // Free the socket and reservation before the replacement thread starts.
                        cleanedUp = true;
                        closeAndRelease(pubSocket, id);
                        NonReliablePublisher.this.schedule();
                        throw e;
                    }
                }
                NonReliablePublisher.this.startTime = 0L;
            } finally {
                if (!cleanedUp) {
                    closeAndRelease(pubSocket, id);
                }
            }
        }

        /**
         * Closes the given socket if one was opened and releases the context reservation even when
         * closing fails.
         */
        private void closeAndRelease(ZMQ.Socket openSocket, String id) {
            try {
                if (openSocket != null) {
                    openSocket.close();
                }
            } finally {
                NonReliablePublisher.this.manager.release(id);
            }
        }
    }

    /**
     * Creates a publisher; no thread is started and no socket is opened until {@link #start()}.
     *
     * @param contextManager   supplies the ZeroMQ context and tracks reservations on it
     * @param socketAddress    address the socket is bound to
     * @param zeroMQMetadata   socket configuration passed to {@code SocketHelper.configure}
     * @param j                queue poll timeout in milliseconds
     * @param messagingMonitor receives uncaught dispatcher errors
     */
    public NonReliablePublisher(ContextManager contextManager, SocketAddress socketAddress, ZeroMQMetadata zeroMQMetadata, long j, MessagingMonitor messagingMonitor) {
        this.manager = contextManager;
        this.address = socketAddress;
        this.pollTimeout = j;
        this.metadata = zeroMQMetadata;
        this.monitor = messagingMonitor;
    }

    /**
     * Starts the dispatcher thread if no dispatcher is currently set; otherwise does nothing.
     */
    @Override // org.fabric3.binding.zeromq.runtime.message.Publisher
    @ManagementOperation(type = OperationType.POST)
    public void start() {
        if (this.dispatcher == null) {
            this.dispatcher = new Dispatcher();
            schedule();
        }
    }

    /**
     * Signals the current dispatcher, if any, to stop and clears the reference so that
     * {@link #start()} can create a new one.
     */
    @Override // org.fabric3.binding.zeromq.runtime.message.Publisher
    @ManagementOperation(type = OperationType.POST)
    public void stop() {
        Dispatcher current = this.dispatcher;
        this.dispatcher = null;
        if (current != null) {
            current.stop();
        }
    }

    /**
     * Returns the string form of the configured socket address.
     */
    @ManagementOperation
    public String getAddress() {
        return this.address.toString();
    }

    /**
     * Enqueues a message for the dispatcher thread to send.
     * <p>
     * If the calling thread is interrupted while enqueuing, its interrupt status is restored so the
     * caller can observe it.
     *
     * @param bArr the message payload
     */
    @Override // org.fabric3.binding.zeromq.runtime.message.Publisher
    public void publish(byte[] bArr) {
        try {
            this.queue.put(bArr);
        } catch (InterruptedException e) {
            // Thread.interrupted() would clear the flag and hide the interruption from the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reports an exception that escaped the dispatcher thread to the monitor.
     */
    @Override // java.lang.Thread.UncaughtExceptionHandler
    public void uncaughtException(Thread thread, Throwable th) {
        this.monitor.error(th);
    }

    /**
     * Starts a new thread running the current dispatcher, with this publisher installed as the
     * thread's uncaught-exception handler.
     */
    public void schedule() {
        Thread thread = new Thread(this.dispatcher);
        thread.setUncaughtExceptionHandler(this);
        thread.start();
    }
}
