/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.nephele.rpc;

import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.minlog.Log;
import eu.stratosphere.nephele.rpc.MultiPacketInputStream;
import eu.stratosphere.nephele.rpc.NumberUtils;
import eu.stratosphere.nephele.rpc.RPCService;
import eu.stratosphere.nephele.rpc.SinglePacketInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

final class NetworkThread
extends Thread {
    private static final int MAXIMUM_NUMBER_OF_RETRANSMISSIONS = 20;
    private static final int RETRANSMISSION_TIMEOUT = 100;
    private static final int MAXIMUM_NUMBER_OF_OUTSTANDING_PACKETS = 100;
    private final RPCService rpcService;
    private final DatagramSocket socket;
    private final ConcurrentHashMap<Integer, OutstandingTransmission> outstandingTransmissions = new ConcurrentHashMap();
    private final ConcurrentHashMap<Integer, MultiPacketInputStream> incompleteInputStreams = new ConcurrentHashMap();
    private volatile boolean shutdownRequested = false;

    NetworkThread(RPCService rpcService, int rpcPort) throws IOException {
        super("RPC Network Thread");
        this.rpcService = rpcService;
        this.socket = rpcPort == -1 ? new DatagramSocket() : new DatagramSocket(rpcPort);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        byte[] dataBuf = new byte[1024];
        DatagramPacket dataDP = new DatagramPacket(dataBuf, dataBuf.length);
        byte[] ackBuf = new byte[6];
        DatagramPacket ackDP = new DatagramPacket(ackBuf, ackBuf.length);
        while (!this.shutdownRequested) {
            InetSocketAddress remoteSocketAddress;
            int expectedIndex;
            int packetIndex;
            MultiPacketInputStream oldVal;
            Integer msgID;
            try {
                this.socket.receive(dataDP);
            }
            catch (SocketException se) {
                if (this.shutdownRequested) {
                    return;
                }
                Log.error((String)"Shutting down receiver thread due to error: ", (Throwable)se);
                return;
            }
            catch (IOException ioe) {
                Log.error((String)"Shutting down receiver thread due to error: ", (Throwable)ioe);
                return;
            }
            byte[] dbbuf = dataDP.getData();
            int length = dataDP.getLength();
            if (length < 8) {
                int messageID = NumberUtils.byteArrayToInteger(dbbuf, 0);
                int ackedPacket = RPCService.decodeInteger(NumberUtils.byteArrayToShort(dbbuf, 4));
                msgID = messageID;
                OutstandingTransmission outstandingTransmission = this.outstandingTransmissions.get(msgID);
                if (outstandingTransmission == null) continue;
                OutstandingTransmission outstandingTransmission2 = outstandingTransmission;
                synchronized (outstandingTransmission2) {
                    if (outstandingTransmission.lastAckedPacket < ackedPacket) {
                        outstandingTransmission.lastAckedPacket = ackedPacket;
                        outstandingTransmission.notify();
                    }
                    continue;
                }
            }
            int numberOfPackets = RPCService.decodeInteger(NumberUtils.byteArrayToShort(dbbuf, (length -= 8) + 2));
            int messageID = NumberUtils.byteArrayToInteger(dbbuf, length + 4);
            if (numberOfPackets == 1) {
                InetSocketAddress remoteSocketAddress2 = (InetSocketAddress)dataDP.getSocketAddress();
                NumberUtils.integerToByteArray(messageID, ackBuf, 0);
                NumberUtils.shortToByteArray(RPCService.encodeInteger(0), ackBuf, 4);
                ackDP.setSocketAddress(remoteSocketAddress2);
                try {
                    this.socket.send(ackDP);
                }
                catch (IOException ioe) {
                    if (this.shutdownRequested) {
                        return;
                    }
                    Log.error((String)"Shutting down receiver thread due to error: ", (Throwable)ioe);
                    return;
                }
                dataBuf = new byte[1024];
                dataDP = new DatagramPacket(dataBuf, dataBuf.length);
                this.rpcService.processIncomingRPCMessage(remoteSocketAddress2, new Input((InputStream)new SinglePacketInputStream(dbbuf, length)));
                continue;
            }
            msgID = messageID;
            MultiPacketInputStream mpis = this.incompleteInputStreams.get(msgID);
            if (mpis == null && (oldVal = this.incompleteInputStreams.putIfAbsent(msgID, mpis = new MultiPacketInputStream(numberOfPackets))) != null) {
                mpis = oldVal;
            }
            if ((packetIndex = RPCService.decodeInteger(NumberUtils.byteArrayToShort(dbbuf, length))) != (expectedIndex = mpis.addPacket(packetIndex, dataDP))) {
                remoteSocketAddress = (InetSocketAddress)dataDP.getSocketAddress();
                NumberUtils.integerToByteArray(messageID, ackBuf, 0);
                NumberUtils.shortToByteArray(RPCService.encodeInteger(expectedIndex - 1), ackBuf, 4);
                ackDP.setSocketAddress(remoteSocketAddress);
                try {
                    this.socket.send(ackDP);
                    continue;
                }
                catch (IOException ioe) {
                    if (this.shutdownRequested) {
                        return;
                    }
                    Log.error((String)"Shutting down receiver thread due to error: ", (Throwable)ioe);
                    return;
                }
            }
            if ((packetIndex - 1) % 10 == 0 || packetIndex == numberOfPackets - 1) {
                remoteSocketAddress = (InetSocketAddress)dataDP.getSocketAddress();
                NumberUtils.integerToByteArray(messageID, ackBuf, 0);
                NumberUtils.shortToByteArray(RPCService.encodeInteger(packetIndex), ackBuf, 4);
                ackDP.setSocketAddress(remoteSocketAddress);
                try {
                    this.socket.send(ackDP);
                }
                catch (IOException ioe) {
                    if (this.shutdownRequested) {
                        return;
                    }
                    Log.error((String)"Shutting down receiver thread due to error: ", (Throwable)ioe);
                    return;
                }
            }
            remoteSocketAddress = (InetSocketAddress)dataDP.getSocketAddress();
            dataBuf = new byte[1024];
            dataDP = new DatagramPacket(dataBuf, dataBuf.length);
            if (!mpis.isComplete()) continue;
            this.incompleteInputStreams.remove(msgID);
            this.rpcService.processIncomingRPCMessage(remoteSocketAddress, new Input((InputStream)mpis));
        }
    }

    void cleanUpStaleState() {
        long now = System.currentTimeMillis();
        Iterator<MultiPacketInputStream> it = this.incompleteInputStreams.values().iterator();
        while (it.hasNext()) {
            if (it.next().getCreationTime() + 10000L >= now) continue;
            it.remove();
        }
    }

    int send(DatagramPacket[] packets) throws IOException, InterruptedException {
        return this.send(packets, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    int send(DatagramPacket[] packets, boolean waitForAck) throws IOException, InterruptedException {
        if (packets.length == 0) {
            return 0;
        }
        OutstandingTransmission outstandingTransmission = new OutstandingTransmission();
        int messageID = NumberUtils.byteArrayToInteger(packets[0].getData(), packets[0].getLength() - 8 + 4);
        Integer msgID = messageID;
        if (!waitForAck) {
            int j = 0;
            while (true) {
                if (j >= packets.length) {
                    return 0;
                }
                this.socket.send(packets[j]);
                ++j;
            }
        }
        this.outstandingTransmissions.put(msgID, outstandingTransmission);
        int lastAckedPacket = -1;
        int retryCounter = 0;
        int nextPacketToSend = 0;
        boolean timeout = true;
        try {
            while (true) {
                if (timeout) {
                    int newNextPacketToSend = Math.min(packets.length, lastAckedPacket + 100 + 1);
                    for (int j = lastAckedPacket + 1; j < newNextPacketToSend; ++j) {
                        this.socket.send(packets[j]);
                    }
                    nextPacketToSend = newNextPacketToSend;
                } else {
                    int numberOfOutstandingPackets = nextPacketToSend - lastAckedPacket - 1;
                    int newNextPacketToSend = Math.min(packets.length, nextPacketToSend + 100 - numberOfOutstandingPackets);
                    for (int j = nextPacketToSend; j < newNextPacketToSend; ++j) {
                        this.socket.send(packets[j]);
                    }
                    nextPacketToSend = newNextPacketToSend;
                }
                OutstandingTransmission outstandingTransmission2 = outstandingTransmission;
                synchronized (outstandingTransmission2) {
                    lastAckedPacket = outstandingTransmission.lastAckedPacket;
                    if (lastAckedPacket == nextPacketToSend - 1) {
                        if (nextPacketToSend == packets.length) {
                            break;
                        } else {
                            timeout = false;
                            continue;
                        }
                    }
                    outstandingTransmission.wait(100L);
                    int newLastAcked = outstandingTransmission.lastAckedPacket;
                    if (lastAckedPacket == newLastAcked) {
                        timeout = true;
                        if (++retryCounter == 20) {
                            break;
                        } else {
                            continue;
                        }
                    }
                    timeout = false;
                    lastAckedPacket = newLastAcked;
                    if (lastAckedPacket == nextPacketToSend - 1) {
                        if (nextPacketToSend == packets.length) {
                            break;
                        } else {
                        }
                    }
                }
            }
        }
        finally {
            this.outstandingTransmissions.remove(msgID);
        }
        if (lastAckedPacket != packets.length - 1) {
            throw new IOException("Unable to send RPC request to " + packets[0].getSocketAddress());
        }
        return retryCounter;
    }

    void shutdown() throws InterruptedException {
        this.shutdownRequested = true;
        this.socket.close();
        NetworkThread.interrupted();
        this.join();
    }

    private static final class OutstandingTransmission {
        private int lastAckedPacket = -1;

        private OutstandingTransmission() {
        }
    }
}

