package org.reaktivity.nukleus.kafka.internal.stream;

import java.util.Objects;
import java.util.function.Function;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.kafka.internal.function.KafkaErrorConsumer;
import org.reaktivity.nukleus.kafka.internal.function.StringIntLongToLongFunction;
import org.reaktivity.nukleus.kafka.internal.function.StringIntToLongFunction;
import org.reaktivity.nukleus.kafka.internal.types.OctetsFW;
import org.reaktivity.nukleus.kafka.internal.types.Varint32FW;
import org.reaktivity.nukleus.kafka.internal.types.codec.ResponseHeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.FetchResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.HeaderFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.PartitionResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.RecordBatchFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.RecordFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.RecordSetFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.TopicResponseFW;
import org.reaktivity.nukleus.kafka.internal.types.codec.fetch.TransactionResponseFW;
import org.reaktivity.nukleus.kafka.internal.util.BufferUtil;

/* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/FetchResponseDecoder.class */
public class FetchResponseDecoder implements ResponseDecoder {
    private static final int UNKNOWN = -1;
    private final Function<String, DecoderMessageDispatcher> getDispatcher;
    private final StringIntToLongFunction getRequestedOffsetForPartition;
    private final StringIntLongToLongFunction updateStartOffsetForPartition;
    private final KafkaErrorConsumer errorHandler;
    private final int maxRecordBatchSize;
    private final MutableDirectBuffer buffer;
    private int slotOffset;
    private int slotLimit;
    private int topicCount;
    private String topicName;
    private DecoderMessageDispatcher messageDispatcher;
    private int partitionCount;
    private short errorCode;
    private int recordSetBytesRemaining;
    private int recordBatchBytesRemaining;
    private long highWatermark;
    private int partition;
    private int abortedTransactionCount;
    private long requestedOffset;
    private long nextFetchAt;
    private int recordCount;
    private long firstOffset;
    private int lastOffsetDelta;
    private long firstTimestamp;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final ResponseHeaderFW responseRO = new ResponseHeaderFW();
    private final FetchResponseFW fetchResponseRO = new FetchResponseFW();
    private final TopicResponseFW topicResponseRO = new TopicResponseFW();
    private final PartitionResponseFW partitionResponseRO = new PartitionResponseFW();
    private final TransactionResponseFW transactionResponseRO = new TransactionResponseFW();
    private final RecordSetFW recordSetRO = new RecordSetFW();
    private final Varint32FW varint32RO = new Varint32FW();
    private final RecordBatchFW recordBatchRO = new RecordBatchFW();
    private final RecordFW recordRO = new RecordFW();
    private final HeaderFW headerRO = new HeaderFW();
    private final HeadersFW headers = new HeadersFW();
    private final DirectBuffer keyBuffer = new UnsafeBuffer(BufferUtil.EMPTY_BYTE_ARRAY);
    private final DirectBuffer valueBuffer = new UnsafeBuffer(BufferUtil.EMPTY_BYTE_ARRAY);
    private int responseBytesRemaining = -1;
    private final SkipBytesDecoderState skipBytesDecoderState = new SkipBytesDecoderState();
    private final BufferBytesDecoderState bufferBytesDecoderState = new BufferBytesDecoderState();
    private DecoderState decoderState = this::decodeResponseHeader;

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/FetchResponseDecoder$BufferBytesDecoderState.class */
    final class BufferBytesDecoderState implements DecoderState {
        int bytesToAwait;
        DecoderState nextState;

        BufferBytesDecoderState() {
        }

        @Override // org.reaktivity.nukleus.kafka.internal.stream.FetchResponseDecoder.DecoderState
        public int decode(DirectBuffer directBuffer, int i, int i2, long j) {
            if (i2 - i >= this.bytesToAwait) {
                FetchResponseDecoder.this.decoderState = this.nextState;
            }
            return i;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/FetchResponseDecoder$DecoderState.class */
    public interface DecoderState {
        int decode(DirectBuffer directBuffer, int i, int i2, long j);
    }

    /* loaded from: input_file:org/reaktivity/nukleus/kafka/internal/stream/FetchResponseDecoder$SkipBytesDecoderState.class */
    final class SkipBytesDecoderState implements DecoderState {
        int bytesToSkip;
        DecoderState nextState;

        SkipBytesDecoderState() {
        }

        @Override // org.reaktivity.nukleus.kafka.internal.stream.FetchResponseDecoder.DecoderState
        public int decode(DirectBuffer directBuffer, int i, int i2, long j) {
            int min = Math.min(i2 - i, this.bytesToSkip);
            int i3 = i + min;
            this.bytesToSkip -= min;
            if (this.bytesToSkip == 0) {
                FetchResponseDecoder.this.decoderState = this.nextState;
            }
            return i3;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FetchResponseDecoder(Function<String, DecoderMessageDispatcher> function, StringIntToLongFunction stringIntToLongFunction, StringIntLongToLongFunction stringIntLongToLongFunction, KafkaErrorConsumer kafkaErrorConsumer, MutableDirectBuffer mutableDirectBuffer) {
        this.getDispatcher = function;
        this.getRequestedOffsetForPartition = stringIntToLongFunction;
        this.updateStartOffsetForPartition = stringIntLongToLongFunction;
        this.errorHandler = kafkaErrorConsumer;
        this.buffer = (MutableDirectBuffer) Objects.requireNonNull(mutableDirectBuffer);
        this.maxRecordBatchSize = this.buffer.capacity();
    }

    @Override // org.reaktivity.nukleus.kafka.internal.stream.ResponseDecoder
    public int decode(OctetsFW octetsFW, long j) {
        int decodePayload = decodePayload(octetsFW.buffer(), octetsFW.offset(), octetsFW.limit(), j);
        if (decodePayload > 0) {
            decodePayload = decodePayload(octetsFW.buffer(), octetsFW.limit() - decodePayload, octetsFW.limit(), j);
        }
        switch (this.responseBytesRemaining) {
            case -1:
                return -this.slotLimit;
            case 0:
                return decodePayload;
            default:
                return -this.responseBytesRemaining;
        }
    }

    private int decodePayload(DirectBuffer directBuffer, int i, int i2, long j) {
        int i3 = 0;
        if (this.slotLimit > 0) {
            i3 = appendToSlot(directBuffer, i, i2);
            directBuffer = this.buffer;
            i = this.slotOffset;
            i2 = this.slotLimit;
        }
        int decode = decode(directBuffer, i, i2, j);
        if (this.responseBytesRemaining != -1) {
            this.responseBytesRemaining -= decode - i;
        }
        if (this.responseBytesRemaining == 0) {
            if (!$assertionsDisabled && decode != i2) {
                throw new AssertionError(String.format("no pipelined requests offset = %d limit = %d newOffset = %d", Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(decode)));
            }
        } else if (decode == i2) {
            this.slotLimit = 0;
            this.slotOffset = 0;
        } else if (this.slotLimit == 0) {
            this.slotOffset = 0;
            this.slotLimit = 0;
            i3 = appendToSlot(directBuffer, decode, i2);
        } else {
            this.slotOffset = decode;
        }
        return i3;
    }

    @Override // org.reaktivity.nukleus.kafka.internal.stream.ResponseDecoder
    public void reinitialize() {
        this.slotOffset = 0;
        this.slotLimit = 0;
        this.responseBytesRemaining = -1;
        this.topicCount = 0;
        this.partitionCount = 0;
        this.decoderState = this::decodeResponseHeader;
    }

    private int appendToSlot(DirectBuffer directBuffer, int i, int i2) {
        int i3 = i2 - i;
        int min = Math.min(i3, this.buffer.capacity() - (this.slotLimit - this.slotOffset));
        if (min + this.slotLimit > this.buffer.capacity()) {
            alignSlotData(this.buffer);
        }
        this.buffer.putBytes(this.slotLimit, directBuffer, i, min);
        this.slotLimit += min;
        return i3 - min;
    }

    private void alignSlotData(MutableDirectBuffer mutableDirectBuffer) {
        int i = this.slotLimit - this.slotOffset;
        mutableDirectBuffer.putBytes(0, mutableDirectBuffer, this.slotOffset, i);
        this.slotOffset = 0;
        this.slotLimit = i;
    }

    private int decode(DirectBuffer directBuffer, int i, int i2, long j) {
        boolean z = true;
        while (z) {
            int i3 = i;
            DecoderState decoderState = this.decoderState;
            i = this.decoderState.decode(directBuffer, i, i2, j);
            z = (i == i3 && this.decoderState == decoderState) ? false : true;
        }
        return i;
    }

    private int decodeResponseHeader(DirectBuffer directBuffer, int i, int i2, long j) {
        int i3 = i;
        ResponseHeaderFW tryWrap = this.responseRO.tryWrap(directBuffer, i, i2);
        if (tryWrap != null) {
            this.responseBytesRemaining = tryWrap.size() + tryWrap.sizeof();
            this.decoderState = this::decodeFetchResponse;
            i3 = tryWrap.limit();
        }
        return i3;
    }

    private int decodeFetchResponse(DirectBuffer directBuffer, int i, int i2, long j) {
        int i3 = i;
        FetchResponseFW tryWrap = this.fetchResponseRO.tryWrap(directBuffer, i, i2);
        if (tryWrap != null) {
            this.topicCount = tryWrap.topicCount();
            i3 = tryWrap.limit();
            this.decoderState = this::decodeTopicResponse;
        }
        return i3;
    }

    private int decodeTopicResponse(DirectBuffer directBuffer, int i, int i2, long j) {
        int i3 = i;
        if (this.topicCount > 0) {
            TopicResponseFW tryWrap = this.topicResponseRO.tryWrap(directBuffer, i, i2);
            if (tryWrap != null) {
                this.topicCount--;
                this.topicName = tryWrap.name().asString();
                this.messageDispatcher = this.getDispatcher.apply(this.topicName);
                this.partitionCount = tryWrap.partitionCount();
                this.decoderState = this::decodePartitionResponse;
                i3 = tryWrap.limit();
            }
        } else {
            reinitialize();
        }
        return i3;
    }

    private int decodePartitionResponse(DirectBuffer directBuffer, int i, int i2, long j) {
        int i3 = i;
        if (this.partitionCount > 0) {
            PartitionResponseFW tryWrap = this.partitionResponseRO.tryWrap(directBuffer, i, i2);
            if (tryWrap != null) {
                this.partitionCount--;
                i3 = tryWrap.limit();
                this.partition = tryWrap.partitionId();
                this.errorCode = tryWrap.errorCode();
                this.highWatermark = tryWrap.highWatermark();
                this.abortedTransactionCount = tryWrap.abortedTransactionCount();
                this.requestedOffset = this.getRequestedOffsetForPartition.apply(this.topicName, this.partition);
                this.nextFetchAt = this.requestedOffset;
                this.decoderState = this.abortedTransactionCount > 0 ? this::decodeTransactionResponse : this::decodeRecordSet;
                long logStartOffset = tryWrap.logStartOffset();
                if (this.updateStartOffsetForPartition.apply(this.topicName, this.partition, logStartOffset) < logStartOffset) {
                    this.messageDispatcher.startOffset(this.partition, logStartOffset);
                }
                if (this.errorCode != KafkaError.NONE.errorCode) {
                    this.errorHandler.accept(this.topicName, this.partition, KafkaError.asKafkaError(this.errorCode));
                }
            }
        } else {
            this.decoderState = this::decodeTopicResponse;
        }
        return i3;
    }

    private int decodeTransactionResponse(DirectBuffer directBuffer, int i, int i2, long j) {
        int i3 = i;
        if (this.abortedTransactionCount > 0) {
            TransactionResponseFW tryWrap = this.transactionResponseRO.tryWrap(directBuffer, i, i2);
            if (tryWrap != null) {
                this.abortedTransactionCount--;
                i3 = tryWrap.limit();
            }
        } else {
            this.decoderState = this::decodeRecordSet;
        }
        return i3;
    }

    private int decodeRecordSet(DirectBuffer directBuffer, int i, int i2, long j) {
        int i3 = i;
        RecordSetFW tryWrap = this.recordSetRO.tryWrap(directBuffer, i, i2);
        if (tryWrap != null) {
            this.recordSetBytesRemaining = tryWrap.recordSetSize();
            if (!$assertionsDisabled && this.recordSetBytesRemaining < 0) {
                throw new AssertionError("protocol violation: negative recordSetSize");
            }
            if (!$assertionsDisabled && this.recordSetBytesRemaining + tryWrap.sizeof() > this.responseBytesRemaining) {
                throw new AssertionError("protocol violation, record set goes beyond end of response");
            }
            i3 = tryWrap.limit();
            if (this.recordSetBytesRemaining == 0) {
                this.decoderState = this::decodePartitionResponse;
            } else if (this.recordSetBytesRemaining > this.maxRecordBatchSize) {
                System.out.format("[nukleus-kafka] skipping topic: %s partition: %d offset: %d because record set size %d exceeds configured nukleus.kafka.fetch.partition.max.bytes %d\n", this.topicName, Integer.valueOf(this.partition), Long.valueOf(this.requestedOffset), Integer.valueOf(this.recordSetBytesRemaining), Integer.valueOf(this.maxRecordBatchSize));
                this.messageDispatcher.flush(this.partition, this.requestedOffset, this.requestedOffset + 1);
                this.skipBytesDecoderState.bytesToSkip = this.recordSetBytesRemaining;
                this.skipBytesDecoderState.nextState = this::decodePartitionResponse;
                this.decoderState = this.skipBytesDecoderState;
            } else {
                this.bufferBytesDecoderState.bytesToAwait = this.recordSetBytesRemaining;
                this.bufferBytesDecoderState.nextState = this::decodeRecordBatch;
                this.decoderState = this.bufferBytesDecoderState;
            }
        }
        return i3;
    }

    private int decodeRecordBatch(DirectBuffer directBuffer, int i, int i2, long j) {
        int i3 = i;
        RecordBatchFW tryWrap = this.recordBatchRO.tryWrap(directBuffer, i, Math.min(i + this.recordSetBytesRemaining, i2));
        if (tryWrap == null) {
            if (this.nextFetchAt > this.requestedOffset) {
                this.messageDispatcher.flush(this.partition, this.requestedOffset, this.nextFetchAt);
            }
            this.skipBytesDecoderState.bytesToSkip = this.recordSetBytesRemaining;
            this.skipBytesDecoderState.nextState = this::decodePartitionResponse;
            this.decoderState = this.skipBytesDecoderState;
        } else if (isCompressed(tryWrap) || isControlBatch(tryWrap)) {
            this.nextFetchAt = tryWrap.firstOffset() + tryWrap.lastOffsetDelta() + 1;
            if (this.nextFetchAt > this.requestedOffset) {
                this.messageDispatcher.flush(this.partition, this.requestedOffset, this.nextFetchAt);
            }
            int min = Math.min(12 + tryWrap.length(), this.recordSetBytesRemaining);
            this.recordSetBytesRemaining -= min;
            this.skipBytesDecoderState.bytesToSkip = min;
            this.skipBytesDecoderState.nextState = this.recordSetBytesRemaining == 0 ? this::decodePartitionResponse : this::decodeRecordBatch;
            this.decoderState = this.skipBytesDecoderState;
        } else {
            int length = 12 + tryWrap.length();
            if (length <= this.recordSetBytesRemaining || tryWrap.lastOffsetDelta() != 0 || length <= this.maxRecordBatchSize) {
                this.firstOffset = tryWrap.firstOffset();
                this.lastOffsetDelta = tryWrap.lastOffsetDelta();
                this.firstTimestamp = tryWrap.firstTimestamp();
                this.recordCount = tryWrap.recordCount();
                this.nextFetchAt = this.firstOffset;
                this.recordSetBytesRemaining -= tryWrap.sizeof();
                if (!$assertionsDisabled && this.recordSetBytesRemaining < 0) {
                    throw new AssertionError();
                }
                this.recordBatchBytesRemaining = length - tryWrap.sizeof();
                if (!$assertionsDisabled && this.recordBatchBytesRemaining < 0) {
                    throw new AssertionError();
                }
                this.decoderState = this::decodeRecordLength;
                i3 = tryWrap.limit();
            } else {
                System.out.format("[nukleus-kafka] skipping large message at topic: %s partition: %d offset: %d, message size %d bytes exceeds configured nukleus.kafka.fetch.partition.max.bytes %d\n", this.topicName, Integer.valueOf(this.partition), Long.valueOf(tryWrap.firstOffset()), Integer.valueOf(tryWrap.length()), Integer.valueOf(this.maxRecordBatchSize));
                this.nextFetchAt = tryWrap.firstOffset() + 1;
                this.messageDispatcher.flush(this.partition, this.requestedOffset, this.nextFetchAt);
                this.skipBytesDecoderState.bytesToSkip = this.recordSetBytesRemaining;
                this.skipBytesDecoderState.nextState = this::decodePartitionResponse;
                this.decoderState = this.skipBytesDecoderState;
            }
        }
        return i3;
    }

    private int decodeRecordLength(DirectBuffer directBuffer, int i, int i2, long j) {
        int i3;
        if (this.recordSetBytesRemaining == 0) {
            if (this.recordBatchBytesRemaining == 0) {
                this.nextFetchAt = this.firstOffset + this.lastOffsetDelta + 1;
            }
            this.messageDispatcher.flush(this.partition, this.requestedOffset, this.nextFetchAt);
            this.decoderState = this::decodePartitionResponse;
        } else if (this.recordBatchBytesRemaining == 0) {
            this.nextFetchAt = this.firstOffset + this.lastOffsetDelta + 1;
            this.messageDispatcher.flush(this.partition, this.requestedOffset, this.nextFetchAt);
            this.decoderState = this::decodeRecordBatch;
        } else {
            if (!$assertionsDisabled && this.recordCount <= 0) {
                throw new AssertionError("protocol violation: excess bytes following partition response or incorrect record count");
            }
            Varint32FW tryWrap = this.varint32RO.tryWrap(directBuffer, i, i2);
            if (tryWrap != null) {
                i3 = tryWrap.sizeof() + tryWrap.value();
            } else {
                if (!$assertionsDisabled && i2 - i != this.recordSetBytesRemaining) {
                    throw new AssertionError();
                }
                i3 = this.recordSetBytesRemaining + 1;
            }
            if (i3 > this.recordSetBytesRemaining) {
                this.messageDispatcher.flush(this.partition, this.requestedOffset, this.nextFetchAt);
                this.skipBytesDecoderState.bytesToSkip = this.recordSetBytesRemaining;
                this.skipBytesDecoderState.nextState = this::decodePartitionResponse;
                this.decoderState = this.skipBytesDecoderState;
            } else {
                if (!$assertionsDisabled && i3 > this.recordBatchBytesRemaining) {
                    throw new AssertionError("protocol violation: truncated record batch not last in record set");
                }
                this.recordBatchBytesRemaining -= i3;
                this.recordSetBytesRemaining -= i3;
                this.decoderState = this::decodeRecord;
                if (this.recordSetBytesRemaining < 0) {
                    System.out.println(String.format("W2: recordCount=%d, recordSize=%d, recordSetBytesRemaining=%d, recordBatchBytesRemaining=%d", Integer.valueOf(this.recordCount), Integer.valueOf(i3), Integer.valueOf(this.recordSetBytesRemaining), Integer.valueOf(this.recordBatchBytesRemaining)));
                }
            }
        }
        return i;
    }

    private int decodeRecord(DirectBuffer directBuffer, int i, int i2, long j) {
        int i3 = i;
        RecordFW tryWrap = this.recordRO.tryWrap(directBuffer, i, i2);
        if (tryWrap != null) {
            long offsetDelta = this.firstOffset + tryWrap.offsetDelta();
            try {
                int limit = tryWrap.limit();
                int i4 = limit;
                int headerCount = tryWrap.headerCount();
                for (int i5 = 0; i5 < headerCount; i5++) {
                    HeaderFW tryWrap2 = this.headerRO.tryWrap(directBuffer, i4, i2);
                    if (tryWrap2 == null) {
                        return i3;
                    }
                    i4 = tryWrap2.limit();
                }
                this.recordCount--;
                if (offsetDelta >= this.requestedOffset) {
                    this.nextFetchAt = offsetDelta + 1;
                    DirectBuffer directBuffer2 = null;
                    OctetsFW key = tryWrap.key();
                    if (key != null) {
                        this.keyBuffer.wrap(key.buffer(), key.offset(), key.sizeof());
                        directBuffer2 = this.keyBuffer;
                    }
                    long timestampDelta = this.firstTimestamp + tryWrap.timestampDelta();
                    DirectBuffer directBuffer3 = null;
                    OctetsFW value = tryWrap.value();
                    if (value != null) {
                        this.valueBuffer.wrap(value.buffer(), value.offset(), value.sizeof());
                        directBuffer3 = this.valueBuffer;
                    }
                    this.headers.wrap(directBuffer, limit, i4);
                    this.messageDispatcher.dispatch(this.partition, this.requestedOffset, offsetDelta, this.highWatermark, directBuffer2, this.headers, timestampDelta, j, directBuffer3);
                }
                i3 = i4;
                this.decoderState = this::decodeRecordLength;
            } catch (Throwable th) {
                th.addSuppressed(new Exception(String.format("[kafka] Decoding fetch topic partition response %s[%d] @ offset %d", this.topicName, Integer.valueOf(this.partition), Long.valueOf(offsetDelta))));
                LangUtil.rethrowUnchecked(th);
            }
        }
        return i3;
    }

    private static boolean isCompressed(RecordBatchFW recordBatchFW) {
        return (recordBatchFW.attributes() & 7) != 0;
    }

    private static boolean isControlBatch(RecordBatchFW recordBatchFW) {
        return (recordBatchFW.attributes() & 32) != 0;
    }

    static {
        $assertionsDisabled = !FetchResponseDecoder.class.desiredAssertionStatus();
    }
}
