package org.reaktivity.nukleus.tcp.internal.stream;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
import java.util.Objects;
import java.util.function.LongSupplier;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.Configuration;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.stream.StreamFactory;
import org.reaktivity.nukleus.tcp.internal.poller.Poller;
import org.reaktivity.nukleus.tcp.internal.poller.PollerKey;
import org.reaktivity.nukleus.tcp.internal.types.OctetsFW;
import org.reaktivity.nukleus.tcp.internal.types.control.RouteFW;
import org.reaktivity.nukleus.tcp.internal.types.control.TcpRouteExFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.tcp.internal.util.IpUtil;

/* loaded from: input_file:org/reaktivity/nukleus/tcp/internal/stream/ServerStreamFactory.class */
/**
 * Stream factory for the accepting (server) side of the TCP nukleus.
 *
 * <p>{@link #onAccepted} resolves a route for a newly accepted socket, writes a TCP BEGIN frame to
 * the routed target, wires a {@code ReadStream} to the socket and records a {@code Correlation}
 * keyed by a freshly supplied correlation id. {@link #newStream} later consumes that correlation
 * when the matching reply stream begins and attaches a {@code WriteStream} to the same socket.
 *
 * <p>NOTE(review): the flyweight fields and the shared read/write buffers are reused across calls,
 * so this class is presumably meant to be driven from a single thread — confirm against the caller.
 */
public class ServerStreamFactory implements StreamFactory {
    /** Upper bound applied to the size of the shared read buffer. */
    private static final int MAX_READ_BUFFER_SIZE = 65535;

    /**
     * Number of bytes subtracted from the write buffer capacity when sizing the read buffer.
     * NOTE(review): presumably the frame header overhead preceding the payload — confirm.
     */
    private static final int READ_BUFFER_RESERVED_BYTES = 10;

    private final RouteFW routeRO = new RouteFW();
    private final TcpRouteExFW routeExRO = new TcpRouteExFW();
    private final BeginFW beginRO = new BeginFW();
    private final RouteManager router;
    private final LongSupplier supplyStreamId;
    private final LongSupplier supplyCorrelationId;
    private final Long2ObjectHashMap<Correlation> correlations;
    private final Poller poller;
    private final BufferPool bufferPool;
    private final LongSupplier incrementOverflow;
    private final ByteBuffer readByteBuffer;
    private final MutableDirectBuffer readBuffer;
    private final ByteBuffer writeByteBuffer;
    private final MessageWriter writer;

    /**
     * Creates the factory and allocates its shared direct read and write buffers.
     *
     * @param configuration       not used by this constructor
     * @param router              route manager used to resolve routes and targets; must not be null
     * @param writeBuffer         buffer handed to the {@code MessageWriter}; its capacity also sizes
     *                            the direct write and read buffers; must not be null
     * @param bufferPool          buffer pool passed on to each {@code WriteStream}
     * @param incrementOverflow   supplier passed on to each {@code WriteStream}
     * @param supplyStreamId      supplier of stream ids for accepted connections; must not be null
     * @param supplyCorrelationId supplier of correlation ids for accepted connections
     * @param correlations        map of pending correlations keyed by correlation id; must not be null
     * @param poller              poller used to register accepted socket channels
     * @throws NullPointerException if {@code router}, {@code writeBuffer}, {@code supplyStreamId}
     *                              or {@code correlations} is null
     */
    public ServerStreamFactory(Configuration configuration, RouteManager router, MutableDirectBuffer writeBuffer, BufferPool bufferPool, LongSupplier incrementOverflow, LongSupplier supplyStreamId, LongSupplier supplyCorrelationId, Long2ObjectHashMap<Correlation> correlations, Poller poller) {
        this.router = Objects.requireNonNull(router);
        // Validate the write buffer before dereferencing it for its capacity.
        this.writer = new MessageWriter(Objects.requireNonNull(writeBuffer));
        final int writeCapacity = writeBuffer.capacity();
        this.writeByteBuffer = ByteBuffer.allocateDirect(writeCapacity).order(ByteOrder.nativeOrder());
        this.bufferPool = bufferPool;
        this.incrementOverflow = incrementOverflow;
        this.supplyStreamId = Objects.requireNonNull(supplyStreamId);
        this.supplyCorrelationId = supplyCorrelationId;
        this.correlations = Objects.requireNonNull(correlations);
        final int readCapacity = Math.min(writeCapacity - READ_BUFFER_RESERVED_BYTES, MAX_READ_BUFFER_SIZE);
        this.readByteBuffer = ByteBuffer.allocateDirect(readCapacity).order(ByteOrder.nativeOrder());
        this.readBuffer = new UnsafeBuffer(this.readByteBuffer);
        this.poller = poller;
    }

    /**
     * Creates the handler for a new stream whose first frame is wrapped as a BEGIN frame.
     *
     * <p>Only reply streams (BEGIN with {@code sourceRef == 0}) are accepted. For any other stream
     * a reset is written to {@code throttle} and an exception is thrown.
     *
     * @param msgTypeId not used by this method
     * @param buffer    buffer containing the BEGIN frame
     * @param index     offset of the frame within {@code buffer}
     * @param length    length of the frame in bytes
     * @param throttle  consumer that receives the reset (and is handed to the write stream)
     * @return the reply stream handler, or {@code null} if no correlation matches the BEGIN frame
     * @throws IllegalArgumentException if the BEGIN frame has a non-zero {@code sourceRef}
     */
    public MessageConsumer newStream(int msgTypeId, DirectBuffer buffer, int index, int length, MessageConsumer throttle) {
        final BeginFW begin = this.beginRO.wrap(buffer, index, index + length);
        final long sourceRef = begin.sourceRef();
        if (sourceRef == 0) {
            return newConnectReplyStream(begin, throttle);
        }
        final long streamId = begin.streamId();
        this.writer.doReset(throttle, streamId);
        throw new IllegalArgumentException(String.format("Stream id %d is not a reply stream, sourceRef %d is non-zero", Long.valueOf(streamId), Long.valueOf(sourceRef)));
    }

    /**
     * Handles a newly accepted socket channel.
     *
     * <p>Resolves the first route whose {@code sourceRef} and source name match the arguments and
     * whose (optional) extension address matches {@code address}. When no route matches, the
     * channel is closed. Otherwise a TCP BEGIN frame is written to the routed target, the channel
     * is registered with the poller, and a {@code Correlation} is stored for the reply stream.
     *
     * @param sourceName name compared against each route's source
     * @param sourceRef  reference compared against each route's {@code sourceRef}; its {@code int}
     *                   value is also used as the port of the routed address
     * @param channel    the accepted socket channel
     * @param address    address compared against the routed address via {@code IpUtil.addressesMatch}
     */
    public void onAccepted(String sourceName, long sourceRef, SocketChannel channel, InetSocketAddress address) {
        final RouteFW route = (RouteFW) this.router.resolve((typeId, buffer, offset, length) -> {
            // The limit is offset + length, consistent with wrapRoute() which receives the same arguments.
            final RouteFW candidate = this.routeRO.wrap(buffer, offset, offset + length);
            final OctetsFW extension = candidate.extension();
            InetAddress routedInetAddress = null;
            if (extension.sizeof() > 0) {
                final TcpRouteExFW routeEx = (TcpRouteExFW) extension.get(this.routeExRO::wrap);
                routedInetAddress = IpUtil.inetAddress(routeEx.address());
            }
            return sourceRef == candidate.sourceRef()
                    && sourceName.equals(candidate.source().asString())
                    && IpUtil.addressesMatch(address, new InetSocketAddress(routedInetAddress, (int) sourceRef));
        }, this::wrapRoute);

        if (route == null) {
            // No matching route: refuse the connection.
            CloseHelper.close(channel);
            return;
        }

        final long targetRef = route.targetRef();
        final String targetName = route.target().asString();
        final long targetId = this.supplyStreamId.getAsLong();
        final long correlationId = this.supplyCorrelationId.getAsLong();
        try {
            final InetSocketAddress localAddress = (InetSocketAddress) channel.getLocalAddress();
            final InetSocketAddress remoteAddress = (InetSocketAddress) channel.getRemoteAddress();
            final MessageConsumer target = this.router.supplyTarget(targetName);
            this.writer.doTcpBegin(target, targetId, targetRef, correlationId, localAddress, remoteAddress);

            // Register with no interest ops and no handler; the handler is attached once the stream is wired.
            final PollerKey key = this.poller.doRegister(channel, 0, null);
            final ReadStream readStream = new ReadStream(target, targetId, key, channel, this.readByteBuffer, this.readBuffer, this.writer);
            this.correlations.put(correlationId, new Correlation(sourceName, channel, readStream::setCorrelatedThrottle, target, targetId));
            this.router.setThrottle(targetName, targetId, readStream::handleThrottle);
            // NOTE(review): 1 is presumably SelectionKey.OP_READ — confirm against PollerKey.handler.
            key.handler(1, readStream::handleStream);
        } catch (IOException e) {
            CloseHelper.quietClose(channel);
            LangUtil.rethrowUnchecked(e);
        }
    }

    /**
     * Completes the reply half of a previously accepted connection.
     *
     * <p>Removes the correlation identified by the BEGIN frame's correlation id. When present, a
     * {@code WriteStream} is created on the correlated channel and its stream handler is returned;
     * otherwise a reset is written to {@code throttle}.
     *
     * @param begin    the BEGIN frame of the reply stream
     * @param throttle throttle of the reply stream
     * @return the write stream handler, or {@code null} when no correlation was found
     */
    private MessageConsumer newConnectReplyStream(BeginFW begin, MessageConsumer throttle) {
        final Correlation correlation = this.correlations.remove(begin.correlationId());
        final long streamId = begin.streamId();
        if (correlation == null) {
            this.writer.doReset(throttle, streamId);
            return null;
        }

        correlation.setCorrelatedThrottle(throttle, streamId);
        final WriteStream writeStream = new WriteStream(throttle, streamId, correlation.channel(), this.poller, this.incrementOverflow, this.bufferPool, this.writeByteBuffer, this.writer);
        writeStream.setCorrelatedInput(correlation.correlatedStreamId(), correlation.correlatedStream());
        writeStream.doConnected();
        return writeStream::handleStream;
    }

    /**
     * Wraps the shared route flyweight over the given region of {@code buffer}.
     *
     * @param msgTypeId not used by this method
     * @param buffer    buffer containing the route
     * @param index     offset of the route within {@code buffer}
     * @param length    length of the route in bytes
     * @return the shared {@code RouteFW} flyweight, wrapped over {@code [index, index + length)}
     */
    private RouteFW wrapRoute(int msgTypeId, DirectBuffer buffer, int index, int length) {
        return this.routeRO.wrap(buffer, index, index + length);
    }
}
