package org.reaktivity.nukleus.http_cache.internal.routable.stream;

import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.concurrent.MessageHandler;
import org.reaktivity.nukleus.http_cache.internal.routable.Source;
import org.reaktivity.nukleus.http_cache.internal.routable.Target;
import org.reaktivity.nukleus.http_cache.internal.routable.stream.ProxyAcceptStreamFactory;
import org.reaktivity.nukleus.http_cache.internal.router.Correlation;
import org.reaktivity.nukleus.http_cache.internal.types.OctetsFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.DataFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.EndFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.FrameFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.HttpBeginExFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.WindowFW;

/* loaded from: input_file:org/reaktivity/nukleus/http_cache/internal/routable/stream/ProxyConnectReplyStreamFactory.class */
public final class ProxyConnectReplyStreamFactory {
    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final EndFW endRO = new EndFW();
    private final HttpBeginExFW httpBeginExRO = new HttpBeginExFW();
    private final WindowFW windowRO = new WindowFW();
    private final ResetFW resetRO = new ResetFW();
    private final Source source;
    private final Function<String, Target> supplyTarget;
    private final LongSupplier supplyStreamId;
    private final LongFunction<Correlation> correlateEstablished;
    private Int2ObjectHashMap<List<ProxyAcceptStreamFactory.SourceInputStream>> awaitingRequestMatches;
    private Int2ObjectHashMap<ProxyAcceptStreamFactory.SourceInputStream> urlToPendingStream;

    /* loaded from: input_file:org/reaktivity/nukleus/http_cache/internal/routable/stream/ProxyConnectReplyStreamFactory$GroupThrottle.class */
    public class GroupThrottle {
        long groupWaterMark = 0;
        private final Long2LongHashMap streamToWaterMark = new Long2LongHashMap(0);
        private final long replyStreamId;
        private int numParticipants;
        private Source source;

        GroupThrottle(int i, Source source, long j) {
            this.numParticipants = i;
            this.replyStreamId = j;
            this.source = source;
        }

        private void increment(long j, long j2) {
            this.streamToWaterMark.put(j, this.streamToWaterMark.get(j) + j2);
            updateThrottle();
        }

        /* JADX WARN: Type inference failed for: r0v4, types: [org.agrona.collections.Long2LongHashMap$Values] */
        private void updateThrottle() {
            if (this.numParticipants == this.streamToWaterMark.size()) {
                long longValue = ((Long) this.streamToWaterMark.values2().stream().min((v0, v1) -> {
                    return Long.compare(v0, v1);
                }).get()).longValue();
                long j = longValue - this.groupWaterMark;
                if (j > 0) {
                    this.source.doWindow(this.replyStreamId, (int) j);
                    this.groupWaterMark = longValue;
                }
            }
        }

        public void handleThrottle(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 1073741825:
                    processReset(directBuffer, i2, i3);
                    return;
                case 1073741826:
                    processWindow(directBuffer, i2, i3);
                    return;
                default:
                    return;
            }
        }

        private void processWindow(DirectBuffer directBuffer, int i, int i2) {
            ProxyConnectReplyStreamFactory.this.windowRO.wrap(directBuffer, i, i + i2);
            int update = ProxyConnectReplyStreamFactory.this.windowRO.update();
            if (update > 0) {
                increment(ProxyConnectReplyStreamFactory.this.windowRO.streamId(), update);
            }
        }

        private void processReset(DirectBuffer directBuffer, int i, int i2) {
            ProxyConnectReplyStreamFactory.this.resetRO.wrap(directBuffer, i, i + i2);
            this.streamToWaterMark.remove(ProxyConnectReplyStreamFactory.this.resetRO.streamId());
            this.numParticipants--;
            if (this.numParticipants == 0) {
                this.source.doReset(this.replyStreamId);
            } else {
                updateThrottle();
            }
        }

        public void optOut() {
            this.numParticipants--;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/http_cache/internal/routable/stream/ProxyConnectReplyStreamFactory$TargetOutputEstablishedStream.class */
    public final class TargetOutputEstablishedStream {
        private MessageHandler streamState;
        private long sourceId;
        private Target target;
        private long targetId;
        private List<ProxyAcceptStreamFactory.SourceInputStream> forwardResponsesTo;

        private TargetOutputEstablishedStream() {
            this.streamState = this::beforeBegin;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void handleStream(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
            this.streamState.onMessage(i, mutableDirectBuffer, i2, i3);
        }

        private void beforeBegin(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
            if (i == 1) {
                processBegin(mutableDirectBuffer, i2, i3);
            } else {
                processUnexpected(mutableDirectBuffer, i2, i3);
            }
        }

        private void afterBeginOrData(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
            if (this.forwardResponsesTo != null) {
                this.forwardResponsesTo.stream().forEach(sourceInputStream -> {
                    sourceInputStream.replyMessageHandler().onMessage(i, mutableDirectBuffer, i2, i3);
                });
            }
            switch (i) {
                case 2:
                    processData(mutableDirectBuffer, i2, i3);
                    return;
                case 3:
                    processEnd(mutableDirectBuffer, i2, i3);
                    return;
                default:
                    processUnexpected(mutableDirectBuffer, i2, i3);
                    return;
            }
        }

        private void afterEnd(int i, DirectBuffer directBuffer, int i2, int i3) {
            processUnexpected(directBuffer, i2, i3);
        }

        private void afterRejectOrReset(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
            if (i == 2) {
                ProxyConnectReplyStreamFactory.this.dataRO.wrap((DirectBuffer) mutableDirectBuffer, i2, i2 + i3);
                ProxyConnectReplyStreamFactory.this.source.doWindow(ProxyConnectReplyStreamFactory.this.dataRO.streamId(), i3);
            } else if (i == 3) {
                ProxyConnectReplyStreamFactory.this.endRO.wrap((DirectBuffer) mutableDirectBuffer, i2, i2 + i3);
                ProxyConnectReplyStreamFactory.this.source.removeStream(ProxyConnectReplyStreamFactory.this.endRO.streamId());
                this.streamState = (v1, v2, v3, v4) -> {
                    afterEnd(v1, v2, v3, v4);
                };
            }
        }

        private void processUnexpected(DirectBuffer directBuffer, int i, int i2) {
            ProxyConnectReplyStreamFactory.this.frameRO.wrap(directBuffer, i, i + i2);
            ProxyConnectReplyStreamFactory.this.source.doReset(ProxyConnectReplyStreamFactory.this.frameRO.streamId());
            this.streamState = this::afterRejectOrReset;
        }

        private void processBegin(MutableDirectBuffer mutableDirectBuffer, int i, int i2) {
            ProxyConnectReplyStreamFactory.this.beginRO.wrap((DirectBuffer) mutableDirectBuffer, i, i + i2);
            long streamId = ProxyConnectReplyStreamFactory.this.beginRO.streamId();
            long sourceRef = ProxyConnectReplyStreamFactory.this.beginRO.sourceRef();
            Correlation correlation = (Correlation) ProxyConnectReplyStreamFactory.this.correlateEstablished.apply(ProxyConnectReplyStreamFactory.this.beginRO.correlationId());
            int requestURLHash = correlation.requestURLHash();
            this.forwardResponsesTo = (List) ProxyConnectReplyStreamFactory.this.awaitingRequestMatches.remove(requestURLHash);
            if (requestURLHash == -1 || this.forwardResponsesTo == null) {
                this.forwardResponsesTo = Collections.emptyList();
            }
            if (sourceRef != 0 || correlation == null) {
                processUnexpected(mutableDirectBuffer, i, i2);
                return;
            }
            Target target = (Target) ProxyConnectReplyStreamFactory.this.supplyTarget.apply(correlation.source());
            long asLong = ProxyConnectReplyStreamFactory.this.supplyStreamId.getAsLong();
            long id = correlation.id();
            OctetsFW extension = ProxyConnectReplyStreamFactory.this.beginRO.extension();
            HttpBeginExFW httpBeginExFW = ProxyConnectReplyStreamFactory.this.httpBeginExRO;
            httpBeginExFW.getClass();
            HttpBeginExFW httpBeginExFW2 = (HttpBeginExFW) extension.get(httpBeginExFW::wrap);
            target.doHttpBegin(asLong, 0L, id, builder -> {
                httpBeginExFW2.headers().forEach(httpHeaderFW -> {
                    builder.item(builder -> {
                        builder.representation((byte) 0).name(httpHeaderFW.name().asString()).value(httpHeaderFW.value().asString());
                    });
                });
            });
            GroupThrottle groupThrottle = new GroupThrottle(this.forwardResponsesTo.size() + 1, ProxyConnectReplyStreamFactory.this.source, streamId);
            this.forwardResponsesTo.stream().forEach(sourceInputStream -> {
                sourceInputStream.setReplyThrottle(groupThrottle);
                sourceInputStream.replyMessageHandler().onMessage(1, mutableDirectBuffer, i, i2);
            });
            ProxyAcceptStreamFactory.SourceInputStream sourceInputStream2 = (ProxyAcceptStreamFactory.SourceInputStream) ProxyConnectReplyStreamFactory.this.urlToPendingStream.remove(requestURLHash);
            if (sourceInputStream2 != null) {
                sourceInputStream2.clean();
            }
            groupThrottle.getClass();
            target.addThrottle(asLong, (v1, v2, v3, v4) -> {
                r2.handleThrottle(v1, v2, v3, v4);
            });
            this.sourceId = streamId;
            this.target = target;
            this.targetId = asLong;
            this.streamState = this::afterBeginOrData;
        }

        private void processData(DirectBuffer directBuffer, int i, int i2) {
            ProxyConnectReplyStreamFactory.this.dataRO.wrap(directBuffer, i, i + i2);
            this.target.doHttpData(this.targetId, ProxyConnectReplyStreamFactory.this.dataRO.payload());
        }

        private void processEnd(DirectBuffer directBuffer, int i, int i2) {
            ProxyConnectReplyStreamFactory.this.endRO.wrap(directBuffer, i, i + i2);
            this.target.doHttpEnd(this.targetId);
            this.target.removeThrottle(this.targetId);
            ProxyConnectReplyStreamFactory.this.source.removeStream(this.sourceId);
        }
    }

    public ProxyConnectReplyStreamFactory(Source source, Function<String, Target> function, LongSupplier longSupplier, LongFunction<Correlation> longFunction, Int2ObjectHashMap<ProxyAcceptStreamFactory.SourceInputStream> int2ObjectHashMap, Int2ObjectHashMap<List<ProxyAcceptStreamFactory.SourceInputStream>> int2ObjectHashMap2) {
        this.source = source;
        this.supplyTarget = function;
        this.supplyStreamId = longSupplier;
        this.correlateEstablished = longFunction;
        this.awaitingRequestMatches = int2ObjectHashMap2;
        this.urlToPendingStream = int2ObjectHashMap;
    }

    public MessageHandler newStream() {
        TargetOutputEstablishedStream targetOutputEstablishedStream = new TargetOutputEstablishedStream();
        targetOutputEstablishedStream.getClass();
        return (i, mutableDirectBuffer, i2, i3) -> {
            targetOutputEstablishedStream.handleStream(i, mutableDirectBuffer, i2, i3);
        };
    }
}
