/*
 * Decompiled with CFR 0.152.
 */
package org.fabric3.binding.zeromq.runtime.message;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.fabric3.api.annotation.management.Management;
import org.fabric3.api.binding.zeromq.model.ZeroMQMetadata;
import org.fabric3.binding.zeromq.runtime.MessagingMonitor;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.binding.zeromq.runtime.message.AbstractReceiver;
import org.fabric3.spi.container.invocation.Message;
import org.fabric3.spi.container.invocation.MessageCache;
import org.fabric3.spi.container.invocation.WorkContext;
import org.fabric3.spi.container.wire.Interceptor;
import org.fabric3.spi.container.wire.InvocationChain;
import org.fabric3.spi.federation.addressing.SocketAddress;
import org.oasisopen.sca.ServiceRuntimeException;
import org.zeromq.ZMQ;

@Management
public class NonReliableRequestReplyReceiver
extends AbstractReceiver
implements Thread.UncaughtExceptionHandler {
    private static final Response SHUTDOWN = new Response(null, null);
    private LinkedBlockingQueue<Response> queue;
    private final long pollTimeout;

    public NonReliableRequestReplyReceiver(ContextManager manager, SocketAddress address, List<InvocationChain> chains, ExecutorService executorService, long pollTimeout, ZeroMQMetadata metadata, MessagingMonitor monitor) {
        super(manager, address, chains, 6, metadata, executorService, monitor);
        this.pollTimeout = pollTimeout;
        this.queue = new LinkedBlockingQueue();
    }

    @Override
    protected boolean invoke(ZMQ.Socket socket) {
        final byte[] clientId = socket.recv(1);
        if (clientId == null) {
            return false;
        }
        final byte[][] frames = new byte[3][];
        int i = 1;
        frames[0] = socket.recv(0);
        while (socket.hasReceiveMore()) {
            if (i > 2) {
                this.monitor.error("Invalid message: received more than three frames");
                return false;
            }
            frames[i] = socket.recv(0);
            ++i;
        }
        this.executorService.execute(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                Message request = MessageCache.getAndResetMessage();
                try {
                    request.setBody((Object)frames[0]);
                    int methodIndex = ByteBuffer.wrap(frames[1]).getInt();
                    WorkContext context = NonReliableRequestReplyReceiver.this.setWorkContext(frames[2]);
                    request.setWorkContext(context);
                    Interceptor interceptor = NonReliableRequestReplyReceiver.this.interceptors[methodIndex];
                    Message response = interceptor.invoke(request);
                    Object responseBody = response.getBody();
                    if (!(responseBody instanceof byte[])) {
                        throw new ServiceRuntimeException("Return value not serialized");
                    }
                    try {
                        NonReliableRequestReplyReceiver.this.queue.put(new Response(clientId, (byte[])responseBody));
                    }
                    catch (InterruptedException e) {
                        Thread.interrupted();
                    }
                }
                finally {
                    request.reset();
                }
            }
        });
        return true;
    }

    @Override
    protected void response(ZMQ.Socket socket) {
        try {
            Response first = this.queue.poll(this.pollTimeout, TimeUnit.MICROSECONDS);
            if (first == null || SHUTDOWN == first) {
                return;
            }
            ArrayList<Response> drained = new ArrayList<Response>();
            drained.add(first);
            this.queue.drainTo(drained);
            for (Response response : drained) {
                if (SHUTDOWN == response) {
                    return;
                }
                socket.send(response.clientId, 2);
                socket.send(response.body, 0);
            }
        }
        catch (InterruptedException e) {
            Thread.interrupted();
        }
    }

    private static class Response {
        private byte[] clientId;
        private byte[] body;

        private Response(byte[] clientId, byte[] body) {
            this.clientId = clientId;
            this.body = body;
        }
    }
}

