package org.reaktivity.nukleus.http_push.internal.routable.stream;

import java.util.List;
import java.util.Optional;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.MessageHandler;
import org.reaktivity.nukleus.http_push.internal.routable.Route;
import org.reaktivity.nukleus.http_push.internal.routable.Source;
import org.reaktivity.nukleus.http_push.internal.routable.Target;
import org.reaktivity.nukleus.http_push.internal.router.Correlation;
import org.reaktivity.nukleus.http_push.internal.router.RouteKind;
import org.reaktivity.nukleus.http_push.internal.types.HttpHeaderFW;
import org.reaktivity.nukleus.http_push.internal.types.ListFW;
import org.reaktivity.nukleus.http_push.internal.types.OctetsFW;
import org.reaktivity.nukleus.http_push.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.http_push.internal.types.stream.DataFW;
import org.reaktivity.nukleus.http_push.internal.types.stream.EndFW;
import org.reaktivity.nukleus.http_push.internal.types.stream.FrameFW;
import org.reaktivity.nukleus.http_push.internal.types.stream.HttpBeginExFW;
import org.reaktivity.nukleus.http_push.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.http_push.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.http_push.internal.util.HttpHeadersUtil;
import org.reaktivity.nukleus.http_push.internal.util.function.LongObjectBiConsumer;
import org.reaktivity.nukleus.ws.internal.types.stream.WsHeaderFW;

/* loaded from: input_file:org/reaktivity/nukleus/http_push/internal/routable/stream/SourceInputStreamFactory.class */
/**
 * Creates the message handlers that consume frames arriving on a source stream and
 * forward them to a routed {@link Target}.
 *
 * <p>Each handler returned by {@link #newStream()} is backed by its own
 * {@link SourceInputStream} state machine. The read-only flyweights declared here are
 * shared by every stream created from this factory, so they are re-wrapped on every frame.
 */
public final class SourceInputStreamFactory {
    // Frame type ids, named after the handlers they are dispatched to in this class.
    private static final int BEGIN_TYPE_ID = 0x00000001;
    private static final int DATA_TYPE_ID = 0x00000002;
    private static final int END_TYPE_ID = 0x00000003;
    private static final int RESET_TYPE_ID = 0x40000001;
    private static final int WINDOW_TYPE_ID = 0x40000002;

    // Value of slab.acquire(...) for which no slot buffer is used.
    // NOTE(review): presumably signals that the slab has no free slot - confirm against Slab.
    private static final int NO_SLOT = -1;

    // The poll interval header is multiplied by this to obtain a wall-clock offset in milliseconds.
    // Declared as long so the multiplication cannot overflow int. Replaces the unrelated
    // WsHeaderFW.STATUS_NORMAL_CLOSURE constant (1000 per RFC 6455 - TODO confirm) that the
    // decompiler substituted for the literal.
    private static final long MILLIS_PER_SECOND = 1000L;

    /** Matches the header whose name equals {@code HttpHeadersUtil.INJECTED_HEADER_NAME}. */
    private static final Predicate<HttpHeaderFW> IS_INJECTED_HEADER = header -> {
        return HttpHeadersUtil.INJECTED_HEADER_NAME.equals(header.name().asString());
    };

    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final HttpBeginExFW httpBeginExRO = new HttpBeginExFW();
    private final DataFW dataRO = new DataFW();
    private final EndFW endRO = new EndFW();
    private final WindowFW windowRO = new WindowFW();
    private final ResetFW resetRO = new ResetFW();

    private final Source source;
    private final LongFunction<List<Route>> supplyRoutes;
    private final LongSupplier supplyTargetId;
    private final LongObjectBiConsumer<Correlation> correlateNew;
    private final Slab slab;
    private final LongObjectBiConsumer<Runnable> scheduler;

    /**
     * State machine for a single source stream.
     *
     * <p>{@code streamState} always points at the handler for the current phase; every
     * incoming frame is delegated to it. Instances can only be created by the enclosing
     * factory because the constructor is private.
     */
    public final class SourceInputStream {
        private MessageHandler streamState;
        private long sourceId;
        private Target target;
        private long targetId;
        // Number of header bytes copied into the slab slot; stays 0 when nothing was stored.
        private int storedRequestSize;
        // Poll interval parsed from the poll header; multiplied by MILLIS_PER_SECOND when scheduling.
        private int pollInterval;

        private SourceInputStream() {
            this.streamState = this::beforeBegin;
        }

        /**
         * Dispatches one frame to the handler of the current stream phase.
         *
         * @param msgTypeId frame type id
         * @param buffer    buffer holding the frame
         * @param index     offset of the frame within {@code buffer}
         * @param length    length of the frame in bytes
         */
        public void handleStream(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
            this.streamState.onMessage(msgTypeId, buffer, index, length);
        }

        /** Initial phase: only a BEGIN frame is accepted, anything else resets the stream. */
        private void beforeBegin(int msgTypeId, DirectBuffer buffer, int index, int length) {
            if (msgTypeId == BEGIN_TYPE_ID) {
                processBegin(buffer, index, length);
            } else {
                processUnexpected(buffer, index, length);
            }
        }

        /** Proxying phase: DATA and END are forwarded to the target, anything else resets the stream. */
        private void afterBeginOrData(int msgTypeId, DirectBuffer buffer, int index, int length) {
            switch (msgTypeId) {
                case DATA_TYPE_ID:
                    processData(buffer, index, length);
                    break;
                case END_TYPE_ID:
                    processEnd(buffer, index, length);
                    break;
                default:
                    processUnexpected(buffer, index, length);
                    break;
            }
        }

        /**
         * Phase entered once a poll has been scheduled: DATA is ignored, END removes the
         * throttle and the source stream, anything else resets the stream.
         */
        private void afterScheduledPoll(int msgTypeId, DirectBuffer buffer, int index, int length) {
            switch (msgTypeId) {
                case DATA_TYPE_ID:
                    // Deliberately dropped: nothing is forwarded while waiting for the poll.
                    break;
                case END_TYPE_ID:
                    this.target.removeThrottle(this.targetId);
                    endRO.wrap(buffer, index, index + length);
                    source.removeStream(endRO.streamId());
                    break;
                default:
                    processUnexpected(buffer, index, length);
                    break;
            }
        }

        /** Terminal phase: every further frame is treated as unexpected. */
        private void afterEnd(int msgTypeId, DirectBuffer buffer, int index, int length) {
            processUnexpected(buffer, index, length);
        }

        /**
         * Phase entered after the stream was reset or answered directly: DATA is acknowledged
         * with a window of the frame length, END removes the source stream and moves to the
         * terminal phase, all other frames are ignored.
         */
        private void afterReplyOrReset(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
            if (msgTypeId == DATA_TYPE_ID) {
                dataRO.wrap(buffer, index, index + length);
                source.doWindow(dataRO.streamId(), length);
            } else if (msgTypeId == END_TYPE_ID) {
                endRO.wrap(buffer, index, index + length);
                source.removeStream(endRO.streamId());
                this.streamState = this::afterEnd;
            }
        }

        /** Resets the source stream the frame belongs to and switches to {@link #afterReplyOrReset}. */
        private void processUnexpected(DirectBuffer buffer, int index, int length) {
            frameRO.wrap(buffer, index, index + length);
            source.doReset(frameRO.streamId());
            this.streamState = this::afterReplyOrReset;
        }

        /**
         * Handles a BEGIN for which no target route was found: ends a stream on the reply
         * route when one exists, otherwise resets the source stream.
         *
         * <p>NOTE(review): {@code status} is not used, and the reply route is looked up with the
         * same {@code sourceRef} that just produced no target route, so the reply branch looks
         * unreachable unless {@code supplyRoutes} can return different lists - confirm.
         *
         * @param sourceRef source reference taken from the BEGIN frame
         * @param status    status the caller associates with the rejection (currently unused)
         */
        private void processInvalidRequest(DirectBuffer buffer, int index, int length, long sourceRef,
                String status) {
            Optional<Route> replyTo = resolveReplyTo(sourceRef);
            if (replyTo.isPresent()) {
                replyTo.get().target().doHttpEnd(supplyTargetId.getAsLong());
                this.streamState = this::afterReplyOrReset;
            } else {
                processUnexpected(buffer, index, length);
            }
        }

        /**
         * Handles the BEGIN frame: resolves the route, stores the request headers in a slab
         * slot when one is acquired, then either schedules a poll or forwards the BEGIN to the
         * target, and finally registers the correlation for the new target stream.
         */
        private void processBegin(DirectBuffer buffer, int index, int length) {
            beginRO.wrap(buffer, index, index + length);
            final long sourceStreamId = beginRO.streamId();
            final long sourceRef = beginRO.sourceRef();
            final long correlationId = beginRO.correlationId();

            final Optional<Route> resolved = resolveTarget(sourceRef);
            if (!resolved.isPresent()) {
                processInvalidRequest(buffer, index, length, sourceRef, "400");
                return;
            }

            final long newTargetId = supplyTargetId.getAsLong();
            final Route route = resolved.get();
            final Target newTarget = route.target();
            final long targetRef = route.targetRef();

            beginRO.extension().get(httpBeginExRO::wrap);
            final ListFW<HttpHeaderFW> headers = httpBeginExRO.headers();

            final int slotIndex = slab.acquire(sourceStreamId);
            boolean pollScheduled = false;
            if (slotIndex != NO_SLOT) {
                final MutableDirectBuffer store = slab.buffer(slotIndex);
                storeHeadersForTargetEstablish(headers, store);

                // A poll is only scheduled when the request carries both the poll header
                // and the injected header.
                if (headers.anyMatch(HttpHeadersUtil.IS_POLL_HEADER) && headers.anyMatch(IS_INJECTED_HEADER)) {
                    // NOTE(review): a non-numeric poll header value makes parseInt throw here,
                    // after the slot has already been acquired - confirm how this should be handled.
                    HttpHeadersUtil.forEachMatch(headers, HttpHeadersUtil.IS_POLL_HEADER, header -> {
                        this.pollInterval = Integer.parseInt(header.value().asString());
                    });
                    schedulePoll(newTargetId, newTarget, targetRef, store);
                    this.streamState = this::afterScheduledPoll;
                    pollScheduled = true;
                }
            }

            if (!pollScheduled) {
                forwardBegin(newTarget, newTargetId, targetRef);
                this.streamState = this::afterBeginOrData;
            }

            correlateNew.accept(newTargetId, new Correlation(correlationId, source.routableName(),
                    RouteKind.OUTPUT_ESTABLISHED, slotIndex, this.storedRequestSize));
            this.sourceId = sourceStreamId;
            this.target = newTarget;
            this.targetId = newTargetId;
        }

        /** Forwards the current BEGIN extension to the target and registers this stream as throttle. */
        private void forwardBegin(Target newTarget, long newTargetId, long targetRef) {
            newTarget.doHttpBegin(newTargetId, targetRef, newTargetId, builder -> {
                builder.set(beginRO.extension());
            });
            newTarget.addThrottle(newTargetId, this::handleThrottle);
        }

        /** Copies the raw header list into {@code store} at offset 0 and records its size. */
        private void storeHeadersForTargetEstablish(ListFW<HttpHeaderFW> headers, MutableDirectBuffer store) {
            store.putBytes(0, headers.buffer(), headers.offset(), headers.sizeof());
            this.storedRequestSize = headers.sizeof();
        }

        /**
         * Schedules a task that replays the stored request headers to the target as a BEGIN
         * immediately followed by an END.
         *
         * <p>The injected header is never replayed. The {@code cache-control} header is
         * dropped as well when the stored headers match both
         * {@code INJECTED_HEADER_AND_NO_CACHE} and {@code NO_CACHE_CACHE_CONTROL}.
         *
         * <p>The task uses the target id passed in rather than the {@code targetId} field,
         * because that field is only assigned after this method returns.
         *
         * @param newTargetId   id of the target stream to open when the poll fires
         * @param newTarget     target to send the replayed request to
         * @param targetRef     target reference of the resolved route
         * @param storedHeaders slab buffer holding the stored header list at offset 0
         */
        private void schedulePoll(long newTargetId, Target newTarget, long targetRef, DirectBuffer storedHeaders) {
            // long arithmetic: an int product would overflow for large poll intervals.
            final long pollAt = System.currentTimeMillis() + this.pollInterval * MILLIS_PER_SECOND;
            scheduler.accept(pollAt, () -> {
                final ListFW<HttpHeaderFW> stored =
                        httpBeginExRO.headers().wrap(storedHeaders, 0, this.storedRequestSize);

                Predicate<HttpHeaderFW> stripped = IS_INJECTED_HEADER;
                if (stored.anyMatch(HttpHeadersUtil.INJECTED_HEADER_AND_NO_CACHE)
                        && stored.anyMatch(HttpHeadersUtil.NO_CACHE_CACHE_CONTROL)) {
                    stripped = stripped.or(header -> {
                        return "cache-control".equals(header.name().asString());
                    });
                }
                final Predicate<HttpHeaderFW> forwarded = stripped.negate();

                newTarget.doHttpBegin2(newTargetId, targetRef, newTargetId, builder -> {
                    stored.forEach(header -> {
                        if (forwarded.test(header)) {
                            builder.item(item -> {
                                item.representation((byte) 0).name(header.name()).value(header.value());
                            });
                        }
                    });
                });
                newTarget.doHttpEnd(newTargetId);
            });
        }

        /** Forwards the payload of a DATA frame to the target stream. */
        private void processData(DirectBuffer buffer, int index, int length) {
            dataRO.wrap(buffer, index, index + length);
            this.target.doHttpData(this.targetId, dataRO.payload());
        }

        /** Ends the target stream, then removes the source stream and the throttle. */
        private void processEnd(DirectBuffer buffer, int index, int length) {
            endRO.wrap(buffer, index, index + length);
            this.target.doHttpEnd(this.targetId);
            final long endedStreamId = endRO.streamId();
            this.streamState = this::afterEnd;
            source.removeStream(endedStreamId);
            this.target.removeThrottle(this.targetId);
        }

        /** Returns the first route supplied for {@code sourceRef}, if any. */
        private Optional<Route> resolveTarget(long sourceRef) {
            return supplyRoutes.apply(sourceRef).stream().findFirst();
        }

        /** Returns the first route supplied for {@code sourceRef} whose source matches this source's name. */
        private Optional<Route> resolveReplyTo(long sourceRef) {
            final List<Route> routes = supplyRoutes.apply(sourceRef);
            return routes.stream()
                    .filter(Route.sourceMatches(source.routableName()))
                    .findFirst();
        }

        /** Throttle handler for the target stream: reacts to RESET and WINDOW, ignores the rest. */
        private void handleThrottle(int msgTypeId, DirectBuffer buffer, int index, int length) {
            switch (msgTypeId) {
                case RESET_TYPE_ID:
                    processReset(buffer, index, length);
                    break;
                case WINDOW_TYPE_ID:
                    processWindow(buffer, index, length);
                    break;
                default:
                    break;
            }
        }

        /** Passes the window update received from the target on to the source stream. */
        private void processWindow(DirectBuffer buffer, int index, int length) {
            windowRO.wrap(buffer, index, index + length);
            source.doWindow(this.sourceId, windowRO.update());
        }

        /** Propagates a reset received from the target to the source stream. */
        private void processReset(DirectBuffer buffer, int index, int length) {
            resetRO.wrap(buffer, index, index + length);
            source.doReset(this.sourceId);
        }
    }

    /**
     * Creates a factory bound to one source.
     *
     * @param source         source the created streams read from and send window/reset frames to
     * @param supplyRoutes   supplies the routes registered for a source reference
     * @param supplyTargetId supplies the id for each new target stream
     * @param correlateNew   receives the correlation registered for each new target stream id
     * @param slab           slab whose slots hold the stored request headers
     * @param scheduler      accepts a wall-clock time in milliseconds and the task to run for a poll
     */
    public SourceInputStreamFactory(Source source, LongFunction<List<Route>> supplyRoutes,
            LongSupplier supplyTargetId, LongObjectBiConsumer<Correlation> correlateNew, Slab slab,
            LongObjectBiConsumer<Runnable> scheduler) {
        this.source = source;
        this.supplyRoutes = supplyRoutes;
        this.supplyTargetId = supplyTargetId;
        this.correlateNew = correlateNew;
        this.slab = slab;
        this.scheduler = scheduler;
    }

    /**
     * Creates the handler for a new source stream.
     *
     * @return a handler that delegates every frame to a fresh {@link SourceInputStream}
     */
    public MessageHandler newStream() {
        return new SourceInputStream()::handleStream;
    }
}
