/*
 * Decompiled with CFR 0.152.
 */
package cascading.operation;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.provider.FactoryLoader;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.util.TupleHasher;
import cascading.tuple.util.TupleViews;
import cascading.util.cache.BaseCacheFactory;
import cascading.util.cache.CacheEvictionCallback;
import cascading.util.cache.CascadingCache;
import cascading.util.cache.LRUHashMapCacheFactory;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;

public class CompositeFunction
extends BaseOperation<Context>
implements Function<Context> {
    public static final String COMPOSITE_FUNCTION_CAPACITY = "cascading.function.composite.cache.capacity";
    public static final Class<? extends BaseCacheFactory> DEFAULT_CACHE_FACTORY_CLASS = LRUHashMapCacheFactory.class;
    public static String COMPOSITE_FUNCTION_CACHE_FACTORY = "cascading.function.composite.cachefactory.classname";
    public static int COMPOSITE_FUNCTION_DEFAULT_CAPACITY = 10000;
    private final Fields groupingFields;
    private final Fields[] argumentFields;
    private final Fields[] functorFields;
    private final CoFunction[] coFunctions;
    private final TupleHasher tupleHasher;
    private int capacity = 0;

    public CompositeFunction(Fields groupingFields, Fields argumentFields, CoFunction coFunction, int capacity) {
        this(groupingFields, Fields.fields(argumentFields), new CoFunction[]{coFunction}, capacity);
    }

    public CompositeFunction(Fields groupingFields, Fields[] argumentFields, CoFunction[] coFunctions, int capacity) {
        super(CompositeFunction.getFields(groupingFields, coFunctions));
        this.groupingFields = groupingFields;
        this.argumentFields = argumentFields;
        this.coFunctions = coFunctions;
        this.capacity = capacity;
        this.functorFields = new Fields[coFunctions.length];
        for (int i = 0; i < coFunctions.length; ++i) {
            this.functorFields[i] = coFunctions[i].getDeclaredFields();
        }
        Comparator[] hashers = TupleHasher.merge(this.functorFields);
        this.tupleHasher = !TupleHasher.isNull(hashers) ? new TupleHasher(null, hashers) : null;
    }

    private static Fields getFields(Fields groupingFields, CoFunction[] coFunctions) {
        Fields fields = groupingFields;
        for (CoFunction functor : coFunctions) {
            fields = fields.append(functor.getDeclaredFields());
        }
        return fields;
    }

    @Override
    public void prepare(final FlowProcess flowProcess, final OperationCall<Context> operationCall) {
        Fields[] fields = new Fields[this.coFunctions.length + 1];
        fields[0] = this.groupingFields;
        for (int i = 0; i < this.coFunctions.length; ++i) {
            fields[i + 1] = this.coFunctions[i].getDeclaredFields();
        }
        final Context context = new Context();
        context.arguments = new TupleEntry[this.coFunctions.length];
        for (int i = 0; i < context.arguments.length; ++i) {
            Fields resolvedArgumentFields = operationCall.getArgumentFields();
            int[] pos = this.argumentFields[i].isAll() ? resolvedArgumentFields.getPos() : resolvedArgumentFields.getPos(this.argumentFields[i]);
            Tuple narrow = TupleViews.createNarrow(pos);
            Fields currentFields = this.argumentFields[i].isSubstitution() ? resolvedArgumentFields.select(this.argumentFields[i]) : Fields.asDeclaration(this.argumentFields[i]);
            context.arguments[i] = new TupleEntry(currentFields, narrow);
        }
        context.result = TupleViews.createComposite(fields);
        BaseCacheFactory<Tuple, Tuple[], ?> factory = this.loadCacheFactory(flowProcess);
        Object cache = factory.create(flowProcess);
        class Eviction
        implements CacheEvictionCallback<Tuple, Tuple[]> {
            Eviction() {
            }

            @Override
            public void evict(Map.Entry<Tuple, Tuple[]> entry) {
                CompositeFunction.this.completeFunctors(flowProcess, ((FunctionCall)operationCall).getOutputCollector(), context.result, entry);
                CompositeFunction.this.incrementNumKeysFlushed(flowProcess);
            }
        }
        cache.setCacheEvictionCallback(new Eviction());
        Integer cacheCapacity = this.capacity;
        if (this.capacity == 0) {
            cacheCapacity = this.getCacheCapacity(flowProcess);
        }
        cache.setCapacity(cacheCapacity);
        cache.initialize();
        context.lru = cache;
        operationCall.setContext(context);
    }

    protected Integer getCacheCapacity(FlowProcess flowProcess) {
        return this.getCacheCapacity(flowProcess, COMPOSITE_FUNCTION_CAPACITY, COMPOSITE_FUNCTION_DEFAULT_CAPACITY);
    }

    protected BaseCacheFactory<Tuple, Tuple[], ?> loadCacheFactory(FlowProcess flowProcess) {
        return this.loadCacheFactory(flowProcess, COMPOSITE_FUNCTION_CACHE_FACTORY, DEFAULT_CACHE_FACTORY_CLASS);
    }

    protected Integer getCacheCapacity(FlowProcess flowProcess, String property, int defaultValue) {
        Integer cacheCapacity = flowProcess.getIntegerProperty(property);
        if (cacheCapacity == null) {
            cacheCapacity = defaultValue;
        }
        return cacheCapacity;
    }

    protected BaseCacheFactory<Tuple, Tuple[], ?> loadCacheFactory(FlowProcess flowProcess, String property, Class<? extends BaseCacheFactory> type) {
        FactoryLoader loader = FactoryLoader.getInstance();
        BaseCacheFactory factory = loader.loadFactoryFrom(flowProcess, property, type);
        if (factory == null) {
            throw new CascadingException("unable to load cache factory, please check your '" + property + "' setting.");
        }
        return factory;
    }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall<Context> functionCall) {
        TupleEntry arguments = functionCall.getArguments();
        Tuple key = TupleHasher.wrapTuple(this.tupleHasher, arguments.selectTupleCopy(this.groupingFields));
        Context context = (Context)functionCall.getContext();
        Tuple[] functorContext = (Tuple[])context.lru.get(key);
        if (functorContext == null) {
            functorContext = new Tuple[this.coFunctions.length];
            context.lru.put(key, functorContext);
            this.incrementNumKeysMissed(flowProcess);
        } else {
            this.incrementNumKeysHit(flowProcess);
        }
        for (int i = 0; i < this.coFunctions.length; ++i) {
            TupleViews.reset(context.arguments[i].getTuple(), arguments.getTuple());
            functorContext[i] = this.coFunctions[i].aggregate(flowProcess, context.arguments[i], functorContext[i]);
        }
    }

    protected void incrementNumKeysFlushed(FlowProcess flowProcess) {
        flowProcess.increment(Cache.Num_Keys_Flushed, 1L);
    }

    protected void incrementNumKeysHit(FlowProcess flowProcess) {
        flowProcess.increment(Cache.Num_Keys_Hit, 1L);
    }

    protected void incrementNumKeysMissed(FlowProcess flowProcess) {
        flowProcess.increment(Cache.Num_Keys_Missed, 1L);
    }

    @Override
    public void flush(FlowProcess flowProcess, OperationCall<Context> operationCall) {
        TupleEntryCollector collector = ((FunctionCall)operationCall).getOutputCollector();
        Tuple result = operationCall.getContext().result;
        CascadingCache<Tuple, Tuple[]> context = operationCall.getContext().lru;
        for (Map.Entry<Tuple, Tuple[]> entry : context.entrySet()) {
            this.completeFunctors(flowProcess, collector, result, entry);
        }
        context.clear();
    }

    @Override
    public void cleanup(FlowProcess flowProcess, OperationCall<Context> operationCall) {
        operationCall.setContext(null);
    }

    private void completeFunctors(FlowProcess flowProcess, TupleEntryCollector outputCollector, Tuple result, Map.Entry<Tuple, Tuple[]> entry) {
        Tuple[] results = new Tuple[this.coFunctions.length + 1];
        results[0] = entry.getKey();
        Tuple[] values = entry.getValue();
        for (int i = 0; i < this.coFunctions.length; ++i) {
            results[i + 1] = this.coFunctions[i].complete(flowProcess, values[i]);
        }
        TupleViews.reset(result, results);
        outputCollector.add(result);
    }

    @Override
    public boolean equals(Object object) {
        if (this == object) {
            return true;
        }
        if (!(object instanceof CompositeFunction)) {
            return false;
        }
        if (!super.equals(object)) {
            return false;
        }
        CompositeFunction that = (CompositeFunction)object;
        if (!Arrays.equals(this.argumentFields, that.argumentFields)) {
            return false;
        }
        if (!Arrays.equals(this.functorFields, that.functorFields)) {
            return false;
        }
        if (!Arrays.equals(this.coFunctions, that.coFunctions)) {
            return false;
        }
        return !(this.groupingFields != null ? !this.groupingFields.equals(that.groupingFields) : that.groupingFields != null);
    }

    @Override
    public int hashCode() {
        int result = super.hashCode();
        result = 31 * result + (this.groupingFields != null ? this.groupingFields.hashCode() : 0);
        result = 31 * result + (this.argumentFields != null ? Arrays.hashCode(this.argumentFields) : 0);
        result = 31 * result + (this.functorFields != null ? Arrays.hashCode(this.functorFields) : 0);
        result = 31 * result + (this.coFunctions != null ? Arrays.hashCode(this.coFunctions) : 0);
        return result;
    }

    public static class Context {
        CascadingCache<Tuple, Tuple[]> lru;
        TupleEntry[] arguments;
        Tuple result;
    }

    public static interface CoFunction
    extends Serializable {
        public Fields getDeclaredFields();

        public Tuple aggregate(FlowProcess var1, TupleEntry var2, Tuple var3);

        public Tuple complete(FlowProcess var1, Tuple var2);
    }

    public static enum Cache {
        Num_Keys_Flushed,
        Num_Keys_Hit,
        Num_Keys_Missed;

    }
}

