package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;
import java.util.function.ToIntFunction;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.concurrent.Signaler;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.function.MessageFunction;
import org.reaktivity.nukleus.kafka.internal.KafkaConfiguration;
import org.reaktivity.nukleus.kafka.internal.KafkaNukleus;
import org.reaktivity.nukleus.kafka.internal.cache.KafkaCache;
import org.reaktivity.nukleus.kafka.internal.cache.KafkaCacheCursorRecord;
import org.reaktivity.nukleus.kafka.internal.cache.KafkaCacheIndexFile;
import org.reaktivity.nukleus.kafka.internal.cache.KafkaCachePartition;
import org.reaktivity.nukleus.kafka.internal.cache.KafkaCacheSegment;
import org.reaktivity.nukleus.kafka.internal.types.Array32FW;
import org.reaktivity.nukleus.kafka.internal.types.Flyweight;
import org.reaktivity.nukleus.kafka.internal.types.KafkaDeltaFW;
import org.reaktivity.nukleus.kafka.internal.types.KafkaDeltaType;
import org.reaktivity.nukleus.kafka.internal.types.KafkaHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.KafkaKeyFW;
import org.reaktivity.nukleus.kafka.internal.types.KafkaOffsetFW;
import org.reaktivity.nukleus.kafka.internal.types.KafkaOffsetType;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.String16FW;
import org.reaktivity.nukleus.kafka.internal.types.cache.KafkaCacheEntryFW;
import org.reaktivity.nukleus.kafka.internal.types.control.KafkaRouteExFW;
import org.reaktivity.nukleus.kafka.internal.types.control.RouteFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.AbortFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.DataFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.EndFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.ExtensionFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.FlushFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaBeginExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaDataExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaFetchBeginExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaFetchDataExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaFlushExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaResetExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.SignalFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.stream.StreamFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:57)
    */
/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/KafkaCacheServerFetchFactory.class */
public final class KafkaCacheServerFetchFactory implements StreamFactory {
    // NOTE(review): presumably the size budgeted for a FLUSH frame carrying an extension; its use is not visible in this chunk.
    static final int SIZE_OF_FLUSH_WITH_EXTENSION = 64;
    // Reset error value for which onServerFanoutInitialReset never schedules a reconnect (compared there as literal 6).
    private static final int ERROR_NOT_LEADER_FOR_PARTITION = 6;
    // The three EMPTY_* constants have no initializer here; presumably assigned in a static initializer outside this chunk.
    private static final DirectBuffer EMPTY_BUFFER;
    private static final OctetsFW EMPTY_OCTETS;
    // Passed as the extension argument when the fanout sends END / ABORT without extension data.
    private static final Consumer<OctetsFW.Builder> EMPTY_EXTENSION;
    // DATA frame flag bits as tested by onServerFanoutReplyData: 2 marks the first fragment, 1 marks the last fragment.
    private static final int FLAGS_INIT = 2;
    private static final int FLAGS_FIN = 1;
    // Signal ids: 1 is used for the reconnect timer; 2, 3 and 4 are dispatched by onServerFanoutInitialSignal.
    private static final int SIGNAL_RECONNECT = 1;
    private static final int SIGNAL_SEGMENT_RETAIN = 2;
    private static final int SIGNAL_SEGMENT_DELETE = 3;
    private static final int SIGNAL_SEGMENT_COMPACT = 4;
    // Reusable read-only flyweights: each is re-wrapped over the incoming buffer for every frame,
    // so a decoded frame must not be retained beyond the handler that wrapped it.
    private final RouteFW routeRO = new RouteFW();
    private final KafkaRouteExFW routeExRO = new KafkaRouteExFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final EndFW endRO = new EndFW();
    private final AbortFW abortRO = new AbortFW();
    private final ResetFW resetRO = new ResetFW();
    private final WindowFW windowRO = new WindowFW();
    private final SignalFW signalRO = new SignalFW();
    private final ExtensionFW extensionRO = new ExtensionFW();
    private final KafkaBeginExFW kafkaBeginExRO = new KafkaBeginExFW();
    private final KafkaDataExFW kafkaDataExRO = new KafkaDataExFW();
    private final KafkaResetExFW kafkaResetExRO = new KafkaResetExFW();
    // Reusable frame builders (write side).
    private final BeginFW.Builder beginRW = new BeginFW.Builder();
    private final FlushFW.Builder flushRW = new FlushFW.Builder();
    private final EndFW.Builder endRW = new EndFW.Builder();
    private final AbortFW.Builder abortRW = new AbortFW.Builder();
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final WindowFW.Builder windowRW = new WindowFW.Builder();
    private final KafkaBeginExFW.Builder kafkaBeginExRW = new KafkaBeginExFW.Builder();
    private final KafkaFlushExFW.Builder kafkaFlushExRW = new KafkaFlushExFW.Builder();
    // Wraps routeRO over the region [offset, offset + length) of the supplied buffer; the first argument is ignored.
    private final MessageFunction<RouteFW> wrapRoute = (i, directBuffer, i2, i3) -> {
        return this.routeRO.wrap(directBuffer, i2, i2 + i3);
    };
    // Scratch flyweight handed to Node.findAndMarkAncestor by the fanout when looking up an earlier entry for a key.
    private final KafkaCacheEntryFW ancestorEntryRO = new KafkaCacheEntryFW();
    // Extension type id written into outgoing Kafka extensions and asserted on incoming ones.
    private final int kafkaTypeId;
    private final RouteManager router;
    private final MutableDirectBuffer writeBuffer;
    private final BufferPool bufferPool;
    // Timer facility used for the reconnect timer (signalAt / cancel).
    private final Signaler signaler;
    private final LongUnaryOperator supplyInitialId;
    private final LongUnaryOperator supplyReplyId;
    private final LongSupplier supplyTraceId;
    private final Function<String, KafkaCache> supplyCache;
    private final LongFunction<KafkaCacheRoute> supplyCacheRoute;
    // Reply stream id -> handler; the fanout registers itself here when it opens its initial stream.
    private final Long2ObjectHashMap<MessageConsumer> correlations;
    // Upper bound, in seconds, on the reconnect backoff; 0 disables reconnect.
    private final int reconnectDelay;
    // Decompiler rendering of the compiler-generated assertion switch; true when assertions are off.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/KafkaCacheServerFetchFactory$KafkaCacheServerFetchFanout.class */
    public final class KafkaCacheServerFetchFanout {
        // Route and authorization used for every frame this fanout sends upstream.
        private final long routeId;
        private final long authorization;
        // Cache partition that fetched records are written into.
        private final KafkaCachePartition partition;
        // Delta type passed to the partition when starting and finishing an entry.
        private final KafkaDeltaType deltaType;
        // Offset type used to ask the partition for its next offset.
        private final KafkaOffsetType defaultOffset;
        // 30 seconds when defaultOffset is LATEST, otherwise Long.MAX_VALUE (see constructor).
        private final long retentionMillisMax;
        // Downstream streams sharing this single upstream fetch.
        private final List<KafkaCacheServerFetchStream> members;
        private long affinity;
        // Stream ids and receiver of the current upstream connection; reassigned on every (re)connect.
        private long initialId;
        private long replyId;
        private MessageConsumer receiver;
        // Bit set manipulated exclusively through KafkaState helpers; 0 means not yet opened.
        private int state;
        // Latest partition offset known to this fanout; sent in BEGIN and advanced as complete records arrive.
        private long partitionOffset;
        // Ids of pending scheduled signals (-1 when none is pending) for segment retain / delete / compact.
        private long retainId;
        private long deleteId;
        private long compactId;
        // Time at which the pending compact signal is due (Long.MAX_VALUE when none has been scheduled).
        private long compactAt;
        // Id of the pending reconnect timer, or -1 when no reconnect is scheduled.
        private long reconnectAt;
        // Consecutive reconnect attempts; drives the exponential backoff and is reset on the first WINDOW.
        private int reconnectAttempt;
        // Decompiler rendering of the compiler-generated assertion switch; true when assertions are off.
        static final /* synthetic */ boolean $assertionsDisabled;

        /**
         * Creates a fanout for one cache partition with no members and no pending timers.
         *
         * @param j route id used for frames sent upstream
         * @param j2 authorization used for frames sent upstream
         * @param j3 affinity sent in the upstream BEGIN
         * @param kafkaCachePartition cache partition that fetched records are written into
         * @param kafkaDeltaType delta type passed to the partition when writing entries
         * @param kafkaOffsetType offset type used to resolve the partition's next offset; when it is
         *        {@code LATEST} the maximum retention is capped at 30 seconds, otherwise it is unbounded
         */
        private KafkaCacheServerFetchFanout(long j, long j2, long j3, KafkaCachePartition kafkaCachePartition, KafkaDeltaType kafkaDeltaType, KafkaOffsetType kafkaOffsetType) {
            // -1 marks "no signal / timer pending"
            this.retainId = -1L;
            this.deleteId = -1L;
            this.compactId = -1L;
            this.compactAt = Long.MAX_VALUE;
            this.reconnectAt = -1L;
            this.routeId = j;
            this.authorization = j2;
            this.partition = kafkaCachePartition;
            this.deltaType = kafkaDeltaType;
            this.defaultOffset = kafkaOffsetType;
            this.retentionMillisMax = kafkaOffsetType == KafkaOffsetType.LATEST ? TimeUnit.SECONDS.toMillis(30L) : Long.MAX_VALUE;
            // diamond instead of the raw type, so the element type is checked by the compiler
            this.members = new ArrayList<>();
            this.affinity = j3;
        }

        /**
         * Adds a member stream to this fanout, connecting upstream if needed, and brings the new
         * member up to date with whatever the upstream connection has already opened.
         *
         * @param j trace id propagated to the frames sent as a result
         * @param kafkaCacheServerFetchStream the member stream being opened
         */
        public void onServerFanoutMemberOpening(long j, KafkaCacheServerFetchStream kafkaCacheServerFetchStream) {
            final long traceId = j;
            final KafkaCacheServerFetchStream member = kafkaCacheServerFetchStream;

            this.members.add(member);
            if (!$assertionsDisabled) {
                if (this.members.isEmpty()) {
                    throw new AssertionError();
                }
            }

            doServerFanoutInitialBeginIfNecessary(traceId);

            // upstream initial stream already open: the member gets its window right away
            if (KafkaState.initialOpened(this.state)) {
                member.doServerInitialWindowIfNecessary(traceId, 0L, 0, 0);
            }

            // upstream reply stream already open: the member gets its reply BEGIN right away
            if (KafkaState.replyOpened(this.state)) {
                member.doServerReplyBeginIfNecessary(traceId);
            }
        }

        /**
         * Removes a member stream; when the last member leaves, cancels any pending reconnect,
         * drops the reply correlation and tears down the upstream connection.
         *
         * @param j trace id propagated to the frames sent as a result
         * @param kafkaCacheServerFetchStream the member stream being closed
         */
        public void onServerFanoutMemberClosed(long j, KafkaCacheServerFetchStream kafkaCacheServerFetchStream) {
            final long traceId = j;

            this.members.remove(kafkaCacheServerFetchStream);
            if (!this.members.isEmpty()) {
                // other members still depend on the upstream connection
                return;
            }

            final boolean reconnectPending = this.reconnectAt != -1;
            if (reconnectPending) {
                KafkaCacheServerFetchFactory.this.signaler.cancel(this.reconnectAt);
                this.reconnectAt = -1L;
            }

            KafkaCacheServerFetchFactory.this.correlations.remove(this.replyId);
            doServerFanoutInitialAbortIfNecessary(traceId);
            doServerFanoutReplyResetIfNecessary(traceId);
        }

        /**
         * Opens the upstream connection unless the initial stream is already opening.
         * A fully closed fanout has its state reset to 0 first, so it can connect again.
         *
         * @param j trace id for the BEGIN frame
         */
        private void doServerFanoutInitialBeginIfNecessary(long j) {
            if (KafkaState.closed(this.state)) {
                this.state = 0;
            }

            if (!KafkaState.initialOpening(this.state)) {
                if (KafkaConfiguration.DEBUG) {
                    System.out.format("%s FETCH connect, affinity %d\n", this.partition, Long.valueOf(this.affinity));
                }
                doServerFanoutInitialBegin(j);
            }
        }

        /**
         * Opens a fresh upstream fetch stream: allocates new initial / reply stream ids, resolves the
         * receiver, registers this fanout as reply handler and throttle, and sends BEGIN with a Kafka
         * fetch extension naming the partition's topic, id and the offset to resume from.
         *
         * Expects {@code state == 0} (asserted).
         *
         * @param j trace id for the BEGIN frame
         */
        private void doServerFanoutInitialBegin(long j) {
            if (!$assertionsDisabled && this.state != 0) {
                throw new AssertionError();
            }
            this.initialId = KafkaCacheServerFetchFactory.this.supplyInitialId.applyAsLong(this.routeId);
            this.replyId = KafkaCacheServerFetchFactory.this.supplyReplyId.applyAsLong(this.initialId);
            this.receiver = KafkaCacheServerFetchFactory.this.router.supplyReceiver(this.initialId);
            // resume from wherever the cache partition currently ends
            this.partitionOffset = this.partition.nextOffset(this.defaultOffset);
            KafkaCacheServerFetchFactory.this.correlations.put(this.replyId, this::onServerFanoutMessage);
            KafkaCacheServerFetchFactory.this.router.setThrottle(this.initialId, this::onServerFanoutMessage);
            // Each nested lambda needs its own parameter name: a lambda parameter may not redeclare
            // a parameter of an enclosing lambda.
            // NOTE(review): wrap2 looks like a decompiler-renamed wrap(...) overload; kept as found.
            KafkaCacheServerFetchFactory.this.doBegin(this.receiver, this.routeId, this.initialId, j, this.authorization, this.affinity, extension -> {
                extension.set((buffer, offset, limit) -> {
                    return KafkaCacheServerFetchFactory.this.kafkaBeginExRW.wrap2(buffer, offset, limit).typeId(KafkaCacheServerFetchFactory.this.kafkaTypeId).fetch(fetch -> {
                        fetch.topic(this.partition.topic()).partition(partitionBuilder -> {
                            partitionBuilder.partitionId(this.partition.id()).partitionOffset(this.partitionOffset);
                        });
                    }).build().sizeof();
                });
            });
            this.state = KafkaState.openingInitial(this.state);
        }

        /**
         * Sends END on the upstream initial stream unless that stream is already closed.
         *
         * @param j trace id for the END frame
         */
        private void doServerFanoutInitialEndIfNecessary(long j) {
            final boolean alreadyClosed = KafkaState.initialClosed(this.state);
            if (!alreadyClosed) {
                doServerFanoutInitialEnd(j);
            }
        }

        /**
         * Sends END (without extension data) on the upstream initial stream and marks it closed.
         *
         * @param j trace id for the END frame
         */
        private void doServerFanoutInitialEnd(long j) {
            final long traceId = j;
            final Consumer<OctetsFW.Builder> noExtension = KafkaCacheServerFetchFactory.EMPTY_EXTENSION;

            KafkaCacheServerFetchFactory.this.doEnd(
                this.receiver,
                this.routeId,
                this.initialId,
                traceId,
                this.authorization,
                noExtension);

            this.state = KafkaState.closedInitial(this.state);
        }

        /**
         * Sends ABORT on the upstream initial stream unless that stream is already closed.
         *
         * @param j trace id for the ABORT frame
         */
        private void doServerFanoutInitialAbortIfNecessary(long j) {
            final boolean alreadyClosed = KafkaState.initialClosed(this.state);
            if (!alreadyClosed) {
                doServerFanoutInitialAbort(j);
            }
        }

        /**
         * Sends ABORT (without extension data) on the upstream initial stream and marks it closed.
         *
         * @param j trace id for the ABORT frame
         */
        private void doServerFanoutInitialAbort(long j) {
            final long traceId = j;
            final Consumer<OctetsFW.Builder> noExtension = KafkaCacheServerFetchFactory.EMPTY_EXTENSION;

            KafkaCacheServerFetchFactory.this.doAbort(
                this.receiver,
                this.routeId,
                this.initialId,
                traceId,
                this.authorization,
                noExtension);

            this.state = KafkaState.closedInitial(this.state);
        }

        /**
         * Entry point for every frame addressed to this fanout, both reply-stream frames and
         * throttle frames for the initial stream. Decodes the frame with the shared flyweight
         * matching its type and dispatches to the specific handler; unknown types are ignored.
         *
         * @param i frame type id
         * @param directBuffer buffer containing the frame
         * @param i2 offset of the frame within the buffer
         * @param i3 length of the frame
         */
        private void onServerFanoutMessage(int i, DirectBuffer directBuffer, int i2, int i3) {
            final int msgTypeId = i;
            final int offset = i2;
            final int limit = i2 + i3;
            final KafkaCacheServerFetchFactory factory = KafkaCacheServerFetchFactory.this;

            switch (msgTypeId) {
                // reply stream frames
                case 0x00000001:
                    onServerFanoutReplyBegin(factory.beginRO.wrap(directBuffer, offset, limit));
                    break;
                case 0x00000002:
                    onServerFanoutReplyData(factory.dataRO.wrap(directBuffer, offset, limit));
                    break;
                case 0x00000003:
                    onServerFanoutReplyEnd(factory.endRO.wrap(directBuffer, offset, limit));
                    break;
                case 0x00000004:
                    onServerFanoutReplyAbort(factory.abortRO.wrap(directBuffer, offset, limit));
                    break;
                // throttle frames for the initial stream
                case 0x40000001:
                    onServerFanoutInitialReset(factory.resetRO.wrap(directBuffer, offset, limit));
                    break;
                case 0x40000002:
                    onServerFanoutInitialWindow(factory.windowRO.wrap(directBuffer, offset, limit));
                    break;
                case 0x40000003:
                    onServerFanoutInitialSignal(factory.signalRO.wrap(directBuffer, offset, limit));
                    break;
                default:
                    break;
            }
        }

        /**
         * Handles BEGIN on the upstream reply stream: reads the partition offset reported in the
         * Kafka fetch extension, marks the reply stream opened, adopts that offset, makes sure the
         * cache partition has a head for it, opens the reply of every member and grants the
         * upstream an initial window of one buffer-pool slot.
         *
         * @param beginFW the decoded BEGIN frame
         */
        private void onServerFanoutReplyBegin(BeginFW beginFW) {
            final long traceId = beginFW.traceId();
            final OctetsFW beginExtension = beginFW.extension();

            final ExtensionFW genericEx = (ExtensionFW) beginExtension.get(KafkaCacheServerFetchFactory.this.extensionRO::tryWrap);
            if (!$assertionsDisabled) {
                if (genericEx == null || genericEx.typeId() != KafkaCacheServerFetchFactory.this.kafkaTypeId) {
                    throw new AssertionError();
                }
            }

            final KafkaBeginExFW kafkaBeginEx = (KafkaBeginExFW) beginExtension.get(KafkaCacheServerFetchFactory.this.kafkaBeginExRO::wrap);
            if (!$assertionsDisabled) {
                if (kafkaBeginEx.kind() != 1) {
                    throw new AssertionError();
                }
            }

            final KafkaOffsetFW reported = kafkaBeginEx.fetch().partition();
            final int reportedId = reported.partitionId();
            final long reportedOffset = reported.partitionOffset();

            this.state = KafkaState.openedReply(this.state);

            if (!$assertionsDisabled && reportedId != this.partition.id()) {
                throw new AssertionError();
            }
            // the upstream must never report an offset behind the one we asked for
            if (!$assertionsDisabled && (reportedOffset < 0 || reportedOffset < this.partitionOffset)) {
                throw new AssertionError();
            }

            this.partitionOffset = reportedOffset;
            this.partition.newHeadIfNecessary(reportedOffset);

            for (KafkaCacheServerFetchStream member : this.members) {
                member.doServerReplyBeginIfNecessary(traceId);
            }

            final int initialCredit = KafkaCacheServerFetchFactory.this.bufferPool.slotCapacity();
            doServerFanoutReplyWindow(traceId, initialCredit);
        }

        /**
         * Handles DATA on the upstream reply stream by writing the record (or record fragment)
         * into the cache partition.
         *
         * A record may span several DATA frames: the frame with flag bit 2 (FLAGS_INIT) starts the
         * cache entry, every frame contributes its payload, and the frame with flag bit 1
         * (FLAGS_FIN) finishes the entry, advances {@code partitionOffset} and flushes all members.
         * The window consumed by the frame is returned upstream at the end in every case.
         *
         * @param dataFW the decoded DATA frame
         */
        private void onServerFanoutReplyData(DataFW dataFW) {
            long traceId = dataFW.traceId();
            int reserved = dataFW.reserved();
            int flags = dataFW.flags();
            OctetsFW payload = dataFW.payload();
            KafkaFetchDataExFW kafkaFetchDataExFW = null;
            // Only first (INIT) and last (FIN) fragments are decoded for a Kafka fetch extension.
            if ((flags & 3) != 0) {
                OctetsFW extension = dataFW.extension();
                ExtensionFW extensionFW = KafkaCacheServerFetchFactory.this.extensionRO;
                Objects.requireNonNull(extensionFW);
                ExtensionFW extensionFW2 = (ExtensionFW) extension.get(extensionFW::tryWrap);
                if (!$assertionsDisabled && (extensionFW2 == null || extensionFW2.typeId() != KafkaCacheServerFetchFactory.this.kafkaTypeId)) {
                    throw new AssertionError();
                }
                KafkaDataExFW kafkaDataExFW = KafkaCacheServerFetchFactory.this.kafkaDataExRO;
                Objects.requireNonNull(kafkaDataExFW);
                KafkaDataExFW kafkaDataExFW2 = (KafkaDataExFW) extension.get(kafkaDataExFW::wrap);
                if (!$assertionsDisabled && kafkaDataExFW2.kind() != 1) {
                    throw new AssertionError();
                }
                kafkaFetchDataExFW = kafkaDataExFW2.fetch();
            }
            // INIT: first fragment of a record -> start a new cache entry.
            if ((flags & 2) != 0) {
                if (!$assertionsDisabled && kafkaFetchDataExFW == null) {
                    throw new AssertionError();
                }
                int deferred = kafkaFetchDataExFW.deferred();
                int partitionId = kafkaFetchDataExFW.partition().partitionId();
                long partitionOffset = kafkaFetchDataExFW.partition().partitionOffset();
                // headers size passed on: the larger of the headers in this frame and the advertised maximum
                int max = Math.max(kafkaFetchDataExFW.headers().sizeof(), kafkaFetchDataExFW.headersSizeMax());
                long timestamp = kafkaFetchDataExFW.timestamp();
                KafkaKeyFW key = kafkaFetchDataExFW.key();
                KafkaDeltaFW delta = kafkaFetchDataExFW.delta();
                // value size = this fragment plus the bytes deferred to later fragments; -1 when there is no payload
                int sizeof = payload != null ? payload.sizeof() + deferred : -1;
                if (!$assertionsDisabled && delta.type().get() != KafkaDeltaType.NONE) {
                    throw new AssertionError();
                }
                if (!$assertionsDisabled && delta.ancestorOffset() != -1) {
                    throw new AssertionError();
                }
                if (!$assertionsDisabled && partitionId != this.partition.id()) {
                    throw new AssertionError();
                }
                if (!$assertionsDisabled && partitionOffset < this.partitionOffset) {
                    throw new AssertionError(String.format("%d >= %d", Long.valueOf(partitionOffset), Long.valueOf(this.partitionOffset)));
                }
                // Remember the head before asking for a (possibly new) one, so a change of head can be detected below.
                KafkaCachePartition.Node head = this.partition.head();
                KafkaCachePartition.Node newHeadIfNecessary = this.partition.newHeadIfNecessary(partitionOffset, key, sizeof, max);
                long nextOffset = this.partition.nextOffset(this.defaultOffset);
                if (!$assertionsDisabled && (partitionOffset < 0 || partitionOffset < nextOffset)) {
                    throw new AssertionError(String.format("%d >= 0 && %d >= %d", Long.valueOf(partitionOffset), Long.valueOf(partitionOffset), Long.valueOf(nextOffset)));
                }
                // Head changed: re-arm the retain signal for the new head segment and, if none is pending
                // and the cleanup policy deletes, arm a delete signal for the segment before it.
                if (newHeadIfNecessary != head) {
                    if (this.retainId != -1) {
                        KafkaCacheServerFetchFactory.this.signaler.cancel(this.retainId);
                        this.retainId = -1L;
                    }
                    if (!$assertionsDisabled && this.retainId != -1) {
                        throw new AssertionError();
                    }
                    this.retainId = doServerFanoutInitialSignalAt(this.partition.retainAt(newHeadIfNecessary.segment()), 2);
                    if (this.deleteId == -1 && this.partition.cleanupPolicy().delete() && !newHeadIfNecessary.previous().sentinel()) {
                        this.deleteId = doServerFanoutInitialSignalAt(this.partition.deleteAt(newHeadIfNecessary.previous().segment(), this.retentionMillisMax), 3);
                    }
                }
                long computeKeyHash = this.partition.computeKeyHash(key);
                // The key hash is narrowed to int for the ancestor lookup; the full long is passed to writeEntryStart.
                this.partition.writeEntryStart(partitionOffset, timestamp, key, computeKeyHash, sizeof, findAndMarkAncestor(key, newHeadIfNecessary, (int) computeKeyHash, partitionOffset), this.deltaType);
            }
            // Every fragment that carries a payload appends it to the entry in progress.
            if (payload != null) {
                this.partition.writeEntryContinue(payload);
            }
            // FIN: last fragment of a record -> finish the entry and notify members.
            if ((flags & 1) != 0) {
                if (!$assertionsDisabled && kafkaFetchDataExFW == null) {
                    throw new AssertionError();
                }
                int partitionId2 = kafkaFetchDataExFW.partition().partitionId();
                long partitionOffset2 = kafkaFetchDataExFW.partition().partitionOffset();
                KafkaDeltaFW delta2 = kafkaFetchDataExFW.delta();
                Array32FW<KafkaHeaderFW> headers = kafkaFetchDataExFW.headers();
                if (!$assertionsDisabled && delta2.type().get() != KafkaDeltaType.NONE) {
                    throw new AssertionError();
                }
                if (!$assertionsDisabled && delta2.ancestorOffset() != -1) {
                    throw new AssertionError();
                }
                if (!$assertionsDisabled && partitionId2 != this.partition.id()) {
                    throw new AssertionError();
                }
                if (!$assertionsDisabled && partitionOffset2 < this.partitionOffset) {
                    throw new AssertionError();
                }
                this.partition.writeEntryFinish(headers, this.deltaType);
                this.partitionOffset = partitionOffset2;
                this.members.forEach(kafkaCacheServerFetchStream -> {
                    kafkaCacheServerFetchStream.doServerReplyFlushIfNecessary(traceId);
                });
            }
            // Return the window consumed by this frame to the upstream.
            doServerFanoutReplyWindow(traceId, reserved);
        }

        /**
         * Looks for an earlier cache entry with the same key as the record about to be written and
         * marks it as having a descendant.
         *
         * The given node is searched first; if it has no match, earlier nodes are walked back to the
         * sentinel, following each segment's key index from its last cursor for the hash towards
         * lower ones. The search stops at the first match. When a match is found and the partition's
         * cleanup policy compacts, a compaction signal is scheduled for the matching segment.
         *
         * Compared with the previous rendering of this method, the key-cursor loop now terminates
         * (it had no exit, leaving the step to the previous node unreachable) and a match ends the
         * search instead of letting it continue.
         *
         * @param kafkaKeyFW key of the record being written; a length of -1 means no key, so no lookup
         * @param node node to start searching from
         * @param i key hash used to probe the key indexes
         * @param j offset of the record being written, i.e. the descendant offset
         * @return the matching ancestor entry (backed by the shared flyweight), or {@code null} if none
         */
        private KafkaCacheEntryFW findAndMarkAncestor(KafkaKeyFW kafkaKeyFW, KafkaCachePartition.Node node, int i, long j) {
            if (kafkaKeyFW.length() == -1) {
                return null;
            }

            final KafkaCacheEntryFW headAncestor = node.findAndMarkAncestor(kafkaKeyFW, i, j, KafkaCacheServerFetchFactory.this.ancestorEntryRO);
            if (headAncestor != null) {
                scheduleSegmentCompactIfNecessary(node.segment());
                return headAncestor;
            }

            for (KafkaCachePartition.Node previous = node.previous(); !previous.sentinel(); previous = previous.previous()) {
                final KafkaCacheSegment previousSegment = previous.segment();
                final KafkaCacheIndexFile previousKeys = previousSegment.keysFile();

                long keyCursor = previousKeys.last(i);
                while (keyCursor != KafkaCacheCursorRecord.NEXT_SEGMENT && KafkaCacheCursorRecord.cursorValue(keyCursor) != KafkaCacheCursorRecord.cursorValue(KafkaCacheCursorRecord.RETRY_SEGMENT)) {
                    // the cursor value is a non-positive delta from this segment's base offset
                    final int baseOffsetDelta = KafkaCacheCursorRecord.cursorValue(keyCursor);
                    if (!$assertionsDisabled && baseOffsetDelta > 0) {
                        throw new AssertionError();
                    }
                    final long ancestorBaseOffset = previousSegment.baseOffset() + baseOffsetDelta;
                    final KafkaCachePartition.Node ancestorNode = previous.seekAncestor(ancestorBaseOffset);
                    if (!ancestorNode.sentinel()) {
                        final KafkaCacheSegment ancestorSegment = ancestorNode.segment();
                        final long actualBaseOffset = ancestorSegment.baseOffset();
                        if (!$assertionsDisabled && actualBaseOffset != ancestorBaseOffset) {
                            throw new AssertionError(String.format("%d == %d", Long.valueOf(actualBaseOffset), Long.valueOf(ancestorBaseOffset)));
                        }
                        final KafkaCacheEntryFW ancestor = ancestorNode.findAndMarkAncestor(kafkaKeyFW, i, j, KafkaCacheServerFetchFactory.this.ancestorEntryRO);
                        if (ancestor != null) {
                            scheduleSegmentCompactIfNecessary(ancestorSegment);
                            return ancestor;
                        }
                    }

                    final long lowerCursor = previousKeys.lower(i, keyCursor);
                    if (lowerCursor == KafkaCacheCursorRecord.NEXT_SEGMENT || KafkaCacheCursorRecord.cursorRetryValue(lowerCursor)) {
                        // index exhausted for this hash; continue with the previous node
                        break;
                    }
                    keyCursor = lowerCursor;
                }
            }

            return null;
        }

        /**
         * Schedules a compaction signal for the given segment when the cleanup policy compacts and the
         * segment has a finite compaction time; a pending signal is replaced only if the new time is earlier.
         */
        private void scheduleSegmentCompactIfNecessary(KafkaCacheSegment segment) {
            if (!this.partition.cleanupPolicy().compact()) {
                return;
            }
            final long newCompactAt = this.partition.compactAt(segment);
            if (newCompactAt == Long.MAX_VALUE) {
                return;
            }
            if (this.compactId != -1 && newCompactAt < this.compactAt) {
                KafkaCacheServerFetchFactory.this.signaler.cancel(this.compactId);
                this.compactId = -1L;
            }
            if (this.compactId == -1) {
                this.compactAt = newCompactAt;
                this.compactId = doServerFanoutInitialSignalAt(newCompactAt, 4);
            }
        }

        /**
         * Handles END on the upstream reply stream: closes the reply side, ends the initial stream
         * if still open, then either propagates END to all members (reconnect disabled or no members
         * left) or schedules a reconnect with exponential backoff.
         *
         * The backoff is {@code 50ms << attempt}, capped at {@code reconnectDelay} seconds. It is
         * computed from the attempt count before the increment, in {@code long} arithmetic with a
         * bounded shift, so that many consecutive failures cannot wrap it to a negative delay.
         *
         * @param endFW the decoded END frame
         */
        private void onServerFanoutReplyEnd(EndFW endFW) {
            long traceId = endFW.traceId();
            this.state = KafkaState.closedReply(this.state);
            doServerFanoutInitialEndIfNecessary(traceId);
            if (KafkaCacheServerFetchFactory.this.reconnectDelay == 0 || this.members.isEmpty()) {
                if (KafkaConfiguration.DEBUG) {
                    System.out.format("%s FETCH disconnect\n", this.partition);
                }
                this.members.forEach(kafkaCacheServerFetchStream -> {
                    kafkaCacheServerFetchStream.doServerReplyEndIfNecessary(traceId);
                });
                return;
            }
            if (KafkaConfiguration.DEBUG) {
                System.out.format("%s FETCH reconnect in %ds\n", this.partition, Integer.valueOf(KafkaCacheServerFetchFactory.this.reconnectDelay));
            }
            if (this.reconnectAt != -1) {
                KafkaCacheServerFetchFactory.this.signaler.cancel(this.reconnectAt);
            }
            // post-increment: the delay is based on the number of attempts made so far
            final int attempt = this.reconnectAttempt++;
            final long backoffMillis = 50L << Math.min(attempt, 32);
            final long delayMillis = Math.min(backoffMillis, TimeUnit.SECONDS.toMillis(KafkaCacheServerFetchFactory.this.reconnectDelay));
            this.reconnectAt = KafkaCacheServerFetchFactory.this.signaler.signalAt(System.currentTimeMillis() + delayMillis, SIGNAL_RECONNECT, this::onServerFanoutSignal);
        }

        /**
         * Handles ABORT on the upstream reply stream: closes the reply side, aborts the initial
         * stream if still open, then either propagates ABORT to all members (reconnect disabled or
         * no members left) or schedules a reconnect with exponential backoff.
         *
         * The backoff is {@code 50ms << attempt}, capped at {@code reconnectDelay} seconds. It is
         * computed from the attempt count before the increment, in {@code long} arithmetic with a
         * bounded shift, so that many consecutive failures cannot wrap it to a negative delay.
         *
         * @param abortFW the decoded ABORT frame
         */
        private void onServerFanoutReplyAbort(AbortFW abortFW) {
            long traceId = abortFW.traceId();
            this.state = KafkaState.closedReply(this.state);
            doServerFanoutInitialAbortIfNecessary(traceId);
            if (KafkaCacheServerFetchFactory.this.reconnectDelay == 0 || this.members.isEmpty()) {
                if (KafkaConfiguration.DEBUG) {
                    System.out.format("%s FETCH disconnect\n", this.partition);
                }
                this.members.forEach(kafkaCacheServerFetchStream -> {
                    kafkaCacheServerFetchStream.doServerReplyAbortIfNecessary(traceId);
                });
                return;
            }
            if (KafkaConfiguration.DEBUG) {
                System.out.format("%s FETCH reconnect in %ds\n", this.partition, Integer.valueOf(KafkaCacheServerFetchFactory.this.reconnectDelay));
            }
            if (this.reconnectAt != -1) {
                KafkaCacheServerFetchFactory.this.signaler.cancel(this.reconnectAt);
            }
            // post-increment: the delay is based on the number of attempts made so far
            final int attempt = this.reconnectAttempt++;
            final long backoffMillis = 50L << Math.min(attempt, 32);
            final long delayMillis = Math.min(backoffMillis, TimeUnit.SECONDS.toMillis(KafkaCacheServerFetchFactory.this.reconnectDelay));
            this.reconnectAt = KafkaCacheServerFetchFactory.this.signaler.signalAt(System.currentTimeMillis() + delayMillis, SIGNAL_RECONNECT, this::onServerFanoutSignal);
        }

        /**
         * Handles RESET of the upstream initial stream: closes the initial side, resets the reply
         * stream if needed, then either propagates the RESET (with its extension) to all members or
         * schedules a reconnect with exponential backoff.
         *
         * No reconnect is attempted when reconnect is disabled, when no members remain, or when the
         * reset extension reports {@code ERROR_NOT_LEADER_FOR_PARTITION}. An absent or unparseable
         * Kafka reset extension is treated as error -1.
         *
         * The backoff is {@code 50ms << attempt}, capped at {@code reconnectDelay} seconds. It is
         * computed from the attempt count before the increment, in {@code long} arithmetic with a
         * bounded shift, so that many consecutive failures cannot wrap it to a negative delay.
         *
         * @param resetFW the decoded RESET frame
         */
        private void onServerFanoutInitialReset(ResetFW resetFW) {
            long traceId = resetFW.traceId();
            OctetsFW extension = resetFW.extension();
            this.state = KafkaState.closedInitial(this.state);
            doServerFanoutReplyResetIfNecessary(traceId);
            KafkaResetExFW kafkaResetEx = (KafkaResetExFW) extension.get(KafkaCacheServerFetchFactory.this.kafkaResetExRO::tryWrap);
            int error = kafkaResetEx != null ? kafkaResetEx.error() : -1;
            if (KafkaCacheServerFetchFactory.this.reconnectDelay == 0 || this.members.isEmpty() || error == ERROR_NOT_LEADER_FOR_PARTITION) {
                if (KafkaConfiguration.DEBUG) {
                    System.out.format("%s FETCH disconnect, error %d\n", this.partition, Integer.valueOf(error));
                }
                this.members.forEach(kafkaCacheServerFetchStream -> {
                    kafkaCacheServerFetchStream.doServerInitialResetIfNecessary(traceId, extension);
                });
                return;
            }
            if (KafkaConfiguration.DEBUG) {
                System.out.format("%s FETCH reconnect in %ds, error %d\n", this.partition, Integer.valueOf(KafkaCacheServerFetchFactory.this.reconnectDelay), Integer.valueOf(error));
            }
            if (this.reconnectAt != -1) {
                KafkaCacheServerFetchFactory.this.signaler.cancel(this.reconnectAt);
            }
            // post-increment: the delay is based on the number of attempts made so far
            final int attempt = this.reconnectAttempt++;
            final long backoffMillis = 50L << Math.min(attempt, 32);
            final long delayMillis = Math.min(backoffMillis, TimeUnit.SECONDS.toMillis(KafkaCacheServerFetchFactory.this.reconnectDelay));
            this.reconnectAt = KafkaCacheServerFetchFactory.this.signaler.signalAt(System.currentTimeMillis() + delayMillis, SIGNAL_RECONNECT, this::onServerFanoutSignal);
        }

        /**
         * Handles a WINDOW received for the fanout's initial stream.
         *
         * <p>Only the first window has any effect: it clears the reconnect attempt counter, marks
         * the initial direction opened and asks every member to send its own initial window (with
         * zero budget id, credit and padding). Later windows are ignored.
         *
         * @param windowFW the received window frame
         */
        private void onServerFanoutInitialWindow(WindowFW windowFW) {
            if (!KafkaState.initialOpened(this.state)) {
                final long traceId = windowFW.traceId();

                this.reconnectAttempt = 0;
                this.state = KafkaState.openedInitial(this.state);

                this.members.forEach(member -> member.doServerInitialWindowIfNecessary(traceId, 0L, 0, 0));
            }
        }

        /**
         * Callback for the reconnect signal: clears the pending signal handle and begins the
         * fanout's initial stream again if necessary, using a freshly supplied trace id.
         *
         * @param i the signal id; asserted to be 1, the id used when the reconnect is scheduled
         */
        private void onServerFanoutSignal(int i) {
            final boolean unexpectedSignal = i != 1;
            if (unexpectedSignal && !$assertionsDisabled) {
                throw new AssertionError();
            }

            this.reconnectAt = -1L;

            final long traceId = KafkaCacheServerFetchFactory.this.supplyTraceId.getAsLong();
            doServerFanoutInitialBeginIfNecessary(traceId);
        }

        /**
         * Routes a signal frame to the matching segment-maintenance handler.
         *
         * <p>Signal id 2 triggers segment retain, 3 triggers segment delete and 4 triggers segment
         * compact; any other id is ignored.
         *
         * @param signalFW the received signal frame
         */
        private void onServerFanoutInitialSignal(SignalFW signalFW) {
            final int signalId = signalFW.signalId();

            if (signalId == 2) {
                onServerFanoutInitialSignalSegmentRetain(signalFW);
            } else if (signalId == 3) {
                onServerFanoutInitialSignalSegmentDelete(signalFW);
            } else if (signalId == 4) {
                onServerFanoutInitialSignalSegmentCompact(signalFW);
            }
        }

        /**
         * Handles the segment-retain signal by appending to the partition at the fanout's current
         * partition offset.
         *
         * @param signalFW the received signal frame (not inspected)
         */
        private void onServerFanoutInitialSignalSegmentRetain(SignalFW signalFW) {
            final KafkaCachePartition cachePartition = this.partition;
            cachePartition.append(this.partitionOffset);
        }

        /**
         * Handles the segment-delete signal.
         *
         * <p>Walks the partition's nodes from the one after the sentinel, removing each node whose
         * delete time has been reached, and stops at the head node or at the first node that is
         * not yet due. If the walk stopped before the head, a new delete signal (id 3) is scheduled
         * for that node's delete time; otherwise the pending delete handle is cleared.
         *
         * @param signalFW the received signal frame (not inspected)
         */
        private void onServerFanoutInitialSignalSegmentDelete(SignalFW signalFW) {
            final long now = System.currentTimeMillis();

            KafkaCachePartition.Node candidate = this.partition.sentinel().next();
            while (candidate != this.partition.head()
                    && this.partition.deleteAt(candidate.segment(), this.retentionMillisMax) <= now) {
                candidate.remove();
                candidate = candidate.next();
            }

            if (!$assertionsDisabled && candidate == null) {
                throw new AssertionError();
            }

            if (candidate == this.partition.head()) {
                // nothing left that could expire
                this.deleteId = -1L;
            } else {
                final long deleteAt = this.partition.deleteAt(candidate.segment(), this.retentionMillisMax);
                this.deleteId = doServerFanoutInitialSignalAt(deleteAt, 3);
            }
        }

        /**
         * Handles the segment-compact signal.
         *
         * <p>Starting at the node after the sentinel, cleans every node whose successor is not the
         * sentinel, then resets the compaction schedule ({@code compactAt} to
         * {@link Long#MAX_VALUE}, {@code compactId} to -1).
         *
         * @param signalFW the received signal frame (not inspected)
         */
        private void onServerFanoutInitialSignalSegmentCompact(SignalFW signalFW) {
            final long now = System.currentTimeMillis();

            KafkaCachePartition.Node current = this.partition.sentinel().next();
            while (!current.next().sentinel()) {
                current.clean(now);
                current = current.next();
            }

            this.compactAt = Long.MAX_VALUE;
            this.compactId = -1L;
        }

        /**
         * Resets the fanout's reply stream unless its reply direction is already closed.
         *
         * @param j trace id to stamp on the reset frame
         */
        private void doServerFanoutReplyResetIfNecessary(long j) {
            if (!KafkaState.replyClosed(this.state)) {
                doServerFanoutReplyReset(j);
            }
        }

        /**
         * Drops the reply correlation, marks the reply direction closed and sends a RESET with an
         * empty extension to the receiver for the reply stream.
         *
         * @param j trace id to stamp on the reset frame
         */
        private void doServerFanoutReplyReset(long j) {
            final KafkaCacheServerFetchFactory factory = KafkaCacheServerFetchFactory.this;

            factory.correlations.remove(this.replyId);
            this.state = KafkaState.closedReply(this.state);

            factory.doReset(
                this.receiver,
                this.routeId,
                this.replyId,
                j,
                this.authorization,
                KafkaCacheServerFetchFactory.EMPTY_OCTETS);
        }

        /**
         * Marks the reply direction opened and sends a WINDOW for the reply stream to the receiver,
         * with budget id 0 and padding 0.
         *
         * @param j trace id to stamp on the window frame
         * @param i credit to grant
         */
        private void doServerFanoutReplyWindow(long j, int i) {
            final KafkaCacheServerFetchFactory factory = KafkaCacheServerFetchFactory.this;

            this.state = KafkaState.openedReply(this.state);

            factory.doWindow(
                this.receiver,
                this.routeId,
                this.replyId,
                j,
                this.authorization,
                0L,
                i,
                0);
        }

        /**
         * Schedules a signal addressed to the fanout's initial stream.
         *
         * @param j time at which the signal should fire, passed straight to the signaler
         * @param i signal id to deliver
         * @return the value returned by the signaler for the scheduled signal
         */
        private long doServerFanoutInitialSignalAt(long j, int i) {
            final Signaler scheduler = KafkaCacheServerFetchFactory.this.signaler;
            final long scheduledId = scheduler.signalAt(j, this.routeId, this.initialId, i);
            return scheduledId;
        }

        /**
         * Synthetic bridge constructor: forwards every argument except the enclosing factory and
         * the trailing marker argument to the six-argument constructor of this class.
         *
         * <p>NOTE(review): the {@code AnonymousClass1} parameter is unused here; it looks like the
         * compiler-generated disambiguation marker for access to a private constructor - confirm.
         */
        /* synthetic */ KafkaCacheServerFetchFanout(KafkaCacheServerFetchFactory kafkaCacheServerFetchFactory, long j, long j2, long j3, KafkaCachePartition kafkaCachePartition, KafkaDeltaType kafkaDeltaType, KafkaOffsetType kafkaOffsetType, AnonymousClass1 anonymousClass1) {
            this(j, j2, j3, kafkaCachePartition, kafkaDeltaType, kafkaOffsetType);
        }

        /**
         * Synthetic accessor that assigns the fanout's {@code affinity} field on behalf of the
         * enclosing factory.
         *
         * <p>The decompiler could not decode this method and emitted a body that unconditionally
         * threw {@link UnsupportedOperationException}, which made every caller fail at runtime.
         * The residual register listing showed the single assignment
         * {@code r0.affinity = r1} followed by a return; that behaviour is restored here, returning
         * the assigned value as an assignment expression would.
         *
         * @param r6 the fanout whose affinity is updated
         * @param r7 the new affinity value
         * @return the value that was assigned
         */
        static /* synthetic */ long access$102(org.reaktivity.nukleus.kafka.internal.stream.KafkaCacheServerFetchFactory.KafkaCacheServerFetchFanout r6, long r7) {
            r6.affinity = r7;
            return r7;
        }

        // Caches whether assertions are disabled for the factory class; the hand-expanded
        // assertion checks in this class test this flag before throwing AssertionError.
        static {
            $assertionsDisabled = !KafkaCacheServerFetchFactory.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/KafkaCacheServerFetchFactory$KafkaCacheServerFetchStream.class */
    /**
     * A single fetch stream that participates as a member of a {@link KafkaCacheServerFetchFanout}.
     *
     * <p>Frames arriving from the stream's sender are dispatched by {@link #onServerMessage}; the
     * stream registers with, and unregisters from, its fanout group as it opens and closes, and
     * emits BEGIN / FLUSH / END / ABORT on its reply stream and WINDOW / RESET on its initial
     * stream through the enclosing factory's frame writers.
     */
    public final class KafkaCacheServerFetchStream {
        private final KafkaCacheServerFetchFanout group;
        private final MessageConsumer sender;
        private final long routeId;
        private final long initialId;
        private final long replyId;
        private final long affinity;
        private final long authorization;
        private int state;
        private int replyBudget;
        private long partitionOffset;
        static final /* synthetic */ boolean $assertionsDisabled;
        final /* synthetic */ KafkaCacheServerFetchFactory this$0;

        /**
         * Creates a member stream; the reply stream id is derived from the initial stream id via
         * the factory's reply-id supplier.
         *
         * @param kafkaCacheServerFetchFactory the enclosing factory
         * @param kafkaCacheServerFetchFanout the fanout group this stream joins
         * @param messageConsumer the sender to which frames for this stream are written
         * @param j route id
         * @param j2 initial stream id
         * @param j3 affinity
         * @param j4 authorization
         * @param j5 partition offset requested by the stream
         */
        KafkaCacheServerFetchStream(KafkaCacheServerFetchFactory kafkaCacheServerFetchFactory, KafkaCacheServerFetchFanout kafkaCacheServerFetchFanout, MessageConsumer messageConsumer, long j, long j2, long j3, long j4, long j5) {
            this.this$0 = kafkaCacheServerFetchFactory;
            this.group = kafkaCacheServerFetchFanout;
            this.sender = messageConsumer;
            this.routeId = j;
            this.initialId = j2;
            this.replyId = kafkaCacheServerFetchFactory.supplyReplyId.applyAsLong(j2);
            this.affinity = j3;
            this.authorization = j4;
            this.partitionOffset = j5;
        }

        /**
         * Dispatches an incoming frame to the matching handler based on its message type id.
         * Unknown type ids are ignored.
         *
         * @param i message type id
         * @param directBuffer buffer holding the frame
         * @param i2 offset of the frame within the buffer
         * @param i3 length of the frame
         */
        public void onServerMessage(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 1: // handled as BEGIN
                    onServerInitialBegin(this.this$0.beginRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 2: // handled as DATA
                    onServerInitialData(this.this$0.dataRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 3: // handled as END
                    onServerInitialEnd(this.this$0.endRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 4: // handled as ABORT
                    onServerInitialAbort(this.this$0.abortRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 1073741825: // 0x40000001, handled as RESET
                    onServerReplyReset(this.this$0.resetRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 1073741826: // 0x40000002, handled as WINDOW
                    onServerReplyWindow(this.this$0.windowRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                default:
                    return;
            }
        }

        /** Marks the initial direction as opening and announces this stream to its fanout group. */
        private void onServerInitialBegin(BeginFW beginFW) {
            long traceId = beginFW.traceId();
            this.state = KafkaState.openingInitial(this.state);
            this.group.onServerFanoutMemberOpening(traceId, this);
        }

        /**
         * Handles DATA on the initial stream by tearing the stream down: the initial stream is
         * reset, the reply stream aborted, and the fanout group told that this member closed.
         */
        private void onServerInitialData(DataFW dataFW) {
            long traceId = dataFW.traceId();
            doServerInitialResetIfNecessary(traceId, KafkaCacheServerFetchFactory.EMPTY_OCTETS);
            doServerReplyAbortIfNecessary(traceId);
            this.group.onServerFanoutMemberClosed(traceId, this);
        }

        /** Closes the initial direction, leaves the fanout group and ends the reply stream. */
        private void onServerInitialEnd(EndFW endFW) {
            long traceId = endFW.traceId();
            this.state = KafkaState.closedInitial(this.state);
            this.group.onServerFanoutMemberClosed(traceId, this);
            doServerReplyEndIfNecessary(traceId);
        }

        /** Closes the initial direction, leaves the fanout group and aborts the reply stream. */
        private void onServerInitialAbort(AbortFW abortFW) {
            long traceId = abortFW.traceId();
            this.state = KafkaState.closedInitial(this.state);
            this.group.onServerFanoutMemberClosed(traceId, this);
            doServerReplyAbortIfNecessary(traceId);
        }

        /**
         * Sends RESET on the initial stream when it has begun opening and is not yet closed; the
         * initial direction is marked closed in every case.
         *
         * @param j trace id
         * @param flyweight extension to attach to the reset frame
         */
        public void doServerInitialResetIfNecessary(long j, Flyweight flyweight) {
            if (KafkaState.initialOpening(this.state) && !KafkaState.initialClosed(this.state)) {
                doServerInitialReset(j, flyweight);
            }
            this.state = KafkaState.closedInitial(this.state);
        }

        /** Marks the initial direction closed and writes the RESET frame to the sender. */
        private void doServerInitialReset(long j, Flyweight flyweight) {
            this.state = KafkaState.closedInitial(this.state);
            this.this$0.doReset(this.sender, this.routeId, this.initialId, j, this.authorization, flyweight);
        }

        /**
         * Sends WINDOW on the initial stream when it is not yet opened or when credit is positive.
         *
         * @param j trace id
         * @param j2 budget id
         * @param i credit
         * @param i2 padding
         */
        public void doServerInitialWindowIfNecessary(long j, long j2, int i, int i2) {
            if (!KafkaState.initialOpened(this.state) || i > 0) {
                doServerInitialWindow(j, j2, i, i2);
            }
        }

        /** Marks the initial direction opened and writes the WINDOW frame to the sender. */
        private void doServerInitialWindow(long j, long j2, int i, int i2) {
            this.state = KafkaState.openedInitial(this.state);
            this.this$0.doWindow(this.sender, this.routeId, this.initialId, j, this.authorization, j2, i, i2);
        }

        /**
         * Begins the reply stream unless it has already begun opening.
         *
         * @param j trace id
         */
        public void doServerReplyBeginIfNecessary(long j) {
            if (KafkaState.replyOpening(this.state)) {
                return;
            }
            doServerReplyBegin(j);
        }

        /**
         * Marks the reply direction as opening, raises this stream's partition offset to at least
         * the group's offset, registers this stream as throttle for the reply id, and sends BEGIN
         * carrying a Kafka fetch extension with the topic, partition id and partition offset.
         */
        private void doServerReplyBegin(long j) {
            this.state = KafkaState.openingReply(this.state);
            this.partitionOffset = Math.max(this.partitionOffset, this.group.partitionOffset);
            this.this$0.router.setThrottle(this.replyId, this::onServerMessage);
            // each nested lambda parameter needs its own name: Java does not allow a lambda
            // parameter to redeclare a parameter of an enclosing lambda
            this.this$0.doBegin(this.sender, this.routeId, this.replyId, j, this.authorization, this.affinity, extension -> {
                extension.set((mutableDirectBuffer, i, i2) -> {
                    return this.this$0.kafkaBeginExRW.wrap2(mutableDirectBuffer, i, i2).typeId(this.this$0.kafkaTypeId).fetch(fetchEx -> {
                        fetchEx.topic(this.group.partition.topic()).partition(offsetEx -> {
                            offsetEx.partitionId(this.group.partition.id()).partitionOffset(this.partitionOffset);
                        });
                    }).build().sizeof();
                });
            });
        }

        /**
         * Sends a FLUSH when this stream's offset has not moved past the group's offset and the
         * reply budget covers a flush frame with extension.
         *
         * @param j trace id
         */
        public void doServerReplyFlushIfNecessary(long j) {
            if (this.partitionOffset > this.group.partitionOffset || this.replyBudget < KafkaCacheServerFetchFactory.SIZE_OF_FLUSH_WITH_EXTENSION) {
                return;
            }
            doServerReplyFlush(j, KafkaCacheServerFetchFactory.SIZE_OF_FLUSH_WITH_EXTENSION);
        }

        /**
         * Deducts {@code i} from the reply budget, sends FLUSH carrying the group's partition id
         * and offset, then advances this stream's offset to one past the group's offset.
         */
        private void doServerReplyFlush(long j, int i) {
            if (!$assertionsDisabled && this.partitionOffset > this.group.partitionOffset) {
                throw new AssertionError();
            }
            this.replyBudget -= i;
            if (!$assertionsDisabled && this.replyBudget < 0) {
                throw new AssertionError();
            }
            // distinct lambda parameter names at each nesting level (see doServerReplyBegin)
            this.this$0.doFlush(this.sender, this.routeId, this.replyId, j, this.authorization, 0L, i, extension -> {
                extension.set((mutableDirectBuffer, i2, i3) -> {
                    return this.this$0.kafkaFlushExRW.wrap2(mutableDirectBuffer, i2, i3).typeId(this.this$0.kafkaTypeId).fetch(fetchEx -> {
                        fetchEx.partition(offsetEx -> {
                            offsetEx.partitionId(this.group.partition.id()).partitionOffset(this.group.partitionOffset);
                        });
                    }).build().sizeof();
                });
            });
            this.partitionOffset = this.group.partitionOffset + 1;
        }

        /**
         * Sends END on the reply stream when it has begun opening and is not yet closed; the reply
         * direction is marked closed in every case.
         *
         * @param j trace id
         */
        public void doServerReplyEndIfNecessary(long j) {
            if (KafkaState.replyOpening(this.state) && !KafkaState.replyClosed(this.state)) {
                doServerReplyEnd(j);
            }
            this.state = KafkaState.closedReply(this.state);
        }

        /** Marks the reply direction closed and writes the END frame with an empty extension. */
        private void doServerReplyEnd(long j) {
            this.state = KafkaState.closedReply(this.state);
            this.this$0.doEnd(this.sender, this.routeId, this.replyId, j, this.authorization, KafkaCacheServerFetchFactory.EMPTY_EXTENSION);
        }

        /**
         * Sends ABORT on the reply stream when it has begun opening and is not yet closed; the
         * reply direction is marked closed in every case.
         *
         * @param j trace id
         */
        public void doServerReplyAbortIfNecessary(long j) {
            if (KafkaState.replyOpening(this.state) && !KafkaState.replyClosed(this.state)) {
                doServerReplyAbort(j);
            }
            this.state = KafkaState.closedReply(this.state);
        }

        /** Marks the reply direction closed and writes the ABORT frame with an empty extension. */
        private void doServerReplyAbort(long j) {
            this.state = KafkaState.closedReply(this.state);
            this.this$0.doAbort(this.sender, this.routeId, this.replyId, j, this.authorization, KafkaCacheServerFetchFactory.EMPTY_EXTENSION);
        }

        /**
         * Handles WINDOW on the reply stream: asserts a zero budget id and zero padding, marks the
         * reply direction opened, adds the credit to the reply budget and flushes if possible.
         */
        private void onServerReplyWindow(WindowFW windowFW) {
            long traceId = windowFW.traceId();
            long budgetId = windowFW.budgetId();
            int credit = windowFW.credit();
            int padding = windowFW.padding();
            if (!$assertionsDisabled && budgetId != 0) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && padding != 0) {
                throw new AssertionError();
            }
            this.state = KafkaState.openedReply(this.state);
            this.replyBudget += credit;
            doServerReplyFlushIfNecessary(traceId);
        }

        /** Handles RESET on the reply stream: closes reply, leaves the group, resets initial. */
        private void onServerReplyReset(ResetFW resetFW) {
            long traceId = resetFW.traceId();
            this.state = KafkaState.closedReply(this.state);
            this.group.onServerFanoutMemberClosed(traceId, this);
            doServerInitialResetIfNecessary(traceId, KafkaCacheServerFetchFactory.EMPTY_OCTETS);
        }

        // Caches whether assertions are disabled for the factory class; checked by the
        // hand-expanded assertions above.
        static {
            $assertionsDisabled = !KafkaCacheServerFetchFactory.class.desiredAssertionStatus();
        }
    }

    /**
     * Creates the factory, storing the collaborators it is given.
     *
     * @param kafkaConfiguration configuration; supplies the cache server reconnect setting
     * @param routeManager router used for route resolution and throttle registration
     * @param mutableDirectBuffer buffer into which outgoing frames are written
     * @param bufferPool buffer pool retained by the factory
     * @param signaler scheduler for timed signals
     * @param longUnaryOperator supplier of initial stream ids
     * @param longUnaryOperator2 supplier of reply stream ids
     * @param longSupplier supplier of trace ids
     * @param toIntFunction resolves a type name to its numeric type id
     * @param function supplies the cache for a given name
     * @param longFunction supplies the cache route for a given id
     * @param long2ObjectHashMap correlations keyed by stream id
     */
    public KafkaCacheServerFetchFactory(KafkaConfiguration kafkaConfiguration, RouteManager routeManager, MutableDirectBuffer mutableDirectBuffer, BufferPool bufferPool, Signaler signaler, LongUnaryOperator longUnaryOperator, LongUnaryOperator longUnaryOperator2, LongSupplier longSupplier, ToIntFunction<String> toIntFunction, Function<String, KafkaCache> function, LongFunction<KafkaCacheRoute> longFunction, Long2ObjectHashMap<MessageConsumer> long2ObjectHashMap) {
        // resolved value
        this.kafkaTypeId = toIntFunction.applyAsInt(KafkaNukleus.NAME);

        // identifier suppliers
        this.supplyInitialId = longUnaryOperator;
        this.supplyReplyId = longUnaryOperator2;
        this.supplyTraceId = longSupplier;

        // cache lookups
        this.supplyCache = function;
        this.supplyCacheRoute = longFunction;

        // engine services and shared state
        this.router = routeManager;
        this.signaler = signaler;
        this.writeBuffer = mutableDirectBuffer;
        this.bufferPool = bufferPool;
        this.correlations = long2ObjectHashMap;

        // configuration
        this.reconnectDelay = kafkaConfiguration.cacheServerReconnect();
    }

    /**
     * Creates the handler for a new fetch stream described by a BEGIN frame.
     *
     * <p>The BEGIN frame's Kafka fetch extension supplies the topic, partition id, partition
     * offset and delta type. A route is resolved for the frame's route id and authorization; when
     * none matches, {@code null} is returned. Otherwise the fanout for the topic partition is
     * looked up on the cache route (and created and registered when absent), its affinity is
     * updated, and a new member stream bound to that fanout is returned as the message handler.
     *
     * @param i message type id of the frame (not inspected)
     * @param directBuffer buffer holding the BEGIN frame
     * @param i2 offset of the frame within the buffer
     * @param i3 length of the frame
     * @param messageConsumer the sender to which frames for the new stream are written
     * @return the handler for the new stream, or {@code null} when no route matches
     */
    public MessageConsumer newStream(int i, DirectBuffer directBuffer, int i2, int i3, MessageConsumer messageConsumer) {
        final BeginFW begin = this.beginRO.wrap(directBuffer, i2, i2 + i3);
        final long routeId = begin.routeId();
        final long initialId = begin.streamId();
        final long affinity = begin.affinity();
        final long authorization = begin.authorization();
        if (!$assertionsDisabled && (initialId & 1) == 0) {
            throw new AssertionError();
        }

        final OctetsFW extension = begin.extension();
        final ExtensionFW beginEx = (ExtensionFW) extension.get(this.extensionRO::wrap);
        if (!$assertionsDisabled && (beginEx == null || beginEx.typeId() != this.kafkaTypeId)) {
            throw new AssertionError();
        }

        final KafkaFetchBeginExFW fetchBeginEx = ((KafkaBeginExFW) extension.get(this.kafkaBeginExRO::wrap)).fetch();
        final String16FW beginTopic = fetchBeginEx.topic();
        final KafkaOffsetFW beginPartition = fetchBeginEx.partition();
        final int partitionId = beginPartition.partitionId();
        final long partitionOffset = beginPartition.partitionOffset();
        final KafkaDeltaType beginDeltaType = fetchBeginEx.deltaType().get();

        final RouteFW route = (RouteFW) this.router.resolve(routeId, authorization, (msgTypeId, buffer, index, length) -> {
            final RouteFW candidate = (RouteFW) this.wrapRoute.apply(msgTypeId, buffer, index, length);
            final KafkaRouteExFW candidateEx = (KafkaRouteExFW) candidate.extension().get(this.routeExRO::tryWrap);
            final String16FW candidateTopic = candidateEx != null ? candidateEx.topic() : null;

            // routes whose local and remote address are equal never match
            if (candidate.localAddress().equals(candidate.remoteAddress())) {
                return false;
            }
            if (beginTopic == null) {
                return false;
            }
            // a route that names a topic must name the requested one
            if (candidateTopic != null && !candidateTopic.equals(beginTopic)) {
                return false;
            }
            // delta type must agree unless the stream asks for no delta
            final KafkaDeltaType candidateDeltaType =
                    candidateEx != null ? candidateEx.deltaType().get() : KafkaRouteExFW.Builder.DEFAULT_DELTA_TYPE;
            return candidateDeltaType == beginDeltaType || beginDeltaType == KafkaDeltaType.NONE;
        }, this.wrapRoute);

        if (route == null) {
            return null;
        }

        final String topicName = beginTopic.asString();
        final long resolvedId = route.correlationId();
        final KafkaCacheRoute cacheRoute = this.supplyCacheRoute.apply(resolvedId);
        final long partitionKey = cacheRoute.topicPartitionKey(topicName, partitionId);

        KafkaCacheServerFetchFanout fanout = (KafkaCacheServerFetchFanout) cacheRoute.serverFetchFanoutsByTopicPartition.get(partitionKey);
        if (fanout == null) {
            final KafkaRouteExFW routeEx = (KafkaRouteExFW) route.extension().get(this.routeExRO::tryWrap);
            final KafkaCachePartition cachePartition =
                    this.supplyCache.apply(route.localAddress().asString()).supplyTopic(topicName).supplyPartition(partitionId);
            final KafkaDeltaType routeDeltaType =
                    routeEx != null ? routeEx.deltaType().get() : KafkaRouteExFW.Builder.DEFAULT_DELTA_TYPE;
            final KafkaOffsetType routeDefaultOffset =
                    routeEx != null ? routeEx.defaultOffset().get() : KafkaRouteExFW.Builder.DEFAULT_DEFAULT_OFFSET;

            fanout = new KafkaCacheServerFetchFanout(resolvedId, authorization, affinity, cachePartition, routeDeltaType, routeDefaultOffset);
            cacheRoute.serverFetchFanoutsByTopicPartition.put(partitionKey, fanout);
        }

        // affinity may only change while the fanout is unused or fully closed
        if (!$assertionsDisabled && fanout.affinity != affinity && fanout.state != 0 && !KafkaState.closed(fanout.state)) {
            throw new AssertionError(String.format("%d == %d || %d == 0 || KafkaState.closed(0x%08x) // [%016x]", Long.valueOf(fanout.affinity), Long.valueOf(affinity), Integer.valueOf(fanout.state), Integer.valueOf(fanout.state), Long.valueOf(fanout.initialId)));
        }
        KafkaCacheServerFetchFanout.access$102(fanout, affinity);

        final KafkaCacheServerFetchStream stream =
                new KafkaCacheServerFetchStream(this, fanout, messageConsumer, routeId, initialId, affinity, authorization, partitionOffset);
        return stream::onServerMessage;
    }

    /**
     * Builds a BEGIN frame in the shared write buffer and delivers it to the given consumer.
     *
     * @param messageConsumer receiver of the frame
     * @param j route id
     * @param j2 stream id
     * @param j3 trace id
     * @param j4 authorization
     * @param j5 affinity
     * @param consumer populates the frame's extension
     */
    public void doBegin(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, long j5, Consumer<OctetsFW.Builder> consumer) {
        final int limit = this.writeBuffer.capacity();
        final BeginFW begin = this.beginRW.wrap2(this.writeBuffer, 0, limit)
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .affinity(j5)
            .extension(consumer)
            .build();

        messageConsumer.accept(begin.typeId(), begin.buffer(), begin.offset(), begin.sizeof());
    }

    /**
     * Builds a FLUSH frame in the shared write buffer and delivers it to the given consumer.
     *
     * @param messageConsumer receiver of the frame
     * @param j route id
     * @param j2 stream id
     * @param j3 trace id
     * @param j4 authorization
     * @param j5 budget id
     * @param i reserved budget
     * @param consumer populates the frame's extension
     */
    public void doFlush(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, long j5, int i, Consumer<OctetsFW.Builder> consumer) {
        final int limit = this.writeBuffer.capacity();
        final FlushFW flush = this.flushRW.wrap2(this.writeBuffer, 0, limit)
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .budgetId(j5)
            .reserved(i)
            .extension(consumer)
            .build();

        messageConsumer.accept(flush.typeId(), flush.buffer(), flush.offset(), flush.sizeof());
    }

    /**
     * Builds an END frame in the shared write buffer and delivers it to the given consumer.
     *
     * @param messageConsumer receiver of the frame
     * @param j route id
     * @param j2 stream id
     * @param j3 trace id
     * @param j4 authorization
     * @param consumer populates the frame's extension
     */
    public void doEnd(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, Consumer<OctetsFW.Builder> consumer) {
        final int limit = this.writeBuffer.capacity();
        final EndFW end = this.endRW.wrap2(this.writeBuffer, 0, limit)
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .extension(consumer)
            .build();

        messageConsumer.accept(end.typeId(), end.buffer(), end.offset(), end.sizeof());
    }

    /**
     * Builds an ABORT frame in the shared write buffer and delivers it to the given consumer.
     *
     * @param messageConsumer receiver of the frame
     * @param j route id
     * @param j2 stream id
     * @param j3 trace id
     * @param j4 authorization
     * @param consumer populates the frame's extension
     */
    public void doAbort(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, Consumer<OctetsFW.Builder> consumer) {
        final int limit = this.writeBuffer.capacity();
        final AbortFW abort = this.abortRW.wrap2(this.writeBuffer, 0, limit)
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .extension(consumer)
            .build();

        messageConsumer.accept(abort.typeId(), abort.buffer(), abort.offset(), abort.sizeof());
    }

    /**
     * Builds a WINDOW frame in the shared write buffer and delivers it to the given consumer.
     *
     * @param messageConsumer receiver of the frame
     * @param j route id
     * @param j2 stream id
     * @param j3 trace id
     * @param j4 authorization
     * @param j5 budget id
     * @param i credit
     * @param i2 padding
     */
    public void doWindow(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, long j5, int i, int i2) {
        final int limit = this.writeBuffer.capacity();
        final WindowFW window = this.windowRW.wrap2(this.writeBuffer, 0, limit)
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .budgetId(j5)
            .credit(i)
            .padding(i2)
            .build();

        messageConsumer.accept(window.typeId(), window.buffer(), window.offset(), window.sizeof());
    }

    /**
     * Builds a RESET frame in the shared write buffer and delivers it to the given consumer.
     * The frame's extension is copied from the bytes the given flyweight currently covers.
     *
     * @param messageConsumer receiver of the frame
     * @param j route id
     * @param j2 stream id
     * @param j3 trace id
     * @param j4 authorization
     * @param flyweight source of the extension bytes
     */
    public void doReset(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, Flyweight flyweight) {
        final int limit = this.writeBuffer.capacity();
        final ResetFW reset = this.resetRW.wrap2(this.writeBuffer, 0, limit)
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .extension(flyweight.buffer(), flyweight.offset(), flyweight.sizeof())
            .build();

        messageConsumer.accept(reset.typeId(), reset.buffer(), reset.offset(), reset.sizeof());
    }

    // Class initialisation: caches the assertion status used by the hand-expanded assertion
    // checks, then builds the shared empty constants. Order matters: EMPTY_OCTETS wraps
    // EMPTY_BUFFER, so the buffer must be created first.
    static {
        $assertionsDisabled = !KafkaCacheServerFetchFactory.class.desiredAssertionStatus();
        EMPTY_BUFFER = new UnsafeBuffer();
        // zero-length octets view over the empty buffer
        EMPTY_OCTETS = new OctetsFW().wrap(EMPTY_BUFFER, 0, 0);
        // extension mutator that writes nothing
        EMPTY_EXTENSION = builder -> {
        };
    }
}
