package org.reaktivity.nukleus.kafka.internal;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.ToIntFunction;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.Controller;
import org.reaktivity.nukleus.ControllerSpi;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.function.MessagePredicate;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.control.FreezeFW;
import org.reaktivity.nukleus.kafka.internal.types.control.KafkaRouteExFW;
import org.reaktivity.nukleus.kafka.internal.types.control.Role;
import org.reaktivity.nukleus.kafka.internal.types.control.RouteFW;
import org.reaktivity.nukleus.kafka.internal.types.control.UnrouteFW;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/KafkaController.class */
/**
 * Controller for the {@code "kafka"} nukleus.
 *
 * <p>Each command (route, unroute, freeze) is encoded into a single reusable scratch buffer with
 * the matching flyweight builder and then handed to the {@link ControllerSpi} supplied at
 * construction time.
 *
 * <p>NOTE(review): every command reuses the same scratch buffer and the same builder instances,
 * so overlapping calls on one instance would presumably overwrite each other's frames — confirm
 * the intended threading model with the caller.
 */
public final class KafkaController implements Controller {
    /** Capacity, in bytes, of the scratch buffer that each control frame is encoded into. */
    private static final int MAX_SEND_LENGTH = 1024;

    private final ControllerSpi controllerSpi;
    private final RouteFW.Builder routeRW = new RouteFW.Builder();
    private final UnrouteFW.Builder unrouteRW = new UnrouteFW.Builder();
    private final FreezeFW.Builder freezeRW = new FreezeFW.Builder();
    private final KafkaRouteExFW.Builder routeExRW = new KafkaRouteExFW.Builder();

    /** Direct, native-ordered scratch buffer shared by all commands issued through this controller. */
    private final AtomicBuffer atomicBuffer =
            new UnsafeBuffer(ByteBuffer.allocateDirect(MAX_SEND_LENGTH).order(ByteOrder.nativeOrder()));

    /**
     * Creates a controller that delegates all work to the given SPI.
     *
     * @param controllerSpi the SPI used to send commands and obtain correlation ids
     */
    public KafkaController(ControllerSpi controllerSpi) {
        this.controllerSpi = controllerSpi;
    }

    /**
     * Delegates to {@link ControllerSpi#doProcess()}.
     *
     * @return the value returned by the SPI
     */
    public int process() {
        return this.controllerSpi.doProcess();
    }

    /**
     * Delegates to {@link ControllerSpi#doClose()}.
     *
     * @throws Exception if the SPI fails to close
     */
    public void close() throws Exception {
        this.controllerSpi.doClose();
    }

    /**
     * @return this controller's class, {@code KafkaController.class}
     */
    public Class<KafkaController> kind() {
        return KafkaController.class;
    }

    /**
     * @return the nukleus name, {@code "kafka"}
     */
    public String name() {
        return "kafka";
    }

    /**
     * Delegates to {@link ControllerSpi#doSupplySource}.
     *
     * @param source  the source name passed through to the SPI
     * @param factory the factory passed through to the SPI
     * @param <T>     the type produced by the factory
     * @return the value returned by the SPI
     */
    public <T> T supplySource(String source, BiFunction<MessagePredicate, ToIntFunction<MessageConsumer>, T> factory) {
        return (T) this.controllerSpi.doSupplySource(source, factory);
    }

    /**
     * Delegates to {@link ControllerSpi#doSupplyTarget}.
     *
     * @param target  the target name passed through to the SPI
     * @param factory the factory passed through to the SPI
     * @param <T>     the type produced by the factory
     * @return the value returned by the SPI
     */
    public <T> T supplyTarget(String target, BiFunction<ToIntFunction<MessageConsumer>, MessagePredicate, T> factory) {
        return (T) this.controllerSpi.doSupplyTarget(target, factory);
    }

    /**
     * Sends a route command with role {@link Role#SERVER} and no headers.
     *
     * @param source    the route source name
     * @param sourceRef the route source reference
     * @param target    the route target name
     * @param targetRef the route target reference
     * @param topicName the topic name for the route extension, or {@code null} for no extension
     * @return the future returned by {@link ControllerSpi#doRoute}
     */
    public CompletableFuture<Long> routeServer(String source, long sourceRef, String target, long targetRef,
            String topicName) {
        return route(Role.SERVER, source, sourceRef, target, targetRef, topicName, Collections.emptyMap());
    }

    /**
     * Sends a route command with role {@link Role#CLIENT} and no headers.
     *
     * @param source    the route source name
     * @param sourceRef the route source reference
     * @param target    the route target name
     * @param targetRef the route target reference
     * @param topicName the topic name for the route extension, or {@code null} for no extension
     * @return the future returned by {@link ControllerSpi#doRoute}
     */
    public CompletableFuture<Long> routeClient(String source, long sourceRef, String target, long targetRef,
            String topicName) {
        return route(Role.CLIENT, source, sourceRef, target, targetRef, topicName, Collections.emptyMap());
    }

    /**
     * Sends a route command with role {@link Role#CLIENT} and the given headers.
     *
     * @param source    the route source name
     * @param sourceRef the route source reference
     * @param target    the route target name
     * @param targetRef the route target reference
     * @param topicName the topic name for the route extension, or {@code null} for no extension
     * @param headers   header name/value pairs for the route extension; read only when
     *                  {@code topicName} is non-null, in which case it must not be {@code null}
     * @return the future returned by {@link ControllerSpi#doRoute}
     */
    public CompletableFuture<Long> routeClient(String source, long sourceRef, String target, long targetRef,
            String topicName, Map<String, String> headers) {
        return route(Role.CLIENT, source, sourceRef, target, targetRef, topicName, headers);
    }

    /**
     * Sends an unroute command with role {@link Role#SERVER} and no headers.
     *
     * @param source    the route source name
     * @param sourceRef the route source reference
     * @param target    the route target name
     * @param targetRef the route target reference
     * @param topicName the topic name for the route extension, or {@code null} for no extension
     * @return the future returned by {@link ControllerSpi#doUnroute}
     */
    public CompletableFuture<Void> unrouteServer(String source, long sourceRef, String target, long targetRef,
            String topicName) {
        return unroute(Role.SERVER, source, sourceRef, target, targetRef, topicName, Collections.emptyMap());
    }

    /**
     * Sends an unroute command with role {@link Role#CLIENT} and the given headers.
     *
     * @param source    the route source name
     * @param sourceRef the route source reference
     * @param target    the route target name
     * @param targetRef the route target reference
     * @param topicName the topic name for the route extension, or {@code null} for no extension
     * @param headers   header name/value pairs for the route extension; read only when
     *                  {@code topicName} is non-null, in which case it must not be {@code null}
     * @return the future returned by {@link ControllerSpi#doUnroute}
     */
    public CompletableFuture<Void> unrouteClient(String source, long sourceRef, String target, long targetRef,
            String topicName, Map<String, String> headers) {
        return unroute(Role.CLIENT, source, sourceRef, target, targetRef, topicName, headers);
    }

    /**
     * Sends an unroute command with role {@link Role#CLIENT} and no headers.
     *
     * @param source    the route source name
     * @param sourceRef the route source reference
     * @param target    the route target name
     * @param targetRef the route target reference
     * @param topicName the topic name for the route extension, or {@code null} for no extension
     * @return the future returned by {@link ControllerSpi#doUnroute}
     */
    public CompletableFuture<Void> unrouteClient(String source, long sourceRef, String target, long targetRef,
            String topicName) {
        return unroute(Role.CLIENT, source, sourceRef, target, targetRef, topicName, Collections.emptyMap());
    }

    /**
     * Encodes a freeze command carrying a fresh correlation id and sends it through the SPI.
     *
     * @return the future returned by {@link ControllerSpi#doFreeze}
     */
    public CompletableFuture<Void> freeze() {
        FreezeFW freeze = this.freezeRW.wrap2((MutableDirectBuffer) this.atomicBuffer, 0, this.atomicBuffer.capacity())
                .correlationId(this.controllerSpi.nextCorrelationId())
                .build();
        return this.controllerSpi.doFreeze(freeze.typeId(), freeze.buffer(), freeze.offset(), freeze.sizeof());
    }

    /**
     * Delegates to {@link ControllerSpi#doCount}.
     *
     * @param name the name passed through to the SPI
     * @return the value returned by the SPI
     */
    public long count(String name) {
        return this.controllerSpi.doCount(name);
    }

    /**
     * Builds the writer for the route/unroute extension field.
     *
     * <p>When {@code topicName} is {@code null} the returned consumer writes nothing. Otherwise it
     * encodes a {@link KafkaRouteExFW} holding the topic name and one item per header, with each
     * header value written as its UTF-8 bytes.
     *
     * @param topicName the topic name, or {@code null} for an empty extension
     * @param headers   header name/value pairs; only dereferenced when {@code topicName} is non-null
     * @return a consumer that populates the extension octets
     */
    private Consumer<OctetsFW.Builder> extension(String topicName, Map<String, String> headers) {
        if (topicName == null) {
            return octets -> {
            };
        }
        // Each nested lambda needs its own parameter name: Java does not allow a lambda
        // parameter to redeclare a parameter of an enclosing lambda.
        return octets -> octets.set((buffer, offset, limit) ->
                this.routeExRW.wrap2(buffer, offset, limit)
                        .topicName(topicName)
                        .headers(headerList ->
                                headers.forEach((key, value) ->
                                        headerList.item(header ->
                                                header.key(key).value(valueOctets ->
                                                        valueOctets.put(value.getBytes(StandardCharsets.UTF_8))))))
                        .build()
                        .sizeof());
    }

    /**
     * Encodes a route command into the scratch buffer and sends it through the SPI.
     *
     * @param role      the role to set on the command
     * @param source    the route source name
     * @param sourceRef the route source reference
     * @param target    the route target name
     * @param targetRef the route target reference
     * @param topicName the topic name for the extension, or {@code null} for no extension
     * @param headers   header name/value pairs for the extension
     * @return the future returned by {@link ControllerSpi#doRoute}
     */
    private CompletableFuture<Long> route(Role role, String source, long sourceRef, String target, long targetRef,
            String topicName, Map<String, String> headers) {
        RouteFW route = this.routeRW.wrap2((MutableDirectBuffer) this.atomicBuffer, 0, this.atomicBuffer.capacity())
                .correlationId(this.controllerSpi.nextCorrelationId())
                .role(roleBuilder -> roleBuilder.set(role))
                .source(source)
                .sourceRef(sourceRef)
                .target(target)
                .targetRef(targetRef)
                .extension(extension(topicName, headers))
                .build();
        return this.controllerSpi.doRoute(route.typeId(), route.buffer(), route.offset(), route.sizeof());
    }

    /**
     * Encodes an unroute command into the scratch buffer and sends it through the SPI.
     *
     * @param role      the role to set on the command
     * @param source    the route source name
     * @param sourceRef the route source reference
     * @param target    the route target name
     * @param targetRef the route target reference
     * @param topicName the topic name for the extension, or {@code null} for no extension
     * @param headers   header name/value pairs for the extension
     * @return the future returned by {@link ControllerSpi#doUnroute}
     */
    private CompletableFuture<Void> unroute(Role role, String source, long sourceRef, String target, long targetRef,
            String topicName, Map<String, String> headers) {
        UnrouteFW unroute = this.unrouteRW.wrap2((MutableDirectBuffer) this.atomicBuffer, 0, this.atomicBuffer.capacity())
                .correlationId(this.controllerSpi.nextCorrelationId())
                .role(roleBuilder -> roleBuilder.set(role))
                .source(source)
                .sourceRef(sourceRef)
                .target(target)
                .targetRef(targetRef)
                .extension(extension(topicName, headers))
                .build();
        return this.controllerSpi.doUnroute(unroute.typeId(), unroute.buffer(), unroute.offset(), unroute.sizeof());
    }
}
