package org.reaktivity.nukleus.ws.internal.routable.stream;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.MessageHandler;
import org.reaktivity.nukleus.http2.internal.types.stream.Http2Flags;
import org.reaktivity.nukleus.ws.internal.routable.Route;
import org.reaktivity.nukleus.ws.internal.routable.Source;
import org.reaktivity.nukleus.ws.internal.routable.Target;
import org.reaktivity.nukleus.ws.internal.router.Correlation;
import org.reaktivity.nukleus.ws.internal.router.RouteKind;
import org.reaktivity.nukleus.ws.internal.types.OctetsFW;
import org.reaktivity.nukleus.ws.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.ws.internal.types.stream.DataFW;
import org.reaktivity.nukleus.ws.internal.types.stream.EndFW;
import org.reaktivity.nukleus.ws.internal.types.stream.FrameFW;
import org.reaktivity.nukleus.ws.internal.types.stream.HttpBeginExFW;
import org.reaktivity.nukleus.ws.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.ws.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.ws.internal.types.stream.WsFrameFW;
import org.reaktivity.nukleus.ws.internal.util.function.LongObjectBiConsumer;

/* loaded from: input_file:org/reaktivity/nukleus/ws/internal/routable/stream/SourceInputStreamFactory.class */
public final class SourceInputStreamFactory {
    private static final byte[] HANDSHAKE_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StandardCharsets.UTF_8);
    private static final String WEBSOCKET_VERSION_13 = "13";
    private static final int HEADER_SIZE_PAYLOAD_8_WITH_MASKING_KEY = 6;
    private static final int HEADER_SIZE_EXTENDED_PAYLOAD_16_WITH_MASKING_KEY = 8;
    private static final int HEADER_SIZE_EXTENDED_PAYLOAD_64_WITH_MASKING_KEY = 14;
    private static final int SLAB_SLOT_NOT_ALLOCATED = -1;
    private final MessageDigest sha1 = initSHA1();
    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final EndFW endRO = new EndFW();
    private final OctetsFW octetsRO = new OctetsFW();
    private final HttpBeginExFW httpBeginExRO = new HttpBeginExFW();
    private final WsFrameFW wsFrameRO = new WsFrameFW();
    private final WindowFW windowRO = new WindowFW();
    private final ResetFW resetRO = new ResetFW();
    private final Source source;
    private final LongFunction<List<Route>> supplyRoutes;
    private final LongSupplier supplyTargetId;
    private final LongObjectBiConsumer<Correlation> correlateNew;
    private final Slab slab;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/ws/internal/routable/stream/SourceInputStreamFactory$SourceInputStream.class */
    public final class SourceInputStream {
        private MessageHandler streamState;
        private long sourceId;
        private Target target;
        private long targetId;
        private int slabSlot;
        private int slabSlotLimit;
        private int slabSlotOffset;

        /**
         * Creates a stream that holds no slab slot and whose first frame is
         * dispatched to {@code beforeBegin}.
         */
        private SourceInputStream() {
            // -1 means "no slab slot held" (same value as SLAB_SLOT_NOT_ALLOCATED)
            this.slabSlot = -1;
            this.slabSlotOffset = 0;
            this.slabSlotLimit = 0;
            this.streamState = this::beforeBegin;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Forwards an incoming frame to the handler for the current stream state.
         *
         * @param msgTypeId frame type identifier
         * @param buffer    buffer containing the frame
         * @param index     offset of the frame within {@code buffer}
         * @param length    length of the frame in bytes
         */
        public void handleStream(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
            final MessageHandler currentState = this.streamState;
            currentState.onMessage(msgTypeId, buffer, index, length);
        }

        /**
         * Initial state: only a frame of type 1 is accepted and handed to
         * {@code processBegin}; any other frame type is treated as unexpected.
         */
        private void beforeBegin(int msgTypeId, DirectBuffer buffer, int index, int length) {
            // NOTE(review): 1 is presumably BeginFW's type id - confirm against the flyweight
            if (msgTypeId != 1) {
                processUnexpected(buffer, index, length);
                return;
            }
            processBegin(buffer, index, length);
        }

        /**
         * Established state: type 2 frames go to {@code processData}, type 3 frames
         * go to {@code processEnd}, everything else is treated as unexpected.
         */
        private void afterBeginOrData(int msgTypeId, DirectBuffer buffer, int index, int length) {
            if (msgTypeId == 2) {
                processData(buffer, index, length);
            } else if (msgTypeId == 3) {
                processEnd(buffer, index, length);
            } else {
                processUnexpected(buffer, index, length);
            }
        }

        /**
         * Terminal state: every frame, whatever its type, is treated as unexpected.
         */
        private void afterEnd(int msgTypeId, DirectBuffer buffer, int index, int length) {
            // the frame type is deliberately ignored here
            processUnexpected(buffer, index, length);
        }

        /**
         * State entered once the stream has been answered or reset.
         * Type 2 frames are acknowledged by granting the source a window equal to
         * the frame length; a type 3 frame removes the stream from the source and
         * moves to {@code afterEnd}. Other frame types are ignored.
         */
        private void afterReplyOrReset(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
            final SourceInputStreamFactory factory = SourceInputStreamFactory.this;
            final DirectBuffer frame = buffer;
            switch (msgTypeId) {
                case 2:
                    factory.dataRO.wrap(frame, index, index + length);
                    factory.source.doWindow(factory.dataRO.streamId(), length);
                    break;
                case 3:
                    factory.endRO.wrap(frame, index, index + length);
                    factory.source.removeStream(factory.endRO.streamId());
                    this.streamState = this::afterEnd;
                    break;
                default:
                    break;
            }
        }

        /**
         * Resets the source stream identified by the given frame and switches to
         * the {@code afterReplyOrReset} state.
         */
        private void processUnexpected(DirectBuffer buffer, int index, int length) {
            final SourceInputStreamFactory factory = SourceInputStreamFactory.this;
            factory.frameRO.wrap(buffer, index, index + length);
            final long streamId = factory.frameRO.streamId();
            factory.source.doReset(streamId);
            this.streamState = this::afterReplyOrReset;
        }

        /**
         * Rejects a handshake by sending an HTTP response that carries only a
         * {@code :status} header, followed immediately by an HTTP end.
         * <p>
         * If no reply route can be resolved for {@code sourceRef} the frame is
         * handled as unexpected instead. In both cases the stream ends up in the
         * {@code afterReplyOrReset} state.
         *
         * @param buffer    buffer containing the offending frame
         * @param index     offset of the frame within {@code buffer}
         * @param length    length of the frame in bytes
         * @param sourceRef source reference used to look up the reply route
         * @param status    value of the {@code :status} header to send
         */
        private void processInvalidRequest(DirectBuffer buffer, int index, int length, long sourceRef, String status) {
            final Optional<Route> replyTo = resolveReplyTo(sourceRef);
            if (!replyTo.isPresent()) {
                processUnexpected(buffer, index, length);
                return;
            }

            final Route route = replyTo.get();
            final Target replyTarget = route.target();
            final long targetRef = route.targetRef();
            final long newTargetId = SourceInputStreamFactory.this.supplyTargetId.getAsLong();

            // The two lambda parameters need distinct names: a nested lambda may not
            // redeclare a parameter of the lambda that encloses it.
            // NOTE(review): the new target id is also passed as the third argument - confirm intended.
            replyTarget.doHttpBegin(newTargetId, targetRef, newTargetId, headers -> {
                headers.item(header -> {
                    header.name(":status").value(status);
                });
            });
            replyTarget.doHttpEnd(newTargetId);
            this.streamState = this::afterReplyOrReset;
        }

        /**
         * Handles the BEGIN frame whose extension carries the HTTP request headers
         * of a WebSocket handshake.
         * <p>
         * Repeated headers are merged into one comma-separated value. The request
         * is rejected via {@code processInvalidRequest} when the
         * {@code sec-websocket-key} header is missing or the version is not 13
         * (status "404"), or when no route matches the requested protocols
         * (status "400"). Otherwise the accept hash is computed, a correlation is
         * registered, a WebSocket BEGIN is sent to the resolved target, and the
         * stream moves to {@code afterBeginOrData}.
         * <p>
         * The stream state is only advanced on the success path, so the state
         * chosen by {@code processInvalidRequest} is preserved for rejected
         * requests (which never get a target assigned).
         *
         * @param buffer buffer containing the BEGIN frame
         * @param index  offset of the frame within {@code buffer}
         * @param length length of the frame in bytes
         */
        private void processBegin(DirectBuffer buffer, int index, int length) {
            final SourceInputStreamFactory factory = SourceInputStreamFactory.this;

            final BeginFW begin = factory.beginRO.wrap(buffer, index, index + length);
            final long newSourceId = begin.streamId();
            final long sourceRef = begin.sourceRef();
            final long correlationId = begin.correlationId();
            final OctetsFW extension = begin.extension();

            final HttpBeginExFW httpBeginEx = (HttpBeginExFW) extension.get(factory.httpBeginExRO::wrap);
            final LinkedHashMap<String, String> headers = new LinkedHashMap<>();
            httpBeginEx.headers().forEach(header -> {
                headers.merge(header.name().asString(), header.value().asString(), (existing, added) -> {
                    return String.format("%s, %s", existing, added);
                });
            });

            final String version = headers.get("sec-websocket-version");
            final String key = headers.get("sec-websocket-key");
            final String protocols = headers.get("sec-websocket-protocol");

            // NOTE(review): "404" for a malformed handshake and "400" for a missing route
            // look swapped relative to usual HTTP semantics - confirm before changing.
            if (key == null || !SourceInputStreamFactory.WEBSOCKET_VERSION_13.equals(version)) {
                processInvalidRequest(buffer, index, length, sourceRef, "404");
                return;
            }

            final Optional<Route> resolved = resolveTarget(sourceRef, protocols);
            if (!resolved.isPresent()) {
                processInvalidRequest(buffer, index, length, sourceRef, "400");
                return;
            }

            final long newTargetId = factory.supplyTargetId.getAsLong();

            // accept hash = base64(SHA-1(key + handshake GUID))
            factory.sha1.reset();
            factory.sha1.update(key.getBytes(StandardCharsets.US_ASCII));
            final byte[] digest = factory.sha1.digest(SourceInputStreamFactory.HANDSHAKE_GUID);
            final String handshakeHash = new String(Base64.getEncoder().encode(digest), StandardCharsets.US_ASCII);

            final Route route = resolved.get();
            final Target newTarget = route.target();
            final long targetRef = route.targetRef();
            final String protocol = resolveProtocol(protocols, route.protocol());

            factory.correlateNew.accept(newTargetId, new Correlation(correlationId, factory.source.routableName(),
                    RouteKind.OUTPUT_ESTABLISHED, handshakeHash, protocol));
            newTarget.doWsBegin(newTargetId, targetRef, newTargetId, protocol);
            newTarget.addThrottle(newTargetId, this::handleThrottle);

            this.sourceId = newSourceId;
            this.target = newTarget;
            this.targetId = newTargetId;
            this.streamState = this::afterBeginOrData;
        }

        /**
         * Handles a DATA frame: extracts its payload and passes it to
         * {@code processPayload}. When bytes from an earlier frame are already held
         * in the slab ({@code slabSlotLimit != 0}), the new payload is appended to
         * them first and the combined bytes are processed instead.
         */
        private void processData(DirectBuffer directBuffer, int i, int i2) {
            DataFW wrap = SourceInputStreamFactory.this.dataRO.wrap(directBuffer, i, i + i2);
            OctetsFW payload = wrap.payload();
            if (this.slabSlotLimit != 0) {
                // NOTE(review): presumably a view of the slot starting at slabSlotOffset - confirm against Slab
                MutableDirectBuffer buffer = SourceInputStreamFactory.this.slab.buffer(this.slabSlot, this.slabSlotOffset);
                // append the new payload after the bytes already buffered
                buffer.putBytes(this.slabSlotLimit, payload.buffer(), payload.offset(), payload.sizeof());
                // NOTE(review): the limit advances by wrap.length() while payload.sizeof() bytes were copied - confirm they are always equal
                this.slabSlotLimit += wrap.length();
                payload = SourceInputStreamFactory.this.octetsRO.wrap((DirectBuffer) buffer, 0, this.slabSlotLimit);
            }
            processPayload(payload, wrap.streamId());
        }

        /**
         * Handles an END frame: sends a WebSocket end with
         * {@code WsFrameFW.STATUS_NORMAL_CLOSURE} to the target, moves to the
         * {@code afterEnd} state, removes the stream from the source and removes
         * this stream's throttle from the target.
         * <p>
         * Any slab slot still held for a partially received frame is released as
         * well, since nothing else releases it once the stream has ended.
         *
         * @param buffer buffer containing the END frame
         * @param index  offset of the frame within {@code buffer}
         * @param length length of the frame in bytes
         */
        private void processEnd(DirectBuffer buffer, int index, int length) {
            final SourceInputStreamFactory factory = SourceInputStreamFactory.this;
            factory.endRO.wrap(buffer, index, index + length);
            final long streamId = factory.endRO.streamId();

            this.target.doWsEnd(this.targetId, WsFrameFW.STATUS_NORMAL_CLOSURE);
            this.streamState = this::afterEnd;
            factory.source.removeStream(streamId);
            this.target.removeThrottle(this.targetId);

            // -1 means "no slab slot held"
            if (this.slabSlot != -1) {
                factory.slab.release(this.slabSlot);
                this.slabSlot = -1;
                this.slabSlotOffset = 0;
                this.slabSlotLimit = 0;
            }
        }

        /**
         * Walks the payload frame by frame, forwarding each complete WebSocket
         * frame to the target.
         * <p>
         * A frame that is unmasked or has a zero masking key causes a WebSocket end
         * with {@code WsFrameFW.STATUS_PROTOCOL_ERROR}; other frames go through
         * {@code processFrame}. When the remaining bytes do not form a complete
         * frame, they are kept for the next DATA frame and the loop stops:
         * <ul>
         * <li>if no slab slot is held, one is acquired and the remaining
         *     {@code limit - offset} bytes are copied into it;</li>
         * <li>otherwise only the offset of the incomplete frame is recorded.</li>
         * </ul>
         * When the payload has been consumed completely, a held slab slot is
         * released and all slab bookkeeping (slot, offset and limit) is reset.
         *
         * @param octetsFW payload bytes to parse
         * @param j        stream id used when acquiring a slab slot
         * @return the accumulated result of the {@code processFrame} calls
         */
        private int processPayload(OctetsFW octetsFW, long j) {
            final SourceInputStreamFactory factory = SourceInputStreamFactory.this;
            final DirectBuffer buffer = octetsFW.buffer();
            final int limit = octetsFW.limit();
            int offset = octetsFW.offset();
            int bytesWritten = 0;

            while (offset < limit) {
                if (!factory.wsFrameRO.canWrap(buffer, offset, limit)) {
                    // Incomplete frame: keep the tail and stop. Without this break the
                    // loop would spin forever, because nothing advances the offset.
                    if (this.slabSlot == -1) {
                        final int remaining = limit - offset;
                        this.slabSlot = factory.slab.acquire(j);
                        // the last putBytes argument is a length, not an end offset
                        factory.slab.buffer(this.slabSlot).putBytes(0, buffer, offset, remaining);
                        this.slabSlotLimit = remaining;
                    } else {
                        // NOTE(review): offset is relative to the buffer wrapped by processData - confirm Slab view semantics
                        this.slabSlotOffset = offset;
                    }
                    break;
                }

                factory.wsFrameRO.wrap(buffer, offset, limit);
                if (!factory.wsFrameRO.mask() || factory.wsFrameRO.maskingKey() == 0) {
                    this.target.doWsEnd(this.targetId, WsFrameFW.STATUS_PROTOCOL_ERROR);
                } else {
                    bytesWritten = processFrame(bytesWritten);
                }
                offset = factory.wsFrameRO.limit();
            }

            if (offset == limit && this.slabSlot != -1) {
                factory.slab.release(this.slabSlot);
                this.slabSlot = -1;
                this.slabSlotOffset = 0;
                // processData tests slabSlotLimit to decide whether slab data exists,
                // so it must be cleared together with the slot
                this.slabSlotLimit = 0;
            }
            return bytesWritten;
        }

        /**
         * Forwards the frame currently wrapped by {@code wsFrameRO} to the target.
         * <p>
         * Opcodes 1 and 2 are sent as data with flags 0x81 and 0x82 respectively,
         * and the value returned by {@code doWsData} is added to the running total.
         * Opcode 8 ends the WebSocket stream, using the first two payload bytes as
         * the status when the payload has at least two bytes, otherwise 1000.
         *
         * @param i running total carried over from earlier frames
         * @return the updated running total
         * @throws IllegalStateException for any other opcode
         */
        private int processFrame(int i) {
            final WsFrameFW frame = SourceInputStreamFactory.this.wsFrameRO;
            final int maskingKey = frame.maskingKey();
            final DirectBuffer payload = frame.payload();
            final int opcode = frame.opcode();

            if (opcode == 1) {
                i += this.target.doWsData(this.targetId, 0x81, maskingKey, payload);
                return i;
            }
            if (opcode == 2) {
                i += this.target.doWsData(this.targetId, 0x82, maskingKey, payload);
                return i;
            }
            if (opcode == 8) {
                // NOTE(review): the status is read without applying the masking key and in the
                // buffer's default byte order - confirm this matches what doWsEnd expects
                final short status = payload.capacity() >= 2 ? payload.getShort(0) : (short) 1000;
                this.target.doWsEnd(this.targetId, status);
                return i;
            }
            throw new IllegalStateException("not yet implemented");
        }

        /**
         * Finds the first route registered for {@code sourceRef} that satisfies
         * {@code Route.protocolMatches} for the requested protocols.
         *
         * @param sourceRef source reference whose routes are searched
         * @param protocols requested protocols, passed to {@code Route.protocolMatches}
         * @return the first matching route, or empty when none matches
         */
        private Optional<Route> resolveTarget(long sourceRef, String protocols) {
            final List<Route> candidates = SourceInputStreamFactory.this.supplyRoutes.apply(sourceRef);
            return candidates.stream()
                    .filter(Route.protocolMatches(protocols))
                    .findFirst();
        }

        /**
         * Chooses the protocol to negotiate for a route.
         *
         * @param str  protocols requested by the client, may be {@code null}
         * @param str2 protocol of the selected route, may be {@code null}
         * @return {@code str2} when both arguments are non-null and {@code str}
         *         contains {@code str2}; otherwise {@code null}
         */
        private String resolveProtocol(String str, String str2) {
            // String.contains(null) throws, so a route without a protocol must be handled first
            if (str == null || str2 == null) {
                return null;
            }
            // NOTE(review): substring match - "superchat" also contains "chat"; confirm whether
            // a token-wise comparison of the comma-separated list is wanted
            return str.contains(str2) ? str2 : null;
        }

        /**
         * Finds the first route registered for {@code sourceRef} that satisfies
         * {@code Route.sourceMatches} for this source's routable name.
         *
         * @param sourceRef source reference whose routes are searched
         * @return the first matching route, or empty when none matches
         */
        private Optional<Route> resolveReplyTo(long sourceRef) {
            final SourceInputStreamFactory factory = SourceInputStreamFactory.this;
            final List<Route> candidates = factory.supplyRoutes.apply(sourceRef);
            return candidates.stream()
                    .filter(Route.sourceMatches(factory.source.routableName()))
                    .findFirst();
        }

        /**
         * Throttle callback registered on the target: frame type 0x40000001 is
         * handled by {@code processReset}, 0x40000002 by {@code processWindow};
         * any other type is ignored.
         */
        private void handleThrottle(int msgTypeId, DirectBuffer buffer, int index, int length) {
            if (msgTypeId == 0x40000001) {
                processReset(buffer, index, length);
            } else if (msgTypeId == 0x40000002) {
                processWindow(buffer, index, length);
            }
        }

        /**
         * Relays a window update from the target to the source, enlarged by the
         * header size that {@code headerSize} reports for that update.
         */
        private void processWindow(DirectBuffer buffer, int index, int length) {
            final SourceInputStreamFactory factory = SourceInputStreamFactory.this;
            factory.windowRO.wrap(buffer, index, index + length);
            final int update = factory.windowRO.update();
            final int credit = update + SourceInputStreamFactory.headerSize(update);
            factory.source.doWindow(this.sourceId, credit);
        }

        /**
         * Relays a reset from the target by resetting the source stream.
         */
        private void processReset(DirectBuffer buffer, int index, int length) {
            final SourceInputStreamFactory factory = SourceInputStreamFactory.this;
            // the frame is wrapped but none of its fields are read
            factory.resetRO.wrap(buffer, index, index + length);
            factory.source.doReset(this.sourceId);
        }
    }

    /**
     * Creates a factory for the input streams of one source.
     *
     * @param source               source that streams are read from and that receives windows and resets
     * @param longFunction         supplies the routes registered for a source reference
     * @param longSupplier         supplies a fresh target stream id
     * @param longObjectBiConsumer registers a correlation under a target stream id
     * @param slab                 slab used to hold bytes of incomplete frames
     */
    public SourceInputStreamFactory(Source source, LongFunction<List<Route>> longFunction, LongSupplier longSupplier, LongObjectBiConsumer<Correlation> longObjectBiConsumer, Slab slab) {
        this.slab = slab;
        this.correlateNew = longObjectBiConsumer;
        this.supplyTargetId = longSupplier;
        this.supplyRoutes = longFunction;
        this.source = source;
    }

    /**
     * Creates a new input stream and returns the handler that feeds frames into it.
     *
     * @return a message handler bound to a fresh {@code SourceInputStream}
     */
    public MessageHandler newStream() {
        final SourceInputStream stream = new SourceInputStream();
        return stream::handleStream;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the size in bytes of a masked WebSocket frame header for a payload
     * of the given length.
     * <ul>
     * <li>0..125: 6 bytes (2 header bytes + 4-byte masking key)</li>
     * <li>126..65535: 8 bytes (adds a 16-bit extended length)</li>
     * <li>anything larger: 14 bytes (adds a 64-bit extended length)</li>
     * </ul>
     * The argument is compared as an unsigned value, so negative inputs fall
     * into the largest category.
     *
     * @param i payload length in bytes
     * @return header size in bytes: 6, 8 or 14
     */
    public static int headerSize(int i) {
        // two fixed header bytes plus the 4-byte masking key
        final int baseHeader = 2 + Integer.BYTES;

        if (Integer.compareUnsigned(i, 126) < 0) {
            return baseHeader;
        }
        if (Integer.compareUnsigned(i, 0xFFFF) <= 0) {
            return baseHeader + Short.BYTES;
        }
        return baseHeader + Long.BYTES;
    }

    /**
     * Obtains a SHA-1 {@link MessageDigest}.
     * Any exception raised while obtaining it is passed to
     * {@code LangUtil.rethrowUnchecked}.
     *
     * @return the SHA-1 digest, or {@code null} if {@code rethrowUnchecked} returns normally
     */
    private static MessageDigest initSHA1() {
        MessageDigest digest = null;
        try {
            digest = MessageDigest.getInstance("SHA-1");
        } catch (Exception ex) {
            LangUtil.rethrowUnchecked(ex);
        }
        return digest;
    }
}
