/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.core.functions;

import io.mantisrx.common.MantisGroup;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.computation.Computation;
import io.mantisrx.runtime.computation.GroupToScalarComputation;
import io.mantisrx.runtime.computation.ScalarComputation;
import io.mantisrx.runtime.core.WindowSpec;
import io.mantisrx.runtime.core.functions.FilterFunction;
import io.mantisrx.runtime.core.functions.FlatMapFunction;
import io.mantisrx.runtime.core.functions.MantisFunction;
import io.mantisrx.runtime.core.functions.MapFunction;
import io.mantisrx.runtime.core.functions.ReduceFunction;
import io.mantisrx.runtime.core.functions.SimpleReduceFunction;
import io.mantisrx.runtime.core.functions.WindowFunction;
import io.mantisrx.shaded.com.google.common.annotations.VisibleForTesting;
import io.mantisrx.shaded.com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.Observable;

/**
 * Collects an ordered list of {@link MantisFunction}s and fuses them into a single
 * stage {@link Computation}.
 *
 * <p>Instances are immutable: {@link #add(MantisFunction)} returns a new combinator and
 * the function list handed to the constructor is snapshotted.
 *
 * <p>The type parameters are not referenced by the logic in this class; presumably they
 * name the input ({@code T}) and output ({@code R}) element types of the fused stage.
 * NOTE(review): confirm against callers.
 */
public class FunctionCombinator<T, R> {
    private final boolean isKeyed;
    private final List<MantisFunction> functions;

    /**
     * Creates a combinator with no functions.
     *
     * @param isKeyed whether {@link #makeStage()} should build the group-to-scalar
     *                variant instead of the scalar one
     */
    public FunctionCombinator(boolean isKeyed) {
        // No cast: the element type is inferred from the target constructor parameter.
        this(isKeyed, ImmutableList.of());
    }

    /**
     * Creates a combinator over the given functions.
     *
     * @param isKeyed   whether {@link #makeStage()} should build the group-to-scalar
     *                  variant instead of the scalar one
     * @param functions functions to apply, in order; must not be {@code null} nor
     *                  contain {@code null} elements
     * @throws NullPointerException if {@code functions} or any of its elements is null
     */
    public FunctionCombinator(boolean isKeyed, List<MantisFunction> functions) {
        this.isKeyed = isKeyed;
        // Snapshot so later mutation of the caller's list cannot alter this pipeline.
        this.functions = ImmutableList.copyOf(functions);
    }

    /**
     * Returns a new combinator consisting of this combinator's functions followed by
     * {@code fn}. This instance is left unchanged.
     *
     * @param fn the function to append
     * @return a new combinator with the same keyed flag and the extended function list
     */
    public <IN, OUT> FunctionCombinator<T, OUT> add(MantisFunction fn) {
        ImmutableList<MantisFunction> extended = ImmutableList.<MantisFunction>builder()
                .addAll(this.functions)
                .add(fn)
                .build();
        // Diamond lets the compiler pick <T, OUT> from the declared return type.
        return new FunctionCombinator<>(this.isKeyed, extended);
    }

    /**
     * @return the number of functions collected so far
     */
    public int size() {
        return this.functions.size();
    }

    /**
     * Builds a scalar stage that applies the collected filter, map and flat-map
     * functions in order. Functions of any other kind are skipped by this stage.
     *
     * @return a computation whose {@code init} initialises every collected function and
     *         whose {@code call} chains the supported functions onto the input stream
     */
    @VisibleForTesting
    @SuppressWarnings({"rawtypes", "unchecked"}) // functions are untyped, so the chain is built on raw Observables
    <U, V> ScalarComputation<U, V> makeScalarStage() {
        return new ScalarComputation<U, V>() {

            @Override
            public void init(Context context) {
                FunctionCombinator.this.functions.forEach(MantisFunction::init);
            }

            @Override
            public Observable<V> call(Context context, Observable<U> uObservable) {
                Observable current = uObservable;
                for (MantisFunction fn : FunctionCombinator.this.functions) {
                    // Leaves the stream untouched for non element-wise functions.
                    current = applyElementwise(current, fn);
                }
                return current;
            }
        };
    }

    /**
     * Builds a keyed stage: the incoming groups are re-grouped by
     * {@code MantisGroup::getKeyValue}, the collected functions are applied to the
     * values of each key independently, and the per-key results are merged into one
     * output stream.
     *
     * @return a computation whose {@code init} initialises every collected function and
     *         whose {@code call} runs the per-key pipeline
     */
    @VisibleForTesting
    @SuppressWarnings({"rawtypes", "unchecked"}) // functions are untyped, so the chain is built on raw Observables
    <K, U, V> GroupToScalarComputation<K, U, V> makeGroupToScalarStage() {
        return new GroupToScalarComputation<K, U, V>() {

            @Override
            public void init(Context context) {
                FunctionCombinator.this.functions.forEach(MantisFunction::init);
            }

            @Override
            public Observable<V> call(Context context, Observable<MantisGroup<K, U>> gobs) {
                Observable perKeyResults = gobs
                        .groupBy(MantisGroup::getKeyValue)
                        .flatMap(group -> {
                            Observable values = group.map(MantisGroup::getValue);
                            return FunctionCombinator.this.applyKeyedPipeline(values);
                        });
                return perKeyResults;
            }
        };
    }

    /**
     * Applies every collected function, in order, to the value stream of a single key.
     * Element-wise functions take precedence over window and reduce handling when a
     * function implements several of the interfaces; unrecognised functions are skipped.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Observable applyKeyedPipeline(Observable values) {
        Observable current = values;
        for (MantisFunction fn : this.functions) {
            if (isElementwise(fn)) {
                current = applyElementwise(current, fn);
            } else if (fn instanceof WindowFunction) {
                current = handleWindows(current, (WindowFunction) fn);
            } else if (fn instanceof ReduceFunction) {
                current = reduceWindows(current, (ReduceFunction) fn);
            }
        }
        return current;
    }

    /** Tells whether {@code fn} is a filter, map or flat-map function. */
    private static boolean isElementwise(MantisFunction fn) {
        return fn instanceof FilterFunction
                || fn instanceof MapFunction
                || fn instanceof FlatMapFunction;
    }

    /**
     * Chains a filter, map or flat-map function onto {@code upstream}; returns
     * {@code upstream} unchanged for any other kind of function.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static Observable applyElementwise(Observable upstream, MantisFunction fn) {
        if (fn instanceof FilterFunction) {
            return upstream.filter(((FilterFunction) fn)::apply);
        }
        if (fn instanceof MapFunction) {
            return upstream.map(e -> ((MapFunction) fn).apply(e));
        }
        if (fn instanceof FlatMapFunction) {
            return upstream.flatMap(e -> Observable.from(((FlatMapFunction) fn).apply(e)));
        }
        return upstream;
    }

    /**
     * Folds every inner observable of {@code windows} with {@code reduceFn}, starting
     * from {@code reduceFn.initialValue()}, and emits one result per inner observable.
     * Results identical to {@link SimpleReduceFunction#EMPTY} are dropped.
     *
     * <p>Each upstream element must itself be an {@code Observable} (the shape produced
     * by {@link #handleWindows}); anything else would fail at runtime, presumably with
     * a {@code ClassCastException}.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static Observable reduceWindows(Observable windows, ReduceFunction reduceFn) {
        // The raw stream carries no element type; the reduce step needs nested observables.
        Observable<Observable<Object>> nested = (Observable<Observable<Object>>) windows;
        return nested
                .map(window -> window.reduce(reduceFn.initialValue(), (acc, e) -> reduceFn.reduce(acc, e)))
                .flatMap(reduced -> reduced)
                // Identity comparison on purpose: EMPTY is compared as a sentinel instance.
                .filter(result -> result != SimpleReduceFunction.EMPTY);
    }

    /**
     * Splits {@code obs} into windows according to the spec of {@code windowFn}.
     *
     * @param obs      the stream to window
     * @param windowFn supplies the {@link WindowSpec}
     * @return an observable of windows: count-based for {@code ELEMENT} and
     *         {@code ELEMENT_SLIDING}, time-based (millisecond resolution) for
     *         {@code TUMBLING} and {@code SLIDING}
     * @throws UnsupportedOperationException if the spec's type is none of the above
     */
    private Observable<? extends Observable<?>> handleWindows(Observable<?> obs, WindowFunction<?> windowFn) {
        WindowSpec spec = windowFn.getSpec();
        switch (spec.getType()) {
            case ELEMENT:
            case ELEMENT_SLIDING: {
                return obs.window(spec.getNumElements(), spec.getElementOffset());
            }
            case TUMBLING:
            case SLIDING: {
                return obs.window(
                        spec.getWindowLength().toMillis(),
                        spec.getWindowOffset().toMillis(),
                        TimeUnit.MILLISECONDS);
            }
        }
        throw new UnsupportedOperationException("Unknown WindowSpec must be one of " + Arrays.toString((Object[])WindowSpec.WindowType.values()));
    }

    /**
     * Builds the stage computation for the collected functions.
     *
     * @return {@code null} when no functions have been added; otherwise the
     *         group-to-scalar stage if this combinator is keyed, else the scalar stage
     */
    public Computation makeStage() {
        if (this.size() == 0) {
            // Callers must handle the "nothing to run" case; null is the existing contract.
            return null;
        }
        if (this.isKeyed) {
            return this.makeGroupToScalarStage();
        }
        return this.makeScalarStage();
    }

    /**
     * @return whether this combinator builds keyed (group-to-scalar) stages
     */
    public boolean isKeyed() {
        return this.isKeyed;
    }
}

