/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.broadcastvars;

import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.io.DistributionPattern;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.iterative.task.IterationHeadPactTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationIntermediatePactTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationTailPactTask;
import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordComparatorFactory;
import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordSerializerFactory;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.CollectorMapDriver;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
import eu.stratosphere.pact.runtime.task.GroupReduceDriver;
import eu.stratosphere.pact.runtime.task.NoOpDriver;
import eu.stratosphere.pact.runtime.task.chaining.ChainedCollectorMapDriver;
import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.test.iterative.nephele.JobGraphUtils;
import eu.stratosphere.test.recordJobs.kmeans.KMeansBroadcast;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.types.DoubleValue;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.util.LogUtils;
import org.apache.log4j.Level;

public class KMeansIterativeNepheleITCase
extends RecordAPITestBase {
    private static final int ITERATION_ID = 42;
    private static final int MEMORY_PER_CONSUMER = 2;
    protected String dataPath;
    protected String clusterPath;
    protected String resultPath;

    public KMeansIterativeNepheleITCase() {
        LogUtils.initializeDefaultConsoleLogger((Level)Level.ERROR);
    }

    protected void preSubmit() throws Exception {
        this.dataPath = this.createTempFile("datapoints.txt", "0|50.90|16.20|72.08|\n1|73.65|61.76|62.89|\n2|61.73|49.95|92.74|\n3|1.60|70.11|16.32|\n4|2.43|19.81|89.56|\n5|67.99|9.00|14.48|\n6|87.80|84.49|55.83|\n7|90.26|42.99|53.29|\n8|51.36|6.16|9.35|\n9|12.43|9.52|12.54|\n10|80.01|8.78|29.74|\n11|92.76|2.93|80.07|\n12|46.32|100.00|22.98|\n13|34.11|45.61|58.60|\n14|68.82|16.36|96.60|\n15|81.47|76.45|28.40|\n16|65.55|40.21|43.43|\n17|84.22|88.56|13.31|\n18|36.99|68.36|57.12|\n19|28.87|37.69|91.04|\n20|31.56|13.22|86.00|\n21|18.49|34.45|54.52|\n22|13.33|94.02|92.07|\n23|91.19|81.62|55.06|\n24|85.78|39.02|25.58|\n25|94.41|47.07|78.23|\n26|90.62|10.43|80.20|\n27|31.52|85.81|39.79|\n28|24.65|77.98|26.35|\n29|69.34|75.79|63.96|\n30|22.56|78.61|66.66|\n31|91.74|83.82|73.92|\n32|76.64|89.53|44.66|\n33|36.02|73.01|92.32|\n34|87.86|18.94|10.74|\n35|91.94|34.61|5.20|\n36|12.52|47.01|95.29|\n37|44.01|26.19|78.50|\n38|26.20|73.36|10.08|\n39|15.21|17.37|54.33|\n40|27.96|94.81|44.41|\n41|26.44|44.81|70.88|\n42|53.29|26.69|2.40|\n43|23.94|11.50|1.71|\n44|19.00|25.48|50.80|\n45|82.26|1.88|58.08|\n46|47.56|82.54|82.73|\n47|51.54|35.10|32.95|\n48|86.71|55.51|19.08|\n49|54.16|23.68|32.41|\n50|71.81|32.83|46.66|\n51|20.70|14.19|64.96|\n52|57.17|88.56|55.23|\n53|91.39|49.38|70.55|\n54|47.90|62.07|76.03|\n55|55.70|37.77|30.15|\n56|87.87|74.62|25.95|\n57|95.70|45.04|15.27|\n58|41.61|89.37|24.45|\n59|82.19|20.84|11.13|\n60|49.88|2.62|18.62|\n61|16.42|53.30|74.13|\n62|38.37|72.62|35.16|\n63|43.26|49.59|92.56|\n64|28.96|2.36|78.49|\n65|88.41|91.43|92.55|\n66|98.61|79.58|33.03|\n67|4.94|18.65|30.78|\n68|75.89|79.30|63.90|\n69|93.18|76.26|9.50|\n70|73.43|70.50|76.49|\n71|78.64|90.87|34.49|\n72|58.47|63.07|8.82|\n73|69.74|54.36|64.43|\n74|38.47|36.60|33.39|\n75|51.07|14.75|2.54|\n76|24.18|16.85|15.00|\n77|7.56|50.72|93.45|\n78|64.28|97.01|57.31|\n79|85.30|24.13|76.57|\n80|72.78|30.78|13.11|\n81|18.42|17.45|32.20|\n82|87.44|74.98|87.90|\n83|38.30|17.77|37.33|\n84|63.62|7.90|34.23|\n85|8.84|67.87|30.65|\n86|76.12|51.83|80.12|\n87|32.30|74.79|4.39|\n88|41.73|45.34|18.66|\n89|58.13|18.43|83.38|\n90|98.10|33.46|83.07|\n91|17.76|4.10|88.51|\n92|60.58|18.15|59.96|\n93|50.11|33.25|85.64|\n94|97.74|60.93|38.97|\n95|76.31|52.50|95.43|\n96|7.71|85.85|36.26|\n97|9.32|72.21|42.17|\n98|71.29|51.88|57.62|\n99|31.39|7.27|88.74|");
        this.clusterPath = this.createTempFile("initial_centers.txt", "0|1.96|65.04|20.82|\n1|53.99|84.23|81.59|\n2|97.28|74.50|40.32|\n3|63.57|24.53|87.07|\n4|28.10|43.27|86.53|\n5|99.51|62.70|64.48|\n6|30.31|30.36|80.46|");
        this.resultPath = this.getTempDirPath("result");
    }

    protected void postSubmit() throws Exception {
        this.compareResultsByLinesInMemory("0|38.3|54.5|19.3|\n1|32.1|83.0|50.4|\n2|87.5|56.6|20.3|\n3|75.4|18.6|67.5|\n4|24.9|29.2|77.6|\n5|78.7|66.1|70.8|\n6|39.5|14.0|18.7|\n", this.resultPath);
    }

    protected JobGraph getJobGraph() throws Exception {
        return KMeansIterativeNepheleITCase.createJobGraph(this.dataPath, this.clusterPath, this.resultPath, 4, 20);
    }

    private static JobInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
        CsvInputFormat pointsInFormat = new CsvInputFormat('|', new Class[]{IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class});
        JobInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "[Points]", jobGraph, numSubTasks, numSubTasks);
        TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration());
        taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        taskConfig.setOutputSerializer(serializer);
        TaskConfig chainedMapper = new TaskConfig(new Configuration());
        chainedMapper.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        chainedMapper.setStubWrapper((UserCodeWrapper)new UserCodeObjectWrapper((Object)new KMeansBroadcast.PointBuilder()));
        chainedMapper.addOutputShipStrategy(ShipStrategyType.FORWARD);
        chainedMapper.setOutputSerializer(serializer);
        taskConfig.addChainedTask(ChainedCollectorMapDriver.class, chainedMapper, "Build points");
        return pointsInput;
    }

    private static JobInputVertex createCentersInput(JobGraph jobGraph, String centersPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
        CsvInputFormat modelsInFormat = new CsvInputFormat('|', new Class[]{IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class});
        JobInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, centersPath, "[Models]", jobGraph, numSubTasks, numSubTasks);
        TaskConfig taskConfig = new TaskConfig(modelsInput.getConfiguration());
        taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        taskConfig.setOutputSerializer(serializer);
        TaskConfig chainedMapper = new TaskConfig(new Configuration());
        chainedMapper.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        chainedMapper.setStubWrapper((UserCodeWrapper)new UserCodeObjectWrapper((Object)new KMeansBroadcast.PointBuilder()));
        chainedMapper.addOutputShipStrategy(ShipStrategyType.FORWARD);
        chainedMapper.setOutputSerializer(serializer);
        taskConfig.addChainedTask(ChainedCollectorMapDriver.class, chainedMapper, "Build centers");
        return modelsInput;
    }

    private static JobOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
        JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks, numSubTasks);
        TaskConfig taskConfig = new TaskConfig(output.getConfiguration());
        taskConfig.addInputToGroup(0);
        taskConfig.setInputSerializer(serializer, 0);
        KMeansBroadcast.PointOutFormat outFormat = new KMeansBroadcast.PointOutFormat();
        outFormat.setOutputFilePath(new Path(resultPath));
        taskConfig.setStubWrapper((UserCodeWrapper)new UserCodeObjectWrapper((Object)outFormat));
        return output;
    }

    private static JobTaskVertex createIterationHead(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> serializer) {
        JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Iteration Head", jobGraph, numSubTasks, numSubTasks);
        TaskConfig headConfig = new TaskConfig(head.getConfiguration());
        headConfig.setIterationId(42);
        headConfig.addInputToGroup(0);
        headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
        headConfig.setInputSerializer(serializer, 0);
        headConfig.setBackChannelMemory(0x200000L);
        headConfig.setOutputSerializer(serializer);
        headConfig.addOutputShipStrategy(ShipStrategyType.BROADCAST);
        TaskConfig headFinalOutConfig = new TaskConfig(new Configuration());
        headFinalOutConfig.setOutputSerializer(serializer);
        headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig);
        headConfig.setIterationHeadIndexOfSyncOutput(2);
        headConfig.setDriver(NoOpDriver.class);
        headConfig.setDriverStrategy(DriverStrategy.UNARY_NO_OP);
        return head;
    }

    private static JobTaskVertex createMapper(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> inputSerializer, TypeSerializerFactory<?> broadcastVarSerializer, TypeSerializerFactory<?> outputSerializer, TypeComparatorFactory<?> outputComparator) {
        JobTaskVertex mapper = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "Map (Select nearest center)", jobGraph, numSubTasks, numSubTasks);
        TaskConfig intermediateConfig = new TaskConfig(mapper.getConfiguration());
        intermediateConfig.setIterationId(42);
        intermediateConfig.setDriver(CollectorMapDriver.class);
        intermediateConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        intermediateConfig.addInputToGroup(0);
        intermediateConfig.setInputSerializer(inputSerializer, 0);
        intermediateConfig.setOutputSerializer(outputSerializer);
        intermediateConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        intermediateConfig.setOutputComparator(outputComparator, 0);
        intermediateConfig.setBroadcastInputName("centers", 0);
        intermediateConfig.addBroadcastInputToGroup(0);
        intermediateConfig.setBroadcastInputSerializer(broadcastVarSerializer, 0);
        intermediateConfig.setStubWrapper((UserCodeWrapper)new UserCodeObjectWrapper((Object)new KMeansBroadcast.SelectNearestCenter()));
        return mapper;
    }

    private static JobTaskVertex createReducer(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> inputSerializer, TypeComparatorFactory<?> inputComparator, TypeSerializerFactory<?> outputSerializer) {
        JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "Reduce / Iteration Tail", jobGraph, numSubTasks, numSubTasks);
        TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
        tailConfig.setIterationId(42);
        tailConfig.setIsWorksetUpdate();
        tailConfig.setDriver(GroupReduceDriver.class);
        tailConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
        tailConfig.addInputToGroup(0);
        tailConfig.setInputSerializer(inputSerializer, 0);
        tailConfig.setDriverComparator(inputComparator, 0);
        tailConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
        tailConfig.setInputComparator(inputComparator, 0);
        tailConfig.setMemoryInput(0, 0x200000L);
        tailConfig.setFilehandlesInput(0, 128);
        tailConfig.setSpillingThresholdInput(0, 0.9f);
        tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        tailConfig.setOutputSerializer(outputSerializer);
        tailConfig.setStubWrapper((UserCodeWrapper)new UserCodeObjectWrapper((Object)new KMeansBroadcast.RecomputeClusterCenter()));
        return tail;
    }

    private static JobOutputVertex createSync(JobGraph jobGraph, int numIterations, int dop) {
        JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, dop);
        TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
        syncConfig.setNumberOfIterations(numIterations);
        syncConfig.setIterationId(42);
        return sync;
    }

    private static JobGraph createJobGraph(String pointsPath, String centersPath, String resultPath, int numSubTasks, int numIterations) throws JobGraphDefinitionException {
        RecordSerializerFactory serializer = RecordSerializerFactory.get();
        RecordComparatorFactory int0Comparator = new RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class});
        JobGraph jobGraph = new JobGraph("KMeans Iterative");
        JobInputVertex points = KMeansIterativeNepheleITCase.createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
        JobInputVertex centers = KMeansIterativeNepheleITCase.createCentersInput(jobGraph, centersPath, numSubTasks, serializer);
        JobTaskVertex head = KMeansIterativeNepheleITCase.createIterationHead(jobGraph, numSubTasks, serializer);
        JobTaskVertex mapper = KMeansIterativeNepheleITCase.createMapper(jobGraph, numSubTasks, serializer, serializer, serializer, int0Comparator);
        JobTaskVertex reducer = KMeansIterativeNepheleITCase.createReducer(jobGraph, numSubTasks, serializer, int0Comparator, serializer);
        JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks, numSubTasks);
        JobOutputVertex sync = KMeansIterativeNepheleITCase.createSync(jobGraph, numIterations, numSubTasks);
        JobOutputVertex output = KMeansIterativeNepheleITCase.createOutput(jobGraph, resultPath, numSubTasks, serializer);
        JobGraphUtils.connect((AbstractJobVertex)points, (AbstractJobVertex)mapper, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)centers, (AbstractJobVertex)head, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)mapper, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        new TaskConfig(mapper.getConfiguration()).setBroadcastGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
        new TaskConfig(mapper.getConfiguration()).setInputCached(0, true);
        new TaskConfig(mapper.getConfiguration()).setInputMaterializationMemory(0, 0x200000L);
        JobGraphUtils.connect((AbstractJobVertex)mapper, (AbstractJobVertex)reducer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        new TaskConfig(reducer.getConfiguration()).setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
        JobGraphUtils.connect((AbstractJobVertex)reducer, (AbstractJobVertex)fakeTailOutput, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)output, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)sync, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        points.setVertexToShareInstancesWith((AbstractJobVertex)output);
        centers.setVertexToShareInstancesWith((AbstractJobVertex)output);
        head.setVertexToShareInstancesWith((AbstractJobVertex)output);
        mapper.setVertexToShareInstancesWith((AbstractJobVertex)output);
        reducer.setVertexToShareInstancesWith((AbstractJobVertex)output);
        fakeTailOutput.setVertexToShareInstancesWith((AbstractJobVertex)output);
        sync.setVertexToShareInstancesWith((AbstractJobVertex)output);
        return jobGraph;
    }
}

