/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.nukleus.oauth.internal.stream;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;
import java.util.function.ToLongFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.jose4j.jwk.JsonWebKey;
import org.jose4j.jws.JsonWebSignature;
import org.jose4j.jwt.JwtClaims;
import org.jose4j.jwt.MalformedClaimException;
import org.jose4j.jwt.NumericDate;
import org.jose4j.jwt.consumer.InvalidJwtException;
import org.jose4j.lang.JoseException;
import org.reaktivity.nukleus.concurrent.SignalingExecutor;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.function.MessagePredicate;
import org.reaktivity.nukleus.oauth.internal.OAuthConfiguration;
import org.reaktivity.nukleus.oauth.internal.stream.Writer;
import org.reaktivity.nukleus.oauth.internal.types.HttpHeaderFW;
import org.reaktivity.nukleus.oauth.internal.types.OctetsFW;
import org.reaktivity.nukleus.oauth.internal.types.String16FW;
import org.reaktivity.nukleus.oauth.internal.types.control.RouteFW;
import org.reaktivity.nukleus.oauth.internal.types.stream.AbortFW;
import org.reaktivity.nukleus.oauth.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.oauth.internal.types.stream.DataFW;
import org.reaktivity.nukleus.oauth.internal.types.stream.EndFW;
import org.reaktivity.nukleus.oauth.internal.types.stream.HttpBeginExFW;
import org.reaktivity.nukleus.oauth.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.oauth.internal.types.stream.SignalFW;
import org.reaktivity.nukleus.oauth.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.oauth.internal.util.BufferUtil;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.stream.StreamFactory;

public class OAuthProxyFactory
implements StreamFactory {
    private static final long EXPIRES_NEVER = Long.MAX_VALUE;
    private static final long EXPIRES_IMMEDIATELY = 0L;
    private static final long REALM_MASK = -281474976710656L;
    private static final long TOKEN_EXPIRED_SIGNAL = 1L;
    private static final Pattern QUERY_PARAMS = Pattern.compile("(?:\\?|.*?&)access_token=([^&#]+)(?:&.*)?");
    private static final byte[] BEARER_PREFIX = "Bearer ".getBytes(StandardCharsets.US_ASCII);
    private static final byte[] QUERY_PREFIX = "?".getBytes(StandardCharsets.US_ASCII);
    private static final byte[] AUTHORIZATION = "authorization".getBytes(StandardCharsets.US_ASCII);
    private static final byte[] PATH = ":path".getBytes(StandardCharsets.US_ASCII);
    private final RouteFW routeRO = new RouteFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final EndFW endRO = new EndFW();
    private final HttpBeginExFW httpBeginExRO = new HttpBeginExFW();
    private final WindowFW windowRO = new WindowFW();
    private final ResetFW resetRO = new ResetFW();
    private final AbortFW abortRO = new AbortFW();
    private final SignalFW signalRO = new SignalFW();
    private final JsonWebSignature signature = new JsonWebSignature();
    private final OAuthConfiguration config;
    private final RouteManager router;
    private final LongUnaryOperator supplyInitialId;
    private final LongSupplier supplyTrace;
    private final LongUnaryOperator supplyReplyId;
    private final Function<String, JsonWebKey> lookupKey;
    private final ToLongFunction<JsonWebSignature> lookupAuthorization;
    private final SignalingExecutor executor;
    private final Long2ObjectHashMap<OAuthProxy> correlations;
    private final Writer writer;

    public OAuthProxyFactory(OAuthConfiguration config, MutableDirectBuffer writeBuffer, LongUnaryOperator supplyInitialId, LongSupplier supplyTrace, LongUnaryOperator supplyReplyId, Long2ObjectHashMap<OAuthProxy> correlations, Function<String, JsonWebKey> lookupKey, ToLongFunction<JsonWebSignature> lookupAuthorization, SignalingExecutor executor, RouteManager router) {
        this.config = config;
        this.router = Objects.requireNonNull(router);
        this.writer = new Writer(writeBuffer);
        this.supplyInitialId = Objects.requireNonNull(supplyInitialId);
        this.supplyReplyId = Objects.requireNonNull(supplyReplyId);
        this.supplyTrace = Objects.requireNonNull(supplyTrace);
        this.correlations = correlations;
        this.lookupKey = lookupKey;
        this.lookupAuthorization = lookupAuthorization;
        this.executor = executor;
    }

    public MessageConsumer newStream(int msgTypeId, DirectBuffer buffer, int index, int length, MessageConsumer source) {
        BeginFW begin = this.beginRO.wrap(buffer, index, index + length);
        long streamId = begin.streamId();
        MessageConsumer newStream = (streamId & 1L) != 0L ? this.newInitialStream(begin, source) : this.newReplyStream(begin, source);
        return newStream;
    }

    private MessageConsumer newInitialStream(BeginFW begin, MessageConsumer acceptReply) {
        long acceptAuthorization = begin.authorization();
        JsonWebSignature verified = this.verifiedSignature(begin);
        long connectAuthorization = acceptAuthorization;
        if (verified != null) {
            connectAuthorization = this.lookupAuthorization.applyAsLong(verified);
        }
        long acceptRouteId = begin.routeId();
        MessagePredicate filter = (t, b, o, l) -> true;
        RouteFW route = (RouteFW)this.router.resolve(acceptRouteId, connectAuthorization, filter, this::wrapRoute);
        MessageConsumer newStream = null;
        if (route != null) {
            long acceptInitialId = begin.streamId();
            long traceId = begin.trace();
            OctetsFW extension = begin.extension();
            long connectRouteId = route.correlationId();
            long connectInitialId = this.supplyInitialId.applyAsLong(connectRouteId);
            MessageConsumer connectInitial = this.router.supplyReceiver(connectInitialId);
            long connectReplyId = this.supplyReplyId.applyAsLong(connectInitialId);
            long expiresAtMillis = this.config.expireInFlightRequests() ? OAuthProxyFactory.expiresAtMillis(verified) : Long.MAX_VALUE;
            OAuthProxy initialStream = new OAuthProxy(acceptReply, acceptRouteId, acceptInitialId, acceptAuthorization, connectInitial, connectRouteId, connectInitialId, connectAuthorization, expiresAtMillis);
            this.correlations.put(connectReplyId, (Object)initialStream);
            this.writer.doBegin(connectInitial, connectRouteId, connectInitialId, traceId, connectAuthorization, extension);
            this.router.setThrottle(connectInitialId, (x$0, x$1, x$2, x$3) -> initialStream.onThrottleMessage(x$0, x$1, x$2, x$3));
            newStream = (x$0, x$1, x$2, x$3) -> initialStream.onStreamMessage(x$0, x$1, x$2, x$3);
        }
        return newStream;
    }

    private MessageConsumer newReplyStream(BeginFW begin, MessageConsumer sender) {
        long connectRouteId = begin.routeId();
        long connectReplyId = begin.streamId();
        long traceId = begin.trace();
        long authorization = begin.authorization();
        OctetsFW extension = begin.extension();
        OAuthProxy initialStream = (OAuthProxy)this.correlations.remove(connectReplyId);
        MessageConsumer newStream = null;
        if (initialStream != null) {
            long acceptRouteId = initialStream.sourceRouteId;
            MessageConsumer acceptReply = initialStream.source;
            long expiresAtMillis = initialStream.expiresAtMillis;
            long acceptReplyId = this.supplyReplyId.applyAsLong(initialStream.sourceStreamId);
            long acceptReplyAuthorization = initialStream.sourceAuthorization;
            long connectReplyAuthorization = initialStream.targetAuthorization;
            OAuthProxy replyStream = new OAuthProxy(sender, connectRouteId, connectReplyId, connectReplyAuthorization, acceptReply, acceptRouteId, acceptReplyId, acceptReplyAuthorization, expiresAtMillis);
            this.writer.doBegin(acceptReply, acceptRouteId, acceptReplyId, traceId, authorization, extension);
            this.router.setThrottle(acceptReplyId, (x$0, x$1, x$2, x$3) -> replyStream.onThrottleMessage(x$0, x$1, x$2, x$3));
            newStream = (x$0, x$1, x$2, x$3) -> replyStream.onStreamMessage(x$0, x$1, x$2, x$3);
        }
        return newStream;
    }

    private RouteFW wrapRoute(int msgTypeId, DirectBuffer buffer, int index, int length) {
        return this.routeRO.wrap(buffer, index, index + length);
    }

    private static long expiresAtMillis(JsonWebSignature verified) {
        long expiresAtMillis = Long.MAX_VALUE;
        if (verified != null) {
            try {
                JwtClaims claims = JwtClaims.parse((String)verified.getPayload());
                NumericDate expirationTime = claims.getExpirationTime();
                if (expirationTime != null) {
                    expiresAtMillis = expirationTime.getValueInMillis();
                }
            }
            catch (MalformedClaimException | InvalidJwtException | JoseException ex) {
                expiresAtMillis = 0L;
            }
        }
        return expiresAtMillis;
    }

    private JsonWebSignature verifiedSignature(BeginFW begin) {
        JsonWebSignature verified = null;
        String token = this.bearerToken(begin);
        if (token != null) {
            try {
                this.signature.setCompactSerialization(token);
                String kid = this.signature.getKeyIdHeaderValue();
                String algorithm = this.signature.getAlgorithmHeaderValue();
                JsonWebKey key = this.lookupKey.apply(kid);
                if (algorithm != null && key != null && algorithm.equals(key.getAlgorithm())) {
                    this.signature.setKey(null);
                    this.signature.setKey(key.getKey());
                    JwtClaims claims = JwtClaims.parse((String)this.signature.getPayload());
                    NumericDate expirationTime = claims.getExpirationTime();
                    NumericDate notBefore = claims.getNotBefore();
                    long now = System.currentTimeMillis();
                    if (!(expirationTime != null && now > expirationTime.getValueInMillis() || notBefore != null && now < notBefore.getValueInMillis() || !this.signature.verifySignature())) {
                        verified = this.signature;
                    }
                }
            }
            catch (MalformedClaimException | InvalidJwtException | JoseException throwable) {
                // empty catch block
            }
        }
        return verified;
    }

    private String bearerToken(BeginFW begin) {
        String token = null;
        HttpBeginExFW beginEx = begin.extension().get(this.httpBeginExRO::tryWrap);
        if (beginEx != null) {
            String16FW value;
            int tokenAt;
            HttpHeaderFW authorization;
            String query;
            Matcher matcher;
            String16FW value2;
            int queryAt;
            HttpHeaderFW path = beginEx.headers().matchFirst(h -> BufferUtil.equals(h.name(), PATH));
            if (path != null && (queryAt = BufferUtil.indexOfBytes(value2 = path.value(), QUERY_PREFIX)) != -1 && (matcher = QUERY_PARAMS.matcher(query = path.value().asString().substring(queryAt))).matches()) {
                token = matcher.group(1);
            }
            if ((authorization = beginEx.headers().matchFirst(h -> BufferUtil.equals(h.name(), AUTHORIZATION))) != null && (tokenAt = BufferUtil.limitOfBytes(value = authorization.value(), BEARER_PREFIX)) > 0) {
                DirectBuffer buffer = value.buffer();
                int limit = value.limit();
                token = buffer.getStringWithoutLengthUtf8(tokenAt, limit - tokenAt);
            }
        }
        return token;
    }

    final class OAuthProxy {
        private final MessageConsumer source;
        private final long sourceRouteId;
        private final long sourceStreamId;
        private final long sourceAuthorization;
        private final MessageConsumer target;
        private final long targetRouteId;
        private final long targetStreamId;
        private final long targetAuthorization;
        private final long expiresAtMillis;
        private Future<?> expiryFuture;

        private OAuthProxy(MessageConsumer source, long sourceRouteId, long sourceId, long sourceAuthorization, MessageConsumer target, long targetRouteId, long targetId, long targetAuthorization, long expiresAtMillis) {
            this.source = source;
            this.sourceRouteId = sourceRouteId;
            this.sourceStreamId = sourceId;
            this.sourceAuthorization = sourceAuthorization;
            this.target = target;
            this.targetRouteId = targetRouteId;
            this.targetStreamId = targetId;
            this.targetAuthorization = targetAuthorization;
            this.expiresAtMillis = expiresAtMillis;
        }

        private void onStreamMessage(int msgTypeId, DirectBuffer buffer, int index, int length) {
            switch (msgTypeId) {
                case 1: {
                    BeginFW begin = OAuthProxyFactory.this.beginRO.wrap(buffer, index, index + length);
                    this.onBegin(begin);
                    break;
                }
                case 2: {
                    DataFW data = OAuthProxyFactory.this.dataRO.wrap(buffer, index, index + length);
                    this.onData(data);
                    break;
                }
                case 3: {
                    EndFW end = OAuthProxyFactory.this.endRO.wrap(buffer, index, index + length);
                    this.onEnd(end);
                    break;
                }
                case 4: {
                    AbortFW abort = OAuthProxyFactory.this.abortRO.wrap(buffer, index, index + length);
                    this.onAbort(abort);
                    break;
                }
                case 5: {
                    SignalFW signal = OAuthProxyFactory.this.signalRO.wrap(buffer, index, index + length);
                    this.onSignal(signal);
                    break;
                }
                default: {
                    OAuthProxyFactory.this.writer.doReset(this.source, this.sourceRouteId, this.sourceStreamId, OAuthProxyFactory.this.supplyTrace.getAsLong(), this.sourceAuthorization);
                }
            }
        }

        private void onThrottleMessage(int msgTypeId, DirectBuffer buffer, int index, int length) {
            switch (msgTypeId) {
                case 0x40000002: {
                    WindowFW window = OAuthProxyFactory.this.windowRO.wrap(buffer, index, index + length);
                    this.onWindow(window);
                    break;
                }
                case 0x40000001: {
                    ResetFW reset = OAuthProxyFactory.this.resetRO.wrap(buffer, index, index + length);
                    this.onReset(reset);
                    break;
                }
            }
        }

        private void onBegin(BeginFW begin) {
            if (this.expiresAtMillis != Long.MAX_VALUE) {
                long delay = this.expiresAtMillis - System.currentTimeMillis();
                long routeId = begin.routeId();
                long streamId = begin.streamId();
                this.expiryFuture = OAuthProxyFactory.this.executor.schedule(delay, TimeUnit.MILLISECONDS, routeId, streamId, 1L);
            }
        }

        private void onData(DataFW data) {
            long traceId = data.trace();
            int padding = data.padding();
            long authorization = data.authorization();
            long groupId = data.groupId();
            OctetsFW payload = data.payload();
            OctetsFW extension = data.extension();
            OAuthProxyFactory.this.writer.doData(this.target, this.targetRouteId, this.targetStreamId, traceId, authorization, groupId, padding, payload, extension);
        }

        private void onEnd(EndFW end) {
            long traceId = end.trace();
            OctetsFW extension = end.extension();
            OAuthProxyFactory.this.writer.doEnd(this.target, this.targetRouteId, this.targetStreamId, traceId, this.targetAuthorization, extension);
            this.cancelTimerIfNecessary();
        }

        private void onAbort(AbortFW abort) {
            long traceId = abort.trace();
            OAuthProxyFactory.this.writer.doAbort(this.target, this.targetRouteId, this.targetStreamId, traceId, this.targetAuthorization);
            this.cleanupCorrelationIfNecessary();
            this.cancelTimerIfNecessary();
        }

        private void onSignal(SignalFW signal) {
            long signalId = signal.signalId();
            if (signalId == 1L) {
                long traceId = signal.trace();
                OAuthProxyFactory.this.writer.doAbort(this.target, this.targetRouteId, this.targetStreamId, traceId, this.targetAuthorization);
                OAuthProxyFactory.this.writer.doReset(this.source, this.sourceRouteId, this.sourceStreamId, traceId, this.sourceAuthorization);
                this.cleanupCorrelationIfNecessary();
            }
        }

        private void onWindow(WindowFW window) {
            int credit = window.credit();
            long traceId = window.trace();
            int padding = window.padding();
            long groupId = window.groupId();
            OAuthProxyFactory.this.writer.doWindow(this.source, this.sourceRouteId, this.sourceStreamId, traceId, this.sourceAuthorization, credit, padding, groupId);
        }

        private void onReset(ResetFW reset) {
            long traceId = reset.trace();
            OAuthProxyFactory.this.writer.doReset(this.source, this.sourceRouteId, this.sourceStreamId, traceId, this.sourceAuthorization);
            this.cleanupCorrelationIfNecessary();
            this.cancelTimerIfNecessary();
        }

        private void cleanupCorrelationIfNecessary() {
            long targetInitialId = this.targetStreamId | 1L;
            if (this.targetStreamId == targetInitialId) {
                long targetReplyId = OAuthProxyFactory.this.supplyReplyId.applyAsLong(targetInitialId);
                OAuthProxyFactory.this.correlations.remove(targetReplyId);
            }
        }

        private void cancelTimerIfNecessary() {
            if (this.expiryFuture != null) {
                this.expiryFuture.cancel(true);
                this.expiryFuture = null;
            }
        }
    }
}

