package org.reaktivity.nukleus.kafka.internal.stream;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
import java.util.function.IntToLongFunction;
import java.util.function.LongSupplier;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.TimerWheel;
import org.agrona.collections.ArrayUtil;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.IntArrayList;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.collections.LongArrayList;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.kafka.internal.KafkaRefCounters;
import org.reaktivity.nukleus.kafka.internal.cache.CompactedPartitionIndex;
import org.reaktivity.nukleus.kafka.internal.cache.DefaultPartitionIndex;
import org.reaktivity.nukleus.kafka.internal.cache.MessageCache;
import org.reaktivity.nukleus.kafka.internal.cache.PartitionIndex;
import org.reaktivity.nukleus.kafka.internal.function.IntLongConsumer;
import org.reaktivity.nukleus.kafka.internal.function.PartitionProgressHandler;
import org.reaktivity.nukleus.kafka.internal.types.Flyweight;
import org.reaktivity.nukleus.kafka.internal.types.KafkaHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.ListFW;
import org.reaktivity.nukleus.kafka.internal.types.MessageFW;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.String16FW;
import org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.ResponseHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.config.ConfigResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.config.DescribeConfigsRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.config.DescribeConfigsResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.config.ResourceRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.config.ResourceResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.FetchRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.FetchResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.PartitionRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.PartitionResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.RecordSetFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.TopicRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.TopicResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.BrokerMetadataFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.MetadataRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.MetadataResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.MetadataResponsePart2FW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.PartitionMetadataFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.TopicMetadataFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.offset.IsolationLevel;
import org.reaktivity.nukleus.kafka.internal.types.codec.offset.ListOffsetsPartitionRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.offset.ListOffsetsPartitionResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.offset.ListOffsetsRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.offset.ListOffsetsResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.offset.ListOffsetsTopicFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.AbortFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.DataFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.EndFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.TcpBeginExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.kafka.internal.util.BufferUtil;
import org.reaktivity.nukleus.kafka.internal.util.DelayedTaskScheduler;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool.class */
public final class NetworkConnectionPool {
    // Request header api key / api version pairs. The request encoders visible in this
    // file pass the same numeric values as literals (fetch: key 1, version 5;
    // list offsets: key 2, version 2).
    private static final short FETCH_API_VERSION = 5;
    private static final short FETCH_API_KEY = 1;
    private static final short LIST_OFFSETS_API_KEY = 2;
    private static final short LIST_OFFSETS_API_VERSION = 2;
    private static final short METADATA_API_VERSION = 5;
    private static final short METADATA_API_KEY = 3;
    private static final short DESCRIBE_CONFIGS_API_VERSION = 0;
    private static final short DESCRIBE_CONFIGS_API_KEY = 32;
    // Sentinel stored in TopicMetadata.offsetsOutOfRangeByPartition; the list-offsets
    // encoder asks for timestamp -1 when it sees this value.
    private static final long MAX_OFFSET = Long.MAX_VALUE;
    // Same numeric value (-1) that marks "no out-of-range offset recorded" for a partition.
    private static final long NO_OFFSET = -1;
    // NOTE(review): the constants below are not referenced in the visible part of the
    // file; presumably used by the describe-configs / metadata handling - confirm.
    private static final byte RESOURCE_TYPE_TOPIC = 2;
    private static final String CLEANUP_POLICY = "cleanup.policy";
    private static final String DELETE_RETENTION_MS = "delete.retention.ms";
    private static final String COMPACT = "compact";
    private static final int LOCAL_SLOT = -2;
    private static final int KAFKA_SERVER_DEFAULT_DELETE_RETENTION_MS = 86400000;
    // Declared without initializers, so these are assigned by a static initializer
    // that lies outside the visible part of the file.
    private static final LongSupplier NO_COUNTER;
    private static final DecoderMessageDispatcher NOOP_DISPATCHER;
    private static final MessageDispatcher MATCHING_MESSAGE_DISPATCHER;
    // Shared scratch buffer into which every outgoing request is encoded from offset 0.
    final MutableDirectBuffer encodeBuffer;
    private final ClientStreamFactory clientStreamFactory;
    // Name and reference used when opening network streams (see supplyTarget / doBegin calls).
    private final String networkName;
    private final long networkRef;
    // Passed as maxBytes of each fetch request.
    private final int fetchMaxBytes;
    // Capacity of each connection's local decode buffer.
    private final int fetchPartitionMaxBytes;
    // Timeout handed to the scheduler whenever a request is sent or data is received.
    private final int readIdleTimeout;
    private final BufferPool bufferPool;
    private final MessageCache messageCache;
    private final boolean forceProactiveMessageCache;
    // Asked to issue a metadata request after a fetch connection fails.
    private MetadataConnection metadataConnection;
    private final KafkaRefCounters routeCounters;
    // Source of attach ids handed to TopicMetadata.doAttach.
    private int nextAttachId;
    // All-zero IPv4 address used as the local address when beginning a network stream.
    private static final byte[] ANY_IP_ADDR = new byte[4];
    private static final PartitionIndex DEFAULT_PARTITION_INDEX = new DefaultPartitionIndex();
    // Reusable flyweight builders ("RW") used to encode requests into encodeBuffer.
    // They are shared by all connections of this pool, so a built flyweight is only
    // valid until the same builder is wrapped again.
    final RequestHeaderFW.Builder requestRW = new RequestHeaderFW.Builder();
    final FetchRequestFW.Builder fetchRequestRW = new FetchRequestFW.Builder();
    final TopicRequestFW.Builder topicRequestRW = new TopicRequestFW.Builder();
    final PartitionRequestFW.Builder partitionRequestRW = new PartitionRequestFW.Builder();
    final PartitionRequestFW partitionRequestRO = new PartitionRequestFW();
    final MetadataRequestFW.Builder metadataRequestRW = new MetadataRequestFW.Builder();
    final DescribeConfigsRequestFW.Builder describeConfigsRequestRW = new DescribeConfigsRequestFW.Builder();
    final ListOffsetsRequestFW.Builder listOffsetsRequestRW = new ListOffsetsRequestFW.Builder();
    final ListOffsetsPartitionRequestFW.Builder listOffsetsPartitionRequestRW = new ListOffsetsPartitionRequestFW.Builder();
    final ListOffsetsTopicFW.Builder listOffsetsTopicRW = new ListOffsetsTopicFW.Builder();
    final ResourceRequestFW.Builder resourceRequestRW = new ResourceRequestFW.Builder();
    final String16FW.Builder configNameRW = new String16FW.Builder(ByteOrder.BIG_ENDIAN);
    final WindowFW windowRO = new WindowFW();
    final OctetsFW.Builder payloadRW = new OctetsFW.Builder();
    final TcpBeginExFW.Builder tcpBeginExRW = new TcpBeginExFW.Builder();
    // Reusable read-only flyweights ("RO") wrapped over received bytes while decoding responses.
    final ResponseHeaderFW responseRO = new ResponseHeaderFW();
    final FetchResponseFW fetchResponseRO = new FetchResponseFW();
    final TopicResponseFW topicResponseRO = new TopicResponseFW();
    final PartitionResponseFW partitionResponseRO = new PartitionResponseFW();
    final RecordSetFW recordSetRO = new RecordSetFW();
    final MetadataResponseFW metadataResponseRO = new MetadataResponseFW();
    final BrokerMetadataFW brokerMetadataRO = new BrokerMetadataFW();
    final MetadataResponsePart2FW metadataResponsePart2RO = new MetadataResponsePart2FW();
    final TopicMetadataFW topicMetadataRO = new TopicMetadataFW();
    final PartitionMetadataFW partitionMetadataRO = new PartitionMetadataFW();
    final DescribeConfigsResponseFW describeConfigsResponseRO = new DescribeConfigsResponseFW();
    final ResourceResponseFW resourceResponseRO = new ResourceResponseFW();
    final ConfigResponseFW configResponseRO = new ConfigResponseFW();
    final ListOffsetsResponseFW listOffsetsResponseRO = new ListOffsetsResponseFW();
    final ListOffsetsTopicFW listOffsetsTopicRO = new ListOffsetsTopicFW();
    final ListOffsetsPartitionResponseFW listOffsetsPartitionResponseRO = new ListOffsetsPartitionResponseFW();
    // Reusable views, initially wrapping an empty byte array.
    // NOTE(review): presumably re-wrapped over message keys / values elsewhere - confirm.
    private final UnsafeBuffer keyBuffer = new UnsafeBuffer(BufferUtil.EMPTY_BYTE_ARRAY);
    private final UnsafeBuffer valueBuffer = new UnsafeBuffer(BufferUtil.EMPTY_BYTE_ARRAY);
    private final MessageFW messageRO = new MessageFW();
    private final HeadersFW headersRO = new HeadersFW();
    private final KafkaHeadersIterator headersIterator = new KafkaHeadersIterator();
    // Both connection arrays start empty. Note the runtime component type of
    // "connections" is LiveFetchConnection even though the static type is wider.
    private AbstractFetchConnection[] connections = new LiveFetchConnection[0];
    private HistoricalFetchConnection[] historicalConnections = new HistoricalFetchConnection[0];
    private final List<NetworkTopicPartition> partitionsWorkList = new ArrayList();
    private final LongArrayList offsetsWorkList = new LongArrayList();
    private int nestedDoFlushCalls = 0;
    // Topics keyed by name; insertion-ordered, and iterated in that order when a
    // fetch request is encoded.
    private final Map<String, NetworkTopic> topicsByName = new LinkedHashMap();
    // Per-topic metadata (partition leaders, out-of-range offsets) keyed by topic name.
    private final Map<String, TopicMetadata> topicMetadataByName = new HashMap();
    private final Map<String, List<ListFW<KafkaHeaderFW>>> routeHeadersByTopic = new HashMap();
    private final Int2ObjectHashMap<Consumer<Long2LongHashMap>> detachersById = new Int2ObjectHashMap<>();
    // Backoff handed to TopicMetadata.scheduleRefresh when a metadata refresh is scheduled.
    private final Backoff metadataBackoffMillis = new Backoff(10, 10000);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$AbstractFetchConnection.class */
    public abstract class AbstractFetchConnection extends AbstractNetworkConnection {
        // Timestamp sent in a list-offsets partition request when the recorded
        // out-of-range offset is anything other than MAX_OFFSET.
        private static final long EARLIEST_AVAILABLE_OFFSET = -2;
        // Same value (-1) that the list-offsets encoder sends as the timestamp when the
        // recorded out-of-range offset equals MAX_OFFSET.
        private static final long NEXT_OFFSET = -1;
        // Broker coordinates, copied from the BrokerMetadata given to the constructor.
        final String host;
        final int port;
        final int brokerId;
        // Write position in the shared encode buffer while a fetch request is being built.
        int encodeLimit;
        // Set when a partition reports the error handled as "offset out of range";
        // cleared once a list-offsets response has been handled.
        boolean offsetsNeeded;
        // Set after a list-offsets request has been attempted; while set, incoming data
        // is routed to the superclass handler instead of the fetch response decoder.
        boolean offsetsRequested;
        // Per topic, an array indexed by partition id holding the offset last requested.
        Map<String, long[]> requestedFetchOffsetsByTopic;
        final ResponseDecoder fetchResponseDecoder;
        // Invoked once for every fetch request sent; the returned value is ignored.
        // NOTE(review): presumably increments a counter - confirm.
        private final LongSupplier fetches;
        // Compiler-generated assertion flag surfaced by the decompiler; assigned in the
        // static initializer at the end of this class.
        static final /* synthetic */ boolean $assertionsDisabled;

        /**
         * Creates a fetch connection for the broker described by {@code brokerMetadata}.
         *
         * @param brokerMetadata supplies the broker node id, host and port
         * @param longSupplier   invoked once for each fetch request sent by this connection
         */
        private AbstractFetchConnection(BrokerMetadata brokerMetadata, LongSupplier longSupplier) {
            super();
            this.fetches = longSupplier;
            this.requestedFetchOffsetsByTopic = new HashMap<>();
            this.host = brokerMetadata.host;
            this.port = brokerMetadata.port;
            this.brokerId = brokerMetadata.nodeId;
            // The decoder calls back into this connection to resolve dispatchers,
            // requested offsets and partition-level errors.
            this.fetchResponseDecoder = new FetchResponseDecoder(
                    this::getTopicDispatcher,
                    this::getRequestedOffset,
                    this::handlePartitionResponseError,
                    this.localDecodeBuffer);
        }

        /**
         * Sends the next request on this connection when no request is outstanding.
         *
         * <p>Begins the network stream first if it is not yet connected (local address
         * {@code ANY_IP_ADDR}, local port 0, remote {@code host}/{@code port}). If the
         * request budget exceeds the padding, a list-offsets request is attempted when
         * offsets are needed, otherwise a fetch request. If encoding the fetch request
         * leaves {@code offsetsNeeded} set, the method re-enters itself so that the
         * list-offsets branch is considered.
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void doRequestIfNeeded() {
            if (this.nextRequestId != this.nextResponseId) {
                // A request is still awaiting its response.
                return;
            }
            // Each lambda parameter needs its own name: a lambda parameter may not
            // redeclare a parameter of an enclosing lambda.
            doBeginIfNotConnected((mutableDirectBuffer, offset, limit) -> {
                return NetworkConnectionPool.this.tcpBeginExRW.wrap2(mutableDirectBuffer, offset, limit).localAddress(localAddress -> {
                    localAddress.ipv4Address(ipv4Address -> {
                        ipv4Address.put(NetworkConnectionPool.ANY_IP_ADDR);
                    });
                }).localPort(0).remoteAddress(remoteAddress -> {
                    remoteAddress.host(this.host);
                }).remotePort(this.port).limit();
            });
            if (this.networkRequestBudget <= this.networkRequestPadding) {
                return;
            }
            if (this.offsetsNeeded) {
                doListOffsetsRequest();
                this.offsetsRequested = true;
            } else {
                doFetchRequest();
                if (this.offsetsNeeded) {
                    doRequestIfNeeded();
                }
            }
        }

        /**
         * Encodes a fetch request (api key 1, version 5) into the shared encode buffer and
         * sends it, provided at least one topic contributed partitions and the encoded
         * size plus padding fits the current request budget.
         *
         * <p>The request header, fetch header and each topic header are first written with
         * placeholder counts, then rewritten in place once the real counts are known.
         * Nothing is sent, and {@code nextRequestId} is left untouched, when the request
         * is empty or does not fit the budget.
         */
        private void doFetchRequest() {
            final MutableDirectBuffer buffer = NetworkConnectionPool.this.encodeBuffer;

            this.encodeLimit = 0;
            RequestHeaderFW header = NetworkConnectionPool.this.requestRW
                    .wrap2(buffer, this.encodeLimit, buffer.capacity())
                    .size(0)
                    .apiKey((short) 1)
                    .apiVersion((short) 5)
                    .correlationId(0)
                    .clientId((String) null)
                    .build();
            this.encodeLimit = header.limit();

            FetchRequestFW fetchHeader = NetworkConnectionPool.this.fetchRequestRW
                    .wrap2(buffer, this.encodeLimit, buffer.capacity())
                    .maxWaitTimeMillis(500)
                    .minBytes(1)
                    .maxBytes(NetworkConnectionPool.this.fetchMaxBytes)
                    .isolationLevel((byte) 0)
                    .topicCount(0)
                    .build();
            this.encodeLimit = fetchHeader.limit();

            int topicCount = 0;
            for (String topicName : NetworkConnectionPool.this.topicsByName.keySet()) {
                // Remember where this topic starts so it can be dropped if it adds no partitions.
                final int topicStart = this.encodeLimit;
                TopicRequestFW topicHeader = NetworkConnectionPool.this.topicRequestRW
                        .wrap2(buffer, this.encodeLimit, buffer.capacity())
                        .name(topicName)
                        .partitionCount(0)
                        .build();
                this.encodeLimit = topicHeader.limit();

                final long[] requestedOffsets = this.requestedFetchOffsetsByTopic.computeIfAbsent(topicName, key -> {
                    return new long[((TopicMetadata) NetworkConnectionPool.this.topicMetadataByName.get(topicName)).partitionCount()];
                });
                final int partitionCount = addTopicToRequest(topicName, (partitionId, offset) -> {
                    requestedOffsets[partitionId] = offset;
                }, partitionId -> {
                    return requestedOffsets[partitionId];
                });

                if (partitionCount <= 0) {
                    this.encodeLimit = topicStart;
                    continue;
                }
                // Rewrite the topic header in place with the real partition count.
                NetworkConnectionPool.this.topicRequestRW
                        .wrap2(buffer, topicHeader.offset(), topicHeader.limit())
                        .name(topicHeader.name())
                        .partitionCount(partitionCount)
                        .build();
                topicCount++;
            }

            if (topicCount <= 0 || this.encodeLimit + this.networkRequestPadding > this.networkRequestBudget) {
                return;
            }

            // Rewrite the fetch header in place with the real topic count.
            NetworkConnectionPool.this.fetchRequestRW
                    .wrap2(buffer, fetchHeader.offset(), fetchHeader.limit())
                    .maxWaitTimeMillis(fetchHeader.maxWaitTimeMillis())
                    .minBytes(fetchHeader.minBytes())
                    .maxBytes(fetchHeader.maxBytes())
                    .isolationLevel(fetchHeader.isolationLevel())
                    .topicCount(topicCount)
                    .build();

            // Rewrite the request header with the final size (excluding the 4-byte size
            // field itself) and a fresh correlation id.
            final int correlationId = this.nextRequestId++;
            NetworkConnectionPool.this.requestRW
                    .wrap2(buffer, header.offset(), header.limit())
                    .size(this.encodeLimit - 4)
                    .apiKey((short) 1)
                    .apiVersion((short) 5)
                    .correlationId(correlationId)
                    .clientId((String) null)
                    .build();

            OctetsFW payload = NetworkConnectionPool.this.payloadRW
                    .wrap2(buffer, 0, this.encodeLimit)
                    .set((mutableDirectBuffer, from, to) -> {
                        return to - from;
                    })
                    .build();

            this.fetches.getAsLong();
            NetworkConnectionPool.this.clientStreamFactory.doData(this.networkTarget, this.networkId, this.networkRequestPadding, payload);
            this.networkRequestBudget -= payload.sizeof() + this.networkRequestPadding;

            // Restart the idle timer now that a response is expected.
            this.timer.cancel();
            NetworkConnectionPool.this.clientStreamFactory.scheduler.rescheduleTimeout(NetworkConnectionPool.this.readIdleTimeout, this.timer, this::fetchRequestIdle);
        }

        /**
         * Encodes a list-offsets request (api key 2, version 2) for every partition led by
         * this broker that has a recorded out-of-range offset, and sends it when the
         * encoded size plus padding fits the current request budget.
         *
         * <p>For each such partition the requested timestamp is {@code -1} when the
         * recorded offset is {@code MAX_OFFSET}, otherwise {@code EARLIEST_AVAILABLE_OFFSET}.
         */
        private void doListOffsetsRequest() {
            final MutableDirectBuffer buffer = NetworkConnectionPool.this.encodeBuffer;

            RequestHeaderFW header = NetworkConnectionPool.this.requestRW
                    .wrap2(buffer, 0, buffer.capacity())
                    .size(0)
                    .apiKey((short) 2)
                    .apiVersion((short) 2)
                    .correlationId(0)
                    .clientId((String) null)
                    .build();
            ListOffsetsRequestFW offsetsHeader = NetworkConnectionPool.this.listOffsetsRequestRW
                    .wrap2(buffer, header.limit(), buffer.capacity())
                    .isolationLevel(level -> {
                        level.set(IsolationLevel.READ_UNCOMMITTED);
                    })
                    .topicCount(0)
                    .build();

            int encodeEnd = offsetsHeader.limit();
            int topicCount = 0;
            for (TopicMetadata metadata : NetworkConnectionPool.this.topicMetadataByName.values()) {
                final int partitionsRequired = metadata.offsetsRequired(this.brokerId);
                if (partitionsRequired <= 0) {
                    continue;
                }
                topicCount++;
                encodeEnd = NetworkConnectionPool.this.listOffsetsTopicRW
                        .wrap2(buffer, encodeEnd, buffer.capacity())
                        .name(metadata.topicName)
                        .partitionCount(partitionsRequired)
                        .build()
                        .limit();
                for (int partition = 0; partition < metadata.nodeIdsByPartition.length; partition++) {
                    // Only partitions led by this broker with a recorded out-of-range offset.
                    if (metadata.nodeIdsByPartition[partition] != this.brokerId || metadata.offsetsOutOfRangeByPartition[partition] == -1) {
                        continue;
                    }
                    final long timestamp;
                    if (metadata.offsetsOutOfRangeByPartition[partition] == NetworkConnectionPool.MAX_OFFSET) {
                        timestamp = -1L;
                    } else {
                        timestamp = EARLIEST_AVAILABLE_OFFSET;
                    }
                    encodeEnd = NetworkConnectionPool.this.listOffsetsPartitionRequestRW
                            .wrap2(buffer, encodeEnd, buffer.capacity())
                            .partitionId(partition)
                            .timestamp(timestamp)
                            .build()
                            .limit();
                }
            }

            if (encodeEnd + this.networkRequestPadding > this.networkRequestBudget) {
                return;
            }

            // Rewrite both headers in place now that size, correlation id and topic count are known.
            final int correlationId = this.nextRequestId++;
            NetworkConnectionPool.this.requestRW
                    .wrap2(buffer, header.offset(), header.limit())
                    .size(encodeEnd - 4)
                    .apiKey((short) 2)
                    .apiVersion((short) 2)
                    .correlationId(correlationId)
                    .clientId((String) null)
                    .build();
            NetworkConnectionPool.this.listOffsetsRequestRW
                    .wrap2(buffer, offsetsHeader.offset(), offsetsHeader.limit())
                    .isolationLevel(level -> {
                        level.set(IsolationLevel.READ_UNCOMMITTED);
                    })
                    .topicCount(topicCount)
                    .build();

            OctetsFW payload = NetworkConnectionPool.this.payloadRW
                    .wrap2(buffer, 0, encodeEnd)
                    .set((mutableDirectBuffer, from, to) -> {
                        return to - from;
                    })
                    .build();
            NetworkConnectionPool.this.clientStreamFactory.doData(this.networkTarget, this.networkId, this.networkRequestPadding, payload);
            this.networkRequestBudget -= payload.sizeof() + this.networkRequestPadding;

            // Restart the idle timer now that a response is expected.
            this.timer.cancel();
            NetworkConnectionPool.this.clientStreamFactory.scheduler.rescheduleTimeout(NetworkConnectionPool.this.readIdleTimeout, this.timer, this::listOffsetsRequestIdle);
        }

        /**
         * Contributes the partitions of one topic to the fetch request being encoded.
         * Called once per topic by {@code doFetchRequest}, right after the topic header
         * has been written at {@code encodeLimit}.
         *
         * <p>NOTE(review): implementations presumably append partition requests to the
         * shared encode buffer and advance {@code encodeLimit} - confirm in subclasses.
         *
         * @param str               the topic name
         * @param intLongConsumer   receives (partition id, offset) pairs to record as requested
         * @param intToLongFunction returns the previously recorded offset for a partition id
         * @return the number of partitions added; the caller discards the topic when this
         *         is not positive
         */
        abstract int addTopicToRequest(String str, IntLongConsumer intLongConsumer, IntToLongFunction intToLongFunction);

        /**
         * Handles a data frame from the broker.
         *
         * <p>While a list-offsets request is pending the frame is delegated to the
         * superclass. Otherwise the frame is charged against the response budget (the
         * reply stream is reset if the budget goes negative), fed to the fetch response
         * decoder, and - once the decoder reports a non-negative result - the response is
         * counted as complete and the next request is attempted.
         *
         * @param dataFW the received data frame
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void handleData(DataFW dataFW) {
            if (this.offsetsRequested) {
                super.handleData(dataFW);
                return;
            }

            // Data arrived: restart the read idle timeout.
            this.timer.cancel();
            NetworkConnectionPool.this.clientStreamFactory.scheduler.rescheduleTimeout(NetworkConnectionPool.this.readIdleTimeout, this.timer);

            final OctetsFW payload = dataFW.payload();
            final int charged = payload.sizeof() + dataFW.padding();
            this.networkResponseBudget -= charged;
            if (this.networkResponseBudget < 0) {
                NetworkConnectionPool.this.clientStreamFactory.doReset(this.networkReplyThrottle, this.networkReplyId);
            }

            final int remaining = this.fetchResponseDecoder.decode(payload, dataFW.trace());
            doOfferResponseBudget();
            if (remaining < 0) {
                // Decoder has not finished the response yet.
                return;
            }

            this.timer.cancel();
            if (!$assertionsDisabled && remaining != 0) {
                throw new AssertionError("bytes remaining after fetch response, pipelined requests are not being used");
            }
            this.nextResponseId++;
            doRequestIfNeeded();
        }

        /**
         * Handles a complete response delivered by the superclass, which for a fetch
         * connection is always a list-offsets response. Afterwards both offset flags are
         * cleared so that fetching resumes.
         *
         * @param j            trace id, passed through to the list-offsets handler
         * @param directBuffer buffer holding the response
         * @param i            offset of the response within {@code directBuffer}
         * @param i2           limit of the response within {@code directBuffer}
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        final void handleResponse(long j, DirectBuffer directBuffer, int i, int i2) {
            if (!$assertionsDisabled) {
                if (!this.offsetsNeeded) {
                    throw new AssertionError();
                }
            }
            handleListOffsetsResponse(j, directBuffer, i, i2);
            this.offsetsRequested = false;
            this.offsetsNeeded = false;
        }

        /**
         * Decodes a list-offsets response and applies the returned offsets.
         *
         * <p>For each partition without an error:
         * <ul>
         *   <li>if the returned offset is greater than the recorded out-of-range offset,
         *       it is stored as the topic's first offset for that partition;</li>
         *   <li>otherwise, when the topic is still attached, either the dispatcher and
         *       live offset are adjusted (recorded offset was {@code MAX_OFFSET}) or the
         *       topic's dispatcher is detached, and the out-of-range marker is cleared.</li>
         * </ul>
         * Decoding stops at the first partition error, which is reported through
         * {@code handlePartitionResponseError} using the partition id from the response.
         * The connection is aborted if any part of the response cannot be wrapped.
         *
         * @param j            trace id (unused here)
         * @param directBuffer buffer holding the response
         * @param i            offset of the response within {@code directBuffer}
         * @param i2           limit of the response within {@code directBuffer}
         */
        final void handleListOffsetsResponse(long j, DirectBuffer directBuffer, int i, int i2) {
            this.timer.cancel();
            ListOffsetsResponseFW response = NetworkConnectionPool.this.listOffsetsResponseRO.tryWrap(directBuffer, i, i2);
            if (response == null) {
                abort();
                return;
            }
            final int topicCount = response.topicCount();
            int offset = response.limit();
            KafkaError kafkaError = KafkaError.NONE;
            for (int topicIndex = 0; topicIndex < topicCount && kafkaError == KafkaError.NONE; topicIndex++) {
                ListOffsetsTopicFW topic = NetworkConnectionPool.this.listOffsetsTopicRO.tryWrap(directBuffer, offset, i2);
                if (topic == null) {
                    abort();
                    return;
                }
                final String topicName = topic.name().asString();
                final TopicMetadata topicMetadata = NetworkConnectionPool.this.topicMetadataByName.get(topicName);
                offset = topic.limit();
                final int partitionCount = topic.partitionCount();
                // Bounded loop: ends when all partitions are consumed or an error breaks out.
                for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) {
                    ListOffsetsPartitionResponseFW partition = NetworkConnectionPool.this.listOffsetsPartitionResponseRO.tryWrap(directBuffer, offset, i2);
                    if (partition == null) {
                        abort();
                        return;
                    }
                    // Capture everything needed from the shared flyweight up front.
                    final int partitionId = partition.partitionId();
                    final long firstOffset = partition.firstOffset();
                    offset = partition.limit();
                    kafkaError = KafkaError.asKafkaError(partition.errorCode());
                    if (kafkaError != KafkaError.NONE) {
                        // The request lists only a subset of partitions, so the position in
                        // the response is not the partition id; report the decoded id.
                        handlePartitionResponseError(topicName, partitionId, kafkaError);
                        break;
                    }
                    if (topicMetadata == null) {
                        // Metadata for this topic is no longer tracked; nothing to update.
                        continue;
                    }
                    final long outOfRangeOffset = topicMetadata.offsetsOutOfRangeByPartition[partitionId];
                    if (firstOffset > outOfRangeOffset) {
                        topicMetadata.setFirstOffset(partitionId, firstOffset);
                        continue;
                    }
                    NetworkTopic networkTopic = NetworkConnectionPool.this.topicsByName.get(topicName);
                    if (networkTopic != null) {
                        if (outOfRangeOffset == NetworkConnectionPool.MAX_OFFSET) {
                            networkTopic.dispatcher.adjustOffset(partitionId, NetworkConnectionPool.MAX_OFFSET, firstOffset);
                            networkTopic.setLiveOffset(partitionId, firstOffset);
                        } else {
                            networkTopic.dispatcher.detach(true);
                        }
                        topicMetadata.offsetsOutOfRangeByPartition[partitionId] = -1;
                    }
                }
            }
        }

        /**
         * Returns the offset most recently recorded as requested for a topic partition.
         *
         * @param str the topic name; must already have an entry in
         *            {@code requestedFetchOffsetsByTopic}
         * @param i   the partition id, used as the array index
         * @return the recorded offset
         */
        final long getRequestedOffset(String str, int i) {
            final long[] offsetsByPartition = this.requestedFetchOffsetsByTopic.get(str);
            return offsetsByPartition[i];
        }

        /**
         * Reinitializes the fetch response decoder, then performs the superclass
         * reinitialization.
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void doReinitialize() {
            final ResponseDecoder decoder = this.fetchResponseDecoder;
            decoder.reinitialize();
            super.doReinitialize();
        }

        /**
         * Reacts to a failed connection: invalidates the metadata that refers to this
         * broker, removes this connection, and asks the metadata connection to issue a
         * request if one is needed.
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void handleConnectionFailed() {
            invalidateConnectionMetadata();
            removeConnection();
            final MetadataConnection metadata = NetworkConnectionPool.this.metadataConnection;
            metadata.doRequestIfNeeded();
        }

        /**
         * Removes this connection after it has failed; called from
         * {@code handleConnectionFailed} once the broker's metadata has been invalidated.
         * NOTE(review): presumably drops it from the pool's connection arrays - confirm
         * in subclasses.
         */
        abstract void removeConnection();

        /**
         * Invalidates this broker in every topic's metadata. For each topic that reports
         * the broker was invalidated, attaches {@code metadataUpdated} under a fresh
         * attach id so connections are re-established when new metadata arrives.
         */
        private void invalidateConnectionMetadata() {
            for (TopicMetadata topicMetadata : NetworkConnectionPool.this.topicMetadataByName.values()) {
                if (topicMetadata.invalidateBroker(this.brokerId)) {
                    // Post-increment the pool's attach id counter directly rather than
                    // through the compiler-synthesized accessor.
                    final int attachId = NetworkConnectionPool.this.nextAttachId++;
                    topicMetadata.doAttach(attachId, this::metadataUpdated);
                }
            }
        }

        /**
         * Callback attached to topic metadata: sets up connections for the updated topic
         * and then flushes the pool.
         *
         * @param topicMetadata the metadata that was updated
         */
        private void metadataUpdated(TopicMetadata topicMetadata) {
            final NetworkConnectionPool pool = NetworkConnectionPool.this;
            pool.doConnections(topicMetadata);
            pool.doFlush();
        }

        /**
         * Resolves the dispatcher that should receive decoded messages for a topic.
         *
         * @param str the topic name
         * @return the topic's dispatcher, or {@code NOOP_DISPATCHER} when the topic is not
         *         present in {@code topicsByName}
         */
        private DecoderMessageDispatcher getTopicDispatcher(String str) {
            final NetworkTopic topic = NetworkConnectionPool.this.topicsByName.get(str);
            if (topic != null) {
                return topic.dispatcher;
            }
            return NetworkConnectionPool.NOOP_DISPATCHER;
        }

        /**
         * Reacts to an error reported for a topic partition.
         *
         * <p>The error is classified through the decompiler-generated switch map:
         * <ul>
         *   <li>values 2, 5 and 6: record the error on the topic metadata, schedule a
         *       metadata refresh and log a retry message;</li>
         *   <li>value 4: flag that offsets are needed and record the requested offset as
         *       out of range for the partition;</li>
         *   <li>anything else: log and abort the connection.</li>
         * </ul>
         * The first two branches do nothing when the topic has no metadata.
         * NOTE(review): which {@code KafkaError} constants the numeric values stand for is
         * defined by the switch map outside this view. The label
         * {@code MessageDispatcher.FLAGS_BLOCKED} is only the number 5 here.
         *
         * @param str        the topic name
         * @param i          the partition the error was reported for
         * @param kafkaError the reported error
         */
        private void handlePartitionResponseError(String str, int i, KafkaError kafkaError) {
            final TopicMetadata metadata = NetworkConnectionPool.this.topicMetadataByName.get(str);
            switch (AnonymousClass3.$SwitchMap$org$reaktivity$nukleus$kafka$internal$stream$KafkaError[kafkaError.ordinal()]) {
                case 4:
                    if (metadata != null) {
                        this.offsetsNeeded = true;
                        metadata.setOffsetOutOfRange(i, getRequestedOffset(str, i));
                    }
                    break;
                case 2:
                case MessageDispatcher.FLAGS_BLOCKED /* 5 */:
                case 6:
                    if (metadata != null) {
                        metadata.setErrorCode(kafkaError);
                        metadata.scheduleRefresh(NetworkConnectionPool.this.clientStreamFactory.scheduler, NetworkConnectionPool.this.metadataBackoffMillis, NetworkConnectionPool.this.metadataConnection);
                        System.out.format("Fetch failed for topic \"%s\" partition %d due to Kafka error code %s, retrying...\n", str, i, kafkaError);
                    }
                    break;
                case 3:
                default:
                    System.out.format("Fetch failed for topic \"%s\" partition %d due to Kafka error code %s, re-establishing connection...\n", str, i, kafkaError);
                    abort();
                    break;
            }
        }

        /**
         * Describes this connection: concrete class name, broker coordinates, request
         * budget and padding, stream ids and the request / response counters.
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        public String toString() {
            final String template = "%s [brokerId=%d, host=%s, port=%d, budget=%d, padding=%d, networkId=%d, networkReplyId=%d,nextRequestId=%d, nextResponseId=%d]";
            return String.format(
                    template,
                    getClass().getSimpleName(),
                    this.brokerId,
                    this.host,
                    this.port,
                    this.networkRequestBudget,
                    this.networkRequestPadding,
                    this.networkId,
                    this.networkReplyId,
                    this.nextRequestId,
                    this.nextResponseId);
        }

        // Decompiled form of the compiler-generated assertion setup: assertions in this
        // class are disabled exactly when the outer class's desired assertion status is false.
        static {
            $assertionsDisabled = !NetworkConnectionPool.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$AbstractNetworkConnection.class */
    public abstract class AbstractNetworkConnection {
        // Target obtained from the router for the pool's network name.
        final MessageConsumer networkTarget;
        // Blank timer created from the scheduler; subclasses cancel and reschedule it
        // around requests and incoming data.
        TimerWheel.Timer timer;
        // Stream id of the outgoing network stream; a new stream is only begun when
        // both this and networkReplyId are zero.
        long networkId;
        long networkCorrelationId;
        // Remaining budget for sending requests and the padding charged per data frame.
        int networkRequestBudget;
        int networkRequestPadding;
        // Remaining budget for received data; reduced by payload size plus padding.
        int networkResponseBudget;
        // Throttle and stream id of the reply stream, recorded by onCorrelated.
        MessageConsumer networkReplyThrottle;
        long networkReplyId;
        // Current handler for reply-stream frames; set to beforeBegin on correlation.
        private MessageConsumer streamState;
        // Initialised to -1 by the constructor.
        // NOTE(review): presumably "no buffer pool slot acquired" - confirm.
        int networkSlot;
        int networkSlotOffset;
        // Requests sent and responses completed; fetch connections send a new request
        // only when the two are equal.
        int nextRequestId;
        int nextResponseId;
        // Direct buffer sized by the pool's fetchPartitionMaxBytes setting.
        final MutableDirectBuffer localDecodeBuffer;
        // Compiler-generated assertion flag surfaced by the decompiler.
        static final /* synthetic */ boolean $assertionsDisabled;

        private AbstractNetworkConnection() {
            this.networkSlot = -1;
            this.networkTarget = NetworkConnectionPool.this.clientStreamFactory.router.supplyTarget(NetworkConnectionPool.this.networkName);
            this.localDecodeBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(NetworkConnectionPool.this.fetchPartitionMaxBytes));
            this.timer = NetworkConnectionPool.this.clientStreamFactory.scheduler.newBlankTimer();
        }

        public String toString() {
            return String.format("%s [budget=%d, padding=%d, networkId=%d, networkReplyId=%d, nextRequestId %d, nextResponseId %d]", getClass().getSimpleName(), Integer.valueOf(this.networkRequestBudget), Integer.valueOf(this.networkRequestPadding), Long.valueOf(this.networkId), Long.valueOf(this.networkReplyId), Integer.valueOf(this.nextRequestId), Integer.valueOf(this.nextResponseId));
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public MessageConsumer onCorrelated(MessageConsumer messageConsumer, long j) {
            this.networkReplyThrottle = messageConsumer;
            this.networkReplyId = j;
            this.streamState = this::beforeBegin;
            return this::handleStream;
        }

        void doBeginIfNotConnected() {
            doBeginIfNotConnected((mutableDirectBuffer, i, i2) -> {
                return 0;
            });
        }

        final void doBeginIfNotConnected(Flyweight.Builder.Visitor visitor) {
            if (this.networkId == 0 && this.networkReplyId == 0) {
                long asLong = NetworkConnectionPool.this.clientStreamFactory.supplyStreamId.getAsLong();
                long asLong2 = NetworkConnectionPool.this.clientStreamFactory.supplyCorrelationId.getAsLong();
                NetworkConnectionPool.this.clientStreamFactory.correlations.put(asLong2, this);
                NetworkConnectionPool.this.clientStreamFactory.doBegin(this.networkTarget, asLong, NetworkConnectionPool.this.networkRef, asLong2, visitor);
                NetworkConnectionPool.this.clientStreamFactory.router.setThrottle(NetworkConnectionPool.this.networkName, asLong, this::handleThrottle);
                this.networkId = asLong;
                this.networkCorrelationId = asLong2;
            }
        }

        abstract void doRequestIfNeeded();

        final void metadataRequestIdle() {
            NetworkConnectionPool.this.routeCounters.metadataRequestIdleTimeouts.getAsLong();
            idle();
        }

        final void describeConfigsRequestIdle() {
            NetworkConnectionPool.this.routeCounters.describeConfigsRequestIdleTimeouts.getAsLong();
            idle();
        }

        final void fetchRequestIdle() {
            NetworkConnectionPool.this.routeCounters.fetchRequestIdleTimeouts.getAsLong();
            idle();
        }

        final void listOffsetsRequestIdle() {
            NetworkConnectionPool.this.routeCounters.listOffsetsRequestIdleTimeouts.getAsLong();
            idle();
        }

        final void idle() {
            abort();
            handleConnectionFailed();
        }

        final void abort() {
            if (this.networkId != 0) {
                NetworkConnectionPool.this.clientStreamFactory.doAbort(this.networkTarget, this.networkId);
                this.networkId = 0L;
                this.networkRequestBudget = 0;
            }
        }

        final void close() {
            if (this.networkId != 0) {
                NetworkConnectionPool.this.clientStreamFactory.doEnd(this.networkTarget, this.networkId, null);
                this.networkId = 0L;
                this.streamState = this::afterClose;
            }
        }

        private void handleThrottle(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 1073741825:
                    handleReset(NetworkConnectionPool.this.clientStreamFactory.resetRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 1073741826:
                    handleWindow(NetworkConnectionPool.this.windowRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                default:
                    return;
            }
        }

        private void handleWindow(WindowFW windowFW) {
            int credit = windowFW.credit();
            int padding = windowFW.padding();
            this.networkRequestBudget += credit;
            this.networkRequestPadding = padding;
            doRequestIfNeeded();
        }

        private void handleReset(ResetFW resetFW) {
            NetworkConnectionPool.this.clientStreamFactory.correlations.remove(Long.valueOf(this.networkCorrelationId), this);
            if (this.networkReplyId != 0) {
                NetworkConnectionPool.this.clientStreamFactory.doReset(this.networkReplyThrottle, this.networkReplyId);
                this.networkReplyId = 0L;
            }
            handleConnectionFailed();
            this.timer.cancel();
        }

        private void handleStream(int i, DirectBuffer directBuffer, int i2, int i3) {
            this.streamState.accept(i, directBuffer, i2, i3);
        }

        private void beforeBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
            if (i == 1) {
                handleBegin(NetworkConnectionPool.this.clientStreamFactory.beginRO.wrap(directBuffer, i2, i2 + i3));
            } else {
                NetworkConnectionPool.this.clientStreamFactory.doReset(this.networkReplyThrottle, this.networkReplyId);
            }
        }

        private void afterBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 2:
                    handleData(NetworkConnectionPool.this.clientStreamFactory.dataRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 3:
                    handleEnd(NetworkConnectionPool.this.clientStreamFactory.endRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 4:
                    handleAbort(NetworkConnectionPool.this.clientStreamFactory.abortRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                default:
                    NetworkConnectionPool.this.clientStreamFactory.doReset(this.networkReplyThrottle, this.networkReplyId);
                    return;
            }
        }

        private void afterClose(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 2:
                    handleData(NetworkConnectionPool.this.clientStreamFactory.dataRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 3:
                case 4:
                    doReinitialize();
                    return;
                default:
                    NetworkConnectionPool.this.clientStreamFactory.doReset(this.networkReplyThrottle, this.networkReplyId);
                    return;
            }
        }

        private void handleBegin(BeginFW beginFW) {
            doOfferResponseBudget();
            this.streamState = this::afterBegin;
        }

        void handleData(DataFW dataFW) {
            this.timer.cancel();
            NetworkConnectionPool.this.clientStreamFactory.scheduler.rescheduleTimeout(NetworkConnectionPool.this.readIdleTimeout, this.timer);
            OctetsFW payload = dataFW.payload();
            long trace = dataFW.trace();
            this.networkResponseBudget -= payload.sizeof() + dataFW.padding();
            if (this.networkResponseBudget < 0) {
                NetworkConnectionPool.this.clientStreamFactory.doReset(this.networkReplyThrottle, this.networkReplyId);
                return;
            }
            try {
                MutableDirectBuffer buffer = payload.buffer();
                int offset = payload.offset();
                int limit = payload.limit();
                if (this.networkSlot != -1) {
                    MutableDirectBuffer buffer2 = this.networkSlot == NetworkConnectionPool.LOCAL_SLOT ? this.localDecodeBuffer : NetworkConnectionPool.this.bufferPool.buffer(this.networkSlot);
                    int i = limit - offset;
                    buffer2.putBytes(this.networkSlotOffset, buffer, offset, i);
                    this.networkSlotOffset += i;
                    buffer = buffer2;
                    offset = 0;
                    limit = this.networkSlotOffset;
                }
                ResponseHeaderFW tryWrap = NetworkConnectionPool.this.responseRO.tryWrap((DirectBuffer) buffer, offset, limit);
                if (tryWrap != null && tryWrap.limit() + tryWrap.size() <= limit) {
                    int limit2 = tryWrap.limit();
                    handleResponse(trace, buffer, limit2, limit);
                    int size = limit2 + tryWrap.size();
                    this.nextResponseId++;
                    if (size < limit) {
                        if (this.networkSlot == -1) {
                            this.networkSlot = NetworkConnectionPool.this.bufferPool.acquire(this.networkReplyId);
                        }
                        NetworkConnectionPool.this.bufferPool.buffer(this.networkSlot).putBytes(0, buffer, size, limit - size);
                        this.networkSlotOffset = limit - size;
                    } else {
                        if (!$assertionsDisabled && size != limit) {
                            throw new AssertionError();
                        }
                        this.networkSlotOffset = 0;
                        if (this.networkSlot == NetworkConnectionPool.LOCAL_SLOT) {
                            this.networkSlot = -1;
                        }
                    }
                    doOfferResponseBudget();
                    doRequestIfNeeded();
                } else if (this.networkSlot == -1) {
                    MutableDirectBuffer mutableDirectBuffer = this.localDecodeBuffer;
                    this.networkSlotOffset = 0;
                    this.networkSlot = NetworkConnectionPool.LOCAL_SLOT;
                    mutableDirectBuffer.putBytes(this.networkSlotOffset, payload.buffer(), payload.offset(), payload.sizeof());
                    this.networkSlotOffset += payload.sizeof();
                    doOfferResponseBudget();
                }
                if (this.networkSlotOffset != 0 || this.networkSlot == -1) {
                    return;
                }
                this.networkSlot = -1;
            } catch (Throwable th) {
                if (this.networkSlotOffset == 0 && this.networkSlot != -1) {
                    this.networkSlot = -1;
                }
                throw th;
            }
        }

        abstract void handleResponse(long j, DirectBuffer directBuffer, int i, int i2);

        abstract void handleConnectionFailed();

        private void handleEnd(EndFW endFW) {
            if (this.networkId != 0) {
                NetworkConnectionPool.this.clientStreamFactory.doEnd(this.networkTarget, this.networkId, null);
                this.networkId = 0L;
            }
            doReinitialize();
            doRequestIfNeeded();
            this.timer.cancel();
        }

        private void handleAbort(AbortFW abortFW) {
            if (this.networkId != 0) {
                NetworkConnectionPool.this.clientStreamFactory.doAbort(this.networkTarget, this.networkId);
                this.networkId = 0L;
            }
            handleConnectionFailed();
            this.timer.cancel();
        }

        void doOfferResponseBudget() {
            int max = Math.max(this.localDecodeBuffer.capacity() - this.networkResponseBudget, 0);
            if (max > 0) {
                NetworkConnectionPool.this.clientStreamFactory.doWindow(this.networkReplyThrottle, this.networkReplyId, max, 0, 0L);
                this.networkResponseBudget += max;
            }
        }

        void doReinitialize() {
            this.networkSlotOffset = 0;
            this.networkRequestBudget = 0;
            this.networkRequestPadding = 0;
            this.networkResponseBudget = 0;
            this.networkId = 0L;
            this.networkReplyId = 0L;
            this.nextRequestId = 0;
            this.nextResponseId = 0;
        }

        static {
            $assertionsDisabled = !NetworkConnectionPool.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$BrokerMetadata.class */
    /** Immutable triple identifying a broker: its node id and the host/port it is reachable at. */
    public static final class BrokerMetadata {
        /** Node id of the broker. */
        final int nodeId;
        /** Host name of the broker. */
        final String host;
        /** Port of the broker. */
        final int port;

        /**
         * @param i   node id of the broker
         * @param str host name of the broker
         * @param i2  port of the broker
         */
        BrokerMetadata(int i, String str, int i2) {
            nodeId = i;
            host = str;
            port = i2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$HistoricalFetchConnection.class */
    /**
     * Fetch connection used for partitions that the topic reports as needing historical data
     * (see {@code NetworkTopic.needsHistorical}); fetches are counted under
     * {@code routeCounters.historicalFetches}.
     */
    public final class HistoricalFetchConnection extends AbstractFetchConnection {
        private HistoricalFetchConnection(BrokerMetadata brokerMetadata) {
            super(brokerMetadata, NetworkConnectionPool.this.routeCounters.historicalFetches);
        }

        /**
         * Encodes partition requests for one topic at the current {@code encodeLimit}.
         *
         * <p>For each partition id that needs historical data and is led by this connection's
         * broker, one request is encoded (only the first entry per partition id, as entries are
         * visited in iteration order). If the metadata clamps the fetch offset below the requested
         * one, the dispatcher is told about the adjustment and the partition entry is re-inserted
         * with the clamped offset once iteration has finished. Finally the topic is given the
         * chance to satisfy requests from its cache.
         *
         * @param str               topic name
         * @param intLongConsumer   receives (partition id, fetch offset) for every encoded request
         * @param intToLongFunction passed through to the cache lookup
         * @return the number of partition requests, as returned by the cache step, or 0 when
         *         nothing was eligible
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractFetchConnection
        int addTopicToRequest(String str, IntLongConsumer intLongConsumer, IntToLongFunction intToLongFunction) {
            final int topicEncodeOffset = this.encodeLimit;
            final NetworkTopic topic = (NetworkTopic) NetworkConnectionPool.this.topicsByName.get(str);
            if (!topic.needsHistorical()) {
                return 0;
            }
            final int maxBytes = topic.maximumWritableBytes(false);
            final TopicMetadata metadata = (TopicMetadata) NetworkConnectionPool.this.topicMetadataByName.get(str);
            if (maxBytes <= 0 || metadata == null) {
                return 0;
            }

            final int[] nodeIds = metadata.nodeIdsByPartition;
            int partitionCount = 0;
            int lastPartitionId = -1;
            final Iterator<NetworkTopicPartition> candidates = topic.partitions.iterator();
            while (candidates.hasNext()) {
                final NetworkTopicPartition candidate = candidates.next();
                if (!topic.needsHistorical(candidate.id)
                        || lastPartitionId >= candidate.id
                        || nodeIds[candidate.id] != this.brokerId) {
                    continue;
                }
                final long fetchOffset = metadata.ensureOffsetInRange(candidate.id, candidate.offset);
                final PartitionRequestFW request = NetworkConnectionPool.this.partitionRequestRW
                    .wrap2(NetworkConnectionPool.this.encodeBuffer, this.encodeLimit, NetworkConnectionPool.this.encodeBuffer.capacity())
                    .partitionId(candidate.id)
                    .fetchOffset(fetchOffset)
                    .maxBytes(maxBytes)
                    .build();
                if (fetchOffset < candidate.offset) {
                    topic.dispatcher.adjustOffset(candidate.id, candidate.offset, fetchOffset);
                    // Take the entry out before changing its offset; it is re-added after the loop.
                    candidates.remove();
                    candidate.offset = fetchOffset;
                    NetworkConnectionPool.this.partitionsWorkList.add(candidate);
                }
                intLongConsumer.accept(candidate.id, candidate.offset);
                this.encodeLimit = request.limit();
                lastPartitionId = candidate.id;
                partitionCount++;
            }

            if (!NetworkConnectionPool.this.partitionsWorkList.isEmpty()) {
                topic.partitions.addAll(NetworkConnectionPool.this.partitionsWorkList);
                NetworkConnectionPool.this.partitionsWorkList.clear();
            }
            return topic.satisfyPartitionRequestsFromCache(
                partitionCount,
                intToLongFunction,
                intLongConsumer,
                NetworkConnectionPool.this.encodeBuffer,
                topicEncodeOffset,
                this.encodeLimit,
                newLimit -> {
                    this.encodeLimit = newLimit;
                });
        }

        /** Removes this connection from the enclosing pool. */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractFetchConnection
        void removeConnection() {
            NetworkConnectionPool.this.removeConnection(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$KafkaHeadersIterator.class */
    /**
     * Reusable iterator over the entries of a {@code ListFW<KafkaHeaderFW>}.
     *
     * <p>{@link #wrap(ListFW)} records the offset of every header in the list; {@link #next()}
     * then re-wraps a single shared {@link KafkaHeaderFW} at each recorded offset, so the returned
     * object is only valid until the following call to {@code next()} or {@code wrap}.
     */
    public static final class KafkaHeadersIterator implements Iterator<KafkaHeaderFW> {
        /** Shared flyweight handed out by {@link #next()}. */
        private final KafkaHeaderFW headerRO = new KafkaHeaderFW();
        /** Offsets of the headers in the currently wrapped list, in list order. */
        private final IntArrayList offsets = new IntArrayList();
        /** List currently being iterated; {@code null} until {@link #wrap(ListFW)} is called. */
        private ListFW<KafkaHeaderFW> headers;
        /** Index into {@link #offsets} of the next header to return. */
        private int position;

        private KafkaHeadersIterator() {
        }

        /**
         * Points this iterator at a new list and rewinds it.
         *
         * @param listFW the headers to iterate
         * @return this iterator
         */
        public Iterator<KafkaHeaderFW> wrap(ListFW<KafkaHeaderFW> listFW) {
            this.headers = listFW;
            this.offsets.clear();
            listFW.forEach(header -> this.offsets.addInt(header.offset()));
            this.position = 0;
            return this;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.position < this.offsets.size();
        }

        /**
         * Returns the next header, wrapped in the shared flyweight.
         *
         * @return the shared flyweight positioned on the next header
         * @throws java.util.NoSuchElementException if the iteration has no more headers
         */
        @Override // java.util.Iterator
        public KafkaHeaderFW next() {
            // Honour the Iterator contract rather than surfacing whatever the offset list throws.
            if (!hasNext()) {
                throw new java.util.NoSuchElementException();
            }
            final int index = this.position++;
            return this.headerRO.wrap(this.headers.buffer(), this.offsets.getInt(index), this.headers.limit());
        }
    }

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$LiveFetchConnection.class */
    /** Fetch connection created with {@code NO_COUNTER}; fetches for every partition led by its broker. */
    private final class LiveFetchConnection extends AbstractFetchConnection {
        LiveFetchConnection(BrokerMetadata brokerMetadata) {
            super(brokerMetadata, NetworkConnectionPool.NO_COUNTER);
        }

        /**
         * Encodes partition requests for one topic at the current {@code encodeLimit}.
         *
         * <p>Partition entries are visited in iteration order and, of consecutive entries sharing
         * a partition id, only the last one is considered. An entry whose offset is
         * {@code MAX_OFFSET} does not produce a request; it flags that offsets must be looked up
         * instead. If the metadata clamps the fetch offset below the requested one, the dispatcher
         * is told and the entry is re-inserted with the clamped offset after iteration.
         *
         * @param str               topic name
         * @param intLongConsumer   receives (partition id, fetch offset) for every encoded request
         * @param intToLongFunction not used by this implementation
         * @return the number of partition requests encoded
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractFetchConnection
        int addTopicToRequest(String str, IntLongConsumer intLongConsumer, IntToLongFunction intToLongFunction) {
            final NetworkTopic topic = (NetworkTopic) NetworkConnectionPool.this.topicsByName.get(str);
            final int maxBytes = topic.maximumWritableBytes(true);
            final TopicMetadata metadata = (TopicMetadata) NetworkConnectionPool.this.topicMetadataByName.get(str);
            if (maxBytes <= 0 || metadata == null) {
                return 0;
            }

            final int[] nodeIds = metadata.nodeIdsByPartition;
            final Iterator<NetworkTopicPartition> entries = topic.partitions.iterator();
            int partitionCount = 0;
            NetworkTopicPartition current = entries.hasNext() ? entries.next() : null;
            while (current != null) {
                // One-entry look-ahead: needed to tell whether `current` is the last for its id.
                final NetworkTopicPartition following = entries.hasNext() ? entries.next() : null;
                final boolean lastForPartition = following == null || following.id != current.id;
                if (lastForPartition && nodeIds[current.id] == this.brokerId) {
                    if (current.offset == NetworkConnectionPool.MAX_OFFSET) {
                        this.offsetsNeeded = true;
                        metadata.offsetsOutOfRangeByPartition[current.id] = Long.MAX_VALUE;
                    } else {
                        final long fetchOffset = metadata.ensureOffsetInRange(current.id, current.offset);
                        final PartitionRequestFW request = NetworkConnectionPool.this.partitionRequestRW
                            .wrap2(NetworkConnectionPool.this.encodeBuffer, this.encodeLimit, NetworkConnectionPool.this.encodeBuffer.capacity())
                            .partitionId(current.id)
                            .fetchOffset(fetchOffset)
                            .maxBytes(maxBytes)
                            .build();
                        long reportedOffset = current.offset;
                        if (fetchOffset < current.offset) {
                            topic.dispatcher.adjustOffset(current.id, current.offset, fetchOffset);
                            reportedOffset = fetchOffset;
                            // Remember a copy plus its new offset; the set is updated after the loop.
                            NetworkConnectionPool.this.partitionsWorkList.add(current.m30clone());
                            NetworkConnectionPool.this.offsetsWorkList.addLong(fetchOffset);
                        }
                        intLongConsumer.accept(current.id, reportedOffset);
                        this.encodeLimit = request.limit();
                        partitionCount++;
                    }
                }
                current = following;
            }

            if (!NetworkConnectionPool.this.partitionsWorkList.isEmpty()) {
                final int pending = NetworkConnectionPool.this.partitionsWorkList.size();
                for (int index = 0; index < pending; index++) {
                    final NetworkTopicPartition adjusted = (NetworkTopicPartition) NetworkConnectionPool.this.partitionsWorkList.get(index);
                    final boolean removed = topic.partitions.remove(adjusted);
                    assert removed;
                    adjusted.offset = NetworkConnectionPool.this.offsetsWorkList.getLong(index);
                    topic.partitions.add(adjusted);
                }
                NetworkConnectionPool.this.partitionsWorkList.clear();
                NetworkConnectionPool.this.offsetsWorkList.clear();
            }
            return partitionCount;
        }

        /** Removes this connection from the enclosing pool. */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractFetchConnection
        void removeConnection() {
            NetworkConnectionPool.this.removeConnection(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$MetadataConnection.class */
    public final class MetadataConnection extends AbstractNetworkConnection {
        TopicMetadata pendingTopicMetadata;
        MetadataRequestType pendingRequest;
        static final /* synthetic */ boolean $assertionsDisabled;

        /** Creates the connection with {@code METADATA} recorded as the pending request type. */
        private MetadataConnection() {
            // The superclass constructor is invoked implicitly.
            pendingRequest = MetadataRequestType.METADATA;
        }

        /**
         * Sends the next metadata-related request when no request is outstanding
         * (every request sent so far has been answered).
         *
         * <p>If no topic is currently pending, the first topic whose metadata reports
         * {@code isGetRequired()} becomes the pending one. The request type is then chosen by the
         * pending topic's {@code nextRequiredRequestType()}.
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void doRequestIfNeeded() {
            if (this.nextRequestId != this.nextResponseId) {
                return;
            }
            if (this.pendingTopicMetadata == null) {
                for (TopicMetadata candidate : NetworkConnectionPool.this.topicMetadataByName.values()) {
                    if (candidate.isGetRequired()) {
                        this.pendingTopicMetadata = candidate;
                        break;
                    }
                }
            }
            if (this.pendingTopicMetadata == null) {
                return;
            }
            switch (this.pendingTopicMetadata.nextRequiredRequestType()) {
                case DESCRIBE_CONFIGS:
                    doDescribeConfigsRequest();
                    break;
                case METADATA:
                    doMetadataRequest();
                    break;
                default:
                    break;
            }
        }

        /* JADX WARN: Type inference failed for: r0v11, types: [org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v23, types: [org.reaktivity.nukleus.kafka.internal.types.codec.config.DescribeConfigsRequestFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v31, types: [org.reaktivity.nukleus.kafka.internal.types.codec.config.ResourceRequestFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v41, types: [org.reaktivity.nukleus.kafka.internal.types.String16FW$Builder] */
        /* JADX WARN: Type inference failed for: r0v49, types: [org.reaktivity.nukleus.kafka.internal.types.String16FW$Builder] */
        /* JADX WARN: Type inference failed for: r0v62, types: [org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v72, types: [org.reaktivity.nukleus.kafka.internal.types.OctetsFW$Builder] */
        /**
         * Encodes and sends a describe-configs request (API key 32, version 0) for the pending
         * topic, asking for the {@code CLEANUP_POLICY} and {@code DELETE_RETENTION_MS} configs.
         *
         * <p>The request is encoded first with a placeholder header to learn its length; it is
         * only sent if that length plus padding fits the request budget, in which case the header
         * is re-encoded with the real size and correlation id and the idle timer is rescheduled
         * with the describe-configs timeout handler.
         */
        private void doDescribeConfigsRequest() {
            doBeginIfNotConnected();
            if (this.networkRequestBudget <= this.networkRequestPadding) {
                return;
            }

            final MutableDirectBuffer encodeBuffer = NetworkConnectionPool.this.encodeBuffer;
            final RequestHeaderFW header = NetworkConnectionPool.this.requestRW
                .wrap2(encodeBuffer, 0, encodeBuffer.capacity())
                .size(0)
                .apiKey((short) 32)
                .apiVersion((short) 0)
                .correlationId(0)
                .clientId((String) null)
                .build();
            final int describeLimit = NetworkConnectionPool.this.describeConfigsRequestRW
                .wrap2(encodeBuffer, header.limit(), encodeBuffer.capacity())
                .resourceCount(1)
                .build()
                .limit();
            final int resourceLimit = NetworkConnectionPool.this.resourceRequestRW
                .wrap2(encodeBuffer, describeLimit, encodeBuffer.capacity())
                .type((byte) 2)
                .name(this.pendingTopicMetadata.topicName)
                .configNamesCount(2)
                .build()
                .limit();
            final int cleanupPolicyLimit = NetworkConnectionPool.this.configNameRW
                .wrap2(encodeBuffer, resourceLimit, encodeBuffer.capacity())
                .set(NetworkConnectionPool.CLEANUP_POLICY, StandardCharsets.UTF_8)
                .build()
                .limit();
            final int requestLimit = NetworkConnectionPool.this.configNameRW
                .wrap2(encodeBuffer, cleanupPolicyLimit, encodeBuffer.capacity())
                .set(NetworkConnectionPool.DELETE_RETENTION_MS, StandardCharsets.UTF_8)
                .build()
                .limit();

            if (requestLimit + this.networkRequestPadding > this.networkRequestBudget) {
                return;
            }

            final int correlationId = this.nextRequestId++;
            // Size excludes 4 bytes — presumably the size field itself; confirm against RequestHeaderFW.
            NetworkConnectionPool.this.requestRW
                .wrap2(encodeBuffer, header.offset(), header.limit())
                .size(requestLimit - 4)
                .apiKey((short) 32)
                .apiVersion((short) 0)
                .correlationId(correlationId)
                .clientId((String) null)
                .build();
            final OctetsFW payload = NetworkConnectionPool.this.payloadRW
                .wrap2(encodeBuffer, 0, requestLimit)
                .set((target, from, to) -> to - from)
                .build();
            NetworkConnectionPool.this.clientStreamFactory.doData(this.networkTarget, this.networkId, this.networkRequestPadding, payload);
            this.networkRequestBudget -= payload.sizeof() + this.networkRequestPadding;
            this.pendingRequest = MetadataRequestType.DESCRIBE_CONFIGS;
            this.timer.cancel();
            NetworkConnectionPool.this.clientStreamFactory.scheduler.rescheduleTimeout(NetworkConnectionPool.this.readIdleTimeout, this.timer, this::describeConfigsRequestIdle);
        }

        /* JADX WARN: Type inference failed for: r0v20, types: [org.reaktivity.nukleus.kafka.internal.types.codec.metadata.MetadataRequestFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v34, types: [org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v44, types: [org.reaktivity.nukleus.kafka.internal.types.OctetsFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v8, types: [org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW$Builder] */
        /**
         * Encodes and sends a metadata request (API key 3, version 5) for the pending topic.
         *
         * <p>The request is encoded first with a placeholder header to learn its length; it is
         * only sent if that length plus padding fits the request budget, in which case the header
         * is re-encoded with the real size and correlation id and the idle timer is rescheduled
         * with the metadata timeout handler.
         */
        private void doMetadataRequest() {
            doBeginIfNotConnected();
            if (this.networkRequestBudget <= this.networkRequestPadding) {
                return;
            }

            final RequestHeaderFW header = NetworkConnectionPool.this.requestRW
                .wrap2(NetworkConnectionPool.this.encodeBuffer, 0, NetworkConnectionPool.this.encodeBuffer.capacity())
                .size(0)
                .apiKey((short) 3)
                .apiVersion((short) 5)
                .correlationId(0)
                .clientId((String) null)
                .build();
            final int requestLimit = NetworkConnectionPool.this.metadataRequestRW
                .wrap2(NetworkConnectionPool.this.encodeBuffer, header.limit(), NetworkConnectionPool.this.encodeBuffer.capacity())
                .topicCount(1)
                .topicName(this.pendingTopicMetadata.topicName)
                .build()
                .limit();

            if (requestLimit + this.networkRequestPadding > this.networkRequestBudget) {
                return;
            }

            final int correlationId = this.nextRequestId++;
            // Size excludes 4 bytes — presumably the size field itself; confirm against RequestHeaderFW.
            NetworkConnectionPool.this.requestRW
                .wrap2(NetworkConnectionPool.this.encodeBuffer, header.offset(), header.limit())
                .size(requestLimit - 4)
                .apiKey((short) 3)
                .apiVersion((short) 5)
                .correlationId(correlationId)
                .clientId((String) null)
                .build();
            final OctetsFW payload = NetworkConnectionPool.this.payloadRW
                .wrap2(NetworkConnectionPool.this.encodeBuffer, 0, requestLimit)
                .set((target, from, to) -> to - from)
                .build();
            NetworkConnectionPool.this.clientStreamFactory.doData(this.networkTarget, this.networkId, this.networkRequestPadding, payload);
            this.networkRequestBudget -= payload.sizeof() + this.networkRequestPadding;
            this.pendingRequest = MetadataRequestType.METADATA;
            this.timer.cancel();
            NetworkConnectionPool.this.clientStreamFactory.scheduler.rescheduleTimeout(NetworkConnectionPool.this.readIdleTimeout, this.timer, this::metadataRequestIdle);
        }

        /**
         * Routes a complete response to the decoder matching the request type that was last sent.
         *
         * @param j            trace id forwarded to the decoder
         * @param directBuffer buffer holding the response
         * @param i            offset forwarded to the decoder
         * @param i2           limit forwarded to the decoder
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void handleResponse(long j, DirectBuffer directBuffer, int i, int i2) {
            switch (this.pendingRequest) {
                case DESCRIBE_CONFIGS:
                    handleDescribeConfigsResponse(j, directBuffer, i, i2);
                    break;
                case METADATA:
                    handleMetadataResponse(j, directBuffer, i, i2);
                    break;
                default:
                    // Other request types are not expected on this connection; nothing to decode.
                    break;
            }
        }

        /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
        /* JADX WARN: Code restructure failed: missing block: B:56:0x013b, code lost:
        
            switch(r27) {
                case 0: goto L40;
                case 1: goto L47;
                default: goto L52;
            };
         */
        /* JADX WARN: Code restructure failed: missing block: B:58:0x0159, code lost:
        
            if (r0.value() == null) goto L45;
         */
        /* JADX WARN: Code restructure failed: missing block: B:60:0x0169, code lost:
        
            if (r0.value().asString().equals(org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.COMPACT) == false) goto L45;
         */
        /* JADX WARN: Code restructure failed: missing block: B:61:0x016c, code lost:
        
            r0 = true;
         */
        /* JADX WARN: Code restructure failed: missing block: B:62:0x0171, code lost:
        
            r16 = r0;
            r12 = r0.limit();
         */
        /* JADX WARN: Code restructure failed: missing block: B:64:0x01b6, code lost:
        
            r23 = r23 + 1;
         */
        /* JADX WARN: Code restructure failed: missing block: B:65:0x0170, code lost:
        
            r0 = false;
         */
        /* JADX WARN: Code restructure failed: missing block: B:67:0x0182, code lost:
        
            if (r0.value() != null) goto L50;
         */
        /* JADX WARN: Code restructure failed: missing block: B:68:0x0185, code lost:
        
            r0 = org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.KAFKA_SERVER_DEFAULT_DELETE_RETENTION_MS;
         */
        /* JADX WARN: Code restructure failed: missing block: B:69:0x0195, code lost:
        
            r17 = r0;
         */
        /* JADX WARN: Code restructure failed: missing block: B:71:0x018a, code lost:
        
            r0 = java.lang.Integer.parseInt(r0.value().asString());
         */
        /* JADX WARN: Code restructure failed: missing block: B:73:0x019d, code lost:
        
            if (org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.MetadataConnection.$assertionsDisabled != false) goto L83;
         */
        /* JADX WARN: Code restructure failed: missing block: B:76:0x01b5, code lost:
        
            throw new java.lang.AssertionError(java.lang.String.format("Unexpected config name %s in describe configs response", r0));
         */
        /* JADX WARN: Removed duplicated region for block: B:47:0x00ea  */
        /* JADX WARN: Removed duplicated region for block: B:78:0x00e2 A[SYNTHETIC] */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        /**
         * Stub standing in for the describe-configs response handler.
         * <p>
         * The decompiler could not reconstruct this method (see the JADX warnings above), so the body
         * unconditionally throws {@link UnsupportedOperationException}. It is invoked by
         * {@code handleResponse} when the pending request is {@code DESCRIBE_CONFIGS}, so that path
         * fails at runtime until the real implementation is restored.
         *
         * @throws UnsupportedOperationException always
         */
        private void handleDescribeConfigsResponse(long r9, org.agrona.DirectBuffer r11, int r12, int r13) {
            /*
                Method dump skipped, instructions count: 640
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.MetadataConnection.handleDescribeConfigsResponse(long, org.agrona.DirectBuffer, int, int):void");
        }

        /**
         * Handles a metadata response for the topic whose metadata request is pending.
         * <p>
         * The idle timer is cancelled, the response is decoded, and the resulting {@code KafkaError}
         * selects one of four reactions. The numeric case labels index a compiler-generated switch map
         * keyed by {@code KafkaError.ordinal()}, so the enum constants they stand for are not visible
         * here.
         * <ul>
         * <li>1, 8: report the error, detach subscribers, record the error on the topic metadata,
         *     flush it and forget the topic.</li>
         * <li>2, 5, 7: schedule a metadata refresh when the topic still has consumers.</li>
         * <li>3: advance the topic to the describe-configs step (pending metadata is kept).</li>
         * <li>anything else: invalidate the topic metadata and abort the connection.</li>
         * </ul>
         */
        private void handleMetadataResponse(long j, DirectBuffer directBuffer, int i, int i2) {
            this.timer.cancel();
            final KafkaError outcome = decodeMetadataResponse(directBuffer, i, i2);
            final TopicMetadata pending = this.pendingTopicMetadata;
            switch (AnonymousClass3.$SwitchMap$org$reaktivity$nukleus$kafka$internal$stream$KafkaError[outcome.ordinal()]) {
                case 1:
                case 8: {
                    final String name = pending.topicName;
                    System.out.format("Unable to access metadata for topic \"%s\" due to Kafka error code %s, detaching subscribers.\n", name, outcome);
                    detachSubscribers(name, false);
                    pending.setErrorCode(outcome);
                    pending.flush();
                    NetworkConnectionPool.this.topicsByName.remove(name);
                    NetworkConnectionPool.this.topicMetadataByName.remove(name);
                    this.pendingTopicMetadata = null;
                    break;
                }
                case 2:
                case 5:
                case 7:
                    if (pending.hasConsumers()) {
                        // only announce the problem on the first attempt
                        if (pending.retries == 0) {
                            System.out.format("Unable to access metadata for topic \"%s\" due to Kafka error code %s, retrying...\n", pending.topicName, outcome);
                        }
                        pending.scheduleRefresh(NetworkConnectionPool.this.clientStreamFactory.scheduler, NetworkConnectionPool.this.metadataBackoffMillis, NetworkConnectionPool.this.metadataConnection);
                    }
                    this.pendingTopicMetadata = null;
                    break;
                case 3:
                    pending.nextRequiredRequestType = MetadataRequestType.DESCRIBE_CONFIGS;
                    if (pending.retries > 0) {
                        System.out.format("Metadata for topic \"%s\" has now been found\n", pending.topicName);
                    }
                    break;
                default:
                    pending.invalidate();
                    abort();
                    break;
            }
        }

        /**
         * Decodes a metadata response from {@code directBuffer} into the pending topic metadata.
         * <p>
         * The response is walked section by section (header, brokers, topic header, topic, partitions)
         * with a single cursor that is advanced to each flyweight's {@code limit()}. Brokers and
         * partitions are recorded on {@code pendingTopicMetadata} as they are read.
         *
         * @param directBuffer buffer holding the response
         * @param i            position passed to the first {@code tryWrap}
         * @param i2           upper bound passed to every {@code tryWrap}
         * @return {@code UNEXPECTED_SERVER_ERROR} as soon as any section fails to wrap; otherwise the
         *         first topic or partition error found, {@code PARTITION_COUNT_CHANGED} when
         *         {@code initializePartitions} rejects the partition count, or {@code NONE}
         */
        private KafkaError decodeMetadataResponse(DirectBuffer directBuffer, int i, int i2) {
            final MetadataResponseFW header = NetworkConnectionPool.this.metadataResponseRO.tryWrap(directBuffer, i, i2);
            if (header == null) {
                return KafkaError.UNEXPECTED_SERVER_ERROR;
            }
            final int brokers = header.brokerCount();
            this.pendingTopicMetadata.initializeBrokers(brokers);
            int cursor = header.limit();
            for (int b = 0; b < brokers; b++) {
                final BrokerMetadataFW broker = NetworkConnectionPool.this.brokerMetadataRO.tryWrap(directBuffer, cursor, i2);
                if (broker == null) {
                    return KafkaError.UNEXPECTED_SERVER_ERROR;
                }
                this.pendingTopicMetadata.addBroker(broker.nodeId(), broker.host().asString(), broker.port());
                cursor = broker.limit();
            }

            final MetadataResponsePart2FW topicsHeader = NetworkConnectionPool.this.metadataResponsePart2RO.tryWrap(directBuffer, cursor, i2);
            if (topicsHeader == null) {
                return KafkaError.UNEXPECTED_SERVER_ERROR;
            }
            final int topics = topicsHeader.topicCount();
            if (!$assertionsDisabled && topics != 1) {
                throw new AssertionError();
            }
            cursor = topicsHeader.limit();

            KafkaError result = KafkaError.NONE;
            for (int t = 0; t < topics && result == KafkaError.NONE; t++) {
                final TopicMetadataFW topic = NetworkConnectionPool.this.topicMetadataRO.tryWrap(directBuffer, cursor, i2);
                if (topic == null) {
                    return KafkaError.UNEXPECTED_SERVER_ERROR;
                }
                final String16FW decodedName = topic.topic();
                if (!$assertionsDisabled && !decodedName.asString().equals(this.pendingTopicMetadata.topicName)) {
                    throw new AssertionError();
                }
                result = KafkaError.asKafkaError(topic.errorCode());
                if (result != KafkaError.NONE) {
                    break;
                }
                final int partitions = topic.partitionCount();
                cursor = topic.limit();
                if (!this.pendingTopicMetadata.initializePartitions(partitions)) {
                    // the loop condition ends the iteration because result is no longer NONE
                    result = KafkaError.PARTITION_COUNT_CHANGED;
                    continue;
                }
                for (int p = 0; p < partitions && result == KafkaError.NONE; p++) {
                    final PartitionMetadataFW partition = NetworkConnectionPool.this.partitionMetadataRO.tryWrap(directBuffer, cursor, i2);
                    if (partition == null) {
                        return KafkaError.UNEXPECTED_SERVER_ERROR;
                    }
                    result = KafkaError.asKafkaError(partition.errorCode());
                    // the partition is recorded even when it carries an error code
                    this.pendingTopicMetadata.addPartition(partition.partitionId(), partition.leader());
                    cursor = partition.limit();
                }
            }
            return result;
        }

        /**
         * Detaches the dispatcher of the named topic, if that topic is currently registered in
         * {@code topicsByName}; does nothing otherwise.
         *
         * @param str topic name to look up
         * @param z   flag forwarded to the dispatcher's {@code detach}
         */
        private void detachSubscribers(String str, boolean z) {
            final Object registered = NetworkConnectionPool.this.topicsByName.get(str);
            if (registered == null) {
                return;
            }
            ((NetworkTopic) registered).dispatcher.detach(z);
        }

        /**
         * Reacts to a failed connection by calling {@code doReinitialize()} followed by
         * {@code doRequestIfNeeded()}.
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void handleConnectionFailed() {
            doReinitialize();
            doRequestIfNeeded();
        }

        // Decompiler rendering of the assertion switch: the assertion checks in this class are
        // skipped when assertions are disabled for the enclosing NetworkConnectionPool class.
        static {
            $assertionsDisabled = !NetworkConnectionPool.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$MetadataRequestType.class */
    /** Kinds of request whose response the metadata connection knows how to dispatch. */
    public enum MetadataRequestType {
        // Response is routed to handleMetadataResponse.
        METADATA,
        // Response is routed to handleDescribeConfigsResponse.
        DESCRIBE_CONFIGS
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$NetworkTopic.class */
    public final class NetworkTopic {
        // Topic name; also the key under which this topic is removed from topicsByName on detach.
        private final String topicName;
        // Selects CompactedPartitionIndex and the compacted dispatcher flavour in the constructor.
        private final boolean compacted;
        // Suppliers consulted by maximumWritableBytes, which returns the largest supplied value.
        private final Set<IntSupplier> windowSuppliers;
        // Reference-counted (partition id, offset) entries, ordered by id and then by offset.
        final NavigableSet<NetworkTopicPartition> partitions;
        // Reusable probe for floor/ceiling lookups in partitions; the probe itself is never added to the set.
        private final NetworkTopicPartition candidate;
        // Dispatcher that routes messages for this topic and is queried for cached entries.
        private final TopicMessageDispatcher dispatcher;
        // Bound to handleProgress; handed out by doAttach.
        private final PartitionProgressHandler progressHandler;
        // Bit per partition id, maintained by attachToPartition, handleProgress, remove and setLiveOffset.
        private BitSet needsHistoricalByPartition;
        // Bit per partition id, set when a partition is attached at MAX_OFFSET.
        private BitSet isLiveByPartition;
        // When set, the constructor attaches every partition at offset 0 up front.
        private final boolean proactive;
        // Decompiler rendering of the compiler-generated assertion switch.
        static final /* synthetic */ boolean $assertionsDisabled;

        /** Renders the topic name, partition entries and both per-partition bit sets for diagnostics. */
        @Override
        public String toString() {
            final String layout = "topicName=%s, partitions=%s, needsHistoricalByPartition=%s, isLiveByPartition=%s";
            return String.format(layout, this.topicName, this.partitions, this.needsHistoricalByPartition, this.isLiveByPartition);
        }

        /**
         * Creates the per-topic state: one partition index per partition, the topic dispatcher, any
         * routes queued for this topic name and, for proactive topics, an initial attachment of every
         * partition at offset 0.
         *
         * @param str topic name
         * @param i   number of partitions
         * @param z   whether the topic is compacted
         * @param z2  whether the topic is proactive
         * @param i2  value forwarded to each {@code CompactedPartitionIndex}
         * @param z3  not used by this constructor
         */
        private NetworkTopic(String str, int i, boolean z, boolean z2, int i2, boolean z3) {
            this.needsHistoricalByPartition = new BitSet();
            this.isLiveByPartition = new BitSet();
            this.topicName = str;
            this.compacted = z;
            this.proactive = z2;
            this.windowSuppliers = new HashSet<>();
            this.partitions = new TreeSet<>();
            this.candidate = new NetworkTopicPartition();
            this.progressHandler = this::handleProgress;

            // compacted topics get a dedicated index per partition, others share the default one
            final PartitionIndex[] indexes = new PartitionIndex[i];
            for (int partition = 0; partition < i; partition++) {
                if (z) {
                    indexes[partition] = new CompactedPartitionIndex(1000, i2, NetworkConnectionPool.this.messageCache);
                } else {
                    indexes[partition] = NetworkConnectionPool.DEFAULT_PARTITION_INDEX;
                }
            }
            this.dispatcher = new TopicMessageDispatcher(indexes, z ? CompactedHeaderValueMessageDispatcher::new : HeaderValueMessageDispatcher::new);

            // routes registered before the topic existed are claimed and applied now
            List queuedRoutes = (List) NetworkConnectionPool.this.routeHeadersByTopic.remove(str);
            if (queuedRoutes != null) {
                queuedRoutes.forEach(this::addRoute);
            }

            if (z2) {
                for (int partition = 0; partition < i; partition++) {
                    attachToPartition(partition, 0L, 1);
                }
                this.dispatcher.add(null, -1, Collections.emptyIterator(), new ProgressUpdatingMessageDispatcher(i, this.progressHandler));
            }
        }

        /**
         * Registers the shared matching dispatcher for a route.
         * <p>
         * A route with header conditions is registered against those headers and always enables
         * proactive message caching. A route without headers ({@code null} or empty list) is
         * registered against an empty header iterator and enables proactive caching only when
         * {@code forceProactiveMessageCache} is set.
         */
        void addRoute(ListFW<KafkaHeaderFW> listFW) {
            final boolean withoutHeaders = listFW == null || listFW.isEmpty();
            if (withoutHeaders) {
                this.dispatcher.add(null, -1, Collections.emptyIterator(), NetworkConnectionPool.MATCHING_MESSAGE_DISPATCHER);
                if (NetworkConnectionPool.this.forceProactiveMessageCache) {
                    this.dispatcher.enableProactiveMessageCaching();
                }
                return;
            }
            this.dispatcher.add(null, -1, NetworkConnectionPool.this.headersIterator.wrap(listFW), NetworkConnectionPool.MATCHING_MESSAGE_DISPATCHER);
            this.dispatcher.enableProactiveMessageCaching();
        }

        /**
         * Attaches a subscriber to this topic and takes one reference on each partition it reads.
         * <p>
         * With a key ({@code octetsFW} non-null) only the first partition in the map is attached; for
         * compacted topics its start offset is moved forward to the offset of the cached entry for
         * that key when that is higher. Without a key every partition in the map is attached, each
         * start offset being moved forward to the offset of the first entry the dispatcher returns
         * when that is higher. Offsets moved forward are written back into {@code long2LongHashMap}.
         *
         * @param long2LongHashMap partition id to start offset; may be updated in place
         * @param octetsFW         message key, or {@code null} for an unkeyed subscription
         * @param listFW           header conditions, wrapped into the shared headers iterator
         * @param messageDispatcher dispatcher to register
         * @param intSupplier      window supplier remembered for {@code maximumWritableBytes}
         * @return the progress handler of this topic
         */
        PartitionProgressHandler doAttach(Long2LongHashMap long2LongHashMap, OctetsFW octetsFW, ListFW<KafkaHeaderFW> listFW, MessageDispatcher messageDispatcher, IntSupplier intSupplier) {
            this.windowSuppliers.add(intSupplier);
            NetworkConnectionPool.this.headersIterator.wrap(listFW);

            if (octetsFW != null) {
                final int partition = long2LongHashMap.keySet().iterator().next().intValue();
                this.dispatcher.add(octetsFW, partition, NetworkConnectionPool.this.headersIterator, messageDispatcher);
                long start = long2LongHashMap.get(partition);
                if (this.compacted) {
                    final long cached = this.dispatcher.getEntry(partition, start, octetsFW).offset();
                    if (cached > start) {
                        long2LongHashMap.put(partition, cached);
                        start = cached;
                    }
                }
                attachToPartition(partition, start, 1);
                return this.progressHandler;
            }

            this.dispatcher.add(null, -1, NetworkConnectionPool.this.headersIterator, messageDispatcher);
            for (Long2LongHashMap.LongIterator keys = long2LongHashMap.keySet().iterator(); keys.hasNext(); ) {
                final int partition = (int) keys.nextValue();
                long start = long2LongHashMap.get(partition);
                final long firstAvailable = this.dispatcher.entries(partition, start).next().offset();
                if (firstAvailable > start) {
                    long2LongHashMap.put(partition, firstAvailable);
                    start = firstAvailable;
                }
                attachToPartition(partition, start, 1);
            }
            return this.progressHandler;
        }

        /**
         * Adds {@code i2} references to the entry for partition {@code i} at offset {@code j},
         * creating the entry when no equal one exists.
         * <p>
         * When {@code j} is {@code MAX_OFFSET} and the partition is already flagged live with an
         * existing entry, the existing entry's offset is used instead and the dispatcher is told to
         * adjust the offset accordingly.
         *
         * @param i  partition id
         * @param j  requested offset
         * @param i2 number of references to add
         */
        private void attachToPartition(int i, long j, int i2) {
            boolean z;
            // candidate is the shared probe used for the ordered-set lookups below
            this.candidate.id = i;
            this.candidate.offset = j;
            NetworkTopicPartition floor = this.partitions.floor(this.candidate);
            if (j == NetworkConnectionPool.MAX_OFFSET && floor != null && floor.id == this.candidate.id && this.isLiveByPartition.get(i)) {
                // live partition already tracked: join the existing entry rather than MAX_OFFSET
                this.candidate.offset = floor.offset;
                this.dispatcher.adjustOffset(i, j, floor.offset);
            }
            if (!this.candidate.equals(floor)) {
                // z: does another entry for the same partition already exist (below or above)?
                if (floor == null || floor.id != this.candidate.id) {
                    NetworkTopicPartition ceiling = this.partitions.ceiling(this.candidate);
                    z = ceiling != null && ceiling.id == this.candidate.id;
                } else {
                    z = true;
                }
                this.needsHistoricalByPartition.set(this.candidate.id, z);
                // the probe is never stored; a fresh entry is added in its place
                floor = new NetworkTopicPartition();
                floor.id = this.candidate.id;
                floor.offset = this.candidate.offset;
                add(floor);
                if (j == NetworkConnectionPool.MAX_OFFSET) {
                    this.isLiveByPartition.set(i);
                }
            }
            // refs += i2 on the existing or newly created entry
            NetworkTopicPartition.access$3712(floor, i2);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Detaches a subscriber: forgets its window supplier, removes its dispatcher and releases one
         * reference on every partition in {@code long2LongHashMap}.
         * <p>
         * When no partition entries remain and the topic is not compacted, the topic is removed from
         * {@code topicsByName}; its metadata is removed as well if it is complete and has no
         * consumers.
         *
         * @param long2LongHashMap partition id to the offset the subscriber is currently attached at
         * @param octetsFW         message key, or {@code null} for an unkeyed subscription
         * @param listFW           header conditions, wrapped into the shared headers iterator
         * @param messageDispatcher dispatcher to unregister
         * @param intSupplier      window supplier to forget
         */
        public void doDetach(Long2LongHashMap long2LongHashMap, OctetsFW octetsFW, ListFW<KafkaHeaderFW> listFW, MessageDispatcher messageDispatcher, IntSupplier intSupplier) {
            this.windowSuppliers.remove(intSupplier);

            // a keyed subscription is registered against the first partition in the map
            final int keyPartition;
            if (octetsFW == null) {
                keyPartition = -1;
            } else {
                keyPartition = (int) long2LongHashMap.keySet().iterator().next().longValue();
            }
            NetworkConnectionPool.this.headersIterator.wrap(listFW);
            this.dispatcher.remove(octetsFW, keyPartition, NetworkConnectionPool.this.headersIterator, messageDispatcher);

            for (Long2LongHashMap.LongIterator keys = long2LongHashMap.keySet().iterator(); keys.hasNext(); ) {
                final long partition = keys.nextValue();
                doDetach((int) partition, long2LongHashMap.get(partition), intSupplier);
            }

            if (this.partitions.isEmpty() && !this.compacted) {
                NetworkConnectionPool.this.topicsByName.remove(this.topicName);
                final TopicMetadata metadata = (TopicMetadata) NetworkConnectionPool.this.topicMetadataByName.get(this.topicName);
                if (metadata != null && metadata.isComplete() && metadata.consumers.isEmpty()) {
                    NetworkConnectionPool.this.topicMetadataByName.remove(this.topicName);
                }
            }
        }

        /**
         * Releases one reference on the entry for partition {@code i} at offset {@code j} and removes
         * the entry when its reference count reaches zero.
         *
         * @param i           partition id
         * @param j           offset of the entry to release
         * @param intSupplier window supplier to forget
         * @throws IllegalStateException if no entry with exactly this id and offset exists
         */
        void doDetach(int i, long j, IntSupplier intSupplier) {
            NetworkTopicPartition floor;
            this.windowSuppliers.remove(intSupplier);
            this.candidate.id = i;
            this.candidate.offset = j;
            NetworkTopicPartition floor2 = this.partitions.floor(this.candidate);
            if (floor2 == null || floor2.id != this.candidate.id || floor2.offset != this.candidate.offset) {
                throw new IllegalStateException(String.format("floor gave %s, expected (id=%d, offset=%d); topic=%s", floor2, Integer.valueOf(i), Long.valueOf(j), this));
            }
            // refs-- on the matching entry
            NetworkTopicPartition.access$3710(floor2);
            if (floor2.refs == 0) {
                remove(floor2);
                // The live flag is cleared only when the partition was live and, after the removal,
                // an entry for the same partition with a lower offset is still present.
                if (!this.isLiveByPartition.get(i) || (floor = this.partitions.floor(this.candidate)) == null || floor.id != i || floor.offset >= j) {
                    return;
                }
                this.isLiveByPartition.clear(i);
            }
        }

        /**
         * Computes how many bytes may currently be written for this topic.
         *
         * @param z when {@code true} and the topic is proactive, the configured
         *          {@code fetchPartitionMaxBytes} is returned directly
         * @return {@code fetchPartitionMaxBytes} in the proactive case; otherwise the largest value
         *         reported by the registered window suppliers, or 0 when there are none
         */
        int maximumWritableBytes(boolean z) {
            if (z && this.proactive) {
                return NetworkConnectionPool.this.fetchPartitionMaxBytes;
            }
            int widest = 0;
            for (IntSupplier window : this.windowSuppliers) {
                widest = Math.max(widest, window.getAsInt());
            }
            return widest;
        }

        /* JADX WARN: Type inference failed for: r0v47, types: [org.reaktivity.nukleus.kafka.internal.types.codec.fetch.PartitionRequestFW$Builder] */
        /**
         * Tries to serve encoded partition fetch requests from the message cache, rewriting the
         * request buffer in place so that only requests which still need the network remain.
         * <p>
         * For each of the {@code i} requests, cached entries starting at the request's fetch offset
         * are dispatched until the entries run out, a cached message is missing (cache miss), or the
         * dispatch result is blocked without having been delivered. On a cache miss the request is
         * kept, rewritten with the advanced fetch offset when progress was made. Otherwise the request
         * is dropped from the buffer by copying the following bytes over it.
         *
         * @param i                 number of partition requests encoded in the buffer
         * @param intToLongFunction maps a partition id to the value passed as second argument to
         *                          {@code dispatch} and {@code flush} (presumably the requested
         *                          offset; confirm against caller)
         * @param intLongConsumer   receives (partition id, next offset) whenever something was
         *                          dispatched or the offset advanced
         * @param mutableDirectBuffer buffer holding the encoded partition requests
         * @param i2                position of the first partition request
         * @param i3                upper bound passed to the request flyweight
         * @param intConsumer       receives the reduced limit when at least one request was dropped
         * @return the number of requests that remain in the buffer
         */
        int satisfyPartitionRequestsFromCache(int i, IntToLongFunction intToLongFunction, IntLongConsumer intLongConsumer, MutableDirectBuffer mutableDirectBuffer, int i2, int i3, IntConsumer intConsumer) {
            // i4: requests still remaining; i5: limit after removing satisfied requests
            int i4 = i;
            int i5 = i3;
            for (int i6 = 0; i6 < i; i6++) {
                PartitionRequestFW wrap = NetworkConnectionPool.this.partitionRequestRO.wrap((DirectBuffer) mutableDirectBuffer, i2, i3);
                if (!$assertionsDisabled && wrap.limit() > i3) {
                    throw new AssertionError();
                }
                int partitionId = wrap.partitionId();
                long fetchOffset = wrap.fetchOffset();
                long logStartOffset = wrap.logStartOffset();
                int maxBytes = wrap.maxBytes();
                // j tracks the next offset still to be fetched for this partition
                long j = fetchOffset;
                Iterator<PartitionIndex.Entry> entries = this.dispatcher.entries(partitionId, fetchOffset);
                // z: a cached message was missing; z2: at least one message was dispatched
                boolean z = false;
                boolean z2 = false;
                long applyAsLong = intToLongFunction.applyAsLong(partitionId);
                while (true) {
                    if (!entries.hasNext()) {
                        break;
                    }
                    PartitionIndex.Entry next = entries.next();
                    j = next.offset();
                    MessageFW messageFW = NetworkConnectionPool.this.messageCache.get(next.message(), NetworkConnectionPool.this.messageRO);
                    if (messageFW == null) {
                        z = true;
                        break;
                    }
                    int dispatch = this.dispatcher.dispatch(partitionId, applyAsLong, j, wrap(NetworkConnectionPool.this.keyBuffer, messageFW.key()), NetworkConnectionPool.this.headersRO.wrap(messageFW.headers().buffer(), messageFW.headers().offset(), messageFW.headers().limit()).headerSupplier(), messageFW.timestamp(), messageFW.traceId(), wrap(NetworkConnectionPool.this.valueBuffer, messageFW.value()));
                    z2 = true;
                    j++;
                    if (MessageDispatcher.blocked(dispatch) && !MessageDispatcher.delivered(dispatch)) {
                        break;
                    }
                }
                if (z) {
                    // result discarded; presumably the call itself bumps the counter (confirm)
                    NetworkConnectionPool.this.clientStreamFactory.counters.cacheMisses.getAsLong();
                    if (j > fetchOffset) {
                        // keep the request but resume from the first offset missing in the cache
                        wrap = NetworkConnectionPool.this.partitionRequestRW.wrap2(mutableDirectBuffer, i2, i5).partitionId(partitionId).fetchOffset(j).logStartOffset(logStartOffset).maxBytes(maxBytes).build();
                    }
                    i2 = wrap.limit();
                } else {
                    // result discarded; presumably the call itself bumps the counter (confirm)
                    NetworkConnectionPool.this.clientStreamFactory.counters.cacheHits.getAsLong();
                    if (!entries.hasNext()) {
                        j = this.dispatcher.nextOffset(partitionId);
                    }
                    // Drop this request by copying the bytes that follow it over its position.
                    // NOTE(review): Agrona's putBytes takes a length as last argument; i3 is used
                    // as a limit elsewhere in this method, so confirm the intended byte count.
                    mutableDirectBuffer.putBytes(i2, mutableDirectBuffer, wrap.limit(), i3);
                    i5 -= wrap.sizeof();
                    i4--;
                }
                if (z2 || j > fetchOffset) {
                    this.dispatcher.flush(partitionId, applyAsLong, j);
                    intLongConsumer.accept(partitionId, j);
                }
            }
            // NOTE(review): i3 > i3 can never be true, so this assertion is dead as written; it may
            // originally have compared the reduced limit (i5) with i3. Confirm against the sources.
            if (!$assertionsDisabled && i3 > i3) {
                throw new AssertionError();
            }
            if (i5 < i3) {
                intConsumer.accept(i5);
            }
            return i4;
        }

        /**
         * Moves one reference for partition {@code i} from the entry at offset {@code j} to the entry
         * at offset {@code j2}, creating the target entry when needed and removing the source entry
         * once nothing references it.
         *
         * @param i  partition id
         * @param j  offset the reference is currently held at
         * @param j2 offset the reference moves to
         * @throws IllegalStateException if no entry with exactly id {@code i} and offset {@code j} exists
         */
        private void handleProgress(int i, long j, long j2) {
            this.candidate.id = i;
            this.candidate.offset = j;
            NetworkTopicPartition floor = this.partitions.floor(this.candidate);
            if (floor == null || floor.id != i || floor.offset != j) {
                throw new IllegalStateException(String.format("floor gave %s, expected (id=%d, offset=%d); nextOffset = %d, topic=%s", floor, Integer.valueOf(i), Long.valueOf(j), Long.valueOf(j2), this));
            }
            // refs-- on the source entry
            NetworkTopicPartition.access$3710(floor);
            this.candidate.offset = j2;
            NetworkTopicPartition floor2 = this.partitions.floor(this.candidate);
            if (floor2 == null || floor2.offset != j2) {
                // No entry at the new offset yet. Flag the partition as needing historical data when
                // the nearest lower entry is a different one, or is the source and still referenced.
                if (floor2 != null && (floor2 != floor || floor.refs > 0)) {
                    this.needsHistoricalByPartition.set(i);
                }
                floor2 = new NetworkTopicPartition();
                floor2.id = i;
                floor2.offset = j2;
                add(floor2);
            }
            // refs++ on the target entry
            NetworkTopicPartition.access$3708(floor2);
            if (floor.refs == 0) {
                remove(floor);
            }
        }

        /** Inserts the given entry into the ordered {@code partitions} set. */
        private void add(NetworkTopicPartition networkTopicPartition) {
            this.partitions.add(networkTopicPartition);
        }

        /**
         * Removes the given entry from {@code partitions} and recomputes the needs-historical bit of
         * its partition: the bit is set only when at least two distinct entries for that partition id
         * remain.
         * <p>
         * Note that the argument is reused as a lookup probe afterwards, so its {@code offset} field
         * is overwritten (left at 0 or {@code MAX_OFFSET}).
         */
        private void remove(NetworkTopicPartition networkTopicPartition) {
            this.partitions.remove(networkTopicPartition);
            boolean z = false;
            // probe from the lowest offset: first remaining entry at or above (id, 0)
            networkTopicPartition.offset = 0L;
            NetworkTopicPartition ceiling = this.partitions.ceiling(networkTopicPartition);
            if (ceiling != null && ceiling.id == networkTopicPartition.id) {
                // probe from the highest offset: last remaining entry at or below (id, MAX_OFFSET)
                networkTopicPartition.offset = NetworkConnectionPool.MAX_OFFSET;
                if (this.partitions.floor(networkTopicPartition) != ceiling) {
                    z = true;
                }
            }
            this.needsHistoricalByPartition.set(networkTopicPartition.id, z);
        }

        /** Reports whether any partition of this topic is currently flagged as needing historical data. */
        boolean needsHistorical() {
            return !this.needsHistoricalByPartition.isEmpty();
        }

        /** Reports whether the partition with id {@code i} is flagged as needing historical data. */
        boolean needsHistorical(int i) {
            return this.needsHistoricalByPartition.get(i);
        }

        /**
         * Replaces the {@code MAX_OFFSET} entry of live partition {@code i} by an entry at the
         * concrete offset {@code j}.
         * <p>
         * With assertions enabled, the partition must be flagged live and must have an entry at
         * {@code MAX_OFFSET}. That entry is removed. If no entry at exactly {@code j} exists, a copy of
         * the removed entry (same reference count) is added at {@code j} and the partition is flagged
         * as needing historical data; otherwise the existing entry at {@code j} gains one reference.
         *
         * @param i partition id
         * @param j concrete live offset
         */
        void setLiveOffset(int i, long j) {
            if (!$assertionsDisabled && !this.isLiveByPartition.get(i)) {
                throw new AssertionError();
            }
            this.candidate.id = i;
            this.candidate.offset = NetworkConnectionPool.MAX_OFFSET;
            NetworkTopicPartition floor = this.partitions.floor(this.candidate);
            if (!$assertionsDisabled && floor.id != i) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && floor.offset != NetworkConnectionPool.MAX_OFFSET) {
                throw new AssertionError();
            }
            this.partitions.remove(floor);
            // highest remaining entry at or below (i, MAX_OFFSET)
            NetworkTopicPartition floor2 = this.partitions.floor(this.candidate);
            if (floor2 == null || floor2.id != i || floor2.offset != j) {
                NetworkTopicPartition m30clone = floor.m30clone();
                m30clone.offset = j;
                this.partitions.add(m30clone);
                this.needsHistoricalByPartition.set(m30clone.id, true);
                return;
            }
            // refs++ on the entry already present at offset j
            NetworkTopicPartition.access$3708(floor2);
            // NOTE(review): NavigableSet.floor is inclusive and floor2 is in the set, so this lookup
            // appears to return floor2 itself, which would make the branch below unreachable;
            // lower() may have been intended. Confirm against the original sources.
            NetworkTopicPartition floor3 = this.partitions.floor(floor2);
            if (floor3.id != i || floor3.offset == j) {
                return;
            }
            this.needsHistoricalByPartition.set(i, true);
        }

        /**
         * Points {@code mutableDirectBuffer} at the bytes covered by {@code flyweight}.
         *
         * @return {@code mutableDirectBuffer} after wrapping, or {@code null} when {@code flyweight}
         *         is {@code null} (the buffer is then left untouched)
         */
        DirectBuffer wrap(MutableDirectBuffer mutableDirectBuffer, Flyweight flyweight) {
            if (flyweight == null) {
                return null;
            }
            mutableDirectBuffer.wrap(flyweight.buffer(), flyweight.offset(), flyweight.sizeof());
            return mutableDirectBuffer;
        }

        // Decompiler rendering of the assertion switch: the assertion checks in this class are
        // skipped when assertions are disabled for the enclosing NetworkConnectionPool class.
        static {
            $assertionsDisabled = !NetworkConnectionPool.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$NetworkTopicPartition.class */
    public static final class NetworkTopicPartition implements Comparable<NetworkTopicPartition> {
        private static final NetworkTopicPartition NONE;
        int id;
        long offset;
        private int refs;
        static final /* synthetic */ boolean $assertionsDisabled;

        NetworkTopicPartition() {
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: clone, reason: merged with bridge method [inline-methods] */
        public NetworkTopicPartition m30clone() {
            NetworkTopicPartition networkTopicPartition = new NetworkTopicPartition();
            networkTopicPartition.id = this.id;
            networkTopicPartition.offset = this.offset;
            networkTopicPartition.refs = this.refs;
            return networkTopicPartition;
        }

        @Override // java.lang.Comparable
        public int compareTo(NetworkTopicPartition networkTopicPartition) {
            int compare = Integer.compare(this.id, networkTopicPartition.id);
            if (compare == 0) {
                compare = Long.compare(this.offset, networkTopicPartition.offset);
            }
            if ($assertionsDisabled || compareToResponseValid(networkTopicPartition, compare)) {
                return compare;
            }
            throw new AssertionError(String.format("compareTo response %d invalid for this=%s, that=%s", Integer.valueOf(compare), this, networkTopicPartition));
        }

        private boolean compareToResponseValid(NetworkTopicPartition networkTopicPartition, int i) {
            return (i == 0 && this.id - networkTopicPartition.id == 0 && this.offset - networkTopicPartition.offset == 0) || (i < 0 && this.id < networkTopicPartition.id) || this.offset < networkTopicPartition.offset || ((i > 0 && this.id > networkTopicPartition.id) || this.offset > networkTopicPartition.offset);
        }

        public boolean equals(Object obj) {
            boolean z = false;
            if (this == obj) {
                z = true;
            } else if (obj != null && obj.getClass() == NetworkTopicPartition.class) {
                NetworkTopicPartition networkTopicPartition = (NetworkTopicPartition) obj;
                z = this.id == networkTopicPartition.id && this.offset == networkTopicPartition.offset;
            }
            return z;
        }

        public int hashCode() {
            return (31 * this.id) + Long.hashCode(this.offset);
        }

        public String toString() {
            return String.format("(id=%d, offset=%d, refs=%d)", Integer.valueOf(this.id), Long.valueOf(this.offset), Integer.valueOf(this.refs));
        }

        static /* synthetic */ int access$3712(NetworkTopicPartition networkTopicPartition, int i) {
            int i2 = networkTopicPartition.refs + i;
            networkTopicPartition.refs = i2;
            return i2;
        }

        static /* synthetic */ int access$3710(NetworkTopicPartition networkTopicPartition) {
            int i = networkTopicPartition.refs;
            networkTopicPartition.refs = i - 1;
            return i;
        }

        static /* synthetic */ int access$3708(NetworkTopicPartition networkTopicPartition) {
            int i = networkTopicPartition.refs;
            networkTopicPartition.refs = i + 1;
            return i;
        }

        static {
            $assertionsDisabled = !NetworkConnectionPool.class.desiredAssertionStatus();
            NONE = new NetworkTopicPartition();
            NONE.id = -1;
        }
    }

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$ProgressUpdatingMessageDispatcher.class */
    /**
     * Dispatcher that delivers nothing and only tracks flushed offsets: whenever a flush reports a
     * higher offset for a partition than the last one recorded, the progress handler is notified
     * with the old and new offsets.
     */
    private static final class ProgressUpdatingMessageDispatcher implements MessageDispatcher {
        private final PartitionProgressHandler progressHandler;
        // last flushed offset per partition id, starting at 0
        private final long[] offsets;

        ProgressUpdatingMessageDispatcher(int i, PartitionProgressHandler partitionProgressHandler) {
            this.offsets = new long[i];
            this.progressHandler = partitionProgressHandler;
        }

        @Override // org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
        public void adjustOffset(int i, long j, long j2) {
            // intentionally empty
        }

        @Override // org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
        public void detach(boolean z) {
            // intentionally empty
        }

        @Override // org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
        public int dispatch(int i, long j, long j2, DirectBuffer directBuffer, Function<DirectBuffer, Iterator<DirectBuffer>> function, long j3, long j4, DirectBuffer directBuffer2) {
            return 0;
        }

        @Override // org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
        public void flush(int i, long j, long j2) {
            final long recorded = this.offsets[i];
            if (j2 <= recorded) {
                return;
            }
            this.progressHandler.handle(i, recorded, j2);
            this.offsets[i] = j2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$TopicMetadata.class */
    public static final class TopicMetadata {
        private static final int UNKNOWN_BROKER = -1;
        // Name of the topic this metadata describes.
        private final String topicName;
        private boolean compacted;
        private int deleteRetentionMs;
        BrokerMetadata[] brokers;
        private int nextBrokerIndex;
        private int[] nodeIdsByPartition;
        private long[] firstOffsetsByPartition;
        private long[] offsetsOutOfRangeByPartition;
        // Passed to the backoff when a refresh is scheduled; reset to 0 when a completed topic is refreshed.
        private int retries;
        // Timer used for scheduled refreshes; cancelled by setComplete when present.
        private TimerWheel.Timer retryTimer;
        // Last error recorded via setComplete; starts as NONE.
        private KafkaError errorCode = KafkaError.NONE;
        // Lifecycle state; starts as GET_REQUIRED.
        private State state = State.GET_REQUIRED;
        private Int2ObjectHashMap<Consumer<TopicMetadata>> consumers = new Int2ObjectHashMap<>();
        // Next request type needed for this topic; starts with METADATA.
        private MetadataRequestType nextRequiredRequestType = MetadataRequestType.METADATA;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$TopicMetadata$State.class */
        public enum State {
            GET_REQUIRED,
            GET_SCHEDULED,
            COMPLETE
        }

        TopicMetadata(String str) {
            this.topicName = str;
        }

        boolean isComplete() {
            return this.state == State.COMPLETE;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean isGetRequired() {
            return this.state == State.GET_REQUIRED;
        }

        KafkaError setComplete(KafkaError kafkaError) {
            this.state = State.COMPLETE;
            KafkaError kafkaError2 = this.errorCode;
            this.errorCode = kafkaError;
            if (this.retryTimer != null) {
                this.retryTimer.cancel();
            }
            return kafkaError2;
        }

        void scheduleRefresh(DelayedTaskScheduler delayedTaskScheduler, Backoff backoff, MetadataConnection metadataConnection) {
            if (isComplete()) {
                invalidate();
                this.retries = 0;
            }
            this.state = State.GET_SCHEDULED;
            TimerWheel.Timer orCreateTimer = getOrCreateTimer(delayedTaskScheduler);
            orCreateTimer.cancel();
            delayedTaskScheduler.rescheduleTimeout(backoff.next(this.retries), orCreateTimer, () -> {
                doRefresh(metadataConnection);
            });
        }

        void doRefresh(MetadataConnection metadataConnection) {
            this.state = State.GET_REQUIRED;
            this.retries++;
            metadataConnection.doRequestIfNeeded();
        }

        MetadataRequestType nextRequiredRequestType() {
            return this.nextRequiredRequestType;
        }

        void setCompacted(boolean z) {
            this.compacted = z;
        }

        void setDeleteRetentionMs(int i) {
            this.deleteRetentionMs = i;
        }

        void setErrorCode(KafkaError kafkaError) {
            this.errorCode = kafkaError;
        }

        void initializeBrokers(int i) {
            this.brokers = (this.brokers == null || this.brokers.length != i) ? new BrokerMetadata[i] : this.brokers;
            this.nextBrokerIndex = 0;
        }

        void addBroker(int i, String str, int i2) {
            BrokerMetadata[] brokerMetadataArr = this.brokers;
            int i3 = this.nextBrokerIndex;
            this.nextBrokerIndex = i3 + 1;
            brokerMetadataArr[i3] = new BrokerMetadata(i, str, i2);
        }

        boolean invalidateBroker(int i) {
            boolean z = false;
            if (this.brokers != null) {
                for (int i2 = 0; i2 < this.brokers.length; i2++) {
                    if (this.brokers[i2] != null && this.brokers[i2].nodeId == i) {
                        this.brokers[i2] = null;
                        this.state = State.GET_REQUIRED;
                        z = true;
                    }
                }
                if (z && this.nodeIdsByPartition != null) {
                    this.nextRequiredRequestType = MetadataRequestType.METADATA;
                    for (int i3 = 0; i3 < this.nodeIdsByPartition.length; i3++) {
                        if (this.nodeIdsByPartition[i3] == i) {
                            this.nodeIdsByPartition[i3] = -1;
                        }
                    }
                }
            }
            return z;
        }

        void invalidate() {
            this.state = State.GET_REQUIRED;
            if (this.brokers != null) {
                Arrays.fill(this.brokers, (Object) null);
            }
            if (this.nodeIdsByPartition != null) {
                Arrays.fill(this.nodeIdsByPartition, -1);
            }
            this.nextRequiredRequestType = MetadataRequestType.METADATA;
        }

        boolean initializePartitions(int i) {
            boolean z = true;
            if (this.nodeIdsByPartition == null) {
                this.nodeIdsByPartition = new int[i];
                this.firstOffsetsByPartition = new long[i];
                this.offsetsOutOfRangeByPartition = new long[i];
                Arrays.fill(this.offsetsOutOfRangeByPartition, -1L);
            } else if (this.nodeIdsByPartition.length != i) {
                z = false;
            }
            return z;
        }

        void addPartition(int i, int i2) {
            this.nodeIdsByPartition[i] = i2;
        }

        long firstAvailableOffset(int i) {
            return this.firstOffsetsByPartition[i];
        }

        long ensureOffsetInRange(int i, long j) {
            long j2 = j;
            if (this.offsetsOutOfRangeByPartition[i] == j) {
                j2 = this.firstOffsetsByPartition[i];
                this.offsetsOutOfRangeByPartition[i] = -1;
            }
            return j2;
        }

        int offsetsRequired(int i) {
            int i2 = 0;
            if (this.nodeIdsByPartition != null) {
                for (int i3 = 0; i3 < this.nodeIdsByPartition.length; i3++) {
                    if (this.nodeIdsByPartition[i3] == i && this.offsetsOutOfRangeByPartition[i3] != -1) {
                        i2++;
                    }
                }
            }
            return i2;
        }

        void setFirstOffset(int i, long j) {
            this.firstOffsetsByPartition[i] = j;
        }

        public void setOffsetOutOfRange(int i, long j) {
            this.offsetsOutOfRangeByPartition[i] = j;
        }

        public String toString() {
            return String.format("[TopicMetadata] topicName=%s, errorCode=%s, compacted=%b, state=%s, brokers=%s, nodnodeIdsByPartition=%s", this.topicName, this.errorCode, Boolean.valueOf(this.compacted), this.state, this.brokers, this.nodeIdsByPartition);
        }

        void doAttach(int i, Consumer<TopicMetadata> consumer) {
            this.consumers.put(i, consumer);
            if (this.state == State.COMPLETE) {
                flush();
            }
        }

        void doDetach(int i) {
            this.consumers.remove(i);
            if (this.retryTimer == null || !this.consumers.isEmpty()) {
                return;
            }
            this.retryTimer.cancel();
        }

        void flush() {
            Iterator it = this.consumers.values().iterator();
            while (it.hasNext()) {
                ((Consumer) it.next()).accept(this);
            }
            this.consumers.clear();
        }

        KafkaError errorCode() {
            return this.errorCode;
        }

        boolean hasConsumers() {
            return !this.consumers.isEmpty();
        }

        void visitBrokers(Consumer<BrokerMetadata> consumer) {
            for (BrokerMetadata brokerMetadata : this.brokers) {
                if (brokerMetadata != null) {
                    consumer.accept(brokerMetadata);
                }
            }
        }

        int partitionCount() {
            return this.nodeIdsByPartition.length;
        }

        private TimerWheel.Timer getOrCreateTimer(DelayedTaskScheduler delayedTaskScheduler) {
            if (this.retryTimer == null) {
                this.retryTimer = delayedTaskScheduler.newBlankTimer();
            }
            return this.retryTimer;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NetworkConnectionPool(ClientStreamFactory clientStreamFactory, String str, long j, int i, int i2, BufferPool bufferPool, MessageCache messageCache, Function<String, LongSupplier> function, boolean z, int i3) {
        this.clientStreamFactory = clientStreamFactory;
        this.networkName = str;
        this.networkRef = j;
        this.fetchMaxBytes = i;
        this.fetchPartitionMaxBytes = i2;
        this.bufferPool = bufferPool;
        this.messageCache = messageCache;
        this.forceProactiveMessageCache = z;
        this.routeCounters = clientStreamFactory.counters.supplyRef(str, j);
        this.encodeBuffer = new UnsafeBuffer(new byte[clientStreamFactory.bufferPool.slotCapacity()]);
        this.readIdleTimeout = i3;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Begins attaching a subscriber to a topic and returns the id of the attachment.
     *
     * <p>The attachment itself is completed by the private {@code doAttach} overload once the
     * topic's metadata is available; that may happen during this call if the metadata is
     * already complete. A metadata connection is created on first use and asked to issue a
     * request if one is needed.
     *
     * @param str               topic name
     * @param long2LongHashMap  requested offsets keyed by partition
     * @param i                 value used to choose a partition when {@code octetsFW} is given
     * @param octetsFW          optional key; may be {@code null}
     * @param listFW            header conditions
     * @param messageDispatcher dispatcher to attach
     * @param intSupplier       passed through to the topic on attach and detach
     * @param consumer          receives the progress handler produced by the attach
     * @param consumer2         receives the deferred "finish attach" callback
     * @param consumer3         receives the error if the topic cannot be resolved
     * @return the id assigned to this attachment, for use with {@code doDetach}
     */
    public int doAttach(String str, Long2LongHashMap long2LongHashMap, int i, OctetsFW octetsFW, ListFW<KafkaHeaderFW> listFW, MessageDispatcher messageDispatcher, IntSupplier intSupplier, Consumer<PartitionProgressHandler> consumer, Consumer<Consumer<Consumer<Boolean>>> consumer2, Consumer<KafkaError> consumer3) {
        final TopicMetadata metadata = this.topicMetadataByName.computeIfAbsent(str, name -> new TopicMetadata(name));
        final int attachId = this.nextAttachId++;
        metadata.doAttach(attachId, resolved ->
            doAttach(str, long2LongHashMap, i, octetsFW, listFW, attachId, messageDispatcher, intSupplier, consumer, consumer2, consumer3, resolved));
        if (this.metadataConnection == null) {
            this.metadataConnection = new MetadataConnection();
        }
        this.metadataConnection.doRequestIfNeeded();
        return attachId;
    }

    /**
     * Completes an attachment once the topic metadata has been resolved.
     *
     * <p>For an invalid or unknown topic the error is handed to {@code consumer3}. Otherwise
     * the requested offsets are normalised and {@code consumer2} is given a callback which,
     * when invoked, attaches the dispatcher to the topic, registers a detacher under
     * {@code i2}, ensures broker connections exist and finally reports whether the topic is
     * compacted.
     *
     * <p>Offset normalisation: a missing offset defaults to {@code 0} when any offset was
     * supplied or the topic is compacted, and to {@code MAX_OFFSET} otherwise; every offset is
     * raised to at least the partition's first available offset. With a key
     * ({@code octetsFW != null}) only the partition chosen by
     * {@code BufferUtil.partition(i, partitionCount)} is kept, seeded from the entry for
     * partition 0.
     *
     * @throws RuntimeException if the metadata carries any other error code
     */
    private void doAttach(String str, Long2LongHashMap long2LongHashMap, int i, OctetsFW octetsFW, ListFW<KafkaHeaderFW> listFW, int i2, MessageDispatcher messageDispatcher, IntSupplier intSupplier, Consumer<PartitionProgressHandler> consumer, Consumer<Consumer<Consumer<Boolean>>> consumer2, Consumer<KafkaError> consumer3, TopicMetadata topicMetadata) {
        final KafkaError error = topicMetadata.errorCode();
        switch (error) {
            case INVALID_TOPIC_EXCEPTION:
            case UNKNOWN_TOPIC_OR_PARTITION:
                consumer3.accept(error);
                break;
            case NONE: {
                final long defaultOffset;
                if (long2LongHashMap.isEmpty() && !topicMetadata.compacted) {
                    defaultOffset = MAX_OFFSET;
                } else {
                    defaultOffset = 0L;
                }
                final int partitionCount = topicMetadata.partitionCount();
                if (octetsFW == null) {
                    for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
                        final long requested = long2LongHashMap.computeIfAbsent(partitionId, key -> defaultOffset);
                        final long earliest = topicMetadata.firstAvailableOffset(partitionId);
                        long2LongHashMap.put(partitionId, Math.max(requested, earliest));
                    }
                } else {
                    final int partitionId = BufferUtil.partition(i, partitionCount);
                    final long requested = long2LongHashMap.computeIfAbsent(0L, key -> defaultOffset);
                    final long earliest = topicMetadata.firstAvailableOffset(partitionId);
                    // The offset arrives under partition 0; move it to the selected partition.
                    if (partitionId != 0) {
                        long2LongHashMap.remove(0L);
                    }
                    long2LongHashMap.put(partitionId, Math.max(requested, earliest));
                }
                final NetworkTopic topic = this.topicsByName.computeIfAbsent(str, name ->
                    new NetworkTopic(name, topicMetadata.partitionCount(), topicMetadata.compacted, false, topicMetadata.deleteRetentionMs, false));
                consumer2.accept(onAttached -> {
                    consumer.accept(topic.doAttach(long2LongHashMap, octetsFW, listFW, messageDispatcher, intSupplier));
                    this.detachersById.put(i2, offsets -> topic.doDetach(offsets, octetsFW, listFW, messageDispatcher, intSupplier));
                    doConnections(topicMetadata);
                    onAttached.accept(topicMetadata.compacted);
                });
                break;
            }
            default:
                throw new RuntimeException(String.format("Unexpected errorCode %s from metadata query for topic %s", error, str));
        }
    }

    /**
     * Registers a route for a topic.
     *
     * <p>Metadata for the topic is created if absent. When {@code z} is {@code false} the
     * header conditions are simply remembered for the topic. When it is {@code true} the route
     * is completed by the private {@code addRoute} overload as soon as the topic metadata is
     * available, and a metadata request is issued if needed.
     *
     * @param str        topic name
     * @param listFW     header conditions of the route
     * @param z          whether the route must be resolved against topic metadata
     *                   (presumably the "proactive" flag; confirm against callers)
     * @param biConsumer receives the error and topic name if the topic cannot be resolved
     */
    public void addRoute(String str, ListFW<KafkaHeaderFW> listFW, boolean z, BiConsumer<KafkaError, String> biConsumer) {
        final TopicMetadata metadata = this.topicMetadataByName.computeIfAbsent(str, name -> new TopicMetadata(name));
        if (z) {
            final int attachId = this.nextAttachId++;
            metadata.doAttach(attachId, resolved -> addRoute(str, listFW, z, biConsumer, resolved));
            if (this.metadataConnection == null) {
                this.metadataConnection = new MetadataConnection();
            }
            this.metadataConnection.doRequestIfNeeded();
        } else {
            this.routeHeadersByTopic.computeIfAbsent(str, name -> new ArrayList<>()).add(listFW);
        }
    }

    /**
     * Completes a route registration once the topic metadata has been resolved.
     *
     * <p>For an invalid or unknown topic the error is reported through {@code biConsumer}.
     * For a successfully resolved topic the route is only applied when the topic is
     * compacted: the topic is created if absent, the route added, broker connections ensured
     * and a flush triggered. Non-compacted topics are ignored.
     *
     * @param str           topic name
     * @param listFW        header conditions of the route
     * @param z             passed to a newly created {@code NetworkTopic}
     * @param biConsumer    receives the error code and topic name on failure
     * @param topicMetadata the resolved metadata
     * @throws RuntimeException if the metadata carries any other error code
     */
    private void addRoute(String str, ListFW<KafkaHeaderFW> listFW, boolean z, BiConsumer<KafkaError, String> biConsumer, TopicMetadata topicMetadata) {
        final KafkaError errorCode = topicMetadata.errorCode();
        switch (errorCode) {
            case INVALID_TOPIC_EXCEPTION:
            case UNKNOWN_TOPIC_OR_PARTITION:
                biConsumer.accept(errorCode, str);
                return;
            case NONE:
                if (topicMetadata.compacted) {
                    this.topicsByName.computeIfAbsent(str, name ->
                        new NetworkTopic(name, topicMetadata.partitionCount(), topicMetadata.compacted, z, topicMetadata.deleteRetentionMs, this.forceProactiveMessageCache))
                        .addRoute(listFW);
                    doConnections(topicMetadata);
                    doFlush();
                }
                return;
            default:
                // errorCode is an enum constant, so it must be formatted with %s; %d would make
                // String.format throw IllegalFormatConversionException and hide this message.
                throw new RuntimeException(String.format("Unexpected errorCode %s from metadata query for topic %s", errorCode, str));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Ensures a live and a historical fetch connection exist for every broker known to the
     * given topic metadata.
     *
     * <p>All live connections are processed first, then all historical ones. Each array field
     * is replaced by the result of {@code applyBrokerMetadata}.
     *
     * @param topicMetadata metadata whose brokers are visited
     */
    public void doConnections(TopicMetadata topicMetadata) {
        // Inner lambda parameters need their own names: a lambda parameter may not redeclare
        // a parameter of the enclosing lambda.
        topicMetadata.visitBrokers(broker -> {
            this.connections = applyBrokerMetadata(this.connections, broker, metadata -> new LiveFetchConnection(metadata));
        });
        topicMetadata.visitBrokers(broker -> {
            this.historicalConnections = applyBrokerMetadata(this.historicalConnections, broker, metadata -> new HistoricalFetchConnection(metadata));
        });
    }

    /**
     * Reconciles an array of connections with one broker's metadata.
     *
     * <ul>
     *   <li>If a connection for the broker's node id exists with the same host and port, the
     *       array is returned unchanged.</li>
     *   <li>If one exists but its host or port differs, it is closed and a copy of the array
     *       is returned with a newly created connection in its slot.</li>
     *   <li>If none exists, a new connection is created and appended.</li>
     * </ul>
     *
     * <p>The search stops at the first connection whose broker id matches. The input array is
     * never modified.
     *
     * @param tArr           the current connections
     * @param brokerMetadata the broker to reconcile against
     * @param function       creates a connection for a broker
     * @param <T>            the concrete connection type
     * @return the array to use from now on; either {@code tArr} itself or a new array
     */
    private <T extends AbstractFetchConnection> T[] applyBrokerMetadata(T[] tArr, BrokerMetadata brokerMetadata, Function<BrokerMetadata, T> function) {
        for (int index = 0; index < tArr.length; index++) {
            final T existing = tArr[index];
            if (existing.brokerId != brokerMetadata.nodeId) {
                continue;
            }
            if (existing.host.equals(brokerMetadata.host) && existing.port == brokerMetadata.port) {
                return tArr;
            }
            // The broker moved to a different endpoint: retire the stale connection and put
            // its replacement in the same slot so the new connection is not lost.
            existing.close();
            final T[] updated = tArr.clone();
            updated[index] = function.apply(brokerMetadata);
            return updated;
        }
        return ArrayUtil.add(tArr, function.apply(brokerMetadata));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asks every live and historical fetch connection to issue a request if it needs one.
     *
     * <p>The method guards against re-entrancy with a call counter: a call made while a
     * flush is already running only bumps the counter and returns, and the running flush
     * then performs one more pass before finishing.
     */
    public void doFlush() {
        this.nestedDoFlushCalls++;
        while (this.nestedDoFlushCalls == 1) {
            for (AbstractFetchConnection live : this.connections) {
                live.doRequestIfNeeded();
            }
            for (HistoricalFetchConnection historical : this.historicalConnections) {
                historical.doRequestIfNeeded();
            }
            if (this.nestedDoFlushCalls > 1) {
                // A nested call arrived during this pass: collapse the count and go again.
                this.nestedDoFlushCalls = 1;
            } else {
                this.nestedDoFlushCalls = 0;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Detaches the attachment with the given id.
     *
     * <p>If a detacher was registered for the id (the attach had completed) it is removed and
     * invoked with the given offsets. Otherwise the attach is still waiting on metadata: the
     * pending callback is removed from the topic metadata and, when no callbacks remain, the
     * topic's metadata and topic entries are dropped.
     *
     * @param str              topic name
     * @param i                attachment id returned by {@code doAttach}
     * @param long2LongHashMap offsets passed to the registered detacher
     */
    public void doDetach(String str, int i, Long2LongHashMap long2LongHashMap) {
        final Consumer<Long2LongHashMap> detacher = this.detachersById.remove(i);
        if (detacher == null) {
            final TopicMetadata metadata = this.topicMetadataByName.get(str);
            if (metadata != null) {
                metadata.doDetach(i);
                if (!metadata.hasConsumers()) {
                    this.topicMetadataByName.remove(str);
                    this.topicsByName.remove(str);
                }
            }
        } else {
            detacher.accept(long2LongHashMap);
        }
    }

    /**
     * Replaces the live connection array with the result of removing the given connection
     * from it via {@code ArrayUtil.remove}.
     *
     * @param liveFetchConnection the connection to remove
     */
    void removeConnection(LiveFetchConnection liveFetchConnection) {
        final AbstractFetchConnection[] remaining =
            (AbstractFetchConnection[]) ArrayUtil.remove(this.connections, liveFetchConnection);
        this.connections = remaining;
    }

    /**
     * Replaces the historical connection array with the result of removing the given
     * connection from it via {@code ArrayUtil.remove}.
     *
     * @param historicalFetchConnection the connection to remove
     */
    void removeConnection(HistoricalFetchConnection historicalFetchConnection) {
        final HistoricalFetchConnection[] remaining =
            (HistoricalFetchConnection[]) ArrayUtil.remove(this.historicalConnections, historicalFetchConnection);
        this.historicalConnections = remaining;
    }

    /**
     * Compiler-generated accessor: post-increments the pool's {@code nextAttachId}.
     *
     * @param networkConnectionPool the pool whose counter is advanced
     * @return the value of {@code nextAttachId} before the increment
     */
    static /* synthetic */ int access$1908(NetworkConnectionPool networkConnectionPool) {
        return networkConnectionPool.nextAttachId++;
    }

    // Initializes the shared constants: a counter supplier that always yields zero and two
    // stateless dispatchers.
    static {
        // A method reference cannot be bound to a primitive long, so the constant supplier is
        // written as a lambda that always returns zero.
        NO_COUNTER = () -> 0L;

        // Decoder dispatcher that discards everything and reports 0 from dispatch.
        NOOP_DISPATCHER = new DecoderMessageDispatcher() {
            @Override
            public int dispatch(int i, long j2, long j3, long j4, DirectBuffer directBuffer, HeadersFW headersFW, long j5, long j6, DirectBuffer directBuffer2) {
                return 0;
            }

            @Override
            public void flush(int i, long j2, long j3) {
            }
        };

        // Dispatcher that does nothing except report 1 from every dispatch call.
        MATCHING_MESSAGE_DISPATCHER = new MessageDispatcher() {
            @Override
            public void adjustOffset(int i, long j2, long j3) {
            }

            @Override
            public void detach(boolean z) {
            }

            @Override
            public int dispatch(int i, long j2, long j3, DirectBuffer directBuffer, Function<DirectBuffer, Iterator<DirectBuffer>> function, long j4, long j5, DirectBuffer directBuffer2) {
                return 1;
            }

            @Override
            public void flush(int i, long j2, long j3) {
            }
        };
    }
}
