/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.iterative.nephele;

import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.InputFormat;
import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.client.JobExecutionException;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.DistributionPattern;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.iterative.io.FakeOutputTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationSynchronizationSinkTask;
import eu.stratosphere.pact.runtime.task.DataSinkTask;
import eu.stratosphere.pact.runtime.task.DataSourceTask;
import eu.stratosphere.pact.runtime.task.RegularPactTask;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.runtime.io.channels.ChannelType;
import java.io.IOException;

public class JobGraphUtils {
    public static final long MEGABYTE = 0x100000L;

    private JobGraphUtils() {
    }

    public static void submit(JobGraph graph, Configuration nepheleConfig) throws IOException, JobExecutionException {
        JobClient client = new JobClient(graph, nepheleConfig);
        client.submitJobAndWait();
    }

    public static <T extends FileInputFormat<?>> JobInputVertex createInput(T stub, String path, String name, JobGraph graph, int degreeOfParallelism, int numSubTasksPerInstance) {
        stub.setFilePath(path);
        return JobGraphUtils.createInput(new UserCodeObjectWrapper(stub), name, graph, degreeOfParallelism, numSubTasksPerInstance);
    }

    private static <T extends InputFormat<?, ?>> JobInputVertex createInput(UserCodeWrapper<T> stub, String name, JobGraph graph, int degreeOfParallelism, int numSubTasksPerInstance) {
        JobInputVertex inputVertex = new JobInputVertex(name, graph);
        Class<DataSourceTask> clazz = DataSourceTask.class;
        inputVertex.setInputClass(clazz);
        inputVertex.setNumberOfSubtasks(degreeOfParallelism);
        inputVertex.setNumberOfSubtasksPerInstance(numSubTasksPerInstance);
        TaskConfig inputConfig = new TaskConfig(inputVertex.getConfiguration());
        inputConfig.setStubWrapper(stub);
        return inputVertex;
    }

    public static void connect(AbstractJobVertex source, AbstractJobVertex target, ChannelType channelType, DistributionPattern distributionPattern) throws JobGraphDefinitionException {
        source.connectTo(target, channelType, distributionPattern);
    }

    public static JobTaskVertex createTask(Class<? extends RegularPactTask> task, String name, JobGraph graph, int degreeOfParallelism, int numSubtasksPerInstance) {
        JobTaskVertex taskVertex = new JobTaskVertex(name, graph);
        taskVertex.setTaskClass(task);
        taskVertex.setNumberOfSubtasks(degreeOfParallelism);
        taskVertex.setNumberOfSubtasksPerInstance(numSubtasksPerInstance);
        return taskVertex;
    }

    public static JobOutputVertex createSync(JobGraph jobGraph, int degreeOfParallelism) {
        JobOutputVertex sync = new JobOutputVertex("BulkIterationSync", jobGraph);
        sync.setOutputClass(IterationSynchronizationSinkTask.class);
        sync.setNumberOfSubtasks(1);
        TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
        syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, degreeOfParallelism);
        return sync;
    }

    public static JobOutputVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism, int numSubTasksPerInstance) {
        JobOutputVertex outputVertex = new JobOutputVertex(name, jobGraph);
        outputVertex.setOutputClass(FakeOutputTask.class);
        outputVertex.setNumberOfSubtasks(degreeOfParallelism);
        outputVertex.setNumberOfSubtasksPerInstance(numSubTasksPerInstance);
        return outputVertex;
    }

    public static JobOutputVertex createFileOutput(JobGraph jobGraph, String name, int degreeOfParallelism, int numSubTasksPerInstance) {
        JobOutputVertex sinkVertex = new JobOutputVertex(name, jobGraph);
        sinkVertex.setOutputClass(DataSinkTask.class);
        sinkVertex.setNumberOfSubtasks(degreeOfParallelism);
        sinkVertex.setNumberOfSubtasksPerInstance(numSubTasksPerInstance);
        return sinkVertex;
    }
}

