package org.reaktivity.nukleus.kafka.internal;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.LongFunction;
import org.agrona.DirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.kafka.internal.memory.CountingMemoryManager;
import org.reaktivity.nukleus.kafka.internal.memory.DefaultMemoryManager;
import org.reaktivity.nukleus.kafka.internal.memory.MemoryLayout;
import org.reaktivity.nukleus.kafka.internal.memory.MemoryManager;
import org.reaktivity.nukleus.kafka.internal.stream.KafkaError;
import org.reaktivity.nukleus.kafka.internal.stream.NetworkConnectionPool;
import org.reaktivity.nukleus.kafka.internal.types.KafkaHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.ListFW;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.control.KafkaRouteExFW;
import org.reaktivity.nukleus.kafka.internal.types.control.RouteFW;
import org.reaktivity.nukleus.kafka.internal.util.DelayedTaskScheduler;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/KafkaAgent.class */
final class KafkaAgent implements Agent {
    private final KafkaConfiguration config;
    private LongFunction<NetworkConnectionPool> connectionPoolFactory;
    private MemoryManager memoryManager;
    private MemoryLayout memoryLayout;
    private final RouteFW routeRO = new RouteFW();
    private final KafkaRouteExFW routeExRO = new KafkaRouteExFW();
    final Long2ObjectHashMap<NetworkConnectionPool> connectionPools = new Long2ObjectHashMap<>();
    final DelayedTaskScheduler scheduler = new DelayedTaskScheduler();
    private final List<RouteFW> routesToProcess = new CopyOnWriteArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaAgent(KafkaConfiguration kafkaConfiguration) {
        this.config = kafkaConfiguration;
    }

    public int doWork() throws Exception {
        if (!this.routesToProcess.isEmpty() && this.connectionPoolFactory != null) {
            processRoutes(this.routesToProcess);
            this.routesToProcess.clear();
        }
        return this.scheduler.process();
    }

    public void onClose() {
        if (this.memoryLayout != null) {
            this.memoryLayout.close();
        }
    }

    public String roleName() {
        return String.format("%s.agent", "kafka");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean handleRoute(int i, DirectBuffer directBuffer, int i2, int i3) {
        switch (i) {
            case 1:
                if (this.routeRO.wrap(directBuffer, i2, i2 + i3).extension().sizeof() > 0) {
                    DirectBuffer unsafeBuffer = new UnsafeBuffer(new byte[i3]);
                    directBuffer.getBytes(i2, unsafeBuffer, 0, i3);
                    this.routesToProcess.add(new RouteFW().wrap(unsafeBuffer, 0, i3));
                    break;
                }
                break;
        }
        return true;
    }

    private void processRoutes(List<RouteFW> list) {
        for (RouteFW routeFW : list) {
            OctetsFW extension = routeFW.extension();
            if (extension.sizeof() > 0) {
                KafkaRouteExFW kafkaRouteExFW = this.routeExRO;
                Objects.requireNonNull(kafkaRouteExFW);
                KafkaRouteExFW kafkaRouteExFW2 = (KafkaRouteExFW) extension.get(kafkaRouteExFW::wrap);
                String asString = kafkaRouteExFW2.topicName().asString();
                long correlationId = routeFW.correlationId();
                NetworkConnectionPool networkConnectionPool = (NetworkConnectionPool) this.connectionPools.computeIfAbsent(((int) (correlationId >> 32)) & 65535, j -> {
                    return this.connectionPoolFactory.apply(correlationId);
                });
                ListFW<KafkaHeaderFW> headers = kafkaRouteExFW2.headers();
                ListFW<KafkaHeaderFW> listFW = new ListFW<>(new KafkaHeaderFW());
                listFW.wrap(headers.buffer(), headers.offset(), headers.limit());
                networkConnectionPool.addRoute(asString, listFW, this.config.topicBootstrapEnabled(), this::onKafkaError);
            }
        }
    }

    private void onKafkaError(KafkaError kafkaError, String str) {
        switch (kafkaError) {
            case UNKNOWN_TOPIC_OR_PARTITION:
                System.out.println(String.format("WARNING: bootstrap failed for topic \"%s\" with error \"unknown topic\"", str));
                return;
            default:
                System.out.println(String.format("WARNING: bootstrap failed while getting metadata for topic \"%s\" with error code %d", str, kafkaError));
                return;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setConnectionPoolFactory(LongFunction<NetworkConnectionPool> longFunction) {
        this.connectionPoolFactory = longFunction;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MemoryManager supplyMemoryManager(KafkaCounters kafkaCounters) {
        if (this.memoryManager == null) {
            this.memoryManager = createMemoryManager(kafkaCounters);
        }
        return this.memoryManager;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v15, types: [org.reaktivity.nukleus.kafka.internal.memory.MemoryManager] */
    private MemoryManager createMemoryManager(KafkaCounters kafkaCounters) {
        CountingMemoryManager countingMemoryManager;
        long messageCacheCapacity = this.config.messageCacheCapacity();
        if (messageCacheCapacity == 0) {
            countingMemoryManager = KafkaNukleusFactorySpi.OUT_OF_SPACE_MEMORY_MANAGER;
        } else {
            MemoryLayout build = new MemoryLayout.Builder().path(this.config.directory().resolve("kafka").resolve("memory0")).minimumBlockSize(this.config.messageCacheBlockCapacity()).capacity(messageCacheCapacity).create(true).build();
            this.memoryLayout = build;
            countingMemoryManager = new CountingMemoryManager(new DefaultMemoryManager(build, kafkaCounters), kafkaCounters.cacheBufferAcquires, kafkaCounters.cacheBufferReleases);
        }
        return countingMemoryManager;
    }
}
