/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.example.java.record.relational;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.ProgramDescription;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.util.FieldSet;
import eu.stratosphere.api.java.record.functions.CoGroupFunction;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.operators.CoGroupOperator;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.util.Collector;
import java.io.Serializable;
import java.util.Iterator;

public class WebLogAnalysis
implements Program,
ProgramDescription {
    public Plan getPlan(String ... args) {
        int numSubTasks = args.length > 0 ? Integer.parseInt(args[0]) : 1;
        String docsInput = args.length > 1 ? args[1] : "";
        String ranksInput = args.length > 2 ? args[2] : "";
        String visitsInput = args.length > 3 ? args[3] : "";
        String output = args.length > 4 ? args[4] : "";
        CsvInputFormat docsFormat = new CsvInputFormat('|', new Class[]{StringValue.class, StringValue.class});
        FileDataSource docs = new FileDataSource((FileInputFormat)docsFormat, docsInput, "Docs Input");
        FileDataSource ranks = new FileDataSource((FileInputFormat)new CsvInputFormat(), ranksInput, "Ranks input");
        ((CsvInputFormat.ConfigBuilder)((CsvInputFormat.ConfigBuilder)((CsvInputFormat.ConfigBuilder)((CsvInputFormat.ConfigBuilder)CsvInputFormat.configureRecordFormat((FileDataSource)ranks).recordDelimiter('\n')).fieldDelimiter('|')).field(StringValue.class, 1)).field(IntValue.class, 0)).field(IntValue.class, 2);
        CsvInputFormat visitsFormat = new CsvInputFormat('|', new Class[]{null, StringValue.class, StringValue.class});
        FileDataSource visits = new FileDataSource((FileInputFormat)visitsFormat, visitsInput, "Visits input:q");
        MapOperator filterDocs = MapOperator.builder((MapFunction)new FilterDocs()).input(new Operator[]{docs}).name("Filter Docs").build();
        filterDocs.getCompilerHints().setAvgRecordsEmittedPerStubCall(0.15f);
        filterDocs.getCompilerHints().setAvgBytesPerRecord(60.0f);
        filterDocs.getCompilerHints().setAvgNumRecordsPerDistinctFields(new FieldSet(new int[]{0}), 1.0f);
        MapOperator filterRanks = MapOperator.builder((MapFunction)new FilterRanks()).input(new Operator[]{ranks}).name("Filter Ranks").build();
        filterRanks.getCompilerHints().setAvgRecordsEmittedPerStubCall(0.25f);
        filterRanks.getCompilerHints().setAvgNumRecordsPerDistinctFields(new FieldSet(new int[]{0}), 1.0f);
        MapOperator filterVisits = MapOperator.builder((MapFunction)new FilterVisits()).input(new Operator[]{visits}).name("Filter Visits").build();
        filterVisits.getCompilerHints().setAvgBytesPerRecord(60.0f);
        filterVisits.getCompilerHints().setAvgRecordsEmittedPerStubCall(0.2f);
        JoinOperator joinDocsRanks = JoinOperator.builder((JoinFunction)new JoinDocRanks(), StringValue.class, (int)0, (int)0).input1(new Operator[]{filterDocs}).input2(new Operator[]{filterRanks}).name("Join Docs Ranks").build();
        joinDocsRanks.setDegreeOfParallelism(numSubTasks);
        CoGroupOperator antiJoinVisits = CoGroupOperator.builder((CoGroupFunction)new AntiJoinVisits(), StringValue.class, (int)0, (int)0).input1(new Operator[]{joinDocsRanks}).input2(new Operator[]{filterVisits}).name("Antijoin DocsVisits").build();
        antiJoinVisits.getCompilerHints().setAvgRecordsEmittedPerStubCall(0.8f);
        FileDataSink result = new FileDataSink((FileOutputFormat)new CsvOutputFormat(), output, (Operator)antiJoinVisits, "Result");
        result.setDegreeOfParallelism(numSubTasks);
        ((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)CsvOutputFormat.configureRecordFormat((FileDataSink)result).recordDelimiter('\n')).fieldDelimiter('|')).lenient(true)).field(IntValue.class, 1)).field(StringValue.class, 0)).field(IntValue.class, 2);
        Plan p = new Plan((GenericDataSink)result, "Weblog Analysis");
        p.setDefaultParallelism(numSubTasks);
        return p;
    }

    public String getDescription() {
        return "Parameters: [numSubTasks], [docs], [ranks], [visits], [output]";
    }

    @FunctionAnnotation.ConstantFieldsFirstExcept(value={})
    public static class AntiJoinVisits
    extends CoGroupFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        public void coGroup(Iterator<Record> ranks, Iterator<Record> visits, Collector<Record> out) {
            if (!visits.hasNext()) {
                while (ranks.hasNext()) {
                    out.collect((Object)ranks.next());
                }
            }
        }
    }

    @FunctionAnnotation.ConstantFieldsSecondExcept(value={})
    public static class JoinDocRanks
    extends JoinFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        public void match(Record document, Record rank, Collector<Record> out) throws Exception {
            out.collect((Object)rank);
        }
    }

    @FunctionAnnotation.ConstantFieldsExcept(value={1})
    public static class FilterVisits
    extends MapFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private static final int YEARFILTER = 2010;

        public void map(Record record, Collector<Record> out) throws Exception {
            String dateString = ((StringValue)record.getField(1, StringValue.class)).getValue();
            int year = Integer.parseInt(dateString.substring(0, 4));
            if (year == 2010) {
                record.setNull(1);
                out.collect((Object)record);
            }
        }
    }

    @FunctionAnnotation.ConstantFieldsExcept(value={})
    public static class FilterRanks
    extends MapFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private static final int RANKFILTER = 50;

        public void map(Record record, Collector<Record> out) throws Exception {
            if (((IntValue)record.getField(1, IntValue.class)).getValue() > 50) {
                out.collect((Object)record);
            }
        }
    }

    @FunctionAnnotation.ConstantFieldsExcept(value={1})
    public static class FilterDocs
    extends MapFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private static final String[] KEYWORDS = new String[]{" editors ", " oscillations ", " convection "};

        public void map(Record record, Collector<Record> out) throws Exception {
            String docText = ((StringValue)record.getField(1, StringValue.class)).toString();
            boolean allContained = true;
            for (String kw : KEYWORDS) {
                if (docText.contains(kw)) continue;
                allContained = false;
                break;
            }
            if (allContained) {
                record.setNull(1);
                out.collect((Object)record);
            }
        }
    }
}

