package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;
import java.util.function.ToIntFunction;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.kafka.internal.KafkaConfiguration;
import org.reaktivity.nukleus.kafka.internal.KafkaNukleus;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.String8FW;
import org.reaktivity.nukleus.kafka.internal.types.control.KafkaRouteExFW;
import org.reaktivity.nukleus.kafka.internal.types.control.RouteFW;
import org.reaktivity.nukleus.kafka.internal.types.control.UnrouteFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.EndFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaBeginExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.route.Address;
import org.reaktivity.nukleus.route.AddressFactory;
import org.reaktivity.nukleus.route.RouteManager;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/KafkaCacheServerAddressFactory.class */
public class KafkaCacheServerAddressFactory implements AddressFactory {
    // Extension writer that writes nothing into the supplied builder (assigned in the static initializer).
    private static final Consumer<OctetsFW.Builder> EMPTY_EXTENSION;
    // Type id resolved for KafkaNukleus.NAME through the lookup function given to the constructor.
    private final int kafkaTypeId;
    // Used to obtain the receiver for a new initial stream and to register its throttle.
    private final RouteManager router;
    // Supplies a fresh trace id for each outgoing frame.
    private final LongSupplier supplyTraceId;
    // Maps a route id to the initial stream id used for that route.
    private final LongUnaryOperator supplyInitialId;
    // Maps an initial stream id to its reply stream id.
    private final LongUnaryOperator supplyReplyId;
    // Scratch buffer in which every outgoing frame is built, always starting at offset 0.
    private final MutableDirectBuffer writeBuffer;
    // Reply-id -> handler map handed in by the constructor; entries are added on begin and removed on end/reset.
    private final Long2ObjectHashMap<MessageConsumer> correlations;
    // Decompiler-exposed assertion flag: true when assertions are disabled for this class.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Reusable read-only flyweights for decoding incoming route / unroute messages.
    private final RouteFW routeRO = new RouteFW();
    private final UnrouteFW unrouteRO = new UnrouteFW();
    private final KafkaRouteExFW kafkaRouteExRO = new KafkaRouteExFW();
    // Reusable builders for encoding outgoing frames into writeBuffer.
    private final BeginFW.Builder beginRW = new BeginFW.Builder();
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final WindowFW.Builder windowRW = new WindowFW.Builder();
    private final EndFW.Builder endRW = new EndFW.Builder();
    private final KafkaBeginExFW.Builder kafkaBeginExRW = new KafkaBeginExFW.Builder();
    // Active streams keyed by the correlation id of the route that created them.
    private final Long2ObjectHashMap<KafkaAddressStream> streamsByRouteId = new Long2ObjectHashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/KafkaCacheServerAddressFactory$KafkaAddressStream.class */
    /**
     * Stream opened by the enclosing factory for one routed topic.
     *
     * <p>When a route is added the factory creates an instance and calls
     * {@link #doKafkaInitialBegin()}, which sends a begin frame whose extension
     * carries a Kafka bootstrap request for the topic. When the route is
     * removed the factory calls {@link #doKafkaInitialEnd()}.
     *
     * <p>Frames arriving on the reply stream and throttle frames for the
     * initial stream are both delivered to {@link #onKafkaReply}.
     */
    public final class KafkaAddressStream {
        private final long routeId;
        private final long authorization;
        private final long initialId;
        private final long replyId;
        private final MessageConsumer receiver;
        private final String topic;
        // Bit state maintained exclusively through the KafkaState helper functions.
        private int state;

        /**
         * @param routeId       id of the route this stream belongs to; also seeds the initial stream id
         * @param authorization authorization value copied onto every frame sent by this stream
         * @param topic         topic name written into the bootstrap begin extension
         */
        private KafkaAddressStream(long routeId, long authorization, String topic) {
            this.routeId = routeId;
            this.authorization = authorization;
            this.initialId = supplyInitialId.applyAsLong(routeId);
            this.replyId = supplyReplyId.applyAsLong(this.initialId);
            this.receiver = router.supplyReceiver(this.initialId);
            this.topic = topic;
        }

        /**
         * Registers this stream for reply and throttle frames, then sends the
         * initial begin frame with a bootstrap extension naming the topic.
         */
        public void doKafkaInitialBegin() {
            final long traceId = supplyTraceId.getAsLong();
            state = KafkaState.openingInitial(state);

            // The same handler receives both reply-stream frames and throttle frames.
            correlations.put(replyId, this::onKafkaReply);
            router.setThrottle(initialId, this::onKafkaReply);

            // Each nested lambda uses its own parameter name: a lambda parameter may not
            // redeclare a parameter of an enclosing lambda.
            doBegin(receiver, routeId, initialId, traceId, authorization, 0L, extension -> {
                extension.set((buffer, offset, limit) -> {
                    return kafkaBeginExRW.wrap2(buffer, offset, limit)
                        .typeId(kafkaTypeId)
                        .bootstrap(bootstrap -> {
                            bootstrap.topic(topic);
                        })
                        .build()
                        .sizeof();
                });
            });
        }

        /**
         * Marks the initial stream closed, drops the reply correlation and
         * sends an end frame with an empty extension.
         */
        public void doKafkaInitialEnd() {
            final long traceId = supplyTraceId.getAsLong();
            state = KafkaState.closedInitial(state);
            correlations.remove(replyId);
            doEnd(receiver, routeId, initialId, traceId, authorization, EMPTY_EXTENSION);
        }

        /**
         * Handles a frame delivered for this stream and updates {@link #state}.
         * Unknown message types are ignored.
         *
         * @param msgTypeId message type id of the frame
         * @param buffer    buffer holding the frame (not read here)
         * @param index     offset of the frame in {@code buffer} (not read here)
         * @param length    length of the frame (not read here)
         */
        private void onKafkaReply(int msgTypeId, DirectBuffer buffer, int index, int length) {
            final long traceId = supplyTraceId.getAsLong();
            switch (msgTypeId) {
                case 0x00000001: // presumably BEGIN on the reply stream - confirm against BeginFW.TYPE_ID
                    state = KafkaState.openedReply(state);
                    // Acknowledge the reply with a window granting no credit and no padding.
                    doKafkaReplyWindow(traceId, 0, 0);
                    break;
                case 0x00000003: // presumably END - confirm against EndFW.TYPE_ID
                case 0x00000004: // presumably ABORT - confirm against the generated flyweights
                    assert KafkaState.initialClosed(state) : "reply closed unexpectedly";
                    state = KafkaState.closedReply(state);
                    break;
                case 0x40000001: // presumably RESET of the initial stream - confirm against ResetFW.TYPE_ID
                    // The correlation is dropped before the assertion, as in the original ordering.
                    correlations.remove(replyId);
                    assert KafkaState.replyClosing(state) : "initial closed unexpectedly";
                    state = KafkaState.closedInitial(state);
                    doKafkaReplyReset(traceId);
                    break;
                case 0x40000002: // presumably WINDOW for the initial stream - confirm against WindowFW.TYPE_ID
                    state = KafkaState.openedInitial(state);
                    break;
                default:
                    break;
            }
        }

        /** Sends a reset frame for the reply stream. */
        private void doKafkaReplyReset(long traceId) {
            doReset(receiver, routeId, replyId, traceId, authorization);
        }

        /** Sends a window frame for the reply stream with budget id 0 and the given credit and padding. */
        private void doKafkaReplyWindow(long traceId, int credit, int padding) {
            doWindow(receiver, routeId, replyId, traceId, authorization, 0L, credit, padding);
        }
    }

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/KafkaCacheServerAddressFactory$KafkaCacheServerAddress.class */
    /**
     * Address created by the enclosing factory. It reports the Kafka nukleus
     * name and forwards route messages to the factory's
     * {@code onCacheServerMessage} handler.
     */
    public final class KafkaCacheServerAddress implements Address {
        private final String name;

        /**
         * @param str name reported by {@link #name()}
         */
        private KafkaCacheServerAddress(String str) {
            name = str;
        }

        /** Returns the name this address was created with. */
        public String name() {
            return name;
        }

        /** Returns {@code KafkaNukleus.NAME}. */
        public String nukleus() {
            return KafkaNukleus.NAME;
        }

        /** Returns a handler that delegates every message to the enclosing factory. */
        public MessageConsumer routeHandler() {
            return KafkaCacheServerAddressFactory.this::onCacheServerMessage;
        }
    }

    /**
     * Creates the factory and stores the collaborators it needs to open streams.
     *
     * @param kafkaConfiguration configuration object; not read by this constructor
     * @param routeManager       router used to obtain receivers and register throttles
     * @param mutableDirectBuffer scratch buffer in which outgoing frames are built
     * @param toIntFunction      lookup used once to resolve the type id for {@code KafkaNukleus.NAME}
     * @param longSupplier       supplier of trace ids
     * @param longUnaryOperator  maps a route id to an initial stream id
     * @param longUnaryOperator2 maps an initial stream id to a reply stream id
     * @param long2ObjectHashMap reply-id to handler correlations, stored by reference
     */
    public KafkaCacheServerAddressFactory(KafkaConfiguration kafkaConfiguration, RouteManager routeManager, MutableDirectBuffer mutableDirectBuffer, ToIntFunction<String> toIntFunction, LongSupplier longSupplier, LongUnaryOperator longUnaryOperator, LongUnaryOperator longUnaryOperator2, Long2ObjectHashMap<MessageConsumer> long2ObjectHashMap) {
        // Resolve the type id first; the remaining fields are plain reference copies.
        kafkaTypeId = toIntFunction.applyAsInt(KafkaNukleus.NAME);

        router = routeManager;
        correlations = long2ObjectHashMap;
        writeBuffer = mutableDirectBuffer;

        supplyTraceId = longSupplier;
        supplyInitialId = longUnaryOperator;
        supplyReplyId = longUnaryOperator2;
    }

    /* renamed from: newAddress, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a new address bound to this factory.
     *
     * @param str name the address reports
     * @return a fresh {@link KafkaCacheServerAddress}
     */
    public KafkaCacheServerAddress m45newAddress(String str) {
        final KafkaCacheServerAddress address = new KafkaCacheServerAddress(str);
        return address;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Dispatches a route-level message to the matching handler; message types
     * other than 1 and 2 are ignored.
     *
     * @param i            message type id
     * @param directBuffer buffer holding the message
     * @param i2           offset of the message in {@code directBuffer}
     * @param i3           length of the message
     */
    public void onCacheServerMessage(int i, DirectBuffer directBuffer, int i2, int i3) {
        final int limit = i2 + i3;
        if (i == 1) {
            // presumably RouteFW.TYPE_ID - confirm against the generated flyweight
            onCacheServerRouted(routeRO.wrap(directBuffer, i2, limit));
        } else if (i == 2) {
            // presumably UnrouteFW.TYPE_ID - confirm against the generated flyweight
            onCacheServerUnrouted(unrouteRO.wrap(directBuffer, i2, limit));
        }
    }

    /**
     * Opens a stream for a newly added route.
     *
     * <p>The route is ignored unless its remote address equals its local
     * address, its extension decodes as a Kafka route extension, and that
     * extension names a topic.
     *
     * @param routeFW decoded route message
     */
    private void onCacheServerRouted(RouteFW routeFW) {
        final String8FW localAddress = routeFW.localAddress();
        final String8FW remoteAddress = routeFW.remoteAddress();
        final long authorization = routeFW.authorization();
        final OctetsFW extension = routeFW.extension();
        // tryWrap yields null when the extension cannot be decoded as a Kafka route extension.
        final KafkaRouteExFW routeEx = (KafkaRouteExFW) extension.get(this.kafkaRouteExRO::tryWrap);

        if (!remoteAddress.equals(localAddress)) {
            return;
        }
        if (routeEx == null) {
            return;
        }
        final String topic = routeEx.topic().asString();
        if (topic == null) {
            return;
        }

        final long routeId = routeFW.correlationId();
        // With assertions enabled, a second stream for the same route id is a programming error.
        if (!$assertionsDisabled && this.streamsByRouteId.containsKey(routeId)) {
            throw new AssertionError();
        }

        final KafkaAddressStream stream = new KafkaAddressStream(routeId, authorization, topic);
        this.streamsByRouteId.put(routeId, stream);
        stream.doKafkaInitialBegin();
    }

    /**
     * Ends and forgets the stream that was opened for a removed route; does
     * nothing when no stream is registered under the route's correlation id.
     *
     * @param unrouteFW decoded unroute message
     */
    private void onCacheServerUnrouted(UnrouteFW unrouteFW) {
        final long routeId = unrouteFW.correlationId();
        final KafkaAddressStream stream = (KafkaAddressStream) this.streamsByRouteId.remove(routeId);
        if (stream == null) {
            return;
        }
        stream.doKafkaInitialEnd();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW$Builder] */
    /**
     * Builds a begin frame in the shared write buffer and delivers it to the receiver.
     *
     * @param messageConsumer receiver of the frame
     * @param j               route id
     * @param j2              stream id
     * @param j3              trace id
     * @param j4              authorization
     * @param j5              affinity
     * @param consumer        writer for the frame extension
     */
    public void doBegin(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, long j5, Consumer<OctetsFW.Builder> consumer) {
        final MutableDirectBuffer buffer = this.writeBuffer;
        final BeginFW begin = this.beginRW.wrap2(buffer, 0, buffer.capacity())
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .affinity(j5)
            .extension(consumer)
            .build();
        messageConsumer.accept(begin.typeId(), begin.buffer(), begin.offset(), begin.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.EndFW$Builder] */
    /**
     * Builds an end frame in the shared write buffer and delivers it to the receiver.
     *
     * @param messageConsumer receiver of the frame
     * @param j               route id
     * @param j2              stream id
     * @param j3              trace id
     * @param j4              authorization
     * @param consumer        writer for the frame extension
     */
    public void doEnd(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, Consumer<OctetsFW.Builder> consumer) {
        final MutableDirectBuffer buffer = this.writeBuffer;
        final EndFW end = this.endRW.wrap2(buffer, 0, buffer.capacity())
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .extension(consumer)
            .build();
        messageConsumer.accept(end.typeId(), end.buffer(), end.offset(), end.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW$Builder] */
    /**
     * Builds a reset frame in the shared write buffer and delivers it to the receiver.
     *
     * @param messageConsumer receiver of the frame
     * @param j               route id
     * @param j2              stream id
     * @param j3              trace id
     * @param j4              authorization
     */
    public void doReset(MessageConsumer messageConsumer, long j, long j2, long j3, long j4) {
        final MutableDirectBuffer buffer = this.writeBuffer;
        final ResetFW reset = this.resetRW.wrap2(buffer, 0, buffer.capacity())
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .build();
        messageConsumer.accept(reset.typeId(), reset.buffer(), reset.offset(), reset.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW$Builder] */
    /**
     * Builds a window frame in the shared write buffer and delivers it to the receiver.
     *
     * @param messageConsumer receiver of the frame
     * @param j               route id
     * @param j2              stream id
     * @param j3              trace id
     * @param j4              authorization
     * @param j5              budget id
     * @param i               credit
     * @param i2              padding
     */
    public void doWindow(MessageConsumer messageConsumer, long j, long j2, long j3, long j4, long j5, int i, int i2) {
        final MutableDirectBuffer buffer = this.writeBuffer;
        final WindowFW window = this.windowRW.wrap2(buffer, 0, buffer.capacity())
            .routeId(j)
            .streamId(j2)
            .traceId(j3)
            .authorization(j4)
            .budgetId(j5)
            .credit(i)
            .padding(i2)
            .build();
        messageConsumer.accept(window.typeId(), window.buffer(), window.offset(), window.sizeof());
    }

    static {
        $assertionsDisabled = !KafkaCacheServerAddressFactory.class.desiredAssertionStatus();
        EMPTY_EXTENSION = builder -> {
        };
    }
}
