package io.reacted.drivers.channels.replay;

import io.reacted.core.config.ChannelId;
import io.reacted.core.drivers.local.LocalDriver;
import io.reacted.core.messages.Message;
import io.reacted.core.messages.reactors.DeliveryStatus;
import io.reacted.core.messages.reactors.DeliveryStatusUpdate;
import io.reacted.core.messages.reactors.EventExecutionAttempt;
import io.reacted.core.reactors.ReActorId;
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.core.reactorsystem.ReActorSystem;
import io.reacted.drivers.channels.chroniclequeue.CQDriverConfig;
import io.reacted.patterns.NonNullByDefault;
import io.reacted.patterns.Try;
import io.reacted.patterns.UnChecked;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.wire.DocumentContext;
import org.apache.log4j.Logger;

/**
 * Local driver that replays a session previously recorded on a Chronicle Queue.
 *
 * <p>The driver does not transmit anything: {@link #sendMessage(ReActorContext, Message)} only
 * records which reactors have been involved in a message exchange and reports the message as
 * delivered. The driver loop reads the recorded messages from the queue found in
 * {@code getChronicleFilesDir()}, parks every regular message until the matching
 * {@link EventExecutionAttempt} is read, and at that point forwards the parked message to the
 * local reactor that originally processed it.
 */
@NonNullByDefault
public class ReplayLocalDriver extends LocalDriver<CQDriverConfig> {
    private static final Logger LOGGER = Logger.getLogger(ReplayLocalDriver.class);

    /** Ids of the reactors seen as sender or destination by {@link #sendMessage}. */
    private final Set<ReActorId> spawnedReActors;

    /** Queue holding the recorded session; set by {@link #initDriverLoop(ReActorSystem)}. */
    @Nullable
    private ChronicleQueue chronicle;

    /** Reactor system the recorded messages are replayed on; set by {@link #initDriverLoop(ReActorSystem)}. */
    @Nullable
    private ReActorSystem replayedActorSystem;

    /**
     * Creates a replay driver for the given configuration.
     *
     * @param cQDriverConfig driver configuration, providing the channel name and the chronicle files directory
     */
    public ReplayLocalDriver(CQDriverConfig cQDriverConfig) {
        super(cQDriverConfig);
        // sendMessage and the driver loop both access this set, hence the concurrent key set
        this.spawnedReActors = ConcurrentHashMap.newKeySet();
    }

    /**
     * Opens the Chronicle Queue containing the recorded session and binds this driver to the
     * reactor system the session has to be replayed on.
     *
     * @param reActorSystem reactor system that will receive the replayed messages
     */
    public void initDriverLoop(ReActorSystem reActorSystem) {
        this.chronicle = ChronicleQueue.singleBuilder(((CQDriverConfig) getDriverConfig()).getChronicleFilesDir())
                                       .build();
        this.replayedActorSystem = reActorSystem;
    }

    /**
     * Returns the task running the replay loop.
     *
     * @return a runnable that fails with {@link NullPointerException} when executed before
     *         {@link #initDriverLoop(ReActorSystem)}
     */
    public UnChecked.CheckedRunnable getDriverLoop() {
        return () -> replayerMainLoop(Objects.requireNonNull(this.replayedActorSystem),
                                      Objects.requireNonNull(this.chronicle));
    }

    /**
     * @return the replay channel id built from the configured channel name
     */
    public ChannelId getChannelId() {
        return ChannelId.REPLAY_CHRONICLE_QUEUE.forChannelName(((CQDriverConfig) getDriverConfig()).getChannelName());
    }

    /**
     * @return a new, empty set of properties
     */
    public Properties getChannelProperties() {
        return new Properties();
    }

    /**
     * Records the reactors involved in the message instead of sending it.
     *
     * <p>Messages carrying a {@link DeliveryStatusUpdate} are ignored; for any other message the
     * ids of destination and sender are remembered, so that the replay loop can tell that those
     * reactors have been seen.
     *
     * @param reActorContext context of the destination reactor (unused)
     * @param message message that was asked to be sent
     * @return always a successful {@link DeliveryStatus#DELIVERED}
     */
    public Try<DeliveryStatus> sendMessage(ReActorContext reActorContext, Message message) {
        if (!(message.getPayload() instanceof DeliveryStatusUpdate)) {
            this.spawnedReActors.add(message.getDestination().getReActorId());
            this.spawnedReActors.add(message.getSender().getReActorId());
        }
        return Try.ofSuccess(DeliveryStatus.DELIVERED);
    }

    /**
     * Same as {@link #sendMessage(ReActorContext, Message)}, wrapped into an already completed stage.
     *
     * @param reActorContext context of the destination reactor (unused)
     * @param message message that was asked to be sent
     * @return a completed stage holding the outcome of {@link #sendMessage(ReActorContext, Message)}
     */
    public CompletionStage<Try<DeliveryStatus>> sendAsyncMessage(ReActorContext reActorContext, Message message) {
        return CompletableFuture.completedFuture(sendMessage(reActorContext, message));
    }

    /**
     * Closes the Chronicle Queue.
     *
     * @return a completed stage holding the outcome of the close attempt
     */
    public CompletionStage<Try<Void>> cleanDriverLoop() {
        return CompletableFuture.completedFuture(Try.ofRunnable(() -> Objects.requireNonNull(this.chronicle).close()));
    }

    /**
     * Reads the recorded messages one by one until the thread is interrupted or the queue is closed.
     *
     * <p>Messages addressed to another reactor system are skipped. Regular messages are parked by
     * destination reactor and sequence number; an {@link EventExecutionAttempt} triggers the
     * delivery of the parked message it refers to.
     *
     * @param reActorSystem reactor system the messages are replayed on
     * @param chronicleQueue queue containing the recorded session
     */
    private void replayerMainLoop(ReActorSystem reActorSystem, ChronicleQueue chronicleQueue) {
        ExcerptTailer chronicleReader = chronicleQueue.createTailer();
        Map<ReActorId, Map<Long, Message>> pendingByDestination = new HashMap<>();
        Pauser readPauser = Pauser.millis(1000, 2000);

        while (isReplayActive(chronicleQueue)) {
            Optional<Message> readResult =
                    Try.withResources(chronicleReader::readingDocument, ReplayLocalDriver::getNextMessage)
                       .orElse(Optional.empty(), error -> LOGGER.error("Error reading message from CQ", error));
            Message nextMessage = readResult.orElse(null);

            if (nextMessage == null) {
                // Nothing to read (or read failure): back off before polling the queue again
                readPauser.pause();
                readPauser.reset();
                continue;
            }
            if (!isForLocalReActorSystem(reActorSystem, nextMessage)) {
                continue;
            }

            // Held as Object so that the type check happens before any cast to the concrete payload type
            Object payload = nextMessage.getPayload();
            if (!(payload instanceof EventExecutionAttempt)) {
                pendingByDestination.computeIfAbsent(nextMessage.getDestination().getReActorId(),
                                                     reActorId -> new HashMap<>())
                                    .put(nextMessage.getSequenceNumber(), nextMessage);
                continue;
            }

            if (!awaitTargetReactorSpawned(nextMessage, chronicleQueue, readPauser)) {
                // The replay has been stopped while waiting: nothing more can be delivered
                return;
            }
            replayExecutionAttempt(reActorSystem, (EventExecutionAttempt) payload, pendingByDestination);
        }
    }

    /**
     * Waits until {@link #isTargetReactorAlreadySpawned(Message)} holds for the given message.
     *
     * @param executionAttempt message carrying the execution attempt being replayed
     * @param chronicleQueue queue being replayed, used to detect that the driver has been stopped
     * @param pauser pauser used between two checks
     * @return {@code true} if the reactor has been seen, {@code false} if the wait was abandoned
     *         because the thread was interrupted or the queue was closed
     */
    private boolean awaitTargetReactorSpawned(Message executionAttempt, ChronicleQueue chronicleQueue,
                                              Pauser pauser) {
        while (!isTargetReactorAlreadySpawned(executionAttempt)) {
            // Without this check a shutdown requested during the wait would leave the thread spinning forever
            if (!isReplayActive(chronicleQueue)) {
                return false;
            }
            pauser.pause();
            pauser.reset();
        }
        return true;
    }

    /**
     * Delivers to the local reactor the parked message referenced by the execution attempt, or
     * logs an error when either the message or the reactor cannot be found.
     *
     * @param reActorSystem reactor system owning the target reactor
     * @param executionAttempt recorded execution attempt
     * @param pendingByDestination parked messages, by destination reactor and sequence number
     */
    private void replayExecutionAttempt(ReActorSystem reActorSystem, EventExecutionAttempt executionAttempt,
                                        Map<ReActorId, Map<Long, Message>> pendingByDestination) {
        ReActorId targetId = executionAttempt.getReActorId();
        Map<Long, Message> pendingForTarget = pendingByDestination.get(targetId);
        Message originalMessage = pendingForTarget == null
                                  ? null
                                  : pendingForTarget.remove(executionAttempt.getMsgSeqNum());
        if (pendingForTarget != null && pendingForTarget.isEmpty()) {
            // Do not keep empty per-reactor maps around for the whole replay
            pendingByDestination.remove(targetId);
        }

        reActorSystem.getReActor(targetId)
                     .filter(targetCtx -> originalMessage != null)
                     .ifPresentOrElse(targetCtx -> forwardMessageToLocalActor(targetCtx, originalMessage),
                                      () -> reActorSystem.logError("Unable to delivery message {} for ReActor {}",
                                                                   originalMessage, targetId,
                                                                   new IllegalStateException()));
    }

    /**
     * @param chronicleQueue queue being replayed
     * @return {@code true} while the current thread is not interrupted and the queue is still open
     */
    private static boolean isReplayActive(ChronicleQueue chronicleQueue) {
        return !Thread.currentThread().isInterrupted() && !chronicleQueue.isClosed();
    }

    /**
     * Extracts the message contained in the document, if any.
     *
     * @param documentContext document returned by the tailer
     * @return the decoded message, or an empty optional when the document is not present
     */
    private static Optional<Message> getNextMessage(DocumentContext documentContext) {
        if (!documentContext.isPresent()) {
            return Optional.empty();
        }
        return Optional.ofNullable(documentContext.wire().read().object(Message.class));
    }

    /**
     * @param reActorSystem local reactor system
     * @param message message read from the queue
     * @return {@code true} if the destination of the message refers to the loopback of the given reactor system
     */
    private boolean isForLocalReActorSystem(ReActorSystem reActorSystem, Message message) {
        return message.getDestination().getReActorSystemRef().equals(reActorSystem.getLoopback());
    }

    /**
     * Tells whether the sender of the given message has already been recorded by {@link #sendMessage}.
     *
     * <p>NOTE(review): the check is on the sender of the message, presumably because an execution
     * attempt is notified by the reactor that executed the event; confirm against the recorder.
     *
     * @param message message carrying an execution attempt
     * @return {@code true} if the sender id is among the recorded reactors
     */
    private boolean isTargetReactorAlreadySpawned(Message message) {
        return this.spawnedReActors.contains(message.getSender().getReActorId());
    }
}
