package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.Objects;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.LongToIntFunction;
import java.util.function.LongUnaryOperator;
import java.util.function.ToIntFunction;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.concurrent.Signaler;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.kafka.internal.KafkaConfiguration;
import org.reaktivity.nukleus.kafka.internal.KafkaNukleus;
import org.reaktivity.nukleus.kafka.internal.cache.KafkaCache;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.ExtensionFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaBeginExFW;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.stream.StreamFactory;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/KafkaCacheServerFactory.class */
public final class KafkaCacheServerFactory implements StreamFactory {
    private final BeginFW beginRO = new BeginFW();
    private final ExtensionFW extensionRO = new ExtensionFW();
    private final KafkaBeginExFW kafkaBeginExRO = new KafkaBeginExFW();
    private final int kafkaTypeId;
    private final Long2ObjectHashMap<MessageConsumer> correlations;
    private final Int2ObjectHashMap<StreamFactory> streamFactoriesByKind;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaCacheServerFactory(KafkaConfiguration kafkaConfiguration, RouteManager routeManager, MutableDirectBuffer mutableDirectBuffer, BufferPool bufferPool, Signaler signaler, LongUnaryOperator longUnaryOperator, LongUnaryOperator longUnaryOperator2, LongSupplier longSupplier, LongSupplier longSupplier2, ToIntFunction<String> toIntFunction, Function<String, KafkaCache> function, LongFunction<KafkaCacheRoute> longFunction, Long2ObjectHashMap<MessageConsumer> long2ObjectHashMap, LongToIntFunction longToIntFunction) {
        Int2ObjectHashMap<StreamFactory> int2ObjectHashMap = new Int2ObjectHashMap<>();
        int2ObjectHashMap.put(KafkaBeginExFW.KIND_BOOTSTRAP, new KafkaCacheServerBootstrapFactory(kafkaConfiguration, routeManager, mutableDirectBuffer, longUnaryOperator, longUnaryOperator2, longSupplier, toIntFunction, long2ObjectHashMap));
        int2ObjectHashMap.put(3, new KafkaCacheMetaFactory(kafkaConfiguration, routeManager, mutableDirectBuffer, bufferPool, signaler, longUnaryOperator, longUnaryOperator2, longSupplier, toIntFunction, function, longFunction, long2ObjectHashMap, KafkaConfiguration.KAFKA_CACHE_SERVER_RECONNECT_DELAY));
        int2ObjectHashMap.put(32, new KafkaCacheServerDescribeFactory(kafkaConfiguration, routeManager, mutableDirectBuffer, bufferPool, signaler, longUnaryOperator, longUnaryOperator2, longSupplier, toIntFunction, function, longFunction, long2ObjectHashMap));
        int2ObjectHashMap.put(1, new KafkaCacheServerFetchFactory(kafkaConfiguration, routeManager, mutableDirectBuffer, bufferPool, signaler, longUnaryOperator, longUnaryOperator2, longSupplier, toIntFunction, function, longFunction, long2ObjectHashMap));
        int2ObjectHashMap.put(0, new KafkaCacheServerProduceFactory(kafkaConfiguration, routeManager, mutableDirectBuffer, signaler, longUnaryOperator, longUnaryOperator2, longSupplier, longSupplier2, toIntFunction, function, longFunction, long2ObjectHashMap, longToIntFunction));
        this.kafkaTypeId = toIntFunction.applyAsInt(KafkaNukleus.NAME);
        this.correlations = long2ObjectHashMap;
        this.streamFactoriesByKind = int2ObjectHashMap;
    }

    public MessageConsumer newStream(int i, DirectBuffer directBuffer, int i2, int i3, MessageConsumer messageConsumer) {
        BeginFW wrap = this.beginRO.wrap(directBuffer, i2, i2 + i3);
        return (wrap.streamId() & 1) != 0 ? newInitialStream(wrap, messageConsumer) : newReplyStream(wrap, messageConsumer);
    }

    private MessageConsumer newInitialStream(BeginFW beginFW, MessageConsumer messageConsumer) {
        StreamFactory streamFactory;
        OctetsFW extension = beginFW.extension();
        ExtensionFW extensionFW = this.extensionRO;
        Objects.requireNonNull(extensionFW);
        ExtensionFW extensionFW2 = (ExtensionFW) extension.get(extensionFW::tryWrap);
        if (!$assertionsDisabled && extensionFW2 == null) {
            throw new AssertionError();
        }
        int typeId = extensionFW2.typeId();
        if (!$assertionsDisabled && (extensionFW2 == null || typeId != this.kafkaTypeId)) {
            throw new AssertionError();
        }
        MessageConsumer messageConsumer2 = null;
        KafkaBeginExFW kafkaBeginExFW = this.kafkaBeginExRO;
        Objects.requireNonNull(kafkaBeginExFW);
        KafkaBeginExFW kafkaBeginExFW2 = (KafkaBeginExFW) extension.get(kafkaBeginExFW::tryWrap);
        if (kafkaBeginExFW2 != null && (streamFactory = (StreamFactory) this.streamFactoriesByKind.get(kafkaBeginExFW2.kind())) != null) {
            messageConsumer2 = streamFactory.newStream(beginFW.typeId(), beginFW.buffer(), beginFW.offset(), beginFW.sizeof(), messageConsumer);
        }
        return messageConsumer2;
    }

    private MessageConsumer newReplyStream(BeginFW beginFW, MessageConsumer messageConsumer) {
        return (MessageConsumer) this.correlations.remove(beginFW.streamId());
    }

    static {
        $assertionsDisabled = !KafkaCacheServerFactory.class.desiredAssertionStatus();
    }
}
