/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.broadcastvars;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.test.operators.io.ContractITCaseIOFormats;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import java.util.Collection;

public class BroadcastBranchingITCase
extends RecordAPITestBase {
    private static final String SC1_ID_ABC = "1 61 6 29\n2 7 13 10\n3 8 13 27\n";
    private static final String SC2_ID_X = "1 5\n2 3\n3 6";
    private static final String SC3_ID_Y = "1 2\n2 3\n3 7";
    private static final String RESULT = "2 112\n";
    private String sc1Path;
    private String sc2Path;
    private String sc3Path;
    private String resultPath;

    protected void preSubmit() throws Exception {
        this.sc1Path = this.createTempFile("broadcastBranchingInput/map_id_abc.txt", SC1_ID_ABC);
        this.sc2Path = this.createTempFile("broadcastBranchingInput/map_id_x.txt", SC2_ID_X);
        this.sc3Path = this.createTempFile("broadcastBranchingInput/map_id_y.txt", SC3_ID_Y);
        this.resultPath = this.getTempDirPath("result");
    }

    protected Plan getTestJob() {
        FileDataSource sc1 = new FileDataSource((FileInputFormat)new CsvInputFormat(), this.sc1Path);
        ((CsvInputFormat.ConfigBuilder)((CsvInputFormat.ConfigBuilder)((CsvInputFormat.ConfigBuilder)((CsvInputFormat.ConfigBuilder)CsvInputFormat.configureRecordFormat((FileDataSource)sc1).fieldDelimiter(' ')).field(StringValue.class, 0)).field(IntValue.class, 1)).field(IntValue.class, 2)).field(IntValue.class, 3);
        FileDataSource sc2 = new FileDataSource((FileInputFormat)new CsvInputFormat(), this.sc2Path);
        ((CsvInputFormat.ConfigBuilder)((CsvInputFormat.ConfigBuilder)CsvInputFormat.configureRecordFormat((FileDataSource)sc2).fieldDelimiter(' ')).field(StringValue.class, 0)).field(IntValue.class, 1);
        FileDataSource sc3 = new FileDataSource((FileInputFormat)new CsvInputFormat(), this.sc3Path);
        ((CsvInputFormat.ConfigBuilder)((CsvInputFormat.ConfigBuilder)CsvInputFormat.configureRecordFormat((FileDataSource)sc3).fieldDelimiter(' ')).field(StringValue.class, 0)).field(IntValue.class, 1);
        JoinOperator jn1 = JoinOperator.builder(Jn1.class, StringValue.class, (int)0, (int)0).input1(new Operator[]{sc2}).input2(new Operator[]{sc3}).build();
        JoinOperator jn2 = JoinOperator.builder(Jn2.class, StringValue.class, (int)0, (int)0).input1(new Operator[]{jn1}).input2(new Operator[]{sc1}).build();
        MapOperator mp1 = MapOperator.builder(Mp1.class).input(new Operator[]{jn1}).build();
        MapOperator mp2 = MapOperator.builder(Mp2.class).setBroadcastVariable("z", (Operator)mp1).input(new Operator[]{jn2}).build();
        FileDataSink output = new FileDataSink((FileOutputFormat)new ContractITCaseIOFormats.ContractITCaseOutputFormat(), this.resultPath);
        output.setDegreeOfParallelism(1);
        output.setInput((Operator)mp2);
        return new Plan((GenericDataSink)output);
    }

    protected void postSubmit() throws Exception {
        this.compareResultsByLinesInMemory(RESULT, this.resultPath);
    }

    public static class Mp2
    extends MapFunction {
        private static final long serialVersionUID = 1L;
        private Collection<Record> zs;

        public void open(Configuration parameters) throws Exception {
            this.zs = this.getRuntimeContext().getBroadcastVariable("z");
        }

        public void map(Record jn2, Collector<Record> out) throws Exception {
            int p = ((IntValue)jn2.getField(1, IntValue.class)).getValue();
            for (Record z : this.zs) {
                if (!((StringValue)z.getField(0, StringValue.class)).getValue().equals(((StringValue)jn2.getField(0, StringValue.class)).getValue()) || p % ((IntValue)z.getField(1, IntValue.class)).getValue() == 0) continue;
                out.collect((Object)jn2);
            }
        }
    }

    public static class Mp1
    extends MapFunction {
        private static final long serialVersionUID = 1L;

        public void map(Record jn1, Collector<Record> out) throws Exception {
            if (((IntValue)jn1.getField(1, IntValue.class)).getValue() == ((IntValue)jn1.getField(2, IntValue.class)).getValue()) {
                out.collect((Object)new Record(jn1.getField(0, StringValue.class), jn1.getField(1, IntValue.class)));
            }
        }
    }

    public static class Jn2
    extends JoinFunction {
        private static final long serialVersionUID = 1L;

        private static int p(int x, int a, int b, int c) {
            return a * x * x + b * x + c;
        }

        public void join(Record jn1, Record sc1, Collector<Record> out) throws Exception {
            int x = ((IntValue)jn1.getField(1, IntValue.class)).getValue();
            int y = ((IntValue)jn1.getField(2, IntValue.class)).getValue();
            int a = ((IntValue)sc1.getField(1, IntValue.class)).getValue();
            int b = ((IntValue)sc1.getField(2, IntValue.class)).getValue();
            int c = ((IntValue)sc1.getField(3, IntValue.class)).getValue();
            int p_x = Jn2.p(x, a, b, c);
            int p_y = Jn2.p(y, a, b, c);
            int min = Math.min(p_x, p_y);
            out.collect((Object)new Record(jn1.getField(0, StringValue.class), (Value)new IntValue(min)));
        }
    }

    public static class Jn1
    extends JoinFunction {
        private static final long serialVersionUID = 1L;

        public void join(Record sc2, Record sc3, Collector<Record> out) throws Exception {
            Record r = new Record(3);
            r.setField(0, sc2.getField(0, StringValue.class));
            r.setField(1, sc2.getField(1, IntValue.class));
            r.setField(2, sc3.getField(1, IntValue.class));
            out.collect((Object)r);
        }
    }
}

