package com.netflix.atlas.eval.stream;

import com.fasterxml.jackson.module.scala.JavaTypeable$;
import com.netflix.atlas.core.model.DataExpr;
import com.netflix.atlas.core.model.Expr;
import com.netflix.atlas.core.model.Query$;
import com.netflix.atlas.eval.model.AggrDatapoint;
import com.netflix.atlas.eval.model.AggrDatapoint$;
import com.netflix.atlas.eval.model.AggrDatapoint$AggregatorSettings$;
import com.netflix.atlas.eval.model.AggrValuesInfo;
import com.netflix.atlas.eval.model.AggrValuesInfo$;
import com.netflix.atlas.eval.model.ExprType;
import com.netflix.atlas.eval.model.LwcExpression;
import com.netflix.atlas.eval.model.LwcExpression$;
import com.netflix.atlas.eval.model.LwcMessages$;
import com.netflix.atlas.eval.model.TimeGroup;
import com.netflix.atlas.eval.model.TimeGroup$;
import com.netflix.atlas.eval.model.TimeGroupsTuple;
import com.netflix.atlas.eval.stream.EddaSource;
import com.netflix.atlas.eval.stream.Evaluator;
import com.netflix.atlas.json.Json$;
import com.netflix.atlas.json.JsonSupport;
import com.netflix.atlas.pekko.ClusterOps;
import com.netflix.atlas.pekko.ClusterOps$;
import com.netflix.atlas.pekko.ClusterOps$Cluster$;
import com.netflix.atlas.pekko.ClusterOps$Data$;
import com.netflix.atlas.pekko.ClusterOps$GroupByContext$;
import com.netflix.atlas.pekko.StreamOps;
import com.netflix.atlas.pekko.StreamOps$;
import com.netflix.atlas.pekko.ThreadPools$;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.typesafe.config.Config;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pekko.NotUsed;
import org.apache.pekko.NotUsed$;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.http.scaladsl.Http$;
import org.apache.pekko.http.scaladsl.HttpExt;
import org.apache.pekko.http.scaladsl.model.Uri;
import org.apache.pekko.http.scaladsl.model.Uri$;
import org.apache.pekko.http.scaladsl.model.ws.BinaryMessage;
import org.apache.pekko.http.scaladsl.model.ws.BinaryMessage$;
import org.apache.pekko.http.scaladsl.model.ws.BinaryMessage$Strict$;
import org.apache.pekko.http.scaladsl.model.ws.TextMessage;
import org.apache.pekko.http.scaladsl.model.ws.WebSocketRequest$;
import org.apache.pekko.stream.FlowShape$;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.OverflowStrategy$;
import org.apache.pekko.stream.ThrottleMode$Shaping$;
import org.apache.pekko.stream.UniformFanInShape;
import org.apache.pekko.stream.UniformFanOutShape;
import org.apache.pekko.stream.scaladsl.Broadcast$;
import org.apache.pekko.stream.scaladsl.BroadcastHub$;
import org.apache.pekko.stream.scaladsl.FileIO$;
import org.apache.pekko.stream.scaladsl.Flow;
import org.apache.pekko.stream.scaladsl.Flow$;
import org.apache.pekko.stream.scaladsl.GraphDSL$;
import org.apache.pekko.stream.scaladsl.GraphDSL$Implicits$;
import org.apache.pekko.stream.scaladsl.Keep$;
import org.apache.pekko.stream.scaladsl.Merge$;
import org.apache.pekko.stream.scaladsl.Sink$;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.stream.scaladsl.Source$;
import org.apache.pekko.util.ByteString;
import org.apache.pekko.util.ByteString$;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.$less$colon$less$;
import scala.Function2;
import scala.MatchError;
import scala.PartialFunction;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.SeqOps;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.FiniteDuration$;
import scala.concurrent.duration.package;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: EvaluatorImpl.scala */
/* loaded from: input_file:com/netflix/atlas/eval/stream/EvaluatorImpl.class */
public abstract class EvaluatorImpl {
    private final Config config;
    private final Registry registry;
    private final ActorSystem system;
    private final Materializer materializer;
    private final long uniqueTimeout;
    private final int lwcapiVersion;
    private final Counter badMessages;
    private final ExecutionContext parsingEC;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final StreamContext validationStreamContext = newStreamContext(newStreamContext$default$1());
    private final int parsingNumThreads = scala.math.package$.MODULE$.max(Runtime.getRuntime().availableProcessors() / 2, 2);

    public EvaluatorImpl(Config config, Registry registry, ActorSystem actorSystem, Materializer materializer) {
        this.config = config;
        this.registry = registry;
        this.system = actorSystem;
        this.materializer = materializer;
        this.uniqueTimeout = config.getDuration("atlas.eval.stream.unique-timeout").toMillis();
        this.lwcapiVersion = config.getInt("atlas.eval.stream.lwcapi-version");
        this.badMessages = registry.counter("atlas.eval.badMessages");
        this.parsingEC = ThreadPools$.MODULE$.fixedSize(registry, "AtlasEvalParsing", this.parsingNumThreads);
    }

    public ActorSystem system() {
        return this.system;
    }

    public Materializer materializer() {
        return this.materializer;
    }

    private StreamContext newStreamContext(Function2<Evaluator.DataSource, JsonSupport, BoxedUnit> function2) {
        Config config = this.config;
        HttpExt apply = Http$.MODULE$.apply(system());
        return new StreamContext(config, apply.superPool(apply.superPool$default$1(), apply.superPool$default$2(), apply.superPool$default$3()), materializer(), this.registry, function2);
    }

    private Function2<Evaluator.DataSource, JsonSupport, BoxedUnit> newStreamContext$default$1() {
        return (dataSource, jsonSupport) -> {
        };
    }

    public void validateImpl(Evaluator.DataSource dataSource) {
        this.validationStreamContext.validateDataSource(dataSource).get();
    }

    public void writeInputToFileImpl(String str, Path path, Duration duration) {
        writeInputToFileImpl(Uri$.MODULE$.apply(str), path, FiniteDuration$.MODULE$.apply(duration.toMillis(), TimeUnit.MILLISECONDS));
    }

    public void writeInputToFileImpl(Uri uri, Path path, FiniteDuration finiteDuration) {
        StreamRef run = EvaluationFlows$.MODULE$.run(Source$.MODULE$.apply(new $colon.colon(Evaluator.DataSources.of(new Evaluator.DataSource("_", uri.toString())), Nil$.MODULE$)).via(createInputFlow(newStreamContext(newStreamContext$default$1()))), Flow$.MODULE$.apply().map(obj -> {
            return ByteString$.MODULE$.apply(Json$.MODULE$.encode(obj, JavaTypeable$.MODULE$.gen0JavaTypeable(ClassTag$.MODULE$.apply(Object.class))));
        }).map(byteString -> {
            return (ByteString) byteString.filterNot(obj2 -> {
                return $anonfun$2$$anonfun$1(BoxesRunTime.unboxToByte(obj2));
            });
        }).filterNot(byteString2 -> {
            return byteString2.isEmpty();
        }).map(byteString3 -> {
            return byteString3.$plus$plus(ByteString$.MODULE$.apply("\n\n"));
        }).toMat(FileIO$.MODULE$.toPath(path, (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new OpenOption[]{StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING}))), Keep$.MODULE$.right()), materializer());
        try {
            Await$.MODULE$.ready((Awaitable) run.value(), finiteDuration);
        } catch (TimeoutException unused) {
            run.killSwitch().shutdown();
            Await$.MODULE$.ready((Awaitable) run.value(), finiteDuration);
        }
    }

    public Publisher<JsonSupport> createPublisherImpl(String str) {
        return createPublisherImpl(Uri$.MODULE$.apply(str));
    }

    public Publisher<JsonSupport> createPublisherImpl(Uri uri) {
        Evaluator.DataSources of = Evaluator.DataSources.of(new Evaluator.DataSource("_", uri.toString()));
        return (Publisher) (uri.scheme().startsWith("http") ? (Source) Source$.MODULE$.repeat(of).throttle(1, new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(1)).minute(), 1, ThrottleMode$Shaping$.MODULE$) : Source$.MODULE$.single(of)).via(createProcessorFlow()).map(messageEnvelope -> {
            return messageEnvelope.message();
        }).toMat(Sink$.MODULE$.asPublisher(true), Keep$.MODULE$.right()).run(materializer());
    }

    public Processor<Evaluator.DataSources, Evaluator.MessageEnvelope> createStreamsProcessorImpl() {
        return (Processor) createStreamsFlow().toProcessor().run(materializer());
    }

    public Flow<Evaluator.DataSources, Evaluator.MessageEnvelope, NotUsed> createStreamsFlow() {
        return (Flow) Flow$.MODULE$.apply().map(dataSources -> {
            return groupByHost(dataSources);
        }).via(new FillRemovedKeysWith(str -> {
            return Evaluator.DataSources.empty();
        })).flatMapMerge(Integer.MAX_VALUE, map -> {
            return Source$.MODULE$.apply(map.toList());
        }).groupBy(Integer.MAX_VALUE, tuple2 -> {
            return (String) tuple2._1();
        }, true).map(tuple22 -> {
            return (Evaluator.DataSources) tuple22._2();
        }).via(createProcessorFlow()).mergeSubstreams();
    }

    public Map<String, Evaluator.DataSources> groupByHost(Evaluator.DataSources dataSources) {
        return CollectionConverters$.MODULE$.SetHasAsScala(dataSources.sources()).asScala().groupBy(dataSource -> {
            return getHost(dataSource);
        }).map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            String str = (String) tuple2._1();
            scala.collection.mutable.Set set = (scala.collection.mutable.Set) tuple2._2();
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc(str), new Evaluator.DataSources(CollectionConverters$.MODULE$.MutableSetHasAsJava(set).asJava()));
        });
    }

    private String getHost(Evaluator.DataSource dataSource) {
        return dataSource.isLocal() ? "_" : Uri$.MODULE$.apply(dataSource.uri()).authority().host().address();
    }

    public Flow<Evaluator.DataSources, Evaluator.MessageEnvelope, NotUsed> createProcessorFlow() {
        Tuple2 tuple2 = (Tuple2) StreamOps$.MODULE$.blockingQueue(this.registry, "DataSourceLogger", 10).toMat(BroadcastHub$.MODULE$.sink(1), Keep$.MODULE$.both()).run(materializer());
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Tuple2 apply = Tuple2$.MODULE$.apply((StreamOps.SourceQueue) tuple2._1(), (Source) tuple2._2());
        StreamOps.SourceQueue sourceQueue = (StreamOps.SourceQueue) apply._1();
        Source source = (Source) apply._2();
        StreamContext newStreamContext = newStreamContext((dataSource, jsonSupport) -> {
            sourceQueue.offer(new Evaluator.MessageEnvelope(dataSource.id(), jsonSupport));
        });
        return ((Flow) Flow$.MODULE$.apply().map(dataSources -> {
            return ReplayLogging$.MODULE$.log(dataSources);
        }).map(dataSources2 -> {
            Evaluator.DataSources validate = newStreamContext.validate(dataSources2);
            newStreamContext.setDataSources(validate);
            return validate;
        }).via(GraphDSL$.MODULE$.create(builder -> {
            UniformFanOutShape add = builder.add(Broadcast$.MODULE$.apply(2, Broadcast$.MODULE$.apply$default$2()));
            UniformFanInShape add2 = builder.add(Merge$.MODULE$.apply(2, Merge$.MODULE$.apply$default$2()));
            GraphDSL$Implicits$.MODULE$.port2flow(add.out(0), builder).buffer(1, OverflowStrategy$.MODULE$.dropTail()).$tilde$greater(((Flow) createInputFlow(newStreamContext).via(newStreamContext.monitorFlow("10_InputBatches")).via(new LwcToAggrDatapoint(newStreamContext)).flatMapConcat(datapointsTuple -> {
                return Source$.MODULE$.apply(datapointsTuple.groupByStep());
            }).groupBy(Integer.MAX_VALUE, datapointsTuple2 -> {
                return datapointsTuple2.step();
            }, true).via(new TimeGrouped(newStreamContext)).mergeSubstreams()).via(newStreamContext.monitorFlow("11_GroupedDatapoints")), builder).$tilde$greater(add2.in(0), builder);
            GraphDSL$Implicits$.MODULE$.port2flow(add.out(1), builder).buffer(1, OverflowStrategy$.MODULE$.dropTail()).$tilde$greater(add2.in(1), builder);
            return FlowShape$.MODULE$.apply(add.in(), add2.out());
        })).flatMapConcat(obj -> {
            return Source$.MODULE$.apply(splitByStep(obj));
        }).groupBy(Integer.MAX_VALUE, stepSize(), true).via(new FinalExprEval(newStreamContext.interpreter())).mergeSubstreams()).via(newStreamContext.monitorFlow("12_OutputSources")).flatMapConcat(source2 -> {
            return source2;
        }).via(newStreamContext.monitorFlow("13_OutputMessages")).via(new OnUpstreamFinish(() -> {
            sourceQueue.complete();
            return BoxedUnit.UNIT;
        })).merge(source, false);
    }

    public Processor<Evaluator.DatapointGroup, Evaluator.MessageEnvelope> createDatapointProcessorImpl(Evaluator.DataSources dataSources) {
        long stepSize = dataSources.stepSize();
        Tuple2 tuple2 = (Tuple2) StreamOps$.MODULE$.blockingQueue(this.registry, "DataSourceLogger", 10).toMat(BroadcastHub$.MODULE$.sink(1), Keep$.MODULE$.both()).run(materializer());
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Tuple2 apply = Tuple2$.MODULE$.apply((StreamOps.SourceQueue) tuple2._1(), (Source) tuple2._2());
        StreamOps.SourceQueue sourceQueue = (StreamOps.SourceQueue) apply._1();
        Source source = (Source) apply._2();
        StreamContext newStreamContext = newStreamContext((dataSource, jsonSupport) -> {
            sourceQueue.offer(new Evaluator.MessageEnvelope(dataSource.id(), jsonSupport));
        });
        newStreamContext.validate(dataSources);
        newStreamContext.setDataSources(dataSources);
        ExprInterpreter interpreter = newStreamContext.interpreter();
        List list = (List) ((IterableOnceOps) ((IterableOps) CollectionConverters$.MODULE$.SetHasAsScala(dataSources.sources()).asScala().flatMap(dataSource2 -> {
            return interpreter.eval(Uri$.MODULE$.apply(dataSource2.uri())).exprs();
        })).flatMap(styleExpr -> {
            return styleExpr.expr().dataExprs();
        })).toList().distinct();
        return (Processor) Flow$.MODULE$.apply().map(datapointGroup -> {
            return toTimeGroup(stepSize, list, datapointGroup, newStreamContext);
        }).merge(Source$.MODULE$.single(dataSources), false).via(new FinalExprEval(interpreter)).flatMapConcat(source2 -> {
            return source2;
        }).via(new OnUpstreamFinish(() -> {
            sourceQueue.complete();
            return BoxedUnit.UNIT;
        })).merge(source, false).toProcessor().run(materializer());
    }

    private TimeGroup toTimeGroup(long j, List<DataExpr> list, Evaluator.DatapointGroup datapointGroup, StreamContext streamContext) {
        AggrDatapoint.AggregatorSettings apply = AggrDatapoint$AggregatorSettings$.MODULE$.apply(streamContext.maxInputDatapointsPerExpression(), streamContext.maxIntermediateDatapointsPerExpression(), streamContext.registry());
        return TimeGroup$.MODULE$.apply(datapointGroup.timestamp(), j, ((IterableOps) ((IterableOps) CollectionConverters$.MODULE$.ListHasAsScala(datapointGroup.datapoints()).asScala().zipWithIndex()).flatMap(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Evaluator.Datapoint datapoint = (Evaluator.Datapoint) tuple2._1();
            int unboxToInt = BoxesRunTime.unboxToInt(tuple2._2());
            scala.collection.immutable.Map map = CollectionConverters$.MODULE$.MapHasAsScala(datapoint.tags()).asScala().toMap($less$colon$less$.MODULE$.refl());
            return list.filter(dataExpr -> {
                return dataExpr.query().matches(map);
            }).map(dataExpr2 -> {
                Set $plus$plus = Query$.MODULE$.exactKeys(dataExpr2.query()).$plus$plus(dataExpr2.finalGrouping());
                scala.collection.immutable.Map<String, String> map2 = (scala.collection.immutable.Map) map.filter(tuple2 -> {
                    return $plus$plus.contains(tuple2._1());
                });
                double value = datapoint.value();
                return AggrDatapoint$.MODULE$.apply(datapointGroup.timestamp(), j, dataExpr2, BoxesRunTime.boxToInteger(unboxToInt).toString(), map2, (!isCount(dataExpr2) || Predef$.MODULE$.double2Double(value).isNaN()) ? value : 1.0d);
            });
        })).groupBy(aggrDatapoint -> {
            return aggrDatapoint.expr();
        }).map(tuple22 -> {
            AggrValuesInfo apply2;
            DataExpr dataExpr = (DataExpr) Predef$.MODULE$.ArrowAssoc(tuple22._1());
            Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
            Some aggregate = AggrDatapoint$.MODULE$.aggregate(((IterableOnceOps) tuple22._2()).toList(), apply);
            if (aggregate instanceof Some) {
                AggrDatapoint.Aggregator aggregator = (AggrDatapoint.Aggregator) aggregate.value();
                if (aggregator.limitExceeded()) {
                    streamContext.logDatapointsExceeded(datapointGroup.timestamp(), ((DataExpr) tuple22._1()).toString());
                    apply2 = AggrValuesInfo$.MODULE$.apply(scala.package$.MODULE$.Nil(), ((SeqOps) tuple22._2()).size());
                } else {
                    apply2 = AggrValuesInfo$.MODULE$.apply(aggregator.datapoints(), ((SeqOps) tuple22._2()).size());
                }
            } else {
                apply2 = AggrValuesInfo$.MODULE$.apply(scala.package$.MODULE$.Nil(), ((SeqOps) tuple22._2()).size());
            }
            return predef$ArrowAssoc$.$minus$greater$extension(dataExpr, apply2);
        }));
    }

    private boolean isCount(DataExpr dataExpr) {
        DataExpr dataExpr2;
        while (true) {
            dataExpr2 = dataExpr;
            if (!(dataExpr2 instanceof DataExpr.GroupBy)) {
                if (!(dataExpr2 instanceof DataExpr.Consolidation)) {
                    break;
                }
                dataExpr = ((DataExpr.Consolidation) dataExpr2).af();
            } else {
                dataExpr = ((DataExpr.GroupBy) dataExpr2).af();
            }
        }
        return dataExpr2 instanceof DataExpr.Count;
    }

    private PartialFunction<Object, Object> stepSize() {
        return new EvaluatorImpl$$anon$1();
    }

    private List<Object> splitByStep(Object obj) {
        return obj instanceof Evaluator.DataSources ? ((IterableOnceOps) CollectionConverters$.MODULE$.SetHasAsScala(((Evaluator.DataSources) obj).sources()).asScala().groupBy(dataSource -> {
            return dataSource.step().toMillis();
        }).map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            return new Evaluator.DataSources(CollectionConverters$.MODULE$.MutableSetHasAsJava((scala.collection.mutable.Set) tuple2._2()).asJava());
        })).toList() : obj instanceof TimeGroupsTuple ? ((TimeGroupsTuple) obj).groupByStep() : new $colon.colon<>(obj, Nil$.MODULE$);
    }

    public Flow<Evaluator.DataSources, List<Object>, NotUsed> createInputFlow(StreamContext streamContext) {
        return Flow$.MODULE$.apply().via(GraphDSL$.MODULE$.create(builder -> {
            UniformFanOutShape add = builder.add(Broadcast$.MODULE$.apply(2, Broadcast$.MODULE$.apply$default$2()));
            UniformFanInShape add2 = builder.add(Merge$.MODULE$.apply(2, Merge$.MODULE$.apply$default$2()));
            Flow via = createClusterLookupFlow(streamContext).via(createClusterStreamFlow(streamContext));
            Flow map = Flow$.MODULE$.apply().flatMapMerge(Integer.MAX_VALUE, dataSources -> {
                return Source$.MODULE$.apply(CollectionConverters$.MODULE$.SetHasAsScala(dataSources.sources()).asScala().toList());
            }).flatMapMerge(Integer.MAX_VALUE, dataSource -> {
                return streamContext.localSource(Uri$.MODULE$.apply(dataSource.uri()));
            }).map(byteString -> {
                return parseMessage(byteString);
            });
            GraphDSL$Implicits$.MODULE$.port2flow(add.out(0), builder).map(dataSources2 -> {
                return dataSources2.remoteOnly();
            }).$tilde$greater(via, builder).$tilde$greater(add2.in(0), builder);
            GraphDSL$Implicits$.MODULE$.port2flow(add.out(1), builder).map(dataSources3 -> {
                return dataSources3.localOnly();
            }).$tilde$greater(map, builder).$tilde$greater(add2.in(1), builder);
            return FlowShape$.MODULE$.apply(add.in(), add2.out());
        }));
    }

    private Flow<Evaluator.DataSources, Tuple2<Evaluator.DataSources, EddaSource.Groups>, NotUsed> createClusterLookupFlow(StreamContext streamContext) {
        return Flow$.MODULE$.apply().conflate((dataSources, dataSources2) -> {
            return dataSources2;
        }).throttle(1, new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(1)).second(), 1, ThrottleMode$Shaping$.MODULE$).via(streamContext.monitorFlow("00_DataSourceUpdates")).via(new EddaGroupsLookup(streamContext, new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(30)).seconds())).via(streamContext.monitorFlow("01_EurekaGroups")).flatMapMerge(Integer.MAX_VALUE, source -> {
            return source;
        }).via(streamContext.monitorFlow("01_EurekaRefresh"));
    }

    private Flow<Tuple2<Evaluator.DataSources, EddaSource.Groups>, List<Object>, NotUsed> createClusterStreamFlow(StreamContext streamContext) {
        return Flow$.MODULE$.apply().via(StreamOps$.MODULE$.unique(StreamOps$.MODULE$.unique$default$1(), StreamOps$.MODULE$.unique$default$2())).flatMapConcat(tuple2 -> {
            Set set = ((EddaSource.Groups) tuple2._2()).groups().flatMap(groupResponse -> {
                return groupResponse.instances();
            }).toSet();
            Set<LwcExpression> exprSet = toExprSet((Evaluator.DataSources) tuple2._1(), streamContext.interpreter());
            return Source$.MODULE$.apply(new $colon.colon(ClusterOps$Cluster$.MODULE$.apply(set), new $colon.colon(ClusterOps$Data$.MODULE$.apply(((IterableOnceOps) set.map(instance -> {
                return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((EddaSource.Instance) Predef$.MODULE$.ArrowAssoc(instance), exprSet);
            })).toMap($less$colon$less$.MODULE$.refl())), Nil$.MODULE$)));
        }).via(StreamOps$.MODULE$.repeatLastReceived(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(5)).seconds())).via(ClusterOps$.MODULE$.groupBy(createGroupByContext(streamContext))).mapAsync(this.parsingNumThreads, byteString -> {
            return Future$.MODULE$.apply(() -> {
                return r1.createClusterStreamFlow$$anonfun$2$$anonfun$1(r2);
            }, this.parsingEC);
        });
    }

    private ClusterOps.GroupByContext<EddaSource.Instance, Set<LwcExpression>, ByteString> createGroupByContext(StreamContext streamContext) {
        return ClusterOps$GroupByContext$.MODULE$.apply(instance -> {
            return createWebSocketFlow(instance);
        }, this.registry, ClusterOps$GroupByContext$.MODULE$.$lessinit$greater$default$3(), 10);
    }

    private Set<LwcExpression> toExprSet(Evaluator.DataSources dataSources, ExprInterpreter exprInterpreter) {
        return ((IterableOnceOps) CollectionConverters$.MODULE$.SetHasAsScala(dataSources.sources()).asScala().flatMap(dataSource -> {
            Tuple2<ExprType, List<Expr>> parseQuery = exprInterpreter.parseQuery(Uri$.MODULE$.apply(dataSource.uri()));
            if (parseQuery == null) {
                throw new MatchError(parseQuery);
            }
            Tuple2 apply = Tuple2$.MODULE$.apply((ExprType) parseQuery._1(), (List) parseQuery._2());
            ExprType exprType = (ExprType) apply._1();
            return ((List) apply._2()).map(expr -> {
                return LwcExpression$.MODULE$.apply(expr.toString(), exprType, exprType.isTimeSeriesType() ? dataSource.step().toMillis() : 0L);
            });
        })).toSet();
    }

    private Flow<Set<LwcExpression>, ByteString, NotUsed> createWebSocketFlow(EddaSource.Instance instance) {
        String str = instance.substitute("ws://{ip}:{port}") + "/api/v" + this.lwcapiVersion + "/subscribe/" + UUID.randomUUID().toString();
        HttpExt apply = Http$.MODULE$.apply(system());
        return Flow$.MODULE$.apply().via(StreamOps$.MODULE$.unique(this.uniqueTimeout, StreamOps$.MODULE$.unique$default$2())).map(set -> {
            return BinaryMessage$.MODULE$.apply(LwcMessages$.MODULE$.encodeBatch(set.toSeq()));
        }).via(apply.webSocketClientFlow(WebSocketRequest$.MODULE$.apply(Uri$.MODULE$.apply(str), WebSocketRequest$.MODULE$.$lessinit$greater$default$2(), WebSocketRequest$.MODULE$.$lessinit$greater$default$3()), apply.webSocketClientFlow$default$2(), apply.webSocketClientFlow$default$3(), apply.webSocketClientFlow$default$4(), apply.webSocketClientFlow$default$5())).flatMapConcat(message -> {
            if (message instanceof TextMessage) {
                throw new MatchError("text messages are not supported");
            }
            if (message instanceof BinaryMessage.Strict) {
                return Source$.MODULE$.single(BinaryMessage$Strict$.MODULE$.unapply((BinaryMessage.Strict) message)._1());
            }
            if (message instanceof BinaryMessage) {
                return ((BinaryMessage) message).dataStream().fold(ByteString$.MODULE$.empty(), (byteString, byteString2) -> {
                    return byteString.$plus$plus(byteString2);
                });
            }
            throw new MatchError(message);
        }).mapMaterializedValue(notUsed -> {
            return NotUsed$.MODULE$;
        });
    }

    private List<Object> parseBatch(ByteString byteString) {
        try {
            ReplayLogging$.MODULE$.log(byteString);
            return LwcMessages$.MODULE$.parseBatch(byteString);
        } catch (Exception e) {
            this.logger.warn("failed to process message [" + byteString + "]", e);
            this.badMessages.increment();
            return scala.package$.MODULE$.List().empty();
        }
    }

    private List<Object> parseMessage(ByteString byteString) {
        try {
            ReplayLogging$.MODULE$.log(byteString);
            return new $colon.colon<>(LwcMessages$.MODULE$.parse(byteString), Nil$.MODULE$);
        } catch (Exception e) {
            this.logger.warn("failed to process message [" + toString(byteString) + "]", e);
            this.badMessages.increment();
            return scala.package$.MODULE$.List().empty();
        }
    }

    private String toString(ByteString byteString) {
        StringBuilder sb = new StringBuilder();
        byteString.foreach(obj -> {
            return toString$$anonfun$1(sb, BoxesRunTime.unboxToByte(obj));
        });
        return sb.toString();
    }

    private boolean isPrintable(int i) {
        return i >= 32 && i < 127;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final /* synthetic */ boolean $anonfun$2$$anonfun$1(byte b) {
        return b == 13;
    }

    private final List createClusterStreamFlow$$anonfun$2$$anonfun$1(ByteString byteString) {
        return parseBatch(byteString);
    }

    private final /* synthetic */ StringBuilder toString$$anonfun$1(StringBuilder sb, byte b) {
        int i = b & 255;
        return isPrintable(i) ? sb.append((char) i) : i <= 15 ? sb.append("\\x0").append(Integer.toHexString(i)) : sb.append("\\x").append(Integer.toHexString(i));
    }
}
