/*
 * Decompiled with CFR 0.152.
 */
package org.fabric3.binding.zeromq.runtime.message;

import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.fabric3.api.annotation.management.Management;
import org.fabric3.api.annotation.management.ManagementOperation;
import org.fabric3.api.annotation.management.OperationType;
import org.fabric3.api.binding.zeromq.model.ZeroMQMetadata;
import org.fabric3.binding.zeromq.runtime.MessagingMonitor;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.binding.zeromq.runtime.message.Publisher;
import org.fabric3.binding.zeromq.runtime.message.SocketHelper;
import org.fabric3.spi.federation.addressing.SocketAddress;
import org.fabric3.spi.host.Port;
import org.zeromq.ZMQ;

/**
 * A {@link Publisher} that hands messages to an in-memory queue and sends them on a ZeroMQ
 * socket from a dedicated dispatcher thread.
 *
 * <p>{@link #publish(byte[])} and {@link #publish(byte[][])} only enqueue; the socket is created,
 * bound, used and closed exclusively by the dispatcher thread started from {@link #start()}.
 * Delivery is best effort: messages taken off the queue when a send fails are not retried.
 */
@Management
public class NonReliableQueuedPublisher implements Publisher, Thread.UncaughtExceptionHandler {
    /** Sentinel queued by {@link #stop()}; always compared by identity, never by content. */
    private static final byte[] SHUTDOWN = new byte[0];

    /** Socket type passed to the ZeroMQ context (the value of {@code ZMQ.PUB} in the Java bindings). */
    private static final int PUB_SOCKET_TYPE = 1;

    /** Send flag marking a non-final frame (the value of {@code ZMQ.SNDMORE} in the Java bindings). */
    private static final int SEND_MORE = 2;

    /** Send flag for a single-frame message or the final frame of a multi-part message. */
    private static final int NO_FLAGS = 0;

    private final ContextManager manager;
    private final SocketAddress address;
    private final long pollTimeout;
    private final ZeroMQMetadata metadata;
    private final MessagingMonitor monitor;
    private final LinkedBlockingQueue<Object> queue;
    private ZMQ.Socket socket;
    private Dispatcher dispatcher;

    /**
     * Creates a publisher; no socket is opened until {@link #start()} is called.
     *
     * @param manager     supplies the ZeroMQ context and tracks reservations made by the dispatcher
     * @param address     the address the socket is bound to
     * @param metadata    settings applied to the socket through {@link SocketHelper}
     * @param pollTimeout how long, in milliseconds, the dispatcher waits for a message before
     *                    re-checking whether it has been stopped
     * @param monitor     receives errors raised while queuing or dispatching
     */
    public NonReliableQueuedPublisher(ContextManager manager, SocketAddress address, ZeroMQMetadata metadata, long pollTimeout, MessagingMonitor monitor) {
        this.manager = manager;
        this.address = address;
        this.pollTimeout = pollTimeout;
        this.metadata = metadata;
        this.monitor = monitor;
        this.queue = new LinkedBlockingQueue<Object>();
    }

    /**
     * Starts the dispatcher thread if one is not already running.
     */
    @Override
    @ManagementOperation(type=OperationType.POST)
    public void start() {
        if (this.dispatcher == null) {
            // A previous stop() may have left its sentinel unconsumed (the old dispatcher can exit
            // through its active flag first); a stale sentinel would end the new dispatcher at once.
            while (this.queue.remove(SHUTDOWN)) {
                // keep removing until no sentinel is left
            }
            this.dispatcher = new Dispatcher();
            this.schedule();
        }
    }

    /**
     * Asks the dispatcher to stop and queues the shutdown sentinel so that a dispatcher waiting
     * on the queue wakes up. The dispatcher closes the socket on its own thread.
     */
    @Override
    @ManagementOperation(type=OperationType.POST)
    public void stop() {
        try {
            if (this.dispatcher != null) {
                this.dispatcher.stop();
            }
            this.queue.put(SHUTDOWN);
        }
        catch (InterruptedException e) {
            this.monitor.error(e);
            // Keep the interruption visible to the caller.
            Thread.currentThread().interrupt();
        }
        finally {
            this.dispatcher = null;
        }
    }

    /**
     * Returns the string form of the address this publisher binds to.
     *
     * @return {@code address.toString()}
     */
    @ManagementOperation
    public String getAddress() {
        return this.address.toString();
    }

    /**
     * Queues a single-frame message for sending.
     *
     * @param message the frame to send; dropped if the calling thread is interrupted
     */
    @Override
    public void publish(byte[] message) {
        try {
            this.queue.put(message);
        }
        catch (InterruptedException e) {
            // Restore the flag instead of clearing it so callers can still observe the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Queues a multi-part message for sending.
     *
     * @param message the frames to send in order; dropped if the calling thread is interrupted
     */
    @Override
    public void publish(byte[][] message) {
        try {
            this.queue.put(message);
        }
        catch (InterruptedException e) {
            // Restore the flag instead of clearing it so callers can still observe the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reports an exception that escaped the dispatcher thread to the monitor.
     */
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        this.monitor.error(e);
    }

    /** Runs the current dispatcher on a new thread whose uncaught exceptions are reported here. */
    private void schedule() {
        Thread thread = new Thread(this.dispatcher);
        thread.setUncaughtExceptionHandler(this);
        thread.start();
    }

    /**
     * Opens the socket, forwards queued messages to it until stopped, then closes it.
     */
    private class Dispatcher
    implements Runnable {
        private final AtomicBoolean active = new AtomicBoolean(true);

        private Dispatcher() {
        }

        /** Makes the dispatch loop exit the next time it checks its flag. */
        public void stop() {
            this.active.set(false);
        }

        @Override
        public void run() {
            String id = this.getClass().getName() + ":" + UUID.randomUUID().toString();
            manager.reserve(id);
            try {
                this.open();
            }
            catch (RuntimeException e) {
                // Setup failed: undo the reservation and any half-opened socket. No retry here,
                // since a failing bind would otherwise respawn threads indefinitely.
                this.closeAndReport(id);
                throw e;
            }

            boolean interrupted;
            try {
                interrupted = this.dispatch();
            }
            catch (RuntimeException e) {
                // Close the socket before a replacement thread binds the same address again.
                this.closeAndReport(id);
                if (this.active.get()) {
                    schedule();
                }
                throw e;
            }

            try {
                this.close(id);
            }
            finally {
                if (interrupted) {
                    // Restored only after the socket is closed so the close runs uninterrupted.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Creates, configures and binds the socket. */
        private void open() {
            socket = manager.getContext().socket(PUB_SOCKET_TYPE);
            SocketHelper.configure(socket, metadata);
            address.getPort().bind(Port.TYPE.TCP);
            socket.bind(address.toProtocolString());
        }

        /**
         * Sends queued messages until stopped, the shutdown sentinel is seen, or the thread is
         * interrupted.
         *
         * @return {@code true} if the loop ended because the thread was interrupted
         */
        private boolean dispatch() {
            while (this.active.get()) {
                Object head;
                try {
                    head = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    // Treat interruption as a request to shut down. Re-setting the flag and
                    // looping would make every later poll fail immediately (a busy spin).
                    return true;
                }
                if (head == null) {
                    continue;
                }
                if (head == SHUTDOWN) {
                    return false;
                }
                ArrayList<Object> batch = new ArrayList<Object>();
                batch.add(head);
                queue.drainTo(batch);
                boolean shutdownRequested = false;
                for (Object message : batch) {
                    if (message == SHUTDOWN) {
                        // The sentinel is a byte[]; without this check it would be sent as an
                        // empty frame and the shutdown request would be missed.
                        shutdownRequested = true;
                        continue;
                    }
                    this.send(message);
                }
                if (shutdownRequested) {
                    return false;
                }
            }
            return false;
        }

        /** Sends one queued item as a single frame or as a multi-part message. */
        private void send(Object message) {
            if (message instanceof byte[]) {
                socket.send((byte[])message, NO_FLAGS);
                return;
            }
            if (message instanceof byte[][]) {
                byte[][] frames = (byte[][])message;
                if (frames.length == 0) {
                    // Nothing to send; indexing the last frame would throw.
                    return;
                }
                int last = frames.length - 1;
                for (int i = 0; i < last; ++i) {
                    socket.send(frames[i], SEND_MORE);
                }
                socket.send(frames[last], NO_FLAGS);
                return;
            }
            monitor.error("Unknown object type:" + message.getClass().getName());
        }

        /** Closes the socket if one is open and always releases the reservation for {@code id}. */
        private void close(String id) {
            try {
                if (socket != null) {
                    socket.close();
                }
            }
            finally {
                socket = null;
                manager.release(id);
            }
        }

        /** Closes like {@link #close(String)} but reports a failure instead of throwing it. */
        private void closeAndReport(String id) {
            try {
                this.close(id);
            }
            catch (RuntimeException closeFailure) {
                // The caller is about to rethrow the original failure; do not mask it.
                monitor.error(closeFailure);
            }
        }
    }
}

