package org.reaktivity.nukleus.kafka.internal.stream;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
import java.util.function.IntToLongFunction;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.ArrayUtil;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.IntArrayList;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndex;
import org.reaktivity.nukleus.kafka.internal.cache.DefaultPartitionIndex;
import org.reaktivity.nukleus.kafka.internal.cache.MessageCache;
import org.reaktivity.nukleus.kafka.internal.cache.PartitionIndex;
import org.reaktivity.nukleus.kafka.internal.function.IntBooleanConsumer;
import org.reaktivity.nukleus.kafka.internal.function.IntLongConsumer;
import org.reaktivity.nukleus.kafka.internal.function.PartitionProgressHandler;
import org.reaktivity.nukleus.kafka.internal.types.Flyweight;
import org.reaktivity.nukleus.kafka.internal.types.KafkaHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.ListFW;
import org.reaktivity.nukleus.kafka.internal.types.MessageFW;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.String16FW;
import org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.ResponseHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.config.ConfigResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.config.DescribeConfigsRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.config.DescribeConfigsResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.config.ResourceRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.config.ResourceResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.FetchRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.FetchResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.PartitionRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.PartitionResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.RecordSetFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.TopicRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.TopicResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.BrokerMetadataFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.MetadataRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.MetadataResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.MetadataResponsePart2FW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.PartitionMetadataFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.TopicMetadataFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.offset.IsolationLevel;
import org.reaktivity.nukleus.kafka.internal.types.codec.offset.ListOffsetsPartitionRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.offset.ListOffsetsPartitionResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.offset.ListOffsetsRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.offset.ListOffsetsResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.offset.ListOffsetsTopicFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.AbortFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.DataFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.EndFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.TcpBeginExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.kafka.internal.util.BufferUtil;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool.class */
public final class NetworkConnectionPool {
    // Kafka request API keys and the protocol version used for each request type.
    private static final short FETCH_API_VERSION = 5;
    private static final short FETCH_API_KEY = 1;
    private static final short LIST_OFFSETS_API_KEY = 2;
    private static final short LIST_OFFSETS_API_VERSION = 2;
    private static final short METADATA_API_VERSION = 5;
    private static final short METADATA_API_KEY = 3;
    private static final short DESCRIBE_CONFIGS_API_VERSION = 0;
    private static final short DESCRIBE_CONFIGS_API_KEY = 32;
    // Resource type value for topics and the topic config name/value of interest
    // (presumably used by the describe-configs request/response handling outside this view).
    private static final byte RESOURCE_TYPE_TOPIC = 2;
    private static final String CLEANUP_POLICY = "cleanup.policy";
    private static final String COMPACT = "compact";
    // Sentinel networkSlot value: partial responses are accumulated in the connection's own
    // localDecodeBuffer instead of a slot acquired from the buffer pool.
    private static final int LOCAL_SLOT = -2;
    private static final int MAX_PADDING = 5120;
    // 86,400,000 ms = 24 hours.
    private static final int KAFKA_SERVER_DEFAULT_DELETE_RETENTION_MS = 86400000;
    // Shared scratch buffer into which outgoing requests are encoded before being sent.
    final MutableDirectBuffer encodeBuffer;
    private final ClientStreamFactory clientStreamFactory;
    // Name and reference of the network target that connections are opened against.
    private final String networkName;
    private final long networkRef;
    // Passed as maxBytes on Fetch requests.
    private final int fetchMaxBytes;
    // Also sizes each connection's localDecodeBuffer.
    private final int fetchPartitionMaxBytes;
    private final BufferPool bufferPool;
    private final MessageCache messageCache;
    private MetadataConnection metadataConnection;
    // Handed to each fetch connection's FetchResponseDecoder.
    private final int maximumMessageSize;
    private int nextAttachId;
    // Four zero bytes, used as the local IPv4 address when opening broker connections.
    private static final byte[] ANY_IP_ADDR = new byte[4];
    private static final PartitionIndex DEFAULT_PARTITION_INDEX = new DefaultPartitionIndex();
    /**
     * Dispatcher that ignores every message: {@code dispatch} always returns 0 and {@code flush}
     * does nothing. Returned by {@code getTopicDispatcher} when a topic name has no entry in
     * {@code topicsByName}.
     */
    private static final DecoderMessageDispatcher NOOP_DISPATCHER = new DecoderMessageDispatcher() { // from class: org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.1
        @Override // org.reaktivity.nukleus.kafka.internal.stream.DecoderMessageDispatcher
        public int dispatch(int i, long j, long j2, DirectBuffer directBuffer, HeadersFW headersFW, long j3, long j4, DirectBuffer directBuffer2) {
            // Nothing is delivered to anyone.
            return 0;
        }

        @Override // org.reaktivity.nukleus.kafka.internal.stream.DecoderMessageDispatcher
        public void flush(int i, long j, long j2) {
            // Intentionally empty.
        }
    };
    /**
     * Dispatcher whose {@code dispatch} unconditionally returns 1 without inspecting the message,
     * and whose {@code flush} does nothing.
     * NOTE(review): the name suggests the result 1 means "matched" -- confirm against the
     * MessageDispatcher contract.
     */
    private static final MessageDispatcher MATCHING_MESSAGE_DISPATCHER = new MessageDispatcher() { // from class: org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.2
        @Override // org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
        public int dispatch(int i, long j, long j2, DirectBuffer directBuffer, Function<DirectBuffer, DirectBuffer> function, long j3, long j4, DirectBuffer directBuffer2) {
            return 1;
        }

        @Override // org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
        public void flush(int i, long j, long j2) {
            // Intentionally empty.
        }
    };
    // Reusable flyweight builders (RW) used to encode outgoing Kafka requests into encodeBuffer.
    // They are shared by all connections of this pool and re-wrapped before each use.
    final RequestHeaderFW.Builder requestRW = new RequestHeaderFW.Builder();
    final FetchRequestFW.Builder fetchRequestRW = new FetchRequestFW.Builder();
    final TopicRequestFW.Builder topicRequestRW = new TopicRequestFW.Builder();
    final PartitionRequestFW.Builder partitionRequestRW = new PartitionRequestFW.Builder();
    final PartitionRequestFW partitionRequestRO = new PartitionRequestFW();
    final MetadataRequestFW.Builder metadataRequestRW = new MetadataRequestFW.Builder();
    final DescribeConfigsRequestFW.Builder describeConfigsRequestRW = new DescribeConfigsRequestFW.Builder();
    final ListOffsetsRequestFW.Builder listOffsetsRequestRW = new ListOffsetsRequestFW.Builder();
    final ListOffsetsPartitionRequestFW.Builder listOffsetsPartitionRequestRW = new ListOffsetsPartitionRequestFW.Builder();
    final ListOffsetsTopicFW.Builder listOffsetsTopicRW = new ListOffsetsTopicFW.Builder();
    final ResourceRequestFW.Builder resourceRequestRW = new ResourceRequestFW.Builder();
    final String16FW.Builder configNameRW = new String16FW.Builder(ByteOrder.BIG_ENDIAN);
    // Stream-level flyweights: WINDOW frame reader, DATA payload builder, TCP BEGIN extension builder.
    final WindowFW windowRO = new WindowFW();
    final OctetsFW.Builder payloadRW = new OctetsFW.Builder();
    final TcpBeginExFW.Builder tcpBeginExRW = new TcpBeginExFW.Builder();
    // Reusable read-only flyweights (RO) used to decode incoming Kafka responses.
    final ResponseHeaderFW responseRO = new ResponseHeaderFW();
    final FetchResponseFW fetchResponseRO = new FetchResponseFW();
    final TopicResponseFW topicResponseRO = new TopicResponseFW();
    final PartitionResponseFW partitionResponseRO = new PartitionResponseFW();
    final RecordSetFW recordSetRO = new RecordSetFW();
    final MetadataResponseFW metadataResponseRO = new MetadataResponseFW();
    final BrokerMetadataFW brokerMetadataRO = new BrokerMetadataFW();
    final MetadataResponsePart2FW metadataResponsePart2RO = new MetadataResponsePart2FW();
    final TopicMetadataFW topicMetadataRO = new TopicMetadataFW();
    final PartitionMetadataFW partitionMetadataRO = new PartitionMetadataFW();
    final DescribeConfigsResponseFW describeConfigsResponseRO = new DescribeConfigsResponseFW();
    final ResourceResponseFW resourceResponseRO = new ResourceResponseFW();
    final ConfigResponseFW configResponseRO = new ConfigResponseFW();
    final ListOffsetsResponseFW listOffsetsResponseRO = new ListOffsetsResponseFW();
    final ListOffsetsTopicFW listOffsetsTopicRO = new ListOffsetsTopicFW();
    final ListOffsetsPartitionResponseFW listOffsetsPartitionResponseRO = new ListOffsetsPartitionResponseFW();
    // Scratch views and flyweights for message keys, values and headers.
    private final UnsafeBuffer keyBuffer = new UnsafeBuffer(BufferUtil.EMPTY_BYTE_ARRAY);
    private final UnsafeBuffer valueBuffer = new UnsafeBuffer(BufferUtil.EMPTY_BYTE_ARRAY);
    private final MessageFW messageRO = new MessageFW();
    private final HeadersFW headersRO = new HeadersFW();
    private final KafkaHeadersIterator headersIterator = new KafkaHeadersIterator();
    // Fetch connections held by this pool; both arrays start empty.
    private AbstractFetchConnection[] connections = new LiveFetchConnection[0];
    private HistoricalFetchConnection[] historicalConnections = new HistoricalFetchConnection[0];
    // Per-topic state keyed by topic name. topicsByName is a LinkedHashMap, so fetch requests list
    // topics in insertion order.
    private final Map<String, NetworkTopic> topicsByName = new LinkedHashMap();
    private final Map<String, TopicMetadata> topicMetadataByName = new HashMap();
    private final Map<String, List<ListFW<KafkaHeaderFW>>> routeHeadersByTopic = new HashMap();
    // Detach callbacks keyed by an integer id (presumably allocated from nextAttachId -- confirm).
    private final Int2ObjectHashMap<Consumer<Long2LongHashMap>> detachersById = new Int2ObjectHashMap<>();
    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$AbstractFetchConnection.class */
    public abstract class AbstractFetchConnection extends AbstractNetworkConnection {
        // Timestamp value sent in ListOffsets partition requests to ask for the earliest available offset.
        private static final long EARLIEST_AVAILABLE_OFFSET = -2;
        // Broker address and node id, copied from the BrokerMetadata passed to the constructor.
        final String host;
        final int port;
        final int brokerId;
        // End position of the fetch request currently being encoded into the shared encodeBuffer.
        int encodeLimit;
        // Set when a fetch partition response reports error code 1; cleared once a ListOffsets
        // response has been handled.
        boolean offsetsNeeded;
        // True while a ListOffsets request is considered outstanding, so that incoming data is routed
        // to the generic response handling rather than the fetch response decoder.
        boolean offsetsRequested;
        // Per topic, the fetch offset last requested for each partition (indexed by partition id).
        Map<String, long[]> requestedFetchOffsetsByTopic;
        final ResponseDecoder fetchResponseDecoder;
        // Decompiler rendering of the compiler-generated flag backing assert statements.
        static final /* synthetic */ boolean $assertionsDisabled;

        /**
         * Creates a fetch connection bound to the broker described by {@code brokerMetadata}.
         *
         * <p>Copies the broker's node id, host and port, starts with an empty map of requested fetch
         * offsets, and creates the fetch response decoder wired to this connection's topic dispatcher
         * lookup, requested-offset lookup and partition error handler.
         *
         * @param brokerMetadata node id, host and port of the target broker
         */
        private AbstractFetchConnection(BrokerMetadata brokerMetadata) {
            super();
            this.requestedFetchOffsetsByTopic = new HashMap<>();
            this.brokerId = brokerMetadata.nodeId;
            this.host = brokerMetadata.host;
            this.port = brokerMetadata.port;
            this.fetchResponseDecoder = new FetchResponseDecoder(
                    this::getTopicDispatcher,
                    this::getRequestedOffset,
                    this::handlePartitionResponseError,
                    this.localDecodeBuffer,
                    NetworkConnectionPool.this.maximumMessageSize);
        }

        /**
         * Sends the next request on this connection if none is currently outstanding.
         *
         * <p>A request is outstanding while {@code nextRequestId != nextResponseId}; in that case this
         * method does nothing. Otherwise it opens the network stream if necessary (any local IPv4
         * address, local port 0, remote {@code host}:{@code port}) and, when the request budget exceeds
         * the required padding, sends either a ListOffsets request (if {@code offsetsNeeded}) or a
         * Fetch request.
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void doRequestIfNeeded() {
            if (this.nextRequestId != this.nextResponseId) {
                // A response is still pending; requests are not pipelined.
                return;
            }
            // Each lambda parameter has a distinct name: Java does not allow a nested lambda to
            // redeclare a parameter name of its enclosing lambda.
            doBeginIfNotConnected((mutableDirectBuffer, i, i2) -> {
                return NetworkConnectionPool.this.tcpBeginExRW.wrap2(mutableDirectBuffer, i, i2).localAddress(localAddress -> {
                    localAddress.ipv4Address(ipv4Address -> {
                        ipv4Address.put(NetworkConnectionPool.ANY_IP_ADDR);
                    });
                }).localPort(0).remoteAddress(remoteAddress -> {
                    remoteAddress.host(this.host);
                }).remotePort(this.port).limit();
            });
            if (this.networkRequestBudget <= this.networkRequestPadding) {
                // Not enough window to send anything yet; retried when more credit arrives.
                return;
            }
            if (this.offsetsNeeded) {
                doListOffsetsRequest();
                this.offsetsRequested = true;
            } else {
                doFetchRequest();
            }
        }

        /**
         * Encodes a Fetch request into the shared encode buffer and sends it if it fits the budget.
         *
         * <p>The request header, the fetch header and each topic header are first written with
         * placeholder size/count values and re-encoded in place once the real values are known. A topic
         * is kept in the request only if {@link #addTopicToRequest} reports at least one partition for
         * it; otherwise its header is discarded by rewinding {@code encodeLimit}. Nothing is sent, and
         * no request id is consumed, when no topic contributed partitions or when the encoded request
         * plus padding exceeds {@code networkRequestBudget}.
         */
        private void doFetchRequest() {
            // Request header placeholder: api key 1 and version 5 (the values of FETCH_API_KEY and
            // FETCH_API_VERSION in the enclosing class); size and correlation id are patched later.
            this.encodeLimit = 0;
            RequestHeaderFW requestHeader = NetworkConnectionPool.this.requestRW
                    .wrap2(NetworkConnectionPool.this.encodeBuffer, this.encodeLimit, NetworkConnectionPool.this.encodeBuffer.capacity())
                    .size(0)
                    .apiKey((short) 1)
                    .apiVersion((short) 5)
                    .correlationId(0)
                    .clientId((String) null)
                    .build();
            this.encodeLimit = requestHeader.limit();

            // Fetch header placeholder; the topic count is patched later.
            FetchRequestFW fetchHeader = NetworkConnectionPool.this.fetchRequestRW
                    .wrap2(NetworkConnectionPool.this.encodeBuffer, this.encodeLimit, NetworkConnectionPool.this.encodeBuffer.capacity())
                    .maxWaitTimeMillis(500)
                    .minBytes(1)
                    .maxBytes(NetworkConnectionPool.this.fetchMaxBytes)
                    .isolationLevel((byte) 0)
                    .topicCount(0)
                    .build();
            this.encodeLimit = fetchHeader.limit();

            int topicsIncluded = 0;
            for (String topicName : NetworkConnectionPool.this.topicsByName.keySet()) {
                final int topicStart = this.encodeLimit;
                TopicRequestFW topicHeader = NetworkConnectionPool.this.topicRequestRW
                        .wrap2(NetworkConnectionPool.this.encodeBuffer, this.encodeLimit, NetworkConnectionPool.this.encodeBuffer.capacity())
                        .name(topicName)
                        .partitionCount(0)
                        .build();
                this.encodeLimit = topicHeader.limit();

                // Offsets requested per partition are remembered so responses can be matched later.
                long[] requestedOffsets = this.requestedFetchOffsetsByTopic.computeIfAbsent(topicName, key -> {
                    return new long[NetworkConnectionPool.this.topicMetadataByName.get(key).partitionCount()];
                });
                int partitionsAdded = addTopicToRequest(topicName, (partition, offset) -> {
                    requestedOffsets[partition] = offset;
                }, partition -> {
                    return requestedOffsets[partition];
                });

                if (partitionsAdded <= 0) {
                    // No partitions for this topic: drop the topic header just written.
                    this.encodeLimit = topicStart;
                    continue;
                }
                // Patch the real partition count into the topic header.
                NetworkConnectionPool.this.topicRequestRW
                        .wrap2(NetworkConnectionPool.this.encodeBuffer, topicHeader.offset(), topicHeader.limit())
                        .name(topicHeader.name())
                        .partitionCount(partitionsAdded)
                        .build();
                topicsIncluded++;
            }

            final boolean fitsBudget = this.encodeLimit + this.networkRequestPadding <= this.networkRequestBudget;
            if (topicsIncluded > 0 && fitsBudget) {
                // Patch the real topic count into the fetch header.
                NetworkConnectionPool.this.fetchRequestRW
                        .wrap2(NetworkConnectionPool.this.encodeBuffer, fetchHeader.offset(), fetchHeader.limit())
                        .maxWaitTimeMillis(fetchHeader.maxWaitTimeMillis())
                        .minBytes(fetchHeader.minBytes())
                        .maxBytes(fetchHeader.maxBytes())
                        .isolationLevel(fetchHeader.isolationLevel())
                        .topicCount(topicsIncluded)
                        .build();

                // Patch size (excluding the 4-byte size field itself) and correlation id into the header.
                final int correlationId = this.nextRequestId;
                this.nextRequestId = correlationId + 1;
                NetworkConnectionPool.this.requestRW
                        .wrap2(NetworkConnectionPool.this.encodeBuffer, requestHeader.offset(), requestHeader.limit())
                        .size(this.encodeLimit - 4)
                        .apiKey((short) 1)
                        .apiVersion((short) 5)
                        .correlationId(correlationId)
                        .clientId((String) null)
                        .build();

                // The bytes are already in place; the visitor only reports how many there are.
                OctetsFW payload = NetworkConnectionPool.this.payloadRW
                        .wrap2(NetworkConnectionPool.this.encodeBuffer, 0, this.encodeLimit)
                        .set((buffer, offset, limit) -> limit - offset)
                        .build();
                NetworkConnectionPool.this.clientStreamFactory.doData(this.networkTarget, this.networkId, this.networkRequestPadding, payload);
                this.networkRequestBudget -= payload.sizeof() + this.networkRequestPadding;
            }
        }

        /**
         * Encodes a ListOffsets request asking this broker for the earliest available offset of every
         * partition it is recorded against in {@code nodeIdsByPartition}, for each topic whose metadata
         * reports that offsets are required, and sends it if it fits the request budget.
         *
         * <p>The request and ListOffsets headers are written with placeholder values first and patched
         * in place (size, correlation id, topic count) just before sending. If the encoded request plus
         * padding exceeds {@code networkRequestBudget}, nothing is sent and no request id is consumed.
         */
        private void doListOffsetsRequest() {
            // Request header placeholder: api key 2 and version 2 (the values of LIST_OFFSETS_API_KEY
            // and LIST_OFFSETS_API_VERSION in the enclosing class).
            RequestHeaderFW requestHeader = NetworkConnectionPool.this.requestRW
                    .wrap2(NetworkConnectionPool.this.encodeBuffer, 0, NetworkConnectionPool.this.encodeBuffer.capacity())
                    .size(0)
                    .apiKey((short) 2)
                    .apiVersion((short) 2)
                    .correlationId(0)
                    .clientId((String) null)
                    .build();
            ListOffsetsRequestFW offsetsHeader = NetworkConnectionPool.this.listOffsetsRequestRW
                    .wrap2(NetworkConnectionPool.this.encodeBuffer, requestHeader.limit(), NetworkConnectionPool.this.encodeBuffer.capacity())
                    .isolationLevel(level -> {
                        level.set(IsolationLevel.READ_UNCOMMITTED);
                    })
                    .topicCount(0)
                    .build();

            int encodedLimit = offsetsHeader.limit();
            int topicsIncluded = 0;
            for (TopicMetadata metadata : NetworkConnectionPool.this.topicMetadataByName.values()) {
                if (!metadata.offsetsRequired(this.brokerId)) {
                    continue;
                }
                final int brokerPartitions = metadata.partitionCount(this.brokerId);
                if (brokerPartitions <= 0) {
                    continue;
                }
                topicsIncluded++;
                encodedLimit = NetworkConnectionPool.this.listOffsetsTopicRW
                        .wrap2(NetworkConnectionPool.this.encodeBuffer, encodedLimit, NetworkConnectionPool.this.encodeBuffer.capacity())
                        .name(metadata.topicName)
                        .partitionCount(brokerPartitions)
                        .build()
                        .limit();
                // Only partitions assigned to this broker are listed.
                for (int partitionId = 0; partitionId < metadata.nodeIdsByPartition.length; partitionId++) {
                    if (metadata.nodeIdsByPartition[partitionId] != this.brokerId) {
                        continue;
                    }
                    encodedLimit = NetworkConnectionPool.this.listOffsetsPartitionRequestRW
                            .wrap2(NetworkConnectionPool.this.encodeBuffer, encodedLimit, NetworkConnectionPool.this.encodeBuffer.capacity())
                            .partitionId(partitionId)
                            .timestamp(EARLIEST_AVAILABLE_OFFSET)
                            .build()
                            .limit();
                }
            }

            if (encodedLimit + this.networkRequestPadding > this.networkRequestBudget) {
                // Does not fit the current window; nothing is sent.
                return;
            }

            // Patch size (excluding the 4-byte size field itself) and correlation id into the header.
            final int correlationId = this.nextRequestId;
            this.nextRequestId = correlationId + 1;
            NetworkConnectionPool.this.requestRW
                    .wrap2(NetworkConnectionPool.this.encodeBuffer, requestHeader.offset(), requestHeader.limit())
                    .size(encodedLimit - 4)
                    .apiKey((short) 2)
                    .apiVersion((short) 2)
                    .correlationId(correlationId)
                    .clientId((String) null)
                    .build();
            // Patch the real topic count into the ListOffsets header.
            NetworkConnectionPool.this.listOffsetsRequestRW
                    .wrap2(NetworkConnectionPool.this.encodeBuffer, offsetsHeader.offset(), offsetsHeader.limit())
                    .isolationLevel(level -> {
                        level.set(IsolationLevel.READ_UNCOMMITTED);
                    })
                    .topicCount(topicsIncluded)
                    .build();

            // The bytes are already in place; the visitor only reports how many there are.
            OctetsFW payload = NetworkConnectionPool.this.payloadRW
                    .wrap2(NetworkConnectionPool.this.encodeBuffer, 0, encodedLimit)
                    .set((buffer, offset, limit) -> limit - offset)
                    .build();
            NetworkConnectionPool.this.clientStreamFactory.doData(this.networkTarget, this.networkId, this.networkRequestPadding, payload);
            this.networkRequestBudget -= payload.sizeof() + this.networkRequestPadding;
        }

        /**
         * Appends the partitions to fetch for one topic to the Fetch request being encoded.
         *
         * <p>Called by {@code doFetchRequest} immediately after the topic header has been written at
         * {@code encodeLimit}. NOTE(review): implementations are presumably expected to advance
         * {@code encodeLimit} past whatever they encode -- confirm against the subclasses.
         *
         * @param str topic name
         * @param intLongConsumer receives (partition index, offset) pairs; the caller stores them as
         *        the requested fetch offsets for the topic
         * @param intToLongFunction returns the offset currently stored for a partition index
         * @return the number of partitions added; the caller drops the topic from the request when
         *         this is not positive, otherwise writes it as the topic's partition count
         */
        abstract int addTopicToRequest(String str, IntLongConsumer intLongConsumer, IntToLongFunction intToLongFunction);

        /**
         * Handles a DATA frame received from the broker.
         *
         * <p>While a ListOffsets request is outstanding the frame is delegated to the generic
         * response handling in the superclass. Otherwise the payload is fed to the fetch response
         * decoder; when the decoder reports a non-negative result the response is counted as complete
         * and the next request is issued if needed. NOTE(review): a negative decoder result presumably
         * means the response is still incomplete -- confirm against {@code ResponseDecoder.decode}.
         *
         * <p>If the frame exceeds the response budget previously offered, the reply stream is reset
         * and the frame is not processed, matching the superclass behaviour.
         *
         * @param dataFW the DATA frame
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void handleData(DataFW dataFW) {
            if (this.offsetsRequested) {
                super.handleData(dataFW);
                return;
            }
            OctetsFW payload = dataFW.payload();
            this.networkResponseBudget -= payload.sizeof() + dataFW.padding();
            if (this.networkResponseBudget < 0) {
                NetworkConnectionPool.this.clientStreamFactory.doReset(this.networkReplyThrottle, this.networkReplyId);
                // Flow control was violated and the stream has been reset: do not decode the payload
                // or offer further budget on the reset stream.
                return;
            }
            int decode = this.fetchResponseDecoder.decode(payload, dataFW.trace());
            doOfferResponseBudget();
            if (decode >= 0) {
                // Decompiled form of: assert decode == 0 : "...";
                if (!$assertionsDisabled && decode != 0) {
                    throw new AssertionError("bytes remaining after fetch response, pipelined requests are not being used");
                }
                this.nextResponseId++;
                doRequestIfNeeded();
            }
        }

        /**
         * Handles a complete non-fetch response, which on a fetch connection is always a ListOffsets
         * response, then clears the flags that made the connection request offsets.
         *
         * @param j trace id of the frame carrying the response
         * @param directBuffer buffer containing the response
         * @param i offset of the response body in {@code directBuffer}
         * @param i2 limit of the readable data in {@code directBuffer}
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        final void handleResponse(long j, DirectBuffer directBuffer, int i, int i2) {
            // Decompiled form of: assert offsetsNeeded;
            if (!($assertionsDisabled || this.offsetsNeeded)) {
                throw new AssertionError();
            }
            handleListOffsetsResponse(j, directBuffer, i, i2);
            this.offsetsRequested = false;
            this.offsetsNeeded = false;
        }

        /**
         * Decodes a ListOffsets response and records the first offset of each reported partition in
         * the corresponding topic's metadata.
         *
         * <p>Decoding stops at the first partition carrying a non-zero error code; partitions decoded
         * before that point keep their updated first offset and the error itself is not reported.
         *
         * @param j trace id of the frame carrying the response (unused)
         * @param directBuffer buffer containing the response
         * @param i offset of the ListOffsets response in {@code directBuffer}
         * @param i2 limit of the readable data in {@code directBuffer}
         */
        final void handleListOffsetsResponse(long j, DirectBuffer directBuffer, int i, int i2) {
            ListOffsetsResponseFW response = NetworkConnectionPool.this.listOffsetsResponseRO.wrap(directBuffer, i, i2);
            final int topicCount = response.topicCount();
            int cursor = response.limit();

            topics:
            for (int topicIndex = 0; topicIndex < topicCount; topicIndex++) {
                ListOffsetsTopicFW topic = NetworkConnectionPool.this.listOffsetsTopicRO.wrap(directBuffer, cursor, i2);
                String topicName = topic.name().asString();
                TopicMetadata metadata = NetworkConnectionPool.this.topicMetadataByName.get(topicName);
                cursor = topic.limit();
                final int partitionCount = topic.partitionCount();

                for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) {
                    ListOffsetsPartitionResponseFW partition =
                            NetworkConnectionPool.this.listOffsetsPartitionResponseRO.wrap(directBuffer, cursor, i2);
                    if (partition.errorCode() != 0) {
                        // First error ends processing of the whole response.
                        break topics;
                    }
                    metadata.setFirstOffset(partition.partitionId(), partition.firstOffset());
                    cursor = partition.limit();
                }
            }
        }

        /**
         * Returns the fetch offset most recently recorded for a partition of a topic.
         *
         * @param str topic name; must have an entry in {@code requestedFetchOffsetsByTopic}
         * @param i partition index
         * @return the recorded offset
         */
        final long getRequestedOffset(String str, int i) {
            final long[] offsetsByPartition = this.requestedFetchOffsetsByTopic.get(str);
            return offsetsByPartition[i];
        }

        /**
         * Resets this connection's state: first reinitializes the fetch response decoder, then
         * delegates to the superclass to reset the generic connection state.
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void doReinitialize() {
            this.fetchResponseDecoder.reinitialize();
            super.doReinitialize();
        }

        /**
         * Looks up the message dispatcher for a topic.
         *
         * @param str topic name
         * @return the topic's dispatcher, or {@code NOOP_DISPATCHER} when the topic has no entry in
         *         {@code topicsByName}
         */
        private DecoderMessageDispatcher getTopicDispatcher(String str) {
            NetworkTopic topic = NetworkConnectionPool.this.topicsByName.get(str);
            if (topic != null) {
                return topic.dispatcher;
            }
            return NetworkConnectionPool.NOOP_DISPATCHER;
        }

        /**
         * Reacts to an error code reported for a partition in a fetch response.
         *
         * <p>Error code 1 (OFFSET_OUT_OF_RANGE in the Kafka protocol) marks this connection as
         * needing offsets and sets the partition's first offset in the topic metadata to -1. Any
         * other code is treated as fatal.
         *
         * @param str topic name
         * @param i partition index
         * @param s error code from the partition response
         * @throws IllegalStateException for any error code other than 1
         */
        private void handlePartitionResponseError(String str, int i, short s) {
            if (s != 1) {
                throw new IllegalStateException(String.format(
                        "%s: unexpected error code %d from fetch, topic %s, partition %d, requested offset %d",
                        this, s, str, i, getRequestedOffset(str, i)));
            }
            this.offsetsNeeded = true;
            NetworkConnectionPool.this.topicMetadataByName.get(str).setFirstOffset(i, -1L);
        }

        /**
         * Describes this connection by runtime class name, broker id, host, port and the current
         * request budget and padding.
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        public String toString() {
            final String className = getClass().getName();
            return String.format(
                    "%s [brokerId=%d, host=%s, port=%d, budget=%d, padding=%d]",
                    className, this.brokerId, this.host, this.port, this.networkRequestBudget, this.networkRequestPadding);
        }

        // Decompiler rendering of the compiler-generated initializer behind assert statements:
        // the flag is true when assertions are NOT enabled for NetworkConnectionPool.
        static {
            $assertionsDisabled = !NetworkConnectionPool.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$AbstractNetworkConnection.class */
    public abstract class AbstractNetworkConnection {
        // Target that outgoing frames (BEGIN, DATA, END) are written to.
        final MessageConsumer networkTarget;
        // Id of the outgoing network stream; 0 means no stream is currently open.
        long networkId;
        // Correlation id under which this connection is registered while connecting.
        long networkCorrelationId;
        // Credit available for sending requests, and the padding required per DATA frame,
        // as granted by WINDOW frames.
        int networkRequestBudget;
        int networkRequestPadding;
        // Credit offered to the peer for response data that has not yet been consumed.
        int networkResponseBudget;
        // Throttle and stream id of the reply (response) stream, set in onCorrelated.
        MessageConsumer networkReplyThrottle;
        long networkReplyId;
        // Current handler for reply stream frames: beforeBegin until BEGIN arrives, then afterBegin.
        private MessageConsumer streamState;
        // Slot holding a partially received response: -1 when none, LOCAL_SLOT when
        // localDecodeBuffer is used, otherwise a buffer pool slot.
        int networkSlot;
        // Number of bytes currently accumulated in the slot.
        int networkSlotOffset;
        // Request/response counters; a request is outstanding while they differ.
        int nextRequestId;
        int nextResponseId;
        // Direct buffer private to this connection, sized by fetchPartitionMaxBytes.
        final MutableDirectBuffer localDecodeBuffer;
        // Decompiler rendering of the compiler-generated flag backing assert statements.
        static final /* synthetic */ boolean $assertionsDisabled;

        private AbstractNetworkConnection() {
            this.networkSlot = -1;
            this.networkTarget = NetworkConnectionPool.this.clientStreamFactory.router.supplyTarget(NetworkConnectionPool.this.networkName);
            this.localDecodeBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(NetworkConnectionPool.this.fetchPartitionMaxBytes));
        }

        /** Describes this connection by its current request budget and padding. */
        public String toString() {
            final int budget = this.networkRequestBudget;
            final int padding = this.networkRequestPadding;
            return String.format("[budget=%d, padding=%d]", budget, padding);
        }

        /**
         * Binds this connection to its reply stream.
         *
         * <p>Stores the reply stream's throttle and id, arms the stream state to expect a BEGIN
         * frame, and returns the handler that reply stream frames must be delivered to.
         *
         * @param messageConsumer throttle of the reply stream
         * @param j id of the reply stream
         * @return the frame handler for the reply stream
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        public MessageConsumer onCorrelated(MessageConsumer messageConsumer, long j) {
            this.networkReplyThrottle = messageConsumer;
            this.networkReplyId = j;
            this.streamState = this::beforeBegin;
            return this::handleStream;
        }

        /**
         * Opens the network stream if it is not already open, with an extension visitor that writes
         * nothing and returns 0.
         */
        void doBeginIfNotConnected() {
            doBeginIfNotConnected((buffer, offset, limit) -> 0);
        }

        /**
         * Opens the network stream if none is open ({@code networkId == 0}).
         *
         * <p>Allocates a stream id and a correlation id, registers this connection under the
         * correlation id, sends BEGIN with the supplied extension, installs the throttle handler for
         * the new stream, and finally records both ids on this connection.
         *
         * @param visitor writes the BEGIN frame's extension
         */
        final void doBeginIfNotConnected(Flyweight.Builder.Visitor visitor) {
            if (this.networkId != 0) {
                // Already connected.
                return;
            }
            final long newStreamId = NetworkConnectionPool.this.clientStreamFactory.supplyStreamId.getAsLong();
            final long newCorrelationId = NetworkConnectionPool.this.clientStreamFactory.supplyCorrelationId.getAsLong();
            NetworkConnectionPool.this.clientStreamFactory.correlations.put(newCorrelationId, this);
            NetworkConnectionPool.this.clientStreamFactory.doBegin(
                    this.networkTarget, newStreamId, NetworkConnectionPool.this.networkRef, newCorrelationId, visitor);
            NetworkConnectionPool.this.clientStreamFactory.router.setThrottle(
                    NetworkConnectionPool.this.networkName, newStreamId, this::handleThrottle);
            this.networkId = newStreamId;
            this.networkCorrelationId = newCorrelationId;
        }

        /**
         * Sends the next request on this connection if one is due. Invoked whenever a WINDOW frame
         * arrives and after a RESET has been handled.
         */
        abstract void doRequestIfNeeded();

        /**
         * Ends the network stream if one is open and marks the connection as not connected.
         * Does nothing when no stream is open.
         */
        void close() {
            if (this.networkId == 0) {
                return;
            }
            NetworkConnectionPool.this.clientStreamFactory.doEnd(this.networkTarget, this.networkId);
            this.networkId = 0L;
        }

        /**
         * Dispatches a throttle frame received for the outgoing network stream.
         *
         * @param i frame type id: 0x40000001 is handled as RESET, 0x40000002 as WINDOW; any other
         *        type is ignored
         * @param directBuffer buffer containing the frame
         * @param i2 offset of the frame in {@code directBuffer}
         * @param i3 length of the frame
         */
        private void handleThrottle(int i, DirectBuffer directBuffer, int i2, int i3) {
            final int frameLimit = i2 + i3;
            if (i == 0x40000001) {
                handleReset(NetworkConnectionPool.this.clientStreamFactory.resetRO.wrap(directBuffer, i2, frameLimit));
            } else if (i == 0x40000002) {
                handleWindow(NetworkConnectionPool.this.windowRO.wrap(directBuffer, i2, frameLimit));
            }
        }

        /**
         * Applies a WINDOW frame: adds its credit to the request budget, replaces the request padding
         * with the frame's padding, then sends a request if one is due.
         *
         * @param windowFW the WINDOW frame
         */
        private void handleWindow(WindowFW windowFW) {
            final int grantedCredit = windowFW.credit();
            final int requiredPadding = windowFW.padding();
            this.networkRequestPadding = requiredPadding;
            this.networkRequestBudget += grantedCredit;
            doRequestIfNeeded();
        }

        /**
         * Handles a RESET of the outgoing network stream: removes this connection's correlation entry
         * (only if it still maps to this connection), reinitializes the connection state, and then
         * attempts to issue a request again.
         *
         * @param resetFW the RESET frame (its contents are not inspected)
         */
        private void handleReset(ResetFW resetFW) {
            NetworkConnectionPool.this.clientStreamFactory.correlations.remove(Long.valueOf(this.networkCorrelationId), this);
            doReinitialize();
            doRequestIfNeeded();
        }

        /**
         * Entry point for frames on the reply stream; forwards each frame to the current stream
         * state handler ({@code beforeBegin} or {@code afterBegin}).
         *
         * @param i frame type id
         * @param directBuffer buffer containing the frame
         * @param i2 offset of the frame in {@code directBuffer}
         * @param i3 length of the frame
         */
        private void handleStream(int i, DirectBuffer directBuffer, int i2, int i3) {
            this.streamState.accept(i, directBuffer, i2, i3);
        }

        /**
         * Stream state before BEGIN has been received on the reply stream: a frame of type 1 is
         * handled as BEGIN, anything else resets the reply stream.
         *
         * @param i frame type id
         * @param directBuffer buffer containing the frame
         * @param i2 offset of the frame in {@code directBuffer}
         * @param i3 length of the frame
         */
        private void beforeBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
            if (i != 1) {
                NetworkConnectionPool.this.clientStreamFactory.doReset(this.networkReplyThrottle, this.networkReplyId);
                return;
            }
            handleBegin(NetworkConnectionPool.this.clientStreamFactory.beginRO.wrap(directBuffer, i2, i2 + i3));
        }

        /**
         * Stream state after BEGIN has been received on the reply stream: frame type 2 is handled as
         * DATA, 3 as END and 4 as ABORT; any other frame type resets the reply stream.
         *
         * @param i frame type id
         * @param directBuffer buffer containing the frame
         * @param i2 offset of the frame in {@code directBuffer}
         * @param i3 length of the frame
         */
        private void afterBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
            final int frameLimit = i2 + i3;
            if (i == 2) {
                handleData(NetworkConnectionPool.this.clientStreamFactory.dataRO.wrap(directBuffer, i2, frameLimit));
            } else if (i == 3) {
                handleEnd(NetworkConnectionPool.this.clientStreamFactory.endRO.wrap(directBuffer, i2, frameLimit));
            } else if (i == 4) {
                handleAbort(NetworkConnectionPool.this.clientStreamFactory.abortRO.wrap(directBuffer, i2, frameLimit));
            } else {
                NetworkConnectionPool.this.clientStreamFactory.doReset(this.networkReplyThrottle, this.networkReplyId);
            }
        }

        /**
         * Handles BEGIN on the reply stream: offers response budget to the peer and switches the
         * stream state so that subsequent frames are handled by {@code afterBegin}.
         *
         * @param beginFW the BEGIN frame (its contents are not inspected)
         */
        private void handleBegin(BeginFW beginFW) {
            doOfferResponseBudget();
            this.streamState = this::afterBegin;
        }

        /**
         * Consumes a data frame from the reply stream, reassembling and dispatching a response.
         *
         * <p>The payload size plus padding is deducted from {@code networkResponseBudget}; if the
         * budget goes negative the reply stream is reset and nothing else happens. Otherwise the
         * payload (appended to any bytes already held in a slot) is examined: when a complete
         * response is available it is handed to {@code handleResponse}, otherwise the bytes are
         * retained for the next frame.
         *
         * @param dataFW the data frame to consume
         */
        void handleData(DataFW dataFW) {
            OctetsFW payload = dataFW.payload();
            long trace = dataFW.trace();
            this.networkResponseBudget -= payload.sizeof() + dataFW.padding();
            if (this.networkResponseBudget < 0) {
                // More bytes arrived than were offered: reset the reply stream.
                NetworkConnectionPool.this.clientStreamFactory.doReset(this.networkReplyThrottle, this.networkReplyId);
                return;
            }
            try {
                MutableDirectBuffer buffer = payload.buffer();
                int offset = payload.offset();
                int limit = payload.limit();
                if (this.networkSlot != -1) {
                    // Bytes from earlier frames are pending: append this payload to them and decode
                    // from the slot (the local decode buffer for LOCAL_SLOT, a pool buffer otherwise).
                    MutableDirectBuffer buffer2 = this.networkSlot == NetworkConnectionPool.LOCAL_SLOT ? this.localDecodeBuffer : NetworkConnectionPool.this.bufferPool.buffer(this.networkSlot);
                    int i = limit - offset;
                    buffer2.putBytes(this.networkSlotOffset, buffer, offset, i);
                    this.networkSlotOffset += i;
                    buffer = buffer2;
                    offset = 0;
                    limit = this.networkSlotOffset;
                }
                ResponseHeaderFW responseHeaderFW = null;
                int i2 = 0;
                // The response header is only wrapped once at least 4 bytes are available.
                if (offset + 4 <= limit) {
                    responseHeaderFW = NetworkConnectionPool.this.responseRO.wrap((DirectBuffer) buffer, offset, limit);
                    offset = responseHeaderFW.limit();
                    i2 = responseHeaderFW.size();
                }
                if (responseHeaderFW != null && offset + i2 <= limit) {
                    // A complete response is available: dispatch it, then account for trailing bytes.
                    handleResponse(trace, buffer, offset, limit);
                    int size = offset + responseHeaderFW.size();
                    this.nextResponseId++;
                    if (size < limit) {
                        // Bytes beyond this response are kept at the start of a pool slot.
                        // NOTE(review): if networkSlot is LOCAL_SLOT here, bufferPool.buffer(...) is
                        // still called with it — confirm that is intended.
                        if (this.networkSlot == -1) {
                            this.networkSlot = NetworkConnectionPool.this.bufferPool.acquire(this.networkReplyId);
                        }
                        NetworkConnectionPool.this.bufferPool.buffer(this.networkSlot).putBytes(0, buffer, size, limit - size);
                        this.networkSlotOffset = limit - size;
                    } else {
                        if (!$assertionsDisabled && size != limit) {
                            throw new AssertionError();
                        }
                        this.networkSlotOffset = 0;
                        if (this.networkSlot == NetworkConnectionPool.LOCAL_SLOT) {
                            this.networkSlot = -1;
                        }
                    }
                    doOfferResponseBudget();
                    doRequestIfNeeded();
                } else if (this.networkSlot == -1) {
                    // Incomplete response and nothing pending yet: stash the payload in the local
                    // decode buffer and mark the slot as LOCAL_SLOT.
                    MutableDirectBuffer mutableDirectBuffer = this.localDecodeBuffer;
                    this.networkSlotOffset = 0;
                    this.networkSlot = NetworkConnectionPool.LOCAL_SLOT;
                    mutableDirectBuffer.putBytes(this.networkSlotOffset, payload.buffer(), payload.offset(), payload.sizeof());
                    this.networkSlotOffset += payload.sizeof();
                    doOfferResponseBudget();
                }
                // Equivalent of a finally clause: once nothing is pending, forget the slot.
                // NOTE(review): no bufferPool release is visible in this method for an acquired
                // slot — confirm it is released elsewhere.
                if (this.networkSlotOffset != 0 || this.networkSlot == -1) {
                    return;
                }
                this.networkSlot = -1;
            } catch (Throwable th) {
                // Same slot clean-up as the normal path, then propagate the failure unchanged.
                if (this.networkSlotOffset == 0 && this.networkSlot != -1) {
                    this.networkSlot = -1;
                }
                throw th;
            }
        }

        /**
         * Handles one complete response; called from {@code handleData} once the response
         * header has been decoded and the full response is available.
         *
         * @param j            trace value taken from the data frame that completed the response
         * @param directBuffer buffer containing the response
         * @param i            offset just past the response header
         * @param i2           limit of the available bytes in {@code directBuffer}
         */
        abstract void handleResponse(long j, DirectBuffer directBuffer, int i, int i2);

        /**
         * Handles the end of the reply stream: clears the connection state and then
         * attempts a request again.
         *
         * @param endFW the end frame (its contents are not inspected)
         */
        private void handleEnd(EndFW endFW) {
            doReinitialize();
            doRequestIfNeeded();
        }

        /**
         * Handles an abort of the reply stream: clears the connection state and then
         * attempts a request again.
         *
         * @param abortFW the abort frame (its contents are not inspected)
         */
        private void handleAbort(AbortFW abortFW) {
            doReinitialize();
            doRequestIfNeeded();
        }

        /**
         * Tops the response budget up to the capacity of the local decode buffer.
         *
         * <p>When the current budget is below that capacity, a window for the difference is
         * sent on the reply throttle and the budget is raised by the same amount; otherwise
         * nothing is sent.
         */
        void doOfferResponseBudget() {
            final int shortfall = localDecodeBuffer.capacity() - networkResponseBudget;
            if (shortfall <= 0) {
                return;
            }

            NetworkConnectionPool.this.clientStreamFactory.doWindow(networkReplyThrottle, networkReplyId, shortfall, 0, 0L);
            networkResponseBudget += shortfall;
        }

        /**
         * Returns the per-connection counters to zero: slot offset, request budget and padding,
         * response budget, network stream id, and the request/response sequence numbers.
         */
        void doReinitialize() {
            // buffered-response position
            networkSlotOffset = 0;

            // flow-control state
            networkRequestBudget = 0;
            networkRequestPadding = 0;
            networkResponseBudget = 0;

            // stream identity and request/response sequencing
            networkId = 0L;
            nextRequestId = 0;
            nextResponseId = 0;
        }

        // Decompiler-exposed assertion flag: true when assertions are disabled for
        // NetworkConnectionPool; consulted by the explicit AssertionError checks in this class.
        static {
            $assertionsDisabled = !NetworkConnectionPool.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$BrokerMetadata.class */
    /** Immutable holder for a broker's node id, host name and port. */
    public static final class BrokerMetadata {
        final int nodeId;
        final String host;
        final int port;

        /**
         * @param nodeId broker node id
         * @param host   broker host name
         * @param port   broker port
         */
        BrokerMetadata(int nodeId, String host, int port) {
            this.nodeId = nodeId;
            this.host = host;
            this.port = port;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$HistoricalFetchConnection.class */
    /**
     * Fetch connection that only requests partitions flagged by the topic as needing
     * historical data.
     */
    public final class HistoricalFetchConnection extends AbstractFetchConnection {
        private HistoricalFetchConnection(BrokerMetadata brokerMetadata) {
            super(brokerMetadata);
        }

        /**
         * Encodes one partition request per eligible partition of the named topic, then lets
         * the topic satisfy what it can from its cache.
         *
         * <p>A partition is eligible when the topic reports it needs historical data, it has
         * not already been encoded in this pass, and this connection's broker is its node in
         * the topic metadata. Nothing is encoded unless the topic needs historical data and
         * reports a positive writable byte count.
         *
         * @param str               topic name
         * @param intLongConsumer   receives (partition id, attached offset) for each encoded request
         * @param intToLongFunction passed through to the topic's cache lookup
         * @return the partition request count as returned by the cache pass, or 0 if nothing was encoded
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractFetchConnection
        int addTopicToRequest(String str, IntLongConsumer intLongConsumer, IntToLongFunction intToLongFunction) {
            final int partitionsStart = encodeLimit;
            final NetworkTopic topic = (NetworkTopic) NetworkConnectionPool.this.topicsByName.get(str);
            if (!topic.needsHistorical()) {
                return 0;
            }

            final int maxBytes = topic.maximumWritableBytes(false);
            if (maxBytes <= 0) {
                return 0;
            }

            final TopicMetadata metadata = (TopicMetadata) NetworkConnectionPool.this.topicMetadataByName.get(str);
            final int[] nodeIds = metadata.nodeIdsByPartition;

            int requested = 0;
            int lastEncodedId = -1;
            for (NetworkTopicPartition partition : topic.partitions) {
                final boolean eligible = topic.needsHistorical(partition.id)
                        && lastEncodedId < partition.id
                        && nodeIds[partition.id] == brokerId;
                if (!eligible) {
                    continue;
                }

                final PartitionRequestFW request = NetworkConnectionPool.this.partitionRequestRW
                        .wrap2(NetworkConnectionPool.this.encodeBuffer, encodeLimit, NetworkConnectionPool.this.encodeBuffer.capacity())
                        .partitionId(partition.id)
                        .fetchOffset(Math.max(partition.offset, metadata.firstAvailableOffset(partition.id)))
                        .maxBytes(maxBytes)
                        .build();
                intLongConsumer.accept(partition.id, partition.offset);
                encodeLimit = request.limit();
                lastEncodedId = partition.id;
                requested++;
            }

            return topic.satisfyPartitionRequestsFromCache(
                    requested,
                    intToLongFunction,
                    NetworkConnectionPool.this.encodeBuffer,
                    partitionsStart,
                    encodeLimit,
                    newLimit -> {
                        this.encodeLimit = newLimit;
                    });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$KafkaHeadersIterator.class */
    /**
     * Reusable iterator over the headers of a {@code ListFW<KafkaHeaderFW>}.
     *
     * <p>{@link #wrap} records the offset of every header in the list; {@link #next()} then
     * re-wraps a single shared {@code KafkaHeaderFW} at each recorded offset, so a returned
     * header is only valid until the next call to {@code next()} or {@code wrap}.
     */
    public static final class KafkaHeadersIterator implements Iterator<KafkaHeaderFW> {
        private final KafkaHeaderFW headerRO;
        private final IntArrayList offsets;
        private ListFW<KafkaHeaderFW> headers;
        private int position;

        private KafkaHeadersIterator() {
            this.headerRO = new KafkaHeaderFW();
            this.offsets = new IntArrayList();
        }

        /**
         * Points this iterator at a new list of headers and rewinds it.
         *
         * @param listFW the headers to iterate
         * @return this iterator
         */
        public Iterator<KafkaHeaderFW> wrap(ListFW<KafkaHeaderFW> listFW) {
            this.headers = listFW;
            this.offsets.clear();
            listFW.forEach(kafkaHeaderFW -> {
                this.offsets.addInt(kafkaHeaderFW.offset());
            });
            this.position = 0;
            return this;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.position < this.offsets.size();
        }

        /**
         * Returns the shared header flyweight wrapped at the next recorded offset.
         *
         * @throws java.util.NoSuchElementException if the iteration has no more headers
         */
        @Override // java.util.Iterator
        public KafkaHeaderFW next() {
            // Honour the Iterator contract: previously an exhausted iterator advanced its
            // position anyway and failed inside the offsets lookup with an index error.
            if (!hasNext()) {
                throw new java.util.NoSuchElementException();
            }
            final int headerOffset = this.offsets.getInt(this.position);
            this.position++;
            return this.headerRO.wrap(this.headers.buffer(), headerOffset, this.headers.limit());
        }
    }

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$LiveFetchConnection.class */
    /**
     * Fetch connection that requests, for each partition id, only the last attached entry
     * in the topic's partition ordering.
     */
    private final class LiveFetchConnection extends AbstractFetchConnection {
        LiveFetchConnection(BrokerMetadata brokerMetadata) {
            super(brokerMetadata);
        }

        /**
         * Encodes partition requests for the named topic.
         *
         * <p>The topic's partitions are walked with one element of look-ahead: an entry is
         * encoded only when the following entry belongs to a different partition id (or there
         * is none) and this connection's broker is the partition's node in the topic metadata.
         * Nothing is encoded when the topic reports no writable bytes.
         *
         * @param str               topic name
         * @param intLongConsumer   receives (partition id, attached offset) for each encoded request
         * @param intToLongFunction unused by this implementation
         * @return the number of partition requests encoded
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractFetchConnection
        int addTopicToRequest(String str, IntLongConsumer intLongConsumer, IntToLongFunction intToLongFunction) {
            final NetworkTopic topic = (NetworkTopic) NetworkConnectionPool.this.topicsByName.get(str);
            final int maxBytes = topic.maximumWritableBytes(true);
            if (maxBytes <= 0) {
                return 0;
            }

            final TopicMetadata metadata = (TopicMetadata) NetworkConnectionPool.this.topicMetadataByName.get(str);
            final int[] nodeIds = metadata.nodeIdsByPartition;
            final Iterator<NetworkTopicPartition> remaining = topic.partitions.iterator();

            int requested = 0;
            NetworkTopicPartition current = remaining.hasNext() ? remaining.next() : null;
            while (current != null) {
                final NetworkTopicPartition following = remaining.hasNext() ? remaining.next() : null;
                final boolean lastForPartition = following == null || following.id != current.id;

                if (lastForPartition && nodeIds[current.id] == brokerId) {
                    final PartitionRequestFW request = NetworkConnectionPool.this.partitionRequestRW
                            .wrap2(NetworkConnectionPool.this.encodeBuffer, encodeLimit, NetworkConnectionPool.this.encodeBuffer.capacity())
                            .partitionId(current.id)
                            .fetchOffset(Math.max(current.offset, metadata.firstAvailableOffset(current.id)))
                            .maxBytes(maxBytes)
                            .build();
                    intLongConsumer.accept(current.id, current.offset);
                    encodeLimit = request.limit();
                    requested++;
                }

                current = following;
            }
            return requested;
        }
    }

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$MetadataConnection.class */
    private final class MetadataConnection extends AbstractNetworkConnection {
        TopicMetadata pendingTopicMetadata;
        MetadataRequestType pendingRequest;
        static final /* synthetic */ boolean $assertionsDisabled;

        private MetadataConnection() {
            super();
            this.pendingRequest = MetadataRequestType.METADATA;
        }

        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void doRequestIfNeeded() {
            if (this.nextRequestId == this.nextResponseId) {
                if (this.pendingTopicMetadata == null) {
                    Optional findFirst = NetworkConnectionPool.this.topicMetadataByName.values().stream().filter(topicMetadata -> {
                        return topicMetadata.nodeIdsByPartition == null;
                    }).findFirst();
                    if (findFirst.isPresent()) {
                        this.pendingTopicMetadata = (TopicMetadata) findFirst.get();
                    }
                }
                if (this.pendingTopicMetadata != null) {
                    switch (this.pendingTopicMetadata.nextRequiredRequestType()) {
                        case DESCRIBE_CONFIGS:
                            doDescribeConfigsRequest();
                            return;
                        case METADATA:
                            doMetadataRequest();
                            return;
                        default:
                            return;
                    }
                }
            }
        }

        /* JADX WARN: Type inference failed for: r0v11, types: [org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v23, types: [org.reaktivity.nukleus.kafka.internal.types.codec.config.DescribeConfigsRequestFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v31, types: [org.reaktivity.nukleus.kafka.internal.types.codec.config.ResourceRequestFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v41, types: [org.reaktivity.nukleus.kafka.internal.types.String16FW$Builder] */
        /* JADX WARN: Type inference failed for: r0v54, types: [org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v64, types: [org.reaktivity.nukleus.kafka.internal.types.OctetsFW$Builder] */
        private void doDescribeConfigsRequest() {
            doBeginIfNotConnected();
            if (this.networkRequestBudget > this.networkRequestPadding) {
                MutableDirectBuffer mutableDirectBuffer = NetworkConnectionPool.this.encodeBuffer;
                RequestHeaderFW build = NetworkConnectionPool.this.requestRW.wrap2(mutableDirectBuffer, 0, mutableDirectBuffer.capacity()).size(0).apiKey((short) 32).apiVersion((short) 0).correlationId(0).clientId((String) null).build();
                int limit = NetworkConnectionPool.this.configNameRW.wrap2(mutableDirectBuffer, NetworkConnectionPool.this.resourceRequestRW.wrap2(mutableDirectBuffer, NetworkConnectionPool.this.describeConfigsRequestRW.wrap2(mutableDirectBuffer, build.limit(), mutableDirectBuffer.capacity()).resourceCount(1).build().limit(), mutableDirectBuffer.capacity()).type((byte) 2).name(this.pendingTopicMetadata.topicName).configNamesCount(1).build().limit(), mutableDirectBuffer.capacity()).set(NetworkConnectionPool.CLEANUP_POLICY, StandardCharsets.UTF_8).build().limit();
                if ((limit - 0) + this.networkRequestPadding <= this.networkRequestBudget) {
                    int i = this.nextRequestId;
                    this.nextRequestId = i + 1;
                    NetworkConnectionPool.this.requestRW.wrap2(mutableDirectBuffer, build.offset(), build.limit()).size((limit - 0) - 4).apiKey((short) 32).apiVersion((short) 0).correlationId(i).clientId((String) null).build();
                    OctetsFW build2 = NetworkConnectionPool.this.payloadRW.wrap2(mutableDirectBuffer, 0, limit).set((mutableDirectBuffer2, i2, i3) -> {
                        return i3 - i2;
                    }).build();
                    NetworkConnectionPool.this.clientStreamFactory.doData(this.networkTarget, this.networkId, this.networkRequestPadding, build2);
                    this.networkRequestBudget -= build2.sizeof() + this.networkRequestPadding;
                    this.pendingRequest = MetadataRequestType.DESCRIBE_CONFIGS;
                }
            }
        }

        /* JADX WARN: Type inference failed for: r0v20, types: [org.reaktivity.nukleus.kafka.internal.types.codec.metadata.MetadataRequestFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v34, types: [org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v44, types: [org.reaktivity.nukleus.kafka.internal.types.OctetsFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v8, types: [org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW$Builder] */
        private void doMetadataRequest() {
            doBeginIfNotConnected();
            if (this.networkRequestBudget > this.networkRequestPadding) {
                RequestHeaderFW build = NetworkConnectionPool.this.requestRW.wrap2(NetworkConnectionPool.this.encodeBuffer, 0, NetworkConnectionPool.this.encodeBuffer.capacity()).size(0).apiKey((short) 3).apiVersion((short) 5).correlationId(0).clientId((String) null).build();
                int limit = NetworkConnectionPool.this.metadataRequestRW.wrap2(NetworkConnectionPool.this.encodeBuffer, build.limit(), NetworkConnectionPool.this.encodeBuffer.capacity()).topicCount(1).topicName(this.pendingTopicMetadata.topicName).build().limit();
                if ((limit - 0) + this.networkRequestPadding <= this.networkRequestBudget) {
                    int i = this.nextRequestId;
                    this.nextRequestId = i + 1;
                    NetworkConnectionPool.this.requestRW.wrap2(NetworkConnectionPool.this.encodeBuffer, build.offset(), build.limit()).size((limit - 0) - 4).apiKey((short) 3).apiVersion((short) 5).correlationId(i).clientId((String) null).build();
                    OctetsFW build2 = NetworkConnectionPool.this.payloadRW.wrap2(NetworkConnectionPool.this.encodeBuffer, 0, limit).set((mutableDirectBuffer, i2, i3) -> {
                        return i3 - i2;
                    }).build();
                    NetworkConnectionPool.this.clientStreamFactory.doData(this.networkTarget, this.networkId, this.networkRequestPadding, build2);
                    this.networkRequestBudget -= build2.sizeof() + this.networkRequestPadding;
                    this.pendingRequest = MetadataRequestType.METADATA;
                }
            }
        }

        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void handleResponse(long j, DirectBuffer directBuffer, int i, int i2) {
            switch (this.pendingRequest) {
                case DESCRIBE_CONFIGS:
                    handleDescribeConfigsResponse(j, directBuffer, i, i2);
                    return;
                case METADATA:
                    handleMetadataResponse(j, directBuffer, i, i2);
                    return;
                default:
                    return;
            }
        }

        void handleDescribeConfigsResponse(long j, DirectBuffer directBuffer, int i, int i2) {
            DescribeConfigsResponseFW wrap = NetworkConnectionPool.this.describeConfigsResponseRO.wrap(directBuffer, i, i2);
            int resourceCount = wrap.resourceCount();
            int limit = wrap.limit();
            short s = 0;
            boolean z = false;
            for (int i3 = 0; i3 < resourceCount && s == 0; i3++) {
                ResourceResponseFW wrap2 = NetworkConnectionPool.this.resourceResponseRO.wrap(directBuffer, limit, i2);
                String16FW name = wrap2.name();
                if (!$assertionsDisabled && !name.asString().equals(this.pendingTopicMetadata.topicName)) {
                    throw new AssertionError();
                }
                s = wrap2.errorCode();
                if (s != 0) {
                    break;
                }
                int configCount = wrap2.configCount();
                limit = wrap2.limit();
                for (int i4 = 0; i4 < configCount && s == 0; i4++) {
                    ConfigResponseFW wrap3 = NetworkConnectionPool.this.configResponseRO.wrap(directBuffer, limit, i2);
                    String16FW name2 = wrap3.name();
                    if (!$assertionsDisabled && !name2.asString().equals(NetworkConnectionPool.CLEANUP_POLICY)) {
                        throw new AssertionError();
                    }
                    z = wrap3.value() != null && wrap3.value().asString().equals(NetworkConnectionPool.COMPACT);
                    limit = wrap3.limit();
                }
            }
            if (s != 0 && KafkaErrors.isRecoverable(s)) {
                this.pendingTopicMetadata.reset();
                doRequestIfNeeded();
                return;
            }
            TopicMetadata topicMetadata = this.pendingTopicMetadata;
            this.pendingTopicMetadata = null;
            topicMetadata.setErrorCode(s);
            topicMetadata.setCompacted(z);
            topicMetadata.setCompleted();
            topicMetadata.flush();
        }

        void handleMetadataResponse(long j, DirectBuffer directBuffer, int i, int i2) {
            MetadataResponseFW wrap = NetworkConnectionPool.this.metadataResponseRO.wrap(directBuffer, i, i2);
            int brokerCount = wrap.brokerCount();
            this.pendingTopicMetadata.initializeBrokers(brokerCount);
            int limit = wrap.limit();
            for (int i3 = 0; i3 < brokerCount; i3++) {
                BrokerMetadataFW wrap2 = NetworkConnectionPool.this.brokerMetadataRO.wrap(directBuffer, limit, i2);
                this.pendingTopicMetadata.addBroker(wrap2.nodeId(), wrap2.host().asString(), wrap2.port());
                limit = wrap2.limit();
            }
            MetadataResponsePart2FW wrap3 = NetworkConnectionPool.this.metadataResponsePart2RO.wrap(directBuffer, limit, i2);
            int i4 = wrap3.topicCount();
            if (!$assertionsDisabled && i4 != 1) {
                throw new AssertionError();
            }
            int limit2 = wrap3.limit();
            short s = 0;
            for (int i5 = 0; i5 < i4 && s == 0; i5++) {
                TopicMetadataFW wrap4 = NetworkConnectionPool.this.topicMetadataRO.wrap(directBuffer, limit2, i2);
                String16FW string16FW = wrap4.topic();
                if (!$assertionsDisabled && !string16FW.asString().equals(this.pendingTopicMetadata.topicName)) {
                    throw new AssertionError();
                }
                s = wrap4.errorCode();
                if (s != 0) {
                    break;
                }
                int partitionCount = wrap4.partitionCount();
                limit2 = wrap4.limit();
                this.pendingTopicMetadata.initializePartitions(partitionCount);
                for (int i6 = 0; i6 < partitionCount && s == 0; i6++) {
                    PartitionMetadataFW wrap5 = NetworkConnectionPool.this.partitionMetadataRO.wrap(directBuffer, limit2, i2);
                    s = wrap5.errorCode();
                    this.pendingTopicMetadata.addPartition(wrap5.partitionId(), wrap5.leader());
                    limit2 = wrap5.limit();
                }
            }
            TopicMetadata topicMetadata = this.pendingTopicMetadata;
            if (s == 0) {
                this.pendingTopicMetadata.nextRequiredRequestType = MetadataRequestType.DESCRIBE_CONFIGS;
            } else if (KafkaErrors.isRecoverable(s)) {
                this.pendingTopicMetadata.reset();
                doRequestIfNeeded();
            } else {
                NetworkConnectionPool.this.topicMetadataByName.remove(topicMetadata.topicName);
                this.pendingTopicMetadata = null;
                topicMetadata.setErrorCode(s);
                topicMetadata.flush();
            }
        }

        static {
            $assertionsDisabled = !NetworkConnectionPool.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$MetadataRequestType.class */
    /**
     * Kinds of request issued by {@code MetadataConnection}; also used to select the handler
     * for the matching response.
     */
    public enum MetadataRequestType {
        /** Metadata request, as sent by {@code doMetadataRequest} (api key 3). */
        METADATA,
        /** Describe-configs request, as sent by {@code doDescribeConfigsRequest} (api key 32). */
        DESCRIBE_CONFIGS
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$NetworkTopic.class */
    public final class NetworkTopic {
        // Name of the topic; also the key used when removing this topic from the pool's maps.
        private final String topicName;
        // Selects compacted partition indexes and the compacted header dispatcher at construction.
        private final boolean compacted;
        // Suppliers registered by doAttach and dropped by doDetach; maximumWritableBytes takes their maximum.
        private final Set<IntSupplier> windowSuppliers;
        // Attached partition entries, created by attachToPartition.
        final NavigableSet<NetworkTopicPartition> partitions;
        // Scratch instance reused as the probe for floor/ceiling lookups in partitions.
        private final NetworkTopicPartition candidate;
        private final TopicMessageDispatcher dispatcher;
        // Bound to this::handleProgress; handed back to callers of doAttach.
        private final PartitionProgressHandler progressHandler;
        // Per-partition flag written by attachToPartition.
        private BitSet needsHistoricalByPartition;
        // When set, live fetches use the pool's fetchPartitionMaxBytes (see maximumWritableBytes).
        private final boolean proactive;
        // Decompiler-exposed assertion flag.
        static final /* synthetic */ boolean $assertionsDisabled;

        /** Describes this topic by its name and attached partitions. */
        @Override
        public String toString() {
            return "topicName=" + topicName + ", partitions=" + partitions;
        }

        /**
         * Creates the topic state and its message dispatcher.
         *
         * <p>Each partition gets its own {@code CompactedPartitionIndex} when the topic is
         * compacted, otherwise the shared default index. Route header conditions registered
         * for the topic (or, failing that, an empty condition) are added to the dispatcher.
         * A proactive topic additionally attaches to every partition at offset 0 and adds a
         * progress-updating dispatcher.
         *
         * @param topicName      name of the topic
         * @param partitionCount number of partitions
         * @param compacted      whether the topic is compacted
         * @param proactive      whether to attach to all partitions immediately
         */
        private NetworkTopic(String topicName, int partitionCount, boolean compacted, boolean proactive) {
            this.needsHistoricalByPartition = new BitSet();
            this.topicName = topicName;
            this.compacted = compacted;
            this.windowSuppliers = new HashSet<>();
            this.partitions = new TreeSet<>();
            this.candidate = new NetworkTopicPartition();
            this.progressHandler = this::handleProgress;

            final PartitionIndex[] indexes = new PartitionIndex[partitionCount];
            for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
                if (compacted) {
                    indexes[partitionId] = new CompactedPartitionIndex(1000, NetworkConnectionPool.KAFKA_SERVER_DEFAULT_DELETE_RETENTION_MS, NetworkConnectionPool.this.messageCache);
                } else {
                    indexes[partitionId] = NetworkConnectionPool.DEFAULT_PARTITION_INDEX;
                }
            }
            this.dispatcher = new TopicMessageDispatcher(indexes, compacted ? CompactedHeaderValueMessageDispatcher::new : HeaderValueMessageDispatcher::new);
            this.proactive = proactive;

            final List routeHeaders = (List) NetworkConnectionPool.this.routeHeadersByTopic.get(topicName);
            if (routeHeaders == null) {
                this.dispatcher.add(null, -1, Collections.emptyIterator(), NetworkConnectionPool.MATCHING_MESSAGE_DISPATCHER);
            } else {
                routeHeaders.forEach(listFW -> {
                    this.dispatcher.add(null, -1, NetworkConnectionPool.this.headersIterator.wrap(listFW), NetworkConnectionPool.MATCHING_MESSAGE_DISPATCHER);
                });
            }

            if (!proactive) {
                return;
            }
            for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
                attachToPartition(partitionId, 0L, 1);
            }
            this.dispatcher.add(null, -1, Collections.emptyIterator(), new ProgressUpdatingMessageDispatcher(partitionCount, this.progressHandler));
        }

        /**
         * Attaches a subscriber to this topic.
         *
         * <p>With a key ({@code octetsFW} non-null) only the first partition in the offsets map
         * is attached; on a compacted topic its start offset is first advanced to the offset of
         * the dispatcher's entry for that key when that is higher. Without a key every partition
         * in the map is attached, each start offset first being advanced to the offset of the
         * dispatcher's first entry when that is higher. Advanced offsets are written back into
         * the map. Finally the message dispatcher is registered with the given headers.
         *
         * @param long2LongHashMap  start offsets keyed by partition id; may be updated
         * @param octetsFW          message key to subscribe to, or null for all keys
         * @param listFW            header conditions
         * @param messageDispatcher dispatcher to receive matching messages
         * @param intSupplier       supplier consulted by {@code maximumWritableBytes}
         * @return this topic's progress handler
         */
        PartitionProgressHandler doAttach(Long2LongHashMap long2LongHashMap, OctetsFW octetsFW, ListFW<KafkaHeaderFW> listFW, MessageDispatcher messageDispatcher, IntSupplier intSupplier) {
            windowSuppliers.add(intSupplier);

            final int keyPartition;
            if (octetsFW != null) {
                keyPartition = long2LongHashMap.keySet().iterator().next().intValue();
                long startOffset = long2LongHashMap.get(keyPartition);
                if (compacted) {
                    final long entryOffset = dispatcher.getEntry(keyPartition, startOffset, octetsFW).offset();
                    if (entryOffset > startOffset) {
                        long2LongHashMap.put(keyPartition, entryOffset);
                        startOffset = entryOffset;
                    }
                }
                attachToPartition(keyPartition, startOffset, 1);
            } else {
                keyPartition = -1;
                final Long2LongHashMap.LongIterator partitionIds = long2LongHashMap.keySet().iterator();
                while (partitionIds.hasNext()) {
                    final int partitionId = (int) partitionIds.nextValue();
                    long startOffset = long2LongHashMap.get(partitionId);
                    final long entryOffset = dispatcher.entries(partitionId, startOffset).next().offset();
                    if (entryOffset > startOffset) {
                        long2LongHashMap.put(partitionId, entryOffset);
                        startOffset = entryOffset;
                    }
                    attachToPartition(partitionId, startOffset, 1);
                }
            }

            NetworkConnectionPool.this.headersIterator.wrap(listFW);
            dispatcher.add(octetsFW, keyPartition, NetworkConnectionPool.this.headersIterator, messageDispatcher);
            return progressHandler;
        }

        /**
         * Registers interest in a partition at an offset.
         *
         * <p>If no entry equal to (partition, offset) exists, a new one is added and the
         * partition's needs-historical flag is set to whether another entry for the same
         * partition id already exists (found as either the floor or the ceiling of the probe).
         * The entry is then passed to the reference-count accessor together with {@code count}.
         *
         * @param partitionId partition id
         * @param offset      offset to attach at
         * @param count       amount passed to the entry's reference-count accessor
         */
        private void attachToPartition(int partitionId, long offset, int count) {
            candidate.id = partitionId;
            candidate.offset = offset;

            NetworkTopicPartition entry = partitions.floor(candidate);
            if (!candidate.equals(entry)) {
                boolean samePartitionAttached = entry != null && entry.id == candidate.id;
                if (!samePartitionAttached) {
                    final NetworkTopicPartition above = partitions.ceiling(candidate);
                    samePartitionAttached = above != null && above.id == candidate.id;
                }
                needsHistoricalByPartition.set(candidate.id, samePartitionAttached);

                entry = new NetworkTopicPartition();
                entry.id = candidate.id;
                entry.offset = candidate.offset;
                partitions.add(entry);
            }
            NetworkTopicPartition.access$2712(entry, count);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Detaches a subscriber from this topic.
         *
         * <p>The message dispatcher is removed, every (partition, offset) pair in the map is
         * detached, and — when no partitions remain and the topic is not compacted — the topic
         * and its metadata are removed from the pool's maps.
         *
         * @param long2LongHashMap  offsets keyed by partition id
         * @param octetsFW          message key that was subscribed to, or null
         * @param listFW            header conditions used when attaching
         * @param messageDispatcher dispatcher to remove
         * @param intSupplier       supplier to unregister
         */
        public void doDetach(Long2LongHashMap long2LongHashMap, OctetsFW octetsFW, ListFW<KafkaHeaderFW> listFW, MessageDispatcher messageDispatcher, IntSupplier intSupplier) {
            windowSuppliers.remove(intSupplier);

            final int keyPartition = octetsFW == null
                    ? -1
                    : (int) long2LongHashMap.keySet().iterator().next().longValue();
            NetworkConnectionPool.this.headersIterator.wrap(listFW);
            dispatcher.remove(octetsFW, keyPartition, NetworkConnectionPool.this.headersIterator, messageDispatcher);

            final Long2LongHashMap.LongIterator partitionIds = long2LongHashMap.keySet().iterator();
            while (partitionIds.hasNext()) {
                final long partitionId = partitionIds.nextValue();
                doDetach((int) partitionId, long2LongHashMap.get(partitionId), intSupplier);
            }

            if (partitions.isEmpty() && !compacted) {
                NetworkConnectionPool.this.topicsByName.remove(topicName);
                NetworkConnectionPool.this.topicMetadataByName.remove(topicName);
            }
        }

        /**
         * Releases one reference on the partition entry at or below the given offset.
         *
         * <p>The entry is looked up as the floor of (partition, offset); when its reference
         * count reaches zero it is removed. Nothing happens if no floor entry exists.
         *
         * @param i           partition id
         * @param j           offset
         * @param intSupplier supplier to unregister
         */
        void doDetach(int i, long j, IntSupplier intSupplier) {
            windowSuppliers.remove(intSupplier);

            candidate.id = i;
            candidate.offset = j;
            final NetworkTopicPartition entry = partitions.floor(candidate);
            if (entry == null) {
                return;
            }

            if (!$assertionsDisabled && entry.id != candidate.id) {
                throw new AssertionError();
            }
            NetworkTopicPartition.access$2710(entry);
            if (entry.refs == 0) {
                remove(entry);
            }
        }

        /**
         * Computes how many bytes may be requested for this topic.
         *
         * @param z true when called for a live fetch
         * @return the pool's {@code fetchPartitionMaxBytes} when {@code z} is true and the topic
         *         is proactive; otherwise the largest value reported by the registered window
         *         suppliers (0 if there are none), capped at the pool's {@code maximumMessageSize}
         */
        int maximumWritableBytes(boolean z) {
            if (z && proactive) {
                return NetworkConnectionPool.this.fetchPartitionMaxBytes;
            }

            int largestWindow = 0;
            for (IntSupplier supplier : windowSuppliers) {
                largestWindow = Math.max(largestWindow, supplier.getAsInt());
            }
            return Math.min(largestWindow, NetworkConnectionPool.this.maximumMessageSize);
        }

        /* JADX WARN: Type inference failed for: r0v41, types: [org.reaktivity.nukleus.kafka.internal.types.codec.fetch.PartitionRequestFW$Builder] */
        /**
         * Walks the encoded partition requests in the buffer and tries to serve each one from the
         * message cache, dispatching cached messages through this topic's dispatcher.
         *
         * <p>A request that did not hit a cache miss is removed from the buffer; a request that hit
         * a miss is kept and, if progress was made, rewritten in place with the advanced fetch offset.
         *
         * @param i                 number of partition requests to process
         * @param intToLongFunction applied to each partition id; the result is passed as the second
         *                          argument to the dispatcher's dispatch and flush calls
         * @param mutableDirectBuffer buffer holding the encoded partition requests
         * @param i2                offset of the first partition request in the buffer
         * @param i3                limit used when wrapping requests
         * @param intConsumer       receives the reduced limit when at least one request was removed
         * @return the number of partition requests remaining after removal
         */
        int satisfyPartitionRequestsFromCache(int i, IntToLongFunction intToLongFunction, MutableDirectBuffer mutableDirectBuffer, int i2, int i3, IntConsumer intConsumer) {
            // i4: remaining request count; i5: running limit, reduced for every removed request.
            int i4 = i;
            int i5 = i3;
            for (int i6 = 0; i6 < i; i6++) {
                PartitionRequestFW wrap = NetworkConnectionPool.this.partitionRequestRO.wrap((DirectBuffer) mutableDirectBuffer, i2, i3);
                if (!$assertionsDisabled && wrap.limit() > i3) {
                    throw new AssertionError();
                }
                int partitionId = wrap.partitionId();
                long fetchOffset = wrap.fetchOffset();
                long logStartOffset = wrap.logStartOffset();
                int maxBytes = wrap.maxBytes();
                // j tracks the offset reached so far for this partition.
                long j = fetchOffset;
                Iterator<PartitionIndex.Entry> entries = this.dispatcher.entries(partitionId, fetchOffset);
                // z: remove this request from the buffer (cleared on a cache miss).
                boolean z = true;
                // z2: at least one message was dispatched, so a flush is needed afterwards.
                boolean z2 = false;
                long applyAsLong = intToLongFunction.applyAsLong(partitionId);
                while (true) {
                    if (!entries.hasNext()) {
                        break;
                    }
                    PartitionIndex.Entry next = entries.next();
                    j = next.offset();
                    MessageFW messageFW = NetworkConnectionPool.this.messageCache.get(next.message(), NetworkConnectionPool.this.messageRO);
                    if (messageFW == null) {
                        // Cache miss: keep the request so the message is fetched from offset j.
                        z = false;
                        break;
                    }
                    // Advance past the cached message before dispatching it.
                    j++;
                    int dispatch = this.dispatcher.dispatch(partitionId, applyAsLong, j, wrap(NetworkConnectionPool.this.keyBuffer, messageFW.key()), NetworkConnectionPool.this.headersRO.wrap(messageFW.headers().buffer(), messageFW.headers().offset(), messageFW.headers().limit()).headerSupplier(), messageFW.timestamp(), messageFW.traceId(), wrap(NetworkConnectionPool.this.valueBuffer, messageFW.value()));
                    z2 = true;
                    if (MessageDispatcher.blocked(dispatch) && !MessageDispatcher.delivered(dispatch)) {
                        // Blocked without delivery: stop iterating; z is already true at this point.
                        z = true;
                        break;
                    }
                }
                if (z) {
                    j = this.dispatcher.nextOffset(partitionId);
                    // Overwrite this request with the bytes that follow it.
                    // NOTE(review): the length argument is i3 (the limit), not the count of bytes
                    // after the request — confirm this is intended.
                    mutableDirectBuffer.putBytes(i2, mutableDirectBuffer, wrap.limit(), i3);
                    i5 -= wrap.sizeof();
                    i4--;
                } else {
                    if (j > fetchOffset) {
                        // Progress was made before the miss: rewrite the request with the new fetch offset.
                        wrap = NetworkConnectionPool.this.partitionRequestRW.wrap2(mutableDirectBuffer, i2, i5).partitionId(partitionId).fetchOffset(j).logStartOffset(logStartOffset).maxBytes(maxBytes).build();
                    }
                    // Move on to the next request in the buffer.
                    i2 = wrap.limit();
                }
                if (z2) {
                    this.dispatcher.flush(partitionId, applyAsLong, j);
                }
            }
            // NOTE(review): this compares i3 with itself and can never fire — likely a
            // decompilation artifact of an assertion on the new limit; confirm against the source.
            if (!$assertionsDisabled && i3 > i3) {
                throw new AssertionError();
            }
            if (i5 < i3) {
                intConsumer.accept(i5);
            }
            return i4;
        }

        /**
         * Moves one reference for a partition from its old offset entry to a new offset entry.
         *
         * <p>The entry at the old offset loses a reference; an entry at the new offset is looked up
         * (or created and added) and gains one. The old entry is removed if it is no longer
         * referenced.
         *
         * @param i  partition id
         * @param j  offset the reference was previously held at
         * @param j2 offset the reference moves to
         */
        private void handleProgress(int i, long j, long j2) {
            this.candidate.id = i;
            this.candidate.offset = j;
            final NetworkTopicPartition previous = this.partitions.floor(this.candidate);
            if (!$assertionsDisabled && previous == null) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && previous.offset != j) {
                throw new AssertionError();
            }
            // Synthetic accessor: post-decrements previous.refs.
            NetworkTopicPartition.access$2710(previous);

            this.candidate.offset = j2;
            NetworkTopicPartition advanced = this.partitions.floor(this.candidate);
            final boolean exactEntryExists = advanced != null && advanced.offset == j2;
            if (!exactEntryExists) {
                // Another entry sits between the old and new offsets: flag the partition.
                if (advanced != null && advanced != previous) {
                    this.needsHistoricalByPartition.set(i);
                }
                advanced = new NetworkTopicPartition();
                advanced.id = i;
                advanced.offset = j2;
                this.partitions.add(advanced);
            }
            // Synthetic accessor: post-increments advanced.refs.
            NetworkTopicPartition.access$2708(advanced);

            if (previous.refs == 0) {
                remove(previous);
            }
        }

        /**
         * Removes a partition entry and recomputes the needs-historical flag for its partition id.
         *
         * <p>The flag is set when, after removal, the entries found by probing from offset 0 upwards
         * and from {@code Long.MAX_VALUE} downwards for the same partition id are different objects;
         * otherwise it is cleared. The removed entry's offset is overwritten by the probing.
         *
         * @param networkTopicPartition entry to remove
         */
        private void remove(NetworkTopicPartition networkTopicPartition) {
            this.partitions.remove(networkTopicPartition);

            // Reuse the removed entry as the probe for the lookups below.
            networkTopicPartition.offset = 0L;
            final NetworkTopicPartition lowest = this.partitions.ceiling(networkTopicPartition);

            boolean severalOffsetsRemain = false;
            if (lowest != null && lowest.id == networkTopicPartition.id) {
                networkTopicPartition.offset = Long.MAX_VALUE;
                final NetworkTopicPartition highest = this.partitions.floor(networkTopicPartition);
                severalOffsetsRemain = highest != lowest;
            }
            this.needsHistoricalByPartition.set(networkTopicPartition.id, severalOffsetsRemain);
        }

        /**
         * @return {@code true} when the needs-historical flag is set for at least one partition
         */
        boolean needsHistorical() {
            final int firstFlaggedPartition = this.needsHistoricalByPartition.nextSetBit(0);
            return firstFlaggedPartition >= 0;
        }

        /**
         * @param i partition id
         * @return {@code true} when the needs-historical flag is set for the given partition
         */
        boolean needsHistorical(int i) {
            final boolean flagged = this.needsHistoricalByPartition.get(i);
            return flagged;
        }

        /**
         * Points the supplied buffer at the bytes of a flyweight.
         *
         * @param mutableDirectBuffer buffer to re-wrap over the flyweight's region
         * @param flyweight           flyweight to view, may be {@code null}
         * @return the supplied buffer after wrapping, or {@code null} when the flyweight is {@code null}
         */
        DirectBuffer wrap(MutableDirectBuffer mutableDirectBuffer, Flyweight flyweight) {
            if (flyweight == null) {
                return null;
            }
            mutableDirectBuffer.wrap(flyweight.buffer(), flyweight.offset(), flyweight.sizeof());
            return mutableDirectBuffer;
        }

        // Records whether assertions are disabled for NetworkConnectionPool. The
        // "!$assertionsDisabled && ..." guards in this class read this flag before
        // throwing AssertionError.
        static {
            $assertionsDisabled = !NetworkConnectionPool.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$NetworkTopicPartition.class */
    /**
     * A (partition id, offset) pair with a reference count, ordered by id and then by offset.
     */
    public static final class NetworkTopicPartition implements Comparable<NetworkTopicPartition> {
        private static final NetworkTopicPartition NONE = new NetworkTopicPartition();
        int id;
        long offset;
        private int refs;

        private NetworkTopicPartition() {
        }

        /**
         * Orders by partition id, then by offset.
         *
         * <p>Uses {@code Integer.compare}/{@code Long.compare} rather than subtraction: returning
         * the truncated low 32 bits of an offset difference yields the wrong sign when the offsets
         * differ by 2^31..2^32-1, and the subtraction itself can overflow.
         *
         * @param networkTopicPartition the entry to compare against
         * @return a negative, zero or positive value per the {@link Comparable} contract
         */
        @Override // java.lang.Comparable
        public int compareTo(NetworkTopicPartition networkTopicPartition) {
            int result = Integer.compare(this.id, networkTopicPartition.id);
            if (result == 0) {
                result = Long.compare(this.offset, networkTopicPartition.offset);
            }
            return result;
        }

        /**
         * Two entries are equal when both id and offset match; the reference count is ignored.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            // The class is final, so instanceof is equivalent to an exact class check.
            if (!(obj instanceof NetworkTopicPartition)) {
                return false;
            }
            NetworkTopicPartition that = (NetworkTopicPartition) obj;
            return this.id == that.id && this.offset == that.offset;
        }

        /**
         * Hash derived from the partition id only, which is consistent with {@link #equals}.
         *
         * <p>The value is unchanged from the previous expression
         * {@code 31 * id + (id ^ (id >>> 32))}: for an {@code int}, a shift by 32 is a shift by 0,
         * so the second term was always zero.
         */
        @Override
        public int hashCode() {
            return 31 * this.id;
        }

        @Override
        public String toString() {
            return String.format("id=%d, offset=%d, refs=%d", this.id, this.offset, this.refs);
        }

        /** Synthetic accessor: adds {@code i} to the reference count and returns the new value. */
        static /* synthetic */ int access$2712(NetworkTopicPartition networkTopicPartition, int i) {
            networkTopicPartition.refs += i;
            return networkTopicPartition.refs;
        }

        /** Synthetic accessor: decrements the reference count and returns the previous value. */
        static /* synthetic */ int access$2710(NetworkTopicPartition networkTopicPartition) {
            return networkTopicPartition.refs--;
        }

        /** Synthetic accessor: increments the reference count and returns the previous value. */
        static /* synthetic */ int access$2708(NetworkTopicPartition networkTopicPartition) {
            return networkTopicPartition.refs++;
        }

        static {
            NONE.id = -1;
        }
    }

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$ProgressUpdatingMessageDispatcher.class */
    /**
     * Dispatcher that delivers nothing itself; on flush it reports offset advances per partition
     * to a {@link PartitionProgressHandler}.
     */
    private static final class ProgressUpdatingMessageDispatcher implements MessageDispatcher {
        // Last offset reported for each partition, indexed by partition id.
        private final long[] offsets;
        private final PartitionProgressHandler progressHandler;

        /**
         * @param i                        number of partitions to track
         * @param partitionProgressHandler handler notified when a partition's offset advances
         */
        ProgressUpdatingMessageDispatcher(int i, PartitionProgressHandler partitionProgressHandler) {
            this.offsets = new long[i];
            this.progressHandler = partitionProgressHandler;
        }

        /** Ignores the message and always returns 0. */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
        public int dispatch(int i, long j, long j2, DirectBuffer directBuffer, Function<DirectBuffer, DirectBuffer> function, long j3, long j4, DirectBuffer directBuffer2) {
            return 0;
        }

        /**
         * Notifies the handler with the previous and new offset when {@code j2} is beyond the
         * offset last recorded for partition {@code i}, then records {@code j2}.
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
        public void flush(int i, long j, long j2) {
            final long previousOffset = this.offsets[i];
            if (j2 <= previousOffset) {
                return;
            }
            this.progressHandler.handle(i, previousOffset, j2);
            this.offsets[i] = j2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$TopicMetadata.class */
    /**
     * Metadata collected for one topic: error code, brokers, the node id leading each partition,
     * the first offset per partition, and the consumers waiting to be told when it is complete.
     */
    public static final class TopicMetadata {
        private static final long UNKNOWN_OFFSET = -1;
        private final String topicName;
        private short errorCode;
        private boolean compacted;
        private boolean completed;
        BrokerMetadata[] brokers;
        private int nextBrokerIndex;
        private int[] nodeIdsByPartition;
        private long[] firstOffsetsByPartition;
        private List<Consumer<TopicMetadata>> consumers = new ArrayList<>();
        private MetadataRequestType nextRequiredRequestType = MetadataRequestType.METADATA;

        TopicMetadata(String str) {
            this.topicName = str;
        }

        MetadataRequestType nextRequiredRequestType() {
            return this.nextRequiredRequestType;
        }

        void setCompacted(boolean z) {
            this.compacted = z;
        }

        void setCompleted() {
            this.completed = true;
        }

        void setErrorCode(short s) {
            this.errorCode = s;
        }

        /** Allocates room for {@code i} brokers; entries are filled by {@link #addBroker}. */
        void initializeBrokers(int i) {
            this.brokers = new BrokerMetadata[i];
        }

        /** Stores a broker in the next free slot of the broker array. */
        void addBroker(int i, String str, int i2) {
            this.brokers[this.nextBrokerIndex++] = new BrokerMetadata(i, str, i2);
        }

        /** Allocates the per-partition node id and first-offset arrays for {@code i} partitions. */
        void initializePartitions(int i) {
            this.nodeIdsByPartition = new int[i];
            this.firstOffsetsByPartition = new long[i];
        }

        /** Records node id {@code i2} for partition {@code i}. */
        void addPartition(int i, int i2) {
            this.nodeIdsByPartition[i] = i2;
        }

        long firstAvailableOffset(int i) {
            return this.firstOffsetsByPartition[i];
        }

        /**
         * @param i node id
         * @return {@code true} when some partition assigned to node {@code i} has a first offset
         *         equal to {@code UNKNOWN_OFFSET}; {@code false} when partitions are not initialized
         */
        boolean offsetsRequired(int i) {
            if (this.nodeIdsByPartition == null) {
                return false;
            }
            for (int partition = 0; partition < this.nodeIdsByPartition.length; partition++) {
                if (this.nodeIdsByPartition[partition] == i && this.firstOffsetsByPartition[partition] == UNKNOWN_OFFSET) {
                    return true;
                }
            }
            return false;
        }

        void setFirstOffset(int i, long j) {
            this.firstOffsetsByPartition[i] = j;
        }

        /** Registers a consumer; if the metadata is already complete, all consumers are flushed at once. */
        void doAttach(Consumer<TopicMetadata> consumer) {
            this.consumers.add(consumer);
            if (this.completed) {
                flush();
            }
        }

        /** Hands this metadata to every waiting consumer, then forgets them. */
        void flush() {
            for (Consumer<TopicMetadata> waiting : this.consumers) {
                waiting.accept(this);
            }
            this.consumers.clear();
        }

        short errorCode() {
            return this.errorCode;
        }

        /** Passes every entry of the broker array to the given consumer, in array order. */
        void visitBrokers(Consumer<BrokerMetadata> consumer) {
            final BrokerMetadata[] known = this.brokers;
            for (int index = 0; index < known.length; index++) {
                consumer.accept(known[index]);
            }
        }

        int partitionCount() {
            return this.nodeIdsByPartition.length;
        }

        /** @return the number of partitions whose recorded node id equals {@code i} */
        int partitionCount(int i) {
            int matches = 0;
            for (int nodeId : this.nodeIdsByPartition) {
                if (nodeId == i) {
                    matches++;
                }
            }
            return matches;
        }

        /** Clears the broker and partition-to-node data; first offsets are left untouched. */
        void reset() {
            this.brokers = null;
            this.nodeIdsByPartition = null;
            this.nextBrokerIndex = 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NetworkConnectionPool(ClientStreamFactory clientStreamFactory, String str, long j, int i, int i2, BufferPool bufferPool, MessageCache messageCache) {
        this.clientStreamFactory = clientStreamFactory;
        this.networkName = str;
        this.networkRef = j;
        this.fetchMaxBytes = i;
        this.fetchPartitionMaxBytes = i2;
        this.bufferPool = bufferPool;
        this.messageCache = messageCache;
        this.encodeBuffer = new UnsafeBuffer(new byte[clientStreamFactory.bufferPool.slotCapacity()]);
        this.maximumMessageSize = bufferPool.slotCapacity() > MAX_PADDING ? bufferPool.slotCapacity() - MAX_PADDING : bufferPool.slotCapacity();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Attaches a subscriber to a topic once that topic's metadata is available.
     *
     * <p>The attach is queued on the topic's metadata (created on first use) and the metadata
     * connection (created on first use) is asked to issue a request if one is needed.
     */
    public void doAttach(String str, Long2LongHashMap long2LongHashMap, int i, OctetsFW octetsFW, ListFW<KafkaHeaderFW> listFW, MessageDispatcher messageDispatcher, IntSupplier intSupplier, Consumer<PartitionProgressHandler> consumer, Consumer<Consumer<IntBooleanConsumer>> consumer2, IntConsumer intConsumer) {
        final TopicMetadata metadata = this.topicMetadataByName.computeIfAbsent(str, name -> new TopicMetadata(name));
        metadata.doAttach(resolved ->
            doAttach(str, long2LongHashMap, i, octetsFW, listFW, messageDispatcher, intSupplier, consumer, consumer2, intConsumer, resolved));

        if (this.metadataConnection == null) {
            this.metadataConnection = new MetadataConnection();
        }
        this.metadataConnection.doRequestIfNeeded();
    }

    /**
     * Completes an attach using resolved topic metadata.
     *
     * <p>On error code 0 the requested offsets are raised to at least the first available offset
     * of each partition (for a keyed subscription only the partition selected from the key hash is
     * kept), the topic is created if absent, and the attach is finished through {@code consumer2}.
     * Error codes 3 and 17 are reported to {@code intConsumer}; error code 5 is ignored; any other
     * code raises a {@link RuntimeException}.
     */
    private void doAttach(String str, Long2LongHashMap long2LongHashMap, int i, OctetsFW octetsFW, ListFW<KafkaHeaderFW> listFW, MessageDispatcher messageDispatcher, IntSupplier intSupplier, Consumer<PartitionProgressHandler> consumer, Consumer<Consumer<IntBooleanConsumer>> consumer2, IntConsumer intConsumer, TopicMetadata topicMetadata) {
        final short errorCode = topicMetadata.errorCode();
        switch (errorCode) {
            case 0: {
                if (octetsFW == null) {
                    // Unkeyed: every partition of the topic gets an offset.
                    final int partitionCount = topicMetadata.partitionCount();
                    for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
                        final long requested = long2LongHashMap.computeIfAbsent(partitionId, k -> 0L);
                        long2LongHashMap.put(partitionId, Math.max(requested, topicMetadata.firstAvailableOffset(partitionId)));
                    }
                } else {
                    // Keyed: the requested offset arrives under key 0 and is moved to the selected partition.
                    final int partitionId = BufferUtil.partition(i, topicMetadata.partitionCount());
                    final long requested = long2LongHashMap.computeIfAbsent(0L, k -> 0L);
                    final long startOffset = Math.max(requested, topicMetadata.firstAvailableOffset(partitionId));
                    if (partitionId != 0) {
                        long2LongHashMap.remove(0L);
                    }
                    long2LongHashMap.put(partitionId, startOffset);
                }

                final NetworkTopic topic = this.topicsByName.computeIfAbsent(str,
                    name -> new NetworkTopic(name, topicMetadata.partitionCount(), topicMetadata.compacted, false));

                consumer2.accept(attachNotifier -> {
                    consumer.accept(topic.doAttach(long2LongHashMap, octetsFW, listFW, messageDispatcher, intSupplier));
                    final int attachId = this.nextAttachId++;
                    this.detachersById.put(attachId,
                        offsets -> topic.doDetach(offsets, octetsFW, listFW, messageDispatcher, intSupplier));
                    doConnections(topicMetadata);
                    attachNotifier.accept(attachId, topicMetadata.compacted);
                });
                return;
            }
            case 3:
            case 17:
                intConsumer.accept(errorCode);
                return;
            // NOTE(review): this label has the value 5; FLAGS_BLOCKED looks like a constant the
            // decompiler matched by value rather than a metadata error code — confirm.
            case MessageDispatcher.FLAGS_BLOCKED /* 5 */:
                return;
            default:
                throw new RuntimeException(String.format("Unexpected errorCode %d from metadata query", errorCode));
        }
    }

    /**
     * Records a route's header conditions under its topic name, creating the topic's list on
     * first use.
     *
     * @param str    topic name
     * @param listFW header conditions of the route
     */
    public void addRoute(String str, ListFW<KafkaHeaderFW> listFW) {
        this.routeHeadersByTopic
            .computeIfAbsent(str, topic -> new ArrayList<>())
            .add(listFW);
    }

    /**
     * Starts bootstrapping every topic that has a route.
     *
     * <p>For each routed topic the bootstrap is queued on the topic's metadata (created on first
     * use) and the metadata connection (created on first use) is asked to issue a request if one
     * is needed.
     *
     * @param biConsumer receives the error code and topic name when metadata reports error 3 or 17
     */
    public void doBootstrap(BiConsumer<Short, String> biConsumer) {
        for (String topicName : this.routeHeadersByTopic.keySet()) {
            final TopicMetadata metadata = this.topicMetadataByName.computeIfAbsent(topicName, name -> new TopicMetadata(name));
            metadata.doAttach(resolved -> doBootstrap(topicName, biConsumer, resolved));

            if (this.metadataConnection == null) {
                this.metadataConnection = new MetadataConnection();
            }
            this.metadataConnection.doRequestIfNeeded();
        }
    }

    /**
     * Completes the bootstrap of one topic using resolved metadata.
     *
     * <p>On error code 0 a compacted topic is created if absent, connections are set up and
     * flushed; a non-compacted topic is left alone. Error codes 3 and 17 are reported to
     * {@code biConsumer}; error code 5 is ignored; any other code raises a
     * {@link RuntimeException}.
     */
    private void doBootstrap(String str, BiConsumer<Short, String> biConsumer, TopicMetadata topicMetadata) {
        final short errorCode = topicMetadata.errorCode();
        switch (errorCode) {
            case 0:
                if (!topicMetadata.compacted) {
                    return;
                }
                this.topicsByName.computeIfAbsent(str,
                    name -> new NetworkTopic(name, topicMetadata.partitionCount(), topicMetadata.compacted, true));
                doConnections(topicMetadata);
                doFlush();
                return;
            case 3:
            case 17:
                biConsumer.accept(errorCode, str);
                return;
            // NOTE(review): this label has the value 5; FLAGS_BLOCKED looks like a constant the
            // decompiler matched by value rather than a metadata error code — confirm.
            case MessageDispatcher.FLAGS_BLOCKED /* 5 */:
                return;
            default:
                throw new RuntimeException(String.format("Unexpected errorCode %d from metadata query", errorCode));
        }
    }

    /**
     * Ensures a live and a historical fetch connection exist for every broker in the metadata.
     *
     * <p>All live connections are reconciled first, then all historical ones. The nested lambda
     * parameters carry names distinct from the enclosing lambda's parameter, because a lambda
     * parameter may not redeclare a variable of its enclosing scope.
     *
     * @param topicMetadata metadata whose brokers are visited
     */
    private void doConnections(TopicMetadata topicMetadata) {
        topicMetadata.visitBrokers(broker -> {
            this.connections = applyBrokerMetadata(this.connections, broker,
                liveBroker -> new LiveFetchConnection(liveBroker));
        });
        topicMetadata.visitBrokers(broker -> {
            this.historicalConnections = (HistoricalFetchConnection[]) applyBrokerMetadata(this.historicalConnections, broker,
                historicalBroker -> new HistoricalFetchConnection(historicalBroker));
        });
    }

    /**
     * Reconciles an array of fetch connections with one broker's metadata.
     *
     * <p>If a connection for the broker's node id exists and its host and port are unchanged, the
     * array is returned as is. If the endpoint changed, the existing connection is closed and a
     * replacement takes its slot. If no connection exists for the node id, a new one is appended.
     *
     * <p>The search stops at the first match. Without that the loop would never advance past a
     * matching entry, and a replacement connection would be created but never stored.
     *
     * @param tArr           current connections
     * @param brokerMetadata broker to reconcile against
     * @param function       creates a connection for a broker
     * @return {@code tArr} when the broker was already present (possibly with its slot replaced),
     *         otherwise a new array with the created connection appended
     */
    private <T extends AbstractFetchConnection> T[] applyBrokerMetadata(T[] tArr, BrokerMetadata brokerMetadata, Function<BrokerMetadata, T> function) {
        for (int index = 0; index < tArr.length; index++) {
            final T existing = tArr[index];
            if (existing.brokerId != brokerMetadata.nodeId) {
                continue;
            }

            final boolean endpointChanged = !existing.host.equals(brokerMetadata.host) || existing.port != brokerMetadata.port;
            if (endpointChanged) {
                // Cluster configuration changed: retire the stale connection and keep its replacement.
                existing.close();
                tArr[index] = function.apply(brokerMetadata);
            }
            return tArr;
        }

        return ArrayUtil.add(tArr, function.apply(brokerMetadata));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asks every live connection, then every historical connection, to issue a request if one is
     * needed.
     */
    public void doFlush() {
        // Snapshot each array so the loop walks the array that was current when it started.
        final AbstractFetchConnection[] live = this.connections;
        for (int index = 0; index < live.length; index++) {
            live[index].doRequestIfNeeded();
        }

        final HistoricalFetchConnection[] historical = this.historicalConnections;
        for (int index = 0; index < historical.length; index++) {
            historical[index].doRequestIfNeeded();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs and forgets the detacher registered under an attach id.
     *
     * @param i                attach id the detacher was registered under
     * @param long2LongHashMap offsets passed to the detacher; nothing happens when no detacher
     *                         is registered under the id
     */
    public void doDetach(int i, Long2LongHashMap long2LongHashMap) {
        final Consumer<Long2LongHashMap> detacher = (Consumer<Long2LongHashMap>) this.detachersById.remove(i);
        if (detacher == null) {
            return;
        }
        detacher.accept(long2LongHashMap);
    }
}
