/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.recordJobs.relational;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.ProgramDescription;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.io.FileOutputFormat;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.test.recordJobs.util.IntTupleDataInFormat;
import eu.stratosphere.test.recordJobs.util.Tuple;
import eu.stratosphere.types.DoubleValue;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Iterator;
import java.util.Locale;

public class TPCHQuery10
implements Program,
ProgramDescription {
    /** Text handed out by {@link #getDescription()}. */
    private static final String DESCRIPTION = "TPC-H Query 10";

    /**
     * Returns a short, human-readable description of this program.
     *
     * @return the fixed string {@code "TPC-H Query 10"}
     */
    @Override
    public String getDescription() {
        return DESCRIPTION;
    }

    /**
     * Assembles the data-flow plan for TPC-H query 10.
     *
     * <p>Expected arguments, in order: degree of parallelism, orders path, line items path,
     * customers path, nations path, result path. Arguments after the sixth are ignored.
     *
     * <p>Structure of the plan:
     * <ul>
     *   <li>{@code JoinOL}: filtered orders joined with filtered line items on field 0 of both.</li>
     *   <li>{@code JoinCOL}: projected customers joined with the {@code JoinOL} result on
     *       field 0 of both.</li>
     *   <li>{@code JoinNCOL}: the {@code JoinCOL} result (field 4) joined with projected
     *       nations (field 0).</li>
     *   <li>{@code Reduce}: {@link Sum} grouped on fields 0, 1, 3, 4, 5, 6 and 7, written to
     *       the result path through {@link TupleOutputFormat}.</li>
     * </ul>
     *
     * @param args program arguments as listed above
     * @return the plan, with its default parallelism taken from {@code args[0]}
     * @throws IllegalArgumentException if fewer than six arguments are given; a non-numeric
     *         {@code args[0]} surfaces as {@link NumberFormatException}, a subclass of it
     */
    @Override
    public Plan getPlan(String ... args) throws IllegalArgumentException {
        if (args.length < 6) {
            throw new IllegalArgumentException("Invalid number of parameters");
        }
        final int parallelism = Integer.parseInt(args[0]);
        final String ordersFile = args[1];
        final String lineItemsFile = args[2];
        final String customersFile = args[3];
        final String nationsFile = args[4];
        final String outputFile = args[5];

        // Orders branch: source -> filter.
        FileDataSource ordersSource = new FileDataSource(new IntTupleDataInFormat(), ordersFile, "Orders");
        MapOperator filterOrders = MapOperator.builder(FilterO.class).name("FilterO").build();
        filterOrders.setInput(ordersSource);

        // Line item branch: source -> filter.
        FileDataSource lineItemsSource = new FileDataSource(new IntTupleDataInFormat(), lineItemsFile, "LineItems");
        MapOperator filterLineItems = MapOperator.builder(FilterLI.class).name("FilterLi").build();
        filterLineItems.setInput(lineItemsSource);

        // Customer branch: source -> projection.
        FileDataSource customersSource = new FileDataSource(new IntTupleDataInFormat(), customersFile, "Customers");
        MapOperator projectCustomers = MapOperator.builder(ProjectC.class).name("ProjectC").build();
        projectCustomers.setInput(customersSource);

        // Nation branch: source -> projection.
        FileDataSource nationsSource = new FileDataSource(new IntTupleDataInFormat(), nationsFile, "Nations");
        MapOperator projectNations = MapOperator.builder(ProjectN.class).name("ProjectN").build();
        projectNations.setInput(nationsSource);

        // orders x line items, both keyed on field 0.
        JoinOperator ordersWithLineItems =
                JoinOperator.builder(JoinOL.class, IntValue.class, 0, 0).name("JoinOL").build();
        ordersWithLineItems.setFirstInput(filterOrders);
        ordersWithLineItems.setSecondInput(filterLineItems);

        // customers x (orders x line items), both keyed on field 0.
        JoinOperator customersWithOrders =
                JoinOperator.builder(JoinCOL.class, IntValue.class, 0, 0).name("JoinCOL").build();
        customersWithOrders.setFirstInput(projectCustomers);
        customersWithOrders.setSecondInput(ordersWithLineItems);

        // Previous result (field 4) x nations (field 0).
        JoinOperator withNations =
                JoinOperator.builder(JoinNCOL.class, IntValue.class, 4, 0).name("JoinNCOL").build();
        withNations.setFirstInput(customersWithOrders);
        withNations.setSecondInput(projectNations);

        // Group on every field except field 2, which Sum aggregates.
        ReduceOperator sumPerGroup = ReduceOperator.builder(Sum.class)
                .keyField(IntValue.class, 0)
                .keyField(StringValue.class, 1)
                .keyField(StringValue.class, 3)
                .keyField(StringValue.class, 4)
                .keyField(StringValue.class, 5)
                .keyField(StringValue.class, 6)
                .keyField(StringValue.class, 7)
                .name("Reduce")
                .build();
        sumPerGroup.setInput(withNations);

        FileDataSink sink = new FileDataSink(new TupleOutputFormat(), outputFile, "Output");
        sink.setInput(sumPerGroup);

        Plan plan = new Plan(sink, "TPCH Q10");
        plan.setDefaultParallelism(parallelism);
        return plan;
    }

    public static class TupleOutputFormat
    extends FileOutputFormat {
        private static final long serialVersionUID = 1L;
        private final DecimalFormat formatter;
        private final StringBuilder buffer = new StringBuilder();

        public TupleOutputFormat() {
            DecimalFormatSymbols decimalFormatSymbol = new DecimalFormatSymbols();
            decimalFormatSymbol.setDecimalSeparator('.');
            this.formatter = new DecimalFormat("#.####");
            this.formatter.setDecimalFormatSymbols(decimalFormatSymbol);
        }

        public void writeRecord(Record record) throws IOException {
            this.buffer.setLength(0);
            this.buffer.append(((IntValue)record.getField(0, IntValue.class)).toString()).append('|');
            this.buffer.append(((StringValue)record.getField(1, StringValue.class)).toString()).append('|');
            this.buffer.append(this.formatter.format(((DoubleValue)record.getField(2, DoubleValue.class)).getValue())).append('|');
            this.buffer.append(((StringValue)record.getField(3, StringValue.class)).toString()).append('|');
            this.buffer.append(((StringValue)record.getField(4, StringValue.class)).toString()).append('|');
            this.buffer.append(((StringValue)record.getField(5, StringValue.class)).toString()).append('|');
            this.buffer.append(((StringValue)record.getField(6, StringValue.class)).toString()).append('|');
            this.buffer.append(((StringValue)record.getField(7, StringValue.class)).toString()).append('|');
            this.buffer.append('\n');
            byte[] bytes = this.buffer.toString().getBytes();
            this.stream.write(bytes);
        }
    }

    @ReduceOperator.Combinable
    public static class Sum
    extends ReduceFunction {
        private final DoubleValue d = new DoubleValue();

        public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
            Record record = null;
            double sum = 0.0;
            while (records.hasNext()) {
                record = records.next();
                sum += ((DoubleValue)record.getField(2, DoubleValue.class)).getValue();
            }
            this.d.setValue(sum);
            record.setField(2, (Value)this.d);
            out.collect((Object)record);
        }

        public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
            this.reduce(records, out);
        }
    }

    public static class JoinNCOL
    extends JoinFunction {
        public void join(Record colRecord, Record nation, Collector<Record> out) throws Exception {
            colRecord.setField(4, nation.getField(1, StringValue.class));
            out.collect((Object)colRecord);
        }
    }

    public static class JoinCOL
    extends JoinFunction {
        private final DoubleValue d = new DoubleValue();

        public void join(Record custRecord, Record olRecord, Collector<Record> out) throws Exception {
            Tuple t = (Tuple)olRecord.getField(1, Tuple.class);
            double extPrice = Double.parseDouble(t.getStringValueAt(0));
            double discount = Double.parseDouble(t.getStringValueAt(1));
            this.d.setValue(extPrice * (1.0 - discount));
            custRecord.setField(2, (Value)this.d);
            out.collect((Object)custRecord);
        }
    }

    public static class JoinOL
    extends JoinFunction {
        public void join(Record order, Record lineitem, Collector<Record> out) throws Exception {
            lineitem.setField(0, order.getField(1, IntValue.class));
            out.collect((Object)lineitem);
        }
    }

    public static class ProjectN
    extends MapFunction {
        private final Tuple tuple = new Tuple();
        private final StringValue nationName = new StringValue();

        public void map(Record record, Collector<Record> out) throws Exception {
            Tuple t = (Tuple)record.getField(1, (Value)this.tuple);
            this.nationName.setValue((CharSequence)t.getStringValueAt(1));
            record.setField(1, (Value)this.nationName);
            out.collect((Object)record);
        }
    }

    public static class ProjectC
    extends MapFunction {
        private final Tuple tuple = new Tuple();
        private final StringValue custName = new StringValue();
        private final StringValue balance = new StringValue();
        private final IntValue nationKey = new IntValue();
        private final StringValue address = new StringValue();
        private final StringValue phone = new StringValue();
        private final StringValue comment = new StringValue();

        public void map(Record record, Collector<Record> out) throws Exception {
            Tuple t = (Tuple)record.getField(1, (Value)this.tuple);
            this.custName.setValue((CharSequence)t.getStringValueAt(1));
            this.address.setValue((CharSequence)t.getStringValueAt(2));
            this.nationKey.setValue((int)t.getLongValueAt(3));
            this.phone.setValue((CharSequence)t.getStringValueAt(4));
            this.balance.setValue((CharSequence)t.getStringValueAt(5));
            this.comment.setValue((CharSequence)t.getStringValueAt(7));
            record.setField(1, (Value)this.custName);
            record.setField(3, (Value)this.balance);
            record.setField(4, (Value)this.nationKey);
            record.setField(5, (Value)this.address);
            record.setField(6, (Value)this.phone);
            record.setField(7, (Value)this.comment);
            out.collect((Object)record);
        }
    }

    public static class FilterLI
    extends MapFunction {
        private final Tuple tuple = new Tuple();

        public void map(Record record, Collector<Record> out) throws Exception {
            Tuple t = (Tuple)record.getField(1, (Value)this.tuple);
            if (t.getStringValueAt(8).equals("R")) {
                t.project(96);
                record.setField(1, (Value)t);
                out.collect((Object)record);
            }
        }
    }

    public static class FilterO
    extends MapFunction {
        private static final int YEAR_FILTER = 1990;
        private final IntValue custKey = new IntValue();

        public void map(Record record, Collector<Record> out) throws Exception {
            Tuple t = (Tuple)record.getField(1, Tuple.class);
            if (Integer.parseInt(t.getStringValueAt(4).substring(0, 4)) > 1990) {
                this.custKey.setValue((int)t.getLongValueAt(1));
                record.setField(1, (Value)this.custKey);
                out.collect((Object)record);
            }
        }
    }
}

