package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.kafka.internal.types.KafkaHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.util.BufferUtil;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/HeaderValueMessageDispatcher.class */
/**
 * Dispatches messages to nested {@link HeadersMessageDispatcher}s selected by the value(s)
 * that a message carries for one particular header ({@link #headerName}).
 *
 * <p>Child dispatchers are held twice: in a map keyed by header value (used by
 * {@link #dispatch}) and in a list (used for index-based iteration by {@link #adjustOffset},
 * {@link #detach} and {@link #flush}).
 *
 * <p>While {@link #detach}, {@link #dispatch} or {@link #flush} is running, removals are
 * deferred: the emptied child is replaced by {@link HeadersMessageDispatcher#NOOP} in both
 * collections and physically removed once the operation has finished.
 */
public class HeaderValueMessageDispatcher implements MessageDispatcher {
    /** Instance with a {@code null} header name whose dispatcher callbacks all do nothing. */
    static final HeaderValueMessageDispatcher NOOP = new HeaderValueMessageDispatcher(null) {
        @Override
        public void adjustOffset(int i, long j, long j2) {
        }

        @Override
        public void detach(boolean z) {
        }

        @Override
        public void flush(int i, long j, long j2) {
        }

        @Override
        public int dispatch(int i, long j, long j2, DirectBuffer directBuffer, Function<DirectBuffer, Iterator<DirectBuffer>> function, long j3, long j4, DirectBuffer directBuffer2) {
            return 0;
        }
    };

    /** Name of the header whose values select the child dispatcher; {@code null} for {@link #NOOP}. */
    final DirectBuffer headerName;

    /** Reusable flyweight wrapped around a header value to perform map lookups without copying. */
    final UnsafeBuffer buffer = new UnsafeBuffer(new byte[0]);

    /** Child dispatchers keyed by a private copy of the header value bytes. */
    Map<DirectBuffer, HeadersMessageDispatcher> dispatchersByHeaderValue = new HashMap<>();

    /** The same child dispatchers as the map values, kept in a list for index-based iteration. */
    private final List<HeadersMessageDispatcher> dispatchers = new ArrayList<>();

    /** True while an operation is running during which entries must not be physically removed. */
    private boolean deferUpdates;

    /** True when at least one entry was replaced by the NOOP placeholder and awaits removal. */
    private boolean hasDeferredUpdates;

    /**
     * @param directBuffer name of the header this dispatcher keys on
     */
    public HeaderValueMessageDispatcher(DirectBuffer directBuffer) {
        this.headerName = directBuffer;
    }

    /** Forwards the call, with unchanged arguments, to every child dispatcher. */
    @Override
    public void adjustOffset(int i, long j, long j2) {
        for (int index = 0; index < this.dispatchers.size(); index++) {
            this.dispatchers.get(index).adjustOffset(i, j, j2);
        }
    }

    /**
     * Forwards the call to every child dispatcher with removals deferred, then applies
     * any removals that were requested meanwhile.
     */
    @Override
    public void detach(boolean z) {
        this.deferUpdates = true;
        for (int index = 0; index < this.dispatchers.size(); index++) {
            this.dispatchers.get(index).detach(z);
        }
        this.deferUpdates = false;
        processDeferredUpdates();
    }

    /**
     * Obtains the message's values for {@link #headerName} from {@code function} and forwards
     * the message to the child dispatcher registered for each value, if any.
     *
     * @param function supplies, for a header name, an iterator over that header's values
     * @return the bitwise OR of the results of all child dispatchers that were invoked,
     *         or 0 when none matched
     */
    @Override
    public int dispatch(int i, long j, long j2, DirectBuffer directBuffer, Function<DirectBuffer, Iterator<DirectBuffer>> function, long j3, long j4, DirectBuffer directBuffer2) {
        int result = 0;
        this.deferUpdates = true;
        Iterator<DirectBuffer> headerValues = function.apply(this.headerName);
        while (headerValues.hasNext()) {
            this.buffer.wrap(headerValues.next());
            HeadersMessageDispatcher target = this.dispatchersByHeaderValue.get(this.buffer);
            if (target != null) {
                result |= target.dispatch(i, j, j2, directBuffer, function, j3, j4, directBuffer2);
            }
        }
        this.deferUpdates = false;
        processDeferredUpdates();
        return result;
    }

    /**
     * Forwards the call to every child dispatcher with removals deferred, then applies
     * any removals that were requested meanwhile.
     */
    @Override
    public void flush(int i, long j, long j2) {
        this.deferUpdates = true;
        for (int index = 0; index < this.dispatchers.size(); index++) {
            this.dispatchers.get(index).flush(i, j, j2);
        }
        this.deferUpdates = false;
        processDeferredUpdates();
    }

    /**
     * Registers {@code messageDispatcher} under the given header value, creating the child
     * dispatcher for that value on first use.
     *
     * @param octetsFW          the header value to register under
     * @param it                passed on to the child dispatcher's {@code add}
     * @param messageDispatcher the dispatcher to register
     */
    public void add(OctetsFW octetsFW, Iterator<KafkaHeaderFW> it, MessageDispatcher messageDispatcher) {
        BufferUtil.wrap((DirectBuffer) this.buffer, octetsFW);
        HeadersMessageDispatcher target = this.dispatchersByHeaderValue.get(this.buffer);
        // NOOP is the shared placeholder left behind by a deferred removal; it is purged by
        // processDeferredUpdates(), so a registration made on it would be lost. Treat such an
        // entry as absent and install a fresh child dispatcher instead.
        if (target == null || target == HeadersMessageDispatcher.NOOP) {
            // The key must own its bytes: octetsFW is only a view over a caller-owned buffer.
            UnsafeBuffer keyCopy = new UnsafeBuffer(new byte[octetsFW.sizeof()]);
            keyCopy.putBytes(0, octetsFW.buffer(), octetsFW.offset(), octetsFW.sizeof());
            target = new HeadersMessageDispatcher(HeaderValueMessageDispatcher::new);
            this.dispatchersByHeaderValue.put(keyCopy, target);
            this.dispatchers.add(target);
        }
        target.add(it, messageDispatcher);
    }

    /**
     * Unregisters {@code messageDispatcher} from the child dispatcher for the given header
     * value. When that child becomes empty it is removed, either immediately or, while updates
     * are deferred, by swapping in the NOOP placeholder; {@link #onRemoved} is then invoked.
     *
     * @param octetsFW          the header value the dispatcher was registered under
     * @param it                passed on to the child dispatcher's {@code remove}
     * @param messageDispatcher the dispatcher to unregister
     * @return the result of the child dispatcher's {@code remove}, or {@code false} when no
     *         child is registered for the header value
     */
    public boolean remove(OctetsFW octetsFW, Iterator<KafkaHeaderFW> it, MessageDispatcher messageDispatcher) {
        BufferUtil.wrap((DirectBuffer) this.buffer, octetsFW);
        HeadersMessageDispatcher target = this.dispatchersByHeaderValue.get(this.buffer);
        if (target == null) {
            return false;
        }

        boolean removed = target.remove(it, messageDispatcher);
        if (target.isEmpty()) {
            if (this.deferUpdates) {
                // Collections may be mid-iteration: keep their shape and mark the slot instead.
                this.dispatchersByHeaderValue.replace(this.buffer, HeadersMessageDispatcher.NOOP);
                int position = this.dispatchers.indexOf(target);
                if (position != -1) {
                    this.dispatchers.set(position, HeadersMessageDispatcher.NOOP);
                }
                this.hasDeferredUpdates = true;
            } else {
                this.dispatchersByHeaderValue.remove(this.buffer);
                this.dispatchers.remove(target);
            }
            onRemoved(target);
        }
        return removed;
    }

    /**
     * Hook invoked by {@link #remove} after an emptied child dispatcher has been removed
     * (or marked for deferred removal). Does nothing by default.
     *
     * @param messageDispatcher the child dispatcher that was removed
     */
    protected void onRemoved(MessageDispatcher messageDispatcher) {
    }

    /**
     * @param octetsFW the header value to look up
     * @return the child dispatcher currently mapped to the header value, or {@code null}
     */
    public HeadersMessageDispatcher get(OctetsFW octetsFW) {
        this.buffer.wrap(octetsFW.buffer(), octetsFW.offset(), octetsFW.sizeof());
        return this.dispatchersByHeaderValue.get(this.buffer);
    }

    /** Physically removes every NOOP placeholder left behind by deferred removals. */
    private void processDeferredUpdates() {
        if (!this.hasDeferredUpdates) {
            return;
        }
        this.hasDeferredUpdates = false;
        this.dispatchersByHeaderValue.values().removeIf(candidate -> candidate == HeadersMessageDispatcher.NOOP);
        this.dispatchers.removeIf(candidate -> candidate == HeadersMessageDispatcher.NOOP);
    }

    /**
     * @return {@code SimpleClassName(headerName, {value=dispatcher, ...})}; the header name is
     *         rendered as {@code null} when absent (as for {@link #NOOP})
     */
    @Override
    public String toString() {
        return String.format("%s(%s, %s)", getClass().getSimpleName(), toString(this.headerName), toString(this.dispatchersByHeaderValue));
    }

    /** Decodes the wrapped region of {@code text} as UTF-8, tolerating {@code null}. */
    private static String toString(DirectBuffer text) {
        return text == null ? "null" : text.getStringWithoutLengthUtf8(0, text.capacity());
    }

    /** Renders the map as {@code {key=value, ...}} with keys decoded as UTF-8 text. */
    private <V> String toString(Map<DirectBuffer, V> map) {
        StringBuilder text = new StringBuilder(1000);
        text.append("{");
        String separator = "";
        for (Map.Entry<DirectBuffer, V> entry : map.entrySet()) {
            DirectBuffer key = entry.getKey();
            text.append(separator)
                .append(key.getStringWithoutLengthUtf8(0, key.capacity()))
                .append("=")
                .append(entry.getValue().toString());
            separator = ", ";
        }
        text.append("}");
        return text.toString();
    }

    /**
     * @return {@code true} when there are no child dispatchers, or all of them are NOOP
     *         placeholders awaiting deferred removal
     */
    public boolean isEmpty() {
        // allMatch is vacuously true for an empty list, so no separate emptiness check is needed.
        return this.dispatchers.stream().allMatch(candidate -> candidate == HeadersMessageDispatcher.NOOP);
    }
}
