package org.reaktivity.k3po.nukleus.ext.internal.behavior;

import java.net.SocketAddress;
import java.nio.file.Path;
import java.util.Deque;
import java.util.Objects;
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.LongFunction;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.DownstreamMessageEvent;
import org.jboss.netty.channel.MessageEvent;
import org.kaazing.k3po.driver.internal.netty.channel.ChannelAddress;
import org.kaazing.k3po.driver.internal.netty.channel.Channels;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.layout.Layout;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.layout.StreamsLayout;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.types.stream.AbortFW;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.types.stream.BeginFW;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.types.stream.DataFW;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.types.stream.EndFW;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.types.stream.FrameFW;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.types.stream.ResetFW;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.types.stream.WindowFW;
import org.reaktivity.k3po.nukleus.ext.internal.util.function.LongObjectBiConsumer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/reaktivity/k3po/nukleus/ext/internal/behavior/NukleusTarget.class */
/**
 * Writes stream frames (BEGIN, DATA, END, ABORT) for Nukleus channels into the streams ring
 * buffer of a {@link StreamsLayout}, and dispatches throttle messages (RESET, WINDOW) read from
 * the layout's throttle ring buffer to the handler registered for the message's stream id.
 *
 * <p>All frames are built in the single shared {@code writeBuffer} using shared flyweight
 * instances, so a frame must be written to the ring buffer before the next one is built.
 * NOTE(review): this sharing suggests single-threaded use — confirm against the caller.
 */
public final class NukleusTarget implements AutoCloseable {
    /** Message type id that {@link Throttle#handleThrottle} routes to the RESET handler (1073741825). */
    private static final int RESET_TYPE_ID = 0x40000001;

    /** Message type id that {@link Throttle#handleThrottle} routes to the WINDOW handler (1073741826). */
    private static final int WINDOW_TYPE_ID = 0x40000002;

    /**
     * Upper bound on the number of payload bytes placed in one DATA frame.
     * NOTE(review): presumably matches a 16-bit length field in the frame format — confirm.
     */
    private static final int MAX_DATA_PAYLOAD_BYTES = 65535;

    private final Path partitionPath;
    private final Layout layout;
    private final RingBuffer streamsBuffer;
    private final RingBuffer throttleBuffer;
    private final LongFunction<MessageHandler> lookupThrottle;
    private final LongObjectBiConsumer<MessageHandler> registerThrottle;
    private final LongConsumer unregisterThrottle;
    private final MutableDirectBuffer writeBuffer;
    private final LongObjectBiConsumer<NukleusCorrelation> correlateNew;
    private final FrameFW frameRO = new FrameFW();
    private final BeginFW.Builder beginRW = new BeginFW.Builder();
    private final DataFW.Builder dataRW = new DataFW.Builder();
    private final EndFW.Builder endRW = new EndFW.Builder();
    private final AbortFW.Builder abortRW = new AbortFW.Builder();
    private final WindowFW windowRO = new WindowFW();
    private final ResetFW resetRO = new ResetFW();
    // Scratch storage for the RESET synthesized locally when a handshake future fails.
    private final MutableDirectBuffer resetBuffer = new UnsafeBuffer(new byte[8]);
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final MessageHandler throttleHandler = this::handleThrottle;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/k3po/nukleus/ext/internal/behavior/NukleusTarget$Throttle.class */
    /**
     * Per-stream throttle handler bound to one channel.
     *
     * <p>Until the handshake future passed to the constructor completes, a RESET fails that
     * future. Once it has completed, a RESET unregisters the throttle and aborts the channel's
     * output. WINDOW messages always add write credit and trigger a flush of queued writes.
     */
    public final class Throttle {
        private final NukleusChannel channel;
        private final Consumer<Throwable> failureHandler;
        private Consumer<WindowFW> windowHandler;
        private Consumer<ResetFW> resetHandler;

        /**
         * @param nukleusChannel the channel whose writes this throttle governs
         * @param channelFuture  the handshake future; failed on a RESET received before it
         *                       completes, and observed to switch RESET handling afterwards
         */
        private Throttle(NukleusChannel nukleusChannel, ChannelFuture channelFuture) {
            this.channel = nukleusChannel;
            this.windowHandler = this::processWindow;
            this.resetHandler = this::processResetBeforeHandshake;
            Objects.requireNonNull(channelFuture);
            this.failureHandler = channelFuture::setFailure;
            // Registered last: all handler fields above are already assigned when the listener runs.
            channelFuture.addListener(this::onHandshakeCompleted);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Decodes one throttle message and hands it to the current RESET or WINDOW handler.
         *
         * @param msgTypeId message type id; must be the RESET or WINDOW type id
         * @param buffer    buffer containing the message
         * @param index     offset of the message within {@code buffer}
         * @param length    length of the message in bytes
         * @throws IllegalArgumentException if {@code msgTypeId} is neither RESET nor WINDOW
         */
        public void handleThrottle(int msgTypeId, DirectBuffer buffer, int index, int length) {
            switch (msgTypeId) {
                case RESET_TYPE_ID:
                    this.resetHandler.accept(NukleusTarget.this.resetRO.wrap(buffer, index, index + length));
                    return;
                case WINDOW_TYPE_ID:
                    this.windowHandler.accept(NukleusTarget.this.windowRO.wrap(buffer, index, index + length));
                    return;
                default:
                    throw new IllegalArgumentException("Unexpected message type: " + msgTypeId);
            }
        }

        /** Applies the credit and padding carried by the WINDOW, then flushes queued writes. */
        private void processWindow(WindowFW windowFW) {
            this.channel.writableWindow(windowFW.credit(), windowFW.padding());
            NukleusTarget.this.flushThrottledWrites(this.channel);
        }

        /**
         * RESET handling after the handshake completed: unregisters the throttle for the RESET's
         * stream id and fires an output-aborted event if the channel newly became write-aborted.
         */
        private void processReset(ResetFW resetFW) {
            NukleusTarget.this.unregisterThrottle.accept(resetFW.streamId());
            if (this.channel.setWriteAborted()) {
                Channels.fireOutputAborted(this.channel);
            }
        }

        /** RESET handling before the handshake completed: fails the handshake future. */
        private void processResetBeforeHandshake(ResetFW resetFW) {
            this.failureHandler.accept(new ChannelException("Handshake failed"));
        }

        /**
         * Switches RESET handling to {@link #processReset}. If the handshake failed, a RESET for
         * the channel's source id is built locally and processed immediately.
         */
        private void onHandshakeCompleted(ChannelFuture channelFuture) {
            this.resetHandler = this::processReset;
            if (channelFuture.isSuccess()) {
                return;
            }
            ResetFW syntheticReset = NukleusTarget.this.resetRW
                    .wrap2(NukleusTarget.this.resetBuffer, 0, NukleusTarget.this.resetBuffer.capacity())
                    .streamId(this.channel.sourceId())
                    .build();
            this.resetHandler.accept(syntheticReset);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @param partitionPath      path reported by {@link #toString()}
     * @param layout             supplies the streams and throttle ring buffers; closed by {@link #close()}
     * @param writeBuffer        scratch buffer in which every outgoing frame is built
     * @param lookupThrottle     returns the handler registered for a stream id, or {@code null}
     * @param registerThrottle   registers a throttle handler for a stream id
     * @param unregisterThrottle removes the throttle handler for a stream id
     * @param correlateNew       records a new correlation under a correlation id
     */
    public NukleusTarget(Path partitionPath, StreamsLayout layout, MutableDirectBuffer writeBuffer, LongFunction<MessageHandler> lookupThrottle, LongObjectBiConsumer<MessageHandler> registerThrottle, LongConsumer unregisterThrottle, LongObjectBiConsumer<NukleusCorrelation> correlateNew) {
        this.partitionPath = partitionPath;
        this.layout = layout;
        this.streamsBuffer = layout.streamsBuffer();
        this.throttleBuffer = layout.throttleBuffer();
        this.writeBuffer = writeBuffer;
        this.lookupThrottle = lookupThrottle;
        this.registerThrottle = registerThrottle;
        this.unregisterThrottle = unregisterThrottle;
        this.correlateNew = correlateNew;
    }

    /**
     * Reads pending throttle messages and dispatches each to its registered handler.
     *
     * @return the value returned by the throttle ring buffer's {@code read}
     */
    public int process() {
        return this.throttleBuffer.read(this.throttleHandler);
    }

    /** Closes the underlying layout. */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.layout.close();
    }

    /** @return the simple class name followed by the partition path in square brackets */
    @Override
    public String toString() {
        return String.format("%s [%s]", getClass().getSimpleName(), this.partitionPath);
    }

    /**
     * Initiates a client connection by writing a BEGIN frame for the channel's target stream.
     *
     * <p>For DUPLEX and HALF_DUPLEX transmission a correlation is recorded under a random
     * correlation id. For DUPLEX the connect additionally waits for the channel's begin-input
     * future; otherwise it completes against an already-succeeded future. Any exception thrown
     * along the way fails {@code channelFuture} instead of propagating.
     *
     * @param nukleusClientChannel  the client channel being connected
     * @param nukleusChannelAddress the remote address supplying sender name, route and authorization
     * @param channelFuture         completed with the outcome of the connect
     */
    public void doConnect(final NukleusClientChannel nukleusClientChannel, NukleusChannelAddress nukleusChannelAddress, final ChannelFuture channelFuture) {
        try {
            String senderName = nukleusChannelAddress.getSenderName();
            long routeRef = nukleusChannelAddress.getRoute();
            long targetId = nukleusClientChannel.targetId();
            long correlationId = new Random().nextLong();

            ChannelFuture handshakeFuture = org.jboss.netty.channel.Channels.succeededFuture(nukleusClientChannel);
            switch (nukleusClientChannel.getConfig().getTransmission()) {
                case DUPLEX: {
                    ChannelFuture inputFuture = nukleusClientChannel.beginInputFuture();
                    this.correlateNew.accept(correlationId, new NukleusCorrelation(nukleusClientChannel, inputFuture));
                    // The connect only completes once the begin-input future does.
                    handshakeFuture = inputFuture;
                    break;
                }
                case HALF_DUPLEX: {
                    ChannelFuture inputFuture = nukleusClientChannel.beginInputFuture();
                    this.correlateNew.accept(correlationId, new NukleusCorrelation(nukleusClientChannel, inputFuture));
                    inputFuture.addListener(future -> {
                        org.jboss.netty.channel.Channels.fireChannelInterestChanged(future.getChannel());
                    });
                    break;
                }
                default:
                    // Other transmissions record no correlation and keep the succeeded future.
                    break;
            }

            long authorization = nukleusChannelAddress.getAuthorization();
            nukleusClientChannel.targetAuth(authorization);

            ChannelBuffer beginExt = nukleusClientChannel.writeExtBuffer(NukleusExtensionKind.BEGIN, true);
            int beginExtBytes = beginExt.readableBytes();
            byte[] beginExtCopy = writeExtCopy(beginExt);
            BeginFW begin = this.beginRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity())
                    .streamId(targetId)
                    .authorization(authorization)
                    .source(senderName)
                    .sourceRef(routeRef)
                    .correlationId(correlationId)
                    .extension(builder -> {
                        builder.set(beginExtCopy);
                    })
                    .build();

            if (!nukleusClientChannel.isBound()) {
                ChannelAddress replyToAddress = nukleusChannelAddress.newReplyToAddress();
                nukleusClientChannel.setLocalAddress(replyToAddress);
                nukleusClientChannel.setBound();
                org.jboss.netty.channel.Channels.fireChannelBound(nukleusClientChannel, replyToAddress);
            }
            nukleusClientChannel.setRemoteAddress(nukleusChannelAddress);

            handshakeFuture.addListener(new ChannelFutureListener() { // from class: org.reaktivity.k3po.nukleus.ext.internal.behavior.NukleusTarget.1
                @Override
                public void operationComplete(ChannelFuture completed) throws Exception {
                    if (!completed.isSuccess()) {
                        channelFuture.setFailure(completed.getCause());
                        return;
                    }
                    nukleusClientChannel.setConnected();
                    channelFuture.setSuccess();
                    org.jboss.netty.channel.Channels.fireChannelConnected(nukleusClientChannel, nukleusClientChannel.m4getRemoteAddress());
                }
            });

            // The throttle is registered before the BEGIN frame is written to the streams buffer.
            Throttle throttle = new Throttle(nukleusClientChannel, handshakeFuture);
            this.registerThrottle.accept(begin.streamId(), throttle::handleThrottle);

            this.streamsBuffer.write(begin.typeId(), begin.buffer(), begin.offset(), begin.sizeof());

            // Consume exactly the extension bytes that were copied into the frame.
            beginExt.skipBytes(beginExtBytes);
            beginExt.discardReadBytes();
            nukleusClientChannel.beginOutputFuture().setSuccess();
        } catch (Exception e) {
            channelFuture.setFailure(e);
        }
    }

    /**
     * Writes the BEGIN frame of a reply stream, using the correlation id from the channel's
     * configuration and the sender name from its remote address, then marks the channel's
     * begin-output future as succeeded.
     *
     * @param nukleusChannel the channel whose output stream is being started
     * @param channelFuture  handshake future handed to the stream's {@link Throttle}
     */
    public void doBeginReply(NukleusChannel nukleusChannel, ChannelFuture channelFuture) {
        long correlationId = nukleusChannel.getConfig().getCorrelation();
        String senderName = nukleusChannel.m4getRemoteAddress().getSenderName();

        ChannelBuffer beginExt = nukleusChannel.writeExtBuffer(NukleusExtensionKind.BEGIN, true);
        int beginExtBytes = beginExt.readableBytes();
        byte[] beginExtCopy = writeExtCopy(beginExt);
        BeginFW begin = this.beginRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity())
                .streamId(nukleusChannel.targetId())
                .source(senderName)
                .sourceRef(0L)
                .correlationId(correlationId)
                .extension(builder -> {
                    builder.set(beginExtCopy);
                })
                .build();

        // The throttle is registered before the BEGIN frame is written to the streams buffer.
        Throttle throttle = new Throttle(nukleusChannel, channelFuture);
        this.registerThrottle.accept(begin.streamId(), throttle::handleThrottle);

        this.streamsBuffer.write(begin.typeId(), begin.buffer(), begin.offset(), begin.sizeof());

        beginExt.skipBytes(beginExtBytes);
        beginExt.discardReadBytes();
        nukleusChannel.beginOutputFuture().setSuccess();
    }

    /**
     * Queues a write request on the channel (writing BEGIN first if it has not been written yet)
     * and flushes as much of the queue as the channel currently allows.
     *
     * @param nukleusChannel the channel to write to
     * @param messageEvent   the write request; its message is read as a {@code ChannelBuffer} when flushed
     */
    public void doWrite(NukleusChannel nukleusChannel, MessageEvent messageEvent) {
        doFlushBegin(nukleusChannel);
        nukleusChannel.writeRequests.addLast(messageEvent);
        flushThrottledWrites(nukleusChannel);
    }

    /**
     * Flushes the channel. If BEGIN had to be written, or there is no pending DATA extension,
     * the future succeeds immediately. Otherwise an empty-payload write request carrying
     * {@code channelFuture} is queued so the pending extension is sent.
     *
     * @param nukleusChannel the channel to flush
     * @param channelFuture  future associated with the flush
     */
    public void doFlush(NukleusChannel nukleusChannel, ChannelFuture channelFuture) {
        if (doFlushBegin(nukleusChannel)) {
            Channels.fireFlushed(nukleusChannel);
            channelFuture.setSuccess();
        } else if (!nukleusChannel.writeExtBuffer(NukleusExtensionKind.DATA, true).readable()) {
            channelFuture.setSuccess();
            Channels.fireFlushed(nukleusChannel);
        } else {
            nukleusChannel.writeRequests.addLast(new DownstreamMessageEvent(nukleusChannel, channelFuture, ChannelBuffers.EMPTY_BUFFER, (SocketAddress) null));
            flushThrottledWrites(nukleusChannel);
        }
    }

    /**
     * Writes an ABORT frame for the channel's target stream (writing BEGIN first if needed) and
     * marks {@code channelFuture} as succeeded.
     *
     * @param nukleusChannel the channel whose output is aborted
     * @param channelFuture  future completed once the frame has been written
     */
    public void doAbortOutput(NukleusChannel nukleusChannel, ChannelFuture channelFuture) {
        doFlushBegin(nukleusChannel);
        long targetId = nukleusChannel.targetId();
        AbortFW abort = this.abortRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity())
                .streamId(targetId)
                .authorization(nukleusChannel.targetAuth())
                .build();
        this.streamsBuffer.write(abort.typeId(), abort.buffer(), abort.offset(), abort.sizeof());
        channelFuture.setSuccess();
    }

    /**
     * Writes an END frame (writing BEGIN first if needed), fires an output-shutdown event and
     * completes {@code channelFuture}. If {@code setWriteClosed()} reports true, the
     * disconnected, unbound and closed events are fired as well.
     *
     * @param nukleusChannel the channel whose output is shut down
     * @param channelFuture  future completed once the frame has been written
     */
    public void doShutdownOutput(NukleusChannel nukleusChannel, ChannelFuture channelFuture) {
        doFlushBegin(nukleusChannel);
        writeEnd(nukleusChannel);
        Channels.fireOutputShutdown(nukleusChannel);
        channelFuture.setSuccess();
        if (nukleusChannel.setWriteClosed()) {
            fireChannelTeardown(nukleusChannel);
        }
    }

    /**
     * Writes an END frame (writing BEGIN first if needed) and completes {@code channelFuture}.
     * If {@code setClosed()} reports true, the disconnected, unbound and closed events are fired.
     *
     * @param nukleusChannel the channel being closed
     * @param channelFuture  future completed once the frame has been written
     */
    public void doClose(NukleusChannel nukleusChannel, ChannelFuture channelFuture) {
        doFlushBegin(nukleusChannel);
        writeEnd(nukleusChannel);
        channelFuture.setSuccess();
        if (nukleusChannel.setClosed()) {
            fireChannelTeardown(nukleusChannel);
        }
    }

    /** Builds and writes an END frame carrying the pending END extension, then consumes that extension. */
    private void writeEnd(NukleusChannel nukleusChannel) {
        long targetId = nukleusChannel.targetId();
        ChannelBuffer endExt = nukleusChannel.writeExtBuffer(NukleusExtensionKind.END, true);
        int endExtBytes = endExt.readableBytes();
        byte[] endExtCopy = writeExtCopy(endExt);
        EndFW end = this.endRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity())
                .streamId(targetId)
                .authorization(nukleusChannel.targetAuth())
                .extension(builder -> {
                    builder.set(endExtCopy);
                })
                .build();
        this.streamsBuffer.write(end.typeId(), end.buffer(), end.offset(), end.sizeof());
        endExt.skipBytes(endExtBytes);
        endExt.discardReadBytes();
    }

    /** Fires the disconnected, unbound and closed events for the channel, in that order. */
    private void fireChannelTeardown(NukleusChannel nukleusChannel) {
        org.jboss.netty.channel.Channels.fireChannelDisconnected(nukleusChannel);
        org.jboss.netty.channel.Channels.fireChannelUnbound(nukleusChannel);
        org.jboss.netty.channel.Channels.fireChannelClosed(nukleusChannel);
    }

    /**
     * Writes the reply BEGIN frame if the channel's begin-output future is not yet done.
     *
     * @return {@code true} if BEGIN was written by this call, {@code false} if it was already done
     */
    private boolean doFlushBegin(NukleusChannel nukleusChannel) {
        boolean beginPending = !nukleusChannel.beginOutputFuture().isDone();
        if (beginPending) {
            doBeginReply(nukleusChannel, org.jboss.netty.channel.Channels.future(nukleusChannel));
        }
        return beginPending;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drains the channel's queued write requests into DATA frames while the channel reports
     * itself writable.
     *
     * <p>Each iteration looks at the head request only. The number of payload bytes sent is
     * limited by {@code writableBytes(...)} and by {@link #MAX_DATA_PAYLOAD_BYTES}. A request
     * with no payload is still sent when a DATA extension is pending. Removal of completed
     * requests is not done here; {@code targetWriteRequestProgress()} is invoked after each pass
     * (presumably that is where completed requests are dequeued — confirm in NukleusChannel).
     *
     * @param nukleusChannel the channel whose write queue is flushed
     */
    public void flushThrottledWrites(NukleusChannel nukleusChannel) {
        Deque<MessageEvent> writeRequests = nukleusChannel.writeRequests;
        long authorization = nukleusChannel.targetAuth();

        // NOTE(review): when the head request has neither payload nor extension bytes and no
        // write request is in progress, nothing in this loop body changes the loop condition.
        // Confirm that NukleusChannel prevents that state.
        while (nukleusChannel.writable() && !writeRequests.isEmpty()) {
            ChannelBuffer payload = (ChannelBuffer) writeRequests.peekFirst().getMessage();
            ChannelBuffer dataExt = nukleusChannel.writeExtBuffer(NukleusExtensionKind.DATA, true);

            if (payload.readable() || dataExt.readable()) {
                // Captured before any bytes are skipped: an empty payload means this is a flush.
                boolean flushing = !payload.readable();
                int writableBytes = Math.min(nukleusChannel.writableBytes(payload.readableBytes()), MAX_DATA_PAYLOAD_BYTES);

                // Extension-only frames are sent even when no payload bytes are writable.
                if (writableBytes > 0 || flushing) {
                    int payloadIndex = payload.readerIndex();
                    if (payloadIndex == 0) {
                        nukleusChannel.targetWriteRequestProgressing();
                    }

                    int dataExtBytes = dataExt.readableBytes();
                    byte[] dataExtCopy = writeExtCopy(dataExt);
                    byte[] payloadCopy = new byte[writableBytes];
                    payload.getBytes(payloadIndex, payloadCopy);

                    DataFW data = this.dataRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity())
                            .streamId(nukleusChannel.targetId())
                            .authorization(authorization)
                            .groupId(0L)
                            .padding(0)
                            .payload(builder -> {
                                builder.set(payloadCopy);
                            })
                            .extension(builder -> {
                                builder.set(dataExtCopy);
                            })
                            .build();
                    this.streamsBuffer.write(data.typeId(), data.buffer(), data.offset(), data.sizeof());

                    nukleusChannel.writtenBytes(writableBytes);
                    payload.skipBytes(writableBytes);
                    dataExt.skipBytes(dataExtBytes);
                    dataExt.discardReadBytes();
                }

                if (flushing) {
                    Channels.fireFlushed(nukleusChannel);
                } else if (writableBytes > 0) {
                    org.jboss.netty.channel.Channels.fireWriteComplete(nukleusChannel, writableBytes);
                }
                nukleusChannel.targetWriteRequestProgress();
            } else if (nukleusChannel.isTargetWriteRequestInProgress()) {
                return;
            }
        }
    }

    /**
     * Copies the readable bytes of an extension buffer into a new array without moving the
     * buffer's reader index.
     *
     * <p>Uses the absolute {@code getBytes} accessor rather than {@code array()}, so it also
     * works for buffers that expose no backing array.
     *
     * @param channelBuffer the buffer to copy from
     * @return a new array holding exactly the buffer's readable bytes (empty if none)
     */
    private byte[] writeExtCopy(ChannelBuffer channelBuffer) {
        byte[] copy = new byte[channelBuffer.readableBytes()];
        channelBuffer.getBytes(channelBuffer.readerIndex(), copy);
        return copy;
    }

    /**
     * Ring-buffer callback: reads the stream id from the frame and forwards the message to the
     * throttle handler registered for that stream. Messages for unknown streams are dropped.
     */
    private void handleThrottle(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        this.frameRO.wrap((DirectBuffer) buffer, index, index + length);
        MessageHandler streamThrottle = this.lookupThrottle.apply(this.frameRO.streamId());
        if (streamThrottle != null) {
            streamThrottle.onMessage(msgTypeId, buffer, index, length);
        }
    }
}
