package org.reaktivity.reaktor.internal.acceptable;

import java.util.function.Function;
import java.util.function.ToIntFunction;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.reaktivity.nukleus.Nukleus;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.function.MessagePredicate;
import org.reaktivity.nukleus.route.RouteKind;
import org.reaktivity.nukleus.stream.StreamFactory;
import org.reaktivity.reaktor.internal.layouts.StreamsLayout;
import org.reaktivity.reaktor.internal.router.ReferenceKind;
import org.reaktivity.reaktor.internal.types.stream.AbortFW;
import org.reaktivity.reaktor.internal.types.stream.BeginFW;
import org.reaktivity.reaktor.internal.types.stream.FrameFW;
import org.reaktivity.reaktor.internal.types.stream.ResetFW;

/* loaded from: input_file:org/reaktivity/reaktor/internal/acceptable/Source.class */
/**
 * Reads frames from the streams ring buffer of a {@link StreamsLayout} and dispatches each one to
 * the {@link MessageConsumer} registered for its stream id, creating new streams on demand.
 *
 * <p>Frames travelling in the opposite direction are accepted through {@link #writeHandler()} and
 * forwarded to the layout's throttle ring buffer.
 */
public final class Source implements Nukleus {
    // Type id that opens a stream: frames carrying it are decoded as BeginFW in handleBegin.
    private static final int BEGIN_TYPE_ID = 1;
    // Forwarded to the stream unchanged; presumably DataFW.TYPE_ID - TODO confirm.
    private static final int DATA_TYPE_ID = 2;
    // Forwarded to the stream, which is then unregistered; presumably EndFW.TYPE_ID - TODO confirm.
    private static final int END_TYPE_ID = 3;
    // 1073741825: throttle frame that also unregisters the stream; presumably ResetFW.TYPE_ID - TODO confirm.
    private static final int RESET_TYPE_ID = 0x40000001;
    // 1073741826: throttle frame that leaves the stream registered; presumably WindowFW.TYPE_ID - TODO confirm.
    private static final int WINDOW_TYPE_ID = 0x40000002;

    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final String sourceName;
    private final String partitionName;
    private final StreamsLayout layout;
    private final AtomicBuffer writeBuffer;
    private final ToIntFunction<MessageHandler> streamsBuffer;
    private final Long2ObjectHashMap<MessageConsumer> streams;
    private final Function<RouteKind, StreamFactory> supplyStreamFactory;
    private final int abortTypeId;
    private final MessageHandler readHandler;
    private final MessageConsumer writeHandler;
    private MessagePredicate throttleBuffer;

    /**
     * Creates a source bound to the given streams layout.
     *
     * <p>NOTE(review): the decompiler reports this constructor was originally package-private.
     *
     * @param str the source name, returned by {@link #routableName()}
     * @param str2 the partition name, returned by {@link #name()}
     * @param streamsLayout layout providing the streams and throttle ring buffers; closed by {@link #close()}
     * @param atomicBuffer scratch buffer used to encode reset frames
     * @param long2ObjectHashMap registry of active streams keyed by stream id
     * @param function not referenced by this class
     * @param function2 supplies the stream factory for a resolved route kind, or {@code null}
     * @param i type id passed to a stream in place of the abort frame's own type id
     */
    public Source(String str, String str2, StreamsLayout streamsLayout, AtomicBuffer atomicBuffer, Long2ObjectHashMap<MessageConsumer> long2ObjectHashMap, Function<String, Target> function, Function<RouteKind, StreamFactory> function2, int i) {
        sourceName = str;
        partitionName = str2;
        layout = streamsLayout;
        writeBuffer = atomicBuffer;
        supplyStreamFactory = function2;
        abortTypeId = i;
        streams = long2ObjectHashMap;

        // Evaluating a bound method reference already rejects a null receiver, so no
        // separate null check is needed before each of the two references below.
        final RingBuffer streamsRing = streamsLayout.streamsBuffer();
        streamsBuffer = streamsRing::read;
        final RingBuffer throttleRing = streamsLayout.throttleBuffer();
        throttleBuffer = throttleRing::write;

        readHandler = this::handleRead;
        writeHandler = this::handleWrite;
    }

    /**
     * Reads pending frames from the streams buffer, passing each to the read handler.
     *
     * @return the value returned by the underlying buffer read
     */
    public int process() {
        return streamsBuffer.applyAsInt(readHandler);
    }

    /**
     * Closes the underlying streams layout.
     *
     * @throws Exception if closing the layout fails
     */
    public void close() throws Exception {
        layout.close();
    }

    /**
     * @return the partition name supplied at construction
     */
    public String name() {
        return partitionName;
    }

    /**
     * @return the source name supplied at construction
     */
    public String routableName() {
        return sourceName;
    }

    /**
     * @return the simple class name followed by the partition name
     */
    @Override
    public String toString() {
        final String simpleName = getClass().getSimpleName();
        return String.format("%s[name=%s]", simpleName, partitionName);
    }

    /**
     * @return the consumer that accepts throttle frames destined for this source
     */
    public MessageConsumer writeHandler() {
        return writeHandler;
    }

    /**
     * Unregisters the stream with the given id, if present.
     *
     * @param j the stream id to remove
     */
    public void cleanup(long j) {
        streams.remove(j);
    }

    /**
     * Forwards a throttle frame to the throttle buffer. Only the two throttle type ids are
     * forwarded; a frame of any other type is dropped. The reset-style frame additionally
     * unregisters its stream.
     */
    private void handleWrite(int msgTypeId, DirectBuffer buffer, int index, int length) {
        final boolean unregisters = msgTypeId == RESET_TYPE_ID;
        if (!unregisters && msgTypeId != WINDOW_TYPE_ID) {
            return;
        }

        // The outcome of the write is intentionally not inspected.
        throttleBuffer.test(msgTypeId, buffer, index, length);

        if (unregisters) {
            final long streamId = frameRO.wrap(buffer, index, index + length).streamId();
            streams.remove(streamId);
        }
    }

    /**
     * Dispatches one frame read from the streams buffer to its registered stream, or to
     * {@link #handleUnrecognized} when no stream is registered or the type id is unknown.
     */
    private void handleRead(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        frameRO.wrap((DirectBuffer) buffer, index, index + length);
        // Captured up front: frameRO is a shared flyweight and may be re-wrapped below.
        final long streamId = frameRO.streamId();
        final MessageConsumer stream = streams.get(streamId);

        if (stream == null) {
            handleUnrecognized(msgTypeId, buffer, index, length);
            return;
        }

        if (msgTypeId == BEGIN_TYPE_ID || msgTypeId == DATA_TYPE_ID) {
            stream.accept(msgTypeId, buffer, index, length);
        } else if (msgTypeId == END_TYPE_ID) {
            stream.accept(msgTypeId, buffer, index, length);
            streams.remove(streamId);
        } else if (msgTypeId == AbortFW.TYPE_ID) {
            // The stream sees the configured abort type id rather than the frame's own.
            stream.accept(abortTypeId, buffer, index, length);
            streams.remove(streamId);
        } else {
            handleUnrecognized(msgTypeId, buffer, index, length);
        }
    }

    /**
     * Handles a frame that could not be dispatched: a begin frame opens a new stream, anything
     * else is answered with a reset for the frame's stream id.
     */
    private void handleUnrecognized(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        if (msgTypeId != BEGIN_TYPE_ID) {
            frameRO.wrap((DirectBuffer) buffer, index, index + length);
            doReset(frameRO.streamId());
            return;
        }
        handleBegin(msgTypeId, buffer, index, length);
    }

    /**
     * Creates and registers a stream for a begin frame, then delivers the frame to it. The stream
     * is reset instead when no factory is available or the factory declines to create a stream.
     */
    private void handleBegin(int msgTypeId, DirectBuffer buffer, int index, int length) {
        final BeginFW begin = beginRO.wrap(buffer, index, index + length);
        final long streamId = begin.streamId();
        final long sourceRef = begin.sourceRef();
        // A zero source reference falls back to the correlation id when resolving the route kind.
        final long resolvedRef = sourceRef != 0L ? sourceRef : begin.correlationId();

        final StreamFactory factory = supplyStreamFactory.apply(ReferenceKind.resolve(resolvedRef));
        final MessageConsumer created =
                factory == null ? null : factory.newStream(msgTypeId, buffer, index, length, writeHandler);

        if (created == null) {
            doReset(streamId);
            return;
        }

        streams.put(streamId, created);
        created.accept(begin.typeId(), begin.buffer(), begin.offset(), begin.sizeof());
    }

    /**
     * Replaces the throttle writer with a predicate that ignores every frame and reports
     * {@code false}, so later throttle frames no longer reach the ring buffer.
     *
     * <p>NOTE(review): the decompiler reports this method was originally package-private.
     */
    public void reset() {
        throttleBuffer = (msgTypeId, buffer, index, length) -> false;
    }

    /**
     * Encodes a reset frame for the given stream at the start of the write buffer and routes it
     * through {@link #handleWrite}.
     */
    private void doReset(long streamId) {
        final int capacity = writeBuffer.capacity();
        final ResetFW reset = resetRW.wrap2((MutableDirectBuffer) writeBuffer, 0, capacity).streamId(streamId).build();
        handleWrite(reset.typeId(), reset.buffer(), reset.offset(), reset.sizeof());
    }
}
