package org.reaktivity.reaktor.internal.router;

import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.reaktivity.nukleus.Nukleus;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.route.RouteKind;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.stream.StreamFactory;
import org.reaktivity.nukleus.stream.StreamFactoryBuilder;
import org.reaktivity.reaktor.internal.Context;
import org.reaktivity.reaktor.internal.Counters;
import org.reaktivity.reaktor.internal.State;
import org.reaktivity.reaktor.internal.buffer.CountingBufferPool;
import org.reaktivity.reaktor.internal.layouts.StreamsLayout;
import org.reaktivity.reaktor.internal.types.control.ErrorFW;
import org.reaktivity.reaktor.internal.types.stream.AbortFW;
import org.reaktivity.reaktor.internal.types.stream.BeginFW;
import org.reaktivity.reaktor.internal.types.stream.FrameFW;
import org.reaktivity.reaktor.internal.types.stream.ResetFW;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/reaktivity/reaktor/internal/router/Source.class */
public final class Source implements Nukleus {
    private final String nukleusName;
    private final Counters counters;
    private final String name;
    private final StreamsLayout layout;
    private final MutableDirectBuffer writeBuffer;
    private final ToIntFunction<MessageHandler> streamsBuffer;
    private final Long2ObjectHashMap<MessageConsumer> streams;
    private final Long2ObjectHashMap<MessageConsumer> throttles;
    private final Function<RouteKind, StreamFactory> supplyStreamFactory;
    private final Function<String, MessageConsumer> supplyThrottle;
    private MessageConsumer writeHandler;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final AbortFW.Builder abortRW = new AbortFW.Builder();
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final MessageHandler readHandler = this::handleRead;
    private final Long2ObjectHashMap<ReadCounters> countersByRouteId = new Long2ObjectHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/reaktivity/reaktor/internal/router/Source$ReadCounters.class */
    public final class ReadCounters {
        private final AtomicCounter opens;
        private final AtomicCounter closes;
        private final AtomicCounter aborts;
        private final AtomicCounter windows;
        private final AtomicCounter resets;
        private final AtomicCounter bytes;
        private final AtomicCounter frames;

        ReadCounters(long j) {
            this.opens = Source.this.counters.counter(String.format("%d.opens.read", Long.valueOf(j)));
            this.closes = Source.this.counters.counter(String.format("%d.closes.read", Long.valueOf(j)));
            this.aborts = Source.this.counters.counter(String.format("%d.aborts.read", Long.valueOf(j)));
            this.windows = Source.this.counters.counter(String.format("%d.windows.read", Long.valueOf(j)));
            this.resets = Source.this.counters.counter(String.format("%d.resets.read", Long.valueOf(j)));
            this.bytes = Source.this.counters.counter(String.format("%d.bytes.read", Long.valueOf(j)));
            this.frames = Source.this.counters.counter(String.format("%d.frames.read", Long.valueOf(j)));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Source(Context context, MutableDirectBuffer mutableDirectBuffer, RouteManager routeManager, String str, State state, LongFunction<IntUnaryOperator> longFunction, LongFunction<IntUnaryOperator> longFunction2, Function<RouteKind, StreamFactoryBuilder> function, AtomicLong atomicLong, Function<String, MessageConsumer> function2, Long2ObjectHashMap<MessageConsumer> long2ObjectHashMap, Long2ObjectHashMap<MessageConsumer> long2ObjectHashMap2) {
        this.nukleusName = context.name();
        this.counters = context.counters();
        this.name = str;
        this.writeBuffer = mutableDirectBuffer;
        this.streams = long2ObjectHashMap;
        this.throttles = long2ObjectHashMap2;
        StreamsLayout build = new StreamsLayout.Builder().path(context.sourceStreamsPath().apply(str)).streamsCapacity(context.streamsBufferCapacity()).readonly(false).build();
        this.layout = build;
        int maximumMessagesPerRead = context.maximumMessagesPerRead();
        RingBuffer streamsBuffer = build.streamsBuffer();
        this.streamsBuffer = messageHandler -> {
            return streamsBuffer.read(messageHandler, maximumMessagesPerRead);
        };
        this.supplyThrottle = function2;
        EnumMap enumMap = new EnumMap(RouteKind.class);
        Function function3 = str2 -> {
            return () -> {
                return context.counters().counter(str2).increment() + 1;
            };
        };
        Function function4 = str3 -> {
            return j -> {
                context.counters().counter(str3).add(j);
            };
        };
        AtomicCounter acquires = context.counters().acquires();
        AtomicCounter releases = context.counters().releases();
        BufferPool bufferPool = state.bufferPool();
        Objects.requireNonNull(acquires);
        LongSupplier longSupplier = acquires::increment;
        Objects.requireNonNull(releases);
        CountingBufferPool countingBufferPool = new CountingBufferPool(bufferPool, longSupplier, releases::increment);
        Supplier supplier = () -> {
            return countingBufferPool;
        };
        Iterator it = EnumSet.allOf(RouteKind.class).iterator();
        while (it.hasNext()) {
            RouteKind routeKind = (RouteKind) it.next();
            StreamFactoryBuilder apply = function.apply(routeKind);
            if (apply != null) {
                StreamFactoryBuilder writeBuffer = apply.setRouteManager(routeManager).setWriteBuffer(mutableDirectBuffer);
                Objects.requireNonNull(state);
                StreamFactoryBuilder initialIdSupplier = writeBuffer.setInitialIdSupplier(state::supplyInitialId);
                Objects.requireNonNull(state);
                StreamFactoryBuilder replyIdSupplier = initialIdSupplier.setReplyIdSupplier(state::supplyReplyId);
                Objects.requireNonNull(state);
                StreamFactoryBuilder sourceCorrelationIdSupplier = replyIdSupplier.setSourceCorrelationIdSupplier(state::supplyCorrelationId);
                Objects.requireNonNull(state);
                StreamFactoryBuilder targetCorrelationIdSupplier = sourceCorrelationIdSupplier.setTargetCorrelationIdSupplier(state::supplyCorrelationId);
                Objects.requireNonNull(state);
                StreamFactoryBuilder traceSupplier = targetCorrelationIdSupplier.setTraceSupplier(state::supplyTrace);
                Objects.requireNonNull(state);
                enumMap.put((EnumMap) routeKind, (RouteKind) traceSupplier.setGroupIdSupplier(state::supplyGroupId).setGroupBudgetClaimer(longFunction).setGroupBudgetReleaser(longFunction2).setCounterSupplier(function3).setAccumulatorSupplier(function4).setBufferPoolSupplier(supplier).build());
            }
        }
        Objects.requireNonNull(enumMap);
        this.supplyStreamFactory = (v1) -> {
            return r1.get(v1);
        };
    }

    public String name() {
        return this.name;
    }

    public int process() {
        return this.streamsBuffer.applyAsInt(this.readHandler);
    }

    public void detach() {
        this.writeHandler = (i, directBuffer, i2, i3) -> {
        };
    }

    public void close() throws Exception {
        this.streams.forEach((v1, v2) -> {
            doSyntheticAbort(v1, v2);
        });
        this.layout.close();
    }

    public String toString() {
        return String.format("%s (read)", this.name);
    }

    private void handleRead(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
        FrameFW wrap = this.frameRO.wrap((DirectBuffer) mutableDirectBuffer, i2, i2 + i3);
        long streamId = wrap.streamId();
        long routeId = wrap.routeId();
        try {
            if ((streamId & Long.MIN_VALUE) == 0) {
                handleReadInitial(routeId, streamId, i, mutableDirectBuffer, i2, i3);
            } else {
                handleReadReply(routeId, streamId, i, mutableDirectBuffer, i2, i3);
            }
        } catch (Throwable th) {
            th.addSuppressed(new Exception(String.format("[%s/%s]\t[0x%016x] %s", this.nukleusName, this.name, Long.valueOf(streamId), this.layout)));
            LangUtil.rethrowUnchecked(th);
        }
    }

    private void handleReadInitial(long j, long j2, int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
        if ((i & ErrorFW.TYPE_ID) != 0) {
            MessageConsumer messageConsumer = (MessageConsumer) this.throttles.get(j2);
            if (messageConsumer != null) {
                ReadCounters readCounters = (ReadCounters) this.countersByRouteId.computeIfAbsent(j, j3 -> {
                    return new ReadCounters(j3);
                });
                switch (i) {
                    case 1073741825:
                        readCounters.resets.increment();
                        messageConsumer.accept(i, mutableDirectBuffer, i2, i3);
                        this.throttles.remove(j2);
                        return;
                    case 1073741826:
                        readCounters.windows.increment();
                        messageConsumer.accept(i, mutableDirectBuffer, i2, i3);
                        return;
                    default:
                        return;
                }
            }
            return;
        }
        MessageConsumer messageConsumer2 = (MessageConsumer) this.streams.get(j2);
        if (messageConsumer2 == null) {
            if (i == 1) {
                MessageConsumer handleBegin = handleBegin(i, mutableDirectBuffer, i2, i3);
                if (handleBegin != null) {
                    handleBegin.accept(i, mutableDirectBuffer, i2, i3);
                    return;
                } else {
                    doReset(j, j2);
                    return;
                }
            }
            return;
        }
        switch (i) {
            case 1:
                messageConsumer2.accept(i, mutableDirectBuffer, i2, i3);
                return;
            case 2:
                messageConsumer2.accept(i, mutableDirectBuffer, i2, i3);
                return;
            case 3:
                messageConsumer2.accept(i, mutableDirectBuffer, i2, i3);
                this.streams.remove(j2);
                return;
            case 4:
                messageConsumer2.accept(i, mutableDirectBuffer, i2, i3);
                this.streams.remove(j2);
                return;
            default:
                doReset(j, j2);
                return;
        }
    }

    private void handleReadReply(long j, long j2, int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
        if ((i & ErrorFW.TYPE_ID) != 0) {
            MessageConsumer messageConsumer = (MessageConsumer) this.throttles.get(j2);
            if (messageConsumer != null) {
                switch (i) {
                    case 1073741825:
                        messageConsumer.accept(i, mutableDirectBuffer, i2, i3);
                        this.throttles.remove(j2);
                        return;
                    case 1073741826:
                        messageConsumer.accept(i, mutableDirectBuffer, i2, i3);
                        return;
                    default:
                        return;
                }
            }
            return;
        }
        MessageConsumer messageConsumer2 = (MessageConsumer) this.streams.get(j2);
        if (messageConsumer2 == null) {
            if (i == 1) {
                MessageConsumer handleBegin = handleBegin(i, mutableDirectBuffer, i2, i3);
                if (handleBegin == null) {
                    doReset(j, j2);
                    return;
                } else {
                    ((ReadCounters) this.countersByRouteId.computeIfAbsent(j, j3 -> {
                        return new ReadCounters(j3);
                    })).opens.increment();
                    handleBegin.accept(i, mutableDirectBuffer, i2, i3);
                    return;
                }
            }
            return;
        }
        ReadCounters readCounters = (ReadCounters) this.countersByRouteId.computeIfAbsent(j, j4 -> {
            return new ReadCounters(j4);
        });
        switch (i) {
            case 1:
                readCounters.opens.increment();
                messageConsumer2.accept(i, mutableDirectBuffer, i2, i3);
                return;
            case 2:
                readCounters.frames.increment();
                readCounters.bytes.add(mutableDirectBuffer.getInt(i2 + 53));
                messageConsumer2.accept(i, mutableDirectBuffer, i2, i3);
                return;
            case 3:
                readCounters.closes.increment();
                messageConsumer2.accept(i, mutableDirectBuffer, i2, i3);
                this.streams.remove(j2);
                return;
            case 4:
                readCounters.aborts.increment();
                messageConsumer2.accept(i, mutableDirectBuffer, i2, i3);
                this.streams.remove(j2);
                return;
            default:
                doReset(j, j2);
                return;
        }
    }

    private MessageConsumer handleBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
        BeginFW wrap = this.beginRO.wrap(directBuffer, i2, i2 + i3);
        long routeId = wrap.routeId();
        long streamId = wrap.streamId();
        long sourceRef = wrap.sourceRef();
        StreamFactory apply = this.supplyStreamFactory.apply(sourceRef != 0 ? ReferenceKind.resolve(sourceRef) : RouteKind.valueOf((int) ((routeId >> 28) & 255)));
        MessageConsumer messageConsumer = null;
        if (apply != null) {
            messageConsumer = apply.newStream(i, directBuffer, i2, i3, writeHandler());
            if (messageConsumer != null) {
                this.streams.put(streamId, messageConsumer);
            }
        }
        return messageConsumer;
    }

    private MessageConsumer writeHandler() {
        if (this.writeHandler == null) {
            this.writeHandler = this.supplyThrottle.apply(this.name);
            if (!$assertionsDisabled && this.writeHandler == null) {
                throw new AssertionError();
            }
        }
        return this.writeHandler;
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.reaktor.internal.types.stream.ResetFW$Builder] */
    private void doReset(long j, long j2) {
        ResetFW build = this.resetRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).build();
        writeHandler().accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX WARN: Type inference failed for: r0v3, types: [org.reaktivity.reaktor.internal.types.stream.AbortFW$Builder] */
    private void doSyntheticAbort(long j, MessageConsumer messageConsumer) {
        AbortFW build = this.abortRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(0L).streamId(j).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    static {
        $assertionsDisabled = !Source.class.desiredAssertionStatus();
    }
}
