package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.Objects;
import java.util.function.LongSupplier;
import java.util.function.ToIntFunction;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Long2LongHashMap;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.kafka.internal.KafkaNukleus;
import org.reaktivity.nukleus.kafka.internal.types.Flyweight;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.AbortFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.DataFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.EndFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaDataExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.KafkaEndExFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/MessageWriter.class */
public final class MessageWriter {
    private final BeginFW.Builder beginRW = new BeginFW.Builder();
    private final DataFW.Builder dataRW = new DataFW.Builder();
    private final EndFW.Builder endRW = new EndFW.Builder();
    private final AbortFW.Builder abortRW = new AbortFW.Builder();
    private final WindowFW.Builder windowRW = new WindowFW.Builder();
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final KafkaDataExFW.Builder dataExRW = new KafkaDataExFW.Builder();
    private final KafkaEndExFW.Builder endExRW = new KafkaEndExFW.Builder();
    private final OctetsFW messageKeyRO = new OctetsFW();
    private final OctetsFW messageValueRO = new OctetsFW();
    private final MutableDirectBuffer writeBuffer;
    private final LongSupplier supplyTrace;
    private final int kafkaTypeId;

    public MessageWriter(MutableDirectBuffer mutableDirectBuffer, LongSupplier longSupplier, ToIntFunction<String> toIntFunction) {
        this.writeBuffer = (MutableDirectBuffer) Objects.requireNonNull(mutableDirectBuffer);
        this.supplyTrace = (LongSupplier) Objects.requireNonNull(longSupplier);
        this.kafkaTypeId = toIntFunction.applyAsInt(KafkaNukleus.NAME);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW$Builder] */
    public void doBegin(MessageConsumer messageConsumer, long j, long j2, Flyweight.Builder.Visitor visitor) {
        BeginFW build = this.beginRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(this.supplyTrace.getAsLong()).extension(builder -> {
            builder.set(visitor);
        }).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.DataFW$Builder] */
    public void doData(MessageConsumer messageConsumer, long j, long j2, int i, OctetsFW octetsFW) {
        DataFW build = this.dataRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(this.supplyTrace.getAsLong()).groupId(0L).padding(i).payload(builder -> {
            builder.set(octetsFW.buffer(), octetsFW.offset(), octetsFW.sizeof());
        }).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.EndFW$Builder] */
    public void doEnd(MessageConsumer messageConsumer, long j, long j2, long[] jArr) {
        EndFW build = this.endRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(this.supplyTrace.getAsLong()).extension(builder -> {
            builder.set(visitKafkaEndEx(jArr));
        }).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    private Flyweight.Builder.Visitor visitKafkaEndEx(long[] jArr) {
        return (mutableDirectBuffer, i, i2) -> {
            return this.endExRW.wrap2(mutableDirectBuffer, i, i2).typeId(this.kafkaTypeId).fetchOffsets(builder -> {
                if (jArr != null) {
                    for (long j : jArr) {
                        builder.item(builder -> {
                            builder.set(j);
                        });
                    }
                }
            }).build().sizeof();
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.AbortFW$Builder] */
    public void doAbort(MessageConsumer messageConsumer, long j, long j2) {
        AbortFW build = this.abortRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(this.supplyTrace.getAsLong()).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.WindowFW$Builder] */
    public void doWindow(MessageConsumer messageConsumer, long j, long j2, int i, int i2, long j3) {
        WindowFW build = this.windowRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(this.supplyTrace.getAsLong()).credit(i).padding(i2).groupId(j3).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.ResetFW$Builder] */
    public void doReset(MessageConsumer messageConsumer, long j, long j2) {
        ResetFW build = this.resetRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(this.supplyTrace.getAsLong()).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.nukleus.kafka.internal.types.stream.BeginFW$Builder] */
    public void doKafkaBegin(MessageConsumer messageConsumer, long j, long j2, byte[] bArr) {
        BeginFW build = this.beginRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(this.supplyTrace.getAsLong()).extension(builder -> {
            builder.set(bArr);
        }).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v12, types: [org.reaktivity.nukleus.kafka.internal.types.stream.DataFW$Builder] */
    public void doKafkaData(MessageConsumer messageConsumer, long j, long j2, long j3, int i, byte b, DirectBuffer directBuffer, long j4, DirectBuffer directBuffer2, int i2, Long2LongHashMap long2LongHashMap) {
        OctetsFW wrap = directBuffer == null ? null : this.messageKeyRO.wrap(directBuffer, 0, directBuffer.capacity());
        DataFW build = this.dataRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(j3).flags(b).groupId(0L).padding(i).payload(directBuffer2 == null ? null : this.messageValueRO.wrap(directBuffer2, 0, i2)).extension(builder -> {
            builder.set(visitKafkaDataEx(j4, long2LongHashMap, wrap));
        }).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v7, types: [org.reaktivity.nukleus.kafka.internal.types.stream.DataFW$Builder] */
    public void doKafkaDataContinuation(MessageConsumer messageConsumer, long j, long j2, long j3, int i, byte b, DirectBuffer directBuffer, int i2, int i3) {
        DataFW build = this.dataRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).routeId(j).streamId(j2).trace(j3).flags(b).groupId(0L).padding(i).payload(directBuffer == null ? null : this.messageValueRO.wrap(directBuffer, i2, i3)).build();
        messageConsumer.accept(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    private Flyweight.Builder.Visitor visitKafkaDataEx(long j, Long2LongHashMap long2LongHashMap, OctetsFW octetsFW) {
        return (mutableDirectBuffer, i, i2) -> {
            return this.dataExRW.wrap2(mutableDirectBuffer, i, i2).typeId(this.kafkaTypeId).timestamp(j).fetchOffsets(builder -> {
                if (long2LongHashMap.size() == 1) {
                    long nextValue = long2LongHashMap.values().iterator().nextValue();
                    builder.item(builder -> {
                        builder.set(nextValue);
                    });
                    return;
                }
                int i = -1;
                while (true) {
                    i++;
                    if (long2LongHashMap.get(i) == long2LongHashMap.missingValue()) {
                        return;
                    }
                    long j2 = long2LongHashMap.get(i);
                    builder.item(builder2 -> {
                        builder2.set(j2);
                    });
                }
            }).messageKey(octetsFW).build().sizeof();
        };
    }
}
