/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.nephele.rpc;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.minlog.Log;
import eu.stratosphere.nephele.rpc.MultiPacketOutputStream;
import eu.stratosphere.nephele.rpc.NetworkThread;
import eu.stratosphere.nephele.rpc.RPCCleanup;
import eu.stratosphere.nephele.rpc.RPCEnvelope;
import eu.stratosphere.nephele.rpc.RPCMessage;
import eu.stratosphere.nephele.rpc.RPCProtocol;
import eu.stratosphere.nephele.rpc.RPCRequest;
import eu.stratosphere.nephele.rpc.RPCResponse;
import eu.stratosphere.nephele.rpc.RPCReturnValue;
import eu.stratosphere.nephele.rpc.RPCStatistics;
import eu.stratosphere.nephele.rpc.RPCThrowable;
import eu.stratosphere.util.KryoUtil;
import eu.stratosphere.util.StringUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public final class RPCService {
    private static final int DEFAULT_NUM_RPC_HANDLERS = 1;
    static final int CLEANUP_INTERVAL = 10000;
    private static final int RPC_TIMEOUT = 60000;
    private final ExecutorService rpcHandlers;
    private final int rpcPort;
    private final NetworkThread networkThread;
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    private final RPCStatistics statistics = new RPCStatistics();
    private final Timer cleanupTimer = new Timer();
    private final ConcurrentHashMap<String, RPCProtocol> callbackHandlers = new ConcurrentHashMap();
    private final ConcurrentHashMap<Integer, RPCRequestMonitor> pendingRequests = new ConcurrentHashMap();
    private final ConcurrentHashMap<Integer, RPCRequest> requestsBeingProcessed = new ConcurrentHashMap();
    private final ConcurrentHashMap<Integer, CachedResponse> cachedResponses = new ConcurrentHashMap();

    public RPCService() throws IOException {
        this(1);
    }

    public RPCService(int numRPCHandlers) throws IOException {
        this.rpcHandlers = Executors.newFixedThreadPool(numRPCHandlers);
        this.rpcPort = -1;
        this.networkThread = new NetworkThread(this, -1);
        this.networkThread.start();
        this.cleanupTimer.schedule((TimerTask)new CleanupTask(), 10000L, 10000L);
    }

    public RPCService(int rpcPort, int numRPCHandlers) throws IOException {
        this.rpcHandlers = Executors.newFixedThreadPool(numRPCHandlers);
        this.rpcPort = rpcPort;
        this.networkThread = new NetworkThread(this, rpcPort);
        this.networkThread.start();
        this.cleanupTimer.schedule((TimerTask)new CleanupTask(), 10000L, 10000L);
    }

    public <T extends RPCProtocol> T getProxy(InetSocketAddress remoteAddress, Class<T> protocol) {
        Class[] interfaces = new Class[]{protocol};
        return (T)((RPCProtocol)Proxy.newProxyInstance(RPCService.class.getClassLoader(), interfaces, (InvocationHandler)new RPCInvocationHandler(remoteAddress, protocol.getName())));
    }

    public int getRPCPort() {
        return this.rpcPort;
    }

    public void setProtocolCallbackHandler(Class<? extends RPCProtocol> protocol, RPCProtocol callbackHandler) {
        RPCService.checkRPCProtocol(protocol);
        if (this.callbackHandlers.putIfAbsent(protocol.getName(), callbackHandler) != null) {
            Log.error((String)("There is already a protocol call back handler set for protocol " + protocol.getName()));
        }
    }

    public void shutDown() {
        if (!this.shutdownRequested.compareAndSet(false, true)) {
            return;
        }
        try {
            this.networkThread.shutdown();
        }
        catch (InterruptedException ie) {
            Log.debug((String)"Caught exception while waiting for network thread to shut down: ", (Throwable)ie);
        }
        this.rpcHandlers.shutdown();
        try {
            this.rpcHandlers.awaitTermination(5000L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ie) {
            Log.debug((String)"Caught exception while waiting for RPC handlers to finish: ", (Throwable)ie);
        }
        this.cleanupTimer.cancel();
        this.statistics.processCollectedData();
    }

    void processIncomingRPCCleanup(RPCCleanup rpcCleanup) {
        this.cachedResponses.remove(rpcCleanup.getMessageID());
    }

    void processIncomingRPCMessage(final InetSocketAddress remoteSocketAddress, final Input input) {
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                Kryo k = KryoUtil.getKryo();
                k.reset();
                RPCEnvelope envelope = (RPCEnvelope)k.readObject(input, RPCEnvelope.class);
                RPCMessage msg = envelope.getRPCMessage();
                if (msg instanceof RPCRequest) {
                    RPCService.this.processIncomingRPCRequest(remoteSocketAddress, (RPCRequest)msg);
                } else if (msg instanceof RPCResponse) {
                    RPCService.this.processIncomingRPCResponse((RPCResponse)msg);
                } else {
                    RPCService.this.processIncomingRPCCleanup((RPCCleanup)msg);
                }
            }
        };
        this.rpcHandlers.execute(runnable);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void processIncomingRPCResponse(RPCResponse rpcResponse) {
        Integer messageID = rpcResponse.getMessageID();
        RPCRequestMonitor requestMonitor = this.pendingRequests.get(messageID);
        if (requestMonitor == null) {
            return;
        }
        RPCRequestMonitor rPCRequestMonitor = requestMonitor;
        synchronized (rPCRequestMonitor) {
            requestMonitor.rpcResponse = rpcResponse;
            requestMonitor.notify();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Object sendRPCRequest(InetSocketAddress remoteSocketAddress, RPCRequest request) throws Throwable {
        int numberOfRetries;
        if (this.shutdownRequested.get()) {
            throw new IOException("Shutdown of RPC service has already been requested");
        }
        long start = System.currentTimeMillis();
        DatagramPacket[] packets = this.messageToPackets(remoteSocketAddress, request);
        Integer messageID = request.getMessageID();
        RPCRequestMonitor requestMonitor = new RPCRequestMonitor();
        this.pendingRequests.put(messageID, requestMonitor);
        RPCResponse rpcResponse = null;
        try {
            numberOfRetries = this.networkThread.send(packets);
            RPCRequestMonitor rPCRequestMonitor = requestMonitor;
            synchronized (rPCRequestMonitor) {
                while (true) {
                    if (requestMonitor.rpcResponse != null) {
                        rpcResponse = requestMonitor.rpcResponse;
                        break;
                    }
                    long sleepTime = 60000L - (System.currentTimeMillis() - start);
                    if (sleepTime <= 0L) break;
                    requestMonitor.wait(sleepTime);
                }
            }
        }
        finally {
            this.pendingRequests.remove(messageID);
        }
        if (rpcResponse == null) {
            throw new IOException("Unable to complete RPC of method " + request.getMethodName() + " on " + remoteSocketAddress);
        }
        String methodName = request.getMethodName();
        this.statistics.reportSuccessfulTransmission(methodName, packets.length, numberOfRetries);
        this.statistics.reportRTT(methodName, (int)(System.currentTimeMillis() - start));
        if (rpcResponse instanceof RPCReturnValue) {
            return ((RPCReturnValue)rpcResponse).getRetVal();
        }
        throw ((RPCThrowable)rpcResponse).getThrowable();
    }

    private boolean isThrowableRegistered(Class<? extends Throwable> throwableType) {
        Kryo kryo = KryoUtil.getKryo();
        try {
            kryo.getRegistration(throwableType);
        }
        catch (IllegalArgumentException e) {
            return false;
        }
        return true;
    }

    private DatagramPacket[] messageToPackets(InetSocketAddress remoteSocketAddress, RPCMessage rpcMessage) {
        MultiPacketOutputStream mpos = new MultiPacketOutputStream(1024);
        Kryo kryo = KryoUtil.getKryo();
        kryo.reset();
        Output output = new Output((OutputStream)mpos);
        kryo.writeObject(output, (Object)new RPCEnvelope(rpcMessage));
        output.close();
        mpos.close();
        return mpos.createPackets(remoteSocketAddress);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processIncomingRPCRequest(InetSocketAddress remoteSocketAddress, RPCRequest rpcRequest) {
        Integer messageID = rpcRequest.getMessageID();
        if (this.requestsBeingProcessed.putIfAbsent(messageID, rpcRequest) != null) {
            Log.debug((String)("Request " + rpcRequest.getMessageID() + " is already being processed at the moment"));
            return;
        }
        CachedResponse cachedResponse = this.cachedResponses.get(messageID);
        if (cachedResponse != null) {
            try {
                int numberOfRetries = this.networkThread.send(cachedResponse.packets);
                this.statistics.reportSuccessfulTransmission(rpcRequest.getMethodName() + " (Response)", cachedResponse.packets.length, numberOfRetries);
            }
            catch (Exception e) {
                Log.error((String)"Caught exception while trying to send RPC response: ", (Throwable)e);
            }
            finally {
                this.requestsBeingProcessed.remove(messageID);
            }
            return;
        }
        RPCProtocol callbackHandler = this.callbackHandlers.get(rpcRequest.getInterfaceName());
        if (callbackHandler == null) {
            Log.error((String)("Cannot find callback handler for protocol " + rpcRequest.getInterfaceName()));
            this.requestsBeingProcessed.remove(messageID);
            return;
        }
        try {
            Method method = callbackHandler.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
            RPCResponse rpcResponse = null;
            try {
                Object retVal = method.invoke((Object)callbackHandler, rpcRequest.getArgs());
                rpcResponse = new RPCReturnValue(rpcRequest.getMessageID(), retVal);
            }
            catch (InvocationTargetException ite) {
                Throwable targetException = ite.getTargetException();
                targetException.getStackTrace();
                if (!this.isThrowableRegistered(targetException.getClass())) {
                    targetException = RPCService.wrapInIOException(rpcRequest, targetException);
                }
                rpcResponse = new RPCThrowable(rpcRequest.getMessageID(), targetException);
            }
            DatagramPacket[] packets = this.messageToPackets(remoteSocketAddress, rpcResponse);
            this.cachedResponses.put(messageID, new CachedResponse(System.currentTimeMillis(), packets));
            int numberOfRetries = this.networkThread.send(packets);
            this.statistics.reportSuccessfulTransmission(rpcRequest.getMethodName() + " (Response)", packets.length, numberOfRetries);
        }
        catch (Exception e) {
            Log.error((String)"Caught processing RPC request: ", (Throwable)e);
        }
        finally {
            this.requestsBeingProcessed.remove(messageID);
        }
    }

    static int decodeInteger(short val) {
        return val - Short.MIN_VALUE - 1;
    }

    static short encodeInteger(int val) {
        if (val < -1 || val > 65534) {
            throw new IllegalArgumentException("Value must be in the range -1 and 65534 but is " + val);
        }
        return (short)(val - Short.MIN_VALUE + 1);
    }

    private static final void checkRPCProtocol(Class<? extends RPCProtocol> protocol) {
        block7: {
            if (!protocol.isInterface()) {
                throw new IllegalArgumentException("Provided protocol " + protocol + " is not an interface");
            }
            try {
                Method[] methods = protocol.getMethods();
                for (int i = 0; i < methods.length; ++i) {
                    Method method = methods[i];
                    Class<?>[] exceptionTypes = method.getExceptionTypes();
                    boolean ioExceptionFound = false;
                    boolean interruptedExceptionFound = false;
                    for (int j = 0; j < exceptionTypes.length; ++j) {
                        if (IOException.class.equals(exceptionTypes[j])) {
                            ioExceptionFound = true;
                            continue;
                        }
                        if (!InterruptedException.class.equals(exceptionTypes[j])) continue;
                        interruptedExceptionFound = true;
                    }
                    if (!ioExceptionFound) {
                        throw new IllegalArgumentException("Method " + method.getName() + " of protocol " + protocol.getName() + " must be declared to throw an IOException");
                    }
                    if (interruptedExceptionFound) continue;
                    throw new IllegalArgumentException("Method " + method.getName() + " of protocol " + protocol.getName() + " must be declared to throw an InterruptedException");
                }
            }
            catch (SecurityException se) {
                if (!Log.DEBUG) break block7;
                Log.debug((String)StringUtils.stringifyException((Throwable)se));
            }
        }
    }

    private static IOException wrapInIOException(RPCRequest request, Throwable throwable) {
        StringBuilder sb = new StringBuilder("The remote procedure call of method ");
        sb.append(request.getInterfaceName());
        sb.append('.');
        sb.append(request.getMethodName());
        sb.append(" caused an unregistered exception: ");
        sb.append(StringUtils.stringifyException((Throwable)throwable));
        return new IOException(sb.toString());
    }

    private static final class RPCRequestMonitor {
        private RPCResponse rpcResponse = null;

        private RPCRequestMonitor() {
        }
    }

    private final class RPCInvocationHandler
    implements InvocationHandler {
        private final InetSocketAddress remoteSocketAddress;
        private final String interfaceName;

        private RPCInvocationHandler(InetSocketAddress remoteSocketAddress, String interfaceName) {
            this.remoteSocketAddress = remoteSocketAddress;
            this.interfaceName = interfaceName;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            int messageID = (int)(-2.147483648E9 + Math.random() * 2.147483647E9 * 2.0);
            RPCRequest rpcRequest = new RPCRequest(messageID, this.interfaceName, method, args);
            return RPCService.this.sendRPCRequest(this.remoteSocketAddress, rpcRequest);
        }
    }

    private final class CleanupTask
    extends TimerTask {
        private CleanupTask() {
        }

        @Override
        public void run() {
            RPCService.this.statistics.processCollectedData();
            long now = System.currentTimeMillis();
            Iterator it = RPCService.this.cachedResponses.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry entry = it.next();
                CachedResponse cachedResponse = (CachedResponse)entry.getValue();
                if (cachedResponse.creationTime + 10000L >= now) continue;
                it.remove();
            }
            RPCService.this.networkThread.cleanUpStaleState();
        }
    }

    private static final class CachedResponse {
        private final long creationTime;
        private final DatagramPacket[] packets;

        private CachedResponse(long creationTime, DatagramPacket[] packets) {
            this.creationTime = creationTime;
            this.packets = packets;
        }
    }
}

