/*
 * Decompiled with CFR 0.152.
 */
package cascading.assembly;

import cascading.assembly.CrossTab;
import cascading.flow.FlowProcess;
import cascading.operation.AggregatorCall;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import java.util.HashMap;
import java.util.Map;

public class PearsonDistance
extends CrossTab {
    public PearsonDistance(Pipe previous) {
        this(previous, Fields.size((int)3), new Fields(new Comparable[]{"n1", "n2", "pearson"}));
    }

    public PearsonDistance(Pipe previous, Fields argumentFieldSelector, Fields fieldDeclaration) {
        super(previous, argumentFieldSelector, new Pearson(), fieldDeclaration);
    }

    private static class Pearson
    extends CrossTab.CrossTabOperation<Map<String, Double>> {
        private static final String COUNT = "count";
        private static final String SUM1 = "sum1";
        private static final String SUM2 = "sum2";
        private static final String SUMSQRS1 = "sumsqrs1";
        private static final String SUMSQRS2 = "sumsqrs2";
        private static final String SUMPROD = "sumprod";

        public Pearson() {
            super(new Fields(new Comparable[]{"pearson"}));
        }

        public void start(FlowProcess flowProcess, AggregatorCall<Map<String, Double>> aggregatorCall) {
            if (aggregatorCall.getContext() == null) {
                aggregatorCall.setContext(new HashMap());
            }
            Map context = (Map)aggregatorCall.getContext();
            context.put(COUNT, 0.0);
            context.put(SUM1, 0.0);
            context.put(SUM2, 0.0);
            context.put(SUMSQRS1, 0.0);
            context.put(SUMSQRS2, 0.0);
            context.put(SUMPROD, 0.0);
        }

        public void aggregate(FlowProcess flowProcess, AggregatorCall<Map<String, Double>> aggregatorCall) {
            Map context = (Map)aggregatorCall.getContext();
            TupleEntry entry = aggregatorCall.getArguments();
            context.put(COUNT, (Double)context.get(COUNT) + 1.0);
            context.put(SUM1, (Double)context.get(SUM1) + entry.getTuple().getDouble(0));
            context.put(SUM2, (Double)context.get(SUM2) + entry.getTuple().getDouble(1));
            context.put(SUMSQRS1, (Double)context.get(SUMSQRS1) + Math.pow(entry.getTuple().getDouble(0), 2.0));
            context.put(SUMSQRS2, (Double)context.get(SUMSQRS2) + Math.pow(entry.getTuple().getDouble(1), 2.0));
            context.put(SUMPROD, (Double)context.get(SUMPROD) + entry.getTuple().getDouble(0) * entry.getTuple().getDouble(1));
        }

        public void complete(FlowProcess flowProcess, AggregatorCall<Map<String, Double>> aggregatorCall) {
            Map context = (Map)aggregatorCall.getContext();
            Double count = (Double)context.get(COUNT);
            Double sum1 = (Double)context.get(SUM1);
            Double sum2 = (Double)context.get(SUM2);
            double num = (Double)context.get(SUMPROD) - sum1 * sum2 / count;
            double den = Math.sqrt(((Double)context.get(SUMSQRS1) - Math.pow(sum1, 2.0) / count) * ((Double)context.get(SUMSQRS2) - Math.pow(sum2, 2.0) / count));
            if (den == 0.0) {
                aggregatorCall.getOutputCollector().add(new Tuple(new Object[]{0}));
            } else {
                aggregatorCall.getOutputCollector().add(new Tuple(new Object[]{num / den}));
            }
        }
    }
}

