package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;
import java.util.function.ToIntFunction;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.budget.BudgetDebitor;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.function.MessageFunction;
import org.reaktivity.nukleus.kafka.internal.KafkaConfiguration;
import org.reaktivity.nukleus.kafka.internal.KafkaNukleus;
import org.reaktivity.nukleus.kafka.internal.cache.KafkaCache;
import org.reaktivity.nukleus.kafka.internal.cache.KafkaCacheCursorFactory;
import org.reaktivity.nukleus.kafka.internal.cache.KafkaCachePartition;
import org.reaktivity.nukleus.kafka.internal.types.ArrayFW;
import org.reaktivity.nukleus.kafka.internal.types.KafkaDeltaType;
import org.reaktivity.nukleus.kafka.internal.types.KafkaFilterFW;
import org.reaktivity.nukleus.kafka.internal.types.KafkaHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.KafkaKeyFW;
import org.reaktivity.nukleus.kafka.internal.types.KafkaOffsetFW;
import org.reaktivity.nukleus.kafka.internal.types.KafkaOffsetType;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.String16FW;
import org.reaktivity.nukleus.kafka.internal.types.cache.KafkaCacheEntryFW;
import org.reaktivity.nukleus.kafka.internal.types.control.KafkaRouteExFW;
import org.reaktivity.nukleus.kafka.internal.types.control.RouteFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.AbortFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.DataFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.EndFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.ExtensionFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.FlushFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaBeginExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaDataExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaFetchBeginExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaFlushExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.stream.StreamFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:57)
    */
/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/KafkaCacheClientFetchFactory.class */
public final class KafkaCacheClientFetchFactory implements StreamFactory {
    private static final Consumer<OctetsFW.Builder> EMPTY_EXTENSION;
    private static final long OFFSET_LATEST;
    private static final long OFFSET_EARLIEST;
    private static final long OFFSET_MAXIMUM = Long.MAX_VALUE;
    private static final int FLAG_FIN = 1;
    private static final int FLAG_INIT = 2;
    private static final int FLAG_NONE = 0;
    private final RouteFW routeRO = new RouteFW();
    private final KafkaRouteExFW routeExRO = new KafkaRouteExFW();
    private final BeginFW beginRO = new BeginFW();
    private final FlushFW flushRO = new FlushFW();
    private final EndFW endRO = new EndFW();
    private final AbortFW abortRO = new AbortFW();
    private final ResetFW resetRO = new ResetFW();
    private final WindowFW windowRO = new WindowFW();
    private final ExtensionFW extensionRO = new ExtensionFW();
    private final KafkaBeginExFW kafkaBeginExRO = new KafkaBeginExFW();
    private final KafkaFlushExFW kafkaFlushExRO = new KafkaFlushExFW();
    private final BeginFW.Builder beginRW = new BeginFW.Builder();
    private final DataFW.Builder dataRW = new DataFW.Builder();
    private final EndFW.Builder endRW = new EndFW.Builder();
    private final AbortFW.Builder abortRW = new AbortFW.Builder();
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final WindowFW.Builder windowRW = new WindowFW.Builder();
    private final KafkaBeginExFW.Builder kafkaBeginExRW = new KafkaBeginExFW.Builder();
    private final KafkaDataExFW.Builder kafkaDataExRW = new KafkaDataExFW.Builder();
    private final MessageFunction<RouteFW> wrapRoute = (i, directBuffer, i2, i3) -> {
        return this.routeRO.wrap(directBuffer, i2, i2 + i3);
    };
    private final OctetsFW valueFragmentRO = new OctetsFW();
    private final KafkaCacheEntryFW entryRO = new KafkaCacheEntryFW();
    private final int kafkaTypeId;
    private final RouteManager router;
    private final MutableDirectBuffer writeBuffer;
    private final BufferPool bufferPool;
    private final LongUnaryOperator supplyInitialId;
    private final LongUnaryOperator supplyReplyId;
    private final LongFunction<BudgetDebitor> supplyDebitor;
    private final Function<String, KafkaCache> supplyCache;
    private final LongFunction<KafkaCacheRoute> supplyCacheRoute;
    private final Long2ObjectHashMap<MessageConsumer> correlations;
    private final KafkaCacheCursorFactory cursorFactory;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/KafkaCacheClientFetchFactory$KafkaCacheClientFetchFanout.class */
    public final class KafkaCacheClientFetchFanout {
        private final long routeId;
        private final long authorization;
        private final KafkaCachePartition partition;
        private final List<KafkaCacheClientFetchStream> members;
        private long affinity;
        private long initialId;
        private long replyId;
        private MessageConsumer receiver;
        private int state;
        private long partitionOffset;
        static final /* synthetic */ boolean $assertionsDisabled;

        private KafkaCacheClientFetchFanout(long j, long j2, long j3, KafkaCachePartition kafkaCachePartition) {
            this.routeId = j;
            this.authorization = j2;
            this.partition = kafkaCachePartition;
            this.partitionOffset = KafkaCacheClientFetchFactory.OFFSET_MAXIMUM;
            this.members = new ArrayList();
            this.affinity = j3;
        }

        public void onClientFanoutMemberOpening(long j, KafkaCacheClientFetchStream kafkaCacheClientFetchStream) {
            this.members.add(kafkaCacheClientFetchStream);
            if (!$assertionsDisabled && this.members.isEmpty()) {
                throw new AssertionError();
            }
            doClientFanoutInitialBeginIfNecessary(j);
            if (KafkaState.initialOpened(this.state)) {
                kafkaCacheClientFetchStream.doClientInitialWindowIfNecessary(j, 0L, 0, 0);
            }
            if (isFanoutReplyOpened()) {
                kafkaCacheClientFetchStream.doClientReplyBeginIfNecessary(j);
            }
        }

        public boolean isFanoutReplyOpened() {
            return KafkaState.replyOpened(this.state);
        }

        public void onClientFanoutMemberClosed(long j, KafkaCacheClientFetchStream kafkaCacheClientFetchStream) {
            this.members.remove(kafkaCacheClientFetchStream);
            if (this.members.isEmpty()) {
                KafkaCacheClientFetchFactory.this.correlations.remove(this.replyId);
                doClientFanoutInitialAbortIfNecessary(j);
                doClientFanoutReplyResetIfNecessary(j);
            }
        }

        private void doClientFanoutInitialBeginIfNecessary(long j) {
            if (KafkaState.initialClosed(this.state) && KafkaState.replyClosed(this.state)) {
                this.state = 0;
            }
            if (KafkaState.initialOpening(this.state)) {
                return;
            }
            if (this.partitionOffset == KafkaCacheClientFetchFactory.OFFSET_MAXIMUM) {
                this.members.forEach(this::setMinimumPartitionOffset);
                if (this.partitionOffset == KafkaCacheClientFetchFactory.OFFSET_MAXIMUM) {
                    this.partitionOffset = KafkaCacheClientFetchFactory.OFFSET_LATEST;
                }
            }
            doClientFanoutInitialBegin(j);
        }

        private void setMinimumPartitionOffset(KafkaCacheClientFetchStream kafkaCacheClientFetchStream) {
            if (kafkaCacheClientFetchStream.partitionOffset >= this.partitionOffset || kafkaCacheClientFetchStream.partitionOffset == KafkaCacheClientFetchFactory.OFFSET_LATEST) {
                return;
            }
            this.partitionOffset = kafkaCacheClientFetchStream.partitionOffset;
        }

        private void doClientFanoutInitialBegin(long j) {
            if (!$assertionsDisabled && this.state != 0) {
                throw new AssertionError();
            }
            this.initialId = KafkaCacheClientFetchFactory.this.supplyInitialId.applyAsLong(this.routeId);
            this.replyId = KafkaCacheClientFetchFactory.this.supplyReplyId.applyAsLong(this.initialId);
            this.receiver = KafkaCacheClientFetchFactory.this.router.supplyReceiver(this.initialId);
            KafkaCacheClientFetchFactory.this.correlations.put(this.replyId, this::onClientFanoutMessage);
            KafkaCacheClientFetchFactory.this.router.setThrottle(this.initialId, this::onClientFanoutMessage);
            KafkaCacheClientFetchFactory.this.doBegin(this.receiver, this.routeId, this.initialId, j, this.authorization, this.affinity, builder -> {
                builder.set((mutableDirectBuffer, i, i2) -> {
                    return KafkaCacheClientFetchFactory.this.kafkaBeginExRW.wrap2(mutableDirectBuffer, i, i2).typeId(KafkaCacheClientFetchFactory.this.kafkaTypeId).fetch(builder -> {
                        builder.topic(this.partition.topic()).partition(builder -> {
                            builder.partitionId(this.partition.id()).partitionOffset(this.partitionOffset);
                        }).deltaType(builder2 -> {
                            builder2.set(KafkaDeltaType.NONE);
                        });
                    }).build().sizeof();
                });
            });
            this.state = KafkaState.openingInitial(this.state);
        }

        private void doClientFanoutInitialAbortIfNecessary(long j) {
            if (KafkaState.initialClosed(this.state)) {
                return;
            }
            doClientFanoutInitialAbort(j);
        }

        private void doClientFanoutInitialAbort(long j) {
            KafkaCacheClientFetchFactory.this.doAbort(this.receiver, this.routeId, this.initialId, j, this.authorization, KafkaCacheClientFetchFactory.EMPTY_EXTENSION);
            this.state = KafkaState.closedInitial(this.state);
        }

        private void onClientFanoutMessage(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 1:
                    onClientFanoutReplyBegin(KafkaCacheClientFetchFactory.this.beginRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 3:
                    onClientFanoutReplyEnd(KafkaCacheClientFetchFactory.this.endRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 4:
                    onClientFanoutReplyAbort(KafkaCacheClientFetchFactory.this.abortRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 5:
                    onClientFanoutReplyFlush(KafkaCacheClientFetchFactory.this.flushRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 1073741825:
                    onClientFanoutInitialReset(KafkaCacheClientFetchFactory.this.resetRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 1073741826:
                    onClientFanoutInitialWindow(KafkaCacheClientFetchFactory.this.windowRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                default:
                    return;
            }
        }

        private void onClientFanoutReplyBegin(BeginFW beginFW) {
            long traceId = beginFW.traceId();
            OctetsFW extension = beginFW.extension();
            ExtensionFW extensionFW = KafkaCacheClientFetchFactory.this.extensionRO;
            Objects.requireNonNull(extensionFW);
            ExtensionFW extensionFW2 = (ExtensionFW) extension.get(extensionFW::tryWrap);
            if (!$assertionsDisabled && (extensionFW2 == null || extensionFW2.typeId() != KafkaCacheClientFetchFactory.this.kafkaTypeId)) {
                throw new AssertionError();
            }
            KafkaBeginExFW kafkaBeginExFW = KafkaCacheClientFetchFactory.this.kafkaBeginExRO;
            Objects.requireNonNull(kafkaBeginExFW);
            KafkaBeginExFW kafkaBeginExFW2 = (KafkaBeginExFW) extension.get(kafkaBeginExFW::wrap);
            if (!$assertionsDisabled && kafkaBeginExFW2.kind() != 1) {
                throw new AssertionError();
            }
            KafkaOffsetFW partition = kafkaBeginExFW2.fetch().partition();
            int partitionId = partition.partitionId();
            long partitionOffset = partition.partitionOffset();
            this.state = KafkaState.openedReply(this.state);
            if (!$assertionsDisabled && partitionId != this.partition.id()) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && (partitionOffset < 0 || partitionOffset < this.partitionOffset)) {
                throw new AssertionError();
            }
            this.partitionOffset = partitionOffset;
            this.members.forEach(kafkaCacheClientFetchStream -> {
                kafkaCacheClientFetchStream.doClientReplyBeginIfNecessary(traceId);
            });
            doClientFanoutReplyWindow(traceId, KafkaCacheClientFetchFactory.this.bufferPool.slotCapacity());
        }

        private void onClientFanoutReplyFlush(FlushFW flushFW) {
            long traceId = flushFW.traceId();
            int reserved = flushFW.reserved();
            OctetsFW extension = flushFW.extension();
            KafkaFlushExFW kafkaFlushExFW = KafkaCacheClientFetchFactory.this.kafkaFlushExRO;
            Objects.requireNonNull(kafkaFlushExFW);
            long partitionOffset = ((KafkaFlushExFW) extension.get(kafkaFlushExFW::wrap)).fetch().partition().partitionOffset();
            if (!$assertionsDisabled && partitionOffset < this.partitionOffset) {
                throw new AssertionError();
            }
            this.partitionOffset = partitionOffset;
            this.members.forEach(kafkaCacheClientFetchStream -> {
                kafkaCacheClientFetchStream.doClientReplyDataIfNecessary(traceId);
            });
            doClientFanoutReplyWindow(traceId, reserved);
        }

        private void onClientFanoutReplyEnd(EndFW endFW) {
            long traceId = endFW.traceId();
            this.members.forEach(kafkaCacheClientFetchStream -> {
                kafkaCacheClientFetchStream.doClientReplyEndIfNecessary(traceId);
            });
            this.state = KafkaState.closedReply(this.state);
        }

        private void onClientFanoutReplyAbort(AbortFW abortFW) {
            long traceId = abortFW.traceId();
            this.members.forEach(kafkaCacheClientFetchStream -> {
                kafkaCacheClientFetchStream.doClientReplyAbortIfNecessary(traceId);
            });
            this.state = KafkaState.closedReply(this.state);
        }

        private void onClientFanoutInitialReset(ResetFW resetFW) {
            long traceId = resetFW.traceId();
            this.members.forEach(kafkaCacheClientFetchStream -> {
                kafkaCacheClientFetchStream.doInitialResetIfNecessary(traceId);
            });
            this.state = KafkaState.closedInitial(this.state);
            doClientFanoutReplyResetIfNecessary(traceId);
        }

        private void onClientFanoutInitialWindow(WindowFW windowFW) {
            if (KafkaState.initialOpened(this.state)) {
                return;
            }
            long traceId = windowFW.traceId();
            this.state = KafkaState.openedInitial(this.state);
            this.members.forEach(kafkaCacheClientFetchStream -> {
                kafkaCacheClientFetchStream.doClientInitialWindowIfNecessary(traceId, 0L, 0, 0);
            });
        }

        private void doClientFanoutReplyResetIfNecessary(long j) {
            if (KafkaState.replyClosed(this.state)) {
                return;
            }
            doClientFanoutReplyReset(j);
        }

        private void doClientFanoutReplyReset(long j) {
            this.state = KafkaState.closedReply(this.state);
            KafkaCacheClientFetchFactory.this.doReset(this.receiver, this.routeId, this.replyId, j, this.authorization);
        }

        private void doClientFanoutReplyWindow(long j, int i) {
            this.state = KafkaState.openedReply(this.state);
            KafkaCacheClientFetchFactory.this.doWindow(this.receiver, this.routeId, this.replyId, j, this.authorization, 0L, i, 0);
        }

        /* synthetic */ KafkaCacheClientFetchFanout(KafkaCacheClientFetchFactory kafkaCacheClientFetchFactory, long j, long j2, long j3, KafkaCachePartition kafkaCachePartition, AnonymousClass1 anonymousClass1) {
            this(j, j2, j3, kafkaCachePartition);
        }

        /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: org.reaktivity.nukleus.kafka.internal.stream.KafkaCacheClientFetchFactory.KafkaCacheClientFetchFanout.access$102(org.reaktivity.nukleus.kafka.internal.stream.KafkaCacheClientFetchFactory$KafkaCacheClientFetchFanout, long):long
            java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
            	at java.base/java.lang.System.arraycopy(Native Method)
            	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
            	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
            	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
            	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
            	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
            	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
            	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
            	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
            	at jadx.core.ProcessClass.process(ProcessClass.java:70)
            	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
            	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
            	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
            	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
            */
        static /* synthetic */ long access$102(org.reaktivity.nukleus.kafka.internal.stream.KafkaCacheClientFetchFactory.KafkaCacheClientFetchFanout r6, long r7) {
            /*
                r0 = r6
                r1 = r7
                // decode failed: arraycopy: source index -1 out of bounds for object array[6]
                r0.affinity = r1
                return r-1
            */
            throw new UnsupportedOperationException("Method not decompiled: org.reaktivity.nukleus.kafka.internal.stream.KafkaCacheClientFetchFactory.KafkaCacheClientFetchFanout.access$102(org.reaktivity.nukleus.kafka.internal.stream.KafkaCacheClientFetchFactory$KafkaCacheClientFetchFanout, long):long");
        }

        static {
            $assertionsDisabled = !KafkaCacheClientFetchFactory.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/KafkaCacheClientFetchFactory$KafkaCacheClientFetchStream.class */
    public final class KafkaCacheClientFetchStream {
        private final KafkaCacheClientFetchFanout group;
        private final MessageConsumer sender;
        private final long routeId;
        private final long initialId;
        private final long replyId;
        private final long affinity;
        private final long authorization;
        private final KafkaCacheCursorFactory.KafkaCacheCursor cursor;
        private final KafkaDeltaType deltaType;
        private int state;
        private long replyDebitorIndex = -1;
        private BudgetDebitor replyDebitor;
        private long replyBudgetId;
        private int replyBudget;
        private int replyPadding;
        private long partitionOffset;
        private int messageOffset;
        static final /* synthetic */ boolean $assertionsDisabled;
        final /* synthetic */ KafkaCacheClientFetchFactory this$0;

        KafkaCacheClientFetchStream(KafkaCacheClientFetchFactory kafkaCacheClientFetchFactory, KafkaCacheClientFetchFanout kafkaCacheClientFetchFanout, MessageConsumer messageConsumer, long j, long j2, long j3, long j4, long j5, KafkaCacheCursorFactory.KafkaFilterCondition kafkaFilterCondition, KafkaDeltaType kafkaDeltaType) {
            this.this$0 = kafkaCacheClientFetchFactory;
            this.group = kafkaCacheClientFetchFanout;
            this.sender = messageConsumer;
            this.routeId = j;
            this.initialId = j2;
            this.replyId = kafkaCacheClientFetchFactory.supplyReplyId.applyAsLong(j2);
            this.affinity = j3;
            this.authorization = j4;
            this.partitionOffset = j5;
            this.cursor = kafkaCacheClientFetchFactory.cursorFactory.newCursor(kafkaFilterCondition, kafkaDeltaType);
            this.deltaType = kafkaDeltaType;
        }

        public void onClientMessage(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 1:
                    onClientInitialBegin(this.this$0.beginRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 3:
                    onClientInitialEnd(this.this$0.endRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 4:
                    onClientInitialAbort(this.this$0.abortRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 1073741825:
                    onClientReplyReset(this.this$0.resetRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 1073741826:
                    onClientReplyWindow(this.this$0.windowRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                default:
                    return;
            }
        }

        private void onClientInitialBegin(BeginFW beginFW) {
            long traceId = beginFW.traceId();
            this.state = KafkaState.openingInitial(this.state);
            this.group.onClientFanoutMemberOpening(traceId, this);
        }

        private void onClientInitialEnd(EndFW endFW) {
            long traceId = endFW.traceId();
            this.state = KafkaState.closedInitial(this.state);
            this.group.onClientFanoutMemberClosed(traceId, this);
            doClientReplyEndIfNecessary(traceId);
        }

        private void onClientInitialAbort(AbortFW abortFW) {
            long traceId = abortFW.traceId();
            this.state = KafkaState.closedInitial(this.state);
            this.group.onClientFanoutMemberClosed(traceId, this);
            doClientReplyAbortIfNecessary(traceId);
        }

        public void doClientInitialWindowIfNecessary(long j, long j2, int i, int i2) {
            if (!KafkaState.initialOpened(this.state) || i > 0) {
                doClientInitialWindow(j, j2, i, i2);
            }
        }

        private void doClientInitialWindow(long j, long j2, int i, int i2) {
            this.state = KafkaState.openedInitial(this.state);
            this.this$0.doWindow(this.sender, this.routeId, this.initialId, j, this.authorization, j2, i, i2);
        }

        public void doInitialResetIfNecessary(long j) {
            if (!KafkaState.initialOpening(this.state) || KafkaState.initialClosed(this.state)) {
                return;
            }
            doClientInitialReset(j);
        }

        private void doClientInitialReset(long j) {
            this.state = KafkaState.closedInitial(this.state);
            this.this$0.doReset(this.sender, this.routeId, this.initialId, j, this.authorization);
        }

        public void doClientReplyBeginIfNecessary(long j) {
            if (KafkaState.replyOpening(this.state)) {
                return;
            }
            doClientReplyBegin(j);
        }

        private void doClientReplyBegin(long j) {
            this.state = KafkaState.openingReply(this.state);
            if (this.partitionOffset == KafkaCacheClientFetchFactory.OFFSET_LATEST) {
                this.partitionOffset = this.group.partitionOffset;
            } else if (this.partitionOffset == KafkaCacheClientFetchFactory.OFFSET_EARLIEST) {
                KafkaCachePartition.Node seekNotBefore = this.group.partition.seekNotBefore(0L);
                if (!$assertionsDisabled && seekNotBefore.sentinel()) {
                    throw new AssertionError();
                }
                this.partitionOffset = seekNotBefore.segment().baseOffset();
            }
            if (!$assertionsDisabled && this.partitionOffset < 0) {
                throw new AssertionError();
            }
            KafkaCachePartition.Node seekNotAfter = this.group.partition.seekNotAfter(this.partitionOffset);
            if (!$assertionsDisabled && seekNotAfter.sentinel()) {
                throw new AssertionError(String.format("%s @ %d", this.group.partition, Long.valueOf(this.partitionOffset)));
            }
            this.cursor.init(seekNotAfter, this.partitionOffset);
            this.this$0.router.setThrottle(this.replyId, this::onClientMessage);
            this.this$0.doBegin(this.sender, this.routeId, this.replyId, j, this.authorization, this.affinity, builder -> {
                builder.set((mutableDirectBuffer, i, i2) -> {
                    return this.this$0.kafkaBeginExRW.wrap2(mutableDirectBuffer, i, i2).typeId(this.this$0.kafkaTypeId).fetch(builder -> {
                        builder.topic(this.group.partition.topic()).partition(builder -> {
                            builder.partitionId(this.group.partition.id()).partitionOffset(this.partitionOffset);
                        }).deltaType(builder2 -> {
                            builder2.set(this.deltaType);
                        });
                    }).build().sizeof();
                });
            });
        }

        public void doClientReplyDataIfNecessary(long j) {
            KafkaCacheEntryFW next;
            while (KafkaState.replyOpened(this.state) && this.partitionOffset <= this.group.partitionOffset && (next = this.cursor.next(this.this$0.entryRO)) != null) {
                int i = this.replyBudget;
                doClientReplyData(j, next);
                if (this.replyBudget == i || this.replyBudget < this.replyPadding) {
                    return;
                }
            }
        }

        private void doClientReplyData(long j, KafkaCacheEntryFW kafkaCacheEntryFW) {
            if (!$assertionsDisabled && kafkaCacheEntryFW == null) {
                throw new AssertionError();
            }
            long offset$ = kafkaCacheEntryFW.offset$();
            long timestamp = kafkaCacheEntryFW.timestamp();
            KafkaKeyFW key = kafkaCacheEntryFW.key();
            ArrayFW<KafkaHeaderFW> headers = kafkaCacheEntryFW.headers();
            long ancestor = kafkaCacheEntryFW.ancestor();
            OctetsFW value = kafkaCacheEntryFW.value();
            int sizeof = value != null ? value.sizeof() - this.messageOffset : 0;
            int min = Math.min(sizeof, 1024);
            int i = sizeof + this.replyPadding;
            int i2 = min + this.replyPadding;
            if (!$assertionsDisabled && offset$ < this.partitionOffset) {
                throw new AssertionError(String.format("%d >= %d", Long.valueOf(offset$), Long.valueOf(this.partitionOffset)));
            }
            if (this.replyBudget >= i2) {
                int i3 = i;
                if (this.replyDebitorIndex != -1) {
                    i3 = this.replyDebitor.claim(this.replyDebitorIndex, this.replyId, i2, i);
                }
                if (i3 >= this.replyPadding) {
                    if (i3 != this.replyPadding || value == null) {
                        int i4 = i3 - this.replyPadding;
                        if (!$assertionsDisabled && i4 < 0) {
                            throw new AssertionError(String.format("%d >= 0", Integer.valueOf(i4)));
                        }
                        int i5 = 0;
                        if (this.messageOffset == 0) {
                            i5 = 0 | 2;
                        }
                        if (i4 == sizeof) {
                            i5 |= 1;
                        }
                        OctetsFW octetsFW = value;
                        if (i5 != 3) {
                            int offset = value.offset() + this.messageOffset;
                            octetsFW = this.this$0.valueFragmentRO.wrap(value.buffer(), offset, offset + i4);
                        }
                        int id = this.group.partition.id();
                        switch (i5) {
                            case 0:
                                doClientReplyDataNone(j, octetsFW, i3, i4, i5);
                                break;
                            case 1:
                                doClientReplyDataFin(j, headers, this.deltaType, ancestor, octetsFW, i3, i4, i5, id, offset$);
                                break;
                            case 2:
                                doClientReplyDataInit(j, timestamp, key, this.deltaType, ancestor, octetsFW, i3, i4, i5, id, offset$);
                                break;
                            case 3:
                                doClientReplyDataFull(j, timestamp, key, headers, this.deltaType, ancestor, octetsFW, i3, i5, id, offset$);
                                break;
                        }
                        if ((i5 & 1) == 0) {
                            this.messageOffset += i4;
                            return;
                        }
                        long j2 = offset$ + 1;
                        this.partitionOffset = j2;
                        this.messageOffset = 0;
                        this.cursor.advance(j2);
                    }
                }
            }
        }

        private void doClientReplyDataFull(long j, long j2, KafkaKeyFW kafkaKeyFW, ArrayFW<KafkaHeaderFW> arrayFW, KafkaDeltaType kafkaDeltaType, long j3, OctetsFW octetsFW, int i, int i2, int i3, long j4) {
            this.replyBudget -= i;
            this.this$0.doData(this.sender, this.routeId, this.replyId, j, this.authorization, i2, this.replyBudgetId, i, octetsFW, builder -> {
                builder.set((mutableDirectBuffer, i4, i5) -> {
                    return this.this$0.kafkaDataExRW.wrap2(mutableDirectBuffer, i4, i5).typeId(this.this$0.kafkaTypeId).fetch(builder -> {
                        builder.timestamp(j2).partition(builder -> {
                            builder.partitionId(i3).partitionOffset(j4);
                        }).key(builder2 -> {
                            builder2.length(kafkaKeyFW.length()).value(kafkaKeyFW.value());
                        }).delta(builder3 -> {
                            builder3.type(builder3 -> {
                                builder3.set(kafkaDeltaType);
                            }).ancestorOffset(j3);
                        }).headers(builder4 -> {
                            arrayFW.forEach(kafkaHeaderFW -> {
                                builder4.item(builder4 -> {
                                    builder4.nameLen(kafkaHeaderFW.nameLen()).name(kafkaHeaderFW.name()).valueLen(kafkaHeaderFW.valueLen()).value(kafkaHeaderFW.value());
                                });
                            });
                        });
                    }).build().sizeof();
                });
            });
        }

        private void doClientReplyDataInit(long j, long j2, KafkaKeyFW kafkaKeyFW, KafkaDeltaType kafkaDeltaType, long j3, OctetsFW octetsFW, int i, int i2, int i3, int i4, long j4) {
            this.replyBudget -= i;
            this.this$0.doData(this.sender, this.routeId, this.replyId, j, this.authorization, i3, this.replyBudgetId, i, octetsFW, builder -> {
                builder.set((mutableDirectBuffer, i5, i6) -> {
                    return this.this$0.kafkaDataExRW.wrap2(mutableDirectBuffer, i5, i6).typeId(this.this$0.kafkaTypeId).fetch(builder -> {
                        builder.timestamp(j2).partition(builder -> {
                            builder.partitionId(i4).partitionOffset(j4);
                        }).key(builder2 -> {
                            builder2.length(kafkaKeyFW.length()).value(kafkaKeyFW.value());
                        }).delta(builder3 -> {
                            builder3.type(builder3 -> {
                                builder3.set(kafkaDeltaType);
                            }).ancestorOffset(j3);
                        });
                    }).build().sizeof();
                });
            });
        }

        private void doClientReplyDataNone(long j, OctetsFW octetsFW, int i, int i2, int i3) {
            this.replyBudget -= i;
            this.this$0.doData(this.sender, this.routeId, this.replyId, j, this.authorization, i3, this.replyBudgetId, i, octetsFW, KafkaCacheClientFetchFactory.EMPTY_EXTENSION);
        }

        private void doClientReplyDataFin(long j, ArrayFW<KafkaHeaderFW> arrayFW, KafkaDeltaType kafkaDeltaType, long j2, OctetsFW octetsFW, int i, int i2, int i3, int i4, long j3) {
            this.replyBudget -= i;
            this.this$0.doData(this.sender, this.routeId, this.replyId, j, this.authorization, i3, this.replyBudgetId, i, octetsFW, builder -> {
                builder.set((mutableDirectBuffer, i5, i6) -> {
                    return this.this$0.kafkaDataExRW.wrap2(mutableDirectBuffer, i5, i6).typeId(this.this$0.kafkaTypeId).fetch(builder -> {
                        builder.partition(builder -> {
                            builder.partitionId(i4).partitionOffset(j3);
                        }).delta(builder2 -> {
                            builder2.type(builder2 -> {
                                builder2.set(kafkaDeltaType);
                            }).ancestorOffset(j2);
                        }).headers(builder3 -> {
                            arrayFW.forEach(kafkaHeaderFW -> {
                                builder3.item(builder3 -> {
                                    builder3.nameLen(kafkaHeaderFW.nameLen()).name(kafkaHeaderFW.name()).valueLen(kafkaHeaderFW.valueLen()).value(kafkaHeaderFW.value());
                                });
                            });
                        });
                    }).build().sizeof();
                });
            });
        }

        private void doClientReplyEnd(long j) {
            this.state = KafkaState.closedReply(this.state);
            this.this$0.doEnd(this.sender, this.routeId, this.replyId, j, this.authorization, KafkaCacheClientFetchFactory.EMPTY_EXTENSION);
            doCleanupClient();
        }

        private void doClientReplyAbort(long j) {
            this.state = KafkaState.closedReply(this.state);
            this.this$0.doAbort(this.sender, this.routeId, this.replyId, j, this.authorization, KafkaCacheClientFetchFactory.EMPTY_EXTENSION);
            doCleanupClient();
        }

        public void doClientReplyEndIfNecessary(long j) {
            if (!KafkaState.replyOpening(this.state) || KafkaState.replyClosed(this.state)) {
                return;
            }
            doClientReplyEnd(j);
        }

        public void doClientReplyAbortIfNecessary(long j) {
            if (!KafkaState.replyOpening(this.state) || KafkaState.replyClosed(this.state)) {
                return;
            }
            doClientReplyAbort(j);
        }

        private void onClientReplyWindow(WindowFW windowFW) {
            long traceId = windowFW.traceId();
            long budgetId = windowFW.budgetId();
            int credit = windowFW.credit();
            int padding = windowFW.padding();
            this.replyBudgetId = budgetId;
            this.replyBudget += credit;
            this.replyPadding = padding;
            if (!KafkaState.replyOpened(this.state)) {
                this.state = KafkaState.openedReply(this.state);
                if (this.replyBudgetId != 0 && this.replyDebitorIndex == -1) {
                    this.replyDebitor = (BudgetDebitor) this.this$0.supplyDebitor.apply(this.replyBudgetId);
                    this.replyDebitorIndex = this.replyDebitor.acquire(this.replyBudgetId, this.replyId, this::doClientReplyDataIfNecessary);
                }
            }
            if (this.group.isFanoutReplyOpened()) {
                doClientReplyDataIfNecessary(traceId);
            }
        }

        private void onClientReplyReset(ResetFW resetFW) {
            long traceId = resetFW.traceId();
            this.state = KafkaState.closedReply(this.state);
            doCleanupClient();
            this.group.onClientFanoutMemberClosed(traceId, this);
            doInitialResetIfNecessary(traceId);
        }

        private void doCleanupClient() {
            if (this.replyDebitor != null && this.replyDebitorIndex != -1) {
                this.replyDebitor.release(this.replyBudgetId, this.replyId);
                this.replyDebitorIndex = -1L;
            }
            this.cursor.close();
        }

        static {
            $assertionsDisabled = !KafkaCacheClientFetchFactory.class.desiredAssertionStatus();
        }
    }

    public KafkaCacheClientFetchFactory(KafkaConfiguration kafkaConfiguration, RouteManager routeManager, MutableDirectBuffer mutableDirectBuffer, BufferPool bufferPool, LongUnaryOperator longUnaryOperator, LongUnaryOperator longUnaryOperator2, LongSupplier longSupplier, ToIntFunction<String> toIntFunction, LongFunction<BudgetDebitor> longFunction, Function<String, KafkaCache> function, LongFunction<KafkaCacheRoute> longFunction2, Long2ObjectHashMap<MessageConsumer> long2ObjectHashMap) {
        this.kafkaTypeId = toIntFunction.applyAsInt(KafkaNukleus.NAME);
        this.router = routeManager;
        this.writeBuffer = new UnsafeBuffer(new byte[mutableDirectBuffer.capacity()]);
        this.bufferPool = bufferPool;
        this.supplyInitialId = longUnaryOperator;
        this.supplyReplyId = longUnaryOperator2;
        this.supplyDebitor = longFunction;
        this.supplyCache = function;
        this.supplyCacheRoute = longFunction2;
        this.correlations = long2ObjectHashMap;
        this.cursorFactory = new KafkaCacheCursorFactory(mutableDirectBuffer);
    }

    public MessageConsumer newStream(int i, DirectBuffer directBuffer, int i2, int i3, MessageConsumer messageConsumer) {
        BeginFW wrap = this.beginRO.wrap(directBuffer, i2, i2 + i3);
        long routeId = wrap.routeId();
        long streamId = wrap.streamId();
        long affinity = wrap.affinity();
        long authorization = wrap.authorization();
        if (!$assertionsDisabled && (streamId & 1) == 0) {
            throw new AssertionError();
        }
        OctetsFW extension = wrap.extension();
        ExtensionFW extensionFW = this.extensionRO;
        Objects.requireNonNull(extensionFW);
        ExtensionFW extensionFW2 = (ExtensionFW) extension.get(extensionFW::tryWrap);
        if (!$assertionsDisabled && (extensionFW2 == null || extensionFW2.typeId() != this.kafkaTypeId)) {
            throw new AssertionError();
        }
        KafkaBeginExFW kafkaBeginExFW = this.kafkaBeginExRO;
        Objects.requireNonNull(kafkaBeginExFW);
        KafkaBeginExFW kafkaBeginExFW2 = (KafkaBeginExFW) extension.get(kafkaBeginExFW::wrap);
        if (!$assertionsDisabled && kafkaBeginExFW2.kind() != 1) {
            throw new AssertionError();
        }
        KafkaFetchBeginExFW fetch = kafkaBeginExFW2.fetch();
        String16FW string16FW = fetch.topic();
        KafkaOffsetFW partition = fetch.partition();
        ArrayFW<KafkaFilterFW> filters = fetch.filters();
        KafkaDeltaType kafkaDeltaType = fetch.deltaType().get();
        MessageConsumer messageConsumer2 = null;
        RouteFW routeFW = (RouteFW) this.router.resolve(routeId, authorization, (i4, directBuffer2, i5, i6) -> {
            RouteFW routeFW2 = (RouteFW) this.wrapRoute.apply(i4, directBuffer2, i5, i6);
            OctetsFW extension2 = routeFW2.extension();
            KafkaRouteExFW kafkaRouteExFW = this.routeExRO;
            Objects.requireNonNull(kafkaRouteExFW);
            KafkaRouteExFW kafkaRouteExFW2 = (KafkaRouteExFW) extension2.get(kafkaRouteExFW::tryWrap);
            String16FW string16FW2 = kafkaRouteExFW2 != null ? kafkaRouteExFW2.topic() : null;
            return !routeFW2.localAddress().equals(routeFW2.remoteAddress()) && string16FW2 != null && Objects.equals(string16FW2, string16FW) && ((kafkaRouteExFW2 != null ? kafkaRouteExFW2.deltaType().get() : KafkaDeltaType.NONE) == kafkaDeltaType || kafkaDeltaType == KafkaDeltaType.NONE);
        }, this.wrapRoute);
        if (routeFW != null) {
            long correlationId = routeFW.correlationId();
            String asString = string16FW.asString();
            int partitionId = partition.partitionId();
            long partitionOffset = partition.partitionOffset();
            KafkaCacheRoute apply = this.supplyCacheRoute.apply(correlationId);
            long j = apply.topicPartitionKey(asString, partitionId);
            KafkaCacheClientFetchFanout kafkaCacheClientFetchFanout = (KafkaCacheClientFetchFanout) apply.clientFetchFanoutsByTopicPartition.get(j);
            if (kafkaCacheClientFetchFanout == null) {
                KafkaCacheClientFetchFanout kafkaCacheClientFetchFanout2 = new KafkaCacheClientFetchFanout(correlationId, authorization, affinity, this.supplyCache.apply(routeFW.remoteAddress().asString()).supplyTopic(asString).supplyPartition(partitionId));
                apply.clientFetchFanoutsByTopicPartition.put(j, kafkaCacheClientFetchFanout2);
                kafkaCacheClientFetchFanout = kafkaCacheClientFetchFanout2;
            }
            if (kafkaCacheClientFetchFanout != null) {
                if (!$assertionsDisabled && kafkaCacheClientFetchFanout.affinity != affinity && kafkaCacheClientFetchFanout.state != 0) {
                    throw new AssertionError();
                }
                KafkaCacheClientFetchFanout.access$102(kafkaCacheClientFetchFanout, affinity);
                KafkaCacheClientFetchStream kafkaCacheClientFetchStream = new KafkaCacheClientFetchStream(this, kafkaCacheClientFetchFanout, messageConsumer, routeId, streamId, affinity, authorization, partitionOffset, this.cursorFactory.asCondition(filters), kafkaDeltaType);
                messageConsumer2 = (i7, directBuffer3, i8, i9) -> {
                    kafkaCacheClientFetchStream.onClientMessage(i7, directBuffer3, i8, i9);
                };
            }
        }
        return messageConsumer2;
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW$Builder] */
    public void doBegin(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, long j5, Consumer<OctetsFW.Builder> consumer) {
        BeginFW build = this.beginRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).traceId(j3).authorization(j4).affinity(j5).extension(consumer).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.DataFW$Builder] */
    public void doData(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, int i, long j5, int i2, OctetsFW octetsFW, Consumer<OctetsFW.Builder> consumer) {
        DataFW build = this.dataRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).traceId(j3).authorization(j4).flags(i).budgetId(j5).reserved(i2).payload(octetsFW).extension(consumer).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.EndFW$Builder] */
    public void doEnd(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, Consumer<OctetsFW.Builder> consumer) {
        EndFW build = this.endRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).traceId(j3).authorization(j4).extension(consumer).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.AbortFW$Builder] */
    public void doAbort(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, Consumer<OctetsFW.Builder> consumer) {
        AbortFW build = this.abortRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).traceId(j3).authorization(j4).extension(consumer).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW$Builder] */
    public void doWindow(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, long j5, int i, int i2) {
        WindowFW build = this.windowRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).traceId(j3).authorization(j4).budgetId(j5).credit(i).padding(i2).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW$Builder] */
    public void doReset(MessageConsumer messageConsumer, long j, long j2, long j3, long j4) {
        ResetFW build = this.resetRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).traceId(j3).authorization(j4).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    static {
        $assertionsDisabled = !KafkaCacheClientFetchFactory.class.desiredAssertionStatus();
        EMPTY_EXTENSION = builder -> {
        };
        OFFSET_LATEST = KafkaOffsetType.LATEST.value();
        OFFSET_EARLIEST = KafkaOffsetType.EARLIEST.value();
    }
}
