package org.eclipse.ditto.edge.service.acknowledgements;

import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorRefFactory;
import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.ReceiveTimeout;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.AcknowledgementCorrelationIdMissingException;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.ErrorResponse;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;

/* loaded from: input_file:org/eclipse/ditto/edge/service/acknowledgements/AcknowledgementForwarderActor.class */
/**
 * Actor that relays every {@link CommandResponse} it receives.
 *
 * <p>{@link Acknowledgement}s and {@link ErrorResponse}s are sent to the actor selected by the
 * {@code DITTO_ACKREGATOR_ADDRESS} header (taken from the response, falling back to the value found in
 * the headers this actor was created with). Every other response, and every response for which no such
 * address is known, is sent to the command forwarder.
 *
 * <p>The actor stops itself once its receive timeout fires.
 */
public final class AcknowledgementForwarderActor extends AbstractActor {

    /**
     * Prefix of the actor names produced by {@link #determineActorName(DittoHeaders)}.
     */
    static final String ACTOR_NAME_PREFIX = "ackForwarder-";

    private final ActorSelection commandForwarder;
    private final String correlationId;

    @Nullable
    private final String ackgregatorAddressFallback;
    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

    /**
     * Instantiated reflectively through {@link #props(ActorSelection, DittoHeaders, Duration)}.
     *
     * @param commandForwarder target for responses that are not routed to an acknowledgement aggregator.
     * @param dittoHeaders headers supplying the correlation ID, the fallback aggregator address and,
     * optionally, the receive timeout.
     * @param defaultTimeout receive timeout used when {@code dittoHeaders} carry no timeout.
     */
    private AcknowledgementForwarderActor(final ActorSelection commandForwarder,
            final DittoHeaders dittoHeaders,
            final Duration defaultTimeout) {

        this.commandForwarder = commandForwarder;
        // Without a correlation ID header, fall back to this actor's own name minus the name prefix.
        this.correlationId = dittoHeaders.getCorrelationId()
                .orElseGet(() -> getSelf().path().name().replace(ACTOR_NAME_PREFIX, ""));
        this.ackgregatorAddressFallback = dittoHeaders.get(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey());
        getContext().setReceiveTimeout(dittoHeaders.getTimeout().orElse(defaultTimeout));
    }

    /**
     * Creates the {@code Props} for this actor.
     *
     * @param actorSelection the command forwarder that receives responses not routed to an aggregator.
     * @param dittoHeaders the headers the actor is initialised from.
     * @param duration the receive timeout to apply when {@code dittoHeaders} define none.
     * @return the Props.
     */
    public static Props props(ActorSelection actorSelection, DittoHeaders dittoHeaders, Duration duration) {
        return Props.create(AcknowledgementForwarderActor.class, actorSelection, dittoHeaders, duration);
    }

    /**
     * Forwards command responses, stops on receive timeout and logs a warning for anything else.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return receiveBuilder()
                .match(CommandResponse.class, this::forwardCommandResponse)
                .match(ReceiveTimeout.class, this::handleReceiveTimeout)
                .matchAny(unexpected -> log.warning("Received unexpected message: <{}>", unexpected))
                .build();
    }

    /**
     * Routes a response either to the acknowledgement aggregator or to the command forwarder.
     *
     * @param response the received response.
     */
    private void forwardCommandResponse(final WithDittoHeaders response) {
        final DittoHeaders responseHeaders = response.getDittoHeaders();
        // The address carried by the response wins; the address known at creation time is the fallback.
        final String aggregatorAddress = responseHeaders.getOrDefault(
                DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), ackgregatorAddressFallback);
        final boolean isAckOrError = response instanceof Acknowledgement || response instanceof ErrorResponse;

        if (aggregatorAddress != null && isAckOrError) {
            final ActorSelection aggregator = getContext().actorSelection(aggregatorAddress);
            log.withCorrelationId(response)
                    .debug("Received Acknowledgement / ErrorResponse, forwarding to acknowledgement aggregator <{}>: <{}>",
                            aggregator, response);
            // The original sender is preserved for the aggregator.
            aggregator.tell(response, getSender());
        } else {
            log.withCorrelationId(response)
                    .debug("Received live CommandResponse <{}>, forwarding to command forwarder: {}",
                            response.getClass().getSimpleName(), responseHeaders);
            commandForwarder.tell(response, ActorRef.noSender());
        }
    }

    /**
     * Stops this actor after the receive timeout has elapsed.
     *
     * @param receiveTimeout the timeout message (unused).
     */
    private void handleReceiveTimeout(final ReceiveTimeout receiveTimeout) {
        log.withCorrelationId(correlationId)
                .debug("Timed out waiting for requested acknowledgements, stopping myself ...");
        getContext().cancelReceiveTimeout();
        getContext().stop(getSelf());
    }

    /**
     * Determines the actor name for the given headers: {@link #ACTOR_NAME_PREFIX} followed by the
     * URL-encoded correlation ID.
     *
     * <p>The correlation ID is always encoded as UTF-8 so that the resulting name does not depend on the
     * platform default charset. With a default charset that cannot represent a character, distinct
     * correlation IDs could otherwise be encoded to the same name.
     *
     * @param dittoHeaders the headers containing the correlation ID.
     * @return the actor name.
     * @throws AcknowledgementCorrelationIdMissingException if {@code dittoHeaders} contain no correlation ID.
     */
    public static String determineActorName(DittoHeaders dittoHeaders) {
        ConditionChecker.checkNotNull(dittoHeaders, "dittoHeaders");
        final String correlationId = dittoHeaders.getCorrelationId()
                .orElseThrow(() -> AcknowledgementCorrelationIdMissingException.newBuilder()
                        .dittoHeaders(dittoHeaders)
                        .build());
        return ACTOR_NAME_PREFIX + URLEncoder.encode(correlationId, StandardCharsets.UTF_8);
    }

    /**
     * Test helper: obtains an {@code AcknowledgementForwarderActorStarter} that permits every
     * acknowledgement label and returns the result of its {@code get()}.
     *
     * @return the optional actor reference returned by the starter.
     */
    static Optional<ActorRef> startAcknowledgementForwarderForTest(ActorRefFactory actorRefFactory, ActorRef actorRef, ActorSelection actorSelection, EntityId entityId, Signal<?> signal, AcknowledgementConfig acknowledgementConfig) {
        return AcknowledgementForwarderActorStarter.getInstance(actorRefFactory, actorRef, actorSelection,
                entityId, signal, acknowledgementConfig, label -> true).get();
    }

    /**
     * Obtains an {@code AcknowledgementForwarderActorStarter} for the signal and returns the signal with
     * adjusted headers.
     *
     * <p>If the starter yields a conflict-free correlation ID, it replaces the signal's correlation ID. If
     * the signal requested any acknowledgements, they are replaced by the starter's allowed
     * acknowledgement requests.
     *
     * <p>NOTE(review): the actor is presumably started as a side effect of
     * {@code getConflictFree()} - confirm against {@code AcknowledgementForwarderActorStarter}.
     *
     * @param actorRefFactory passed through to the starter.
     * @param actorRef passed through to the starter.
     * @param actorSelection passed through to the starter.
     * @param entityId passed through to the starter.
     * @param signal the signal whose headers are inspected and adjusted.
     * @param acknowledgementConfig passed through to the starter.
     * @param predicate passed through to the starter; decides which acknowledgement labels are allowed.
     * @return the signal carrying the adjusted headers.
     */
    public static Signal<?> startAcknowledgementForwarder(ActorRefFactory actorRefFactory, ActorRef actorRef, ActorSelection actorSelection, EntityId entityId, Signal<?> signal, AcknowledgementConfig acknowledgementConfig, Predicate<AcknowledgementLabel> predicate) {
        final AcknowledgementForwarderActorStarter starter = AcknowledgementForwarderActorStarter.getInstance(
                actorRefFactory, actorRef, actorSelection, entityId, signal, acknowledgementConfig, predicate);
        final DittoHeadersBuilder<?, ?> headersBuilder = signal.getDittoHeaders().toBuilder();
        starter.getConflictFree().ifPresent(headersBuilder::correlationId);
        if (!signal.getDittoHeaders().getAcknowledgementRequests().isEmpty()) {
            headersBuilder.acknowledgementRequests(starter.getAllowedAckRequests());
        }
        return signal.setDittoHeaders(headersBuilder.build());
    }
}
