package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.ArrayUtil;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.kafka.internal.function.PartitionProgressHandler;
import org.reaktivity.nukleus.kafka.internal.function.PartitionResponseConsumer;
import org.reaktivity.nukleus.kafka.internal.types.Flyweight;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.ResponseHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.FetchRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.FetchResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.PartitionRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.PartitionResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.RecordSetFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.TopicRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.TopicResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.BrokerMetadataFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.MetadataRequestFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.MetadataResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.MetadataResponsePart2FW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.PartitionMetadataFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.metadata.TopicMetadataFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.AbortFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.DataFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.EndFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.TcpBeginExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool.class */
public final class NetworkConnectionPool {
    /** API version constant for fetch requests. */
    private static final short FETCH_API_VERSION = 5;
    /** API key constant for fetch requests. */
    private static final short FETCH_API_KEY = 1;
    /** API version constant for metadata requests. */
    private static final short METADATA_API_VERSION = 5;
    /** API key constant for metadata requests. */
    private static final short METADATA_API_KEY = 3;
    /** Four zero bytes, written as the local IPv4 address when a fetch connection is opened. */
    private static final byte[] ANY_IP_ADDR = new byte[4];

    /** Shared scratch buffer into which outgoing requests are encoded. */
    final MutableDirectBuffer encodeBuffer;
    private final ClientStreamFactory clientStreamFactory;
    /** Name used to look up the network target and to register throttles with the router. */
    private final String networkName;
    /** Reference passed on every BEGIN sent to the network target. */
    private final long networkRef;
    /** Pool whose slot capacity bounds the response window offered to the network. */
    private final BufferPool bufferPool;
    private int nextAttachId;

    // Reusable flyweight builders (request side); shared by every connection of this pool.
    final RequestHeaderFW.Builder requestRW = new RequestHeaderFW.Builder();
    final FetchRequestFW.Builder fetchRequestRW = new FetchRequestFW.Builder();
    final TopicRequestFW.Builder topicRequestRW = new TopicRequestFW.Builder();
    final PartitionRequestFW.Builder partitionRequestRW = new PartitionRequestFW.Builder();
    final MetadataRequestFW.Builder metadataRequestRW = new MetadataRequestFW.Builder();
    final WindowFW windowRO = new WindowFW();
    final OctetsFW.Builder payloadRW = new OctetsFW.Builder();
    final TcpBeginExFW.Builder tcpBeginExRW = new TcpBeginExFW.Builder();

    // Reusable read-only flyweights (response side); re-wrapped for every decoded element.
    final ResponseHeaderFW responseRO = new ResponseHeaderFW();
    final FetchResponseFW fetchResponseRO = new FetchResponseFW();
    final TopicResponseFW topicResponseRO = new TopicResponseFW();
    final PartitionResponseFW partitionResponseRO = new PartitionResponseFW();
    final RecordSetFW recordSetRO = new RecordSetFW();
    final MetadataResponseFW metadataResponseRO = new MetadataResponseFW();
    final BrokerMetadataFW brokerMetadataRO = new BrokerMetadataFW();
    final MetadataResponsePart2FW metadataResponsePart2RO = new MetadataResponsePart2FW();
    final TopicMetadataFW topicMetadataRO = new TopicMetadataFW();
    final PartitionMetadataFW partitionMetadataRO = new PartitionMetadataFW();

    private AbstractFetchConnection[] connections = new LiveFetchConnection[0];
    private HistoricalFetchConnection[] historicalConnections = new HistoricalFetchConnection[0];
    private final MetadataConnection metadataConnection = new MetadataConnection();
    /** Topics keyed by name; insertion-ordered, and iterated when fetch requests are encoded. */
    private final Map<String, NetworkTopic> topicsByName = new LinkedHashMap<>();
    /** Per-topic metadata keyed by topic name; an entry with no partition leaders yet is still pending. */
    private final Map<String, TopicMetadata> topicMetadataByName = new HashMap<>();
    private final Int2ObjectHashMap<Consumer<Long2LongHashMap>> detachersById = new Int2ObjectHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$AbstractFetchConnection.class */
    /**
     * Connection that sends fetch requests to one broker and dispatches the
     * partition responses to the matching {@code NetworkTopic}.
     *
     * <p>Subclasses decide which partitions of a topic are included in a request
     * ({@link #addTopicToRequest(String)}) and which offset is reported as the
     * requested one when a partition response is delivered
     * ({@link #getRequestedOffset(NetworkTopic, int)}).
     */
    public abstract class AbstractFetchConnection extends AbstractNetworkConnection {
        private final String host;
        private final int port;
        final int brokerId;
        /** Write position in the shared encode buffer while a request is being built. */
        int encodeLimit;
        /** Sum of the per-partition maxBytes added to the request currently being built. */
        int maxFetchBytes;

        private AbstractFetchConnection(BrokerMetadata brokerMetadata) {
            super();
            this.brokerId = brokerMetadata.nodeId;
            this.host = brokerMetadata.host;
            this.port = brokerMetadata.port;
        }

        /**
         * Encodes and sends one fetch request when no request is outstanding,
         * the connection has request budget beyond its padding, and at least
         * one topic contributes a partition for this broker.
         *
         * <p>The request is encoded starting at offset 512 of the shared encode
         * buffer. Header, fetch request and topic entries are first written with
         * placeholder counts and sizes, then rewritten in place once the real
         * values are known.
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void doRequestIfNeeded() {
            if (this.nextRequestId != this.nextResponseId) {
                // A request is still awaiting its response.
                return;
            }

            // Each nested lambda uses a distinct parameter name: Java forbids a lambda
            // parameter from re-declaring a name already in the enclosing lambda's scope.
            doBeginIfNotConnected((buffer, offset, limit) -> {
                return NetworkConnectionPool.this.tcpBeginExRW.wrap2(buffer, offset, limit).localAddress(local -> {
                    local.ipv4Address(ipv4 -> {
                        ipv4.put(NetworkConnectionPool.ANY_IP_ADDR);
                    });
                }).localPort(0).remoteAddress(remote -> {
                    remote.host(this.host);
                }).remotePort(this.port).limit();
            });

            if (this.networkRequestBudget <= this.networkRequestPadding) {
                return;
            }

            final int encodeOffset = 512;
            this.encodeLimit = encodeOffset;

            // Placeholder header: size and correlation id are rewritten once the request is complete.
            // apiKey 1 / apiVersion 5 match FETCH_API_KEY / FETCH_API_VERSION.
            RequestHeaderFW header = NetworkConnectionPool.this.requestRW.wrap2(NetworkConnectionPool.this.encodeBuffer, this.encodeLimit, NetworkConnectionPool.this.encodeBuffer.capacity()).size(0).apiKey((short) 1).apiVersion((short) 5).correlationId(0).clientId((String) null).build();
            this.encodeLimit = header.limit();

            // Placeholder fetch request: maxBytes and topicCount are rewritten below.
            FetchRequestFW fetchRequest = NetworkConnectionPool.this.fetchRequestRW.wrap2(NetworkConnectionPool.this.encodeBuffer, this.encodeLimit, NetworkConnectionPool.this.encodeBuffer.capacity()).maxWaitTimeMillis(500).minBytes(1).maxBytes(0).isolationLevel((byte) 0).topicCount(0).build();
            this.encodeLimit = fetchRequest.limit();

            this.maxFetchBytes = 0;
            int topicCount = 0;
            for (String topicName : NetworkConnectionPool.this.topicsByName.keySet()) {
                int topicStart = this.encodeLimit;
                TopicRequestFW topicRequest = NetworkConnectionPool.this.topicRequestRW.wrap2(NetworkConnectionPool.this.encodeBuffer, this.encodeLimit, NetworkConnectionPool.this.encodeBuffer.capacity()).name(topicName).partitionCount(0).build();
                this.encodeLimit = topicRequest.limit();

                int partitionCount = addTopicToRequest(topicName);
                if (partitionCount > 0) {
                    // Rewrite the topic entry in place with the real partition count.
                    NetworkConnectionPool.this.topicRequestRW.wrap2(NetworkConnectionPool.this.encodeBuffer, topicRequest.offset(), topicRequest.limit()).name(topicRequest.name()).partitionCount(partitionCount).build();
                    topicCount++;
                } else {
                    // No partitions for this broker: discard the topic entry just written.
                    this.encodeLimit = topicStart;
                }
            }

            boolean exceedsBudget = (this.encodeLimit - encodeOffset) + this.networkRequestPadding > this.networkRequestBudget;
            if (topicCount <= 0 || exceedsBudget) {
                return;
            }

            NetworkConnectionPool.this.fetchRequestRW.wrap2(NetworkConnectionPool.this.encodeBuffer, fetchRequest.offset(), fetchRequest.limit()).maxWaitTimeMillis(fetchRequest.maxWaitTimeMillis()).minBytes(fetchRequest.minBytes()).maxBytes(this.maxFetchBytes).isolationLevel(fetchRequest.isolationLevel()).topicCount(topicCount).build();

            int correlationId = this.nextRequestId;
            this.nextRequestId = correlationId + 1;

            // The size field excludes its own four bytes.
            NetworkConnectionPool.this.requestRW.wrap2(NetworkConnectionPool.this.encodeBuffer, header.offset(), header.limit()).size((this.encodeLimit - encodeOffset) - 4).apiKey((short) 1).apiVersion((short) 5).correlationId(correlationId).clientId((String) null).build();

            // The bytes are already in place; the visitor only reports how many there are.
            OctetsFW payload = NetworkConnectionPool.this.payloadRW.wrap2(NetworkConnectionPool.this.encodeBuffer, encodeOffset, this.encodeLimit).set((buffer, offset, limit) -> {
                return limit - offset;
            }).build();
            NetworkConnectionPool.this.clientStreamFactory.doData(this.networkTarget, this.networkId, payload);
            this.networkRequestBudget -= payload.sizeof() + this.networkRequestPadding;
        }

        /**
         * Appends the partition requests of the given topic that belong to this
         * connection, advancing {@link #encodeLimit} and {@link #maxFetchBytes}.
         *
         * @param str topic name
         * @return number of partition requests appended
         */
        abstract int addTopicToRequest(String str);

        /**
         * Walks every topic and partition of a fetch response and forwards each
         * partition response, including its record set, to the topic registered
         * under the same name. Partitions of unknown topics are skipped.
         *
         * @param directBuffer buffer holding the response
         * @param i offset of the fetch response within the buffer
         * @param i2 limit of the readable bytes
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void handleResponse(DirectBuffer directBuffer, int i, int i2) {
            FetchResponseFW fetchResponse = NetworkConnectionPool.this.fetchResponseRO.wrap(directBuffer, i, i2);
            int topicCount = fetchResponse.topicCount();
            int cursor = fetchResponse.limit();
            for (int topicIndex = 0; topicIndex < topicCount; topicIndex++) {
                TopicResponseFW topicResponse = NetworkConnectionPool.this.topicResponseRO.wrap(directBuffer, cursor, i2);
                String topicName = topicResponse.name().asString();
                int partitionCount = topicResponse.partitionCount();
                cursor = topicResponse.limit();
                NetworkTopic topic = (NetworkTopic) NetworkConnectionPool.this.topicsByName.get(topicName);
                for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++) {
                    PartitionResponseFW partition = NetworkConnectionPool.this.partitionResponseRO.wrap(directBuffer, cursor, i2);
                    RecordSetFW recordSet = NetworkConnectionPool.this.recordSetRO.wrap(directBuffer, partition.limit(), i2);
                    // Skip over the record batch bytes that follow the record set header.
                    cursor = recordSet.limit() + recordSet.recordBatchSize();
                    if (topic != null) {
                        topic.onPartitionResponse(partition.buffer(), partition.offset(), cursor - partition.offset(), getRequestedOffset(topic, partition.partitionId()));
                    }
                }
            }
        }

        /**
         * Returns the offset this connection reports as requested for the given
         * partition of the topic when delivering a partition response.
         *
         * @param networkTopic topic owning the partition
         * @param i partition id
         * @return the requested offset
         */
        abstract long getRequestedOffset(NetworkTopic networkTopic, int i);

        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        public String toString() {
            return String.format("[brokerId=%d, host=%s, port=%d, budget=%d, padding=%d]", this.brokerId, this.host, this.port, this.networkRequestBudget, this.networkRequestPadding);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$AbstractNetworkConnection.class */
    /**
     * Base class for a request/response connection to the network target.
     *
     * <p>It opens the outbound stream on demand, tracks request and response
     * budgets from WINDOW frames, reassembles responses that span several DATA
     * frames in a buffer pool slot, and hands each complete response to
     * {@link #handleResponse(DirectBuffer, int, int)}. Only one request is
     * outstanding at a time: subclasses send a new one when
     * {@code nextRequestId == nextResponseId}.
     */
    public abstract class AbstractNetworkConnection {
        final MessageConsumer networkTarget;
        /** Id of the outbound stream; {@code 0} while not connected. */
        long networkId;
        int networkRequestBudget;
        int networkRequestPadding;
        int networkResponseBudget;
        int networkResponsePadding;
        MessageConsumer networkReplyThrottle;
        long networkReplyId;
        /** Current handler for reply stream frames (before or after BEGIN). */
        private MessageConsumer streamState;
        /** Buffer pool slot holding a partially received response; {@code -1} when none is held. */
        int networkSlot;
        /** Number of bytes currently buffered in {@link #networkSlot}. */
        int networkSlotOffset;
        int nextRequestId;
        int nextResponseId;

        private AbstractNetworkConnection() {
            this.networkSlot = -1;
            this.networkTarget = NetworkConnectionPool.this.clientStreamFactory.router.supplyTarget(NetworkConnectionPool.this.networkName);
        }

        @Override
        public String toString() {
            return String.format("[budget=%d, padding=%d]", this.networkRequestBudget, this.networkRequestPadding);
        }

        /**
         * Records the reply stream's throttle and id and returns the handler
         * that will receive the reply stream's frames.
         *
         * @param messageConsumer throttle for the reply stream
         * @param j reply stream id
         * @return consumer of reply stream frames
         */
        public MessageConsumer onCorrelated(MessageConsumer messageConsumer, long j) {
            this.networkReplyThrottle = messageConsumer;
            this.networkReplyId = j;
            this.streamState = this::beforeBegin;
            return this::handleStream;
        }

        /** Opens the outbound stream with an empty BEGIN extension if it is not already open. */
        void doBeginIfNotConnected() {
            doBeginIfNotConnected((buffer, offset, limit) -> {
                return 0;
            });
        }

        /**
         * Opens the outbound stream if it is not already open: registers this
         * connection under a new correlation id, sends BEGIN and installs the
         * throttle handler.
         *
         * @param visitor writes the BEGIN extension
         */
        void doBeginIfNotConnected(Flyweight.Builder.Visitor visitor) {
            if (this.networkId != 0) {
                return;
            }
            long newNetworkId = NetworkConnectionPool.this.clientStreamFactory.supplyStreamId.getAsLong();
            long newCorrelationId = NetworkConnectionPool.this.clientStreamFactory.supplyCorrelationId.getAsLong();
            NetworkConnectionPool.this.clientStreamFactory.correlations.put(newCorrelationId, this);
            NetworkConnectionPool.this.clientStreamFactory.doBegin(this.networkTarget, newNetworkId, NetworkConnectionPool.this.networkRef, newCorrelationId, visitor);
            NetworkConnectionPool.this.clientStreamFactory.router.setThrottle(NetworkConnectionPool.this.networkName, newNetworkId, this::handleThrottle);
            this.networkId = newNetworkId;
        }

        /** Sends the next request if none is outstanding and the subclass has one to send. */
        abstract void doRequestIfNeeded();

        /** Ends the outbound stream if it is open and marks the connection as not connected. */
        void close() {
            if (this.networkId != 0) {
                NetworkConnectionPool.this.clientStreamFactory.doEnd(this.networkTarget, this.networkId);
                this.networkId = 0L;
            }
        }

        /** Dispatches throttle frames of the outbound stream; unknown frame types are ignored. */
        private void handleThrottle(int msgTypeId, DirectBuffer buffer, int index, int length) {
            switch (msgTypeId) {
                case 1073741825:
                    // RESET frame
                    handleReset(NetworkConnectionPool.this.clientStreamFactory.resetRO.wrap(buffer, index, index + length));
                    break;
                case 1073741826:
                    // WINDOW frame
                    handleWindow(NetworkConnectionPool.this.windowRO.wrap(buffer, index, index + length));
                    break;
                default:
                    break;
            }
        }

        /** Adds the granted credit to the request budget, records the padding and retries sending. */
        private void handleWindow(WindowFW windowFW) {
            int credit = windowFW.credit();
            int padding = windowFW.padding();
            this.networkRequestBudget += credit;
            this.networkRequestPadding = padding;
            doRequestIfNeeded();
        }

        private void handleReset(ResetFW resetFW) {
            doReinitialize();
            doRequestIfNeeded();
        }

        private void handleStream(int msgTypeId, DirectBuffer buffer, int index, int length) {
            this.streamState.accept(msgTypeId, buffer, index, length);
        }

        /** Reply stream state before BEGIN: anything other than BEGIN (type 1) resets the reply stream. */
        private void beforeBegin(int msgTypeId, DirectBuffer buffer, int index, int length) {
            if (msgTypeId == 1) {
                handleBegin(NetworkConnectionPool.this.clientStreamFactory.beginRO.wrap(buffer, index, index + length));
            } else {
                NetworkConnectionPool.this.clientStreamFactory.doReset(this.networkReplyThrottle, this.networkReplyId);
            }
        }

        /** Reply stream state after BEGIN: dispatches DATA (2), END (3) and ABORT (4); anything else resets. */
        private void afterBegin(int msgTypeId, DirectBuffer buffer, int index, int length) {
            switch (msgTypeId) {
                case 2:
                    handleData(NetworkConnectionPool.this.clientStreamFactory.dataRO.wrap(buffer, index, index + length));
                    break;
                case 3:
                    handleEnd(NetworkConnectionPool.this.clientStreamFactory.endRO.wrap(buffer, index, index + length));
                    break;
                case 4:
                    handleAbort(NetworkConnectionPool.this.clientStreamFactory.abortRO.wrap(buffer, index, index + length));
                    break;
                default:
                    NetworkConnectionPool.this.clientStreamFactory.doReset(this.networkReplyThrottle, this.networkReplyId);
                    break;
            }
        }

        private void handleBegin(BeginFW beginFW) {
            doOfferResponseBudget();
            this.streamState = this::afterBegin;
        }

        /**
         * Consumes one DATA frame of the reply stream.
         *
         * <p>The payload is charged against the response budget; exceeding it
         * resets the reply stream. Bytes are appended to any partially buffered
         * response. When a complete response is available it is passed to
         * {@link #handleResponse(DirectBuffer, int, int)}, any trailing bytes are
         * kept in the slot for the next response, more response budget is
         * offered and the next request is attempted. An incomplete response is
         * buffered in a slot. The slot is released whenever it ends up empty,
         * including when an exception propagates.
         */
        private void handleData(DataFW dataFW) {
            final OctetsFW payload = dataFW.payload();
            this.networkResponseBudget -= payload.sizeof() + this.networkResponsePadding;
            if (this.networkResponseBudget < 0) {
                NetworkConnectionPool.this.clientStreamFactory.doReset(this.networkReplyThrottle, this.networkReplyId);
                return;
            }

            try {
                DirectBuffer buffer = payload.buffer();
                int offset = payload.offset();
                int limit = payload.limit();

                if (this.networkSlot != -1) {
                    // Continue a partially buffered response: append, then decode from the slot.
                    final MutableDirectBuffer slotBuffer = NetworkConnectionPool.this.clientStreamFactory.bufferPool.buffer(this.networkSlot);
                    final int remaining = limit - offset;
                    slotBuffer.putBytes(this.networkSlotOffset, buffer, offset, remaining);
                    this.networkSlotOffset += remaining;
                    buffer = slotBuffer;
                    offset = 0;
                    limit = this.networkSlotOffset;
                }

                ResponseHeaderFW response = null;
                if (offset + 4 <= limit) {
                    response = NetworkConnectionPool.this.responseRO.wrap(buffer, offset, limit);
                    offset = response.limit();
                }

                if (response != null && offset + response.size() <= limit) {
                    handleResponse(buffer, offset, limit);
                    final int responseEnd = offset + response.size();
                    this.nextResponseId++;
                    if (responseEnd < limit) {
                        // Bytes of a following response arrived too: keep them at the start of the slot.
                        if (this.networkSlot == -1) {
                            this.networkSlot = NetworkConnectionPool.this.clientStreamFactory.bufferPool.acquire(this.networkReplyId);
                        }
                        NetworkConnectionPool.this.clientStreamFactory.bufferPool.buffer(this.networkSlot).putBytes(0, buffer, responseEnd, limit - responseEnd);
                        this.networkSlotOffset = limit - responseEnd;
                    } else {
                        assert responseEnd == limit;
                        this.networkSlotOffset = 0;
                    }
                    doOfferResponseBudget();
                    doRequestIfNeeded();
                } else if (this.networkSlot == -1) {
                    // Incomplete response and nothing buffered yet: start buffering the payload.
                    this.networkSlot = NetworkConnectionPool.this.clientStreamFactory.bufferPool.acquire(this.networkReplyId);
                    NetworkConnectionPool.this.clientStreamFactory.bufferPool.buffer(this.networkSlot).putBytes(this.networkSlotOffset, payload.buffer(), payload.offset(), payload.sizeof());
                    this.networkSlotOffset += payload.sizeof();
                }
            } finally {
                if (this.networkSlotOffset == 0 && this.networkSlot != -1) {
                    NetworkConnectionPool.this.clientStreamFactory.bufferPool.release(this.networkSlot);
                    this.networkSlot = -1;
                }
            }
        }

        /**
         * Processes one complete response.
         *
         * @param directBuffer buffer holding the response
         * @param i offset just past the response header
         * @param i2 limit of the readable bytes
         */
        abstract void handleResponse(DirectBuffer directBuffer, int i, int i2);

        private void handleEnd(EndFW endFW) {
            doReinitialize();
            doRequestIfNeeded();
        }

        private void handleAbort(AbortFW abortFW) {
            doReinitialize();
            doRequestIfNeeded();
        }

        /**
         * Tops the response budget up to the slot capacity minus the bytes
         * already buffered, and sends the difference as a WINDOW on the reply
         * stream (a zero credit is still sent).
         */
        void doOfferResponseBudget() {
            int credit = Math.max((NetworkConnectionPool.this.bufferPool.slotCapacity() - this.networkSlotOffset) - this.networkResponseBudget, 0);
            NetworkConnectionPool.this.clientStreamFactory.doWindow(this.networkReplyThrottle, this.networkReplyId, credit, this.networkResponsePadding, 0L);
            this.networkResponseBudget += credit;
        }

        /** Clears the request budget, stream id and request/response counters so the stream is reopened on the next request. */
        private void doReinitialize() {
            this.networkRequestBudget = 0;
            this.networkRequestPadding = 0;
            this.networkId = 0L;
            this.nextRequestId = 0;
            this.nextResponseId = 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$BrokerMetadata.class */
    /** Immutable holder for a broker's node id, host name and port. */
    public static final class BrokerMetadata {
        final int nodeId;
        final String host;
        final int port;

        /**
         * @param nodeId broker node id
         * @param host broker host name
         * @param port broker port
         */
        BrokerMetadata(int nodeId, String host, int port) {
            this.port = port;
            this.host = host;
            this.nodeId = nodeId;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$HistoricalFetchConnection.class */
    /**
     * Fetch connection that requests only the partitions a topic reports as
     * needing historical data, and reports the topic's lowest offset for a
     * partition as the requested offset.
     */
    public final class HistoricalFetchConnection extends AbstractFetchConnection {
        private HistoricalFetchConnection(BrokerMetadata brokerMetadata) {
            super(brokerMetadata);
        }

        /**
         * Appends one partition request per distinct partition id of the topic
         * that needs historical data and whose leader is this connection's
         * broker. Entries whose id is not greater than the last id added are
         * skipped.
         *
         * @param topicName topic name
         * @return number of partition requests appended
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractFetchConnection
        int addTopicToRequest(String topicName) {
            NetworkTopic topic = (NetworkTopic) NetworkConnectionPool.this.topicsByName.get(topicName);
            if (!topic.needsHistorical()) {
                return 0;
            }

            int maxPartitionBytes = topic.maximumWritableBytes();
            int[] leaderByPartition = ((TopicMetadata) NetworkConnectionPool.this.topicMetadataByName.get(topicName)).nodeIdsByPartition;
            int added = 0;
            int lastAddedId = -1;
            for (NetworkTopicPartition candidate : topic.partitions) {
                if (!topic.needsHistorical(candidate.id)) {
                    continue;
                }
                if (lastAddedId >= candidate.id) {
                    continue;
                }
                if (leaderByPartition[candidate.id] != this.brokerId) {
                    continue;
                }
                this.encodeLimit = NetworkConnectionPool.this.partitionRequestRW.wrap2(NetworkConnectionPool.this.encodeBuffer, this.encodeLimit, NetworkConnectionPool.this.encodeBuffer.capacity()).partitionId(candidate.id).fetchOffset(candidate.offset).maxBytes(maxPartitionBytes).build().limit();
                lastAddedId = candidate.id;
                added++;
                this.maxFetchBytes += maxPartitionBytes;
            }
            return added;
        }

        /** Returns the topic's lowest offset for the given partition. */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractFetchConnection
        long getRequestedOffset(NetworkTopic networkTopic, int partitionId) {
            return networkTopic.getLowestOffset(partitionId);
        }
    }

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$LiveFetchConnection.class */
    /**
     * Fetch connection that requests, for each partition id, the last entry in
     * the topic's partition iteration order, and reports the topic's highest
     * offset for a partition as the requested offset.
     */
    private final class LiveFetchConnection extends AbstractFetchConnection {
        LiveFetchConnection(BrokerMetadata brokerMetadata) {
            super(brokerMetadata);
        }

        /**
         * Appends one partition request for every partition id whose leader is
         * this connection's broker. When several consecutive entries share an
         * id, only the last of them is used.
         *
         * @param topicName topic name
         * @return number of partition requests appended
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractFetchConnection
        int addTopicToRequest(String topicName) {
            NetworkTopic topic = (NetworkTopic) NetworkConnectionPool.this.topicsByName.get(topicName);
            int maxPartitionBytes = topic.maximumWritableBytes();
            int[] leaderByPartition = ((TopicMetadata) NetworkConnectionPool.this.topicMetadataByName.get(topicName)).nodeIdsByPartition;
            int added = 0;

            Iterator<NetworkTopicPartition> remaining = topic.partitions.iterator();
            NetworkTopicPartition current = remaining.hasNext() ? remaining.next() : null;
            while (current != null) {
                // Look one entry ahead to tell whether 'current' is the last entry for its id.
                NetworkTopicPartition following = remaining.hasNext() ? remaining.next() : null;
                boolean lastEntryForId = following == null || following.id != current.id;
                if (lastEntryForId && leaderByPartition[current.id] == this.brokerId) {
                    this.encodeLimit = NetworkConnectionPool.this.partitionRequestRW.wrap2(NetworkConnectionPool.this.encodeBuffer, this.encodeLimit, NetworkConnectionPool.this.encodeBuffer.capacity()).partitionId(current.id).fetchOffset(current.offset).maxBytes(maxPartitionBytes).build().limit();
                    added++;
                    this.maxFetchBytes += maxPartitionBytes;
                }
                current = following;
            }
            return added;
        }

        /** Returns the topic's highest offset for the given partition. */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractFetchConnection
        long getRequestedOffset(NetworkTopic networkTopic, int partitionId) {
            return networkTopic.getHighestOffset(partitionId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$MetadataConnection.class */
    public final class MetadataConnection extends AbstractNetworkConnection {
        TopicMetadata pendingTopicMetadata;
        static final /* synthetic */ boolean $assertionsDisabled;

        private MetadataConnection() {
            super();
        }

        /* JADX WARN: Type inference failed for: r0v19, types: [org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v31, types: [org.reaktivity.nukleus.kafka.internal.types.codec.metadata.MetadataRequestFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v45, types: [org.reaktivity.nukleus.kafka.internal.types.codec.RequestHeaderFW$Builder] */
        /* JADX WARN: Type inference failed for: r0v55, types: [org.reaktivity.nukleus.kafka.internal.types.OctetsFW$Builder] */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void doRequestIfNeeded() {
            if (this.nextRequestId == this.nextResponseId) {
                Optional findFirst = NetworkConnectionPool.this.topicMetadataByName.values().stream().filter(topicMetadata -> {
                    return topicMetadata.nodeIdsByPartition == null;
                }).findFirst();
                if (findFirst.isPresent()) {
                    this.pendingTopicMetadata = (TopicMetadata) findFirst.get();
                    doBeginIfNotConnected();
                    if (this.networkRequestBudget > this.networkRequestPadding) {
                        RequestHeaderFW build = NetworkConnectionPool.this.requestRW.wrap2(NetworkConnectionPool.this.encodeBuffer, 0, NetworkConnectionPool.this.encodeBuffer.capacity()).size(0).apiKey((short) 3).apiVersion((short) 5).correlationId(0).clientId((String) null).build();
                        int limit = NetworkConnectionPool.this.metadataRequestRW.wrap2(NetworkConnectionPool.this.encodeBuffer, build.limit(), NetworkConnectionPool.this.encodeBuffer.capacity()).topicCount(1).topicName(((TopicMetadata) findFirst.get()).topicName).build().limit();
                        if ((limit - 0) + this.networkRequestPadding <= this.networkRequestBudget) {
                            int i = this.nextRequestId;
                            this.nextRequestId = i + 1;
                            NetworkConnectionPool.this.requestRW.wrap2(NetworkConnectionPool.this.encodeBuffer, build.offset(), build.limit()).size((limit - 0) - 4).apiKey((short) 3).apiVersion((short) 5).correlationId(i).clientId((String) null).build();
                            OctetsFW build2 = NetworkConnectionPool.this.payloadRW.wrap2(NetworkConnectionPool.this.encodeBuffer, 0, limit).set((mutableDirectBuffer, i2, i3) -> {
                                return i3 - i2;
                            }).build();
                            NetworkConnectionPool.this.clientStreamFactory.doData(this.networkTarget, this.networkId, build2);
                            this.networkRequestBudget -= build2.sizeof() + this.networkRequestPadding;
                        }
                    }
                }
            }
        }

        /**
         * Decodes one metadata response and applies it to the pending topic metadata.
         *
         * <p>The response is walked sequentially: the response header, then one broker entry per
         * advertised broker, then the topic section and its partitions. Decoding of topics and
         * partitions stops at the first non-zero error code. A recoverable error (as judged by
         * {@code KafkaErrors.isRecoverable}) resets the pending metadata and re-issues the request;
         * any other outcome records the error code and flushes the pending metadata.
         *
         * @param directBuffer buffer holding the response
         * @param i            offset at which the response starts
         * @param i2           limit passed to every flyweight wrap
         */
        @Override // org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool.AbstractNetworkConnection
        void handleResponse(DirectBuffer directBuffer, int i, int i2) {
            final NetworkConnectionPool pool = NetworkConnectionPool.this;

            final MetadataResponseFW response = pool.metadataResponseRO.wrap(directBuffer, i, i2);
            final int brokerCount = response.brokerCount();
            this.pendingTopicMetadata.initializeBrokers(brokerCount);

            // Each flyweight starts where the previous one ended.
            int cursor = response.limit();
            int brokersLeft = brokerCount;
            while (brokersLeft > 0) {
                final BrokerMetadataFW broker = pool.brokerMetadataRO.wrap(directBuffer, cursor, i2);
                this.pendingTopicMetadata.addBroker(broker.nodeId(), broker.host().asString(), broker.port());
                cursor = broker.limit();
                brokersLeft--;
            }

            final MetadataResponsePart2FW topicsHeader = pool.metadataResponsePart2RO.wrap(directBuffer, cursor, i2);
            final int topicCount = topicsHeader.topicCount();
            if (!$assertionsDisabled && topicCount != 1) {
                throw new AssertionError();
            }
            cursor = topicsHeader.limit();

            short errorCode = 0;
            int topicIndex = 0;
            while (topicIndex < topicCount && errorCode == 0) {
                final TopicMetadataFW topic = pool.topicMetadataRO.wrap(directBuffer, cursor, i2);
                final String responseTopicName = topic.topic().asString();
                if (!$assertionsDisabled && !responseTopicName.equals(this.pendingTopicMetadata.topicName)) {
                    throw new AssertionError();
                }
                errorCode = topic.errorCode();
                if (errorCode == 0) {
                    final int partitionCount = topic.partitionCount();
                    cursor = topic.limit();
                    this.pendingTopicMetadata.initializePartitions(partitionCount);
                    int partitionIndex = 0;
                    while (partitionIndex < partitionCount && errorCode == 0) {
                        final PartitionMetadataFW partition = pool.partitionMetadataRO.wrap(directBuffer, cursor, i2);
                        errorCode = partition.errorCode();
                        // The partition is recorded even when it carries an error code.
                        this.pendingTopicMetadata.addPartition(partition.partitionId(), partition.leader());
                        cursor = partition.limit();
                        partitionIndex++;
                    }
                }
                topicIndex++;
            }

            if (!KafkaErrors.isRecoverable(errorCode)) {
                this.pendingTopicMetadata.setErrorCode(errorCode);
                this.pendingTopicMetadata.flush();
                return;
            }
            this.pendingTopicMetadata.reset();
            doRequestIfNeeded();
        }

        // Decompiler rendering of the assertion flag: the checks guarded by $assertionsDisabled
        // in this class are active only when assertions are enabled for NetworkConnectionPool.
        static {
            $assertionsDisabled = !NetworkConnectionPool.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$NetworkTopic.class */
    /**
     * Tracks, for one topic, the registered record consumers and window suppliers together with a
     * reference-counted, ordered set of (partition id, offset) entries.
     *
     * <p>A partition is flagged in {@code needsHistoricalByPartition} when the bookkeeping below
     * sees more than one distinct offset entry for that partition id.
     */
    public final class NetworkTopic {
        private final String topicName;
        private BitSet needsHistoricalByPartition = new BitSet();
        private final Set<PartitionResponseConsumer> recordConsumers = new HashSet<>();
        private final Set<IntSupplier> windowSuppliers = new HashSet<>();
        final NavigableSet<NetworkTopicPartition> partitions = new TreeSet<>();

        /** Scratch key reused for every floor/ceiling lookup; never stored in {@link #partitions}. */
        private final NetworkTopicPartition candidate = new NetworkTopicPartition();
        private final PartitionProgressHandler progressHandler = this::handleProgress;

        @Override
        public String toString() {
            return String.format("topicName=%s, partitions=%s", this.topicName, this.partitions);
        }

        NetworkTopic(String str) {
            this.topicName = str;
        }

        /**
         * Registers a consumer and its window supplier, and takes one reference per offset value.
         *
         * <p>Partition ids are assigned 0, 1, 2, ... in the iteration order of the map's values.
         * NOTE(review): this presumes value iteration order matches partition id order — confirm
         * against callers.
         *
         * @param long2LongHashMap          fetch offsets, one value per partition
         * @param partitionResponseConsumer consumer to notify of partition responses
         * @param intSupplier               supplier of the consumer's writable window
         */
        void doAttach(Long2LongHashMap long2LongHashMap, PartitionResponseConsumer partitionResponseConsumer, IntSupplier intSupplier) {
            this.recordConsumers.add(partitionResponseConsumer);
            this.windowSuppliers.add(intSupplier);
            this.candidate.id = -1;
            final Long2LongHashMap.LongIterator offsets = long2LongHashMap.values().iterator();
            while (offsets.hasNext()) {
                this.candidate.id++;
                this.candidate.offset = offsets.nextValue();
                NetworkTopicPartition partition = this.partitions.floor(this.candidate);
                if (partition == null || partition.id != this.candidate.id) {
                    // No entry at or below this offset for the partition: flag it when an entry
                    // at a higher offset already exists, then create the new entry.
                    final NetworkTopicPartition ceiling = this.partitions.ceiling(this.candidate);
                    this.needsHistoricalByPartition.set(this.candidate.id, ceiling != null && ceiling.id == this.candidate.id);
                    partition = new NetworkTopicPartition();
                    partition.id = this.candidate.id;
                    partition.offset = this.candidate.offset;
                    this.partitions.add(partition);
                }
                partition.refs++;
            }
        }

        /**
         * Unregisters a consumer and its window supplier, and releases one reference per offset
         * value, removing entries whose reference count reaches zero.
         *
         * @param long2LongHashMap          fetch offsets, one value per partition
         * @param partitionResponseConsumer consumer previously passed to {@code doAttach}
         * @param intSupplier               window supplier previously passed to {@code doAttach}
         */
        public void doDetach(Long2LongHashMap long2LongHashMap, PartitionResponseConsumer partitionResponseConsumer, IntSupplier intSupplier) {
            this.recordConsumers.remove(partitionResponseConsumer);
            this.windowSuppliers.remove(intSupplier);
            this.candidate.id = -1;
            final Long2LongHashMap.LongIterator offsets = long2LongHashMap.values().iterator();
            while (offsets.hasNext()) {
                this.candidate.id++;
                this.candidate.offset = offsets.nextValue();
                final NetworkTopicPartition partition = this.partitions.floor(this.candidate);
                if (partition != null) {
                    assert partition.id == this.candidate.id;
                    partition.refs--;
                    if (partition.refs == 0) {
                        remove(partition);
                    }
                }
            }
        }

        /**
         * Returns the offset of the highest entry for the given partition id.
         *
         * @param i partition id
         * @return the greatest tracked offset for that partition
         */
        long getHighestOffset(int i) {
            this.candidate.id = i;
            this.candidate.offset = Long.MAX_VALUE;
            final NetworkTopicPartition highest = this.partitions.floor(this.candidate);
            assert highest.id == i;
            return highest.offset;
        }

        /**
         * Returns the offset of the lowest entry at or above offset zero for the given partition id.
         *
         * @param i partition id
         * @return the smallest tracked offset for that partition
         */
        long getLowestOffset(int i) {
            this.candidate.id = i;
            this.candidate.offset = 0L;
            final NetworkTopicPartition lowest = this.partitions.ceiling(this.candidate);
            assert lowest.id == i;
            return lowest.offset;
        }

        /** Hands the partition response to every registered consumer along with the progress handler. */
        void onPartitionResponse(DirectBuffer directBuffer, int i, int i2, long j) {
            for (PartitionResponseConsumer consumer : this.recordConsumers) {
                consumer.accept(directBuffer, i, i2, j, this.progressHandler);
            }
        }

        /**
         * Returns the largest value reported by any registered window supplier.
         *
         * @return the maximum supplied window, or 0 when no supplier is registered
         */
        int maximumWritableBytes() {
            int largest = 0;
            for (IntSupplier window : this.windowSuppliers) {
                largest = Math.max(largest, window.getAsInt());
            }
            return largest;
        }

        /**
         * Moves one reference for a partition from the entry at offset {@code j} to the entry at
         * offset {@code j2}, creating the target entry when needed and removing the source entry
         * once it is unreferenced.
         *
         * @param i  partition id
         * @param j  offset the reference is currently held at
         * @param j2 offset the reference moves to
         */
        private void handleProgress(int i, long j, long j2) {
            this.candidate.id = i;
            this.candidate.offset = j;
            final NetworkTopicPartition source = this.partitions.floor(this.candidate);
            assert source != null;
            assert source.offset == j;
            source.refs--;

            this.candidate.offset = j2;
            NetworkTopicPartition target = this.partitions.floor(this.candidate);
            if (target == null || target.offset != j2) {
                // An entry other than the source sits below the new offset, so this partition
                // now has entries at distinct offsets.
                if (target != null && target != source) {
                    this.needsHistoricalByPartition.set(i);
                }
                target = new NetworkTopicPartition();
                target.id = i;
                target.offset = j2;
                this.partitions.add(target);
            }
            target.refs++;

            if (source.refs == 0) {
                remove(source);
            }
        }

        /**
         * Removes an entry and recomputes the historical flag for its partition: the flag is set
         * only when the remaining entries for that partition id span more than one distinct entry.
         *
         * <p>The removed entry is reused as the lookup key, so its {@code offset} is overwritten.
         */
        private void remove(NetworkTopicPartition networkTopicPartition) {
            this.partitions.remove(networkTopicPartition);
            boolean multipleOffsets = false;
            networkTopicPartition.offset = 0L;
            final NetworkTopicPartition lowest = this.partitions.ceiling(networkTopicPartition);
            if (lowest != null && lowest.id == networkTopicPartition.id) {
                networkTopicPartition.offset = Long.MAX_VALUE;
                multipleOffsets = this.partitions.floor(networkTopicPartition) != lowest;
            }
            this.needsHistoricalByPartition.set(networkTopicPartition.id, multipleOffsets);
        }

        /** Returns whether any partition is currently flagged as needing historical data. */
        boolean needsHistorical() {
            return this.needsHistoricalByPartition.nextSetBit(0) != -1;
        }

        /** Returns whether the given partition id is currently flagged as needing historical data. */
        boolean needsHistorical(int i) {
            return this.needsHistoricalByPartition.get(i);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$NetworkTopicPartition.class */
    /**
     * A reference-counted (partition id, offset) entry, ordered by partition id and then by offset.
     */
    public static final class NetworkTopicPartition implements Comparable<NetworkTopicPartition> {
        private static final NetworkTopicPartition NONE = new NetworkTopicPartition();
        int id;
        long offset;
        private int refs;

        private NetworkTopicPartition() {
        }

        /**
         * Orders by {@code id}, then by {@code offset}.
         *
         * <p>Uses {@link Integer#compare} and {@link Long#compare} rather than arithmetic on the
         * difference: truncating an offset difference to {@code int} yields the wrong sign for
         * gaps of 2^31 or more, and subtraction can overflow.
         *
         * @param networkTopicPartition the entry to compare against
         * @return a negative, zero or positive value when this entry sorts before, equal to or
         *         after the argument
         */
        @Override // java.lang.Comparable
        public int compareTo(NetworkTopicPartition networkTopicPartition) {
            final int byId = Integer.compare(this.id, networkTopicPartition.id);
            if (byId != 0) {
                return byId;
            }
            return Long.compare(this.offset, networkTopicPartition.offset);
        }

        @Override
        public String toString() {
            return String.format("id=%d, offset=%d, refs=%d", Integer.valueOf(this.id), Long.valueOf(this.offset), Integer.valueOf(this.refs));
        }

        /** Synthetic accessor retained for existing callers: post-increments {@code refs}. */
        static /* synthetic */ int access$1508(NetworkTopicPartition networkTopicPartition) {
            int i = networkTopicPartition.refs;
            networkTopicPartition.refs = i + 1;
            return i;
        }

        /** Synthetic accessor retained for existing callers: post-decrements {@code refs}. */
        static /* synthetic */ int access$1510(NetworkTopicPartition networkTopicPartition) {
            int i = networkTopicPartition.refs;
            networkTopicPartition.refs = i - 1;
            return i;
        }

        static {
            NONE.id = -1;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/NetworkConnectionPool$TopicMetadata.class */
    /**
     * Metadata gathered for one topic: its brokers, the node id recorded per partition, an error
     * code, and the callbacks waiting for the metadata to become available.
     */
    public static final class TopicMetadata {
        private final String topicName;
        private short errorCode;
        BrokerMetadata[] brokers;
        private int nextBrokerIndex;
        private int[] nodeIdsByPartition;
        private List<Consumer<TopicMetadata>> consumers = new ArrayList<>();

        TopicMetadata(String str) {
            this.topicName = str;
        }

        void setErrorCode(short s) {
            this.errorCode = s;
        }

        /**
         * Allocates room for {@code i} brokers and rewinds the insertion index.
         *
         * <p>The index is rewound here so that a second initialize/add cycle without an
         * intervening {@link #reset()} does not write past the end of the new array.
         *
         * @param i number of brokers that will be added
         */
        void initializeBrokers(int i) {
            this.brokers = new BrokerMetadata[i];
            this.nextBrokerIndex = 0;
        }

        /**
         * Appends a broker at the next free slot.
         *
         * @param i   node id
         * @param str host
         * @param i2  port
         */
        void addBroker(int i, String str, int i2) {
            this.brokers[this.nextBrokerIndex++] = new BrokerMetadata(i, str, i2);
        }

        /** Allocates the per-partition node id table for {@code i} partitions. */
        void initializePartitions(int i) {
            this.nodeIdsByPartition = new int[i];
        }

        /**
         * Records a node id for a partition.
         *
         * @param i  partition id, used as the index into the table
         * @param i2 node id to store for that partition
         */
        void addPartition(int i, int i2) {
            this.nodeIdsByPartition[i] = i2;
        }

        /**
         * Registers a callback; when partition data is already present the pending callbacks
         * (including this one) are invoked immediately.
         */
        void doAttach(Consumer<TopicMetadata> consumer) {
            this.consumers.add(consumer);
            if (this.nodeIdsByPartition != null) {
                flush();
            }
        }

        /** Invokes every pending callback with this metadata, then clears the pending list. */
        void flush() {
            for (Consumer<TopicMetadata> consumer : this.consumers) {
                consumer.accept(this);
            }
            this.consumers.clear();
        }

        short errorCode() {
            return this.errorCode;
        }

        /** Passes each broker, in array order, to the given consumer. */
        void visitBrokers(Consumer<BrokerMetadata> consumer) {
            for (BrokerMetadata brokerMetadata : this.brokers) {
                consumer.accept(brokerMetadata);
            }
        }

        int partitionCount() {
            return this.nodeIdsByPartition.length;
        }

        /** Discards broker and partition data; the error code and pending callbacks are kept. */
        void reset() {
            this.brokers = null;
            this.nodeIdsByPartition = null;
            this.nextBrokerIndex = 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NetworkConnectionPool(ClientStreamFactory clientStreamFactory, String str, long j, BufferPool bufferPool) {
        this.clientStreamFactory = clientStreamFactory;
        this.networkName = str;
        this.networkRef = j;
        this.bufferPool = bufferPool;
        this.encodeBuffer = new UnsafeBuffer(new byte[clientStreamFactory.bufferPool.slotCapacity()]);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Attaches a consumer to a topic once that topic's metadata is available.
     *
     * <p>The topic's metadata entry is created on first use, the attach is queued on it as a
     * callback, and the metadata connection is asked to issue a request if one is needed.
     *
     * @param str                       topic name
     * @param long2LongHashMap          fetch offsets for the topic's partitions
     * @param partitionResponseConsumer consumer of partition responses
     * @param intSupplier               supplier of the consumer's writable window
     * @param intConsumer               receives the attach id on success
     * @param intConsumer2              receives the error code on failure
     */
    public void doAttach(String str, Long2LongHashMap long2LongHashMap, PartitionResponseConsumer partitionResponseConsumer, IntSupplier intSupplier, IntConsumer intConsumer, IntConsumer intConsumer2) {
        final TopicMetadata metadata = this.topicMetadataByName.computeIfAbsent(str, name -> new TopicMetadata(name));
        final Consumer<TopicMetadata> whenAvailable = resolved ->
            doAttach(str, long2LongHashMap, partitionResponseConsumer, intSupplier, intConsumer, intConsumer2, resolved);
        metadata.doAttach(whenAvailable);
        this.metadataConnection.doRequestIfNeeded();
    }

    /**
     * Completes an attach once topic metadata has been resolved.
     *
     * <p>On a zero error code the offsets map is padded with offset 0 up to the topic's partition
     * count, the consumer is attached to the topic, a detacher is registered under a fresh attach
     * id, live and historical connections are reconciled against every known broker, and the
     * attach id is reported through {@code intConsumer}.
     *
     * @param str                       topic name
     * @param long2LongHashMap          fetch offsets, padded in place as described above
     * @param partitionResponseConsumer consumer of partition responses
     * @param intSupplier               supplier of the consumer's writable window
     * @param intConsumer               receives the attach id on success
     * @param intConsumer2              receives the error code for error codes 3 and 17
     * @param topicMetadata             resolved metadata for the topic
     * @throws RuntimeException if the metadata carries an error code other than 0, 3, 5 or 17
     */
    private void doAttach(String str, Long2LongHashMap long2LongHashMap, PartitionResponseConsumer partitionResponseConsumer, IntSupplier intSupplier, IntConsumer intConsumer, IntConsumer intConsumer2, TopicMetadata topicMetadata) {
        final short errorCode = topicMetadata.errorCode();
        switch (errorCode) {
            case 0:
                break;
            // NOTE(review): in the Kafka protocol error table 3 is UNKNOWN_TOPIC_OR_PARTITION and
            // 17 is INVALID_TOPIC_EXCEPTION — confirm these are the intended meanings here.
            case 3:
            case 17:
                intConsumer2.accept(errorCode);
                return;
            // NOTE(review): 5 is LEADER_NOT_AVAILABLE in the Kafka protocol error table; it is
            // dropped silently here — confirm that is intended.
            case 5:
                return;
            default:
                throw new RuntimeException(String.format("Unexpected errorCode %d from metadata query", Short.valueOf(errorCode)));
        }

        // Partitions the caller supplied no offset for start at offset 0.
        while (long2LongHashMap.size() < topicMetadata.partitionCount()) {
            long2LongHashMap.put(long2LongHashMap.size(), 0L);
        }

        final NetworkTopic topic = this.topicsByName.computeIfAbsent(str, name -> new NetworkTopic(name));
        topic.doAttach(long2LongHashMap, partitionResponseConsumer, intSupplier);

        final int attachId = this.nextAttachId;
        this.nextAttachId = attachId + 1;
        this.detachersById.put(attachId, offsets -> {
            topic.doDetach(offsets, partitionResponseConsumer, intSupplier);
        });

        // Lambda parameters are named distinctly at each nesting level: a nested lambda may not
        // redeclare the parameter of the lambda that encloses it.
        topicMetadata.visitBrokers(liveBroker -> {
            this.connections = applyBrokerMetadata(this.connections, liveBroker, broker -> {
                return new LiveFetchConnection(broker);
            });
        });
        topicMetadata.visitBrokers(historicalBroker -> {
            this.historicalConnections = (HistoricalFetchConnection[]) applyBrokerMetadata(this.historicalConnections, historicalBroker, broker -> {
                return new HistoricalFetchConnection(broker);
            });
        });

        intConsumer.accept(attachId);
    }

    /**
     * Reconciles an array of connections with one broker's metadata.
     *
     * <p>If a connection for the broker's node id exists and its host and port match, the array is
     * returned unchanged. If host or port differ, the stale connection is closed and its slot is
     * overwritten in place with a newly created connection. If no connection exists for the node
     * id, a new one is created and appended.
     *
     * @param tArr           current connections; a slot may be overwritten in place
     * @param brokerMetadata broker to reconcile against
     * @param function       creates a connection for a broker
     * @param <T>            concrete connection type
     * @return {@code tArr} itself when a connection for the broker already existed, otherwise a
     *         new array with the created connection appended
     */
    private <T extends AbstractFetchConnection> T[] applyBrokerMetadata(T[] tArr, BrokerMetadata brokerMetadata, Function<BrokerMetadata, T> function) {
        for (int index = 0; index < tArr.length; index++) {
            final AbstractFetchConnection existing = tArr[index];
            if (existing.brokerId != brokerMetadata.nodeId) {
                continue;
            }
            final boolean sameAddress = existing.host.equals(brokerMetadata.host) && existing.port == brokerMetadata.port;
            if (!sameAddress) {
                existing.close();
                // Store the replacement so the closed connection does not stay in the array.
                tArr[index] = function.apply(brokerMetadata);
            }
            // At most one connection is handled per node id, so stop at the first match.
            return tArr;
        }
        return ArrayUtil.add(tArr, function.apply(brokerMetadata));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asks every live connection, then every historical connection, to issue a request if one is
     * needed.
     *
     * @param j not used by this method
     */
    public void doFlush(long j) {
        // Snapshot each array reference up front, as an enhanced for loop would.
        final AbstractFetchConnection[] live = this.connections;
        for (int index = 0; index < live.length; index++) {
            live[index].doRequestIfNeeded();
        }

        final HistoricalFetchConnection[] historical = this.historicalConnections;
        for (int index = 0; index < historical.length; index++) {
            historical[index].doRequestIfNeeded();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes the detacher registered under the given attach id and, if one was registered,
     * invokes it with the supplied offsets. An unknown attach id is ignored.
     *
     * @param i                attach id previously reported on attach
     * @param long2LongHashMap fetch offsets handed to the detacher
     */
    public void doDetach(int i, Long2LongHashMap long2LongHashMap) {
        final Consumer<Long2LongHashMap> detacher = this.detachersById.remove(i);
        if (detacher == null) {
            return;
        }
        detacher.accept(long2LongHashMap);
    }
}
