/*
 * Decompiled with CFR 0.152.
 */
package org.fabric3.binding.zeromq.runtime.message;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.fabric3.api.binding.zeromq.model.ZeroMQMetadata;
import org.fabric3.binding.zeromq.runtime.MessagingMonitor;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.binding.zeromq.runtime.message.RequestReplySender;
import org.fabric3.binding.zeromq.runtime.message.RoundRobinSocketMultiplexer;
import org.fabric3.spi.container.invocation.CallbackReferenceSerializer;
import org.fabric3.spi.container.invocation.WorkContext;
import org.fabric3.spi.federation.addressing.SocketAddress;
import org.oasisopen.sca.ServiceRuntimeException;
import org.oasisopen.sca.ServiceUnavailableException;
import org.zeromq.ZMQ;

/**
 * A {@link RequestReplySender} that hands requests to a single background dispatcher thread, which
 * writes them to a socket obtained from a {@link RoundRobinSocketMultiplexer} and waits for the reply.
 *
 * <p>Callers of {@link #sendAndReply(byte[], int, WorkContext)} enqueue a {@link Request} and block on it
 * until the dispatcher completes it or the wait times out. No retry is attempted: a request whose reply
 * does not arrive within the poll timeout is failed with a {@link ServiceUnavailableException}.
 *
 * <p>The instance also acts as the {@link Thread.UncaughtExceptionHandler} of its dispatcher thread and
 * reports uncaught failures to the {@link MessagingMonitor}.
 */
public class NonReliableRequestReplySender implements RequestReplySender, Thread.UncaughtExceptionHandler {
    /**
     * Task body required by the {@link FutureTask} constructor. It always yields {@code null}; request
     * outcomes are supplied through {@link Request#set(byte[])} and {@link Request#setException(Throwable)}.
     * Must be declared before {@link #SHUTDOWN}, whose construction reads it.
     */
    private static final Callable<byte[]> CALLABLE = new Callable<byte[]>() {
        @Override
        public byte[] call() throws Exception {
            return null;
        }
    };

    /** Sentinel enqueued by {@link #stop()}; compared by identity, never sent on a socket. */
    private static final Request SHUTDOWN = new Request(null, 0, null);

    /** Maximum time, in milliseconds, a caller waits for its request to be completed. */
    private static final long REPLY_WAIT_MILLIS = 100000L;

    // Numeric socket/poller flags as they appear in the compiled class.
    // NOTE(review): presumably 0 = default, 1 = ZMQ.NOBLOCK / ZMQ.Poller.POLLIN, 2 = ZMQ.SNDMORE - confirm
    // against the ZeroMQ binding in use before replacing them with the library constants.
    private static final int FLAG_NONE = 0;
    private static final int FLAG_NO_BLOCK = 1;
    private static final int FLAG_SEND_MORE = 2;
    private static final int EVENT_POLL_IN = 1;

    private final String id;
    private final ContextManager manager;
    private List<SocketAddress> addresses;
    private final long pollTimeout;
    private final MessagingMonitor monitor;
    private Dispatcher dispatcher;
    private final RoundRobinSocketMultiplexer multiplexer;
    private final Map<ZMQ.Socket, ZMQ.Poller> pollers;
    private final LinkedBlockingQueue<Request> queue;

    /**
     * Creates a sender. No thread is started and no socket is opened until {@link #start()} is called.
     *
     * @param id          identifier returned by {@link #getId()}
     * @param manager     used to create the control socket and the pollers
     * @param addresses   initial target addresses handed to the multiplexer on the first dispatcher pass
     * @param pollTimeout timeout used both for polling the request queue (interpreted as microseconds)
     *                    and, unconverted, for polling the socket for a reply
     * @param metadata    binding metadata passed through to the multiplexer
     * @param monitor     receives error and dropped-message notifications
     */
    public NonReliableRequestReplySender(String id, ContextManager manager, List<SocketAddress> addresses, long pollTimeout, ZeroMQMetadata metadata, MessagingMonitor monitor) {
        this.id = id;
        this.manager = manager;
        this.addresses = addresses;
        this.pollTimeout = pollTimeout;
        this.monitor = monitor;
        // NOTE(review): the meaning of the literal 5 is defined by RoundRobinSocketMultiplexer - confirm there.
        this.multiplexer = new RoundRobinSocketMultiplexer(manager, 5, metadata);
        this.queue = new LinkedBlockingQueue<Request>();
        this.pollers = new ConcurrentHashMap<ZMQ.Socket, ZMQ.Poller>();
    }

    /**
     * Starts the dispatcher thread. Calling this while a dispatcher is already registered does nothing.
     */
    @Override
    public void start() {
        if (this.dispatcher == null) {
            this.dispatcher = new Dispatcher();
            this.schedule(this.dispatcher);
        }
    }

    /**
     * Signals the dispatcher to terminate and enqueues the shutdown sentinel so a dispatcher blocked on
     * the queue wakes up. Does nothing when the sender was never started or has already been stopped.
     */
    @Override
    public void stop() {
        Dispatcher current = this.dispatcher;
        if (current == null) {
            return;
        }
        try {
            current.stop();
            this.queue.put(SHUTDOWN);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            this.monitor.error(e);
        } finally {
            this.dispatcher = null;
        }
    }

    /**
     * @return the identifier supplied at construction
     */
    public String getId() {
        return this.id;
    }

    /**
     * Replaces the target addresses and asks the running dispatcher, if any, to reconnect on its next
     * pass. When no dispatcher is running the addresses are simply stored; a dispatcher created later
     * always reconnects on its first pass.
     *
     * @param addresses the new target addresses
     */
    public void onUpdate(List<SocketAddress> addresses) {
        this.addresses = addresses;
        Dispatcher current = this.dispatcher;
        if (current != null) {
            current.refresh();
        }
    }

    /**
     * Enqueues a request and blocks until the dispatcher completes it.
     *
     * @param message     the payload sent as the first message frame
     * @param index       sent as a 4-byte frame after the payload
     * @param workContext source of the callback references; when there are any they are serialized and
     *                    sent as a trailing frame
     * @return the reply bytes
     * @throws ServiceUnavailableException if serialization fails or the request is not completed within
     *                                     {@value #REPLY_WAIT_MILLIS} ms
     * @throws ServiceRuntimeException     if the calling thread is interrupted (its interrupt status is
     *                                     restored) or the dispatcher failed the request
     */
    @Override
    public byte[] sendAndReply(byte[] message, int index, WorkContext workContext) {
        try {
            Request request = new Request(message, index, this.serialize(workContext));
            this.queue.put(request);
            return request.get(REPLY_WAIT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // The exception cleared the flag; set it again so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new ServiceRuntimeException(e);
        } catch (ExecutionException e) {
            throw new ServiceRuntimeException(e);
        } catch (IOException | TimeoutException e) {
            throw new ServiceUnavailableException(e);
        }
    }

    /**
     * Reports an exception that escaped the dispatcher thread to the monitor.
     */
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        this.monitor.error(e);
    }

    /**
     * Runs the given dispatcher on a new thread whose uncaught exceptions are routed to this sender.
     */
    private void schedule(Dispatcher target) {
        Thread thread = new Thread(target);
        thread.setUncaughtExceptionHandler(this);
        thread.start();
    }

    /**
     * Serializes the callback references of the work context.
     *
     * @return the serialized bytes, or {@code null} when the work context has no callback references
     */
    private byte[] serialize(WorkContext workContext) throws IOException {
        List stack = workContext.getCallbackReferences();
        if (stack == null || stack.isEmpty()) {
            return null;
        }
        return CallbackReferenceSerializer.serializeToBytes((List)stack);
    }

    /**
     * A queued request that doubles as the future its caller blocks on. The dispatcher completes it
     * through {@link #set(byte[])} or {@link #setException(Throwable)}, both re-declared here so the
     * enclosing class can invoke them.
     */
    private static class Request extends FutureTask<byte[]> {
        private final byte[] payload;
        private final byte[] workContext;
        private final int index;

        public Request(byte[] payload, int index, byte[] workContext) {
            super(CALLABLE);
            this.payload = payload;
            this.index = index;
            this.workContext = workContext;
        }

        public byte[] getPayload() {
            return this.payload;
        }

        public int getIndex() {
            return this.index;
        }

        public byte[] getWorkContext() {
            return this.workContext;
        }

        @Override
        public void set(byte[] s) {
            super.set(s);
        }

        @Override
        protected void setException(Throwable t) {
            super.setException(t);
        }
    }

    /**
     * The dispatch loop: takes requests off the queue, sends each one on a socket from the multiplexer
     * and completes the request with the reply or with a timeout failure.
     */
    private class Dispatcher implements Runnable {
        private final AtomicBoolean active = new AtomicBoolean(true);
        /** Starts as {@code true} so the first pass of the loop always connects. */
        private final AtomicBoolean doRefresh = new AtomicBoolean(true);
        private ZMQ.Socket controlSocket;

        private Dispatcher() {
        }

        /** Requests a reconnect with the sender's current addresses on the next pass of the loop. */
        public void refresh() {
            this.doRefresh.set(true);
        }

        /** Makes the loop exit once the current pass has finished. */
        public void stop() {
            this.active.set(false);
        }

        /**
         * Runs until stopped, until the shutdown sentinel is dequeued, until a control message arrives,
         * or until the thread is interrupted; the sockets are closed on each of those exits. If a
         * {@link RuntimeException} escapes, this dispatcher is restarted on a fresh thread and the
         * exception is rethrown so that it reaches the uncaught-exception handler.
         */
        @Override
        public void run() {
            while (this.active.get()) {
                try {
                    this.reconnect();
                    Request head = NonReliableRequestReplySender.this.queue.poll(NonReliableRequestReplySender.this.pollTimeout, TimeUnit.MICROSECONDS);
                    if (SHUTDOWN == head) {
                        this.closeSockets();
                        return;
                    }
                    if (!NonReliableRequestReplySender.this.multiplexer.isAvailable()) {
                        // NOTE(review): a request dequeued above is discarded here without being
                        // completed, so its caller only learns about it when its own wait times out.
                        NonReliableRequestReplySender.this.monitor.dropMessage();
                        continue;
                    }
                    List<Request> batch = new ArrayList<Request>();
                    if (head != null) {
                        batch.add(head);
                        NonReliableRequestReplySender.this.queue.drainTo(batch);
                    }
                    for (Request request : batch) {
                        // stop() may enqueue the sentinel behind pending requests, in which case
                        // drainTo() pulls it into the batch; it must never be sent as a message.
                        if (SHUTDOWN == request || !this.exchange(request)) {
                            this.closeSockets();
                            return;
                        }
                    }
                } catch (RuntimeException e) {
                    // Restart this very dispatcher; the sender's field may already have been cleared
                    // or replaced by a concurrent stop()/start().
                    NonReliableRequestReplySender.this.schedule(this);
                    throw e;
                } catch (InterruptedException e) {
                    // Re-polling with the interrupt flag set would throw again immediately and spin,
                    // so restore the flag and leave the loop.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            this.closeSockets();
        }

        /**
         * Sends one request and completes it with the reply or with a timeout failure.
         *
         * @return {@code false} if a message was read from the control socket, in which case the
         *         request is left uncompleted and the dispatcher must terminate; {@code true} otherwise
         */
        private boolean exchange(Request request) {
            ZMQ.Socket socket = NonReliableRequestReplySender.this.multiplexer.get();
            socket.send(request.getPayload(), FLAG_SEND_MORE);
            byte[] serializedIndex = ByteBuffer.allocate(4).putInt(request.getIndex()).array();
            byte[] context = request.getWorkContext();
            if (context != null && context.length > 0) {
                socket.send(serializedIndex, FLAG_SEND_MORE);
                socket.send(context, FLAG_NONE);
            } else {
                socket.send(serializedIndex, FLAG_NONE);
            }
            ZMQ.Poller poller = NonReliableRequestReplySender.this.pollers.get(socket);
            long ready = poller.poll(NonReliableRequestReplySender.this.pollTimeout);
            if (ready <= 0L) {
                // Nothing became readable (zero) or polling failed (negative): fail the request rather
                // than fall through to the blocking recv below, which could wait indefinitely.
                request.setException(new ServiceUnavailableException("Timeout waiting on response"));
                request.run();
                return true;
            }
            byte[] controlPayload = this.controlSocket.recv(FLAG_NO_BLOCK);
            if (controlPayload != null) {
                return false;
            }
            request.set(socket.recv(FLAG_NONE));
            // NOTE(review): the request is already completed at this point, so run() is expected to
            // have no effect; the call is retained from the original flow.
            request.run();
            return true;
        }

        /** Closes the multiplexer and, if it was created, the control socket. */
        private void closeSockets() {
            NonReliableRequestReplySender.this.multiplexer.close();
            if (this.controlSocket != null) {
                this.controlSocket.close();
            }
        }

        /**
         * If a refresh was requested, lazily creates the control socket, pushes the current addresses
         * to the multiplexer and rebuilds one poller per multiplexer socket, each watching that socket
         * and the control socket.
         */
        private void reconnect() {
            if (!this.doRefresh.getAndSet(false)) {
                return;
            }
            if (this.controlSocket == null) {
                this.controlSocket = NonReliableRequestReplySender.this.manager.createControlSocket();
            }
            NonReliableRequestReplySender.this.multiplexer.update(NonReliableRequestReplySender.this.addresses);
            Collection<ZMQ.Socket> sockets = NonReliableRequestReplySender.this.multiplexer.getAll();
            NonReliableRequestReplySender.this.pollers.clear();
            for (ZMQ.Socket socket : sockets) {
                ZMQ.Poller poller = NonReliableRequestReplySender.this.manager.getContext().poller();
                poller.register(socket, EVENT_POLL_IN);
                poller.register(this.controlSocket, EVENT_POLL_IN);
                NonReliableRequestReplySender.this.pollers.put(socket, poller);
            }
        }
    }
}

