package org.kinotic.continuum.internal.core.api.event;

import org.kinotic.continuum.core.api.event.CRI;
import org.kinotic.continuum.core.api.event.Event;
import org.kinotic.continuum.core.api.event.EventBusService;
import org.kinotic.continuum.core.api.event.EventService;
import org.kinotic.continuum.core.api.event.EventStreamService;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link EventService} implementation that forwards every call to one of two
 * delegates, the {@link EventBusService} or the {@link EventStreamService},
 * chosen by whether the CRI involved uses {@code srv} or {@code stream}.
 */
@Component
public class DefaultEventService implements EventService {
    /** CRI scheme / prefix that selects the {@link EventBusService}. */
    private static final String SERVICE_SCHEME = "srv";

    /** CRI scheme / prefix that selects the {@link EventStreamService}. */
    private static final String STREAM_SCHEME = "stream";

    /** Message of the exception thrown when neither scheme applies. */
    private static final String UNSUPPORTED_SCHEME_MESSAGE = "Event cri must begin with srv or stream";

    private final EventBusService eventBusService;
    private final EventStreamService eventStreamService;

    /**
     * Creates the service with the two delegates it dispatches to.
     *
     * @param eventBusService    delegate used for {@code srv} CRIs
     * @param eventStreamService delegate used for {@code stream} CRIs
     */
    public DefaultEventService(EventBusService eventBusService, EventStreamService eventStreamService) {
        this.eventBusService = eventBusService;
        this.eventStreamService = eventStreamService;
    }

    /**
     * Sends the event through the delegate that matches the scheme of its CRI.
     *
     * @param event the event to send; {@code event.cri().scheme()} selects the delegate
     * @return the {@link Mono} produced by {@link EventBusService#sendWithAck} for
     *         {@code srv}, or by {@link EventStreamService#send} for {@code stream}
     * @throws IllegalArgumentException if the scheme equals neither {@code srv} nor
     *                                  {@code stream}; thrown directly by this call,
     *                                  not signalled through the returned {@link Mono}
     */
    public Mono<Void> send(Event<byte[]> event) {
        if (hasScheme(event, SERVICE_SCHEME)) {
            return eventBusService.sendWithAck(event);
        }
        if (hasScheme(event, STREAM_SCHEME)) {
            return eventStreamService.send(event);
        }
        throw new IllegalArgumentException(UNSUPPORTED_SCHEME_MESSAGE);
    }

    /**
     * Opens a listener on the delegate selected by the prefix of the given CRI string.
     *
     * <p>NOTE(review): the selection here is a plain prefix test on the raw string,
     * whereas {@link #send} compares the parsed scheme for equality, so a string
     * such as {@code "srvX..."} is routed to the event bus. Confirm whether that
     * leniency is intended.
     *
     * @param str the CRI, as a string, to listen on
     * @return the {@link Flux} produced by {@link EventBusService#listen} for a
     *         {@code srv} prefix, or by {@link EventStreamService#listen} (after
     *         converting the string with {@link CRI#create}) for a {@code stream} prefix
     * @throws IllegalArgumentException if the string starts with neither {@code srv}
     *                                  nor {@code stream}; thrown directly by this call
     */
    public Flux<Event<byte[]>> listen(String str) {
        if (str.startsWith(SERVICE_SCHEME)) {
            // The event bus takes the raw string; no CRI parsing happens on this path.
            return eventBusService.listen(str);
        }
        if (str.startsWith(STREAM_SCHEME)) {
            return eventStreamService.listen(CRI.create(str));
        }
        throw new IllegalArgumentException(UNSUPPORTED_SCHEME_MESSAGE);
    }

    /** Tells whether the scheme of the event's CRI equals the given scheme. */
    private static boolean hasScheme(Event<byte[]> event, String scheme) {
        return event.cri().scheme().equals(scheme);
    }
}
