package org.eclipse.ditto.internal.utils.search;

import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.RemoteStreamRefActorTerminatedException;
import org.apache.pekko.stream.SourceRef;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Source;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.pekko.controlflow.ResumeSource;
import org.eclipse.ditto.internal.utils.pekko.controlflow.ResumeSourceBuilder;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;
import org.eclipse.ditto.thingsearch.api.commands.sudo.StreamThings;
import org.eclipse.ditto.thingsearch.api.events.ThingsOutOfSync;

/* loaded from: input_file:org/eclipse/ditto/internal/utils/search/SearchSource.class */
public final class SearchSource {
    private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory.getThreadSafeLogger((Class<?>) SearchSource.class);
    private final ActorRef pubSubMediator;
    private final ActorSelection commandForwarder;
    private final Duration thingsAskTimeout;
    private final Duration searchAskTimeout;

    @Nullable
    private final JsonFieldSelector fields;
    private final JsonFieldSelector sortFields;
    private final StreamThings streamThings;
    private final boolean thingIdOnly;
    private final String lastThingId;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SearchSource(ActorRef actorRef, ActorSelection actorSelection, Duration duration, Duration duration2, @Nullable JsonFieldSelector jsonFieldSelector, JsonFieldSelector jsonFieldSelector2, StreamThings streamThings, String str) {
        this.pubSubMediator = actorRef;
        this.commandForwarder = actorSelection;
        this.thingsAskTimeout = duration;
        this.searchAskTimeout = duration2;
        this.fields = jsonFieldSelector;
        this.sortFields = jsonFieldSelector2;
        this.streamThings = streamThings;
        this.thingIdOnly = jsonFieldSelector != null && jsonFieldSelector.getSize() == 1 && jsonFieldSelector.getPointers().contains(Thing.JsonFields.ID.getPointer());
        this.lastThingId = str;
    }

    public static SearchSourceBuilder newBuilder() {
        return new SearchSourceBuilder();
    }

    public Source<JsonObject, NotUsed> start(Consumer<ResumeSourceBuilder<?, ?>> consumer) {
        return startAsPair(consumer).map((v0) -> {
            return v0.second();
        });
    }

    public Source<Pair<String, JsonObject>, NotUsed> startAsPair(Consumer<ResumeSourceBuilder<?, ?>> consumer) {
        ResumeSourceBuilder<?, ?> mapError = ResumeSource.newBuilder().minBackoff(Duration.ofSeconds(1L)).maxBackoff(Duration.ofSeconds(20L)).recovery(Duration.ofSeconds(60L)).maxRestarts(25).initialSeed(this.lastThingId).resume(this::resume).nextSeed(this::nextSeed).mapError(this::mapError);
        consumer.accept(mapError);
        return mapError.build();
    }

    private Optional<Throwable> mapError(Throwable th) {
        if (!(th instanceof RemoteStreamRefActorTerminatedException)) {
            return Optional.of(DittoRuntimeException.asDittoRuntimeException(th, th2 -> {
                LOGGER.withCorrelationId(this.streamThings).error("Unexpected error", th2);
                return DittoInternalErrorException.newBuilder().build();
            }));
        }
        LOGGER.withCorrelationId(this.streamThings).info("Resuming from: {}", th.toString());
        return Optional.empty();
    }

    private Source<Pair<String, JsonObject>, NotUsed> resume(String str) {
        return streamThingsFrom(str).mapAsync(1, streamThings -> {
            return Patterns.ask(this.commandForwarder, streamThings, this.searchAskTimeout);
        }).via(expectMsgClass(SourceRef.class)).flatMapConcat((v0) -> {
            return v0.source();
        }).flatMapConcat(obj -> {
            return retrieveThingForElement((String) obj);
        });
    }

    private String nextSeed(List<Pair<String, JsonObject>> list) {
        return list.isEmpty() ? this.lastThingId : list.get(list.size() - 1).first();
    }

    private Source<StreamThings, NotUsed> streamThingsFrom(String str) {
        if (str.isEmpty()) {
            return Source.single(this.streamThings);
        }
        Source<JsonArray, NotUsed> retrieveSortValues = retrieveSortValues(str);
        StreamThings streamThings = this.streamThings;
        Objects.requireNonNull(streamThings);
        return retrieveSortValues.map(streamThings::setSortValues);
    }

    private Source<JsonArray, NotUsed> retrieveSortValues(String str) {
        return retrieveThing(str, this.sortFields).map(jsonObject -> {
            return (JsonArray) this.sortFields.getPointers().stream().map(jsonPointer -> {
                return jsonObject.getValue(jsonPointer).orElse(JsonFactory.nullLiteral());
            }).collect(JsonCollectors.valuesToArray());
        });
    }

    private Source<Pair<String, JsonObject>, NotUsed> retrieveThingForElement(String str) {
        return this.thingIdOnly ? Source.single(Pair.create(str, JsonObject.newBuilder().set((JsonFieldDefinition<JsonFieldDefinition<String>>) Thing.JsonFields.ID, (JsonFieldDefinition<String>) str).mo9562build())) : retrieveThing(str, this.fields).map(jsonObject -> {
            return Pair.create(str, jsonObject);
        }).recoverWithRetries(1, new PFBuilder().match(ThingNotAccessibleException.class, thingNotAccessibleException -> {
            this.pubSubMediator.tell(DistPubSubAccess.publishViaGroup(ThingsOutOfSync.TYPE, ThingsOutOfSync.of(Collections.singletonList(ThingId.of(str)), getDittoHeaders())), ActorRef.noSender());
            return Source.empty();
        }).build());
    }

    private Source<JsonObject, NotUsed> retrieveThing(String str, @Nullable JsonFieldSelector jsonFieldSelector) {
        return Source.completionStage(Patterns.ask(this.commandForwarder, RetrieveThing.getBuilder(ThingId.of(str), getDittoHeaders()).withSelectedFields(jsonFieldSelector).build(), this.thingsAskTimeout)).via(expectMsgClass(RetrieveThingResponse.class)).map(retrieveThingResponse -> {
            return retrieveThingResponse.getEntity().asObject();
        });
    }

    private DittoHeaders getDittoHeaders() {
        return this.streamThings.getDittoHeaders();
    }

    private <T> Flow<Object, T, NotUsed> expectMsgClass(Class<T> cls) {
        return Flow.create().flatMapConcat(obj -> {
            return cls.isInstance(obj) ? Source.single(cls.cast(obj)) : obj instanceof Throwable ? Source.failed((Throwable) obj) : Source.failed(new ClassCastException(String.format("Expect <%s>, got <%s>", cls.getCanonicalName(), obj)));
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -906279820:
                if (implMethodName.equals("second")) {
                    z = 8;
                    break;
                }
                break;
            case -896505829:
                if (implMethodName.equals("source")) {
                    z = 4;
                    break;
                }
                break;
            case -568454749:
                if (implMethodName.equals("lambda$expectMsgClass$ac26af2f$1")) {
                    z = 7;
                    break;
                }
                break;
            case -168667550:
                if (implMethodName.equals("setSortValues")) {
                    z = false;
                    break;
                }
                break;
            case -46182469:
                if (implMethodName.equals("lambda$retrieveThing$faa01eef$1")) {
                    z = 3;
                    break;
                }
                break;
            case 398018409:
                if (implMethodName.equals("lambda$retrieveSortValues$4fb26f98$1")) {
                    z = true;
                    break;
                }
                break;
            case 486762670:
                if (implMethodName.equals("lambda$resume$81e257cb$1")) {
                    z = 6;
                    break;
                }
                break;
            case 486762671:
                if (implMethodName.equals("lambda$resume$81e257cb$2")) {
                    z = 5;
                    break;
                }
                break;
            case 1112512140:
                if (implMethodName.equals("lambda$retrieveThingForElement$27842c4e$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/thingsearch/api/commands/sudo/StreamThings") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/json/JsonArray;)Lorg/eclipse/ditto/thingsearch/api/commands/sudo/StreamThings;")) {
                    StreamThings streamThings = (StreamThings) serializedLambda.getCapturedArg(0);
                    return streamThings::setSortValues;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/internal/utils/search/SearchSource") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/json/JsonObject;)Lorg/eclipse/ditto/json/JsonArray;")) {
                    SearchSource searchSource = (SearchSource) serializedLambda.getCapturedArg(0);
                    return jsonObject -> {
                        return (JsonArray) this.sortFields.getPointers().stream().map(jsonPointer -> {
                            return jsonObject.getValue(jsonPointer).orElse(JsonFactory.nullLiteral());
                        }).collect(JsonCollectors.valuesToArray());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/internal/utils/search/SearchSource") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/eclipse/ditto/json/JsonObject;)Lorg/apache/pekko/japi/Pair;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    return jsonObject2 -> {
                        return Pair.create(str, jsonObject2);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/internal/utils/search/SearchSource") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/things/model/signals/commands/query/RetrieveThingResponse;)Lorg/eclipse/ditto/json/JsonObject;")) {
                    return retrieveThingResponse -> {
                        return retrieveThingResponse.getEntity().asObject();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/pekko/stream/SourceRef") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/pekko/stream/scaladsl/Source;")) {
                    return (v0) -> {
                        return v0.source();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/internal/utils/search/SearchSource") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                    SearchSource searchSource2 = (SearchSource) serializedLambda.getCapturedArg(0);
                    return obj -> {
                        return retrieveThingForElement((String) obj);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/internal/utils/search/SearchSource") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/thingsearch/api/commands/sudo/StreamThings;)Ljava/util/concurrent/CompletionStage;")) {
                    SearchSource searchSource3 = (SearchSource) serializedLambda.getCapturedArg(0);
                    return streamThings2 -> {
                        return Patterns.ask(this.commandForwarder, streamThings2, this.searchAskTimeout);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/internal/utils/search/SearchSource") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Class;Ljava/lang/Object;)Lorg/apache/pekko/stream/Graph;")) {
                    Class cls = (Class) serializedLambda.getCapturedArg(0);
                    return obj2 -> {
                        return cls.isInstance(obj2) ? Source.single(cls.cast(obj2)) : obj2 instanceof Throwable ? Source.failed((Throwable) obj2) : Source.failed(new ClassCastException(String.format("Expect <%s>, got <%s>", cls.getCanonicalName(), obj2)));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pekko/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/pekko/japi/Pair") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.second();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
