package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.Iterator;
import java.util.function.Function;
import org.agrona.DirectBuffer;
import org.reaktivity.nukleus.kafka.internal.cache.PartitionIndex;
import org.reaktivity.nukleus.kafka.internal.types.KafkaHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/TopicMessageDispatcher.class */
public class TopicMessageDispatcher implements MessageDispatcher, DecoderMessageDispatcher {
    private final KeyMessageDispatcher[] keys;
    private final HeadersMessageDispatcher headers;
    private final PartitionIndex[] indexes;
    private final BroadcastMessageDispatcher broadcast = new BroadcastMessageDispatcher();
    private final OctetsFW octetsRO = new OctetsFW();

    /* JADX INFO: Access modifiers changed from: protected */
    public TopicMessageDispatcher(PartitionIndex[] partitionIndexArr, Function<DirectBuffer, HeaderValueMessageDispatcher> function) {
        this.indexes = partitionIndexArr;
        this.keys = new KeyMessageDispatcher[partitionIndexArr.length];
        for (int i = 0; i < partitionIndexArr.length; i++) {
            this.keys[i] = new KeyMessageDispatcher(function);
        }
        this.headers = new HeadersMessageDispatcher(function);
    }

    @Override // org.reaktivity.nukleus.kafka.internal.stream.DecoderMessageDispatcher
    public int dispatch(int i, long j, long j2, DirectBuffer directBuffer, HeadersFW headersFW, long j3, long j4, DirectBuffer directBuffer2) {
        int dispatch = dispatch(i, j, j2, directBuffer, headersFW.headerSupplier(), j3, j4, directBuffer2);
        long j5 = j2 - 1;
        if (MessageDispatcher.matched(dispatch)) {
            this.indexes[i].add(j, j5, j3, j4, directBuffer, headersFW, directBuffer2);
        }
        return dispatch;
    }

    @Override // org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
    public int dispatch(int i, long j, long j2, DirectBuffer directBuffer, Function<DirectBuffer, DirectBuffer> function, long j3, long j4, DirectBuffer directBuffer2) {
        PartitionIndex.Entry entry;
        int i2 = 0;
        long j5 = j2 - 1;
        if (shouldDispatch(i, j, j5, directBuffer)) {
            int dispatch = 0 | this.broadcast.dispatch(i, j, j2, directBuffer, function, j3, j4, directBuffer2);
            if (directBuffer != null) {
                KeyMessageDispatcher keyMessageDispatcher = this.keys[i];
                dispatch |= keyMessageDispatcher.dispatch(i, j, j2, directBuffer, function, j3, j4, directBuffer2);
                long nextOffset = this.indexes[i].nextOffset();
                if (MessageDispatcher.delivered(dispatch) && j2 < nextOffset && (entry = getEntry(i, j, asOctets(directBuffer))) != null && entry.offset() == j5) {
                    keyMessageDispatcher.flush(i, j, nextOffset, directBuffer);
                }
            }
            i2 = dispatch | this.headers.dispatch(i, j, j2, directBuffer, function, j3, j4, directBuffer2);
        }
        return i2;
    }

    @Override // org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
    public void flush(int i, long j, long j2) {
        this.broadcast.flush(i, j, j2);
        this.keys[i].flush(i, j, j2);
        this.headers.flush(i, j, j2);
        this.indexes[i].extendNextOffset(j, j2);
    }

    public void add(OctetsFW octetsFW, int i, Iterator<KafkaHeaderFW> it, MessageDispatcher messageDispatcher) {
        if (octetsFW != null) {
            this.keys[i].add(octetsFW, it, messageDispatcher);
        } else if (it.hasNext()) {
            this.headers.add(it, messageDispatcher);
        } else {
            this.broadcast.add(messageDispatcher);
        }
    }

    public boolean remove(OctetsFW octetsFW, int i, Iterator<KafkaHeaderFW> it, MessageDispatcher messageDispatcher) {
        return octetsFW != null ? this.keys[i].remove(octetsFW, it, messageDispatcher) : it.hasNext() ? this.headers.remove(it, messageDispatcher) : this.broadcast.remove(messageDispatcher);
    }

    public boolean isEmpty() {
        boolean z = true;
        for (KeyMessageDispatcher keyMessageDispatcher : this.keys) {
            z = z && keyMessageDispatcher.isEmpty();
        }
        return z && this.headers.isEmpty() && this.broadcast.isEmpty();
    }

    public PartitionIndex.Entry getEntry(int i, long j, OctetsFW octetsFW) {
        return this.indexes[i].getEntry(j, octetsFW);
    }

    public Iterator<PartitionIndex.Entry> entries(int i, long j) {
        return this.indexes[i].entries(j);
    }

    public long nextOffset(int i) {
        return this.indexes[i].nextOffset();
    }

    private boolean shouldDispatch(int i, long j, long j2, DirectBuffer directBuffer) {
        boolean z = true;
        if (directBuffer != null) {
            z = j2 >= getEntry(i, j, asOctets(directBuffer)).offset();
        }
        return z;
    }

    private OctetsFW asOctets(DirectBuffer directBuffer) {
        return this.octetsRO.wrap(directBuffer, 0, directBuffer.capacity());
    }
}
