package org.kinotic.continuum.gateway.internal.endpoints;

import io.vertx.core.Promise;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.kinotic.continuum.api.exceptions.RpcMissingServiceException;
import org.kinotic.continuum.core.api.event.CRI;
import org.kinotic.continuum.core.api.event.Event;
import org.kinotic.continuum.core.api.security.Session;
import org.kinotic.continuum.internal.util.SecurityUtil;
import org.kinotic.continuum.internal.utils.ContinuumUtil;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/kinotic/continuum/gateway/internal/endpoints/EndpointConnectionHandler.class */
public class EndpointConnectionHandler {
    private static final Logger log = LoggerFactory.getLogger(EndpointConnectionHandler.class);
    private final Services services;
    private Session session;
    private final Map<String, BaseSubscriber<Event<byte[]>>> subscriptions = new HashMap();
    private long sessionTimer = -1;

    public EndpointConnectionHandler(Services services) {
        this.services = services;
    }

    public Promise<Map<String, String>> authenticate(Map<String, String> map) {
        Promise<Map<String, String>> promise = Promise.promise();
        if (map.containsKey("session")) {
            String str = map.get("session");
            this.services.sessionManager.findSession(str).subscribe(session -> {
                sessionActive(session);
                promise.complete(Collections.singletonMap("session", this.session.sessionId()));
            }, th -> {
                log.error("Session could not be found " + str, th);
                promise.fail("Session is invalid");
            });
        } else {
            String str2 = map.get("login");
            String str3 = map.get("passcode");
            if (StringUtils.isNotBlank(str2) && StringUtils.isNotBlank(str3)) {
                Mono flatMap = this.services.securityService.authenticate(str2, str3).flatMap(participant -> {
                    try {
                        return this.services.sessionManager.create(Base64.getUrlEncoder().withoutPadding().encodeToString(new SecretKeySpec(SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(new PBEKeySpec(str3.toCharArray(), SecurityUtil.generateSecretKey(128), 65536, 256)).getEncoded(), "AES").getEncoded()), participant);
                    } catch (Exception e) {
                        log.error("Session could not be created for identity: " + str2, e);
                        return Mono.error(new IllegalStateException("Session could not be created", e));
                    }
                });
                Consumer consumer = session2 -> {
                    sessionActive(session2);
                    promise.complete(Collections.singletonMap("session", this.session.sessionId()));
                };
                Objects.requireNonNull(promise);
                flatMap.subscribe(consumer, promise::fail);
            } else {
                promise.fail("The connection frame does not contain valid credentials");
            }
        }
        return promise;
    }

    private void sessionActive(Session session) {
        this.session = session;
        this.sessionTimer = this.services.vertx.setPeriodic(this.services.continuumProperties.getSessionTimeout() / 2, l -> {
            this.session.touch();
        });
    }

    public Mono<Void> send(Event<byte[]> event) {
        Mono<Void> error;
        if (this.session.sendAllowed(event.cri())) {
            event.metadata().put("sender", this.session.participant().getIdentity());
            if (event.cri().scheme().equals("srv")) {
                try {
                    validateReplyTo(event);
                    error = this.services.eventBusService.sendWithAck(event).onErrorMap(th -> {
                        boolean z = false;
                        if ((th instanceof ReplyException) && ((ReplyException) th).failureType() == ReplyFailure.NO_HANDLERS) {
                            z = true;
                        }
                        return z;
                    }, RpcMissingServiceException::new);
                } catch (Exception e) {
                    error = Mono.error(e);
                }
            } else {
                error = event.cri().scheme().equals("stream") ? Flux.concat(new Publisher[]{this.services.hftQueueManager.write(event).onErrorMap(th2 -> {
                    log.error("Error occurred writing to HFT Queue", th2);
                    return new IllegalStateException("Could not store");
                }), this.services.eventStreamService.send(event)}).then() : Mono.error(new IllegalArgumentException("CRI scheme not supported"));
            }
        } else {
            error = Mono.error(new IllegalArgumentException("Not Authorized to send to " + event.cri()));
        }
        return error;
    }

    private void validateReplyTo(Event<byte[]> event) {
        String str = event.metadata().get("reply-to");
        if (str != null) {
            if (str.contains("*")) {
                throw new IllegalArgumentException("reply-to header invalid * are not allowed");
            }
            try {
                String scope = CRI.create(str).scope();
                if (scope != null) {
                    int indexOf = scope.indexOf(":");
                    if (indexOf != -1) {
                        scope = scope.substring(0, indexOf);
                    }
                    if (!scope.equals(ContinuumUtil.safeEncodeURI(this.session.participant().getIdentity()))) {
                        throw new IllegalArgumentException("reply-to header invalid, scope: " + scope + " is not valid for authenticated participant");
                    }
                }
            } catch (Exception e) {
                throw new IllegalArgumentException("reply-to header invalid " + e.getMessage());
            }
        }
    }

    public void subscribe(CRI cri, String str, BaseSubscriber<Event<byte[]>> baseSubscriber) {
        Validate.notNull(cri, "CRI must not be null", new Object[0]);
        Validate.notEmpty(str, "subscriptionIdentifier must not be empty", new Object[0]);
        Validate.notNull(baseSubscriber, "Subscriber must not be null", new Object[0]);
        if (!this.session.subscribeAllowed(cri)) {
            throw new IllegalArgumentException("Not Authorized to subscribe to " + cri);
        }
        if (cri.scheme().equals("srv")) {
            this.services.eventBusService.listen(cri.baseResource()).doOnNext(event -> {
                String str2 = event.metadata().get("reply-to");
                if (str2 != null) {
                    if (str2.contains("*")) {
                        log.warn("reply-to header contains * and will NOT be ALLOWED for message " + event);
                    } else {
                        this.session.addTemporarySendAllowed(str2);
                    }
                }
            }).subscribe(baseSubscriber);
            this.subscriptions.put(str, baseSubscriber);
            if (log.isDebugEnabled()) {
                log.debug("New subscription cri: " + cri.raw() + " id: " + str + " for login: " + this.session.participant());
                return;
            }
            return;
        }
        if (!cri.scheme().equals("stream")) {
            throw new IllegalArgumentException("CRI scheme not supported");
        }
        this.services.eventStreamService.listen(cri).subscribe(baseSubscriber);
        this.subscriptions.put(str, baseSubscriber);
        if (log.isDebugEnabled()) {
            log.debug("New subscription cri: " + cri.raw() + " id: " + str + " for login: " + this.session.participant());
        }
    }

    public void unsubscribe(String str) {
        Validate.notEmpty(str, "subscriptionIdentifier must not be empty", new Object[0]);
        if (this.subscriptions.containsKey(str)) {
            this.subscriptions.remove(str).cancel();
        } else {
            log.debug("No subscription exists for subscriptionIdentifier: " + str);
        }
    }

    public void removeSession() {
        if (this.session != null) {
            this.services.sessionManager.removeSession(this.session.sessionId()).subscribe(bool -> {
                if (bool.booleanValue()) {
                    return;
                }
                log.error("Could not remove sessionId: " + this.session.sessionId());
            }, th -> {
                log.error("Could not remove sessionId: " + this.session.sessionId(), th);
            });
        } else {
            log.error("No session for connection was set");
        }
    }

    public void shutdown() {
        if (this.sessionTimer != -1) {
            this.services.vertx.cancelTimer(this.sessionTimer);
        }
        this.subscriptions.forEach((str, baseSubscriber) -> {
            baseSubscriber.cancel();
        });
        this.subscriptions.clear();
    }
}
