package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.IntToLongFunction;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.kafka.internal.KafkaConfiguration;
import org.reaktivity.nukleus.kafka.internal.KafkaCounters;
import org.reaktivity.nukleus.kafka.internal.cache.DefaultMessageCache;
import org.reaktivity.nukleus.kafka.internal.cache.ImmutableTopicCache;
import org.reaktivity.nukleus.kafka.internal.cache.MessageCache;
import org.reaktivity.nukleus.kafka.internal.function.AttachDetailsConsumer;
import org.reaktivity.nukleus.kafka.internal.function.PartitionProgressHandler;
import org.reaktivity.nukleus.kafka.internal.function.StringLongLongFunction;
import org.reaktivity.nukleus.kafka.internal.memory.MemoryManager;
import org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool;
import org.reaktivity.nukleus.kafka.internal.types.ArrayFW;
import org.reaktivity.nukleus.kafka.internal.types.Flyweight;
import org.reaktivity.nukleus.kafka.internal.types.KafkaHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.ListFW;
import org.reaktivity.nukleus.kafka.internal.types.MessageFW;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.Varint64FW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.PartitionResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.RecordSetFW;
import org.reaktivity.nukleus.kafka.internal.types.control.KafkaRouteExFW;
import org.reaktivity.nukleus.kafka.internal.types.control.RouteFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.AbortFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.DataFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.EndFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.FrameFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaBeginExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaDataExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaEndExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.kafka.internal.util.BufferUtil;
import org.reaktivity.nukleus.kafka.internal.util.DelayedTaskScheduler;
import org.reaktivity.nukleus.kafka.internal.util.Flags;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.stream.StreamFactory;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/ClientStreamFactory.class */
public final class ClientStreamFactory implements StreamFactory {
    private static final long[] ZERO_OFFSETS;
    private static final PartitionProgressHandler NOOP_PROGRESS_HANDLER;
    private static final Runnable NOOP;
    public static final long INTERNAL_ERRORS_TO_LOG = 100;
    final RouteManager router;
    final LongSupplier supplyInitialId;
    final LongUnaryOperator supplyReplyId;
    final LongSupplier supplyTrace;
    final LongSupplier supplyCorrelationId;
    private Function<String, LongSupplier> supplyCounter;
    final BufferPool bufferPool;
    final MessageCache messageCache;
    private final MutableDirectBuffer writeBuffer;
    final DelayedTaskScheduler scheduler;
    final KafkaCounters counters;
    final Long2ObjectHashMap<NetworkConnectionPool.AbstractNetworkConnection> correlations;
    private final Map<String, Long2ObjectHashMap<NetworkConnectionPool>> connectionPools;
    private final int fetchMaxBytes;
    private final int fetchPartitionMaxBytes;
    private final boolean forceProactiveMessageCache;
    private final int readIdleTimeout;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final UnsafeBuffer keyBuffer = new UnsafeBuffer(BufferUtil.EMPTY_BYTE_ARRAY);
    private final UnsafeBuffer valueBuffer = new UnsafeBuffer(BufferUtil.EMPTY_BYTE_ARRAY);
    private final RouteFW routeRO = new RouteFW();
    final FrameFW frameRO = new FrameFW();
    final BeginFW beginRO = new BeginFW();
    final DataFW dataRO = new DataFW();
    final EndFW endRO = new EndFW();
    final AbortFW abortRO = new AbortFW();
    private final KafkaRouteExFW routeExRO = new KafkaRouteExFW();
    private final KafkaBeginExFW beginExRO = new KafkaBeginExFW();
    final WindowFW windowRO = new WindowFW();
    final ResetFW resetRO = new ResetFW();
    private final BeginFW.Builder beginRW = new BeginFW.Builder();
    private final DataFW.Builder dataRW = new DataFW.Builder();
    private final EndFW.Builder endRW = new EndFW.Builder();
    private final AbortFW.Builder abortRW = new AbortFW.Builder();
    private final KafkaDataExFW.Builder dataExRW = new KafkaDataExFW.Builder();
    private final KafkaEndExFW.Builder endExRW = new KafkaEndExFW.Builder();
    private final WindowFW.Builder windowRW = new WindowFW.Builder();
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    final PartitionResponseFW partitionResponseRO = new PartitionResponseFW();
    final RecordSetFW recordSetRO = new RecordSetFW();
    private final OctetsFW messageKeyRO = new OctetsFW();
    private final OctetsFW messageValueRO = new OctetsFW();
    private final HeadersFW headersRO = new HeadersFW();
    final BudgetManager budgetManager = new BudgetManager();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/ClientStreamFactory$ClientAcceptStream.class */
    public final class ClientAcceptStream implements MessageDispatcher {
        private static final int NO_PARTITION = -1;
        private static final int UNATTACHED = -1;
        private final MessageConsumer applicationThrottle;
        private final long applicationRouteId;
        private final long applicationId;
        private final NetworkConnectionPool networkPool;
        private final Long2LongHashMap fetchOffsets;
        private boolean subscribedByKey;
        private String applicationName;
        private long applicationCorrelationId;
        private byte[] applicationBeginExtension;
        private MessageConsumer applicationReply;
        private long applicationReplyId;
        private int applicationReplyPadding;
        private long groupId;
        private Budget budget;
        private int networkAttachId;
        private boolean compacted;
        private PartitionProgressHandler progressHandler;
        private PartitionProgressHandler poolProgressHandler;
        private MessageConsumer streamState;
        private boolean dispatchBlocked;
        int fragmentedMessageBytesWritten;
        int fragmentedMessageLength;
        long fragmentedMessageOffset;
        int fragmentedMessagePartition;
        boolean fragmentedMessageDispatched;
        private long progressStartOffset;
        private long progressEndOffset;
        private String topicName;
        private ListFW<KafkaHeaderFW> headers;
        private OctetsFW fetchKey;
        private int hashCode;
        private AttachDetailsConsumer attacher;
        private AttachDetailsConsumer detacher;
        private Runnable deferredDetach;
        private ImmutableTopicCache historicalCache;
        private final Runnable dispatchFromCacheState;
        private final Runnable dispatchFragmentedFromCacheState;
        private final Runnable dispatchFromPoolState;
        private Runnable dispatchState;
        private final Runnable dispatchUsingCurrentState;
        private final IntSupplier writeableBytes;
        static final /* synthetic */ boolean $assertionsDisabled;

        private ClientAcceptStream(MessageConsumer messageConsumer, long j, long j2, NetworkConnectionPool networkConnectionPool) {
            this.budget = BudgetManager.NO_BUDGET;
            this.networkAttachId = -1;
            this.progressHandler = ClientStreamFactory.NOOP_PROGRESS_HANDLER;
            this.fragmentedMessageOffset = -1L;
            this.fragmentedMessagePartition = -1;
            this.progressStartOffset = -1L;
            this.dispatchFromCacheState = this::dispatchMessagesFromCache;
            this.dispatchFragmentedFromCacheState = this::dispatchFragmentedMessageFromCache;
            this.dispatchFromPoolState = this::dispatchMessagesFromPool;
            this.dispatchState = ClientStreamFactory.NOOP;
            this.dispatchUsingCurrentState = this::dispatchMessages;
            this.writeableBytes = this::writeableBytes;
            this.applicationThrottle = messageConsumer;
            this.applicationRouteId = j;
            this.applicationId = j2;
            this.networkPool = networkConnectionPool;
            this.fetchOffsets = new Long2LongHashMap(-1L);
            this.streamState = this::beforeBegin;
        }

        @Override // org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
        public void adjustOffset(int i, long j, long j2) {
            if (KafkaConfiguration.DEBUG) {
                System.out.format("CAS.adjustOffset: partition=%d, oldOffset = %d, newOffset = %d, this = %s\n", Integer.valueOf(i), Long.valueOf(j), Long.valueOf(j2), this);
            }
            if (this.fetchOffsets.get(i) == j) {
                this.fetchOffsets.put(i, j2);
            }
        }

        @Override // org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
        public void detach(boolean z) {
            this.progressHandler = ClientStreamFactory.NOOP_PROGRESS_HANDLER;
            if (this.detacher == null || this.dispatchState != this.dispatchFromPoolState) {
                this.networkPool.doDetach(this.topicName, this.networkAttachId);
            } else {
                invoke(this.detacher);
                this.detacher = null;
            }
            this.networkAttachId = -1;
            this.budget.closing(this.applicationReplyId, 0);
            Runnable runnable = z ? this::doDeferredDetachWithReattach : this::doDeferredDetach;
            if (this.budget.hasUnackedBudget(this.applicationReplyId)) {
                this.deferredDetach = runnable;
            } else {
                runnable.run();
            }
        }

        @Override // org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
        public int dispatch(int i, long j, long j2, DirectBuffer directBuffer, Function<DirectBuffer, Iterator<DirectBuffer>> function, long j3, long j4, DirectBuffer directBuffer2) {
            int i2 = 1;
            if (this.progressStartOffset == -1) {
                this.progressStartOffset = this.fetchOffsets.get(i);
                this.progressEndOffset = this.progressStartOffset;
            }
            boolean z = false;
            if (this.fragmentedMessageOffset != -1) {
                if (this.fragmentedMessagePartition != i) {
                    this.dispatchBlocked = true;
                    z = true;
                    ClientStreamFactory.this.counters.dispatchNeedOtherMessage.getAsLong();
                } else if (j2 == this.fragmentedMessageOffset) {
                    this.fragmentedMessageDispatched = true;
                    if (directBuffer2.capacity() != this.fragmentedMessageLength) {
                        if (this.networkPool.getRouteCounters().internalErrors.getAsLong() <= 100) {
                            System.out.format("Internal Error: unexpected value length for partially delivered message, partition=%d, requestOffset=%d, messageStartOffset=%d, key=%s, value=%s, %s\n", Integer.valueOf(i), Long.valueOf(j), Long.valueOf(j2), toString(directBuffer), toString(directBuffer2), this);
                        }
                        this.networkPool.getRouteCounters().forcedDetaches.getAsLong();
                        detach(false);
                    }
                } else {
                    z = true;
                    ClientStreamFactory.this.counters.dispatchNeedOtherMessage.getAsLong();
                }
            }
            if (j <= this.progressStartOffset && j2 >= this.progressStartOffset && !z) {
                int capacity = directBuffer2 == null ? 0 : directBuffer2.capacity() - this.fragmentedMessageBytesWritten;
                if (!$assertionsDisabled && capacity < 0) {
                    throw new AssertionError(String.format("fragmentedMessageBytesWritten = %d payloadLength = %d", Integer.valueOf(this.fragmentedMessageBytesWritten), Integer.valueOf(capacity)));
                }
                int budget = this.budget.getBudget() - this.applicationReplyPadding;
                if (budget > 0) {
                    int min = Math.min(capacity, budget);
                    this.budget.decBudget(this.applicationReplyId, min + this.applicationReplyPadding);
                    if (min < capacity) {
                        this.dispatchBlocked = true;
                    } else {
                        i2 = 1 | 7;
                    }
                    writeMessage(i, j2, j4, directBuffer, j3, directBuffer2, this.fragmentedMessageBytesWritten, this.fragmentedMessageBytesWritten + min);
                } else {
                    this.dispatchBlocked = true;
                    ClientStreamFactory.this.counters.dispatchNoWindow.getAsLong();
                }
                i2 |= 3;
            }
            return i2;
        }

        @Override // org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
        public void flush(int i, long j, long j2) {
            long j3 = this.progressStartOffset;
            long j4 = this.progressEndOffset;
            if (j3 == -1) {
                j3 = this.fetchOffsets.get(i);
                j4 = j2;
            } else if (j <= j3 && !this.dispatchBlocked) {
                j4 = j2;
            }
            if (this.fragmentedMessageOffset != -1 && this.fragmentedMessagePartition == i && !this.fragmentedMessageDispatched && j <= this.fragmentedMessageOffset && j3 <= this.fragmentedMessageOffset && j2 > this.fragmentedMessageOffset) {
                this.networkPool.getRouteCounters().forcedDetaches.getAsLong();
                detach(false);
            }
            if (j4 > j3 && j <= j3) {
                this.progressHandler.handle(i, this.fetchOffsets.put(i, j4), j4, this);
            }
            if (this.dispatchBlocked && this.dispatchState == this.dispatchFromPoolState) {
                enterDispatchFromCacheState();
            }
            this.progressStartOffset = -1L;
            this.dispatchBlocked = false;
            this.fragmentedMessageDispatched = false;
        }

        public String toString() {
            Object[] objArr = new Object[13];
            objArr[0] = getClass().getSimpleName();
            objArr[1] = Integer.toHexString(System.identityHashCode(this));
            objArr[2] = this.topicName;
            objArr[3] = Boolean.valueOf(this.subscribedByKey);
            objArr[4] = this.fetchOffsets;
            objArr[5] = Long.valueOf(this.fragmentedMessageOffset);
            objArr[6] = Integer.valueOf(this.fragmentedMessagePartition);
            objArr[7] = Integer.valueOf(this.fragmentedMessageLength);
            objArr[8] = Integer.valueOf(this.fragmentedMessageBytesWritten);
            objArr[9] = Long.valueOf(this.applicationId);
            objArr[10] = Long.valueOf(this.applicationReplyId);
            objArr[11] = this.dispatchState == this.dispatchFragmentedFromCacheState ? "fragmentedFromCache" : this.dispatchState == this.dispatchFromCacheState ? "cache" : "pool";
            objArr[12] = this.budget;
            return String.format("%s@%s(topic=\"%s\", subscribedByKey=%b, fetchOffsets=%s, fragmentedMessageOffset=%d, fragmentedMessagePartition=%d, fragmentedMessageLength=%d, fragmentedMessageBytesWritten=%d, applicationId=%x, applicationReplyId=%x, dispatchState=%s, budget=%s)", objArr);
        }

        private void detachFromNetworkPool() {
            if (this.detacher != null && this.dispatchState == this.dispatchFromPoolState) {
                invoke(this.detacher);
            } else if (this.detacher == null) {
                this.networkPool.doDetach(this.topicName, this.networkAttachId);
            }
            this.networkAttachId = -1;
        }

        private void dispatchMessages() {
            this.dispatchState.run();
        }

        private void dispatchMessagesFromCache() {
            Iterator<ImmutableTopicCache.MessageRef> messages = this.historicalCache.getMessages(this.fetchOffsets, this.fetchKey, this.headers);
            int i = -1;
            long j = -1;
            long j2 = -1;
            while (writeableBytes() > 0 && messages.hasNext()) {
                ImmutableTopicCache.MessageRef next = messages.next();
                MessageFW message = next.message();
                int partition = next.partition();
                long offset = next.offset();
                if (j2 == -1) {
                    j2 = this.fetchOffsets.get(partition);
                }
                if (i != -1 && next.partition() != i) {
                    flush(i, j2, j);
                    j2 = this.fetchOffsets.get(partition);
                }
                if (message != null) {
                    dispatch(partition, j2, offset, BufferUtil.wrap((DirectBuffer) ClientStreamFactory.this.keyBuffer, message.key()), ClientStreamFactory.this.headersRO.wrap(message.headers()).headerSupplier(), message.timestamp(), message.traceId(), BufferUtil.wrap((DirectBuffer) ClientStreamFactory.this.valueBuffer, message.value()));
                    i = partition;
                    offset++;
                    j = offset;
                }
                if (message == null || writeableBytes() == 0) {
                    flush(partition, j2, offset);
                    j2 = -1;
                }
            }
            if (this.fragmentedMessageOffset != -1) {
                this.dispatchState = this.dispatchFragmentedFromCacheState;
            }
            if (messages.hasNext()) {
                return;
            }
            enterDispatchFromPoolState();
            if (writeableBytes() > 0) {
                this.dispatchState.run();
            }
        }

        private void dispatchFragmentedMessageFromCache() {
            if (!$assertionsDisabled && this.fragmentedMessageOffset == -1) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && this.fragmentedMessagePartition == -1) {
                throw new AssertionError();
            }
            ImmutableTopicCache.MessageRef message = this.historicalCache.getMessage(this.fragmentedMessagePartition, this.fragmentedMessageOffset);
            MessageFW message2 = message.message();
            int partition = message.partition();
            long offset = message.offset();
            if (message2 == null) {
                enterDispatchFromPoolState();
                this.dispatchState.run();
                return;
            }
            long j = this.fetchOffsets.get(partition);
            dispatch(partition, j, offset, BufferUtil.wrap((DirectBuffer) ClientStreamFactory.this.keyBuffer, message2.key()), ClientStreamFactory.this.headersRO.wrap(message2.headers()).headerSupplier(), message2.timestamp(), message2.traceId(), BufferUtil.wrap((DirectBuffer) ClientStreamFactory.this.valueBuffer, message2.value()));
            flush(partition, j, offset + 1);
            if (this.fragmentedMessageOffset == -1) {
                this.dispatchState = this.dispatchFromCacheState;
                if (writeableBytes() > 0) {
                    this.dispatchState.run();
                }
            }
        }

        private void dispatchMessagesFromPool() {
            this.networkPool.doFlush();
        }

        private void enterDispatchFromCacheState() {
            if (this.historicalCache == null || !this.historicalCache.hasMessages(this.fetchOffsets, this.fetchKey, this.headers)) {
                return;
            }
            this.dispatchState = this.fragmentedMessageOffset == -1 ? this.dispatchFromCacheState : this.dispatchFragmentedFromCacheState;
            invoke(this.detacher);
            this.progressHandler = ClientStreamFactory.NOOP_PROGRESS_HANDLER;
        }

        private void enterDispatchFromPoolState() {
            this.dispatchState = this.dispatchFromPoolState;
            invoke(this.attacher);
            this.progressHandler = this.poolProgressHandler;
        }

        private void doDeferredDetach() {
            this.budget.closed(this.applicationReplyId);
            ClientStreamFactory.this.doAbort(this.applicationReply, this.applicationRouteId, this.applicationReplyId);
        }

        private void doDeferredDetachWithReattach() {
            this.budget.closed(this.applicationReplyId);
            ClientStreamFactory.this.doEnd(this.applicationReply, this.applicationRouteId, this.applicationReplyId, ClientStreamFactory.ZERO_OFFSETS);
        }

        private String toString(DirectBuffer directBuffer) {
            return directBuffer == null ? "null" : String.format("%s(capacity=%d)", directBuffer.getClass().getSimpleName() + "@" + Integer.toHexString(System.identityHashCode(directBuffer)), Integer.valueOf(directBuffer.capacity()));
        }

        private void writeMessage(int i, long j, long j2, DirectBuffer directBuffer, long j3, DirectBuffer directBuffer2, int i2, int i3) {
            byte b = (directBuffer2 == null || directBuffer2.capacity() == i3) ? (byte) 1 : (byte) 0;
            long j4 = j + 1;
            if (i2 == 0) {
                b = (byte) (b | 2);
                long put = this.fetchOffsets.put(i, j4);
                ClientStreamFactory.this.doKafkaData(this.applicationReply, this.applicationRouteId, this.applicationReplyId, j2, this.applicationReplyPadding, b, this.compacted ? directBuffer : null, j3, directBuffer2, i3, this.fetchOffsets);
                this.fetchOffsets.put(i, put);
            } else {
                ClientStreamFactory.this.doKafkaDataContinuation(this.applicationReply, this.applicationRouteId, this.applicationReplyId, j2, this.applicationReplyPadding, b, directBuffer2, i2, i3);
            }
            if (Flags.fin(b)) {
                this.fragmentedMessageBytesWritten = 0;
                this.fragmentedMessageOffset = -1L;
                this.fragmentedMessagePartition = -1;
                this.fragmentedMessageLength = 0;
                this.progressEndOffset = j4;
                this.progressHandler.handle(i, this.fetchOffsets.put(i, j4), j4, this);
                return;
            }
            if (this.fragmentedMessagePartition == -1) {
                this.fragmentedMessageLength = directBuffer2.capacity();
            }
            this.fragmentedMessageBytesWritten += i3 - i2;
            this.fragmentedMessageOffset = j;
            this.fragmentedMessagePartition = i;
            this.fragmentedMessageDispatched = true;
            this.progressEndOffset = j;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void handleStream(int i, DirectBuffer directBuffer, int i2, int i3) {
            this.streamState.accept(i, directBuffer, i2, i3);
        }

        private void beforeBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
            if (i == 1) {
                handleBegin(ClientStreamFactory.this.beginRO.wrap(directBuffer, i2, i2 + i3));
            } else {
                ClientStreamFactory.this.doReset(this.applicationThrottle, this.applicationRouteId, this.applicationId);
            }
        }

        private void afterBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 2:
                    ClientStreamFactory.this.doReset(this.applicationThrottle, this.applicationRouteId, this.applicationId);
                    detachFromNetworkPool();
                    return;
                case 3:
                    return;
                case 4:
                    detach(false);
                    return;
                default:
                    ClientStreamFactory.this.doReset(this.applicationThrottle, this.applicationRouteId, this.applicationId);
                    return;
            }
        }

        private void handleBegin(BeginFW beginFW) {
            this.applicationName = beginFW.source().asString();
            this.applicationCorrelationId = beginFW.correlationId();
            OctetsFW extension = beginFW.extension();
            if (extension.sizeof() == 0) {
                ClientStreamFactory.this.doReset(this.applicationThrottle, this.applicationRouteId, this.applicationId);
                return;
            }
            this.applicationBeginExtension = new byte[extension.sizeof()];
            extension.buffer().getBytes(extension.offset(), this.applicationBeginExtension);
            KafkaBeginExFW kafkaBeginExFW = ClientStreamFactory.this.beginExRO;
            Objects.requireNonNull(kafkaBeginExFW);
            KafkaBeginExFW kafkaBeginExFW2 = (KafkaBeginExFW) extension.get(kafkaBeginExFW::wrap);
            this.topicName = kafkaBeginExFW2.topicName().asString();
            ArrayFW<Varint64FW> fetchOffsets = kafkaBeginExFW2.fetchOffsets();
            this.fetchOffsets.clear();
            fetchOffsets.forEach(varint64FW -> {
                this.fetchOffsets.put(this.fetchOffsets.size(), varint64FW.value());
            });
            OctetsFW fetchKey = kafkaBeginExFW2.fetchKey();
            byte fetchKeyHashCount = kafkaBeginExFW2.fetchKeyHashCount();
            if ((fetchKey != null && this.fetchOffsets.size() > 1) || fetchKeyHashCount > 1 || (fetchKeyHashCount == 1 && fetchKey == null)) {
                ClientStreamFactory.this.doReset(this.applicationThrottle, this.applicationRouteId, this.applicationId);
                return;
            }
            ListFW<KafkaHeaderFW> headers = kafkaBeginExFW2.headers();
            if (headers != null && headers.sizeof() > 0) {
                DirectBuffer unsafeBuffer = new UnsafeBuffer(new byte[headers.sizeof()]);
                unsafeBuffer.putBytes(0, headers.buffer(), headers.offset(), headers.sizeof());
                this.headers = new ListFW(new KafkaHeaderFW()).wrap(unsafeBuffer, 0, unsafeBuffer.capacity());
            }
            if (fetchKey != null) {
                this.subscribedByKey = true;
                DirectBuffer unsafeBuffer2 = new UnsafeBuffer(new byte[fetchKey.sizeof()]);
                unsafeBuffer2.putBytes(0, fetchKey.buffer(), fetchKey.offset(), fetchKey.sizeof());
                this.fetchKey = new OctetsFW().wrap(unsafeBuffer2, 0, unsafeBuffer2.capacity());
                this.hashCode = fetchKeyHashCount == 1 ? kafkaBeginExFW2.fetchKeyHash().nextInt() : BufferUtil.defaultHashCode(fetchKey.buffer(), fetchKey.offset(), fetchKey.limit());
            }
            this.networkAttachId = this.networkPool.doAttach(this.topicName, this::onAttachPrepared, this::onMetadataError);
            this.streamState = this::afterBegin;
            long applyAsLong = ClientStreamFactory.this.supplyReplyId.applyAsLong(this.applicationId);
            MessageConsumer supplyTarget = ClientStreamFactory.this.router.supplyTarget(this.applicationName);
            ClientStreamFactory.this.doKafkaBegin(supplyTarget, this.applicationRouteId, applyAsLong, 0L, this.applicationCorrelationId, this.applicationBeginExtension);
            ClientStreamFactory.this.router.setThrottle(this.applicationName, applyAsLong, this::handleThrottle);
            ClientStreamFactory.this.doWindow(this.applicationThrottle, this.applicationRouteId, this.applicationId, 0, 0, 0L);
            this.applicationReply = supplyTarget;
            this.applicationReplyId = applyAsLong;
        }

        private void onAttachPrepared(AttachDetailsConsumer attachDetailsConsumer, AttachDetailsConsumer attachDetailsConsumer2, PartitionProgressHandler partitionProgressHandler, ImmutableTopicCache immutableTopicCache, int i, boolean z, IntToLongFunction intToLongFunction) {
            this.compacted = z;
            this.attacher = attachDetailsConsumer;
            this.detacher = attachDetailsConsumer2;
            if (immutableTopicCache != null) {
                this.historicalCache = immutableTopicCache;
                this.poolProgressHandler = partitionProgressHandler;
                this.dispatchState = this.dispatchFromCacheState;
            } else {
                this.dispatchState = this.dispatchFromPoolState;
                invoke(attachDetailsConsumer);
                this.progressHandler = partitionProgressHandler;
            }
            long j = (!this.fetchOffsets.isEmpty() || z) ? 0L : Long.MAX_VALUE;
            if (this.fetchKey != null) {
                int partition = BufferUtil.partition(this.hashCode, i);
                long max = Math.max(this.fetchOffsets.computeIfAbsent(0L, j2 -> {
                    return j;
                }), intToLongFunction.applyAsLong(partition));
                if (partition != 0) {
                    this.fetchOffsets.remove(0L);
                }
                this.fetchOffsets.put(partition, max);
            } else {
                for (int i2 = 0; i2 < i; i2++) {
                    this.fetchOffsets.put(i2, Math.max(this.fetchOffsets.computeIfAbsent(i2, j3 -> {
                        return j;
                    }), intToLongFunction.applyAsLong(i2)));
                }
            }
            if (writeableBytes() > 0) {
                this.dispatchState.run();
            }
        }

        private void invoke(AttachDetailsConsumer attachDetailsConsumer) {
            attachDetailsConsumer.apply(this.fetchOffsets, this.fetchKey, this.headers, this, this.writeableBytes);
        }

        private void onMetadataError(KafkaError kafkaError) {
            ClientStreamFactory.this.doReset(this.applicationThrottle, this.applicationRouteId, this.applicationId);
        }

        private void handleThrottle(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 1073741825:
                    handleReset(ClientStreamFactory.this.resetRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 1073741826:
                    handleWindow(ClientStreamFactory.this.windowRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                default:
                    return;
            }
        }

        private void handleWindow(WindowFW windowFW) {
            this.applicationReplyPadding = windowFW.padding();
            this.groupId = windowFW.groupId();
            if (this.budget == BudgetManager.NO_BUDGET) {
                this.budget = ClientStreamFactory.this.budgetManager.createBudget(this.groupId);
            }
            this.budget.incBudget(this.applicationReplyId, windowFW.credit(), this.dispatchUsingCurrentState);
            if (this.deferredDetach == null || this.budget.hasUnackedBudget(this.applicationReplyId)) {
                return;
            }
            this.deferredDetach.run();
        }

        private void handleReset(ResetFW resetFW) {
            ClientStreamFactory.this.doReset(this.applicationThrottle, this.applicationRouteId, this.applicationId);
            this.progressHandler = ClientStreamFactory.NOOP_PROGRESS_HANDLER;
            detachFromNetworkPool();
            this.networkAttachId = -1;
            this.budget.closed(this.applicationReplyId);
        }

        private int writeableBytes() {
            return Math.max(0, this.budget.getBudget() - this.applicationReplyPadding);
        }

        static {
            $assertionsDisabled = !ClientStreamFactory.class.desiredAssertionStatus();
        }
    }

    public ClientStreamFactory(KafkaConfiguration kafkaConfiguration, RouteManager routeManager, MutableDirectBuffer mutableDirectBuffer, BufferPool bufferPool, MemoryManager memoryManager, LongSupplier longSupplier, LongUnaryOperator longUnaryOperator, LongSupplier longSupplier2, LongSupplier longSupplier3, Function<String, LongSupplier> function, Long2ObjectHashMap<NetworkConnectionPool.AbstractNetworkConnection> long2ObjectHashMap, Map<String, Long2ObjectHashMap<NetworkConnectionPool>> map, Consumer<StringLongLongFunction<NetworkConnectionPool>> consumer, DelayedTaskScheduler delayedTaskScheduler, KafkaCounters kafkaCounters) {
        this.fetchMaxBytes = kafkaConfiguration.fetchMaxBytes();
        this.fetchPartitionMaxBytes = kafkaConfiguration.fetchPartitionMaxBytes();
        this.forceProactiveMessageCache = kafkaConfiguration.messageCacheProactive();
        this.readIdleTimeout = kafkaConfiguration.readIdleTimeout();
        this.router = (RouteManager) Objects.requireNonNull(routeManager);
        this.writeBuffer = (MutableDirectBuffer) Objects.requireNonNull(mutableDirectBuffer);
        this.bufferPool = (BufferPool) Objects.requireNonNull(bufferPool);
        this.messageCache = new DefaultMessageCache((MemoryManager) Objects.requireNonNull(memoryManager));
        this.supplyInitialId = (LongSupplier) Objects.requireNonNull(longSupplier);
        this.supplyReplyId = (LongUnaryOperator) Objects.requireNonNull(longUnaryOperator);
        this.supplyTrace = (LongSupplier) Objects.requireNonNull(longSupplier2);
        this.supplyCorrelationId = longSupplier3;
        this.supplyCounter = (Function) Objects.requireNonNull(function);
        this.correlations = (Long2ObjectHashMap) Objects.requireNonNull(long2ObjectHashMap);
        this.connectionPools = map;
        consumer.accept((str, j, j2) -> {
            return new NetworkConnectionPool(this, j, str, j2, this.fetchMaxBytes, this.fetchPartitionMaxBytes, bufferPool, this.messageCache, function, this.forceProactiveMessageCache, this.readIdleTimeout);
        });
        this.scheduler = delayedTaskScheduler;
        this.counters = kafkaCounters;
    }

    public MessageConsumer newStream(int i, DirectBuffer directBuffer, int i2, int i3, MessageConsumer messageConsumer) {
        BeginFW wrap = this.beginRO.wrap(directBuffer, i2, i2 + i3);
        return wrap.sourceRef() == 0 ? newConnectReplyStream(wrap, messageConsumer) : newAcceptStream(wrap, messageConsumer);
    }

    private MessageConsumer newAcceptStream(BeginFW beginFW, MessageConsumer messageConsumer) {
        long authorization = beginFW.authorization();
        long sourceRef = beginFW.sourceRef();
        OctetsFW extension = this.beginRO.extension();
        MessageConsumer messageConsumer2 = null;
        if (extension.sizeof() > 0) {
            KafkaBeginExFW kafkaBeginExFW = this.beginExRO;
            Objects.requireNonNull(kafkaBeginExFW);
            KafkaBeginExFW kafkaBeginExFW2 = (KafkaBeginExFW) extension.get(kafkaBeginExFW::wrap);
            RouteFW resolveRoute = resolveRoute(authorization, sourceRef, kafkaBeginExFW2.topicName().asString(), kafkaBeginExFW2.headers());
            if (resolveRoute != null) {
                long routeId = beginFW.routeId();
                long streamId = beginFW.streamId();
                long correlationId = resolveRoute.correlationId();
                String asString = resolveRoute.target().asString();
                ClientAcceptStream clientAcceptStream = new ClientAcceptStream(messageConsumer, routeId, streamId, (NetworkConnectionPool) this.connectionPools.computeIfAbsent(asString, this::newConnectionPoolsByRef).computeIfAbsent(resolveRoute.targetRef(), j -> {
                    return new NetworkConnectionPool(this, correlationId, asString, j, this.fetchMaxBytes, this.fetchPartitionMaxBytes, this.bufferPool, this.messageCache, this.supplyCounter, this.forceProactiveMessageCache, this.readIdleTimeout);
                }));
                messageConsumer2 = (i, directBuffer, i2, i3) -> {
                    clientAcceptStream.handleStream(i, directBuffer, i2, i3);
                };
            }
        }
        return messageConsumer2;
    }

    private Long2ObjectHashMap<NetworkConnectionPool> newConnectionPoolsByRef(String str) {
        return new Long2ObjectHashMap<>();
    }

    private MessageConsumer newConnectReplyStream(BeginFW beginFW, MessageConsumer messageConsumer) {
        NetworkConnectionPool.AbstractNetworkConnection abstractNetworkConnection;
        long streamId = beginFW.streamId();
        long sourceRef = beginFW.sourceRef();
        long correlationId = beginFW.correlationId();
        MessageConsumer messageConsumer2 = null;
        if (sourceRef == 0 && (abstractNetworkConnection = (NetworkConnectionPool.AbstractNetworkConnection) this.correlations.remove(correlationId)) != null) {
            messageConsumer2 = abstractNetworkConnection.onCorrelated(messageConsumer, streamId);
        }
        return messageConsumer2;
    }

    private RouteFW resolveRoute(long j, long j2, String str, ListFW<KafkaHeaderFW> listFW) {
        return (RouteFW) this.router.resolve(j, (i, directBuffer, i2, i3) -> {
            RouteFW wrap = this.routeRO.wrap(directBuffer, i2, i2 + i3);
            OctetsFW extension = wrap.extension();
            Predicate predicate = str2 -> {
                return true;
            };
            Predicate predicate2 = listFW2 -> {
                return true;
            };
            if (extension.sizeof() > 0) {
                KafkaRouteExFW kafkaRouteExFW = this.routeExRO;
                Objects.requireNonNull(kafkaRouteExFW);
                KafkaRouteExFW kafkaRouteExFW2 = (KafkaRouteExFW) extension.get(kafkaRouteExFW::wrap);
                String asString = kafkaRouteExFW2.topicName().asString();
                Objects.requireNonNull(asString);
                predicate = (v1) -> {
                    return r0.equals(v1);
                };
                if (kafkaRouteExFW2.headers().sizeof() > 0) {
                    predicate2 = listFW3 -> {
                        return !kafkaRouteExFW2.headers().anyMatch(kafkaHeaderFW -> {
                            return !listFW3.anyMatch(kafkaHeaderFW -> {
                                return BufferUtil.matches(kafkaHeaderFW.key(), kafkaHeaderFW.key()) && BufferUtil.matches(kafkaHeaderFW.value(), kafkaHeaderFW.value());
                            });
                        });
                    };
                }
            }
            return wrap.sourceRef() == j2 && predicate.test(str) && predicate2.test(listFW);
        }, this::wrapRoute);
    }

    private RouteFW wrapRoute(int i, DirectBuffer directBuffer, int i2, int i3) {
        if ($assertionsDisabled || i == 1) {
            return this.routeRO.wrap(directBuffer, i2, i2 + i3);
        }
        throw new AssertionError();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW$Builder] */
    public void doBegin(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, Flyweight.Builder.Visitor visitor) {
        BeginFW build = this.beginRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(this.supplyTrace.getAsLong()).source("kafka").sourceRef(j3).correlationId(j4).extension(builder -> {
            builder.set(visitor);
        }).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.DataFW$Builder] */
    public void doData(MessageConsumer messageConsumer, long j, long j2, int i, OctetsFW octetsFW) {
        DataFW build = this.dataRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(this.supplyTrace.getAsLong()).groupId(0L).padding(i).payload(builder -> {
            builder.set(octetsFW.buffer(), octetsFW.offset(), octetsFW.sizeof());
        }).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.EndFW$Builder] */
    public void doEnd(MessageConsumer messageConsumer, long j, long j2, long[] jArr) {
        EndFW build = this.endRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(this.supplyTrace.getAsLong()).extension(builder -> {
            builder.set(visitKafkaEndEx(jArr));
        }).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    private Flyweight.Builder.Visitor visitKafkaEndEx(long[] jArr) {
        return (mutableDirectBuffer, i, i2) -> {
            return this.endExRW.wrap2(mutableDirectBuffer, i, i2).fetchOffsets(builder -> {
                if (jArr != null) {
                    for (long j : jArr) {
                        builder.item(builder -> {
                            builder.set(j);
                        });
                    }
                }
            }).build().sizeof();
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.AbortFW$Builder] */
    public void doAbort(MessageConsumer messageConsumer, long j, long j2) {
        AbortFW build = this.abortRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(this.supplyTrace.getAsLong()).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW$Builder] */
    public void doWindow(MessageConsumer messageConsumer, long j, long j2, int i, int i2, long j3) {
        WindowFW build = this.windowRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(this.supplyTrace.getAsLong()).credit(i).padding(i2).groupId(j3).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW$Builder] */
    public void doReset(MessageConsumer messageConsumer, long j, long j2) {
        ResetFW build = this.resetRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(this.supplyTrace.getAsLong()).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW$Builder] */
    public void doKafkaBegin(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, byte[] bArr) {
        BeginFW build = this.beginRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(this.supplyTrace.getAsLong()).source("kafka").sourceRef(j3).correlationId(j4).extension(builder -> {
            builder.set(bArr);
        }).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v12, types: [org.reaktivity.nukleus.kafka.internal.types.stream.DataFW$Builder] */
    public void doKafkaData(MessageConsumer messageConsumer, long j, long j2, long j3, int i, byte b, DirectBuffer directBuffer, long j4, DirectBuffer directBuffer2, int i2, Long2LongHashMap long2LongHashMap) {
        OctetsFW wrap = directBuffer == null ? null : this.messageKeyRO.wrap(directBuffer, 0, directBuffer.capacity());
        DataFW build = this.dataRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(j3).flags(b).groupId(0L).padding(i).payload(directBuffer2 == null ? null : this.messageValueRO.wrap(directBuffer2, 0, i2)).extension(builder -> {
            builder.set(visitKafkaDataEx(j4, long2LongHashMap, wrap));
        }).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v7, types: [org.reaktivity.nukleus.kafka.internal.types.stream.DataFW$Builder] */
    public void doKafkaDataContinuation(MessageConsumer messageConsumer, long j, long j2, long j3, int i, byte b, DirectBuffer directBuffer, int i2, int i3) {
        DataFW build = this.dataRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(j3).flags(b).groupId(0L).padding(i).payload(directBuffer == null ? null : this.messageValueRO.wrap(directBuffer, i2, i3)).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    private Flyweight.Builder.Visitor visitKafkaDataEx(long j, Long2LongHashMap long2LongHashMap, OctetsFW octetsFW) {
        return (mutableDirectBuffer, i, i2) -> {
            return this.dataExRW.wrap2(mutableDirectBuffer, i, i2).timestamp(j).fetchOffsets(builder -> {
                if (long2LongHashMap.size() == 1) {
                    long nextValue = long2LongHashMap.values().iterator().nextValue();
                    builder.item(builder -> {
                        builder.set(nextValue);
                    });
                    return;
                }
                int i = -1;
                while (true) {
                    i++;
                    if (long2LongHashMap.get(i) == long2LongHashMap.missingValue()) {
                        return;
                    }
                    long j2 = long2LongHashMap.get(i);
                    builder.item(builder2 -> {
                        builder2.set(j2);
                    });
                }
            }).messageKey(octetsFW).build().sizeof();
        };
    }

    static {
        $assertionsDisabled = !ClientStreamFactory.class.desiredAssertionStatus();
        ZERO_OFFSETS = new long[]{0};
        NOOP_PROGRESS_HANDLER = (i, j, j2, messageDispatcher) -> {
        };
        NOOP = () -> {
        };
    }
}
