package io.reacted.drivers.channels.replay;

import io.reacted.core.config.ChannelId;
import io.reacted.core.drivers.system.LocalDriver;
import io.reacted.core.messages.Message;
import io.reacted.core.messages.reactors.DeliveryStatus;
import io.reacted.core.messages.reactors.DeliveryStatusUpdate;
import io.reacted.core.messages.reactors.EventExecutionAttempt;
import io.reacted.core.reactors.ReActorId;
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.core.reactorsystem.ReActorSystem;
import io.reacted.drivers.channels.chroniclequeue.CQDriverConfig;
import io.reacted.patterns.Try;
import io.reacted.patterns.UnChecked;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.threads.TimingPauser;
import net.openhft.chronicle.wire.DocumentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/reacted/drivers/channels/replay/ReplayLocalDriver.class */
public class ReplayLocalDriver extends LocalDriver<CQDriverConfig> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReplayLocalDriver.class);
    private final Set<ReActorId> spawnedReActors;

    @Nullable
    private ChronicleQueue chronicle;

    @Nullable
    private ReActorSystem replayedActorSystem;

    public ReplayLocalDriver(CQDriverConfig cQDriverConfig) {
        super(cQDriverConfig);
        this.spawnedReActors = ConcurrentHashMap.newKeySet();
    }

    public void initDriverLoop(ReActorSystem reActorSystem) {
        this.chronicle = ChronicleQueue.singleBuilder(((CQDriverConfig) getDriverConfig()).getChronicleFilesDir()).build();
        this.replayedActorSystem = reActorSystem;
    }

    public UnChecked.CheckedRunnable getDriverLoop() {
        return () -> {
            replayerMainLoop((ReActorSystem) Objects.requireNonNull(this.replayedActorSystem), (ChronicleQueue) Objects.requireNonNull(this.chronicle));
        };
    }

    public ChannelId getChannelId() {
        return ChannelId.ChannelType.REPLAY_CHRONICLE_QUEUE.forChannelName(((CQDriverConfig) getDriverConfig()).getChannelName());
    }

    public Properties getChannelProperties() {
        return new Properties();
    }

    public DeliveryStatus sendMessage(ReActorContext reActorContext, Message message) {
        if (!(message.getPayload() instanceof DeliveryStatusUpdate)) {
            this.spawnedReActors.add(message.getDestination().getReActorId());
            this.spawnedReActors.add(message.getSender().getReActorId());
        }
        return DeliveryStatus.SENT;
    }

    public CompletionStage<DeliveryStatus> sendAsyncMessage(ReActorContext reActorContext, Message message) {
        return CompletableFuture.completedFuture(sendMessage(reActorContext, message));
    }

    public CompletionStage<Try<Void>> cleanDriverLoop() {
        return CompletableFuture.completedFuture(Try.ofRunnable(() -> {
            ((ChronicleQueue) Objects.requireNonNull(this.chronicle)).close();
        }));
    }

    private void replayerMainLoop(ReActorSystem reActorSystem, ChronicleQueue chronicleQueue) {
        HashMap hashMap = new HashMap();
        ExcerptTailer createTailer = chronicleQueue.createTailer();
        HashMap hashMap2 = new HashMap();
        TimingPauser balanced = Pauser.balanced();
        while (!Thread.currentThread().isInterrupted() && !chronicleQueue.isClosed()) {
            try {
                DocumentContext readingDocument = createTailer.readingDocument();
                try {
                    if (readingDocument.isPresent()) {
                        Message message = (Message) readingDocument.wire().read().object(Message.class);
                        if (message == null) {
                            balanced.pause();
                            if (readingDocument != null) {
                                readingDocument.close();
                            }
                        } else {
                            balanced.reset();
                            if (isForLocalReActorSystem(reActorSystem, message)) {
                                EventExecutionAttempt payload = message.getPayload();
                                if (payload instanceof EventExecutionAttempt) {
                                    EventExecutionAttempt eventExecutionAttempt = payload;
                                    while (!isTargetReactorAlreadySpawned(message)) {
                                        balanced.pause();
                                    }
                                    balanced.reset();
                                    Message message2 = (Message) ((Map) hashMap2.getOrDefault(eventExecutionAttempt.getReActorId(), hashMap)).remove(Long.valueOf(eventExecutionAttempt.getMsgSeqNum()));
                                    ReActorContext reActorCtx = reActorSystem.getReActorCtx(eventExecutionAttempt.getReActorId());
                                    if (reActorCtx == null || message2 == null) {
                                        LOGGER.error("Unable to delivery message {} for ReActor {}", new Object[]{message2, eventExecutionAttempt.getReActorId(), new IllegalStateException()});
                                    } else if (syncForwardMessageToLocalActor(reActorCtx, message2).isNotDelivered()) {
                                        LOGGER.error("Unable to delivery message {} for ReActor {}", message2, eventExecutionAttempt.getReActorId());
                                    }
                                    if (readingDocument != null) {
                                        readingDocument.close();
                                    }
                                } else {
                                    ((Map) hashMap2.computeIfAbsent(message.getDestination().getReActorId(), reActorId -> {
                                        return new HashMap();
                                    })).put(Long.valueOf(message.getSequenceNumber()), message);
                                    if (readingDocument != null) {
                                        readingDocument.close();
                                    }
                                }
                            } else if (readingDocument != null) {
                                readingDocument.close();
                            }
                        }
                    } else {
                        balanced.pause();
                        if (readingDocument != null) {
                            readingDocument.close();
                        }
                    }
                } catch (Throwable th) {
                    if (readingDocument != null) {
                        try {
                            readingDocument.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (Exception e) {
                LOGGER.error("Error reading message from CQ", e);
                return;
            }
        }
    }

    private boolean isForLocalReActorSystem(ReActorSystem reActorSystem, Message message) {
        return message.getDestination().getReActorSystemRef().equals(reActorSystem.getLoopback());
    }

    private boolean isTargetReactorAlreadySpawned(Message message) {
        return this.spawnedReActors.contains(message.getSender().getReActorId());
    }
}
