package org.eclipse.ditto.edge.service.acknowledgements;

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorRefFactory;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Address;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.japi.pf.PFBuilder;
import org.eclipse.ditto.base.model.acks.AbstractCommandAckRequestSetter;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.CommandResponseAcknowledgementProvider;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoHeaderInvalidException;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.translator.HeaderTranslator;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.internal.models.signal.correlation.MatchingValidationResult;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.protocol.TopicPath;
import scala.PartialFunction;

/* loaded from: input_file:org/eclipse/ditto/edge/service/acknowledgements/AcknowledgementAggregatorActorStarter.class */
/**
 * Starts {@link AcknowledgementAggregatorActor} children for incoming signals which request acknowledgements.
 * <p>
 * Before an aggregator is started, the signal is run through the configured acknowledgement request setters and
 * its timeout header is validated. The signal handed on to the caller carries the serialized address of the
 * started aggregator in the {@link DittoHeaderDefinition#DITTO_ACKREGATOR_ADDRESS} header.
 * </p>
 * <p>
 * NOTE(review): {@code childCounter} is a plain, unsynchronized {@code int}; instances are presumably meant to be
 * confined to a single actor/thread - confirm against the callers.
 * </p>
 */
public final class AcknowledgementAggregatorActorStarter {

    private static final ThreadSafeDittoLogger LOGGER =
            DittoLoggerFactory.getThreadSafeLogger(AcknowledgementAggregatorActorStarter.class);

    /** Factory used to create the aggregator actors. */
    private final ActorRefFactory actorRefFactory;

    /** Maximum timeout passed on to each started aggregator actor. */
    private final Duration maxTimeout;

    /** Header translator passed on to each started aggregator actor. */
    private final HeaderTranslator headerTranslator;

    /** Applies the first applicable acknowledgement request setter to a signal, or returns the signal as is. */
    private final PartialFunction<Signal<?>, Signal<?>> ackRequestSetter;

    /** Candidates from which the provider matching the class of a command is selected. */
    private final Collection<CommandResponseAcknowledgementProvider<?>> responseAcknowledgementProviders;

    /** Optional consumer passed on to each started aggregator actor; may be {@code null}. */
    @Nullable
    private final Consumer<MatchingValidationResult.Failure> matchingValidationFailureConsumer;

    /** Cluster address of this node, used to build the serialized address of started aggregators. */
    private final Address selfRemoteAddress;

    /** Running number making the names of the started aggregator actors unique per starter instance. */
    private int childCounter = 0;

    private AcknowledgementAggregatorActorStarter(final ActorRefFactory actorRefFactory,
            final Duration maxTimeout,
            final HeaderTranslator headerTranslator,
            @Nullable final Consumer<MatchingValidationResult.Failure> matchingValidationFailureConsumer,
            final PartialFunction<Signal<?>, Signal<?>> ackRequestSetter,
            final Collection<CommandResponseAcknowledgementProvider<?>> responseAcknowledgementProviders) {

        this.actorRefFactory = ConditionChecker.checkNotNull(actorRefFactory, "actorRefFactory");
        this.maxTimeout = ConditionChecker.checkNotNull(maxTimeout, "maxTimeout");
        this.headerTranslator = ConditionChecker.checkNotNull(headerTranslator, "headerTranslator");
        this.matchingValidationFailureConsumer = matchingValidationFailureConsumer;
        this.ackRequestSetter = ackRequestSetter;
        this.responseAcknowledgementProviders = responseAcknowledgementProviders;
        // The cast pins the ActorSystem overload of Cluster.get.
        this.selfRemoteAddress =
                Cluster.get((ActorSystem) actorRefFactory.systemImpl()).selfUniqueAddress().address();
    }

    /**
     * Returns a starter whose maximum timeout is the forwarder fallback timeout of the given config.
     *
     * @param actorRefFactory the factory used to create aggregator actors.
     * @param acknowledgementConfig supplies the forwarder fallback timeout used as maximum timeout.
     * @param headerTranslator the header translator handed to started aggregators.
     * @param consumer optional consumer of matching validation failures; may be {@code null}.
     * @param collection the acknowledgement request setters applied to signals during preprocessing.
     * @param collection2 the providers from which the one matching a command's class is selected.
     * @return the new starter.
     * @throws NullPointerException if {@code actorRefFactory}, the resolved timeout or {@code headerTranslator}
     * is {@code null}.
     */
    public static AcknowledgementAggregatorActorStarter of(ActorRefFactory actorRefFactory, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, @Nullable Consumer<MatchingValidationResult.Failure> consumer, Collection<AbstractCommandAckRequestSetter<?>> collection, Collection<CommandResponseAcknowledgementProvider<?>> collection2) {
        return of(actorRefFactory,
                acknowledgementConfig.getForwarderFallbackTimeout(),
                headerTranslator,
                consumer,
                collection,
                collection2);
    }

    /**
     * Returns a starter using the given maximum timeout.
     *
     * @param actorRefFactory the factory used to create aggregator actors.
     * @param duration the maximum timeout handed to started aggregators.
     * @param headerTranslator the header translator handed to started aggregators.
     * @param consumer optional consumer of matching validation failures; may be {@code null}.
     * @param collection the acknowledgement request setters applied to signals during preprocessing.
     * @param collection2 the providers from which the one matching a command's class is selected.
     * @return the new starter.
     * @throws NullPointerException if {@code actorRefFactory}, {@code duration} or {@code headerTranslator}
     * is {@code null}.
     */
    public static AcknowledgementAggregatorActorStarter of(ActorRefFactory actorRefFactory, Duration duration, HeaderTranslator headerTranslator, @Nullable Consumer<MatchingValidationResult.Failure> consumer, Collection<AbstractCommandAckRequestSetter<?>> collection, Collection<CommandResponseAcknowledgementProvider<?>> collection2) {
        return new AcknowledgementAggregatorActorStarter(actorRefFactory,
                duration,
                headerTranslator,
                consumer,
                buildAckRequestSetter(collection),
                collection2);
    }

    /**
     * Decides whether an aggregator should be started for the given incoming signal.
     * <ul>
     * <li>Entity-modifying thing commands which are not on the live channel: {@code true} if any requested
     * acknowledgement satisfies {@code AcknowledgementForwarderActorStarter.isNotLiveResponse}.</li>
     * <li>Message commands and live thing commands: {@code true} if any requested acknowledgement satisfies
     * {@code AcknowledgementForwarderActorStarter.isNotTwinPersisted}.</li>
     * <li>Anything else: whether the signal's channel is "smart".</li>
     * </ul>
     *
     * @param signal the incoming signal.
     * @return {@code true} if an aggregator should be started, {@code false} otherwise.
     */
    public static boolean shouldStartForIncoming(Signal<?> signal) {
        final boolean isLiveSignal = Signal.isChannelLive(signal);
        final boolean isSmartSignal = Signal.isChannelSmart(signal);
        final Set<AcknowledgementRequest> ackRequests = signal.getDittoHeaders().getAcknowledgementRequests();

        if (signal instanceof Command
                && Command.Category.isEntityModifyingCommand(((Command<?>) signal).getCategory())
                && Command.isThingCommand(signal)
                && !isLiveSignal) {
            return ackRequests.stream().anyMatch(AcknowledgementForwarderActorStarter::isNotLiveResponse);
        }
        if (Command.isMessageCommand(signal) || (isLiveSignal && Command.isThingCommand(signal))) {
            return ackRequests.stream().anyMatch(AcknowledgementForwarderActorStarter::isNotTwinPersisted);
        }
        return isSmartSignal;
    }

    /**
     * Preprocesses the signal and starts an aggregator for it if appropriate.
     * <p>
     * An aggregator is started only if {@link #shouldStartForIncoming(Signal)} holds for the preprocessed signal,
     * the <em>given</em> signal has an entity ID and the preprocessed signal is a {@link Command}.
     * </p>
     *
     * @param signal the incoming signal.
     * @param duration optional timeout override handed to the aggregator; may be {@code null}.
     * @param function receives the {@link DittoHeaderInvalidException} if the timeout header is invalid and is
     * also handed to the aggregator as consumer of its result.
     * @param biFunction invoked with the adjusted signal and the aggregator reference if an aggregator was started.
     * @param function2 invoked with the preprocessed signal if no aggregator was started.
     * @param <T> the result type.
     * @return the result of whichever of the given functions was invoked.
     */
    public <T> T start(Signal<?> signal, @Nullable Duration duration, Function<Object, T> function, BiFunction<Signal<?>, ActorRef, T> biFunction, Function<Signal<?>, T> function2) {
        return preprocess(signal,
                (processedSignal, shouldStart) -> {
                    final Optional<EntityId> entityIdOptional = WithEntityId.getEntityId(signal);
                    if (shouldStart && entityIdOptional.isPresent() && processedSignal instanceof Command) {
                        return doStart(entityIdOptional.get(),
                                (Command<?>) processedSignal,
                                duration,
                                function::apply,
                                (aggregator, adjustedSignal) -> biFunction.apply(adjustedSignal, aggregator));
                    }
                    return function2.apply(processedSignal);
                },
                function);
    }

    /**
     * Applies the acknowledgement request setters to the signal and validates its timeout header.
     *
     * @param signal the incoming signal.
     * @param biFunction invoked with the preprocessed signal and the outcome of
     * {@link #shouldStartForIncoming(Signal)} if the timeout header is valid.
     * @param function invoked with the exception if the timeout header is invalid.
     * @param <T> the result type.
     * @return the result of whichever of the given functions was invoked.
     */
    public <T> T preprocess(Signal<?> signal, BiFunction<Signal<?>, Boolean, T> biFunction, Function<? super DittoHeaderInvalidException, T> function) {
        final Signal<?> signalWithAckRequests = ackRequestSetter.apply(signal);
        final Optional<DittoHeaderInvalidException> headerInvalid =
                getDittoHeaderInvalidException(signalWithAckRequests);

        return headerInvalid.map(function)
                .orElseGet(() -> biFunction.apply(signalWithAckRequests,
                        shouldStartForIncoming(signalWithAckRequests)));
    }

    /**
     * Starts an aggregator for the command unconditionally and hands the adjusted command to the given function.
     * The adjusted command carries the serialized address of the aggregator in the
     * {@link DittoHeaderDefinition#DITTO_ACKREGATOR_ADDRESS} header.
     *
     * @param entityId the entity ID handed to the aggregator.
     * @param command the command to start the aggregator for.
     * @param duration optional timeout override handed to the aggregator; may be {@code null}.
     * @param consumer handed to the aggregator as consumer of its result.
     * @param biFunction invoked with the aggregator reference and the adjusted command.
     * @param <T> the result type.
     * @return the result of {@code biFunction}.
     * @throws DittoInternalErrorException if no acknowledgement provider matches the class of the command.
     */
    public <T> T doStart(EntityId entityId, Command<?> command, @Nullable Duration duration, Consumer<Object> consumer, BiFunction<ActorRef, Signal<?>, T> biFunction) {
        final ActorRef aggregator = startAckAggregatorActor(entityId, command, duration, consumer);
        final DittoHeaders adjustedHeaders = command.getDittoHeaders()
                .toBuilder()
                .putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(),
                        aggregator.path().toSerializationFormatWithAddress(selfRemoteAddress))
                .build();

        return biFunction.apply(aggregator, command.setDittoHeaders(adjustedHeaders));
    }

    /**
     * Creates the aggregator actor for the command.
     *
     * @throws DittoInternalErrorException if no acknowledgement provider matches the class of the command;
     * the condition is logged at error level before throwing.
     */
    private <C extends Command<?>> ActorRef startAckAggregatorActor(final EntityId entityId,
            final C command,
            @Nullable final Duration timeoutOverride,
            final Consumer<Object> responseSignalConsumer) {

        final CommandResponseAcknowledgementProvider<C> ackProvider =
                findRelevantAcknowledgementProvider(command).orElseThrow(() -> {
                    LOGGER.withCorrelationId(command).error("Tried to start acknowledgement aggregator for command <{}> but don't know any applicable acknowledgement providers for this.", command);
                    return DittoInternalErrorException.newBuilder()
                            .dittoHeaders(command.getDittoHeaders())
                            .build();
                });

        return actorRefFactory.actorOf(
                AcknowledgementAggregatorActor.props(entityId,
                        command,
                        timeoutOverride,
                        maxTimeout,
                        headerTranslator,
                        responseSignalConsumer,
                        matchingValidationFailureConsumer,
                        ackProvider),
                getNextActorName(command.getDittoHeaders()));
    }

    /**
     * Finds any provider whose command class is assignable from the runtime class of the given command.
     */
    @SuppressWarnings("unchecked")
    private <C extends Command<?>> Optional<CommandResponseAcknowledgementProvider<C>> findRelevantAcknowledgementProvider(
            final C command) {

        return responseAcknowledgementProviders.stream()
                .filter(provider -> provider.getCommandClass().isAssignableFrom(command.getClass()))
                // The unchecked cast is backed by the assignability check of the filter above.
                .map(provider -> (CommandResponseAcknowledgementProvider<C>) provider)
                .findAny();
    }

    /**
     * Builds the name of the next aggregator actor from the hexadecimal child counter and the URL-encoded
     * correlation ID of the headers; {@link TopicPath#ID_PLACEHOLDER} stands in for a missing correlation ID.
     * Increments the child counter as a side effect.
     */
    private String getNextActorName(final DittoHeaders dittoHeaders) {
        final String encodedCorrelationId = dittoHeaders.getCorrelationId()
                .map(correlationId -> URLEncoder.encode(correlationId, StandardCharsets.UTF_8))
                .orElse(TopicPath.ID_PLACEHOLDER);

        return String.format("ackr%x-%s", childCounter++, encodedCorrelationId);
    }

    /**
     * Combines the given setters into one partial function: the first setter whose matched class and
     * applicability predicate accept a signal is applied to it; a signal no setter accepts is returned unchanged.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static PartialFunction<Signal<?>, Signal<?>> buildAckRequestSetter(
            final Collection<AbstractCommandAckRequestSetter<?>> ackRequestSetters) {

        PFBuilder<Signal<?>, Signal<?>> pfBuilder = new PFBuilder<>();
        // The raw type is deliberate: each setter fixes its own signal type, which a wildcard cannot express
        // across the three arguments of match(...).
        for (final AbstractCommandAckRequestSetter ackRequestSetter : ackRequestSetters) {
            pfBuilder = pfBuilder.match(ackRequestSetter.getMatchedClass(),
                    ackRequestSetter::isApplicable,
                    matchedSignal -> (Signal<?>) ackRequestSetter.apply(matchedSignal));
        }

        return pfBuilder.matchAny(unmatchedSignal -> unmatchedSignal).build();
    }

    /**
     * Returns an exception describing the invalid timeout header of the signal, or an empty optional if the
     * timeout header is valid.
     */
    private static Optional<DittoHeaderInvalidException> getDittoHeaderInvalidException(final Signal<?> signal) {
        final DittoHeaders dittoHeaders = signal.getDittoHeaders();
        if (!isTimeoutHeaderInvalid(dittoHeaders)) {
            return Optional.empty();
        }

        final String timeoutHeaderKey = DittoHeaderDefinition.TIMEOUT.getKey();
        return Optional.of(DittoHeaderInvalidException.newBuilder()
                .withInvalidHeaderKey(timeoutHeaderKey)
                .message(String.format("The value of the header '%s' must not be zero if response or acknowledgements are requested.", timeoutHeaderKey))
                .description("Please provide a positive timeout.")
                .dittoHeaders(dittoHeaders)
                .build());
    }

    /**
     * A timeout is invalid if it is present and zero while a response is required or at least one
     * acknowledgement is requested.
     */
    private static boolean isTimeoutHeaderInvalid(final DittoHeaders dittoHeaders) {
        final boolean isTimeoutZero = dittoHeaders.getTimeout().filter(Duration::isZero).isPresent();

        return isTimeoutZero
                && (dittoHeaders.isResponseRequired() || !dittoHeaders.getAcknowledgementRequests().isEmpty());
    }
}
