package org.kinotic.continuum.gateway.internal.api.security;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.kinotic.continuum.core.api.event.StreamData;
import org.kinotic.continuum.core.api.security.SessionMetadata;
import org.kinotic.continuum.gateway.api.security.SessionInformationService;
import org.kinotic.continuum.internal.core.api.aignite.IgniteContinuousQueryObserver;
import org.kinotic.continuum.internal.core.api.security.DefaultSessionMetadata;
import org.kinotic.continuum.internal.util.IgniteUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * {@link SessionInformationService} implementation backed by the Ignite cache named
 * {@code "__continuumSessionCache"}.
 *
 * <p>The {@link Ignite} dependency is optional. When no {@link Ignite} instance is injected the
 * service is still constructed, but every query method throws {@link IllegalStateException}.
 */
@Component
public class DefaultSessionInformationService implements SessionInformationService {
    private static final Logger log = LoggerFactory.getLogger(DefaultSessionInformationService.class);

    /** Name of the Ignite cache this service reads session entries from. */
    private static final String SESSION_CACHE_NAME = "__continuumSessionCache";

    /** Message used whenever a query method is invoked without an Ignite instance. */
    private static final String IGNITE_DISABLED_MESSAGE = "This method is not available when ignite is disabled";

    private final Vertx vertx;
    private final Ignite ignite;
    private final IgniteCache<String, DefaultSessionMetadata> sessionCache;
    private final Scheduler scheduler;

    /**
     * Creates the service and the Reactor scheduler used to subscribe to continuous queries.
     *
     * @param vertx  the Vert.x instance whose {@code executeBlocking} runs the scheduler's tasks
     * @param ignite the Ignite instance, or {@code null} when Ignite is disabled
     */
    public DefaultSessionInformationService(Vertx vertx, @Autowired(required = false) Ignite ignite) {
        this.vertx = vertx;
        this.ignite = ignite;
        this.sessionCache = (ignite != null) ? ignite.cache(SESSION_CACHE_NAME) : null;

        // Every task Reactor hands to this scheduler is run through Vert.x executeBlocking.
        this.scheduler = Schedulers.fromExecutor(task -> {
            vertx.executeBlocking(promise -> {
                task.run();
                // Signal completion as the executeBlocking contract requires. If task.run()
                // throws, the exception propagates to Vert.x and this line is not reached.
                promise.complete();
            }, null);
        });
    }

    /**
     * Delegates to {@link IgniteUtils#countCacheEntriesContinuous} for the session cache.
     *
     * @return the {@link Flux} produced by {@code IgniteUtils.countCacheEntriesContinuous}
     * @throws IllegalStateException if no Ignite instance was injected
     */
    @Override
    public Flux<Long> countActiveSessionsContinuous() {
        // Fail the same way as listActiveSessionsContinuous() instead of handing a null
        // Ignite instance and a null cache to the helper.
        requireIgnite();
        return IgniteUtils.countCacheEntriesContinuous(this.ignite, this.vertx, this.sessionCache);
    }

    /**
     * Builds a {@link Flux} from an {@link IgniteContinuousQueryObserver} created over the session
     * cache with an unfiltered {@link ScanQuery}; subscription happens on this service's scheduler.
     *
     * @return the {@link Flux} produced by {@code IgniteUtils.observerToFlux}
     * @throws IllegalStateException if no Ignite instance was injected
     */
    @Override
    public Flux<StreamData<String, SessionMetadata>> listActiveSessionsContinuous() {
        requireIgnite();
        // NOTE(review): the observer and query are constructed with raw types; their generic
        // signatures are declared outside this file, so they are intentionally left untouched.
        return IgniteUtils.observerToFlux(() -> {
            return new IgniteContinuousQueryObserver(this.vertx, this.sessionCache, new ScanQuery());
        }).subscribeOn(this.scheduler);
    }

    /**
     * Guards the query methods against being used when Ignite is disabled.
     *
     * @throws IllegalStateException if no Ignite instance was injected
     */
    private void requireIgnite() {
        if (this.ignite == null) {
            throw new IllegalStateException(IGNITE_DISABLED_MESSAGE);
        }
    }

    /**
     * Closes the given cursor if it is non-null, logging (rather than propagating) any exception
     * raised by {@code close()}.
     *
     * <p>NOTE(review): this helper is not referenced anywhere in this class.
     *
     * @param queryCursor the cursor to close; may be {@code null}
     */
    private void safeCloseCursor(QueryCursor<?> queryCursor) {
        if (queryCursor == null) {
            return;
        }
        try {
            queryCursor.close();
        } catch (Exception e) {
            log.warn("Exception closing continuous query", e);
        }
    }
}
