/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.core;

import io.mantisrx.common.MantisGroup;
import io.mantisrx.runtime.Config;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.computation.Computation;
import io.mantisrx.runtime.computation.ToGroupComputation;
import io.mantisrx.runtime.core.KeyedMantisStream;
import io.mantisrx.runtime.core.KeyedMantisStreamImpl;
import io.mantisrx.runtime.core.MantisGraph;
import io.mantisrx.runtime.core.MantisJobBuilder;
import io.mantisrx.runtime.core.MantisStream;
import io.mantisrx.runtime.core.OperandNode;
import io.mantisrx.runtime.core.functions.FilterFunction;
import io.mantisrx.runtime.core.functions.FlatMapFunction;
import io.mantisrx.runtime.core.functions.FunctionCombinator;
import io.mantisrx.runtime.core.functions.KeyByFunction;
import io.mantisrx.runtime.core.functions.MantisFunction;
import io.mantisrx.runtime.core.functions.MapFunction;
import io.mantisrx.runtime.core.functions.WindowFunction;
import io.mantisrx.runtime.core.sinks.ObservableSinkImpl;
import io.mantisrx.runtime.core.sinks.SinkFunction;
import io.mantisrx.runtime.core.sources.ObservableSourceImpl;
import io.mantisrx.runtime.core.sources.SourceFunction;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.shaded.com.google.common.collect.ImmutableList;
import io.mantisrx.shaded.com.google.common.graph.ImmutableValueGraph;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

public class MantisStreamImpl<T>
implements MantisStream<T> {
    private static final Logger log = LoggerFactory.getLogger(MantisStreamImpl.class);
    final OperandNode<T> currNode;
    final MantisGraph graph;
    final Iterable<ParameterDefinition<?>> params;

    MantisStreamImpl(OperandNode<T> newNode, MantisGraph graph) {
        this(newNode, graph, (Iterable<ParameterDefinition<?>>)ImmutableList.of());
    }

    MantisStreamImpl(OperandNode<T> node, MantisGraph graph, Iterable<ParameterDefinition<?>> params) {
        this.currNode = node;
        this.graph = graph;
        this.params = params;
    }

    public static <T> MantisStream<T> init() {
        OperandNode node = new OperandNode(0, "init");
        return new MantisStreamImpl(node, new MantisGraph().addNode(node));
    }

    @Override
    public <OUT> MantisStream<OUT> source(SourceFunction<OUT> source) {
        return this.updateGraph(source);
    }

    @Override
    public Config<T> sink(SinkFunction<T> sink) {
        MantisStreamImpl mantisStream = this.updateGraph(sink);
        ImmutableValueGraph<OperandNode<?>, MantisFunction> graphDag = mantisStream.graph.immutable();
        Iterable<OperandNode<?>> operandNodes = MantisStreamImpl.topSortTraversal(graphDag);
        MantisJobBuilder jobBuilder = this.makeMantisJob(graphDag, operandNodes);
        return jobBuilder.buildJobConfig();
    }

    <OUT> MantisStreamImpl<OUT> updateGraph(MantisFunction mantisFn) {
        OperandNode node = OperandNode.create(this.graph, mantisFn.getClass().getName() + "OUT");
        this.graph.putEdge(this.currNode, node, mantisFn);
        return new MantisStreamImpl(node, this.graph);
    }

    @Override
    public MantisStream<T> filter(FilterFunction<T> filterFn) {
        return this.updateGraph(filterFn);
    }

    @Override
    public <OUT> MantisStream<OUT> map(MapFunction<T, OUT> mapFn) {
        return this.updateGraph(mapFn);
    }

    @Override
    public <OUT> MantisStream<OUT> flatMap(FlatMapFunction<T, OUT> flatMapFn) {
        return this.updateGraph(flatMapFn);
    }

    @Override
    public MantisStream<T> materialize() {
        return this.materializeInternal();
    }

    private MantisStreamImpl<T> materializeInternal() {
        this.graph.putEdge(this.currNode, this.currNode, MantisFunction.empty());
        return new MantisStreamImpl<T>(this.currNode, this.graph);
    }

    private <K> KeyedMantisStreamImpl<K, T> keyByInternal(KeyByFunction<K, T> keyFn) {
        OperandNode node = OperandNode.create(this.graph, "keyByOut");
        this.graph.putEdge(this.currNode, node, keyFn);
        return new KeyedMantisStreamImpl(node, this.graph);
    }

    @Override
    public <K> KeyedMantisStream<K, T> keyBy(KeyByFunction<K, T> keyFn) {
        return super.keyByInternal(keyFn);
    }

    private MantisJobBuilder makeMantisJob(ImmutableValueGraph<OperandNode<?>, MantisFunction> graphDag, Iterable<OperandNode<?>> operandNodes) {
        MantisJobBuilder jobBuilder = new MantisJobBuilder();
        AtomicReference composite = new AtomicReference(new FunctionCombinator(false));
        for (OperandNode<?> n : operandNodes) {
            Set successorsNodes = graphDag.successors(n);
            if (successorsNodes.size() == 0) continue;
            Optional selfEdge = graphDag.edgeValue(n, n);
            Integer numSelfEdges = selfEdge.map((? super T x) -> 1).orElse(0);
            selfEdge.ifPresent(mantisFn -> {
                if (MantisFunction.empty().equals(mantisFn)) {
                    jobBuilder.addStage(((FunctionCombinator)composite.get()).makeStage(), n.getCodec(), n.getKeyCodec());
                    composite.set(new FunctionCombinator(false));
                } else if (mantisFn instanceof WindowFunction) {
                    composite.set(((FunctionCombinator)composite.get()).add((MantisFunction)mantisFn));
                }
            });
            if (successorsNodes.size() - numSelfEdges > 1) {
                log.warn("Found multi-output node {} with nbrs {}. Not supported yet!", n, (Object)successorsNodes);
            }
            for (OperandNode successorsNode : successorsNodes) {
                if (successorsNode == n) continue;
                graphDag.edgeValue(n, (Object)successorsNode).ifPresent(mantisFn -> {
                    if (mantisFn instanceof SourceFunction) {
                        if (mantisFn instanceof ObservableSourceImpl) {
                            jobBuilder.addStage(((ObservableSourceImpl)mantisFn).getSource());
                        }
                    } else if (mantisFn instanceof KeyByFunction) {
                        if (((FunctionCombinator)composite.get()).size() > 0) {
                            log.warn("Unempty composite found for KeyByFunction {}", composite.get());
                        }
                        jobBuilder.addStage(this.makeGroupComputation((KeyByFunction)mantisFn), n.getCodec(), n.getKeyCodec());
                        composite.set(new FunctionCombinator(true));
                    } else if (mantisFn instanceof SinkFunction) {
                        jobBuilder.addStage(((FunctionCombinator)composite.get()).makeStage(), n.getCodec());
                        if (mantisFn instanceof ObservableSinkImpl) {
                            jobBuilder.addStage(((ObservableSinkImpl)mantisFn).getSink());
                        }
                    } else {
                        composite.set(((FunctionCombinator)composite.get()).add((MantisFunction)mantisFn));
                    }
                });
            }
        }
        return jobBuilder;
    }

    private <A, K> Computation makeGroupComputation(final KeyByFunction<K, A> keyFn) {
        return new ToGroupComputation<A, K, A>(){

            @Override
            public void init(Context ctx) {
                keyFn.init();
            }

            public Observable<MantisGroup<K, A>> call(Context ctx, Observable<A> obs) {
                return obs.map(e -> new MantisGroup(keyFn.getKey(e), e));
            }
        };
    }

    static <V, E> Iterable<V> topSortTraversal(ImmutableValueGraph<V, E> graphDag) {
        Set nodes = graphDag.nodes();
        Map<Object, AtomicInteger> inDegreeMap = nodes.stream().collect(Collectors.toMap(x -> x, x -> new AtomicInteger(graphDag.inDegree(x) - (graphDag.hasEdgeConnecting(x, x) ? 1 : 0))));
        ArrayList<Object> nodeOrder = new ArrayList<Object>();
        HashSet<Object> visited = new HashSet<Object>();
        List<Object> starts = inDegreeMap.keySet().stream().filter((? super T x) -> ((AtomicInteger)inDegreeMap.get(x)).get() == 0).collect(Collectors.toList());
        while (!starts.isEmpty()) {
            starts.forEach(x -> graphDag.successors(x).forEach(nbr -> {
                if (nbr != x) {
                    ((AtomicInteger)inDegreeMap.get(nbr)).decrementAndGet();
                }
            }));
            visited.addAll(starts);
            nodeOrder.addAll(starts);
            starts = starts.stream().flatMap((? super T x) -> graphDag.successors(x).stream()).filter((? super T x) -> !visited.contains(x) && ((AtomicInteger)inDegreeMap.get(x)).get() == 0).collect(Collectors.toList());
        }
        return nodeOrder;
    }
}

