package org.reaktivity.nukleus.kafka.internal;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.Configuration;
import org.reaktivity.nukleus.Nukleus;
import org.reaktivity.nukleus.NukleusBuilder;
import org.reaktivity.nukleus.NukleusFactorySpi;
import org.reaktivity.nukleus.function.MessagePredicate;
import org.reaktivity.nukleus.kafka.internal.stream.ClientStreamFactoryBuilder;
import org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.control.KafkaRouteExFW;
import org.reaktivity.nukleus.kafka.internal.types.control.RouteFW;
import org.reaktivity.nukleus.route.RouteKind;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/KafkaNukleusFactorySpi.class */
public final class KafkaNukleusFactorySpi implements NukleusFactorySpi, Nukleus {
    private static final MessagePredicate DEFAULT_ROUTE_HANDLER = (i, directBuffer, i2, i3) -> {
        return true;
    };
    private static final Consumer<BiFunction<String, Long, NetworkConnectionPool>> DEFAULT_CONNECT_POOL_FACTORY_CONSUMER = biFunction -> {
    };
    private final Map<String, Long2ObjectHashMap<NetworkConnectionPool>> connectionPools = new LinkedHashMap();
    private final RouteFW routeRO = new RouteFW();
    private final KafkaRouteExFW routeExRO = new KafkaRouteExFW();
    private List<RouteFW> routesToBootstrap = new ArrayList();
    private BiFunction<String, Long, NetworkConnectionPool> createNetworkConnectionPool;

    public String name() {
        return "kafka";
    }

    public Nukleus create(Configuration configuration, NukleusBuilder nukleusBuilder) {
        KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(configuration);
        Consumer<BiFunction<String, Long, NetworkConnectionPool>> consumer = DEFAULT_CONNECT_POOL_FACTORY_CONSUMER;
        MessagePredicate messagePredicate = DEFAULT_ROUTE_HANDLER;
        if (kafkaConfiguration.topicBootstrapEnabled()) {
            messagePredicate = this::handleRouteForBootstrap;
            consumer = biFunction -> {
                this.createNetworkConnectionPool = biFunction;
            };
        }
        return nukleusBuilder.streamFactory(RouteKind.CLIENT, new ClientStreamFactoryBuilder(kafkaConfiguration, this.connectionPools, consumer)).routeHandler(RouteKind.CLIENT, messagePredicate).inject(this).build();
    }

    public boolean handleRouteForBootstrap(int i, DirectBuffer directBuffer, int i2, int i3) {
        switch (i) {
            case 1:
                if (this.routeRO.wrap(directBuffer, i2, i2 + i3).extension().sizeof() > 0) {
                    DirectBuffer unsafeBuffer = new UnsafeBuffer(new byte[i3]);
                    directBuffer.getBytes(i2, unsafeBuffer, 0, i3);
                    this.routesToBootstrap.add(new RouteFW().wrap(unsafeBuffer, 0, i3));
                    break;
                }
                break;
        }
        return true;
    }

    public int process() {
        if (this.routesToBootstrap.isEmpty() || this.createNetworkConnectionPool == null) {
            return 0;
        }
        startTopicBootstrap(this.routesToBootstrap);
        this.routesToBootstrap.clear();
        return 0;
    }

    public void startTopicBootstrap(List<RouteFW> list) {
        for (RouteFW routeFW : list) {
            OctetsFW extension = routeFW.extension();
            if (extension.sizeof() > 0) {
                KafkaRouteExFW kafkaRouteExFW = this.routeExRO;
                Objects.requireNonNull(kafkaRouteExFW);
                String asString = ((KafkaRouteExFW) extension.get(kafkaRouteExFW::wrap)).topicName().asString();
                String asString2 = routeFW.target().asString();
                long targetRef = routeFW.targetRef();
                ((NetworkConnectionPool) this.connectionPools.computeIfAbsent(asString2, str -> {
                    return new Long2ObjectHashMap();
                }).computeIfAbsent(targetRef, j -> {
                    return this.createNetworkConnectionPool.apply(asString2, Long.valueOf(targetRef));
                })).doBootstrap(asString, i -> {
                    switch (i) {
                        case 3:
                            System.out.println(String.format("WARNING: bootstrap failed for topic \"%s\" with error \"unknown topic\"", asString));
                            return;
                        default:
                            throw new IllegalStateException(String.format("Received error code %d from Kafka while attempting to bootstrap topic \"%s\"", Integer.valueOf(i), asString));
                    }
                });
            }
        }
    }
}
