package org.reaktivity.k3po.nukleus.ext.internal.behavior;

import java.nio.file.Path;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import org.agrona.CloseHelper;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.ArrayUtil;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.MessageHandler;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.Channels;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.layout.StreamsLayout;
import org.reaktivity.k3po.nukleus.ext.internal.util.function.LongLongFunction;
import org.reaktivity.nukleus.Configuration;

/* loaded from: input_file:org/reaktivity/k3po/nukleus/ext/internal/behavior/NukleusSource.class */
/**
 * Read side of a named source: owns the partitions created under the streams
 * directory, the routes registered by reference and authorization, and the
 * per-stream message handlers shared by those partitions.
 *
 * <p>The maps and the partition array are plain, unsynchronized collections;
 * nothing in this class guards them against concurrent access.
 */
public final class NukleusSource implements AutoCloseable {
    private final Configuration config;
    private final Path streamsDirectory;
    private final String sourceName;
    private final MutableDirectBuffer writeBuffer;
    private final NukleusStreamFactory streamFactory;
    private final LongFunction<NukleusCorrelation> correlateEstablished;
    private final BiFunction<String, String, NukleusTarget> supplyTarget;
    private final LongSupplier supplyTimestamp;
    private final LongSupplier supplyTrace;

    /** Registered server channels, keyed first by route reference and then by authorization. */
    private final Long2ObjectHashMap<Long2ObjectHashMap<NukleusServerChannel>> routesByRefAndAuth =
            new Long2ObjectHashMap<>();

    /** Message handlers keyed by stream id; shared with the stream factory and every partition. */
    private final Long2ObjectHashMap<MessageHandler> streamsById = new Long2ObjectHashMap<>();

    /** Partitions keyed by name, in creation order. */
    private final Map<String, NukleusPartition> partitionsByName = new LinkedHashMap<>();

    /** Array snapshot of the partitions, iterated by {@link #process()}. */
    private NukleusPartition[] partitions = new NukleusPartition[0];

    /**
     * Creates a source rooted at the given streams directory.
     *
     * @param config               supplies the streams and throttle buffer capacities for new partitions
     * @param streamsDirectory     directory against which partition names are resolved
     * @param sourceName           name of this source, used in {@link #toString()}
     * @param writeBuffer          buffer handed to every partition created by this source
     * @param correlateEstablished correlation lookup handed to every partition
     * @param supplyTarget         target supplier handed to every partition
     * @param supplyTimestamp      timestamp supplier handed to every partition
     * @param supplyTrace          trace supplier handed to every partition
     */
    public NukleusSource(
            Configuration config,
            Path streamsDirectory,
            String sourceName,
            MutableDirectBuffer writeBuffer,
            LongFunction<NukleusCorrelation> correlateEstablished,
            BiFunction<String, String, NukleusTarget> supplyTarget,
            LongSupplier supplyTimestamp,
            LongSupplier supplyTrace) {
        this.config = config;
        this.streamsDirectory = streamsDirectory;
        this.sourceName = sourceName;
        this.writeBuffer = writeBuffer;
        // The factory receives a callback that removes a stream's handler from streamsById.
        this.streamFactory = new NukleusStreamFactory(streamsById::remove);
        this.correlateEstablished = correlateEstablished;
        this.supplyTarget = supplyTarget;
        this.supplyTimestamp = supplyTimestamp;
        this.supplyTrace = supplyTrace;
    }

    @Override
    public String toString() {
        return String.format("%s [%s/%s[#...]]", getClass().getSimpleName(), this.streamsDirectory, this.sourceName);
    }

    /**
     * Registers a server channel under the given route reference and authorization,
     * replacing any channel previously registered for the same pair.
     *
     * @param routeRef      outer key of the route table
     * @param authorization inner key of the route table
     * @param serverChannel channel to register
     */
    public void doRoute(long routeRef, long authorization, NukleusServerChannel serverChannel) {
        routesByRefAndAuth
                .computeIfAbsent(routeRef, ref -> new Long2ObjectHashMap<>())
                .put(authorization, serverChannel);
    }

    /**
     * Removes whatever channel is registered under the given route reference and
     * authorization, and drops the per-reference table once it becomes empty.
     *
     * @param routeRef      outer key of the route table
     * @param authorization inner key of the route table
     * @param serverChannel not consulted; the entry is removed by key only
     */
    public void doUnroute(long routeRef, long authorization, NukleusServerChannel serverChannel) {
        final Long2ObjectHashMap<NukleusServerChannel> routesByAuth = routesByRefAndAuth.get(routeRef);
        if (routesByAuth != null && routesByAuth.remove(authorization) != null && routesByAuth.isEmpty()) {
            routesByRefAndAuth.remove(routeRef);
        }
    }

    /**
     * Aborts the input side of a channel once its begin-input future has succeeded.
     * If that future is not yet successful the abort is deferred to a listener; if it
     * then completes without success, {@code abortFuture} is failed with its cause.
     *
     * @param channel     channel whose input is to be aborted
     * @param abortFuture future completed with the outcome of the abort
     */
    public void doAbortInput(final NukleusChannel channel, final ChannelFuture abortFuture) {
        final ChannelFuture beginInputFuture = channel.beginInputFuture();
        if (beginInputFuture.isSuccess()) {
            doAbortInputAfterBeginReply(channel, abortFuture);
            return;
        }

        beginInputFuture.addListener(beginFuture -> {
            if (beginFuture.isSuccess()) {
                doAbortInputAfterBeginReply(channel, abortFuture);
            } else {
                abortFuture.setFailure(beginFuture.getCause());
            }
        });
    }

    /**
     * Resets the channel's source stream on its partition and completes the abort.
     * Fails {@code abortFuture} with a {@link ChannelException} when no partition is
     * registered for the channel. Otherwise the future is marked successful and, if
     * both {@code setReadAborted()} and {@code setReadClosed()} return {@code true},
     * the disconnected, unbound and closed channel events are fired in that order.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally private;
     * it is kept public here so any existing callers continue to compile.
     *
     * @param channel     channel whose input is being aborted
     * @param abortFuture future completed with the outcome of the abort
     */
    public void doAbortInputAfterBeginReply(NukleusChannel channel, ChannelFuture abortFuture) {
        final NukleusPartition partition = findPartition(channel);
        if (partition == null) {
            abortFuture.setFailure(new ChannelException("Partition not found for " + channel));
            return;
        }

        partition.doReset(channel.sourceId());
        abortFuture.setSuccess();

        if (channel.setReadAborted() && channel.setReadClosed()) {
            Channels.fireChannelDisconnected(channel);
            Channels.fireChannelUnbound(channel);
            Channels.fireChannelClosed(channel);
        }
    }

    /**
     * Returns the partition registered under the given name, creating and
     * registering it on first request.
     *
     * @param partitionName name of the partition, resolved against the streams directory
     * @return the existing or newly created partition
     */
    public NukleusPartition supplyPartition(String partitionName) {
        return partitionsByName.computeIfAbsent(partitionName, this::newPartition);
    }

    /**
     * Invokes {@code process()} on every partition created so far.
     *
     * @return the sum of the values returned by the partitions
     */
    public int process() {
        int workCount = 0;
        for (NukleusPartition partition : partitions) {
            workCount += partition.process();
        }
        return workCount;
    }

    /**
     * Closes every partition via {@link CloseHelper#quietClose}. The partitions stay
     * registered in this source afterwards.
     */
    @Override
    public void close() {
        for (NukleusPartition partition : partitionsByName.values()) {
            CloseHelper.quietClose(partition);
        }
    }

    /**
     * Looks up the partition named after the sender name of the channel's local address.
     *
     * @return the partition, or {@code null} when none is registered under that name
     */
    private NukleusPartition findPartition(NukleusChannel channel) {
        // NOTE(review): m5getLocalAddress looks like a decompiler-renamed getLocalAddress();
        // kept as-is because the declaration lives outside this file.
        return partitionsByName.get(channel.m5getLocalAddress().getSenderName());
    }

    /**
     * Returns the server channel registered for the given route reference and
     * authorization, or {@code null} when there is none. Unlike a
     * {@code computeIfAbsent}-based lookup this never inserts into the route table,
     * so unknown references do not leave empty tables behind.
     */
    private NukleusServerChannel lookupRoute(long routeRef, long authorization) {
        final Long2ObjectHashMap<NukleusServerChannel> routesByAuth = routesByRefAndAuth.get(routeRef);
        return routesByAuth != null ? routesByAuth.get(authorization) : null;
    }

    /**
     * Builds a writable streams layout for the named partition, wires a new partition
     * to this source's route table and stream registry, and appends it to the array
     * iterated by {@link #process()}.
     */
    private NukleusPartition newPartition(String partitionName) {
        final Path partitionPath = streamsDirectory.resolve(partitionName);
        final StreamsLayout layout = new StreamsLayout.Builder()
                .path(partitionPath)
                .streamsCapacity(config.streamsBufferCapacity())
                .throttleCapacity(config.throttleBufferCapacity())
                .readonly(false)
                .build();

        final NukleusPartition partition = new NukleusPartition(
                partitionPath,
                layout,
                this::lookupRoute,
                streamsById::get,
                (streamId, handler) -> streamsById.put(streamId, handler),
                writeBuffer,
                streamFactory,
                correlateEstablished,
                supplyTarget,
                supplyTimestamp,
                supplyTrace);

        partitions = ArrayUtil.add(partitions, partition);
        return partition;
    }
}
