package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/CachingKeyMessageDispatcher.class */
public class CachingKeyMessageDispatcher extends KeyMessageDispatcher {
    private final Map<UnsafeBuffer, long[]> offsetsByKey = new HashMap();
    private long highestOffset;

    @Override // org.reaktivity.nukleus.kafka.internal.stream.KeyMessageDispatcher, org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
    public int dispatch(int i, long j, long j2, DirectBuffer directBuffer, Function<DirectBuffer, DirectBuffer> function, long j3, long j4, DirectBuffer directBuffer2) {
        long j5 = j2 - 1;
        this.buffer.wrap(directBuffer, 0, directBuffer.capacity());
        if (directBuffer2 == null) {
            this.offsetsByKey.remove(this.buffer);
        } else {
            long[] jArr = this.offsetsByKey.get(this.buffer);
            if (jArr == null) {
                UnsafeBuffer unsafeBuffer = new UnsafeBuffer(new byte[directBuffer.capacity()]);
                unsafeBuffer.putBytes(0, directBuffer, 0, directBuffer.capacity());
                this.offsetsByKey.put(unsafeBuffer, new long[]{j5});
            } else {
                jArr[0] = Math.max(j5, jArr[0]);
            }
        }
        if (j <= this.highestOffset && j2 > this.highestOffset) {
            this.highestOffset = j2;
        }
        int dispatch = super.dispatch(i, j, j2, directBuffer, function, j3, j4, directBuffer2);
        if (dispatch > 0 && j2 < this.highestOffset) {
            this.buffer.wrap(directBuffer, 0, directBuffer.capacity());
            long[] jArr2 = this.offsetsByKey.get(this.buffer);
            if (jArr2 != null && jArr2[0] == j5) {
                flush(i, j, this.highestOffset, directBuffer);
            }
        }
        return dispatch;
    }

    @Override // org.reaktivity.nukleus.kafka.internal.stream.KeyMessageDispatcher, org.reaktivity.nukleus.kafka.internal.stream.MessageDispatcher
    public void flush(int i, long j, long j2) {
        if (j <= this.highestOffset && j2 > this.highestOffset) {
            this.highestOffset = j2;
        }
        super.flush(i, j, j2);
    }

    @Override // org.reaktivity.nukleus.kafka.internal.stream.KeyMessageDispatcher
    public long lastOffset(int i, OctetsFW octetsFW) {
        this.buffer.wrap(octetsFW.buffer(), octetsFW.offset(), octetsFW.sizeof());
        long[] jArr = this.offsetsByKey.get(this.buffer);
        return jArr == null ? this.highestOffset : jArr[0];
    }

    @Override // org.reaktivity.nukleus.kafka.internal.stream.KeyMessageDispatcher
    public long lowestOffset(int i) {
        long j = Long.MAX_VALUE;
        Iterator<long[]> it = this.offsetsByKey.values().iterator();
        while (it.hasNext()) {
            j = Math.min(it.next()[0], j);
        }
        if (j == Long.MAX_VALUE) {
            return 0L;
        }
        return j;
    }

    @Override // org.reaktivity.nukleus.kafka.internal.stream.KeyMessageDispatcher
    public boolean shouldDispatch(DirectBuffer directBuffer, long j) {
        this.buffer.wrap(directBuffer, 0, directBuffer.capacity());
        long[] jArr = this.offsetsByKey.get(this.buffer);
        return j > (jArr == null ? this.highestOffset : jArr[0]);
    }
}
