package org.reaktivity.k3po.nukleus.ext.internal.behavior;

import java.nio.file.Path;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.LongFunction;
import org.agrona.CloseHelper;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.ArrayUtil;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.MessageHandler;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.layout.StreamsLayout;
import org.reaktivity.nukleus.Configuration;

/* loaded from: input_file:org/reaktivity/k3po/nukleus/ext/internal/behavior/NukleusSource.class */
public final class NukleusSource implements AutoCloseable {
    private final Configuration config;
    private final Path streamsDirectory;
    private final String sourceName;
    private final MutableDirectBuffer writeBuffer;
    private final NukleusStreamFactory streamFactory;
    private final LongFunction<NukleusCorrelation> correlateEstablished;
    private final BiFunction<String, String, NukleusTarget> supplyTarget;
    private final Long2ObjectHashMap<NukleusServerChannel> routesByRef = new Long2ObjectHashMap<>();
    private final Long2ObjectHashMap<MessageHandler> streamsById = new Long2ObjectHashMap<>();
    private final Map<String, NukleusPartition> partitionsByName = new LinkedHashMap();
    private NukleusPartition[] partitions = new NukleusPartition[0];

    public NukleusSource(Configuration configuration, Path path, String str, MutableDirectBuffer mutableDirectBuffer, LongFunction<NukleusCorrelation> longFunction, BiFunction<String, String, NukleusTarget> biFunction) {
        this.config = configuration;
        this.streamsDirectory = path;
        this.sourceName = str;
        this.writeBuffer = mutableDirectBuffer;
        Long2ObjectHashMap<MessageHandler> long2ObjectHashMap = this.streamsById;
        long2ObjectHashMap.getClass();
        this.streamFactory = new NukleusStreamFactory(long2ObjectHashMap::remove);
        this.correlateEstablished = longFunction;
        this.supplyTarget = biFunction;
    }

    public String toString() {
        return String.format("%s [%s/%s[#...]]", getClass().getSimpleName(), this.streamsDirectory, this.sourceName);
    }

    public void doRoute(long j, NukleusServerChannel nukleusServerChannel) {
        this.routesByRef.putIfAbsent(Long.valueOf(j), nukleusServerChannel);
    }

    public void doUnroute(long j, NukleusServerChannel nukleusServerChannel) {
        this.routesByRef.remove(Long.valueOf(j), nukleusServerChannel);
    }

    public void doAbortInput(NukleusChannel nukleusChannel, ChannelFuture channelFuture) {
        NukleusPartition findPartition = findPartition(nukleusChannel);
        if (findPartition == null) {
            channelFuture.setFailure(new ChannelException("Partition not found for " + nukleusChannel));
        } else {
            findPartition.doReset(nukleusChannel.sourceId());
            channelFuture.setSuccess();
        }
    }

    public void onReadable(String str) {
        this.partitionsByName.computeIfAbsent(str, this::newPartition);
    }

    public int process() {
        int i = 0;
        for (int i2 = 0; i2 < this.partitions.length; i2++) {
            i += this.partitions[i2].process();
        }
        return i;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        Iterator<NukleusPartition> it = this.partitionsByName.values().iterator();
        while (it.hasNext()) {
            CloseHelper.quietClose(it.next());
        }
    }

    private NukleusPartition findPartition(NukleusChannel nukleusChannel) {
        NukleusChannelAddress m4getLocalAddress = nukleusChannel.m4getLocalAddress();
        String senderName = m4getLocalAddress.getSenderName();
        NukleusPartition nukleusPartition = this.partitionsByName.get(m4getLocalAddress.getSenderPartition());
        if (nukleusPartition == null) {
            nukleusPartition = (NukleusPartition) this.partitionsByName.entrySet().stream().filter(entry -> {
                return ((String) entry.getKey()).startsWith(senderName + "#");
            }).findFirst().map(entry2 -> {
                return (NukleusPartition) entry2.getValue();
            }).orElse(null);
        }
        return nukleusPartition;
    }

    private NukleusPartition newPartition(String str) {
        Path resolve = this.streamsDirectory.resolve(str);
        StreamsLayout build = new StreamsLayout.Builder().path(resolve).streamsCapacity(this.config.streamsBufferCapacity()).throttleCapacity(this.config.throttleBufferCapacity()).readonly(true).build();
        Long2ObjectHashMap<NukleusServerChannel> long2ObjectHashMap = this.routesByRef;
        long2ObjectHashMap.getClass();
        LongFunction longFunction = long2ObjectHashMap::get;
        Long2ObjectHashMap<MessageHandler> long2ObjectHashMap2 = this.streamsById;
        long2ObjectHashMap2.getClass();
        LongFunction longFunction2 = long2ObjectHashMap2::get;
        Long2ObjectHashMap<MessageHandler> long2ObjectHashMap3 = this.streamsById;
        long2ObjectHashMap3.getClass();
        NukleusPartition nukleusPartition = new NukleusPartition(resolve, build, longFunction, longFunction2, (v1, v2) -> {
            r6.put(v1, v2);
        }, this.writeBuffer, this.streamFactory, this.correlateEstablished, this.supplyTarget);
        this.partitions = (NukleusPartition[]) ArrayUtil.add(this.partitions, nukleusPartition);
        return nukleusPartition;
    }
}
