package org.reaktivity.nukleus.kafka.internal;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;
import org.agrona.DirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.Configuration;
import org.reaktivity.nukleus.Nukleus;
import org.reaktivity.nukleus.NukleusBuilder;
import org.reaktivity.nukleus.NukleusFactorySpi;
import org.reaktivity.nukleus.kafka.internal.memory.CountingMemoryManager;
import org.reaktivity.nukleus.kafka.internal.memory.DefaultMemoryManager;
import org.reaktivity.nukleus.kafka.internal.memory.MemoryLayout;
import org.reaktivity.nukleus.kafka.internal.memory.MemoryManager;
import org.reaktivity.nukleus.kafka.internal.stream.ClientStreamFactoryBuilder;
import org.reaktivity.nukleus.kafka.internal.stream.KafkaError;
import org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool;
import org.reaktivity.nukleus.kafka.internal.types.KafkaHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.ListFW;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.control.KafkaRouteExFW;
import org.reaktivity.nukleus.kafka.internal.types.control.RouteFW;
import org.reaktivity.nukleus.route.RouteKind;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/KafkaNukleusFactorySpi.class */
public final class KafkaNukleusFactorySpi implements NukleusFactorySpi, Nukleus {
    public static final String MESSAGE_CACHE_BUFFER_ACQUIRES = "message.cache.buffer.acquires";
    public static final String HISTORICAL_FETCHES = "historical.fetches";
    private static final String MESSAGE_CACHE_BUFFER_RELEASES = "message.cache.buffer.releases";
    private static final MemoryManager OUT_OF_SPACE_MEMORY_MANAGER = new MemoryManager() { // from class: org.reaktivity.nukleus.kafka.internal.KafkaNukleusFactorySpi.1
        @Override // org.reaktivity.nukleus.kafka.internal.memory.MemoryManager
        public long acquire(int i) {
            return -1L;
        }

        @Override // org.reaktivity.nukleus.kafka.internal.memory.MemoryManager
        public long resolve(long j) {
            throw new UnsupportedOperationException();
        }

        @Override // org.reaktivity.nukleus.kafka.internal.memory.MemoryManager
        public void release(long j, int i) {
            throw new UnsupportedOperationException();
        }
    };
    private MemoryLayout memoryLayout;
    private BiFunction<String, Long, NetworkConnectionPool> connectionPoolFactory;
    private KafkaConfiguration kafkaConfig;
    private MemoryManager memoryManager = null;
    private final Map<String, Long2ObjectHashMap<NetworkConnectionPool>> connectionPools = new LinkedHashMap();
    private final RouteFW routeRO = new RouteFW();
    private final KafkaRouteExFW routeExRO = new KafkaRouteExFW();
    private List<RouteFW> routesToProcess = new ArrayList();

    public String name() {
        return "kafka";
    }

    public Nukleus create(Configuration configuration, NukleusBuilder nukleusBuilder) {
        this.kafkaConfig = new KafkaConfiguration(configuration);
        return nukleusBuilder.streamFactory(RouteKind.CLIENT, new ClientStreamFactoryBuilder(this.kafkaConfig, this::supplyMemoryManager, this.connectionPools, this::setConnectionPoolFactory)).routeHandler(RouteKind.CLIENT, this::handleRoute).inject(this).build();
    }

    private MemoryManager supplyMemoryManager(Function<String, LongSupplier> function) {
        if (this.memoryManager == null) {
            this.memoryManager = createMemoryManager(function);
        }
        return this.memoryManager;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v15, types: [org.reaktivity.nukleus.kafka.internal.memory.MemoryManager] */
    private MemoryManager createMemoryManager(Function<String, LongSupplier> function) {
        CountingMemoryManager countingMemoryManager;
        long messageCacheCapacity = this.kafkaConfig.messageCacheCapacity();
        if (messageCacheCapacity == 0) {
            countingMemoryManager = OUT_OF_SPACE_MEMORY_MANAGER;
        } else {
            MemoryLayout build = new MemoryLayout.Builder().path(this.kafkaConfig.directory().resolve("kafka").resolve("memory0")).minimumBlockSize(this.kafkaConfig.messageCacheBlockCapacity()).capacity(messageCacheCapacity).create(true).build();
            this.memoryLayout = build;
            countingMemoryManager = new CountingMemoryManager(new DefaultMemoryManager(build), function.apply(MESSAGE_CACHE_BUFFER_ACQUIRES), function.apply(MESSAGE_CACHE_BUFFER_RELEASES));
        }
        return countingMemoryManager;
    }

    public boolean handleRoute(int i, DirectBuffer directBuffer, int i2, int i3) {
        switch (i) {
            case 1:
                if (this.routeRO.wrap(directBuffer, i2, i2 + i3).extension().sizeof() > 0) {
                    DirectBuffer unsafeBuffer = new UnsafeBuffer(new byte[i3]);
                    directBuffer.getBytes(i2, unsafeBuffer, 0, i3);
                    this.routesToProcess.add(new RouteFW().wrap(unsafeBuffer, 0, i3));
                    break;
                }
                break;
        }
        return true;
    }

    public void close() throws Exception {
        if (this.memoryLayout != null) {
            this.memoryLayout.close();
        }
    }

    public int process() {
        if (this.routesToProcess.isEmpty() || this.connectionPoolFactory == null) {
            return 0;
        }
        processRoutes(this.routesToProcess);
        this.routesToProcess.clear();
        return 0;
    }

    public void processRoutes(List<RouteFW> list) {
        for (RouteFW routeFW : list) {
            OctetsFW extension = routeFW.extension();
            if (extension.sizeof() > 0) {
                KafkaRouteExFW kafkaRouteExFW = this.routeExRO;
                Objects.requireNonNull(kafkaRouteExFW);
                KafkaRouteExFW kafkaRouteExFW2 = (KafkaRouteExFW) extension.get(kafkaRouteExFW::wrap);
                String asString = kafkaRouteExFW2.topicName().asString();
                String asString2 = routeFW.target().asString();
                long targetRef = routeFW.targetRef();
                NetworkConnectionPool networkConnectionPool = (NetworkConnectionPool) this.connectionPools.computeIfAbsent(asString2, str -> {
                    return new Long2ObjectHashMap();
                }).computeIfAbsent(targetRef, j -> {
                    return this.connectionPoolFactory.apply(asString2, Long.valueOf(targetRef));
                });
                ListFW<KafkaHeaderFW> headers = kafkaRouteExFW2.headers();
                ListFW<KafkaHeaderFW> listFW = new ListFW<>(new KafkaHeaderFW());
                listFW.wrap(headers.buffer(), headers.offset(), headers.limit());
                networkConnectionPool.addRoute(asString, listFW, this.kafkaConfig.topicBootstrapEnabled(), this::onKafkaError);
            }
        }
    }

    private void onKafkaError(KafkaError kafkaError, String str) {
        switch (kafkaError) {
            case UNKNOWN_TOPIC_OR_PARTITION:
                System.out.println(String.format("WARNING: bootstrap failed for topic \"%s\" with error \"unknown topic\"", str));
                return;
            default:
                System.out.println(String.format("WARNING: bootstrap failed while getting metadata for topic \"%s\" with error code %d", str, kafkaError));
                return;
        }
    }

    private void setConnectionPoolFactory(BiFunction<String, Long, NetworkConnectionPool> biFunction) {
        this.connectionPoolFactory = biFunction;
    }
}
