package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;
import java.util.function.ToIntFunction;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.Configuration;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.concurrent.Signaler;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.function.MessageFunction;
import org.reaktivity.nukleus.kafka.internal.KafkaConfiguration;
import org.reaktivity.nukleus.kafka.internal.KafkaNukleus;
import org.reaktivity.nukleus.kafka.internal.cache.KafkaCache;
import org.reaktivity.nukleus.kafka.internal.cache.KafkaCacheTopic;
import org.reaktivity.nukleus.kafka.internal.types.ArrayFW;
import org.reaktivity.nukleus.kafka.internal.types.Flyweight;
import org.reaktivity.nukleus.kafka.internal.types.KafkaPartitionFW;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.String16FW;
import org.reaktivity.nukleus.kafka.internal.types.control.KafkaRouteExFW;
import org.reaktivity.nukleus.kafka.internal.types.control.RouteFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.AbortFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.DataFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.EndFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.ExtensionFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaBeginExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaDataExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaMetaDataExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaResetExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.stream.StreamFactory;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/KafkaCacheMetaFactory.class */
public final class KafkaCacheMetaFactory implements StreamFactory {
    // No-op extension writer (assigned in the static initializer); passed when a frame carries no extension.
    private static final Consumer<OctetsFW.Builder> EMPTY_EXTENSION;
    // Presumably the signal id for reconnect timers; the fanout code passes and checks the literal 1.
    private static final int SIGNAL_RECONNECT = 1;

    // Read-side flyweights: single instances that are re-wrapped over each incoming frame.
    private final RouteFW routeRO = new RouteFW();
    private final KafkaRouteExFW routeExRO = new KafkaRouteExFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final EndFW endRO = new EndFW();
    private final AbortFW abortRO = new AbortFW();
    private final ResetFW resetRO = new ResetFW();
    private final WindowFW windowRO = new WindowFW();
    private final ExtensionFW extensionRO = new ExtensionFW();
    private final KafkaBeginExFW kafkaBeginExRO = new KafkaBeginExFW();
    private final KafkaDataExFW kafkaDataExRO = new KafkaDataExFW();
    private final KafkaResetExFW kafkaResetExRO = new KafkaResetExFW();

    // Write-side builders: single instances that are re-wrapped over writeBuffer / extBuffer per outgoing frame.
    private final BeginFW.Builder beginRW = new BeginFW.Builder();
    private final DataFW.Builder dataRW = new DataFW.Builder();
    private final EndFW.Builder endRW = new EndFW.Builder();
    private final AbortFW.Builder abortRW = new AbortFW.Builder();
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final WindowFW.Builder windowRW = new WindowFW.Builder();
    private final KafkaBeginExFW.Builder kafkaBeginExRW = new KafkaBeginExFW.Builder();
    private final KafkaDataExFW.Builder kafkaDataExRW = new KafkaDataExFW.Builder();

    // Wraps routeRO over a route record; used both inside the route filter and as the route mapper in newStream.
    private final MessageFunction<RouteFW> wrapRoute = (i, directBuffer, i2, i3) -> {
        return this.routeRO.wrap(directBuffer, i2, i2 + i3);
    };
    // Type id resolved from KafkaNukleus.NAME; compared against extension type ids.
    private final int kafkaTypeId;
    private final RouteManager router;
    // Private scratch buffer that every outgoing frame is built in.
    private final MutableDirectBuffer writeBuffer;
    // Private scratch buffer for the data extension built in onMetaFanoutMemberOpened.
    private final MutableDirectBuffer extBuffer;
    private final BufferPool bufferPool;
    private final Signaler signaler;
    private final LongUnaryOperator supplyInitialId;
    private final LongUnaryOperator supplyReplyId;
    private final LongSupplier supplyTraceId;
    private final Function<String, KafkaCache> supplyCache;
    private final LongFunction<KafkaCacheRoute> supplyCacheRoute;
    // Reply stream id -> handler; the fanout registers itself here before sending BEGIN upstream.
    private final Long2ObjectHashMap<MessageConsumer> correlations;
    // Reconnect delay, converted with TimeUnit.SECONDS where used; 0 disables reconnecting.
    private final int reconnectDelay;
    // Decompiler-emitted assertion flag; see the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/KafkaCacheMetaFactory$KafkaCacheMetaFanout.class */
    public final class KafkaCacheMetaFanout {
        // Route id and authorization stamped on every frame sent upstream.
        private final long routeId;
        private final long authorization;
        // Topic whose name is sent in the upstream BEGIN extension.
        private final KafkaCacheTopic topic;
        // Member streams currently sharing this fanout.
        private final List<KafkaCacheMetaStream> members;
        // Ids and receiver of the current upstream stream; reassigned on every (re)connect.
        private long initialId;
        private long replyId;
        private MessageConsumer receiver;
        // Latest partitionId -> leaderId snapshot from the upstream reply; null until the first meta DATA frame.
        private Int2IntHashMap leadersByPartitionId;
        // KafkaState bit set for the upstream stream; 0 before the first connect and after a full close.
        private int state;
        // Value returned by signaler.signalAt for a scheduled reconnect, or -1 when none is recorded.
        private long reconnectAt;
        // Decompiler-emitted assertion flag; see the static initializer.
        static final /* synthetic */ boolean $assertionsDisabled;

        /**
         * Creates a fanout with no members and no reconnect scheduled.
         *
         * @param j route id used for the upstream stream
         * @param j2 authorization used for the upstream stream
         * @param kafkaCacheTopic topic whose metadata is requested
         */
        private KafkaCacheMetaFanout(long j, long j2, KafkaCacheTopic kafkaCacheTopic) {
            this.routeId = j;
            this.authorization = j2;
            this.topic = kafkaCacheTopic;
            this.members = new ArrayList<>();
            // -1 is the "nothing scheduled" marker tested before cancelling.
            this.reconnectAt = -1L;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Adds a member whose initial stream is opening and catches it up with the fanout:
         * connects upstream if needed, sends the member a zero-credit initial window once the
         * upstream initial stream is opened, and begins the member's reply once the upstream
         * reply is opened.
         *
         * @param j trace id to stamp on any frames sent
         * @param kafkaCacheMetaStream the joining member
         */
        public void onMetaFanoutMemberOpening(long j, KafkaCacheMetaStream kafkaCacheMetaStream) {
            final long traceId = j;
            final KafkaCacheMetaStream member = kafkaCacheMetaStream;

            this.members.add(member);
            if (!$assertionsDisabled && this.members.isEmpty()) {
                throw new AssertionError();
            }

            doMetaFanoutInitialBeginIfNecessary(traceId);

            final boolean upstreamInitialOpened = KafkaState.initialOpened(this.state);
            if (upstreamInitialOpened) {
                member.doMetaInitialWindowIfNecessary(traceId, 0L, 0, 0);
            }

            final boolean upstreamReplyOpened = KafkaState.replyOpened(this.state);
            if (upstreamReplyOpened) {
                member.doMetaReplyBeginIfNecessary(traceId);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* JADX WARN: Type inference failed for: r0v5, types: [org.reaktivity.nukleus.kafka.internal.types.stream.KafkaDataExFW$Builder] */
        /**
         * Replays the most recent partition-leader snapshot to a member whose reply stream has
         * just opened. Does nothing if no metadata has been received yet; in that case the member
         * is served by the next upstream DATA frame, which is forwarded to all members.
         *
         * @param j trace id to stamp on the DATA frame
         * @param kafkaCacheMetaStream the member to bring up to date
         */
        public void onMetaFanoutMemberOpened(long j, KafkaCacheMetaStream kafkaCacheMetaStream) {
            if (this.leadersByPartitionId == null) {
                return;
            }
            // Each nested lambda needs its own parameter name: a lambda parameter may not
            // redeclare a parameter of an enclosing lambda.
            kafkaCacheMetaStream.doMetaReplyDataIfNecessary(j, KafkaCacheMetaFactory.this.kafkaDataExRW
                .wrap2(KafkaCacheMetaFactory.this.extBuffer, 0, KafkaCacheMetaFactory.this.extBuffer.capacity())
                .typeId(KafkaCacheMetaFactory.this.kafkaTypeId)
                .meta(metaBuilder -> {
                    this.leadersByPartitionId.forEach((partitionId, leaderId) -> {
                        metaBuilder.partitionsItem(partitionBuilder -> {
                            partitionBuilder.partitionId(partitionId.intValue()).leaderId(leaderId.intValue());
                        });
                    });
                })
                .build());
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Removes a member; when it was the last one, tears the upstream stream down: cancels
         * any recorded reconnect, drops the reply correlation, ends the initial stream and
         * resets the reply stream (each only if not already closed).
         *
         * @param j trace id to stamp on any frames sent
         * @param kafkaCacheMetaStream the departing member
         */
        public void onMetaFanoutMemberClosed(long j, KafkaCacheMetaStream kafkaCacheMetaStream) {
            this.members.remove(kafkaCacheMetaStream);
            if (!this.members.isEmpty()) {
                return;
            }

            final long scheduled = this.reconnectAt;
            if (scheduled != -1) {
                KafkaCacheMetaFactory.this.signaler.cancel(scheduled);
                this.reconnectAt = -1L;
            }

            KafkaCacheMetaFactory.this.correlations.remove(this.replyId);
            doMetaFanoutInitialEndIfNecessary(j);
            doMetaFanoutReplyResetIfNecessary(j);
        }

        /**
         * Opens the upstream stream unless its initial direction is already opening. A fully
         * closed state is first cleared to 0 so the fanout can connect again.
         *
         * @param j trace id to stamp on the BEGIN frame
         */
        private void doMetaFanoutInitialBeginIfNecessary(long j) {
            if (KafkaState.closed(this.state)) {
                this.state = 0;
            }

            if (!KafkaState.initialOpening(this.state)) {
                if (KafkaConfiguration.DEBUG) {
                    System.out.format("%s META connect\n", this.topic);
                }
                doMetaFanoutInitialBegin(j);
            }
        }

        /**
         * Allocates fresh initial/reply ids, registers this fanout as both the reply handler
         * (via correlations) and the throttle for the new initial stream, then sends BEGIN
         * upstream with a meta extension naming the topic.
         *
         * @param j trace id to stamp on the BEGIN frame
         */
        private void doMetaFanoutInitialBegin(long j) {
            if (!$assertionsDisabled && this.state != 0) {
                throw new AssertionError();
            }
            final KafkaCacheMetaFactory factory = KafkaCacheMetaFactory.this;

            this.initialId = factory.supplyInitialId.applyAsLong(this.routeId);
            this.replyId = factory.supplyReplyId.applyAsLong(this.initialId);
            this.receiver = factory.router.supplyReceiver(this.initialId);

            factory.correlations.put(this.replyId, this::onMetaFanoutMessage);
            factory.router.setThrottle(this.initialId, this::onMetaFanoutMessage);

            // Each nested lambda needs its own parameter name: a lambda parameter may not
            // redeclare a parameter of an enclosing lambda.
            factory.doBegin(this.receiver, this.routeId, this.initialId, j, this.authorization, 0L, extension -> {
                extension.set((buffer, offset, limit) -> {
                    return factory.kafkaBeginExRW.wrap2(buffer, offset, limit).typeId(factory.kafkaTypeId).meta(metaBuilder -> {
                        metaBuilder.topic(this.topic.name());
                    }).build().sizeof();
                });
            });
            this.state = KafkaState.openingInitial(this.state);
        }

        /**
         * Sends END on the upstream initial stream unless that direction is already closed.
         *
         * @param j trace id to stamp on the END frame
         */
        private void doMetaFanoutInitialEndIfNecessary(long j) {
            final boolean alreadyClosed = KafkaState.initialClosed(this.state);
            if (!alreadyClosed) {
                doMetaFanoutInitialEnd(j);
            }
        }

        /**
         * Sends END (with an empty extension) upstream, then marks the initial direction closed.
         *
         * @param j trace id to stamp on the END frame
         */
        private void doMetaFanoutInitialEnd(long j) {
            final KafkaCacheMetaFactory factory = KafkaCacheMetaFactory.this;
            factory.doEnd(this.receiver, this.routeId, this.initialId, j, this.authorization,
                KafkaCacheMetaFactory.EMPTY_EXTENSION);
            this.state = KafkaState.closedInitial(this.state);
        }

        /**
         * Sends ABORT on the upstream initial stream unless that direction is already closed.
         *
         * @param j trace id to stamp on the ABORT frame
         */
        private void doMetaFanoutInitialAbortIfNecessary(long j) {
            final boolean alreadyClosed = KafkaState.initialClosed(this.state);
            if (!alreadyClosed) {
                doMetaFanoutInitialAbort(j);
            }
        }

        /**
         * Sends ABORT (with an empty extension) upstream, then marks the initial direction closed.
         *
         * @param j trace id to stamp on the ABORT frame
         */
        private void doMetaFanoutInitialAbort(long j) {
            final KafkaCacheMetaFactory factory = KafkaCacheMetaFactory.this;
            factory.doAbort(this.receiver, this.routeId, this.initialId, j, this.authorization,
                KafkaCacheMetaFactory.EMPTY_EXTENSION);
            this.state = KafkaState.closedInitial(this.state);
        }

        /**
         * Handles RESET from upstream on the initial stream: marks the initial direction closed,
         * resets the upstream reply if still open, then either schedules a reconnect (when a
         * reconnect delay is configured) or propagates the reset to every member.
         *
         * @param resetFW the received RESET frame; its extension may carry a Kafka error code
         */
        private void onMetaFanoutInitialReset(ResetFW resetFW) {
            final KafkaCacheMetaFactory factory = KafkaCacheMetaFactory.this;
            final long traceId = resetFW.traceId();
            final OctetsFW extension = resetFW.extension();
            final KafkaResetExFW kafkaResetEx = (KafkaResetExFW) extension.get(factory.kafkaResetExRO::tryWrap);

            // -1 stands in for "no error reported" in the debug output below.
            final int error;
            if (kafkaResetEx == null) {
                error = -1;
            } else {
                error = kafkaResetEx.error();
            }

            this.state = KafkaState.closedInitial(this.state);
            doMetaFanoutReplyResetIfNecessary(traceId);

            final int delaySeconds = factory.reconnectDelay;
            if (delaySeconds == 0) {
                if (KafkaConfiguration.DEBUG) {
                    System.out.format("%s META disconnect, error %d\n", this.topic, error);
                }
                this.members.forEach(member -> member.doMetaInitialResetIfNecessary(traceId));
                return;
            }

            if (KafkaConfiguration.DEBUG) {
                System.out.format("%s META reconnect in %ds, error %d\n", this.topic, delaySeconds, error);
            }
            final long reconnectTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
            // Signal id 1 matches the value asserted in onMetaFanoutSignal.
            this.reconnectAt = factory.signaler.signalAt(reconnectTime, 1, this::onMetaFanoutSignal);
        }

        /**
         * Handles WINDOW from upstream on the initial stream. Only the first window matters:
         * it marks the initial direction opened and sends every member a zero-credit window.
         *
         * @param windowFW the received WINDOW frame
         */
        private void onMetaFanoutInitialWindow(WindowFW windowFW) {
            if (!KafkaState.initialOpened(this.state)) {
                final long traceId = windowFW.traceId();
                this.state = KafkaState.openedInitial(this.state);
                this.members.forEach(member -> member.doMetaInitialWindowIfNecessary(traceId, 0L, 0, 0));
            }
        }

        /**
         * Dispatches a frame received from upstream, for either direction of the fanout stream,
         * to the matching handler. Unknown message types are ignored.
         *
         * @param i message type id
         * @param directBuffer buffer holding the frame
         * @param i2 offset of the frame in the buffer
         * @param i3 length of the frame
         */
        private void onMetaFanoutMessage(int i, DirectBuffer directBuffer, int i2, int i3) {
            final KafkaCacheMetaFactory factory = KafkaCacheMetaFactory.this;
            final int limit = i2 + i3;
            switch (i) {
                case 1: // handled as BEGIN
                    onMetaFanoutReplyBegin(factory.beginRO.wrap(directBuffer, i2, limit));
                    break;
                case 2: // handled as DATA
                    onMetaFanoutReplyData(factory.dataRO.wrap(directBuffer, i2, limit));
                    break;
                case 3: // handled as END
                    onMetaFanoutReplyEnd(factory.endRO.wrap(directBuffer, i2, limit));
                    break;
                case 4: // handled as ABORT
                    onMetaFanoutReplyAbort(factory.abortRO.wrap(directBuffer, i2, limit));
                    break;
                case 0x40000001: // handled as RESET
                    onMetaFanoutInitialReset(factory.resetRO.wrap(directBuffer, i2, limit));
                    break;
                case 0x40000002: // handled as WINDOW
                    onMetaFanoutInitialWindow(factory.windowRO.wrap(directBuffer, i2, limit));
                    break;
                default:
                    break;
            }
        }

        /**
         * Handles BEGIN from upstream on the reply stream: marks the reply opening, begins the
         * reply of every member that has not begun yet, and grants upstream an initial window
         * of one buffer-pool slot.
         *
         * @param beginFW the received BEGIN frame
         */
        private void onMetaFanoutReplyBegin(BeginFW beginFW) {
            final long traceId = beginFW.traceId();
            this.state = KafkaState.openingReply(this.state);

            this.members.forEach(member -> member.doMetaReplyBeginIfNecessary(traceId));

            final int initialCredit = KafkaCacheMetaFactory.this.bufferPool.slotCapacity();
            doMetaFanoutReplyWindow(traceId, initialCredit);
        }

        /**
         * Handles DATA from upstream on the reply stream. When the frame carries a Kafka meta
         * extension, the partition-leader snapshot is replaced with its contents and the
         * extension is forwarded to every member. The reserved budget is always returned to
         * upstream as window credit.
         *
         * @param dataFW the received DATA frame
         */
        private void onMetaFanoutReplyData(DataFW dataFW) {
            final KafkaCacheMetaFactory factory = KafkaCacheMetaFactory.this;
            final long traceId = dataFW.traceId();
            final int reserved = dataFW.reserved();
            final OctetsFW extension = dataFW.extension();

            // tryWrap yields null when the frame has no (or a truncated) extension, so the
            // type id may only be read after a null check.
            final ExtensionFW dataEx = (ExtensionFW) extension.get(factory.extensionRO::tryWrap);
            final KafkaDataExFW kafkaDataEx = dataEx != null && dataEx.typeId() == factory.kafkaTypeId
                ? (KafkaDataExFW) extension.get(factory.kafkaDataExRO::tryWrap)
                : null;

            // Kind 3 is the only kind this handler accepts; the meta() accessor is read below.
            if (!$assertionsDisabled && kafkaDataEx != null && kafkaDataEx.kind() != 3) {
                throw new AssertionError();
            }

            final KafkaMetaDataExFW meta = kafkaDataEx != null ? kafkaDataEx.meta() : null;
            if (meta != null) {
                final ArrayFW<KafkaPartitionFW> partitions = meta.partitions();
                if (this.leadersByPartitionId == null) {
                    this.leadersByPartitionId = new Int2IntHashMap(Integer.MIN_VALUE);
                }
                // Each meta frame is a full snapshot: discard the previous leaders first.
                this.leadersByPartitionId.clear();
                partitions.forEach(partition -> {
                    this.leadersByPartitionId.put(partition.partitionId(), partition.leaderId());
                });
                this.members.forEach(member -> {
                    member.doMetaReplyDataIfNecessary(traceId, kafkaDataEx);
                });
            }

            doMetaFanoutReplyWindow(traceId, reserved);
        }

        /**
         * Handles END from upstream on the reply stream: marks the reply closed, ends the
         * upstream initial stream if still open, then either schedules a reconnect (when a
         * reconnect delay is configured) or ends the reply of every member.
         *
         * @param endFW the received END frame
         */
        private void onMetaFanoutReplyEnd(EndFW endFW) {
            final long traceId = endFW.traceId();
            this.state = KafkaState.closedReply(this.state);
            doMetaFanoutInitialEndIfNecessary(traceId);

            final int delaySeconds = KafkaCacheMetaFactory.this.reconnectDelay;
            if (delaySeconds == 0) {
                if (KafkaConfiguration.DEBUG) {
                    System.out.format("%s META disconnect\n", this.topic);
                }
                this.members.forEach(member -> member.doMetaReplyEndIfNecessary(traceId));
                return;
            }

            if (KafkaConfiguration.DEBUG) {
                System.out.format("%s META reconnect in %ds\n", this.topic, delaySeconds);
            }
            final long reconnectTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
            // Signal id 1 matches the value asserted in onMetaFanoutSignal.
            this.reconnectAt = KafkaCacheMetaFactory.this.signaler.signalAt(reconnectTime, 1, this::onMetaFanoutSignal);
        }

        /**
         * Handles ABORT from upstream on the reply stream: marks the reply closed, aborts the
         * upstream initial stream if still open, then either schedules a reconnect (when a
         * reconnect delay is configured) or aborts the reply of every member.
         *
         * @param abortFW the received ABORT frame
         */
        private void onMetaFanoutReplyAbort(AbortFW abortFW) {
            final long traceId = abortFW.traceId();
            this.state = KafkaState.closedReply(this.state);
            doMetaFanoutInitialAbortIfNecessary(traceId);

            final int delaySeconds = KafkaCacheMetaFactory.this.reconnectDelay;
            if (delaySeconds == 0) {
                if (KafkaConfiguration.DEBUG) {
                    System.out.format("%s META disconnect\n", this.topic);
                }
                this.members.forEach(member -> member.doMetaReplyAbortIfNecessary(traceId));
                return;
            }

            if (KafkaConfiguration.DEBUG) {
                System.out.format("%s META reconnect in %ds\n", this.topic, delaySeconds);
            }
            final long reconnectTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
            // Signal id 1 matches the value asserted in onMetaFanoutSignal.
            this.reconnectAt = KafkaCacheMetaFactory.this.signaler.signalAt(reconnectTime, 1, this::onMetaFanoutSignal);
        }

        /**
         * Reconnect timer callback: re-opens the upstream stream if it is not already opening.
         *
         * @param i signal id; expected to be 1, the id passed to signalAt when scheduling
         */
        private void onMetaFanoutSignal(int i) {
            if (!$assertionsDisabled && i != 1) {
                throw new AssertionError();
            }
            // The timer has fired, so the recorded id no longer refers to a pending signal.
            // Clear it so onMetaFanoutMemberClosed does not later cancel a stale id.
            this.reconnectAt = -1L;
            doMetaFanoutInitialBeginIfNecessary(KafkaCacheMetaFactory.this.supplyTraceId.getAsLong());
        }

        /**
         * Sends RESET on the upstream reply stream unless that direction is already closed.
         *
         * @param j trace id to stamp on the RESET frame
         */
        private void doMetaFanoutReplyResetIfNecessary(long j) {
            final boolean alreadyClosed = KafkaState.replyClosed(this.state);
            if (!alreadyClosed) {
                doMetaFanoutReplyReset(j);
            }
        }

        /**
         * Sends RESET for the upstream reply stream, then marks the reply direction closed.
         *
         * @param j trace id to stamp on the RESET frame
         */
        private void doMetaFanoutReplyReset(long j) {
            final KafkaCacheMetaFactory factory = KafkaCacheMetaFactory.this;
            factory.doReset(this.receiver, this.routeId, this.replyId, j, this.authorization);
            this.state = KafkaState.closedReply(this.state);
        }

        /**
         * Marks the upstream reply opened and grants upstream window credit on it, with
         * budget id 0 and no padding.
         *
         * @param j trace id to stamp on the WINDOW frame
         * @param i credit to grant
         */
        private void doMetaFanoutReplyWindow(long j, int i) {
            final KafkaCacheMetaFactory factory = KafkaCacheMetaFactory.this;
            this.state = KafkaState.openedReply(this.state);
            factory.doWindow(this.receiver, this.routeId, this.replyId, j, this.authorization, 0L, i, 0);
        }

        // Decompiler-emitted assertion support: the flag is true when assertions are disabled
        // for KafkaCacheMetaFactory, which turns the explicit AssertionError checks into no-ops.
        static {
            $assertionsDisabled = !KafkaCacheMetaFactory.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/KafkaCacheMetaFactory$KafkaCacheMetaStream.class */
    public final class KafkaCacheMetaStream {
        // Fanout this member belongs to.
        private final KafkaCacheMetaFanout group;
        // Consumer that all of this member's outgoing frames are delivered to.
        private final MessageConsumer sender;
        private final long routeId;
        private final long initialId;
        // Derived from initialId via supplyReplyId in the constructor.
        private final long replyId;
        // Sent as the affinity of the reply BEGIN.
        private final long affinity;
        private final long authorization;
        // KafkaState bit set for this member stream.
        private int state;
        // Reply flow-control values, updated from each WINDOW received on the reply stream.
        private long replyBudgetId;
        private int replyBudget;
        private int replyPadding;
        // Decompiler-emitted assertion flag; see the static initializer.
        static final /* synthetic */ boolean $assertionsDisabled;

        /**
         * Creates a member stream bound to a fanout; the reply id is derived from the initial id.
         *
         * @param kafkaCacheMetaFanout fanout this member joins
         * @param messageConsumer consumer that receives this member's outgoing frames
         * @param j route id
         * @param j2 initial stream id
         * @param j3 affinity
         * @param j4 authorization
         */
        KafkaCacheMetaStream(KafkaCacheMetaFanout kafkaCacheMetaFanout, MessageConsumer messageConsumer, long j, long j2, long j3, long j4) {
            this.group = kafkaCacheMetaFanout;
            this.sender = messageConsumer;
            this.routeId = j;
            this.initialId = j2;
            this.replyId = KafkaCacheMetaFactory.this.supplyReplyId.applyAsLong(this.initialId);
            this.affinity = j3;
            this.authorization = j4;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Dispatches a frame received for this member, on either its initial stream or its
         * reply throttle, to the matching handler. Unknown message types are ignored.
         *
         * @param i message type id
         * @param directBuffer buffer holding the frame
         * @param i2 offset of the frame in the buffer
         * @param i3 length of the frame
         */
        public void onMetaMessage(int i, DirectBuffer directBuffer, int i2, int i3) {
            final KafkaCacheMetaFactory factory = KafkaCacheMetaFactory.this;
            final int limit = i2 + i3;
            switch (i) {
                case 1: // handled as BEGIN
                    onMetaInitialBegin(factory.beginRO.wrap(directBuffer, i2, limit));
                    break;
                case 3: // handled as END
                    onMetaInitialEnd(factory.endRO.wrap(directBuffer, i2, limit));
                    break;
                case 4: // handled as ABORT
                    onMetaInitialAbort(factory.abortRO.wrap(directBuffer, i2, limit));
                    break;
                case 0x40000001: // handled as RESET
                    onMetaReplyReset(factory.resetRO.wrap(directBuffer, i2, limit));
                    break;
                case 0x40000002: // handled as WINDOW
                    onMetaReplyWindow(factory.windowRO.wrap(directBuffer, i2, limit));
                    break;
                default:
                    break;
            }
        }

        /**
         * Handles BEGIN on the member's initial stream: marks it opening and joins the fanout.
         *
         * @param beginFW the received BEGIN frame
         */
        private void onMetaInitialBegin(BeginFW beginFW) {
            final long beginTraceId = beginFW.traceId();
            final int opening = KafkaState.openingInitial(this.state);
            this.state = opening;
            this.group.onMetaFanoutMemberOpening(beginTraceId, this);
        }

        /**
         * Handles END on the member's initial stream: marks it closed, leaves the fanout and
         * ends the member's reply if it was begun and is not yet closed.
         *
         * @param endFW the received END frame
         */
        private void onMetaInitialEnd(EndFW endFW) {
            // Read the trace id up front; the flyweight is shared across frames.
            final long endTraceId = endFW.traceId();
            final int closed = KafkaState.closedInitial(this.state);
            this.state = closed;
            this.group.onMetaFanoutMemberClosed(endTraceId, this);
            doMetaReplyEndIfNecessary(endTraceId);
        }

        /**
         * Handles ABORT on the member's initial stream: marks it closed, leaves the fanout and
         * aborts the member's reply if it was begun and is not yet closed.
         *
         * @param abortFW the received ABORT frame
         */
        private void onMetaInitialAbort(AbortFW abortFW) {
            // Read the trace id up front; the flyweight is shared across frames.
            final long abortTraceId = abortFW.traceId();
            final int closed = KafkaState.closedInitial(this.state);
            this.state = closed;
            this.group.onMetaFanoutMemberClosed(abortTraceId, this);
            doMetaReplyAbortIfNecessary(abortTraceId);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Sends RESET on the member's initial stream if it has begun opening and is not yet
         * closed. The initial direction is marked closed in every case.
         *
         * @param j trace id to stamp on the RESET frame
         */
        public void doMetaInitialResetIfNecessary(long j) {
            if (KafkaState.initialOpening(this.state)) {
                if (!KafkaState.initialClosed(this.state)) {
                    doMetaInitialReset(j);
                }
            }
            this.state = KafkaState.closedInitial(this.state);
        }

        /**
         * Marks the member's initial direction closed, then sends RESET for it to the sender.
         *
         * @param j trace id to stamp on the RESET frame
         */
        private void doMetaInitialReset(long j) {
            final KafkaCacheMetaFactory factory = KafkaCacheMetaFactory.this;
            this.state = KafkaState.closedInitial(this.state);
            factory.doReset(this.sender, this.routeId, this.initialId, j, this.authorization);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Sends WINDOW on the member's initial stream when it has not been opened yet, or when
         * there is positive credit to grant.
         *
         * @param j trace id
         * @param j2 budget id
         * @param i credit
         * @param i2 padding
         */
        public void doMetaInitialWindowIfNecessary(long j, long j2, int i, int i2) {
            if (KafkaState.initialOpened(this.state) && i <= 0) {
                // Already opened and nothing to grant.
                return;
            }
            doMetaInitialWindow(j, j2, i, i2);
        }

        /**
         * Sends WINDOW for the member's initial stream, then marks the initial direction opened.
         *
         * @param j trace id
         * @param j2 budget id
         * @param i credit
         * @param i2 padding
         */
        private void doMetaInitialWindow(long j, long j2, int i, int i2) {
            final KafkaCacheMetaFactory factory = KafkaCacheMetaFactory.this;
            factory.doWindow(this.sender, this.routeId, this.initialId, j, this.authorization, j2, i, i2);
            this.state = KafkaState.openedInitial(this.state);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Begins the member's reply stream unless it is already opening.
         *
         * @param j trace id to stamp on the BEGIN frame
         */
        public void doMetaReplyBeginIfNecessary(long j) {
            final boolean alreadyOpening = KafkaState.replyOpening(this.state);
            if (!alreadyOpening) {
                doMetaReplyBegin(j);
            }
        }

        /**
         * Marks the member's reply opening, registers this member as the throttle for its reply
         * stream, and sends BEGIN to the sender with a meta extension naming the fanout's topic.
         *
         * @param j trace id to stamp on the BEGIN frame
         */
        private void doMetaReplyBegin(long j) {
            final KafkaCacheMetaFactory factory = KafkaCacheMetaFactory.this;
            this.state = KafkaState.openingReply(this.state);
            factory.router.setThrottle(this.replyId, this::onMetaMessage);

            // Each nested lambda needs its own parameter name: a lambda parameter may not
            // redeclare a parameter of an enclosing lambda.
            factory.doBegin(this.sender, this.routeId, this.replyId, j, this.authorization, this.affinity, extension -> {
                extension.set((buffer, offset, limit) -> {
                    return factory.kafkaBeginExRW.wrap2(buffer, offset, limit).typeId(factory.kafkaTypeId).meta(metaBuilder -> {
                        metaBuilder.topic(this.group.topic.name());
                    }).build().sizeof();
                });
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Forwards a meta data extension on the member's reply stream, but only once the reply
         * has been opened; otherwise the extension is dropped for this member.
         *
         * @param j trace id to stamp on the DATA frame
         * @param kafkaDataExFW extension to forward
         */
        public void doMetaReplyDataIfNecessary(long j, KafkaDataExFW kafkaDataExFW) {
            if (!KafkaState.replyOpened(this.state)) {
                return;
            }
            doMetaReplyData(j, kafkaDataExFW);
        }

        /**
         * Sends an extension-only DATA frame on the member's reply stream. The frame reserves
         * exactly the current reply padding, which is deducted from the reply budget.
         *
         * @param j trace id to stamp on the DATA frame
         * @param kafkaDataExFW extension to send
         */
        private void doMetaReplyData(long j, KafkaDataExFW kafkaDataExFW) {
            final int reserved = this.replyPadding;
            this.replyBudget = this.replyBudget - reserved;
            if (!$assertionsDisabled && this.replyBudget < 0) {
                throw new AssertionError();
            }
            final KafkaCacheMetaFactory factory = KafkaCacheMetaFactory.this;
            factory.doDataNull(this.sender, this.routeId, this.replyId, j, this.authorization,
                this.replyBudgetId, reserved, kafkaDataExFW);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Sends END on the member's reply stream if it has begun opening and is not yet closed.
         * The reply direction is marked closed in every case.
         *
         * @param j trace id to stamp on the END frame
         */
        public void doMetaReplyEndIfNecessary(long j) {
            if (KafkaState.replyOpening(this.state)) {
                if (!KafkaState.replyClosed(this.state)) {
                    doMetaReplyEnd(j);
                }
            }
            this.state = KafkaState.closedReply(this.state);
        }

        /**
         * Marks the member's reply closed, then sends END (with an empty extension) to the sender.
         *
         * @param j trace id to stamp on the END frame
         */
        private void doMetaReplyEnd(long j) {
            final KafkaCacheMetaFactory factory = KafkaCacheMetaFactory.this;
            this.state = KafkaState.closedReply(this.state);
            factory.doEnd(this.sender, this.routeId, this.replyId, j, this.authorization,
                KafkaCacheMetaFactory.EMPTY_EXTENSION);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Sends ABORT on the member's reply stream if it has begun opening and is not yet
         * closed. The reply direction is marked closed in every case.
         *
         * @param j trace id to stamp on the ABORT frame
         */
        public void doMetaReplyAbortIfNecessary(long j) {
            if (KafkaState.replyOpening(this.state)) {
                if (!KafkaState.replyClosed(this.state)) {
                    doMetaReplyAbort(j);
                }
            }
            this.state = KafkaState.closedReply(this.state);
        }

        /**
         * Marks the member's reply closed, then sends ABORT (with an empty extension) to the sender.
         *
         * @param j trace id to stamp on the ABORT frame
         */
        private void doMetaReplyAbort(long j) {
            final KafkaCacheMetaFactory factory = KafkaCacheMetaFactory.this;
            this.state = KafkaState.closedReply(this.state);
            factory.doAbort(this.sender, this.routeId, this.replyId, j, this.authorization,
                KafkaCacheMetaFactory.EMPTY_EXTENSION);
        }

        /**
         * Handles RESET received on the member's reply stream: marks the reply closed, leaves
         * the fanout, and propagates the reset to the member's initial stream if that is still
         * open.
         *
         * @param resetFW the received RESET frame
         */
        private void onMetaReplyReset(ResetFW resetFW) {
            final long traceId = resetFW.traceId();
            // The RESET closes the reply direction. Marking the initial direction closed here
            // instead would make doMetaInitialResetIfNecessary below skip its RESET and would
            // leave the reply looking open.
            this.state = KafkaState.closedReply(this.state);
            this.group.onMetaFanoutMemberClosed(traceId, this);
            doMetaInitialResetIfNecessary(traceId);
        }

        /**
         * Handles WINDOW received on the member's reply stream: records the budget id, adds the
         * credit to the reply budget and stores the padding. The first window also marks the
         * reply opened and asks the fanout to replay its current metadata to this member.
         *
         * @param windowFW the received WINDOW frame
         */
        private void onMetaReplyWindow(WindowFW windowFW) {
            final long newBudgetId = windowFW.budgetId();
            final int grantedCredit = windowFW.credit();
            final int newPadding = windowFW.padding();

            this.replyBudgetId = newBudgetId;
            this.replyBudget = this.replyBudget + grantedCredit;
            this.replyPadding = newPadding;

            if (!KafkaState.replyOpened(this.state)) {
                this.state = KafkaState.openedReply(this.state);
                this.group.onMetaFanoutMemberOpened(windowFW.traceId(), this);
            }
        }

        // Decompiler-emitted assertion support: the flag is true when assertions are disabled
        // for KafkaCacheMetaFactory, which turns the explicit AssertionError checks into no-ops.
        static {
            $assertionsDisabled = !KafkaCacheMetaFactory.class.desiredAssertionStatus();
        }
    }

    public KafkaCacheMetaFactory(KafkaConfiguration kafkaConfiguration, RouteManager routeManager, MutableDirectBuffer mutableDirectBuffer, BufferPool bufferPool, Signaler signaler, LongUnaryOperator longUnaryOperator, LongUnaryOperator longUnaryOperator2, LongSupplier longSupplier, ToIntFunction<String> toIntFunction, Function<String, KafkaCache> function, LongFunction<KafkaCacheRoute> longFunction, Long2ObjectHashMap<MessageConsumer> long2ObjectHashMap, Configuration.IntPropertyDef intPropertyDef) {
        this.kafkaTypeId = toIntFunction.applyAsInt(KafkaNukleus.NAME);
        this.router = routeManager;
        this.writeBuffer = new UnsafeBuffer(new byte[mutableDirectBuffer.capacity()]);
        this.extBuffer = new UnsafeBuffer(new byte[mutableDirectBuffer.capacity()]);
        this.bufferPool = bufferPool;
        this.signaler = signaler;
        this.supplyInitialId = longUnaryOperator;
        this.supplyReplyId = longUnaryOperator2;
        this.supplyTraceId = longSupplier;
        this.supplyCache = function;
        this.supplyCacheRoute = longFunction;
        this.correlations = long2ObjectHashMap;
        this.reconnectDelay = intPropertyDef.getAsInt(kafkaConfiguration);
    }

    /**
     * Creates the handler for a new meta stream from its BEGIN frame.
     *
     * <p>The topic is read from the Kafka BEGIN extension and a route is resolved whose local
     * and remote addresses differ and whose topic, if it names one, equals the requested topic.
     * When a route is found, the stream joins the fanout for that topic on the resolved cache
     * route, creating the fanout on first use.
     *
     * @param i message type id (unused)
     * @param directBuffer buffer holding the BEGIN frame
     * @param i2 offset of the frame in the buffer
     * @param i3 length of the frame
     * @param messageConsumer consumer that receives the new stream's outgoing frames
     * @return the handler for the new stream, or {@code null} when no route matches
     */
    public MessageConsumer newStream(int i, DirectBuffer directBuffer, int i2, int i3, MessageConsumer messageConsumer) {
        final BeginFW begin = this.beginRO.wrap(directBuffer, i2, i2 + i3);
        final long routeId = begin.routeId();
        final long initialId = begin.streamId();
        final long authorization = begin.authorization();
        final long affinity = begin.affinity();
        // The stream id must have its low bit set.
        if (!$assertionsDisabled && (initialId & 1) == 0) {
            throw new AssertionError();
        }

        final OctetsFW extension = begin.extension();
        final ExtensionFW beginEx = (ExtensionFW) extension.get(this.extensionRO::tryWrap);
        if (!$assertionsDisabled && (beginEx == null || beginEx.typeId() != this.kafkaTypeId)) {
            throw new AssertionError();
        }
        final KafkaBeginExFW kafkaBeginEx = (KafkaBeginExFW) extension.get(this.kafkaBeginExRO::wrap);
        // Kind 3 is the only kind this factory accepts; the meta() accessor is read below.
        if (!$assertionsDisabled && kafkaBeginEx.kind() != 3) {
            throw new AssertionError();
        }
        final String16FW beginTopic = kafkaBeginEx.meta().topic();

        final RouteFW route = (RouteFW) this.router.resolve(routeId, authorization, (msgTypeId, buffer, index, length) -> {
            final RouteFW candidate = (RouteFW) this.wrapRoute.apply(msgTypeId, buffer, index, length);
            final OctetsFW routeExtension = candidate.extension();
            final KafkaRouteExFW routeEx = (KafkaRouteExFW) routeExtension.get(this.routeExRO::tryWrap);
            final String16FW routeTopic = routeEx != null ? routeEx.topic() : null;
            // A route without a topic matches any requested topic.
            return !candidate.localAddress().equals(candidate.remoteAddress())
                && beginTopic != null
                && (routeTopic == null || routeTopic.equals(beginTopic));
        }, this.wrapRoute);

        if (route == null) {
            return null;
        }

        final long resolvedId = route.correlationId();
        final String topicName = beginTopic.asString();
        final KafkaCacheRoute cacheRoute = this.supplyCacheRoute.apply(resolvedId);
        final int topicKey = cacheRoute.topicKey(topicName);

        KafkaCacheMetaFanout fanout = (KafkaCacheMetaFanout) cacheRoute.metaFanoutsByTopic.get(topicKey);
        if (fanout == null) {
            // First stream for this topic on this cache route: create and register the fanout.
            fanout = new KafkaCacheMetaFanout(resolvedId, authorization,
                this.supplyCache.apply(route.localAddress().asString()).supplyTopic(topicName));
            cacheRoute.metaFanoutsByTopic.put(topicKey, fanout);
        }

        final KafkaCacheMetaStream stream =
            new KafkaCacheMetaStream(fanout, messageConsumer, routeId, initialId, affinity, authorization);
        return stream::onMetaMessage;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW$Builder] */
    /**
     * Builds a BEGIN frame in the write buffer and delivers it to the consumer.
     *
     * @param messageConsumer receiver of the frame
     * @param j route id
     * @param j2 stream id
     * @param j3 trace id
     * @param j4 authorization
     * @param j5 affinity
     * @param consumer writer of the frame extension
     */
    public void doBegin(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, long j5, Consumer<OctetsFW.Builder> consumer) {
        final BeginFW begin = this.beginRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity())
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .affinity(j5)
            .extension(consumer)
            .build();
        final int frameTypeId = begin.typeId();
        messageConsumer.accept(frameTypeId, begin.buffer(), begin.offset(), begin.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.DataFW$Builder] */
    /**
     * Builds a DATA frame that sets no payload, only an extension copied from the given
     * flyweight, and delivers it to the consumer.
     *
     * @param messageConsumer receiver of the frame
     * @param j route id
     * @param j2 stream id
     * @param j3 trace id
     * @param j4 authorization
     * @param j5 budget id
     * @param i reserved budget
     * @param flyweight flyweight whose bytes become the frame extension
     */
    public void doDataNull(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, long j5, int i, Flyweight flyweight) {
        final DataFW data = this.dataRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity())
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .budgetId(j5)
            .reserved(i)
            .extension(flyweight.buffer(), flyweight.offset(), flyweight.sizeof())
            .build();
        final int frameTypeId = data.typeId();
        messageConsumer.accept(frameTypeId, data.buffer(), data.offset(), data.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.EndFW$Builder] */
    /**
     * Builds an END frame in the write buffer and delivers it to the consumer.
     *
     * @param messageConsumer receiver of the frame
     * @param j route id
     * @param j2 stream id
     * @param j3 trace id
     * @param j4 authorization
     * @param consumer writer of the frame extension
     */
    public void doEnd(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, Consumer<OctetsFW.Builder> consumer) {
        final EndFW end = this.endRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity())
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .extension(consumer)
            .build();
        final int frameTypeId = end.typeId();
        messageConsumer.accept(frameTypeId, end.buffer(), end.offset(), end.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.AbortFW$Builder] */
    /**
     * Builds an ABORT frame in the write buffer and delivers it to the consumer.
     *
     * @param messageConsumer receiver of the frame
     * @param j route id
     * @param j2 stream id
     * @param j3 trace id
     * @param j4 authorization
     * @param consumer writer of the frame extension
     */
    public void doAbort(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, Consumer<OctetsFW.Builder> consumer) {
        final AbortFW abort = this.abortRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity())
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .extension(consumer)
            .build();
        final int frameTypeId = abort.typeId();
        messageConsumer.accept(frameTypeId, abort.buffer(), abort.offset(), abort.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW$Builder] */
    /**
     * Builds a WINDOW frame in the write buffer and delivers it to the consumer.
     *
     * @param messageConsumer receiver of the frame
     * @param j route id
     * @param j2 stream id
     * @param j3 trace id
     * @param j4 authorization
     * @param j5 budget id
     * @param i credit
     * @param i2 padding
     */
    public void doWindow(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, long j5, int i, int i2) {
        final WindowFW window = this.windowRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity())
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .budgetId(j5)
            .credit(i)
            .padding(i2)
            .build();
        final int frameTypeId = window.typeId();
        messageConsumer.accept(frameTypeId, window.buffer(), window.offset(), window.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW$Builder] */
    /**
     * Builds a RESET frame in the write buffer and delivers it to the consumer.
     *
     * @param messageConsumer receiver of the frame
     * @param j route id
     * @param j2 stream id
     * @param j3 trace id
     * @param j4 authorization
     */
    public void doReset(MessageConsumer messageConsumer, long j, long j2, long j3, long j4) {
        final ResetFW reset = this.resetRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity())
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .build();
        final int frameTypeId = reset.typeId();
        messageConsumer.accept(frameTypeId, reset.buffer(), reset.offset(), reset.sizeof());
    }

    // Initializes the decompiler-emitted assertion flag (true when assertions are disabled for
    // this class) and the shared no-op extension writer.
    static {
        $assertionsDisabled = !KafkaCacheMetaFactory.class.desiredAssertionStatus();
        // Intentionally empty: writes nothing into the extension builder.
        EMPTY_EXTENSION = builder -> {
        };
    }
}
