package org.reaktivity.reaktor.internal.agent;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.broadcast.BroadcastTransmitter;
import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.reaktivity.nukleus.AgentBuilder;
import org.reaktivity.nukleus.Nukleus;
import org.reaktivity.nukleus.function.CommandHandler;
import org.reaktivity.nukleus.function.MessagePredicate;
import org.reaktivity.nukleus.route.RouteKind;
import org.reaktivity.reaktor.ReaktorConfiguration;
import org.reaktivity.reaktor.internal.LabelManager;
import org.reaktivity.reaktor.internal.layouts.ControlLayout;
import org.reaktivity.reaktor.internal.router.Router;
import org.reaktivity.reaktor.internal.types.OctetsFW;
import org.reaktivity.reaktor.internal.types.control.CommandFW;
import org.reaktivity.reaktor.internal.types.control.ErrorFW;
import org.reaktivity.reaktor.internal.types.control.FreezeFW;
import org.reaktivity.reaktor.internal.types.control.FrozenFW;
import org.reaktivity.reaktor.internal.types.control.RouteFW;
import org.reaktivity.reaktor.internal.types.control.RoutedFW;
import org.reaktivity.reaktor.internal.types.control.UnrouteFW;
import org.reaktivity.reaktor.internal.types.control.UnroutedFW;

/* loaded from: input_file:org/reaktivity/reaktor/internal/agent/NukleusAgent.class */
public class NukleusAgent implements Agent {
    private final ReaktorConfiguration config;
    private final Supplier<AgentBuilder> supplyAgentBuilder;
    private final LabelManager labels;
    private final Router router;
    private final ControlLayout control;
    private final RingBuffer commandBuffer;
    private final BroadcastTransmitter responseBuffer;
    private final MutableDirectBuffer sendBuffer;
    private final CommandFW commandRO = new CommandFW();
    private final RouteFW routeRO = new RouteFW();
    private final UnrouteFW unrouteRO = new UnrouteFW();
    private final FreezeFW freezeRO = new FreezeFW();
    private final ErrorFW.Builder errorRW = new ErrorFW.Builder();
    private final RoutedFW.Builder routedRW = new RoutedFW.Builder();
    private final UnroutedFW.Builder unroutedRW = new UnroutedFW.Builder();
    private final FrozenFW.Builder frozenRW = new FrozenFW.Builder();
    private final List<ElektronAgent> elektronAgents = new ArrayList();
    private final Map<String, Nukleus> nukleiByName = new HashMap();
    private final MessageHandler commandHandler = this::handleCommand;

    public NukleusAgent(ReaktorConfiguration reaktorConfiguration, Supplier<AgentBuilder> supplier) {
        this.config = reaktorConfiguration;
        this.supplyAgentBuilder = supplier;
        this.labels = new LabelManager(reaktorConfiguration.directory());
        this.control = new ControlLayout.Builder().controlPath(reaktorConfiguration.directory().resolve("control")).commandBufferCapacity(reaktorConfiguration.commandBufferCapacity()).responseBufferCapacity(reaktorConfiguration.responseBufferCapacity()).readonly(false).build();
        this.commandBuffer = new ManyToOneRingBuffer(this.control.commandBuffer());
        this.responseBuffer = new BroadcastTransmitter(this.control.responseBuffer());
        this.sendBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(this.responseBuffer.maxMsgLength()));
        this.router = new Router(reaktorConfiguration, this.labels, this.commandBuffer.maxMsgLength());
    }

    public String roleName() {
        return "reaktor/control";
    }

    public int doWork() throws Exception {
        return this.commandBuffer.read(this.commandHandler);
    }

    public void onClose() {
        CloseHelper.quietClose(this.control);
    }

    public Nukleus nukleus(String str) {
        return this.nukleiByName.get(str);
    }

    public <T extends Nukleus> T nukleus(Class<T> cls) {
        Stream<Nukleus> stream = this.nukleiByName.values().stream();
        Objects.requireNonNull(cls);
        Stream<Nukleus> filter = stream.filter((v1) -> {
            return r1.isInstance(v1);
        });
        Objects.requireNonNull(cls);
        return (T) filter.map((v1) -> {
            return r1.cast(v1);
        }).findFirst().orElse(null);
    }

    public LabelManager labels() {
        return this.labels;
    }

    public ElektronAgent supplyElektronAgent(int i, int i2, ExecutorService executorService, Function<String, BitSet> function) {
        ReaktorConfiguration reaktorConfiguration = this.config;
        LabelManager labelManager = this.labels;
        Router router = this.router;
        Objects.requireNonNull(router);
        ElektronAgent elektronAgent = new ElektronAgent(i, i2, reaktorConfiguration, labelManager, executorService, function, router::readonlyRoutesBuffer, this.supplyAgentBuilder);
        this.elektronAgents.add(elektronAgent);
        return elektronAgent;
    }

    public void assign(Nukleus nukleus) {
        this.nukleiByName.putIfAbsent(nukleus.name(), nukleus);
    }

    public void unassign(Nukleus nukleus) {
        this.nukleiByName.remove(nukleus.name());
    }

    public boolean isEmpty() {
        return this.nukleiByName.isEmpty();
    }

    Router router() {
        return this.router;
    }

    public void onRouteable(long j, Nukleus nukleus) {
        this.elektronAgents.forEach(elektronAgent -> {
            elektronAgent.onRouteable(j, nukleus);
        });
    }

    public void onRouted(Nukleus nukleus, RouteKind routeKind, long j, OctetsFW octetsFW) {
        this.elektronAgents.forEach(elektronAgent -> {
            elektronAgent.onRouted(nukleus, routeKind, j, octetsFW);
        });
    }

    void onUnrouted(Nukleus nukleus, RouteKind routeKind, long j) {
        this.elektronAgents.forEach(elektronAgent -> {
            elektronAgent.onUnrouted(nukleus, routeKind, j);
        });
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.reaktor.internal.types.control.ErrorFW$Builder] */
    private void doError(long j) {
        ErrorFW build = this.errorRW.wrap2(this.sendBuffer, 0, this.sendBuffer.capacity()).correlationId(j).build();
        this.responseBuffer.transmit(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.reaktor.internal.types.control.RoutedFW$Builder] */
    private void doRouted(long j, long j2) {
        RoutedFW build = this.routedRW.wrap2(this.sendBuffer, 0, this.sendBuffer.capacity()).correlationId(j).routeId(j2).build();
        this.responseBuffer.transmit(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.reaktor.internal.types.control.UnroutedFW$Builder] */
    private void doUnrouted(long j) {
        UnroutedFW build = this.unroutedRW.wrap2(this.sendBuffer, 0, this.sendBuffer.capacity()).correlationId(j).build();
        this.responseBuffer.transmit(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.reaktor.internal.types.control.FrozenFW$Builder] */
    private void doFrozen(long j) {
        FrozenFW build = this.frozenRW.wrap2(this.sendBuffer, 0, this.sendBuffer.capacity()).correlationId(j).build();
        this.responseBuffer.transmit(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    private void handleCommand(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
        switch (i) {
            case 1:
                onRoute(this.routeRO.wrap((DirectBuffer) mutableDirectBuffer, i2, i2 + i3));
                return;
            case 2:
                onUnroute(this.unrouteRO.wrap((DirectBuffer) mutableDirectBuffer, i2, i2 + i3));
                return;
            case 3:
                onFreeze(this.freezeRO.wrap((DirectBuffer) mutableDirectBuffer, i2, i2 + i3));
                return;
            default:
                onUnrecognized(i, mutableDirectBuffer, i2, i3);
                return;
        }
    }

    private void onRoute(RouteFW routeFW) {
        long correlationId = routeFW.correlationId();
        Nukleus nukleus = nukleus(routeFW.nukleus().asString());
        try {
            RouteKind valueOf = RouteKind.valueOf(routeFW.role().get().ordinal());
            MessagePredicate routeHandler = nukleus.routeHandler(valueOf);
            if (routeHandler == null) {
                routeHandler = (i, directBuffer, i2, i3) -> {
                    return true;
                };
            }
            RouteFW generateRouteId = this.router.generateRouteId(routeFW);
            long correlationId2 = generateRouteId.correlationId();
            onRouteable(correlationId2, nukleus);
            if (this.router.doRoute(generateRouteId, routeHandler)) {
                onRouted(nukleus, valueOf, correlationId2, routeFW.extension());
                Thread.sleep(this.config.routedDelayMillis());
                doRouted(correlationId, correlationId2);
            } else {
                doError(correlationId);
            }
        } catch (Exception e) {
            doError(correlationId);
            LangUtil.rethrowUnchecked(e);
        }
    }

    private void onUnroute(UnrouteFW unrouteFW) {
        long correlationId = unrouteFW.correlationId();
        Nukleus nukleus = nukleus(unrouteFW.nukleus().asString());
        try {
            long routeId = unrouteFW.routeId();
            RouteKind replyRouteKind = replyRouteKind(routeId);
            MessagePredicate routeHandler = nukleus.routeHandler(replyRouteKind);
            if (routeHandler == null) {
                routeHandler = (i, directBuffer, i2, i3) -> {
                    return true;
                };
            }
            if (this.router.doUnroute(unrouteFW, routeHandler)) {
                onUnrouted(nukleus, replyRouteKind, routeId);
                doUnrouted(correlationId);
            } else {
                doError(correlationId);
            }
        } catch (Exception e) {
            doError(correlationId);
            LangUtil.rethrowUnchecked(e);
        }
    }

    private void onUnrecognized(int i, DirectBuffer directBuffer, int i2, int i3) {
        CommandFW wrap = this.commandRO.wrap(directBuffer, i2, i2 + i3);
        CommandHandler commandHandler = nukleus(wrap.nukleus().asString()).commandHandler(i);
        if (commandHandler == null) {
            doError(wrap.correlationId());
            return;
        }
        BroadcastTransmitter broadcastTransmitter = this.responseBuffer;
        Objects.requireNonNull(broadcastTransmitter);
        commandHandler.handle(directBuffer, i2, i3, broadcastTransmitter::transmit, this.sendBuffer);
    }

    private void onFreeze(FreezeFW freezeFW) {
        long correlationId = freezeFW.correlationId();
        unassign(nukleus(freezeFW.nukleus().asString()));
        doFrozen(correlationId);
    }

    private RouteKind replyRouteKind(long j) {
        return RouteKind.valueOf(((int) (j >> 28)) & 15);
    }
}
