package org.eclipse.ditto.gateway.service.endpoints.actors;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ReceiveTimeout;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.ErrorResponse;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectivityInternalErrorException;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityErrorResponse;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionsAmountIllegalException;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveAllConnectionIds;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveAllConnectionIdsResponse;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionResponse;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnections;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionsResponse;
import org.eclipse.ditto.gateway.service.util.config.DittoGatewayConfig;
import org.eclipse.ditto.gateway.service.util.config.endpoints.CommandConfig;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFieldSelector;

/* loaded from: input_file:org/eclipse/ditto/gateway/service/endpoints/actors/AbstractConnectionsRetrievalActor.class */
/**
 * Base actor that answers one {@link RetrieveConnections} command on behalf of {@link #sender}.
 *
 * <p>Flow, as implemented below:
 * <ol>
 *   <li>The first {@link RetrieveConnections} message is stored in {@link #initialCommand}, the actor switches
 *       to its response-awaiting behavior and delegates to {@link #retrieveConnections(RetrieveConnections)}.</li>
 *   <li>A {@link RetrieveAllConnectionIdsResponse} is turned into the final answer by
 *       {@link #retrieveConnectionsById(RetrieveAllConnectionIdsResponse)}.</li>
 *   <li>A {@link ReceiveTimeout} in either behavior produces an error response via {@link #handleTimeout()}.</li>
 * </ol>
 * Every terminal path replies to {@link #sender} and then stops this actor.
 */
public abstract class AbstractConnectionsRetrievalActor extends AbstractActor {

    /** Logger bound to the concrete subclass. */
    protected final ThreadSafeDittoLogger logger = DittoLoggerFactory.getThreadSafeLogger(getClass());

    /** Target of all outgoing connectivity commands. */
    protected final ActorRef edgeCommandForwarder;

    /** Recipient of the final response or error. */
    protected final ActorRef sender;

    /** Maximum number of connections that may be retrieved in full (read from the command config). */
    protected final int connectionsRetrieveLimit;

    /** Receive timeout of this actor and fallback ask timeout when the command headers carry none. */
    protected final Duration defaultTimeout;

    /** The command being answered; {@code null} until the first {@link RetrieveConnections} arrives. */
    protected RetrieveConnections initialCommand;

    /**
     * Reads the gateway command config from the actor system and arms the receive timeout.
     *
     * @param actorRef the edge command forwarder that outgoing commands are sent to.
     * @param actorRef2 the actor that receives the final response.
     */
    public AbstractConnectionsRetrievalActor(ActorRef actorRef, ActorRef actorRef2) {
        this.edgeCommandForwarder = actorRef;
        this.sender = actorRef2;

        final var systemConfig = getContext().getSystem().settings().config();
        final DittoGatewayConfig gatewayConfig = DittoGatewayConfig.of(DefaultScopedConfig.dittoScoped(systemConfig));
        final CommandConfig commandConfig = gatewayConfig.getCommandConfig();

        this.connectionsRetrieveLimit = commandConfig.connectionsRetrieveLimit();
        this.defaultTimeout = commandConfig.getDefaultTimeout();
        getContext().setReceiveTimeout(this.defaultTimeout);
    }

    /**
     * Initial behavior: waits for the {@link RetrieveConnections} command or a receive timeout;
     * anything else is logged and ignored.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(RetrieveConnections.class, this::handleRetrieveConnections)
                .match(ReceiveTimeout.class, receiveTimeout -> handleTimeout())
                .matchAny(message -> logger.warn("Unknown message: <{}>", message))
                .build();
    }

    /**
     * Starts the retrieval for the given command. Invoked once, after {@link #initialCommand} has been set and
     * the actor has switched to its response-awaiting behavior.
     *
     * @param retrieveConnections the command to answer.
     */
    protected abstract void retrieveConnections(RetrieveConnections retrieveConnections);

    /**
     * Asks the edge command forwarder for all connection IDs, reusing the headers of the given command.
     * The reply is addressed to this actor.
     *
     * @param retrieveConnections the command whose headers are propagated.
     */
    public void retrieveAllConnectionsIds(RetrieveConnections retrieveConnections) {
        final RetrieveAllConnectionIds retrieveAllIds =
                RetrieveAllConnectionIds.of(retrieveConnections.getDittoHeaders());
        edgeCommandForwarder.tell(retrieveAllIds, getSelf());
    }

    /**
     * Produces the final answer from the list of all connection IDs:
     * <ul>
     *   <li>IDs only requested: replies immediately with the IDs as JSON array.</li>
     *   <li>More IDs than {@link #connectionsRetrieveLimit}: replies with a
     *       {@link ConnectionsAmountIllegalException}.</li>
     *   <li>Otherwise: retrieves every connection and replies with the aggregated response, or with a
     *       {@link ConnectivityErrorResponse} if any retrieval fails.</li>
     * </ul>
     *
     * @param retrieveAllConnectionIdsResponse the response carrying all known connection IDs.
     */
    protected void retrieveConnectionsById(RetrieveAllConnectionIdsResponse retrieveAllConnectionIdsResponse) {
        final Set<String> allConnectionIds = retrieveAllConnectionIdsResponse.getAllConnectionIds();
        final DittoHeaders headers = initialCommand.getDittoHeaders();

        if (initialCommand.getIdsOnly()) {
            // The upcast pins the Iterable overload without resorting to a raw type.
            final JsonArray idsArray = JsonArray.of((Iterable<String>) allConnectionIds);
            sender.tell(RetrieveConnectionsResponse.of(idsArray, headers), getSelf());
            stop();
            return;
        }

        if (allConnectionIds.size() > connectionsRetrieveLimit) {
            sender.tell(ConnectionsAmountIllegalException.newBuilder(connectionsRetrieveLimit)
                    .dittoHeaders(headers)
                    .build(), getSelf());
            stop();
            return;
        }

        final List<ConnectionId> connectionIds = allConnectionIds.stream().map(ConnectionId::of).toList();
        // NOTE(review): these callbacks run when the aggregated future completes, i.e. outside of this
        // actor's message handling; getSelf()/stop() are invoked from there - confirm this is acceptable.
        retrieveConnections(connectionIds, initialCommand.getSelectedFields().orElse(null), headers)
                .thenAccept(retrieveConnectionsResponse -> {
                    sender.tell(retrieveConnectionsResponse, getSelf());
                    stop();
                })
                .exceptionally(th -> {
                    final ConnectivityInternalErrorException internalError =
                            ConnectivityInternalErrorException.newBuilder()
                                    .dittoHeaders(initialCommand.getDittoHeaders())
                                    .cause2(th)
                                    .build();
                    sender.tell(ConnectivityErrorResponse.of(internalError, initialCommand.getDittoHeaders()),
                            getSelf());
                    stop();
                    return null;
                });
    }

    /**
     * Behavior after the initial command was received: waits for the list of all connection IDs or a
     * receive timeout; anything else is logged and ignored.
     */
    private AbstractActor.Receive responseAwaitingBehavior() {
        return ReceiveBuilder.create()
                .match(RetrieveAllConnectionIdsResponse.class, this::retrieveConnectionsById)
                .match(ReceiveTimeout.class, receiveTimeout -> handleTimeout())
                .matchAny(message -> logger.warn("Unknown message: <{}>", message))
                .build();
    }

    /** Switches behavior, remembers the command and hands over to the subclass. */
    private void handleRetrieveConnections(RetrieveConnections retrieveConnections) {
        getContext().become(responseAwaitingBehavior());
        this.initialCommand = retrieveConnections;
        retrieveConnections(retrieveConnections);
    }

    /**
     * Retrieves every given connection and aggregates the results into one response whose entries are ordered
     * like {@code collection}.
     *
     * @param collection the IDs of the connections to retrieve.
     * @param jsonFieldSelector optional field selector applied to each retrieved connection.
     * @param dittoHeaders headers for the individual commands and for the aggregated response.
     * @return a stage completing with the aggregated response, or exceptionally if any single retrieval fails.
     */
    private CompletionStage<RetrieveConnectionsResponse> retrieveConnections(Collection<ConnectionId> collection,
            @Nullable JsonFieldSelector jsonFieldSelector, DittoHeaders dittoHeaders) {
        ConditionChecker.checkNotNull(collection, "connectionIds");
        ConditionChecker.checkNotNull(dittoHeaders, "dittoHeaders");

        // Each ask only hands back a CompletionStage, so dispatching sequentially from the calling thread is
        // sufficient and keeps reads of actor fields (initialCommand, logger) off the common pool.
        final List<CompletableFuture<RetrieveConnectionResponse>> pendingResponses = collection.stream()
                .map(connectionId -> RetrieveConnection.of(connectionId, jsonFieldSelector, dittoHeaders))
                .map(this::retrieveConnection)
                .map(CompletionStage::toCompletableFuture)
                .toList();

        final List<ConnectionId> requestedOrder = new ArrayList<>(collection);
        final Comparator<RetrieveConnectionResponse> byRequestedOrder =
                Comparator.comparingInt(response -> requestedOrder.indexOf(response.getEntityId()));

        return CompletableFuture.allOf(pendingResponses.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    // allOf has completed normally here, so join() returns without blocking.
                    final JsonArray connections = pendingResponses.stream()
                            .map(CompletableFuture::join)
                            .sorted(byRequestedOrder)
                            .map(response -> response.getEntity())
                            .collect(JsonCollectors.valuesToArray());
                    return RetrieveConnectionsResponse.of(connections, dittoHeaders);
                });
    }

    /** Retrieves a single connection; currently a plain delegate to {@link #askConnectivity}. */
    private CompletionStage<RetrieveConnectionResponse> retrieveConnection(RetrieveConnection retrieveConnection) {
        return askConnectivity(retrieveConnection);
    }

    /**
     * Sends the command to the edge command forwarder using the ask pattern and converts the reply.
     * The ask timeout is the timeout header of {@link #initialCommand}, falling back to {@link #defaultTimeout}.
     *
     * @param retrieveConnection the command to send.
     * @return a stage completing with the typed response; it completes exceptionally when the reply is an
     * error response, a {@link DittoRuntimeException} or of an unexpected type.
     */
    private CompletionStage<RetrieveConnectionResponse> askConnectivity(RetrieveConnection retrieveConnection) {
        final ThreadSafeDittoLogger correlatedLogger = logger.withCorrelationId(retrieveConnection);
        correlatedLogger.debug("Sending command <{}> to connectivity.service.", retrieveConnection);

        final Duration askTimeout = initialCommand.getDittoHeaders().getTimeout().orElse(defaultTimeout);
        return Patterns.ask(edgeCommandForwarder, ensureCommandHasCorrelationId(retrieveConnection), askTimeout)
                .thenApply(reply -> {
                    correlatedLogger.debug("Received response <{}> from connectivity service", reply);
                    throwCauseIfErrorResponse(reply);
                    throwCauseIfDittoRuntimeException(reply);
                    final RetrieveConnectionResponse typedResponse =
                            mapToType(reply, RetrieveConnectionResponse.class, retrieveConnection);
                    correlatedLogger.info(
                            "Received response of type <{}> for id <{}> from connectivity service",
                            typedResponse.getType(), typedResponse.getEntityId());
                    return typedResponse;
                });
    }

    /**
     * Returns the command unchanged if its headers already contain a correlation ID; otherwise returns a copy
     * whose headers carry a random one.
     */
    private static Command<?> ensureCommandHasCorrelationId(Command<?> command) {
        final DittoHeaders dittoHeaders = command.getDittoHeaders();
        if (dittoHeaders.getCorrelationId().isPresent()) {
            return command;
        }
        return command.setDittoHeaders(dittoHeaders.toBuilder().randomCorrelationId().build());
    }

    /** Rethrows the exception carried by {@code obj} if it is an {@link ErrorResponse}. */
    private static void throwCauseIfErrorResponse(Object obj) {
        if (obj instanceof ErrorResponse<?> errorResponse) {
            throw errorResponse.getDittoRuntimeException();
        }
    }

    /** Rethrows {@code obj} if it is itself a {@link DittoRuntimeException}. */
    private static void throwCauseIfDittoRuntimeException(Object obj) {
        if (obj instanceof DittoRuntimeException dittoRuntimeException) {
            throw dittoRuntimeException;
        }
    }

    /**
     * Casts {@code obj} to the expected type.
     *
     * @param obj the received reply.
     * @param cls the expected reply type.
     * @param withDittoHeaders source of the headers for the exception thrown on a type mismatch.
     * @return {@code obj} as {@code T}.
     * @throws ConnectivityInternalErrorException if {@code obj} is not an instance of {@code cls}.
     */
    private static <T> T mapToType(Object obj, Class<T> cls, WithDittoHeaders withDittoHeaders) {
        final Class<?> actualType = obj.getClass();
        if (!cls.isAssignableFrom(actualType)) {
            final String detail =
                    String.format("Expected <%s> response but got <%s>!", cls.getSimpleName(), actualType);
            throw ConnectivityInternalErrorException.newBuilder()
                    .dittoHeaders(withDittoHeaders.getDittoHeaders())
                    .cause2(new IllegalArgumentException(detail))
                    .build();
        }
        return cls.cast(obj);
    }

    /**
     * Replies to {@link #sender} with an internal-error response describing the timeout and stops the actor.
     * The headers of {@link #initialCommand} are attached when a command has already been received.
     */
    protected void handleTimeout() {
        final ConnectivityInternalErrorException.Builder exceptionBuilder =
                ConnectivityInternalErrorException.newBuilder();
        final ConnectivityErrorResponse errorResponse;
        if (initialCommand != null) {
            final DittoHeaders headers = initialCommand.getDittoHeaders();
            exceptionBuilder.dittoHeaders(headers).message("RetrieveConnections command timed out.");
            errorResponse = ConnectivityErrorResponse.of(exceptionBuilder.build(), headers);
        } else {
            exceptionBuilder.message("Actor time out. command timed out.");
            // Build from the configured builder: a fresh builder here would drop the message set above.
            errorResponse = ConnectivityErrorResponse.of(exceptionBuilder.build());
        }
        sender.tell(errorResponse, getSelf());
        stop();
    }

    /** Stops this actor. */
    protected void stop() {
        getContext().stop(getSelf());
    }
}
