package org.reaktivity.reaktor.test.internal.k3po.ext.behavior;

import java.nio.file.Path;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.Channels;
import org.kaazing.k3po.driver.internal.behavior.handler.RejectedHandler;
import org.reaktivity.reaktor.internal.budget.DefaultBudgetCreditor;
import org.reaktivity.reaktor.internal.router.BudgetId;
import org.reaktivity.reaktor.internal.types.stream.DataFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.behavior.layout.StreamsLayout;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.OctetsFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.control.ErrorFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.BeginFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.types.stream.FrameFW;
import org.reaktivity.reaktor.test.internal.k3po.ext.util.function.LongLongFunction;
import org.reaktivity.reaktor.test.internal.k3po.ext.util.function.LongObjectBiConsumer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/reaktivity/reaktor/test/internal/k3po/ext/behavior/NukleusPartition.class */
public final class NukleusPartition implements AutoCloseable {
    private final LabelManager labels;
    private final Path streamsPath;
    private final int scopeIndex;
    private final StreamsLayout layout;
    private final RingBuffer streamsBuffer;
    private final LongLongFunction<NukleusServerChannel> lookupRoute;
    private final LongFunction<MessageHandler> lookupStream;
    private final LongFunction<MessageHandler> lookupThrottle;
    private final LongObjectBiConsumer<MessageHandler> registerStream;
    private final NukleusStreamFactory streamFactory;
    private final LongFunction<NukleusCorrelation> correlateEstablished;
    private final LongLongFunction<NukleusTarget> supplySender;
    private final IntFunction<NukleusTarget> supplyTarget;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final MessageHandler streamHandler = this::handleStream;

    /* JADX INFO: Access modifiers changed from: package-private */
    public NukleusPartition(LabelManager labelManager, Path path, int i, StreamsLayout streamsLayout, LongLongFunction<NukleusServerChannel> longLongFunction, LongFunction<MessageHandler> longFunction, LongObjectBiConsumer<MessageHandler> longObjectBiConsumer, LongFunction<MessageHandler> longFunction2, NukleusStreamFactory nukleusStreamFactory, LongFunction<NukleusCorrelation> longFunction3, LongLongFunction<NukleusTarget> longLongFunction2, IntFunction<NukleusTarget> intFunction) {
        this.labels = labelManager;
        this.streamsPath = path;
        this.scopeIndex = i;
        this.layout = streamsLayout;
        this.streamsBuffer = streamsLayout.streamsBuffer();
        this.lookupRoute = longLongFunction;
        this.lookupStream = longFunction;
        this.lookupThrottle = longFunction2;
        this.registerStream = longObjectBiConsumer;
        this.streamFactory = nukleusStreamFactory;
        this.correlateEstablished = longFunction3;
        this.supplySender = longLongFunction2;
        this.supplyTarget = intFunction;
    }

    public int process() {
        return this.streamsBuffer.read(this.streamHandler);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.layout.close();
    }

    public String toString() {
        return String.format("%s [%s]", getClass().getSimpleName(), this.streamsPath);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void doSystemWindow(NukleusChannel nukleusChannel, long j) {
        int pendingSharedBudget = nukleusChannel.pendingSharedBudget();
        if (pendingSharedBudget != 0) {
            long creditorId = nukleusChannel.creditorId();
            if (!$assertionsDisabled && creditorId == 0) {
                throw new AssertionError();
            }
            this.supplyTarget.apply(BudgetId.ownerIndex(creditorId)).doSystemWindow(j, creditorId, pendingSharedBudget);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int scopeIndex() {
        return this.scopeIndex;
    }

    private void handleStream(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
        long streamId = this.frameRO.wrap((DirectBuffer) mutableDirectBuffer, i2, i2 + i3).streamId();
        if ((i & ErrorFW.TYPE_ID) != 0) {
            MessageHandler apply = this.lookupThrottle.apply(streamId);
            if (apply != null) {
                apply.onMessage(i, mutableDirectBuffer, i2, i3);
                return;
            }
            return;
        }
        MessageHandler apply2 = this.lookupStream.apply(streamId);
        if (apply2 != null) {
            apply2.onMessage(i, mutableDirectBuffer, i2, i3);
        } else {
            handleUnrecognized(i, mutableDirectBuffer, i2, i3);
        }
    }

    private void handleUnrecognized(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
        switch (i) {
            case 1:
                handleBegin(this.beginRO.wrap((DirectBuffer) mutableDirectBuffer, i2, i2 + i3));
                return;
            case 2:
                DataFW wrap = this.dataRO.wrap(mutableDirectBuffer, i2, i2 + i3);
                long traceId = wrap.traceId();
                long budgetId = wrap.budgetId();
                if (budgetId != 0) {
                    this.supplyTarget.apply(BudgetId.ownerIndex(budgetId)).doSystemWindow(traceId, budgetId, wrap.reserved());
                    return;
                }
                return;
            default:
                return;
        }
    }

    private void handleBegin(BeginFW beginFW) {
        long routeId = beginFW.routeId();
        long streamId = beginFW.streamId();
        long traceId = beginFW.traceId();
        long authorization = beginFW.authorization();
        if ((streamId & 1) == 0) {
            handleBeginReply(beginFW);
            return;
        }
        NukleusServerChannel apply = this.lookupRoute.apply(routeId, authorization);
        if (apply != null) {
            handleBeginInitial(beginFW, apply);
        } else {
            this.supplySender.apply(routeId, streamId).doReset(routeId, streamId, traceId);
        }
    }

    private void handleBeginInitial(BeginFW beginFW, NukleusServerChannel nukleusServerChannel) {
        long routeId = beginFW.routeId();
        long streamId = beginFW.streamId();
        long traceId = beginFW.traceId();
        NukleusChildChannel doAccept = doAccept(nukleusServerChannel, routeId, streamId, streamId & (-2));
        NukleusTarget apply = this.supplySender.apply(routeId, streamId);
        if (doAccept.getPipeline().get(RejectedHandler.class) != null) {
            OctetsFW extension = beginFW.extension();
            int sizeof = extension.sizeof();
            if (sizeof != 0) {
                DirectBuffer buffer = extension.buffer();
                int offset = extension.offset();
                byte[] bArr = new byte[sizeof];
                buffer.getBytes(offset, bArr);
                doAccept.readExtBuffer(NukleusExtensionKind.BEGIN).writeBytes(bArr);
            }
            doAccept.setWriteClosed();
            Channels.fireChannelBound(doAccept, doAccept.m8getLocalAddress());
            apply.doReset(routeId, streamId, traceId);
            doAccept.setReadClosed();
            return;
        }
        ChannelFuture future = Channels.future(doAccept);
        ChannelFuture future2 = Channels.future(doAccept);
        MessageHandler newStream = this.streamFactory.newStream(doAccept, apply, future);
        this.registerStream.accept(streamId, (long) newStream);
        newStream.onMessage(beginFW.typeId(), beginFW.buffer(), beginFW.offset(), beginFW.sizeof());
        Channels.fireChannelBound(doAccept, doAccept.m8getLocalAddress());
        apply.doPrepareReply(doAccept, future2, future);
        switch (doAccept.getConfig().getTransmission()) {
            case DUPLEX:
                apply.doBeginReply(doAccept);
                break;
            default:
                future2.setSuccess();
                break;
        }
        Channels.fireChannelConnected(doAccept, doAccept.m7getRemoteAddress());
    }

    private void handleBeginReply(BeginFW beginFW) {
        long routeId = beginFW.routeId();
        long streamId = beginFW.streamId();
        long traceId = beginFW.traceId();
        NukleusCorrelation apply = this.correlateEstablished.apply(streamId);
        NukleusTarget apply2 = this.supplySender.apply(routeId, streamId);
        if (apply == null) {
            apply2.doReset(routeId, streamId, traceId);
            return;
        }
        ChannelFuture correlatedFuture = apply.correlatedFuture();
        MessageHandler newStream = this.streamFactory.newStream(correlatedFuture.getChannel(), apply2, correlatedFuture);
        this.registerStream.accept(streamId, (long) newStream);
        newStream.onMessage(beginFW.typeId(), beginFW.buffer(), beginFW.offset(), beginFW.sizeof());
    }

    private NukleusChildChannel doAccept(NukleusServerChannel nukleusServerChannel, long j, long j2, long j3) {
        try {
            NukleusServerChannelConfig config = nukleusServerChannel.getConfig();
            ChannelPipeline pipeline = config.getPipelineFactory().getPipeline();
            NukleusChannelAddress m20getLocalAddress = nukleusServerChannel.m20getLocalAddress();
            NukleusChannelAddress newReplyToAddress = m20getLocalAddress.newReplyToAddress(this.labels.lookupLabel(((int) (j >> 48)) & 65535));
            NukleusChildChannel nukleusChildChannel = new NukleusChildChannel(nukleusServerChannel, nukleusServerChannel.getFactory(), pipeline, new NukleusChildChannelSink(), j2, j3);
            NukleusChannelConfig config2 = nukleusChildChannel.getConfig();
            config2.setBufferFactory(config.getBufferFactory());
            config2.setTransmission(config.getTransmission());
            config2.setThrottle(config.getThrottle());
            config2.setWindow(config.getWindow());
            config2.setBudgetId(config.getBudgetId());
            config2.setPadding(config.getPadding());
            config2.setAlignment(config.getAlignment());
            config2.setCapabilities(config.getCapabilities());
            if (config2.getTransmission() == NukleusTransmission.SIMPLEX) {
                nukleusChildChannel.setWriteClosed();
            }
            nukleusChildChannel.routeId(j);
            nukleusChildChannel.setLocalAddress(m20getLocalAddress);
            nukleusChildChannel.setRemoteAddress(newReplyToAddress);
            long budgetId = config2.getBudgetId();
            if (budgetId != 0) {
                long budgetMask = budgetId | BudgetId.budgetMask(this.scopeIndex);
                DefaultBudgetCreditor supplyCreditor = nukleusServerChannel.reaktor.supplyCreditor(nukleusChildChannel);
                nukleusChildChannel.setCreditor(supplyCreditor, budgetMask);
                supplyCreditor.creditById(0L, budgetId, config2.getSharedWindow());
            }
            return nukleusChildChannel;
        } catch (Exception e) {
            LangUtil.rethrowUnchecked(e);
            return null;
        }
    }

    static {
        $assertionsDisabled = !NukleusPartition.class.desiredAssertionStatus();
    }
}
