package org.pragmaticminds.crunch.execution;

import akka.Done;
import akka.NotUsed;
import akka.japi.function.Function;
import akka.japi.function.Function2;
import akka.japi.function.Predicate;
import akka.stream.ClosedShape;
import akka.stream.SinkShape;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletionStage;
import org.pragmaticminds.crunch.api.pipe.ChannelFilter;
import org.pragmaticminds.crunch.api.pipe.EvaluationFunction;
import org.pragmaticminds.crunch.api.pipe.EvaluationPipeline;
import org.pragmaticminds.crunch.api.pipe.RecordHandler;
import org.pragmaticminds.crunch.api.pipe.SubStream;
import org.pragmaticminds.crunch.api.records.MRecord;

/* loaded from: input_file:org/pragmaticminds/crunch/execution/GraphFactory.class */
class GraphFactory<T extends Serializable> {

    /* loaded from: input_file:org/pragmaticminds/crunch/execution/GraphFactory$MergeFunction.class */
    static class MergeFunction extends UntypedValuesMergeFunction implements Function<MRecord, MRecord> {
        MergeFunction() {
        }

        public MRecord apply(MRecord mRecord) {
            return merge(mRecord);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the runnable graph that feeds records from the given source through the pipeline.
     *
     * <p>The record source is wrapped in an {@link MRecordSourceWrapper} and turned into an Akka
     * {@link Source}; the wiring itself is delegated to {@code buildGraph}. The graph's
     * materialized value is the one of {@link Sink#ignore()}.
     *
     * @param mRecordSource      the record source to wrap
     * @param evaluationPipeline the pipeline whose substreams are wired into the graph
     * @param eventSink          the sink handed to the evaluation stage of every substream
     * @param l                  value forwarded unchanged to {@code buildGraph}
     * @return the closed, runnable graph
     */
    public RunnableGraph<CompletionStage<Done>> create(MRecordSource mRecordSource, EvaluationPipeline<T> evaluationPipeline, EventSink<T> eventSink, Long l) {
        final Sink<Object, CompletionStage<Done>> terminalSink = Sink.ignore();
        final Source<MRecord, NotUsed> recordStream = Source.fromGraph(new MRecordSourceWrapper(mRecordSource));
        final Function2<GraphDSL.Builder<CompletionStage<Done>>, SinkShape<Object>, ClosedShape> wiring =
                buildGraph(recordStream, evaluationPipeline, eventSink, l);
        return RunnableGraph.fromGraph(GraphDSL.create(terminalSink, wiring));
    }

    /**
     * Produces the build block that wires the stream graph.
     *
     * <p>For every substream of the pipeline the block first calls {@code init()} on all of the
     * substream's record handlers and evaluation functions, then chains these stages between the
     * source and the sink: predicate filter, channel filter, {@link MergeFunction},
     * {@link SortGraphFlow}, and the handler/evaluation flow from {@code toFlow}.
     *
     * <p>NOTE(review): each substream is attached to the same source outlet and the same sink
     * inlet. Confirm that the graph builder accepts this when the pipeline holds more than one
     * substream.
     *
     * @param source             stream of incoming records
     * @param evaluationPipeline pipeline supplying the substreams
     * @param eventSink          sink passed on to the evaluation stage
     * @param l                  constructor argument for each {@link SortGraphFlow}
     * @return the function invoked by the graph DSL with the builder and the sink's shape
     */
    private Function2<GraphDSL.Builder<CompletionStage<Done>>, SinkShape<Object>, ClosedShape> buildGraph(Source<MRecord, NotUsed> source, EvaluationPipeline<T> evaluationPipeline, EventSink<T> eventSink, Long l) {
        return (builder, sinkShape) -> {
            GraphDSL.Builder.ForwardOps sourceOps = builder.from(builder.add(source).out());
            for (SubStream<T> subStream : evaluationPipeline.getSubStreams()) {
                // Handlers and evaluation functions are initialised before any stage is wired.
                List<RecordHandler> handlers = subStream.getRecordHandlers();
                handlers.forEach(RecordHandler::init);
                List<EvaluationFunction<T>> evaluators = subStream.getEvalFunctions();
                evaluators.forEach(EvaluationFunction::init);

                // First stage: keep only the records this substream accepts, then merge.
                Flow<MRecord, MRecord, NotUsed> selectAndMerge = Flow.of(MRecord.class)
                        .filter(candidate -> subStream.getPredicate().validate(candidate).booleanValue())
                        .filter(createChannelFilter(subStream))
                        .map(new MergeFunction());

                sourceOps.via(builder.add(selectAndMerge))
                        .via(builder.add(new SortGraphFlow(l)))
                        .via(builder.add(toFlow(handlers, evaluators, eventSink)))
                        .to(sinkShape);
            }
            return ClosedShape.getInstance();
        };
    }

    /**
     * Creates a stream predicate backed by a fresh {@link ChannelFilter} for the substream.
     *
     * @param subStream the substream the {@link ChannelFilter} is constructed from
     * @return a predicate that delegates to {@code ChannelFilter.filter} on that instance
     */
    private Predicate<MRecord> createChannelFilter(SubStream<T> subStream) {
        return new ChannelFilter(subStream)::filter;
    }

    /**
     * Builds the two-stage flow that runs the record handlers and then the evaluation functions.
     *
     * <p>Stage one applies every record handler to the record, in list order. Stage two creates a
     * new {@link EventSinkContext} around the event sink for each record, sets the record as its
     * current value and passes the context to every evaluation function, in list order. Both
     * stages emit the record they received.
     *
     * @param list      record handlers applied in the first stage
     * @param list2     evaluation functions invoked in the second stage
     * @param eventSink sink wrapped by the per-record {@link EventSinkContext}
     * @return the combined flow
     */
    private Flow<MRecord, MRecord, NotUsed> toFlow(List<RecordHandler> list, List<EvaluationFunction<T>> list2, EventSink<T> eventSink) {
        Flow<MRecord, MRecord, NotUsed> handlerStage = Flow.of(MRecord.class).map(incoming -> {
            for (RecordHandler handler : list) {
                handler.apply(incoming);
            }
            return incoming;
        });
        return handlerStage.map(handled -> {
            EventSinkContext context = new EventSinkContext(eventSink);
            context.setCurrent(handled);
            for (EvaluationFunction<T> evaluator : list2) {
                evaluator.eval(context);
            }
            return handled;
        });
    }

    /**
     * Synthetic hook that re-creates the serializable lambdas and method references declared in
     * this class from their {@link SerializedLambda} form.
     *
     * <p>The implementation method name picks a candidate (first switch). The candidate is only
     * rebuilt when the implementation method kind, the functional interface class, its method name
     * and signature, the implementing class and the implementation signature all match (second
     * switch); captured arguments are then read back by index.
     *
     * <p>NOTE(review): this is decompiler output and is not valid Java as written. The selector
     * {@code z} is declared {@code boolean} but is assigned -1, 2, 3 and 4, and the label
     * {@code case true} occurs four times. Presumably the repeated labels stand for the selector
     * values 1 to 4 in the order they appear; confirm against the bytecode before editing.
     *
     * @param serializedLambda the serialized description of the lambda to rebuild
     * @return the re-created lambda or method reference
     * @throws IllegalArgumentException if the description matches none of the known candidates
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Switch on the name's hash first, then confirm with equals() to rule out collisions.
        switch (implMethodName.hashCode()) {
            case -2024326772:
                if (implMethodName.equals("lambda$toFlow$eb596bc7$1")) {
                    z = 4;
                    break;
                }
                break;
            case -1274492040:
                if (implMethodName.equals("filter")) {
                    z = false;
                    break;
                }
                break;
            case -1273205910:
                if (implMethodName.equals("lambda$toFlow$50b6d5dc$1")) {
                    z = 3;
                    break;
                }
                break;
            case -196939405:
                if (implMethodName.equals("lambda$null$5bf5e021$1")) {
                    z = true;
                    break;
                }
                break;
            case 168478459:
                if (implMethodName.equals("lambda$buildGraph$1ad53713$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            // "filter": method reference to ChannelFilter.filter on the captured ChannelFilter.
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Predicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("test") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/pragmaticminds/crunch/api/pipe/ChannelFilter") && serializedLambda.getImplMethodSignature().equals("(Lorg/pragmaticminds/crunch/api/records/MRecord;)Z")) {
                    ChannelFilter channelFilter = (ChannelFilter) serializedLambda.getCapturedArg(0);
                    return channelFilter::filter;
                }
                break;
            // "lambda$null$5bf5e021$1": the substream predicate filter; captures the SubStream.
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Predicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("test") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/pragmaticminds/crunch/execution/GraphFactory") && serializedLambda.getImplMethodSignature().equals("(Lorg/pragmaticminds/crunch/api/pipe/SubStream;Lorg/pragmaticminds/crunch/api/records/MRecord;)Z")) {
                    SubStream subStream = (SubStream) serializedLambda.getCapturedArg(0);
                    return mRecord -> {
                        return subStream.getPredicate().validate(mRecord).booleanValue();
                    };
                }
                break;
            // Presumably selector 2 ("lambda$buildGraph$1ad53713$1"): the graph build block.
            // Captures the GraphFactory instance, source, pipeline, Long and event sink, in that order.
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/pragmaticminds/crunch/execution/GraphFactory") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/javadsl/Source;Lorg/pragmaticminds/crunch/api/pipe/EvaluationPipeline;Ljava/lang/Long;Lorg/pragmaticminds/crunch/execution/EventSink;Lakka/stream/javadsl/GraphDSL$Builder;Lakka/stream/SinkShape;)Lakka/stream/ClosedShape;")) {
                    GraphFactory graphFactory = (GraphFactory) serializedLambda.getCapturedArg(0);
                    Source source = (Source) serializedLambda.getCapturedArg(1);
                    EvaluationPipeline evaluationPipeline = (EvaluationPipeline) serializedLambda.getCapturedArg(2);
                    Long l = (Long) serializedLambda.getCapturedArg(3);
                    EventSink eventSink = (EventSink) serializedLambda.getCapturedArg(4);
                    return (builder, sinkShape) -> {
                        GraphDSL.Builder.ForwardOps from = builder.from(builder.add(source).out());
                        for (SubStream subStream2 : evaluationPipeline.getSubStreams()) {
                            List<RecordHandler> recordHandlers = subStream2.getRecordHandlers();
                            recordHandlers.forEach((v0) -> {
                                v0.init();
                            });
                            List<EvaluationFunction<T>> evalFunctions = subStream2.getEvalFunctions();
                            evalFunctions.forEach((v0) -> {
                                v0.init();
                            });
                            from.via(builder.add(Flow.of(MRecord.class).filter(mRecord2 -> {
                                return subStream2.getPredicate().validate(mRecord2).booleanValue();
                            }).filter(createChannelFilter(subStream2)).map(new MergeFunction()))).via(builder.add(new SortGraphFlow(l))).via(builder.add(toFlow(recordHandlers, evalFunctions, eventSink))).to(sinkShape);
                        }
                        return ClosedShape.getInstance();
                    };
                }
                break;
            // Presumably selector 3 ("lambda$toFlow$50b6d5dc$1"): the evaluation stage.
            // Captures the event sink and the list of evaluation functions.
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/pragmaticminds/crunch/execution/GraphFactory") && serializedLambda.getImplMethodSignature().equals("(Lorg/pragmaticminds/crunch/execution/EventSink;Ljava/util/List;Lorg/pragmaticminds/crunch/api/records/MRecord;)Lorg/pragmaticminds/crunch/api/records/MRecord;")) {
                    EventSink eventSink2 = (EventSink) serializedLambda.getCapturedArg(0);
                    List list = (List) serializedLambda.getCapturedArg(1);
                    return mRecord2 -> {
                        EventSinkContext eventSinkContext = new EventSinkContext(eventSink2);
                        eventSinkContext.setCurrent(mRecord2);
                        Iterator it = list.iterator();
                        while (it.hasNext()) {
                            ((EvaluationFunction) it.next()).eval(eventSinkContext);
                        }
                        return mRecord2;
                    };
                }
                break;
            // Presumably selector 4 ("lambda$toFlow$eb596bc7$1"): the record-handler stage.
            // Captures the list of record handlers.
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/pragmaticminds/crunch/execution/GraphFactory") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;Lorg/pragmaticminds/crunch/api/records/MRecord;)Lorg/pragmaticminds/crunch/api/records/MRecord;")) {
                    List list2 = (List) serializedLambda.getCapturedArg(0);
                    return mRecord3 -> {
                        Iterator it = list2.iterator();
                        while (it.hasNext()) {
                            ((RecordHandler) it.next()).apply(mRecord3);
                        }
                        return mRecord3;
                    };
                }
                break;
        }
        // No candidate matched every check.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
