/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.recordJobs.relational;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.Collector;
import java.util.Iterator;

/**
 * Record-API job that reads two pipe-delimited files of integer pairs, passes
 * each through a pass-through reduce keyed on field 0, joins the two reduced
 * streams on field 0, and writes three-integer records to a CSV sink.
 *
 * <p>Data flow assembled by {@link #getPlan(String...)}:
 * <pre>
 *   Input 1 -> AggOrders (DummyReduce) --+
 *                                        +--> JoinLiO (JoinInputs) -> Output
 *   Input 2 -> AggLines  (DummyReduce) --+
 * </pre>
 */
public class MergeOnlyJoin implements Program {
    private static final long serialVersionUID = 1L;

    /**
     * Builds the plan for this job.
     *
     * <p>All arguments are positional and optional; a missing argument falls
     * back to the default listed below.
     * <ol start="0">
     *   <li>default parallelism of the plan (default {@code 1})</li>
     *   <li>path of the first input (default {@code ""})</li>
     *   <li>path of the second input (default {@code ""})</li>
     *   <li>path of the output (default {@code ""})</li>
     *   <li>degree of parallelism applied to the second source and to the
     *       reduce that follows it (default {@code 1})</li>
     * </ol>
     *
     * @param args positional program arguments as described above
     * @return the assembled plan, named {@code "Merge Only Join"}
     * @throws NumberFormatException if argument 0 or 4 is present but is not a
     *         parseable integer
     */
    public Plan getPlan(String... args) {
        // Parse program parameters.
        int numSubtasks = args.length > 0 ? Integer.parseInt(args[0]) : 1;
        String input1Path = args.length > 1 ? args[1] : "";
        String input2Path = args.length > 2 ? args[2] : "";
        String output = args.length > 3 ? args[3] : "";
        int numSubtasksInput2 = args.length > 4 ? Integer.parseInt(args[4]) : 1;

        // First branch: '|'-delimited source with two IntValue fields, then a
        // pass-through reduce keyed on field 0.
        @SuppressWarnings("unchecked")
        CsvInputFormat format1 = new CsvInputFormat('|', IntValue.class, IntValue.class);
        FileDataSource input1 = new FileDataSource(format1, input1Path, "Input 1");
        ReduceOperator aggInput1 = ReduceOperator.builder(DummyReduce.class, IntValue.class, 0)
                .input(input1)
                .name("AggOrders")
                .build();

        // Second branch: same layout, but the source and its reduce get their
        // own degree of parallelism instead of the plan default.
        @SuppressWarnings("unchecked")
        CsvInputFormat format2 = new CsvInputFormat('|', IntValue.class, IntValue.class);
        FileDataSource input2 = new FileDataSource(format2, input2Path, "Input 2");
        input2.setDegreeOfParallelism(numSubtasksInput2);
        ReduceOperator aggInput2 = ReduceOperator.builder(DummyReduce.class, IntValue.class, 0)
                .input(input2)
                .name("AggLines")
                .build();
        aggInput2.setDegreeOfParallelism(numSubtasksInput2);

        // Join the two reduced streams on field 0 of either side.
        JoinOperator joinLiO = JoinOperator.builder(JoinInputs.class, IntValue.class, 0, 0)
                .input1(aggInput1)
                .input2(aggInput2)
                .name("JoinLiO")
                .build();

        // Sink: three IntValue fields per record, '|' between fields, '\n'
        // between records, lenient mode switched on.
        FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, joinLiO, "Output");
        CsvOutputFormat.configureRecordFormat(result)
                .recordDelimiter('\n')
                .fieldDelimiter('|')
                .lenient(true)
                .field(IntValue.class, 0)
                .field(IntValue.class, 1)
                .field(IntValue.class, 2);

        // Assemble the plan.
        Plan plan = new Plan(result, "Merge Only Join");
        plan.setDefaultParallelism(numSubtasks);
        return plan;
    }

    /**
     * Reduce function that forwards every record of a group unchanged.
     *
     * <p>NOTE(review): the empty {@code ConstantFieldsExcept} annotation
     * presumably declares that no field is modified - confirm against the
     * annotation's documentation.
     */
    @FunctionAnnotation.ConstantFieldsExcept({})
    public static class DummyReduce extends ReduceFunction {
        private static final long serialVersionUID = 1L;

        /**
         * Emits each incoming record as-is.
         *
         * @param values records of the current group
         * @param out collector receiving every record of the group
         */
        public void reduce(Iterator<Record> values, Collector<Record> out) {
            while (values.hasNext()) {
                // No widening cast here: Collector<Record> only accepts Record.
                out.collect(values.next());
            }
        }
    }

    /**
     * Join function that extends the first-side record with field 1 of the
     * second-side record.
     *
     * <p>NOTE(review): {@code ConstantFieldsFirstExcept(2)} presumably marks
     * field 2 as the only field of the first input that changes - confirm
     * against the annotation's documentation.
     */
    @FunctionAnnotation.ConstantFieldsFirstExcept(2)
    public static class JoinInputs extends JoinFunction {
        private static final long serialVersionUID = 1L;

        /**
         * Writes field 1 of {@code input2} into field 2 of {@code input1} and
         * emits the (mutated) {@code input1}.
         *
         * @param input1 record from the first join input; modified in place
         * @param input2 record from the second join input
         * @param out collector receiving the combined record
         */
        public void join(Record input1, Record input2, Collector<Record> out) {
            input1.setField(2, input2.getField(1, IntValue.class));
            out.collect(input1);
        }
    }
}

