package io.reacted.drivers.channels.chroniclequeue;

import io.reacted.core.config.ChannelId;
import io.reacted.core.drivers.local.LocalDriver;
import io.reacted.core.messages.Message;
import io.reacted.core.messages.reactors.DeliveryStatus;
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.core.reactorsystem.ReActorSystem;
import io.reacted.patterns.NonNullByDefault;
import io.reacted.patterns.Try;
import io.reacted.patterns.UnChecked;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nullable;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.threads.Pauser;
import org.apache.log4j.Logger;

/**
 * Local driver that exchanges {@link Message}s through a Chronicle Queue.
 *
 * <p>Outgoing messages are appended to the queue under the configured topic; a polling loop
 * reads documents from a tailer positioned at the end of the queue and hands every decoded
 * message to {@code offerMessage}.
 *
 * <p>The queue and the tailer are created by {@link #initDriverLoop(ReActorSystem)}; until that
 * method has run, both fields are {@code null} and the methods that need them fail with a
 * {@link NullPointerException} (wrapped in a failed {@link Try} where a {@code Try} is returned).
 */
@NonNullByDefault
public class CQLocalDriver extends LocalDriver<CQDriverConfig> {
    private static final Logger LOGGER = Logger.getLogger(CQLocalDriver.class);

    /** Queue used for both appending and tailing; set by {@link #initDriverLoop(ReActorSystem)}. */
    @Nullable
    private ChronicleQueue chronicle;

    /** Tailer consumed by the driver loop; set by {@link #initDriverLoop(ReActorSystem)}. */
    @Nullable
    private ExcerptTailer cqTailer;

    /**
     * Creates a driver bound to the given configuration.
     *
     * @param cQDriverConfig driver configuration, forwarded to the superclass
     */
    public CQLocalDriver(CQDriverConfig cQDriverConfig) {
        super(cQDriverConfig);
    }

    /**
     * Opens the Chronicle Queue located in the configured files directory and creates a tailer
     * moved to the end of the queue.
     *
     * @param reActorSystem not used by this implementation
     */
    public void initDriverLoop(ReActorSystem reActorSystem) {
        this.chronicle = ChronicleQueue.singleBuilder(((CQDriverConfig) getDriverConfig()).getChronicleFilesDir()).build();
        this.cqTailer = this.chronicle.createTailer().toEnd();
    }

    /**
     * Returns the runnable that executes the polling loop.
     *
     * @return a runnable that fails with {@link NullPointerException} if the tailer has not been
     *         created yet
     */
    public UnChecked.CheckedRunnable getDriverLoop() {
        return () -> chronicleMainLoop(Objects.requireNonNull(this.cqTailer));
    }

    /**
     * Returns the channel id of this driver.
     *
     * @return {@code LOCAL_CHRONICLE_QUEUE} combined with the configured channel name
     */
    public ChannelId getChannelId() {
        return ChannelId.LOCAL_CHRONICLE_QUEUE.forChannelName(((CQDriverConfig) getDriverConfig()).getChannelName());
    }

    /**
     * Returns the channel properties.
     *
     * @return the properties exposed by the driver configuration
     */
    public Properties getChannelProperties() {
        return ((CQDriverConfig) getDriverConfig()).getProperties();
    }

    /**
     * Sends a message and returns an already completed stage: the send itself is performed
     * synchronously on the calling thread through {@link #sendMessage(ReActorContext, Message)}.
     *
     * @param reActorContext forwarded to {@link #sendMessage(ReActorContext, Message)}
     * @param message message to append to the queue
     * @return a completed stage holding the outcome of the send
     */
    public CompletionStage<Try<DeliveryStatus>> sendAsyncMessage(ReActorContext reActorContext, Message message) {
        return CompletableFuture.completedFuture(sendMessage(reActorContext, message));
    }

    /**
     * Appends a message to the queue.
     *
     * @param reActorContext not used by this implementation
     * @param message message to append to the queue
     * @return {@link DeliveryStatus#DELIVERED} on success, otherwise a failed {@link Try}
     */
    public Try<DeliveryStatus> sendMessage(ReActorContext reActorContext, Message message) {
        return sendMessage(message);
    }

    /**
     * Tells whether this channel requires delivery acks.
     *
     * @return the value of the corresponding driver configuration flag
     */
    public boolean channelRequiresDeliveryAck() {
        return ((CQDriverConfig) getDriverConfig()).isDeliveryAckRequiredByChannel();
    }

    /**
     * Closes the Chronicle Queue.
     *
     * @return a completed stage holding the outcome of the close; the outcome is a failure when
     *         the queue was never opened or when closing throws
     */
    public CompletionStage<Try<Void>> cleanDriverLoop() {
        return CompletableFuture.completedFuture(Try.ofRunnable(() -> Objects.requireNonNull(this.chronicle).close()));
    }

    /**
     * Polls the tailer until the current thread is interrupted, offering every decoded message.
     *
     * <p>When no message is available (no document present, or decoding failed and was logged)
     * the loop pauses. The pauser is reset only after a message has actually been read, so that
     * consecutive idle iterations can back off from the minimum towards the maximum pause
     * instead of always restarting from the minimum.
     *
     * @param excerptTailer tailer to read documents from
     */
    private void chronicleMainLoop(ExcerptTailer excerptTailer) {
        // Loop-invariant check: fail once, up front, rather than on every iteration.
        Objects.requireNonNull(excerptTailer);
        Pauser idlePauser = Pauser.millis(100, 500);
        while (!Thread.currentThread().isInterrupted()) {
            Message message = (Message) Try.withResources(excerptTailer::readingDocument, documentContext -> {
                if (documentContext.isPresent()) {
                    return (Message) documentContext.wire().read(((CQDriverConfig) getDriverConfig()).getTopic()).object(Message.class);
                }
                return null;
            }).orElse((Object) null, th -> {
                LOGGER.error("Unable to decode data", th);
            });
            if (message == null) {
                // Nothing to deliver: wait, letting the pauser lengthen the wait while idle.
                idlePauser.pause();
            } else {
                // Work was done: go back to the shortest pause for the next idle period.
                idlePauser.reset();
                offerMessage(message);
            }
        }
        // Leave the interrupt flag set for whoever is running this loop.
        Thread.currentThread().interrupt();
    }

    /**
     * Writes the message to the queue under the configured topic.
     *
     * @param message message to append
     * @return {@link DeliveryStatus#DELIVERED} when the write succeeds, otherwise a failed
     *         {@link Try} carrying the error (including the case of a queue not yet opened)
     */
    private Try<DeliveryStatus> sendMessage(Message message) {
        return Try.ofRunnable(() -> {
            ExcerptAppender appender = Objects.requireNonNull(Objects.requireNonNull(this.chronicle).acquireAppender());
            appender.writeMessage(((CQDriverConfig) getDriverConfig()).getTopic(), message);
        }).map(ignored -> DeliveryStatus.DELIVERED);
    }
}
