package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.kafka.internal.KafkaConfiguration;
import org.reaktivity.nukleus.kafka.internal.function.PartitionProgressHandler;
import org.reaktivity.nukleus.kafka.internal.function.PartitionResponseConsumer;
import org.reaktivity.nukleus.kafka.internal.types.ArrayFW;
import org.reaktivity.nukleus.kafka.internal.types.Flyweight;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.Varint64FW;
import org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.ResponseHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.FetchRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.FetchResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.HeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.PartitionRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.PartitionResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.RecordBatchFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.RecordFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.RecordSetFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.TopicRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.TopicResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.control.KafkaRouteExFW;
import org.reaktivity.nukleus.kafka.internal.types.control.RouteFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.AbortFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.DataFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.EndFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaBeginExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaDataExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.stream.StreamFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:57)
    */
/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/ClientStreamFactory.class */
public final class ClientStreamFactory implements StreamFactory {
    private static final short FETCH_API_VERSION = 5;
    private static final short FETCH_API_KEY = 1;
    private final RouteManager router;
    private final LongSupplier supplyStreamId;
    private final LongSupplier supplyCorrelationId;
    private final BufferPool bufferPool;
    private final MutableDirectBuffer writeBuffer;
    private final MutableDirectBuffer encodeBuffer;
    private final Long2ObjectHashMap<NetworkConnectionPool.NetworkConnection> correlations;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final RouteFW routeRO = new RouteFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final EndFW endRO = new EndFW();
    private final AbortFW abortRO = new AbortFW();
    private final KafkaRouteExFW routeExRO = new KafkaRouteExFW();
    private final KafkaBeginExFW beginExRO = new KafkaBeginExFW();
    private final WindowFW windowRO = new WindowFW();
    private final ResetFW resetRO = new ResetFW();
    private final BeginFW.Builder beginRW = new BeginFW.Builder();
    private final DataFW.Builder dataRW = new DataFW.Builder();
    private final EndFW.Builder endRW = new EndFW.Builder();
    private final AbortFW.Builder abortRW = new AbortFW.Builder();
    private final KafkaDataExFW.Builder dataExRW = new KafkaDataExFW.Builder();
    private final WindowFW.Builder windowRW = new WindowFW.Builder();
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final RequestHeaderFW.Builder requestRW = new RequestHeaderFW.Builder();
    private final FetchRequestFW.Builder fetchRequestRW = new FetchRequestFW.Builder();
    private final TopicRequestFW.Builder topicRequestRW = new TopicRequestFW.Builder();
    private final PartitionRequestFW.Builder partitionRequestRW = new PartitionRequestFW.Builder();
    private final OctetsFW.Builder payloadRW = new OctetsFW.Builder();
    private final ResponseHeaderFW responseRO = new ResponseHeaderFW();
    private final FetchResponseFW fetchResponseRO = new FetchResponseFW();
    private final TopicResponseFW topicResponseRO = new TopicResponseFW();
    private final PartitionResponseFW partitionResponseRO = new PartitionResponseFW();
    private final RecordSetFW recordSetRO = new RecordSetFW();
    private final RecordBatchFW recordBatchRO = new RecordBatchFW();
    private final RecordFW recordRO = new RecordFW();
    private final Map<String, Long2ObjectHashMap<NetworkConnectionPool>> connectionPools = new LinkedHashMap();

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/ClientStreamFactory$ClientAcceptStream.class */
    public final class ClientAcceptStream {
        private final MessageConsumer applicationThrottle;
        private final long applicationId;
        private final NetworkConnectionPool networkPool;
        private final Long2LongHashMap fetchOffsets;
        private MessageConsumer applicationReply;
        private long applicationReplyId;
        private int applicationReplyBudget;
        private int applicationReplyPadding;
        private long networkAttachId;
        private MessageConsumer streamState;

        private ClientAcceptStream(MessageConsumer messageConsumer, long j, NetworkConnectionPool networkConnectionPool) {
            this.applicationThrottle = messageConsumer;
            this.applicationId = j;
            this.networkPool = networkConnectionPool;
            this.fetchOffsets = new Long2LongHashMap(-1L);
            this.streamState = this::beforeBegin;
        }

        public void handleStream(int i, DirectBuffer directBuffer, int i2, int i3) {
            this.streamState.accept(i, directBuffer, i2, i3);
        }

        private void beforeBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
            if (i == 1) {
                handleBegin(ClientStreamFactory.this.beginRO.wrap(directBuffer, i2, i2 + i3));
            } else {
                ClientStreamFactory.this.doReset(this.applicationThrottle, this.applicationId);
            }
        }

        private void afterBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 2:
                    ClientStreamFactory.this.doReset(this.applicationThrottle, this.applicationId);
                    this.networkPool.doDetach(this.networkAttachId, this.fetchOffsets);
                    return;
                case EndFW.TYPE_ID /* 3 */:
                    return;
                case 4:
                    ClientStreamFactory.this.doAbort(this.applicationReply, this.applicationReplyId);
                    this.networkPool.doDetach(this.networkAttachId, this.fetchOffsets);
                    return;
                default:
                    ClientStreamFactory.this.doReset(this.applicationThrottle, this.applicationId);
                    return;
            }
        }

        private void handleBegin(BeginFW beginFW) {
            String asString = beginFW.source().asString();
            long correlationId = beginFW.correlationId();
            OctetsFW extension = beginFW.extension();
            if (extension.sizeof() == 0) {
                ClientStreamFactory.this.doReset(this.applicationThrottle, this.applicationId);
                return;
            }
            KafkaBeginExFW kafkaBeginExFW = ClientStreamFactory.this.beginExRO;
            Objects.requireNonNull(kafkaBeginExFW);
            KafkaBeginExFW kafkaBeginExFW2 = (KafkaBeginExFW) extension.get(kafkaBeginExFW::wrap);
            long asLong = ClientStreamFactory.this.supplyStreamId.getAsLong();
            MessageConsumer supplyTarget = ClientStreamFactory.this.router.supplyTarget(asString);
            String asString2 = kafkaBeginExFW2.topicName().asString();
            ArrayFW<Varint64FW> fetchOffsets = kafkaBeginExFW2.fetchOffsets();
            this.fetchOffsets.clear();
            fetchOffsets.forEach(varint64FW -> {
                this.fetchOffsets.put(this.fetchOffsets.size(), varint64FW.value());
            });
            long doAttach = this.networkPool.doAttach(asString2, this.fetchOffsets, this::onPartitionResponse, this::writeableBytes);
            ClientStreamFactory.this.doKafkaBegin(supplyTarget, asLong, 0L, correlationId, extension);
            ClientStreamFactory.this.router.setThrottle(asString, asLong, this::handleThrottle);
            ClientStreamFactory.this.doWindow(this.applicationThrottle, this.applicationId, 0, 0);
            this.networkAttachId = doAttach;
            this.streamState = this::afterBegin;
            this.applicationReply = supplyTarget;
            this.applicationReplyId = asLong;
        }

        private void handleThrottle(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 1073741825:
                    handleReset(ClientStreamFactory.this.resetRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 1073741826:
                    handleWindow(ClientStreamFactory.this.windowRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                default:
                    return;
            }
        }

        private void handleWindow(WindowFW windowFW) {
            this.applicationReplyBudget += windowFW.credit();
            this.applicationReplyPadding = windowFW.padding();
            this.networkPool.doFlush(this.networkAttachId);
        }

        private void handleReset(ResetFW resetFW) {
            this.networkPool.doDetach(this.networkAttachId, this.fetchOffsets);
            ClientStreamFactory.this.doReset(this.applicationThrottle, this.applicationId);
        }

        private void onPartitionResponse(DirectBuffer directBuffer, int i, int i2, PartitionProgressHandler partitionProgressHandler) {
            int i3 = i + i2;
            PartitionResponseFW wrap = ClientStreamFactory.this.partitionResponseRO.wrap(directBuffer, i, i3);
            int partitionId = wrap.partitionId();
            RecordBatchFW wrap2 = ClientStreamFactory.this.recordBatchRO.wrap(directBuffer, wrap.limit() + 4, i3);
            long firstOffset = wrap2.firstOffset();
            long recordCount = firstOffset + wrap2.recordCount();
            long j = this.fetchOffsets.get(partitionId);
            int limit = wrap2.limit();
            while (firstOffset < recordCount && limit < i3) {
                RecordFW wrap3 = ClientStreamFactory.this.recordRO.wrap(directBuffer, limit, i3);
                int headerCount = wrap3.headerCount();
                int limit2 = wrap3.limit() + 4;
                for (int i4 = 0; i4 < headerCount; i4++) {
                    limit2 = new HeaderFW().wrap(directBuffer, limit2, i3).limit();
                }
                limit = limit2;
                firstOffset++;
                if (firstOffset > j) {
                    OctetsFW value = wrap3.value();
                    if (this.applicationReplyBudget < value.sizeof() + this.applicationReplyPadding) {
                        break;
                    }
                    this.fetchOffsets.put(partitionId, firstOffset);
                    ClientStreamFactory.this.doKafkaData(this.applicationReply, this.applicationReplyId, value, this.fetchOffsets.values().iterator());
                    this.applicationReplyBudget -= value.sizeof() + this.applicationReplyPadding;
                }
            }
            partitionProgressHandler.handle(partitionId, j, firstOffset);
        }

        private int writeableBytes() {
            return this.applicationReplyBudget - this.applicationReplyPadding;
        }

        /* synthetic */ ClientAcceptStream(ClientStreamFactory clientStreamFactory, MessageConsumer messageConsumer, long j, NetworkConnectionPool networkConnectionPool, AnonymousClass1 anonymousClass1) {
            this(messageConsumer, j, networkConnectionPool);
        }
    }

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/ClientStreamFactory$NetworkConnectionPool.class */
    public final class NetworkConnectionPool {
        private final String networkName;
        private final long networkRef;
        private final NetworkConnection connection;
        static final /* synthetic */ boolean $assertionsDisabled;

        /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/ClientStreamFactory$NetworkConnectionPool$NetworkConnection.class */
        public final class NetworkConnection {
            private final MessageConsumer networkTarget;
            private final Map<String, NetworkTopic> topicsByName;
            private final Int2ObjectHashMap<Consumer<Long2LongHashMap>> detachersById;
            private int nextAttachId;
            private int nextRequestId;
            private int nextResponseId;
            private long networkId;
            private int networkRequestBudget;
            private int networkRequestPadding;
            private int networkResponseBudget;
            private int networkResponsePadding;
            private MessageConsumer networkReplyThrottle;
            private long networkReplyId;
            private MessageConsumer streamState;
            private int networkSlot;
            private int networkSlotOffset;
            static final /* synthetic */ boolean $assertionsDisabled;

            private NetworkConnection() {
                this.networkSlot = -1;
                this.networkTarget = ClientStreamFactory.this.router.supplyTarget(NetworkConnectionPool.this.networkName);
                this.topicsByName = new LinkedHashMap();
                this.detachersById = new Int2ObjectHashMap<>();
            }

            public String toString() {
                return String.format("[budget=%d, paddng=%d]", Integer.valueOf(this.networkRequestBudget), Integer.valueOf(this.networkRequestPadding));
            }

            public MessageConsumer onCorrelated(MessageConsumer messageConsumer, long j) {
                this.networkReplyThrottle = messageConsumer;
                this.networkReplyId = j;
                this.streamState = this::beforeBegin;
                return this::handleStream;
            }

            public int doAttach(String str, Long2LongHashMap long2LongHashMap, PartitionResponseConsumer partitionResponseConsumer, IntSupplier intSupplier) {
                NetworkTopic computeIfAbsent = this.topicsByName.computeIfAbsent(str, str2 -> {
                    return new NetworkTopic(str2);
                });
                computeIfAbsent.doAttach(long2LongHashMap, partitionResponseConsumer, intSupplier);
                int i = this.nextAttachId;
                this.nextAttachId = i + 1;
                this.detachersById.put(i, long2LongHashMap2 -> {
                    computeIfAbsent.doDetach(long2LongHashMap2, partitionResponseConsumer, intSupplier);
                });
                doFetchIfNotInFlight();
                return i;
            }

            public void doFlush() {
                doFetchIfNotInFlight();
            }

            public void doDetach(int i, Long2LongHashMap long2LongHashMap) {
                Consumer consumer = (Consumer) this.detachersById.remove(i);
                if (consumer != null) {
                    consumer.accept(long2LongHashMap);
                }
            }

            /* JADX WARN: Type inference failed for: r0v11, types: [org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW$Builder] */
            /* JADX WARN: Type inference failed for: r0v114, types: [org.reaktivity.nukleus.kafka.internal.types.codec.fetch.TopicRequestFW$Builder] */
            /* JADX WARN: Type inference failed for: r0v126, types: [org.reaktivity.nukleus.kafka.internal.types.codec.fetch.PartitionRequestFW$Builder] */
            /* JADX WARN: Type inference failed for: r0v24, types: [org.reaktivity.nukleus.kafka.internal.types.codec.fetch.FetchRequestFW$Builder] */
            /* JADX WARN: Type inference failed for: r0v45, types: [org.reaktivity.nukleus.kafka.internal.types.codec.fetch.FetchRequestFW$Builder] */
            /* JADX WARN: Type inference failed for: r0v62, types: [org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW$Builder] */
            /* JADX WARN: Type inference failed for: r0v73, types: [org.reaktivity.nukleus.kafka.internal.types.OctetsFW$Builder] */
            /* JADX WARN: Type inference failed for: r0v88, types: [org.reaktivity.nukleus.kafka.internal.types.codec.fetch.TopicRequestFW$Builder] */
            private void doFetchIfNotInFlight() {
                if (this.nextRequestId == this.nextResponseId) {
                    doBeginIfNotConnected();
                    if (this.networkRequestBudget > this.networkRequestPadding) {
                        RequestHeaderFW build = ClientStreamFactory.this.requestRW.wrap2(ClientStreamFactory.this.encodeBuffer, 512, ClientStreamFactory.this.encodeBuffer.capacity()).size(0).apiKey((short) 1).apiVersion((short) 5).correlationId(0).clientId((String) null).build();
                        FetchRequestFW build2 = ClientStreamFactory.this.fetchRequestRW.wrap2(ClientStreamFactory.this.encodeBuffer, build.limit(), ClientStreamFactory.this.encodeBuffer.capacity()).maxWaitTimeMillis(500).minBytes(1).maxBytes(0).isolationLevel((byte) 0).topicCount(0).build();
                        int limit = build2.limit();
                        int i = 0;
                        int i2 = 0;
                        for (String str : this.topicsByName.keySet()) {
                            TopicRequestFW build3 = ClientStreamFactory.this.topicRequestRW.wrap2(ClientStreamFactory.this.encodeBuffer, limit, ClientStreamFactory.this.encodeBuffer.capacity()).name(str).partitionCount(0).build();
                            limit = build3.limit() + 4;
                            NetworkTopic networkTopic = this.topicsByName.get(str);
                            int maximumWritableBytes = networkTopic.maximumWritableBytes();
                            i += maximumWritableBytes;
                            int i3 = 0;
                            int i4 = -1;
                            for (NetworkTopicPartition networkTopicPartition : networkTopic.partitions) {
                                if (i4 < networkTopicPartition.id) {
                                    limit = ClientStreamFactory.this.partitionRequestRW.wrap2(ClientStreamFactory.this.encodeBuffer, limit, ClientStreamFactory.this.encodeBuffer.capacity()).partitionId(networkTopicPartition.id).fetchOffset(networkTopicPartition.offset).maxBytes(maximumWritableBytes).build().limit();
                                    i4 = networkTopicPartition.id;
                                    i3++;
                                }
                            }
                            ClientStreamFactory.this.topicRequestRW.wrap2(ClientStreamFactory.this.encodeBuffer, build3.offset(), build3.limit() + 4).name(build3.name()).partitionCount(i3).build();
                            i2++;
                        }
                        ClientStreamFactory.this.fetchRequestRW.wrap2(ClientStreamFactory.this.encodeBuffer, build2.offset(), build2.limit()).maxWaitTimeMillis(build2.maxWaitTimeMillis()).minBytes(build2.minBytes()).maxBytes(i).isolationLevel(build2.isolationLevel()).topicCount(i2).build();
                        if (i <= 0 || (limit - 512) + this.networkRequestPadding > this.networkRequestBudget) {
                            return;
                        }
                        int i5 = this.nextRequestId;
                        this.nextRequestId = i5 + 1;
                        ClientStreamFactory.this.requestRW.wrap2(ClientStreamFactory.this.encodeBuffer, build.offset(), build.limit()).size((limit - 512) - 4).apiKey((short) 1).apiVersion((short) 5).correlationId(i5).clientId((String) null).build();
                        OctetsFW build4 = ClientStreamFactory.this.payloadRW.wrap2(ClientStreamFactory.this.encodeBuffer, 512, limit).set((mutableDirectBuffer, i6, i7) -> {
                            return i7 - i6;
                        }).build();
                        ClientStreamFactory.this.doData(this.networkTarget, this.networkId, build4);
                        this.networkRequestBudget -= build4.sizeof() + this.networkRequestPadding;
                    }
                }
            }

            private void doBeginIfNotConnected() {
                if (this.networkId == 0) {
                    long asLong = ClientStreamFactory.this.supplyStreamId.getAsLong();
                    long asLong2 = ClientStreamFactory.this.supplyCorrelationId.getAsLong();
                    ClientStreamFactory.this.correlations.put(asLong2, this);
                    ClientStreamFactory.this.doBegin(this.networkTarget, asLong, NetworkConnectionPool.this.networkRef, asLong2);
                    ClientStreamFactory.this.router.setThrottle(NetworkConnectionPool.this.networkName, asLong, this::handleThrottle);
                    this.networkId = asLong;
                }
            }

            private void handleThrottle(int i, DirectBuffer directBuffer, int i2, int i3) {
                switch (i) {
                    case 1073741825:
                        handleReset(ClientStreamFactory.this.resetRO.wrap(directBuffer, i2, i2 + i3));
                        return;
                    case 1073741826:
                        handleWindow(ClientStreamFactory.this.windowRO.wrap(directBuffer, i2, i2 + i3));
                        return;
                    default:
                        return;
                }
            }

            private void handleWindow(WindowFW windowFW) {
                int credit = windowFW.credit();
                int padding = windowFW.padding();
                this.networkRequestBudget += credit;
                this.networkRequestPadding = padding;
                doFetchIfNotInFlight();
            }

            private void handleReset(ResetFW resetFW) {
                doReinitialize();
                doFetchIfNotInFlight();
            }

            private void handleStream(int i, DirectBuffer directBuffer, int i2, int i3) {
                this.streamState.accept(i, directBuffer, i2, i3);
            }

            private void beforeBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
                if (i == 1) {
                    handleBegin(ClientStreamFactory.this.beginRO.wrap(directBuffer, i2, i2 + i3));
                } else {
                    ClientStreamFactory.this.doReset(this.networkReplyThrottle, this.networkReplyId);
                }
            }

            private void afterBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
                switch (i) {
                    case 2:
                        handleData(ClientStreamFactory.this.dataRO.wrap(directBuffer, i2, i2 + i3));
                        return;
                    case EndFW.TYPE_ID /* 3 */:
                        handleEnd(ClientStreamFactory.this.endRO.wrap(directBuffer, i2, i2 + i3));
                        return;
                    case 4:
                        handleAbort(ClientStreamFactory.this.abortRO.wrap(directBuffer, i2, i2 + i3));
                        return;
                    default:
                        ClientStreamFactory.this.doReset(this.networkReplyThrottle, this.networkReplyId);
                        return;
                }
            }

            private void handleBegin(BeginFW beginFW) {
                doOfferResponseBudget();
                this.streamState = this::afterBegin;
            }

            private void handleData(DataFW dataFW) {
                OctetsFW payload = dataFW.payload();
                this.networkResponseBudget -= payload.sizeof() + this.networkResponsePadding;
                if (this.networkResponseBudget < 0) {
                    ClientStreamFactory.this.doReset(this.networkReplyThrottle, this.networkReplyId);
                    return;
                }
                try {
                    DirectBuffer buffer = payload.buffer();
                    int offset = payload.offset();
                    int limit = payload.limit();
                    if (this.networkSlot != -1) {
                        DirectBuffer buffer2 = ClientStreamFactory.this.bufferPool.buffer(this.networkSlot);
                        int i = limit - offset;
                        buffer2.putBytes(this.networkSlotOffset, buffer, offset, i);
                        this.networkSlotOffset += i;
                        buffer = buffer2;
                        offset = 0;
                        limit = this.networkSlotOffset;
                    }
                    ResponseHeaderFW responseHeaderFW = null;
                    if (offset + 4 + 4 <= limit) {
                        responseHeaderFW = ClientStreamFactory.this.responseRO.wrap(buffer, offset, limit);
                        offset = responseHeaderFW.limit();
                    }
                    if (responseHeaderFW != null && offset + responseHeaderFW.size() <= limit) {
                        handleFetchResponse(buffer, offset, limit);
                        int size = offset + responseHeaderFW.size();
                        this.nextResponseId++;
                        if (size < limit) {
                            if (this.networkSlot == -1) {
                                this.networkSlot = ClientStreamFactory.this.bufferPool.acquire(this.networkReplyId);
                            }
                            ClientStreamFactory.this.bufferPool.buffer(this.networkSlot).putBytes(0, buffer, size, limit - size);
                            this.networkSlotOffset = limit - size;
                        } else {
                            if (!$assertionsDisabled && size != limit) {
                                throw new AssertionError();
                            }
                            this.networkSlotOffset = 0;
                        }
                        doOfferResponseBudget();
                        doFetchIfNotInFlight();
                    } else if (this.networkSlot == -1) {
                        this.networkSlot = ClientStreamFactory.this.bufferPool.acquire(this.networkReplyId);
                        ClientStreamFactory.this.bufferPool.buffer(this.networkSlot).putBytes(this.networkSlotOffset, payload.buffer(), payload.offset(), payload.sizeof());
                        this.networkSlotOffset += payload.sizeof();
                    }
                    if (this.networkSlotOffset != 0 || this.networkSlot == -1) {
                        return;
                    }
                    ClientStreamFactory.this.bufferPool.release(this.networkSlot);
                    this.networkSlot = -1;
                } catch (Throwable th) {
                    if (this.networkSlotOffset == 0 && this.networkSlot != -1) {
                        ClientStreamFactory.this.bufferPool.release(this.networkSlot);
                        this.networkSlot = -1;
                    }
                    throw th;
                }
            }

            private void handleFetchResponse(DirectBuffer directBuffer, int i, int i2) {
                FetchResponseFW wrap = ClientStreamFactory.this.fetchResponseRO.wrap(directBuffer, i, i2);
                int i3 = wrap.topicCount();
                int limit = wrap.limit();
                for (int i4 = 0; i4 < i3; i4++) {
                    TopicResponseFW wrap2 = ClientStreamFactory.this.topicResponseRO.wrap(directBuffer, limit, i2);
                    String asString = wrap2.name().asString();
                    int partitionCount = wrap2.partitionCount();
                    limit = wrap2.limit() + 4;
                    NetworkTopic networkTopic = this.topicsByName.get(asString);
                    for (int i5 = 0; i5 < partitionCount; i5++) {
                        PartitionResponseFW wrap3 = ClientStreamFactory.this.partitionResponseRO.wrap(directBuffer, limit, i2);
                        RecordSetFW wrap4 = ClientStreamFactory.this.recordSetRO.wrap(directBuffer, wrap3.limit(), i2);
                        limit = wrap4.limit() + wrap4.recordBatchSize();
                        if (networkTopic != null) {
                            networkTopic.onPartitionResponse(wrap3.buffer(), wrap3.offset(), limit);
                        }
                    }
                }
            }

            private void handleEnd(EndFW endFW) {
                doReinitialize();
                doFetchIfNotInFlight();
            }

            private void handleAbort(AbortFW abortFW) {
                doReinitialize();
                doFetchIfNotInFlight();
            }

            private void doReinitialize() {
                this.networkRequestBudget = 0;
                this.networkRequestPadding = 0;
                this.networkId = 0L;
                this.nextRequestId = 0;
                this.nextResponseId = 0;
            }

            private void doOfferResponseBudget() {
                int max = Math.max((ClientStreamFactory.this.bufferPool.slotCapacity() - this.networkSlotOffset) - this.networkResponseBudget, 0);
                ClientStreamFactory.this.doWindow(this.networkReplyThrottle, this.networkReplyId, max, this.networkResponsePadding);
                this.networkResponseBudget += max;
            }

            /* synthetic */ NetworkConnection(NetworkConnectionPool networkConnectionPool, AnonymousClass1 anonymousClass1) {
                this();
            }

            static {
                $assertionsDisabled = !ClientStreamFactory.class.desiredAssertionStatus();
            }
        }

        private NetworkConnectionPool(String str, long j) {
            this.networkName = str;
            this.networkRef = j;
            this.connection = new NetworkConnection();
        }

        public long doAttach(String str, Long2LongHashMap long2LongHashMap, PartitionResponseConsumer partitionResponseConsumer, IntSupplier intSupplier) {
            return 0 | this.connection.doAttach(str, long2LongHashMap, partitionResponseConsumer, intSupplier);
        }

        public void doFlush(long j) {
            int i = (int) ((j >> 32) & (-1));
            if (!$assertionsDisabled && i != 0) {
                throw new AssertionError();
            }
            this.connection.doFlush();
        }

        public void doDetach(long j, Long2LongHashMap long2LongHashMap) {
            int i = (int) ((j >> 32) & (-1));
            int i2 = (int) (j & (-1));
            if (!$assertionsDisabled && i != 0) {
                throw new AssertionError();
            }
            this.connection.doDetach(i2, long2LongHashMap);
        }

        /* synthetic */ NetworkConnectionPool(ClientStreamFactory clientStreamFactory, String str, long j, AnonymousClass1 anonymousClass1) {
            this(str, j);
        }

        static {
            $assertionsDisabled = !ClientStreamFactory.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/ClientStreamFactory$NetworkTopic.class */
    public final class NetworkTopic {
        private final String topicName;
        private final Set<PartitionResponseConsumer> recordConsumers;
        private final Set<IntSupplier> windowSuppliers;
        private final NavigableSet<NetworkTopicPartition> partitions;
        private final NetworkTopicPartition candidate;
        private final PartitionProgressHandler progressHandler;
        static final /* synthetic */ boolean $assertionsDisabled;

        public String toString() {
            return String.format("topicName=%s, partitions=%s", this.topicName, this.partitions);
        }

        private NetworkTopic(String str) {
            this.topicName = str;
            this.recordConsumers = new HashSet();
            this.windowSuppliers = new HashSet();
            this.partitions = new TreeSet();
            this.candidate = new NetworkTopicPartition();
            this.progressHandler = this::handleProgress;
        }

        public void doAttach(Long2LongHashMap long2LongHashMap, PartitionResponseConsumer partitionResponseConsumer, IntSupplier intSupplier) {
            this.recordConsumers.add(partitionResponseConsumer);
            this.windowSuppliers.add(intSupplier);
            this.candidate.id = -1;
            Long2LongHashMap.LongIterator it = long2LongHashMap.values().iterator();
            while (it.hasNext()) {
                NetworkTopicPartition.access$2008(this.candidate);
                NetworkTopicPartition.access$2102(this.candidate, it.nextValue());
                NetworkTopicPartition floor = this.partitions.floor(this.candidate);
                if (floor == null || floor.id != this.candidate.id) {
                    floor = new NetworkTopicPartition();
                    floor.id = this.candidate.id;
                    NetworkTopicPartition.access$2102(floor, this.candidate.offset);
                    this.partitions.add(floor);
                }
                NetworkTopicPartition.access$2208(floor);
            }
        }

        public void doDetach(Long2LongHashMap long2LongHashMap, PartitionResponseConsumer partitionResponseConsumer, IntSupplier intSupplier) {
            this.recordConsumers.remove(partitionResponseConsumer);
            this.windowSuppliers.remove(intSupplier);
            this.candidate.id = -1;
            Long2LongHashMap.LongIterator it = long2LongHashMap.values().iterator();
            while (it.hasNext()) {
                NetworkTopicPartition.access$2008(this.candidate);
                NetworkTopicPartition.access$2102(this.candidate, it.nextValue());
                NetworkTopicPartition floor = this.partitions.floor(this.candidate);
                if (floor != null) {
                    NetworkTopicPartition.access$2210(floor);
                    if (floor.refs == 0) {
                        this.partitions.remove(floor);
                    }
                }
            }
        }

        public void onPartitionResponse(DirectBuffer directBuffer, int i, int i2) {
            Iterator<PartitionResponseConsumer> it = this.recordConsumers.iterator();
            while (it.hasNext()) {
                it.next().accept(directBuffer, i, i + i2, this.progressHandler);
            }
            this.partitions.removeIf(networkTopicPartition -> {
                return networkTopicPartition.refs == 0;
            });
        }

        public int maximumWritableBytes() {
            int i = 0;
            Iterator<IntSupplier> it = this.windowSuppliers.iterator();
            while (it.hasNext()) {
                i = Math.max(i, it.next().getAsInt());
            }
            return i;
        }

        private void handleProgress(int i, long j, long j2) {
            this.candidate.id = i;
            NetworkTopicPartition.access$2102(this.candidate, j);
            NetworkTopicPartition floor = this.partitions.floor(this.candidate);
            if (!$assertionsDisabled && floor == null) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && floor.offset != j) {
                throw new AssertionError();
            }
            NetworkTopicPartition.access$2210(floor);
            NetworkTopicPartition.access$2102(this.candidate, j2);
            NetworkTopicPartition floor2 = this.partitions.floor(this.candidate);
            if (floor2 == null || floor2.offset != j2) {
                floor2 = new NetworkTopicPartition();
                floor2.id = i;
                NetworkTopicPartition.access$2102(floor2, j2);
                this.partitions.add(floor2);
            }
            NetworkTopicPartition.access$2208(floor2);
        }

        /* synthetic */ NetworkTopic(ClientStreamFactory clientStreamFactory, String str, AnonymousClass1 anonymousClass1) {
            this(str);
        }

        static {
            $assertionsDisabled = !ClientStreamFactory.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/ClientStreamFactory$NetworkTopicPartition.class */
    public static final class NetworkTopicPartition implements Comparable<NetworkTopicPartition> {
        private int id;
        private long offset;
        private int refs;

        private NetworkTopicPartition() {
        }

        @Override // java.lang.Comparable
        public int compareTo(NetworkTopicPartition networkTopicPartition) {
            int i = this.id - networkTopicPartition.id;
            if (i == 0) {
                i = (int) (((this.offset - networkTopicPartition.offset) >> 32) & (-1));
            }
            if (i == 0) {
                i = (int) ((this.offset - networkTopicPartition.offset) & (-1));
            }
            return i;
        }

        public String toString() {
            return String.format("id=%d, offset=%d, refs=%d", Integer.valueOf(this.id), Long.valueOf(this.offset), Integer.valueOf(this.refs));
        }

        /* synthetic */ NetworkTopicPartition(AnonymousClass1 anonymousClass1) {
            this();
        }

        static /* synthetic */ int access$2008(NetworkTopicPartition networkTopicPartition) {
            int i = networkTopicPartition.id;
            networkTopicPartition.id = i + 1;
            return i;
        }

        /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: org.reaktivity.nukleus.kafka.internal.stream.ClientStreamFactory.NetworkTopicPartition.access$2102(org.reaktivity.nukleus.kafka.internal.stream.ClientStreamFactory$NetworkTopicPartition, long):long
            java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
            	at java.base/java.lang.System.arraycopy(Native Method)
            	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
            	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
            	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
            	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
            	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
            	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
            	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
            	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
            	at jadx.core.ProcessClass.process(ProcessClass.java:70)
            	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
            	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
            	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
            	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
            */
        static /* synthetic */ long access$2102(org.reaktivity.nukleus.kafka.internal.stream.ClientStreamFactory.NetworkTopicPartition r6, long r7) {
            /*
                r0 = r6
                r1 = r7
                // decode failed: arraycopy: source index -1 out of bounds for object array[6]
                r0.offset = r1
                return r-1
            */
            throw new UnsupportedOperationException("Method not decompiled: org.reaktivity.nukleus.kafka.internal.stream.ClientStreamFactory.NetworkTopicPartition.access$2102(org.reaktivity.nukleus.kafka.internal.stream.ClientStreamFactory$NetworkTopicPartition, long):long");
        }

        static /* synthetic */ int access$2208(NetworkTopicPartition networkTopicPartition) {
            int i = networkTopicPartition.refs;
            networkTopicPartition.refs = i + 1;
            return i;
        }

        static /* synthetic */ int access$2210(NetworkTopicPartition networkTopicPartition) {
            int i = networkTopicPartition.refs;
            networkTopicPartition.refs = i - 1;
            return i;
        }
    }

    public ClientStreamFactory(KafkaConfiguration kafkaConfiguration, RouteManager routeManager, MutableDirectBuffer mutableDirectBuffer, BufferPool bufferPool, LongSupplier longSupplier, LongSupplier longSupplier2, Long2ObjectHashMap<NetworkConnectionPool.NetworkConnection> long2ObjectHashMap) {
        this.router = (RouteManager) Objects.requireNonNull(routeManager);
        this.writeBuffer = (MutableDirectBuffer) Objects.requireNonNull(mutableDirectBuffer);
        this.bufferPool = (BufferPool) Objects.requireNonNull(bufferPool);
        this.supplyStreamId = (LongSupplier) Objects.requireNonNull(longSupplier);
        this.supplyCorrelationId = longSupplier2;
        this.correlations = (Long2ObjectHashMap) Objects.requireNonNull(long2ObjectHashMap);
        this.encodeBuffer = new UnsafeBuffer(new byte[bufferPool.slotCapacity()]);
    }

    public MessageConsumer newStream(int i, DirectBuffer directBuffer, int i2, int i3, MessageConsumer messageConsumer) {
        BeginFW wrap = this.beginRO.wrap(directBuffer, i2, i2 + i3);
        return wrap.sourceRef() == 0 ? newConnectReplyStream(wrap, messageConsumer) : newAcceptStream(wrap, messageConsumer);
    }

    private MessageConsumer newAcceptStream(BeginFW beginFW, MessageConsumer messageConsumer) {
        long authorization = beginFW.authorization();
        long sourceRef = beginFW.sourceRef();
        OctetsFW extension = this.beginRO.extension();
        MessageConsumer messageConsumer2 = null;
        if (extension.sizeof() > 0) {
            KafkaBeginExFW kafkaBeginExFW = this.beginExRO;
            Objects.requireNonNull(kafkaBeginExFW);
            RouteFW resolveRoute = resolveRoute(authorization, sourceRef, ((KafkaBeginExFW) extension.get(kafkaBeginExFW::wrap)).topicName().asString());
            if (resolveRoute != null) {
                long streamId = beginFW.streamId();
                String asString = resolveRoute.target().asString();
                ClientAcceptStream clientAcceptStream = new ClientAcceptStream(messageConsumer, streamId, (NetworkConnectionPool) this.connectionPools.computeIfAbsent(asString, this::newConnectionPoolsByRef).computeIfAbsent(resolveRoute.targetRef(), j -> {
                    return new NetworkConnectionPool(asString, j);
                }));
                messageConsumer2 = (i, directBuffer, i2, i3) -> {
                    clientAcceptStream.handleStream(i, directBuffer, i2, i3);
                };
            }
        }
        return messageConsumer2;
    }

    private Long2ObjectHashMap<NetworkConnectionPool> newConnectionPoolsByRef(String str) {
        return new Long2ObjectHashMap<>();
    }

    private MessageConsumer newConnectReplyStream(BeginFW beginFW, MessageConsumer messageConsumer) {
        NetworkConnectionPool.NetworkConnection networkConnection;
        long streamId = beginFW.streamId();
        long sourceRef = beginFW.sourceRef();
        long correlationId = beginFW.correlationId();
        MessageConsumer messageConsumer2 = null;
        if (sourceRef == 0 && (networkConnection = (NetworkConnectionPool.NetworkConnection) this.correlations.remove(correlationId)) != null) {
            messageConsumer2 = networkConnection.onCorrelated(messageConsumer, streamId);
        }
        return messageConsumer2;
    }

    private RouteFW resolveRoute(long j, long j2, String str) {
        return (RouteFW) this.router.resolve(j, (i, directBuffer, i2, i3) -> {
            RouteFW wrap = this.routeRO.wrap(directBuffer, i2, i3);
            OctetsFW extension = wrap.extension();
            Predicate predicate = str2 -> {
                return true;
            };
            if (extension.sizeof() > 0) {
                KafkaRouteExFW kafkaRouteExFW = this.routeExRO;
                Objects.requireNonNull(kafkaRouteExFW);
                String asString = ((KafkaRouteExFW) extension.get(kafkaRouteExFW::wrap)).topicName().asString();
                Objects.requireNonNull(asString);
                predicate = (v1) -> {
                    return r0.equals(v1);
                };
            }
            return wrap.sourceRef() == j2 && predicate.test(str);
        }, this::wrapRoute);
    }

    private RouteFW wrapRoute(int i, DirectBuffer directBuffer, int i2, int i3) {
        if ($assertionsDisabled || i == 1) {
            return this.routeRO.wrap(directBuffer, i2, i2 + i3);
        }
        throw new AssertionError();
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW$Builder] */
    public void doBegin(MessageConsumer messageConsumer, long j, long j2, long j3) {
        BeginFW build = this.beginRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).streamId(j).source("kafka").sourceRef(j2).correlationId(j3).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.DataFW$Builder] */
    public void doData(MessageConsumer messageConsumer, long j, OctetsFW octetsFW) {
        DataFW build = this.dataRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).streamId(j).payload(builder -> {
            builder.set(octetsFW.buffer(), octetsFW.offset(), octetsFW.sizeof());
        }).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.EndFW$Builder] */
    private void doEnd(MessageConsumer messageConsumer, long j) {
        EndFW build = this.endRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).streamId(j).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.AbortFW$Builder] */
    public void doAbort(MessageConsumer messageConsumer, long j) {
        AbortFW build = this.abortRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).streamId(j).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW$Builder] */
    public void doWindow(MessageConsumer messageConsumer, long j, int i, int i2) {
        WindowFW build = this.windowRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).streamId(j).credit(i).padding(i2).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW$Builder] */
    public void doReset(MessageConsumer messageConsumer, long j) {
        ResetFW build = this.resetRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).streamId(j).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW$Builder] */
    public void doKafkaBegin(MessageConsumer messageConsumer, long j, long j2, long j3, OctetsFW octetsFW) {
        BeginFW build = this.beginRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).streamId(j).source("kafka").sourceRef(j2).correlationId(j3).extension(builder -> {
            builder.set(octetsFW);
        }).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.DataFW$Builder] */
    public void doKafkaData(MessageConsumer messageConsumer, long j, OctetsFW octetsFW, Long2LongHashMap.LongIterator longIterator) {
        DataFW build = this.dataRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).streamId(j).payload(builder -> {
            builder.set(octetsFW.buffer(), octetsFW.offset(), octetsFW.sizeof());
        }).extension(builder2 -> {
            builder2.set(visitKafkaDataEx(longIterator));
        }).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    private Flyweight.Builder.Visitor visitKafkaDataEx(Long2LongHashMap.LongIterator longIterator) {
        return (mutableDirectBuffer, i, i2) -> {
            return this.dataExRW.wrap2(mutableDirectBuffer, i, i2).fetchOffsets(builder -> {
                while (longIterator.hasNext()) {
                    builder.item(builder -> {
                        builder.set(longIterator.nextValue());
                    });
                }
            }).build().sizeof();
        };
    }

    static {
        $assertionsDisabled = !ClientStreamFactory.class.desiredAssertionStatus();
    }
}
