/*
 * Decompiled with CFR 0.152.
 */
package org.fabric3.binding.zeromq.runtime.message;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.fabric3.api.binding.zeromq.model.ZeroMQMetadata;
import org.fabric3.binding.zeromq.runtime.MessagingMonitor;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.binding.zeromq.runtime.message.SocketHelper;
import org.fabric3.spi.container.invocation.CallbackReferenceSerializer;
import org.fabric3.spi.container.invocation.WorkContext;
import org.fabric3.spi.container.invocation.WorkContextCache;
import org.fabric3.spi.container.wire.Interceptor;
import org.fabric3.spi.container.wire.InvocationChain;
import org.fabric3.spi.federation.addressing.SocketAddress;
import org.fabric3.spi.host.Port;
import org.oasisopen.sca.ServiceRuntimeException;
import org.zeromq.ZMQ;

public abstract class AbstractReceiver
implements org.fabric3.binding.zeromq.runtime.message.Receiver,
Thread.UncaughtExceptionHandler {
    protected ContextManager manager;
    protected SocketAddress address;
    protected ExecutorService executorService;
    protected int socketType;
    protected Interceptor[] interceptors;
    protected MessagingMonitor monitor;
    protected Receiver receiver;
    protected ZeroMQMetadata metadata;
    protected String id = this.getClass().getName() + ":" + UUID.randomUUID().toString();

    public AbstractReceiver(ContextManager manager, SocketAddress address, List<InvocationChain> chains, int socketType, ZeroMQMetadata metadata, ExecutorService executorService, MessagingMonitor monitor) {
        this.manager = manager;
        this.address = address;
        this.executorService = executorService;
        this.interceptors = new Interceptor[chains.size()];
        int chainsSize = chains.size();
        for (int i = 0; i < chainsSize; ++i) {
            InvocationChain chain = chains.get(i);
            this.interceptors[i] = chain.getHeadInterceptor();
        }
        this.socketType = socketType;
        this.metadata = metadata;
        this.monitor = monitor;
    }

    @Override
    public void start() {
        if (this.receiver == null) {
            this.receiver = new Receiver();
            this.schedule();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() {
        try {
            this.receiver.stop();
        }
        finally {
            this.receiver = null;
        }
    }

    @Override
    public SocketAddress getAddress() {
        return this.address;
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        this.monitor.error(e);
    }

    private void schedule() {
        this.executorService.submit(this.receiver);
    }

    protected WorkContext setWorkContext(byte[] header) {
        try {
            WorkContext workContext = WorkContextCache.getAndResetThreadWorkContext();
            if (header == null || header.length == 0) {
                return workContext;
            }
            List stack = CallbackReferenceSerializer.deserialize((byte[])header);
            if (!stack.isEmpty()) {
                stack.add(stack.get(stack.size() - 1));
            }
            workContext.addCallbackReferences(stack);
            return workContext;
        }
        catch (IOException e) {
            throw new ServiceRuntimeException("Error deserializing callback references", (Throwable)e);
        }
    }

    protected abstract boolean invoke(ZMQ.Socket var1);

    protected abstract void response(ZMQ.Socket var1);

    private class Receiver
    implements Runnable {
        private ZMQ.Socket socket;
        private ZMQ.Socket controlSocket;
        private ZMQ.Poller poller;
        private AtomicBoolean active = new AtomicBoolean(true);

        private Receiver() {
        }

        public synchronized void stop() {
            this.active.set(false);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            block12: {
                try {
                    this.bind();
                    while (this.active.get()) {
                        if (this.poller == null) {
                            AbstractReceiver.this.monitor.error("Failed to initialize ZeroMQ socket, aborting receiver");
                            return;
                        }
                        long val = this.poller.poll();
                        if (val <= 0L) continue;
                        byte[] controlPayload = this.controlSocket.recv(1);
                        if (controlPayload != null) {
                            try {
                                this.socket.close();
                                this.controlSocket.close();
                            }
                            finally {
                                AbstractReceiver.this.manager.release(AbstractReceiver.this.id);
                            }
                            return;
                        }
                        if (!AbstractReceiver.this.invoke(this.socket)) continue;
                        AbstractReceiver.this.response(this.socket);
                    }
                    if (this.socket == null) break block12;
                    try {
                        this.socket.close();
                        this.controlSocket.close();
                    }
                    finally {
                        AbstractReceiver.this.manager.release(AbstractReceiver.this.id);
                    }
                    this.socket = null;
                }
                catch (RuntimeException e) {
                    AbstractReceiver.this.manager.release(AbstractReceiver.this.id);
                    AbstractReceiver.this.schedule();
                    throw e;
                }
            }
        }

        private void bind() {
            if (this.socket != null) {
                return;
            }
            AbstractReceiver.this.manager.reserve(AbstractReceiver.this.id);
            this.socket = AbstractReceiver.this.manager.getContext().socket(AbstractReceiver.this.socketType);
            SocketHelper.configure(this.socket, AbstractReceiver.this.metadata);
            AbstractReceiver.this.address.getPort().bind(Port.TYPE.TCP);
            this.socket.bind(AbstractReceiver.this.address.toProtocolString());
            this.controlSocket = AbstractReceiver.this.manager.createControlSocket();
            this.poller = AbstractReceiver.this.manager.getContext().poller();
            this.poller.register(this.controlSocket, 1);
            this.poller.register(this.socket, 1);
        }
    }
}

