/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.iterative;

import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.DeltaIteration;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.JoinFunction;
import eu.stratosphere.api.java.functions.KeySelector;
import eu.stratosphere.api.java.operators.DataSource;
import eu.stratosphere.api.java.operators.FlatMapOperator;
import eu.stratosphere.api.java.operators.JoinOperator;
import eu.stratosphere.api.java.operators.ReduceGroupOperator;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.test.util.JavaProgramTestBase;
import eu.stratosphere.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class DependencyConnectedComponentsITCase
extends JavaProgramTestBase {
    private static final int MAX_ITERATIONS = 20;
    private static final int DOP = 1;
    protected static List<Tuple2<Long, Long>> verticesInput = new ArrayList<Tuple2<Long, Long>>();
    protected static List<Tuple2<Long, Long>> edgesInput = new ArrayList<Tuple2<Long, Long>>();
    private String resultPath;
    private String expectedResult;

    /**
     * Fills the shared vertex and edge collections and records the result
     * location and the expected output before the job is submitted.
     *
     * <p>Both input lists are static, so they are emptied first. Without that,
     * a second invocation in the same JVM would append another copy of the
     * graph on top of the entries left over from the previous invocation.
     *
     * @throws Exception declared by the signature; anything thrown by
     *         {@code getTempDirPath} propagates unchanged
     */
    protected void preSubmit() throws Exception {
        // Static state outlives a single invocation: always start from empty lists.
        verticesInput.clear();
        edgesInput.clear();

        // Vertices 1..9, each paired with its own id as the second field.
        for (long vertexId = 1L; vertexId <= 9L; vertexId++) {
            verticesInput.add(new Tuple2<Long, Long>(vertexId, vertexId));
        }

        // Edge tuples as {f0, f1} pairs, listed in the original insertion order.
        final long[][] edgePairs = {
            {1L, 2L}, {1L, 3L},
            {2L, 3L}, {2L, 4L}, {2L, 1L},
            {3L, 1L}, {3L, 2L},
            {4L, 2L}, {4L, 6L},
            {5L, 6L},
            {6L, 4L}, {6L, 5L},
            {7L, 8L}, {7L, 9L},
            {8L, 7L}, {8L, 9L},
            {9L, 7L}, {9L, 8L},
        };
        for (long[] pair : edgePairs) {
            edgesInput.add(new Tuple2<Long, Long>(pair[0], pair[1]));
        }

        this.resultPath = this.getTempDirPath("result");
        // Vertices 1-6 are expected to end up with id 1, vertices 7-9 with id 7.
        this.expectedResult = "(1, 1)\n(2, 1)\n(3, 1)\n(4, 1)\n(5, 1)\n(6, 1)\n(7, 7)\n(8, 7)\n(9, 7)\n";
    }

    /**
     * Runs the connected-components program, handing it the result path that
     * was stored in {@code resultPath}.
     *
     * @throws Exception propagated from {@code runProgram}
     */
    protected void testProgram() throws Exception {
        final String outputPath = this.resultPath;
        DependencyConnectedComponentsProgram.runProgram(outputPath);
    }

    /**
     * Compares the stored expected output against whatever was written to the
     * result path, delegating to {@code compareResultsByLinesInMemory}.
     *
     * @throws Exception propagated from the comparison helper
     */
    protected void postSubmit() throws Exception {
        final String expected = this.expectedResult;
        final String actualLocation = this.resultPath;
        this.compareResultsByLinesInMemory(expected, actualLocation);
    }

    /**
     * Flat-map over pairs of (first tuple, second tuple). The first tuple is
     * forwarded only when its second field is strictly smaller than the second
     * field of the second tuple; otherwise nothing is emitted.
     */
    public static final class MinimumIdFilter
    extends FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;

        /**
         * @param vertexWithNewAndOldId pair whose {@code f0} carries the new id and
         *        whose {@code f1} carries the old id (each in its own {@code f1} field)
         * @param out collector receiving {@code vertexWithNewAndOldId.f0} when the new id is lower
         */
        public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> vertexWithNewAndOldId, Collector<Tuple2<Long, Long>> out) throws Exception {
            final Tuple2<Long, Long> updated = vertexWithNewAndOldId.f0;
            final Tuple2<Long, Long> current = vertexWithNewAndOldId.f1;

            // Equal ids are not forwarded: only a strict decrease passes the filter.
            final boolean improves = updated.f1 < current.f1;
            if (!improves) {
                return;
            }
            out.collect(updated);
        }
    }

    /**
     * Group reduce that emits a single tuple per group: the first field of the
     * group's first element together with the smallest second field seen in
     * the group.
     */
    public static final class MinimumReduce
    extends GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;
        // One tuple instance is reused for every emitted record.
        final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();

        /**
         * @param values the elements of one group; the first element is read unconditionally
         * @param out collector receiving the (first-element f0, minimum f1) tuple
         */
        public void reduce(Iterator<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
            final Tuple2<Long, Long> head = values.next();
            final Long groupKey = head.f0;
            Long smallest = head.f1;

            while (values.hasNext()) {
                final Long candidate = values.next().f1;
                if (candidate < smallest) {
                    smallest = candidate;
                }
            }

            this.resultVertex.setField(groupKey, 0);
            this.resultVertex.setField(smallest, 1);
            out.collect(this.resultVertex);
        }
    }

    /**
     * Join function that re-labels the second input: field 0 of
     * {@code vertexWithCompId} is overwritten with {@code edge.f1}, and that
     * same (mutated) instance is returned.
     */
    public static final class NeighborWithComponentIDJoin
    extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;

        /**
         * @param edge tuple whose {@code f1} becomes the new field 0 of the result
         * @param vertexWithCompId tuple that is modified in place and returned
         * @return {@code vertexWithCompId} after its field 0 has been replaced
         */
        public Tuple2<Long, Long> join(Tuple2<Long, Long> edge, Tuple2<Long, Long> vertexWithCompId) throws Exception {
            final Long replacementId = edge.f1;
            // No new tuple is allocated; the incoming record is reused as the output.
            vertexWithCompId.setField(replacementId, 0);
            return vertexWithCompId;
        }
    }

    /**
     * Join function that discards the first input and passes the matching
     * second input through unchanged.
     */
    public static final class FindCandidatesDependenciesJoin
    extends JoinFunction<Long, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1L;

        /**
         * @param candidateId the matched id; not used in the result
         * @param edge the tuple joined with {@code candidateId}
         * @return {@code edge}, the same instance that was passed in
         */
        public Tuple2<Long, Long> join(Long candidateId, Tuple2<Long, Long> edge) throws Exception {
            // Only the tuple side of the match is of interest downstream.
            return edge;
        }
    }

    /**
     * Group reduce that collapses a group to a single value by emitting only
     * its first element and ignoring the rest.
     */
    public static final class RemoveDuplicatesReduce
    extends GroupReduceFunction<Long, Long> {
        private static final long serialVersionUID = 1L;

        /**
         * @param values the elements of one group; only the first one is read
         * @param out collector receiving that first element
         */
        public void reduce(Iterator<Long> values, Collector<Long> out) throws Exception {
            // Pass the Long straight through: an Object-typed argument does not
            // match Collector<Long>.collect(Long).
            final Long representative = values.next();
            out.collect(representative);
        }
    }

    /**
     * Join function that reduces a matched (vertex, edge) pair to the second
     * field of the edge.
     */
    public static final class FindCandidatesJoin
    extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> {
        private static final long serialVersionUID = 1L;

        /**
         * @param vertexWithCompId the matched first input; not used in the result
         * @param edge the matched second input
         * @return {@code edge.f1}
         */
        public Long join(Tuple2<Long, Long> vertexWithCompId, Tuple2<Long, Long> edge) throws Exception {
            final Long candidate = edge.f1;
            return candidate;
        }
    }

    /**
     * Holder for the dataflow under test; not meant to be instantiated.
     */
    private static class DependencyConnectedComponentsProgram {
        private DependencyConnectedComponentsProgram() {
        }

        /**
         * Assembles the delta-iteration plan over the static vertex and edge
         * collections, writes the iteration result as text and executes it.
         *
         * <p>Per iteration the plan does the following:
         * <ol>
         *   <li>joins the workset with the edges on field 0 and keeps each distinct
         *       {@code edge.f1} as a candidate id;</li>
         *   <li>joins the candidates with the edges whose {@code f1} equals the
         *       candidate and keeps those edges;</li>
         *   <li>joins those edges with the solution set on field 0, overwrites field 0
         *       of the solution-set entry with {@code edge.f1}, and takes the minimum
         *       second field per resulting field 0;</li>
         *   <li>joins that minimum with the solution set on field 0 and keeps it only
         *       when it is strictly lower than the stored value.</li>
         * </ol>
         * The surviving records are passed as both arguments of {@code closeWith}.
         *
         * @param resultPath location handed to {@code writeAsText}
         * @return the same {@code resultPath} that was passed in
         * @throws Exception propagated from plan construction or {@code env.execute()}
         */
        public static String runProgram(String resultPath) throws Exception {
            final int parallelism = 1;
            final int maxIterations = 20;

            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setDegreeOfParallelism(parallelism);

            DataSource vertices = env.fromCollection(verticesInput);
            DataSource edgeSet = env.fromCollection(edgesInput);

            final int solutionSetKey = 0;
            DeltaIteration deltaIteration = vertices.iterateDelta((DataSet)vertices, maxIterations, new int[]{solutionSetKey});

            // Step 1: distinct ids reachable over an edge from any workset entry.
            KeySelector<Long, Long> groupByCandidate = new KeySelector<Long, Long>(){

                public Long getKey(Long id) {
                    return id;
                }
            };
            ReduceGroupOperator candidateIds = deltaIteration.getWorkset()
                .join((DataSet)edgeSet)
                .where(new int[]{0})
                .equalTo(new int[]{0})
                .with((JoinFunction)new FindCandidatesJoin())
                .groupBy((KeySelector)groupByCandidate)
                .reduceGroup((GroupReduceFunction)new RemoveDuplicatesReduce());

            // Step 2: edges whose second field is one of the candidate ids.
            KeySelector<Long, Long> candidateAsKey = new KeySelector<Long, Long>(){

                public Long getKey(Long id) {
                    return id;
                }
            };
            KeySelector<Tuple2<Long, Long>, Long> edgeSecondField = new KeySelector<Tuple2<Long, Long>, Long>(){

                public Long getKey(Tuple2<Long, Long> vertexWithId) {
                    return (Long)vertexWithId.f1;
                }
            };
            JoinOperator.EquiJoin dependencyEdges = candidateIds
                .join((DataSet)edgeSet)
                .where((KeySelector)candidateAsKey)
                .equalTo((KeySelector)edgeSecondField)
                .with((JoinFunction)new FindCandidatesDependenciesJoin());

            // Step 3: minimum second field per re-labelled solution-set entry.
            ReduceGroupOperator minimaPerVertex = dependencyEdges
                .join((DataSet)deltaIteration.getSolutionSet())
                .where(new int[]{0})
                .equalTo(new int[]{0})
                .with((JoinFunction)new NeighborWithComponentIDJoin())
                .groupBy(new int[]{0})
                .reduceGroup((GroupReduceFunction)new MinimumReduce());

            // Step 4: keep only entries that are strictly lower than the stored value.
            FlatMapOperator improvedEntries = minimaPerVertex
                .join((DataSet)deltaIteration.getSolutionSet())
                .where(new int[]{0})
                .equalTo(new int[]{0})
                .flatMap((FlatMapFunction)new MinimumIdFilter());

            // The same data set is passed for both closeWith arguments.
            deltaIteration
                .closeWith((DataSet)improvedEntries, (DataSet)improvedEntries)
                .writeAsText(resultPath);

            env.execute();
            return resultPath;
        }
    }
}

