package org.eclipse.ditto.services.concierge.batch.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AbstractPersistentActor;
import akka.persistence.RecoveryCompleted;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.DittoHeadersBuilder;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.batch.ExecuteBatch;
import org.eclipse.ditto.signals.commands.batch.ExecuteBatchResponse;
import org.eclipse.ditto.signals.commands.batch.exceptions.BatchAlreadyExecutingException;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
import org.eclipse.ditto.signals.events.base.Event;
import org.eclipse.ditto.signals.events.batch.BatchCommandExecuted;
import org.eclipse.ditto.signals.events.batch.BatchExecutionFinished;
import org.eclipse.ditto.signals.events.batch.BatchExecutionStarted;
import scala.concurrent.duration.Duration;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/eclipse/ditto/services/concierge/batch/actors/BatchCoordinatorActor.class */
/**
 * Persistent actor that coordinates the execution of a single batch of commands.
 *
 * <p>Life cycle as implemented below:
 * <ol>
 * <li>An {@link ExecuteBatch} is received; every contained command is forwarded to the concierge forwarder
 * with the dry-run header set and a batch-specific correlation ID.</li>
 * <li>Once a response arrived for every dry-run command, a {@link BatchExecutionStarted} event is persisted,
 * the original sender receives an {@link ExecuteBatchResponse} and all commands are forwarded again,
 * this time without the dry-run header.</li>
 * <li>Each response is persisted as a {@link BatchCommandExecuted} event. After the last one a
 * {@link BatchExecutionFinished} event is persisted and the actor schedules its own termination.</li>
 * </ol>
 * If any dry-run command fails with a {@link DittoRuntimeException}, the original sender receives an error
 * response and the actor schedules its own termination without executing the batch.
 *
 * <p>On recovery the persisted events are replayed; commands that are still pending are re-sent.
 */
public final class BatchCoordinatorActor extends AbstractPersistentActor {
    static final String ACTOR_NAME_PREFIX = "batch-coordinator-";
    private static final String PERSISTENCE_ID_PREFIX = "batch:";
    private static final String JOURNAL_PLUGIN_ID = "akka-contrib-mongodb-persistence-batch-journal";
    private static final String SNAPSHOT_PLUGIN_ID = "akka-contrib-mongodb-persistence-batch-snapshots";
    private static final int SHUTDOWN_TIMEOUT_SECONDS = 60;
    private static final String BATCH_ID_FIELD = "batchId";
    private static final String RANDOM_FIELD = "random";
    private static final String ORIGINAL_CORRELATION_ID = "originalCorrelationId";
    private final DiagnosticLoggingAdapter log;
    private final ActorRef eventRecipient;
    private final ActorRef conciergeForwarder;
    /** Encoded correlation IDs of the commands for which no response has been received yet. */
    private final Set<String> pendingCommands;
    /** All commands of the batch, keyed by their encoded correlation ID. */
    private final Map<String, Command> commands;
    /** Responses received so far, carrying the original (decoded) correlation IDs. */
    private final List<CommandResponse> commandResponses;
    private String batchId;
    /** Sender of the {@link ExecuteBatch} command; stays {@code null} if the actor only recovered. */
    private ActorRef originalSender;
    /** Handle of the pending self-termination, or {@code null} if none was ever scheduled. */
    private Cancellable shutdown;

    private BatchCoordinatorActor(String batchId, ActorRef eventRecipient, ActorRef conciergeForwarder) {
        this.log = LogUtil.obtain(this);
        this.batchId = batchId;
        this.conciergeForwarder = conciergeForwarder;
        this.eventRecipient = eventRecipient;
        this.pendingCommands = new HashSet<>();
        this.commands = new HashMap<>();
        this.commandResponses = new ArrayList<>();
    }

    /**
     * Creates the Akka configuration object for this actor.
     *
     * @param batchId the ID of the batch this actor coordinates; also part of the persistence ID.
     * @param eventRecipient the actor which is told the started/finished events; may be {@code null}.
     * @param conciergeForwarder the actor to which the commands of the batch are sent.
     * @return the Props.
     */
    public static Props props(final String batchId, final ActorRef eventRecipient,
            final ActorRef conciergeForwarder) {
        return Props.create(BatchCoordinatorActor.class, new Creator<BatchCoordinatorActor>() {
            private static final long serialVersionUID = 1;

            @Override
            public BatchCoordinatorActor create() {
                return new BatchCoordinatorActor(batchId, eventRecipient, conciergeForwarder);
            }
        });
    }

    @Override
    public String persistenceId() {
        return PERSISTENCE_ID_PREFIX + this.batchId;
    }

    @Override
    public String journalPluginId() {
        return JOURNAL_PLUGIN_ID;
    }

    @Override
    public String snapshotPluginId() {
        return SNAPSHOT_PLUGIN_ID;
    }

    /**
     * Rebuilds the in-memory state from the persisted events and, once recovery completed, either resumes the
     * batch (pending commands exist) or schedules the termination of this actor.
     */
    @Override
    public AbstractActor.Receive createReceiveRecover() {
        return ReceiveBuilder.create()
                .match(BatchExecutionStarted.class, this::recoverBatchExecutionStarted)
                .match(BatchCommandExecuted.class, this::recoverBatchCommandExecuted)
                .match(BatchExecutionFinished.class, finished ->
                        this.log.debug("Recovered finished batch '{}'.", this.batchId))
                .match(RecoveryCompleted.class, this::recoveryCompleted)
                .matchAny(unknown -> this.log.warning("Unknown recover message: {}", unknown))
                .build();
    }

    /** Restores the batch ID and registers every command of the batch as pending. */
    private void recoverBatchExecutionStarted(BatchExecutionStarted started) {
        this.batchId = started.getBatchId();
        started.getCommands().forEach(command -> {
            String correlationId = requireCorrelationId(command.getDittoHeaders(),
                    "Recovered a Command without Correlation ID!");
            this.commands.put(correlationId, command);
            this.pendingCommands.add(correlationId);
        });
    }

    /** Marks the command belonging to the recovered response as done and remembers the response. */
    private void recoverBatchCommandExecuted(BatchCommandExecuted executed) {
        CommandResponse response = executed.getResponse();
        this.pendingCommands.remove(requireCorrelationId(response.getDittoHeaders(),
                "Received a CommandResponse without Correlation ID!"));
        // Store the response the same way the live path does (original correlation ID restored), so that the
        // finished event does not mix encoded and original correlation IDs after a restart.
        this.commandResponses.add(unfixCorrelationId(response));
    }

    /** Decides after recovery whether the batch has to be resumed or the actor can terminate. */
    private void recoveryCompleted(RecoveryCompleted recoveryCompleted) {
        this.log.debug("Recovery completed");
        if (this.pendingCommands.isEmpty()) {
            this.log.debug("No pending commands - shutting down in {} seconds.", SHUTDOWN_TIMEOUT_SECONDS);
            scheduleShutdown();
            return;
        }
        this.log.debug("Resuming execution of batch '{}'.", this.batchId);
        this.pendingCommands.forEach(correlationId ->
                this.conciergeForwarder.tell(this.commands.get(correlationId), getSelf()));
        becomeCommandResponseAwaiting();
    }

    /** Initial behavior: waits for the {@link ExecuteBatch} command which starts the dry run. */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(ExecuteBatch.class, this::executeDryRun)
                .matchAny(unknown ->
                        this.log.warning("Got unknown message, expected an 'ExecuteBatch' command: {}", unknown))
                .build();
    }

    /** Remembers the sender and sends all commands of the batch as dry run. */
    private void executeDryRun(ExecuteBatch executeBatch) {
        cancelShutdown();
        this.originalSender = getSender();
        executeBatch.getCommands().forEach(this::tellCommandAsDryRun);
        becomeDryRunCommandResponseAwaiting();
    }

    /**
     * Registers the command under a freshly encoded correlation ID and forwards a dry-run copy of it.
     * The stored copy does not get the dry-run header set here; it is the one sent for the real execution.
     */
    private void tellCommandAsDryRun(Command command) {
        String encodedCorrelationId = encodeCorrelationId(command.getDittoHeaders());
        DittoHeadersBuilder headersBuilder =
                command.getDittoHeaders().toBuilder().correlationId(encodedCorrelationId);
        this.commands.put(encodedCorrelationId, command.setDittoHeaders2(headersBuilder.build()));
        this.pendingCommands.add(encodedCorrelationId);
        this.conciergeForwarder.tell(command.setDittoHeaders2(headersBuilder.dryRun(true).build()), getSelf());
    }

    /** Behavior while the responses of the dry-run commands are outstanding. */
    private void becomeDryRunCommandResponseAwaiting() {
        getContext().become(ReceiveBuilder.create()
                .match(ExecuteBatch.class, this::batchAlreadyExecuting)
                .match(CommandResponse.class, this::handleDryRunResponse)
                .match(DittoRuntimeException.class, this::handleDryRunFailure)
                .matchAny(unknown -> this.log.warning("Got unknown message: {}", unknown))
                .build());
    }

    /** Ticks off one dry-run response; starts the real execution after the last one. */
    private void handleDryRunResponse(CommandResponse commandResponse) {
        this.pendingCommands.remove(requireCorrelationId(commandResponse.getDittoHeaders(),
                "Received a CommandResponse without Correlation ID!"));
        if (this.pendingCommands.isEmpty()) {
            startBatchExecution();
        }
    }

    /** Persists the start of the batch, acknowledges it to the original sender and sends all commands. */
    private void startBatchExecution() {
        BatchExecutionStarted startedEvent = BatchExecutionStarted.of(this.batchId, Instant.now(),
                new ArrayList<>(this.commands.values()), buildDittoHeaders());
        persist(startedEvent, persistedEvent -> {
            this.log.info("Batch with ID '{}' started.", this.batchId);
            this.originalSender.tell(ExecuteBatchResponse.of(this.batchId, buildDittoHeaders()), getSelf());
            this.commands.forEach((correlationId, command) -> {
                this.conciergeForwarder.tell(command, getSelf());
                this.pendingCommands.add(correlationId);
            });
            notifyEventRecipient(persistedEvent);
            becomeCommandResponseAwaiting();
        });
    }

    /** Aborts the batch on a failed dry-run command: reports the error and schedules termination. */
    private void handleDryRunFailure(DittoRuntimeException dittoRuntimeException) {
        String correlationId = requireCorrelationId(dittoRuntimeException.getDittoHeaders(),
                "Received a DittoRuntimeException without Correlation ID!");
        ThingErrorResponse errorResponse = ThingErrorResponse.of(dittoRuntimeException, buildDittoHeaders());
        this.pendingCommands.remove(correlationId);
        this.originalSender.tell(errorResponse, getSelf());
        scheduleShutdown();
        becomeShutdownAwaiting();
    }

    /** Behavior while the responses of the really executed commands are outstanding. */
    private void becomeCommandResponseAwaiting() {
        getContext().become(ReceiveBuilder.create()
                .match(ExecuteBatch.class, this::batchAlreadyExecuting)
                .match(CommandResponse.class, this::handleCommandResponse)
                .match(DittoRuntimeException.class, this::handleDittoRuntimeException)
                .matchAny(unknown -> this.log.warning("Got unknown message: {}", unknown))
                .build());
    }

    /**
     * Rejects an {@link ExecuteBatch} received while this batch is already running.
     * The error goes to the sender of the rejected command: the stored original sender belongs to a different
     * request and is {@code null} when this state was entered through recovery.
     */
    private void batchAlreadyExecuting(ExecuteBatch executeBatch) {
        BatchAlreadyExecutingException alreadyExecuting = BatchAlreadyExecutingException.newBuilder(this.batchId)
                .dittoHeaders(executeBatch.getDittoHeaders())
                .build();
        getSender().tell(ThingErrorResponse.of(alreadyExecuting), getSelf());
    }

    /** Treats a failed command like a regular response by wrapping the exception in an error response. */
    private void handleDittoRuntimeException(DittoRuntimeException dittoRuntimeException) {
        handleCommandResponse(ThingErrorResponse.of(dittoRuntimeException));
    }

    /**
     * Persists the received response and, if it was the last outstanding one, finishes the batch.
     *
     * @throws IllegalStateException if the response has no correlation ID (thrown before anything is persisted).
     */
    private void handleCommandResponse(CommandResponse commandResponse) {
        String correlationId = requireCorrelationId(commandResponse.getDittoHeaders(),
                "Received a CommandResponse without Correlation ID!");
        BatchCommandExecuted executedEvent =
                BatchCommandExecuted.of(correlationId, commandResponse, Instant.now());
        persist(executedEvent, persistedEvent -> {
            this.log.info("Received '{}' for Batch with ID '{}'.", commandResponse.getName(), this.batchId);
            this.pendingCommands.remove(correlationId);
            this.commandResponses.add(unfixCorrelationId(commandResponse));
            if (this.pendingCommands.isEmpty()) {
                finishBatchExecution();
            }
        });
    }

    /** Persists the end of the batch, notifies the event recipient and schedules termination. */
    private void finishBatchExecution() {
        // Hand out a copy: the event is sent to another actor and must not share this actor's mutable list.
        BatchExecutionFinished finishedEvent = BatchExecutionFinished.of(this.batchId, Instant.now(),
                new ArrayList<>(this.commandResponses), buildDittoHeaders());
        persist(finishedEvent, persistedEvent -> {
            this.log.info("Batch with ID '{}' finished.", this.batchId);
            notifyEventRecipient(persistedEvent);
            scheduleShutdown();
            becomeShutdownAwaiting();
        });
    }

    /** Tells the event to the event recipient, if one was configured. */
    private void notifyEventRecipient(Event event) {
        if (this.eventRecipient != null) {
            this.eventRecipient.tell(event, getSelf());
        }
    }

    /** Headers used for everything sent on behalf of the batch itself: the batch ID is the correlation ID. */
    private DittoHeaders buildDittoHeaders() {
        return DittoHeaders.newBuilder().correlationId(this.batchId).build();
    }

    /** Returns the correlation ID of the given headers or fails with the given message. */
    private static String requireCorrelationId(DittoHeaders dittoHeaders, String errorMessage) {
        return dittoHeaders.getCorrelationId().orElseThrow(() -> new IllegalStateException(errorMessage));
    }

    /**
     * Builds the correlation ID used while a command is under control of this actor: a JSON object string
     * holding the batch ID, a random UUID and the command's original correlation ID (a random UUID if the
     * command had none).
     */
    private String encodeCorrelationId(DittoHeaders dittoHeaders) {
        String originalCorrelationId =
                dittoHeaders.getCorrelationId().orElseGet(() -> UUID.randomUUID().toString());
        return JsonObject.newBuilder()
                .set(BATCH_ID_FIELD, this.batchId)
                .set(RANDOM_FIELD, UUID.randomUUID().toString())
                .set(ORIGINAL_CORRELATION_ID, originalCorrelationId)
                .build()
                .toString();
    }

    /** Parses an encoded correlation ID back into its JSON object. */
    private JsonObject decodeCommandCorrelationId(String encodedCorrelationId) {
        return JsonFactory.newObject(encodedCorrelationId);
    }

    /**
     * Returns a copy of the response whose correlation ID is the original one extracted from the encoded
     * correlation ID. If it cannot be extracted, {@code null} is passed to the headers builder.
     */
    private CommandResponse unfixCorrelationId(CommandResponse commandResponse) {
        DittoHeaders responseHeaders = commandResponse.getDittoHeaders();
        String originalCorrelationId = responseHeaders.getCorrelationId()
                .flatMap(encoded -> decodeCommandCorrelationId(encoded).getValue(ORIGINAL_CORRELATION_ID))
                .map(jsonValue -> jsonValue.asString())
                .orElse(null);
        return commandResponse.setDittoHeaders2(
                responseHeaders.toBuilder().correlationId(originalCorrelationId).build());
    }

    /**
     * Schedules the termination of this actor after {@link #SHUTDOWN_TIMEOUT_SECONDS} seconds.
     * A {@link PoisonPill} message is scheduled instead of a closure calling {@code getContext().stop(...)}:
     * the closure would run on a scheduler thread, and the actor context must only be used from within
     * message processing.
     */
    private void scheduleShutdown() {
        this.shutdown = getContext().system().scheduler().scheduleOnce(
                Duration.create(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS),
                getSelf(),
                PoisonPill.getInstance(),
                getContext().system().dispatcher(),
                ActorRef.noSender());
    }

    /** Final behavior: the work is done, every further message is only logged. */
    private void becomeShutdownAwaiting() {
        getContext().become(ReceiveBuilder.create()
                .matchAny(message -> this.log.debug("Got message while waiting for shutdown: {}", message))
                .build());
    }

    /** Cancels a scheduled termination, if any, and reverts the behavior via {@code unbecome()}. */
    private void cancelShutdown() {
        if (this.shutdown != null) {
            this.shutdown.cancel();
            getContext().unbecome();
        }
    }
}
