/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.runtime;

import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.DistributionPattern;
import eu.stratosphere.nephele.jobgraph.JobGenericInputVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.nephele.template.AbstractGenericInputTask;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.runtime.io.api.RecordReader;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.util.LogUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class NetworkStackThroughput
extends RecordAPITestBase {
    /** Logger used by this test class and by the nested producer task. */
    private static final Log LOG = LogFactory.getLog(NetworkStackThroughput.class);
    /** Configuration key: amount of data to ship, in gigabytes. */
    private static final String DATA_VOLUME_GB_CONFIG_KEY = "data.volume.gb";
    /** Configuration key: whether a forwarder vertex sits between producer and consumer. */
    private static final String USE_FORWARDER_CONFIG_KEY = "use.forwarder";
    /** Configuration key: number of subtasks set on every vertex of the job graph. */
    private static final String NUM_SUBTASKS_CONFIG_KEY = "num.subtasks";
    /** Configuration key: number of subtasks per instance set on every vertex of the job graph. */
    private static final String NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY = "num.subtasks.instance";
    /** Configuration key: whether the producer periodically sleeps while emitting. */
    private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
    /** Configuration key: whether the consumer periodically sleeps while reading. */
    private static final String IS_SLOW_RECEIVER_CONFIG_KEY = "is.slow.receiver";
    /** Length of one pause of a slow sender or receiver, in milliseconds. */
    private static final int IS_SLOW_SLEEP_MS = 10;
    /** A slow sender or receiver pauses once per this many records. */
    private static final int IS_SLOW_EVERY_NUM_RECORDS = 512;

    /**
     * Creates one run of the parameterized test.
     *
     * <p>Passes the configuration on to the base class, then calls {@code setNumTaskManager(2)}
     * and {@code LogUtils.initializeDefaultConsoleLogger()}.
     *
     * @param config parameters of this run (presumably one of the entries built by
     *               {@link #getConfigurations()} -- confirm against the base class)
     */
    public NetworkStackThroughput(Configuration config) {
        super(config);
        setNumTaskManager(2);
        LogUtils.initializeDefaultConsoleLogger();
    }

    /**
     * Builds the parameter sets handed to the parameterized runner.
     *
     * <p>Each row of the table holds, in this order: data volume in GB, use forwarder,
     * slow sender, slow receiver, number of subtasks, number of subtasks per instance.
     * Every row is copied into its own {@link Configuration} under the matching
     * {@code *_CONFIG_KEY} constant.
     *
     * @return the result of {@code toParameterList} applied to the created configurations
     */
    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        final Object[][] rows = {
            {1, false, false, false, 4, 2},
            {1, true, false, false, 4, 2},
            {1, true, true, false, 4, 2},
            {1, true, false, true, 4, 2},
            {2, true, false, false, 4, 2},
            {4, true, false, false, 4, 2},
            {4, true, false, false, 8, 4},
            {4, true, false, false, 16, 8}
        };

        final ArrayList<Configuration> result = new ArrayList<Configuration>(rows.length);
        for (int r = 0; r < rows.length; r++) {
            final Object[] row = rows[r];

            // Unbox the row into named values before writing them to the configuration.
            final int dataVolumeGb = (Integer) row[0];
            final boolean useForwarder = (Boolean) row[1];
            final boolean slowSender = (Boolean) row[2];
            final boolean slowReceiver = (Boolean) row[3];
            final int numSubtasks = (Integer) row[4];
            final int numSubtasksPerInstance = (Integer) row[5];

            final Configuration cfg = new Configuration();
            cfg.setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
            cfg.setBoolean(USE_FORWARDER_CONFIG_KEY, useForwarder);
            cfg.setBoolean(IS_SLOW_SENDER_CONFIG_KEY, slowSender);
            cfg.setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, slowReceiver);
            cfg.setInteger(NUM_SUBTASKS_CONFIG_KEY, numSubtasks);
            cfg.setInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, numSubtasksPerInstance);
            result.add(cfg);
        }
        return toParameterList(result);
    }

    /**
     * Builds the job graph for the current run from the test configuration.
     *
     * <p>The six settings are looked up under the {@code *_CONFIG_KEY} constants; the second
     * argument of each lookup (1 GB, forwarder on, neither side slow, 1 subtask, 1 subtask per
     * instance) is passed as the fallback value.
     *
     * @return the job graph produced by {@code createJobGraph}
     * @throws Exception if the job graph cannot be created
     */
    protected JobGraph getJobGraph() throws Exception {
        return createJobGraph(
                config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1),
                config.getBoolean(USE_FORWARDER_CONFIG_KEY, true),
                config.getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false),
                config.getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false),
                config.getInteger(NUM_SUBTASKS_CONFIG_KEY, 1),
                config.getInteger(NUM_SUBTASKS_PER_INSTANCE_CONFIG_KEY, 1));
    }

    /**
     * Logs the throughput of the run that just finished, in MBit/s.
     *
     * <p>Nothing is logged when no job execution result is available. The data volume is taken
     * from the test configuration (fallback argument: 1 GB) and converted to megabits
     * (1 GB = 1024 MB = 8192 MBit); the runtime is the result's net runtime converted from
     * milliseconds to seconds.
     *
     * <p>A net runtime of zero or less cannot yield a meaningful rate (a division by zero would
     * round to {@code Long.MAX_VALUE} and then be narrowed to {@code -1}), so that case is
     * reported with a warning instead of a throughput figure.
     */
    @After
    public void calculateThroughput() {
        if (getJobExecutionResult() == null) {
            return;
        }

        final long netRuntimeMs = getJobExecutionResult().getNetRuntime();
        if (netRuntimeMs <= 0) {
            LOG.warn(String.format("Test finished, but throughput cannot be calculated (net runtime [ms]: %d)", netRuntimeMs));
            return;
        }

        final int dataVolumeGb = config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
        final double dataVolumeMbit = dataVolumeGb * 8192.0;
        final double runtimeSecs = netRuntimeMs / 1000.0;
        final int mbitPerSecond = (int) Math.round(dataVolumeMbit / runtimeSecs);
        LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %.2f, data volume [mbits]: %.2f)", mbitPerSecond, runtimeSecs, dataVolumeMbit));
    }

    /**
     * Assembles the "Speed Test" job graph: producer, optional forwarder, consumer.
     *
     * <p>Every vertex receives the same number of subtasks and subtasks per instance. All
     * connections use {@link ChannelType#NETWORK} channels with the
     * {@link DistributionPattern#BIPARTITE} pattern.
     *
     * @param dataVolumeGb           data volume in GB, written to the producer's configuration
     * @param useForwarder           whether to insert a forwarder between producer and consumer
     * @param isSlowSender           slow-sender flag, written to the producer's configuration
     * @param isSlowReceiver         slow-receiver flag, written to the consumer's configuration
     * @param numSubtasks            number of subtasks for each vertex
     * @param numSubtasksPerInstance number of subtasks per instance for each vertex
     * @return the assembled job graph
     * @throws JobGraphDefinitionException if connecting the vertices fails
     */
    private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender, boolean isSlowReceiver, int numSubtasks, int numSubtasksPerInstance) throws JobGraphDefinitionException {
        final JobGraph graph = new JobGraph("Speed Test");

        // Producer: generates the records.
        final JobGenericInputVertex source = new JobGenericInputVertex("Speed Test Producer", graph);
        source.setInputClass(SpeedTestProducer.class);
        source.setNumberOfSubtasks(numSubtasks);
        source.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
        source.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb);
        source.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender);

        // Forwarder: only created on request; stays null otherwise.
        final JobTaskVertex relay;
        if (useForwarder) {
            relay = new JobTaskVertex("Speed Test Forwarder", graph);
            relay.setTaskClass(SpeedTestForwarder.class);
            relay.setNumberOfSubtasks(numSubtasks);
            relay.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
        } else {
            relay = null;
        }

        // Consumer: reads the records.
        final JobOutputVertex sink = new JobOutputVertex("Speed Test Consumer", graph);
        sink.setOutputClass(SpeedTestConsumer.class);
        sink.setNumberOfSubtasks(numSubtasks);
        sink.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
        sink.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver);

        if (relay == null) {
            // Direct connection; here the producer is the one that shares instances with the consumer.
            source.connectTo(sink, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
            source.setVertexToShareInstancesWith(sink);
        } else {
            // Two hops; forwarder and consumer both share instances with the producer.
            source.connectTo(relay, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
            relay.connectTo(sink, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
            relay.setVertexToShareInstancesWith(source);
            sink.setVertexToShareInstancesWith(source);
        }
        return graph;
    }

    public static class SpeedTestRecord
    implements IOReadableWritable {
        private static final int RECORD_SIZE = 128;
        private final byte[] buf = new byte[128];

        public SpeedTestRecord() {
            for (int i = 0; i < 128; ++i) {
                this.buf[i] = (byte)(i % 128);
            }
        }

        public void write(DataOutput out) throws IOException {
            out.write(this.buf);
        }

        public void read(DataInput in) throws IOException {
            in.readFully(this.buf);
        }
    }

    public static class SpeedTestConsumer
    extends AbstractOutputTask {
        private RecordReader<SpeedTestRecord> reader;

        public void registerInputOutput() {
            this.reader = new RecordReader((AbstractOutputTask)this, SpeedTestRecord.class);
        }

        public void invoke() throws Exception {
            boolean isSlow = this.getTaskConfiguration().getBoolean(NetworkStackThroughput.IS_SLOW_RECEIVER_CONFIG_KEY, false);
            int numRecords = 0;
            while (this.reader.next() != null) {
                if (!isSlow || numRecords++ % 512 != 0) continue;
                Thread.sleep(10L);
            }
        }
    }

    public static class SpeedTestForwarder
    extends AbstractTask {
        private RecordReader<SpeedTestRecord> reader;
        private RecordWriter<SpeedTestRecord> writer;

        public void registerInputOutput() {
            this.reader = new RecordReader((AbstractTask)this, SpeedTestRecord.class);
            this.writer = new RecordWriter((AbstractTask)this);
        }

        public void invoke() throws Exception {
            SpeedTestRecord record;
            this.writer.initializeSerializers();
            while ((record = (SpeedTestRecord)this.reader.next()) != null) {
                this.writer.emit((IOReadableWritable)record);
            }
            this.writer.flush();
        }
    }

    public static class SpeedTestProducer
    extends AbstractGenericInputTask {
        private RecordWriter<SpeedTestRecord> writer;

        public void registerInputOutput() {
            this.writer = new RecordWriter((AbstractInputTask)this);
        }

        public void invoke() throws Exception {
            this.writer.initializeSerializers();
            int dataVolumeGb = this.getTaskConfiguration().getInteger(NetworkStackThroughput.DATA_VOLUME_GB_CONFIG_KEY, 1);
            long dataMbPerSubtask = dataVolumeGb * 1024 / this.getCurrentNumberOfSubtasks();
            long numRecordsToEmit = dataMbPerSubtask * 1024L * 1024L / 128L;
            LOG.info((Object)String.format("%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)", this.getIndexInSubtaskGroup() + 1, this.getCurrentNumberOfSubtasks(), numRecordsToEmit, 128, (double)dataMbPerSubtask / 1024.0));
            boolean isSlow = this.getTaskConfiguration().getBoolean(NetworkStackThroughput.IS_SLOW_SENDER_CONFIG_KEY, false);
            int numRecords = 0;
            SpeedTestRecord record = new SpeedTestRecord();
            for (long i = 0L; i < numRecordsToEmit; ++i) {
                if (isSlow && numRecords++ % 512 == 0) {
                    Thread.sleep(10L);
                }
                this.writer.emit((IOReadableWritable)record);
            }
            this.writer.flush();
        }
    }
}

