/*
 * Decompiled with CFR 0.152.
 */
package org.fabric3.binding.zeromq.runtime.message;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.fabric3.api.annotation.management.Management;
import org.fabric3.api.binding.zeromq.model.ZeroMQMetadata;
import org.fabric3.binding.zeromq.runtime.MessagingMonitor;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.binding.zeromq.runtime.message.OneWaySender;
import org.fabric3.binding.zeromq.runtime.message.RoundRobinSocketMultiplexer;
import org.fabric3.binding.zeromq.runtime.message.SocketMultiplexer;
import org.fabric3.spi.container.invocation.CallbackReferenceSerializer;
import org.fabric3.spi.container.invocation.WorkContext;
import org.fabric3.spi.federation.addressing.SocketAddress;
import org.oasisopen.sca.ServiceRuntimeException;
import org.zeromq.ZMQ;

@Management
public class NonReliableOneWaySender
implements OneWaySender,
Thread.UncaughtExceptionHandler {
    private static final Request SHUTDOWN = new Request(null, 0, null);
    private String id;
    private List<SocketAddress> addresses;
    private MessagingMonitor monitor;
    private SocketMultiplexer multiplexer;
    private Dispatcher dispatcher;
    private LinkedBlockingQueue<Request> queue;
    private long pollTimeout;

    public NonReliableOneWaySender(String id, ContextManager manager, List<SocketAddress> addresses, long pollTimeout, ZeroMQMetadata metadata, MessagingMonitor monitor) {
        this.id = id;
        this.addresses = addresses;
        this.pollTimeout = pollTimeout;
        this.monitor = monitor;
        this.queue = new LinkedBlockingQueue();
        this.multiplexer = new RoundRobinSocketMultiplexer(manager, 8, metadata);
    }

    @Override
    public void start() {
        if (this.dispatcher == null) {
            this.dispatcher = new Dispatcher();
            this.schedule();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() {
        try {
            this.dispatcher.stop();
            this.queue.put(SHUTDOWN);
        }
        catch (InterruptedException e) {
            this.monitor.error(e);
        }
        finally {
            this.dispatcher = null;
        }
    }

    public String getId() {
        return this.id;
    }

    public void onUpdate(List<SocketAddress> addresses) {
        this.addresses = addresses;
        this.dispatcher.refresh();
    }

    @Override
    public void send(byte[] message, int index, WorkContext workContext) {
        try {
            Request request = new Request(message, index, this.serialize(workContext));
            this.queue.put(request);
        }
        catch (InterruptedException e) {
            Thread.interrupted();
            throw new ServiceRuntimeException((Throwable)e);
        }
        catch (IOException e) {
            throw new ServiceRuntimeException((Throwable)e);
        }
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        this.monitor.error(e);
    }

    private void schedule() {
        Thread thread = new Thread(this.dispatcher);
        thread.setUncaughtExceptionHandler(this);
        thread.start();
    }

    private byte[] serialize(WorkContext workContext) throws IOException {
        List stack = workContext.getCallbackReferences();
        if (stack == null || stack.isEmpty()) {
            return null;
        }
        return CallbackReferenceSerializer.serializeToBytes((List)stack);
    }

    private static class Request {
        private byte[] payload;
        private byte[] workContext;
        private int index;

        public Request(byte[] payload, int index, byte[] workContext) {
            this.payload = payload;
            this.index = index;
            this.workContext = workContext;
        }

        public byte[] getPayload() {
            return this.payload;
        }

        public int getIndex() {
            return this.index;
        }

        public byte[] getWorkContext() {
            return this.workContext;
        }
    }

    private class Dispatcher
    implements Runnable {
        private AtomicBoolean active = new AtomicBoolean(true);
        private AtomicBoolean doRefresh = new AtomicBoolean(true);

        private Dispatcher() {
        }

        public void refresh() {
            this.doRefresh.set(true);
        }

        public void stop() {
            this.active.set(false);
        }

        @Override
        public void run() {
            while (this.active.get()) {
                try {
                    this.reconnect();
                    ArrayList<Request> drained = new ArrayList<Request>();
                    Request value = (Request)NonReliableOneWaySender.this.queue.poll(NonReliableOneWaySender.this.pollTimeout, TimeUnit.MICROSECONDS);
                    if (SHUTDOWN == value) {
                        NonReliableOneWaySender.this.multiplexer.close();
                        return;
                    }
                    if (!NonReliableOneWaySender.this.multiplexer.isAvailable()) {
                        NonReliableOneWaySender.this.monitor.dropMessage();
                        continue;
                    }
                    if (value != null) {
                        drained.add(value);
                        NonReliableOneWaySender.this.queue.drainTo(drained);
                    }
                    for (Request request : drained) {
                        if (SHUTDOWN == request) {
                            NonReliableOneWaySender.this.multiplexer.close();
                            return;
                        }
                        ZMQ.Socket socket = NonReliableOneWaySender.this.multiplexer.get();
                        socket.send(request.getPayload(), 2);
                        int index = request.getIndex();
                        byte[] context = request.getWorkContext();
                        byte[] serializedIndex = ByteBuffer.allocate(4).putInt(index).array();
                        if (context != null && context.length > 0) {
                            socket.send(serializedIndex, 2);
                            socket.send(context, 0);
                            continue;
                        }
                        socket.send(serializedIndex, 0);
                    }
                }
                catch (RuntimeException e) {
                    NonReliableOneWaySender.this.schedule();
                    throw e;
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            NonReliableOneWaySender.this.multiplexer.close();
        }

        private void reconnect() {
            if (!this.doRefresh.getAndSet(false)) {
                return;
            }
            NonReliableOneWaySender.this.multiplexer.update(NonReliableOneWaySender.this.addresses);
        }
    }
}

