package org.kinotic.continuum.gateway.internal.hft;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.concurrent.TimeUnit;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.DocumentContext;
import org.kinotic.continuum.core.api.event.Event;
import org.kinotic.continuum.gateway.api.config.ContinuumGatewayProperties;
import org.kinotic.continuum.gateway.internal.endpoints.stomp.GatewayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

@Component
/* loaded from: input_file:org/kinotic/continuum/gateway/internal/hft/DefaultHFTQueueManager.class */
public class DefaultHFTQueueManager implements HFTQueueManager {
    private static final Logger log = LoggerFactory.getLogger(DefaultHFTQueueManager.class);
    private final LoadingCache<String, ChronicleQueue> cache;

    public DefaultHFTQueueManager(ContinuumGatewayProperties continuumGatewayProperties) {
        this.cache = Caffeine.newBuilder().expireAfterAccess(1L, TimeUnit.HOURS).removalListener((str, chronicleQueue, removalCause) -> {
            if (chronicleQueue != null) {
                chronicleQueue.close();
            }
        }).build(str2 -> {
            try {
                Path of = Path.of(continuumGatewayProperties.getDataDir(), str2);
                Files.createDirectories(of.getParent(), new FileAttribute[0]);
                return SingleChronicleQueueBuilder.binary(of).rollCycle(RollCycles.FAST_HOURLY).storeFileListener((i, file) -> {
                    log.debug("HFT releasing file " + file);
                }).build();
            } catch (Exception e) {
                log.error("Could not build HFT Queue for " + str2, e);
                throw e;
            }
        });
    }

    @Override // org.kinotic.continuum.gateway.internal.hft.HFTQueueManager
    public Mono<Void> write(Event<byte[]> event) {
        return Mono.create(monoSink -> {
            String replace = event.cri().resourceName().replace(".", "_");
            ChronicleQueue chronicleQueue = (ChronicleQueue) this.cache.get(replace);
            if (chronicleQueue == null) {
                monoSink.error(new IllegalStateException("No HFT Queue is available for " + replace));
                return;
            }
            HftRawEvent continuumEventToHftRawEvent = GatewayUtils.continuumEventToHftRawEvent(event);
            boolean z = false;
            try {
                DocumentContext writingDocument = chronicleQueue.acquireAppender().writingDocument();
                try {
                    GatewayUtils.writeHftRawEvent(continuumEventToHftRawEvent, writingDocument);
                    z = true;
                    if (writingDocument != null) {
                        writingDocument.close();
                    }
                } finally {
                }
            } catch (Exception e) {
                monoSink.error(new IllegalStateException("Error writing to HFT Queue", e));
            }
            if (z) {
                monoSink.success();
            }
        });
    }
}
