package io.zeebe.logstreams.log;

import io.zeebe.dispatcher.impl.log.DataFrameDescriptor;
import io.zeebe.logstreams.impl.CompleteEventsInBlockProcessor;
import io.zeebe.logstreams.impl.LogEntryDescriptor;
import io.zeebe.logstreams.impl.log.fs.FsLogStorage;
import io.zeebe.logstreams.impl.log.fs.FsLogStorageConfiguration;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:io/zeebe/logstreams/log/CompleteInBlockProcessorTest.class */
public class CompleteInBlockProcessorTest {
    private static final int SEGMENT_SIZE = 16384;
    protected static final int LENGTH = LogEntryDescriptor.headerLength(0);
    protected static final int ALIGNED_LEN = DataFrameDescriptor.alignedFramedLength(LENGTH);

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    @Rule
    public ExpectedException thrown = ExpectedException.none();
    private CompleteEventsInBlockProcessor processor;
    private String logPath;
    private FsLogStorageConfiguration fsStorageConfig;
    private FsLogStorage fsLogStorage;
    private long appendedAddress;

    @Before
    public void init() throws IOException {
        this.processor = new CompleteEventsInBlockProcessor();
        this.logPath = this.tempFolder.getRoot().getAbsolutePath();
        this.fsStorageConfig = new FsLogStorageConfiguration(SEGMENT_SIZE, this.logPath, 0, false);
        this.fsLogStorage = new FsLogStorage(this.fsStorageConfig);
        ByteBuffer allocate = ByteBuffer.allocate(192);
        UnsafeBuffer unsafeBuffer = new UnsafeBuffer(0L, 0);
        unsafeBuffer.wrap(allocate);
        unsafeBuffer.putInt(DataFrameDescriptor.lengthOffset(0), DataFrameDescriptor.framedLength(LENGTH));
        unsafeBuffer.putLong(LogEntryDescriptor.positionOffset(DataFrameDescriptor.messageOffset(0)), 1L);
        int i = ALIGNED_LEN;
        unsafeBuffer.putInt(DataFrameDescriptor.lengthOffset(i), DataFrameDescriptor.framedLength(LENGTH));
        unsafeBuffer.putLong(LogEntryDescriptor.positionOffset(DataFrameDescriptor.messageOffset(i)), 2L);
        int i2 = 2 * ALIGNED_LEN;
        unsafeBuffer.putInt(DataFrameDescriptor.lengthOffset(i2), DataFrameDescriptor.framedLength(LogEntryDescriptor.headerLength(256)));
        unsafeBuffer.putLong(LogEntryDescriptor.positionOffset(DataFrameDescriptor.messageOffset(i2)), 3L);
        this.fsLogStorage.open();
        this.appendedAddress = this.fsLogStorage.append(allocate);
    }

    @Test
    public void shouldReadAndProcessFirstEvent() {
        ByteBuffer allocate = ByteBuffer.allocate(ALIGNED_LEN);
        Assertions.assertThat(this.fsLogStorage.read(allocate, this.appendedAddress, this.processor)).isEqualTo(this.appendedAddress + ALIGNED_LEN);
        UnsafeBuffer unsafeBuffer = new UnsafeBuffer(0L, 0);
        unsafeBuffer.wrap(allocate);
        Assertions.assertThat(unsafeBuffer.getInt(DataFrameDescriptor.lengthOffset(0))).isEqualTo(DataFrameDescriptor.framedLength(LENGTH));
        Assertions.assertThat(LogEntryDescriptor.getPosition(unsafeBuffer, 0)).isEqualTo(1L);
    }

    @Test
    public void shouldReadAndProcessTwoEvents() {
        ByteBuffer allocate = ByteBuffer.allocate(2 * ALIGNED_LEN);
        Assertions.assertThat(this.fsLogStorage.read(allocate, this.appendedAddress, this.processor)).isEqualTo(this.appendedAddress + (ALIGNED_LEN * 2));
        UnsafeBuffer unsafeBuffer = new UnsafeBuffer(0L, 0);
        unsafeBuffer.wrap(allocate);
        Assertions.assertThat(unsafeBuffer.getInt(DataFrameDescriptor.lengthOffset(0))).isEqualTo(DataFrameDescriptor.framedLength(LENGTH));
        Assertions.assertThat(LogEntryDescriptor.getPosition(unsafeBuffer, 0)).isEqualTo(1L);
        Assertions.assertThat(unsafeBuffer.getInt(DataFrameDescriptor.lengthOffset(ALIGNED_LEN))).isEqualTo(DataFrameDescriptor.framedLength(LENGTH));
        Assertions.assertThat(LogEntryDescriptor.getPosition(unsafeBuffer, ALIGNED_LEN)).isEqualTo(2L);
    }

    @Test
    public void shouldTruncateHalfEvent() {
        ByteBuffer allocate = ByteBuffer.allocate((int) (ALIGNED_LEN * 1.5d));
        Assertions.assertThat(this.fsLogStorage.read(allocate, this.appendedAddress, this.processor)).isEqualTo(this.appendedAddress + ALIGNED_LEN);
        UnsafeBuffer unsafeBuffer = new UnsafeBuffer(0L, 0);
        unsafeBuffer.wrap(allocate);
        Assertions.assertThat(unsafeBuffer.getInt(DataFrameDescriptor.lengthOffset(0))).isEqualTo(DataFrameDescriptor.framedLength(LENGTH));
        Assertions.assertThat(LogEntryDescriptor.getPosition(unsafeBuffer, 0)).isEqualTo(1L);
        Assertions.assertThat(allocate.position()).isEqualTo(ALIGNED_LEN);
        Assertions.assertThat(allocate.limit()).isEqualTo(ALIGNED_LEN);
    }

    @Test
    public void shouldTruncateEventWithMissingLen() {
        ByteBuffer allocate = ByteBuffer.allocate(ALIGNED_LEN + 3);
        Assertions.assertThat(this.fsLogStorage.read(allocate, this.appendedAddress, this.processor)).isEqualTo(this.appendedAddress + ALIGNED_LEN);
        UnsafeBuffer unsafeBuffer = new UnsafeBuffer(0L, 0);
        unsafeBuffer.wrap(allocate);
        Assertions.assertThat(unsafeBuffer.getInt(DataFrameDescriptor.lengthOffset(0))).isEqualTo(DataFrameDescriptor.framedLength(LENGTH));
        Assertions.assertThat(LogEntryDescriptor.getPosition(unsafeBuffer, 0)).isEqualTo(1L);
        Assertions.assertThat(allocate.position()).isEqualTo(ALIGNED_LEN);
        Assertions.assertThat(allocate.limit()).isEqualTo(ALIGNED_LEN);
    }

    @Test
    public void shouldInsufficientBufferCapacity() {
        Assertions.assertThat(this.fsLogStorage.read(ByteBuffer.allocate(ALIGNED_LEN - 1), this.appendedAddress, this.processor)).isEqualTo(-3L);
    }

    @Test
    public void shouldInsufficientBufferCapacityIfEventIsLargerThenBufferCapacity() {
        Assertions.assertThat(this.fsLogStorage.read(ByteBuffer.allocate((2 * ALIGNED_LEN) + ALIGNED_LEN), this.appendedAddress, this.processor)).isEqualTo(-3L);
    }

    @Test
    public void shouldInsufficientBufferCapacityIfPosWasSetAndNewEventCantReadCompletely() throws IOException {
        int i = (3 * ALIGNED_LEN) + 8;
        ByteBuffer allocate = ByteBuffer.allocate(i);
        UnsafeBuffer unsafeBuffer = new UnsafeBuffer(0L, 0);
        unsafeBuffer.wrap(allocate);
        unsafeBuffer.putInt(DataFrameDescriptor.lengthOffset(0), DataFrameDescriptor.framedLength(LENGTH));
        unsafeBuffer.putLong(LogEntryDescriptor.positionOffset(DataFrameDescriptor.messageOffset(0)), 1L);
        int i2 = ALIGNED_LEN;
        unsafeBuffer.putInt(DataFrameDescriptor.lengthOffset(i2), DataFrameDescriptor.framedLength(LENGTH));
        unsafeBuffer.putLong(LogEntryDescriptor.positionOffset(DataFrameDescriptor.messageOffset(i2)), 2L);
        int i3 = 2 * ALIGNED_LEN;
        unsafeBuffer.putInt(DataFrameDescriptor.lengthOffset(i3), DataFrameDescriptor.framedLength(LogEntryDescriptor.headerLength(8)));
        unsafeBuffer.putLong(LogEntryDescriptor.positionOffset(DataFrameDescriptor.messageOffset(i3)), 3L);
        long append = this.fsLogStorage.append(allocate);
        ByteBuffer allocate2 = ByteBuffer.allocate(2 * ALIGNED_LEN);
        long read = this.fsLogStorage.read(allocate2, append, this.processor);
        Assertions.assertThat(allocate2.position()).isEqualTo(allocate2.capacity());
        allocate2.position(ALIGNED_LEN);
        Assertions.assertThat(this.fsLogStorage.read(allocate2, read, this.processor)).isEqualTo(-3L);
        allocate2.limit(ALIGNED_LEN);
        allocate2.position(0);
        ByteBuffer allocate3 = ByteBuffer.allocate(4 * ALIGNED_LEN);
        allocate3.put(allocate2);
        Assertions.assertThat(this.fsLogStorage.read(allocate3, read, this.processor)).isGreaterThan(read);
        Assertions.assertThat(allocate3.position()).isEqualTo(i - ALIGNED_LEN);
        Assertions.assertThat(allocate3.limit()).isEqualTo(i - ALIGNED_LEN);
    }

    @Test
    public void shouldTruncateBufferOnHalfBufferWasRead() {
        ByteBuffer allocate = ByteBuffer.allocate(DataFrameDescriptor.alignedFramedLength(LogEntryDescriptor.headerLength(256)));
        Assertions.assertThat(this.fsLogStorage.read(allocate, this.appendedAddress, this.processor)).isEqualTo(this.appendedAddress + (ALIGNED_LEN * 2));
        UnsafeBuffer unsafeBuffer = new UnsafeBuffer(0L, 0);
        unsafeBuffer.wrap(allocate);
        Assertions.assertThat(unsafeBuffer.getInt(DataFrameDescriptor.lengthOffset(0))).isEqualTo(DataFrameDescriptor.framedLength(LENGTH));
        Assertions.assertThat(LogEntryDescriptor.getPosition(unsafeBuffer, 0)).isEqualTo(1L);
        Assertions.assertThat(unsafeBuffer.getInt(DataFrameDescriptor.lengthOffset(ALIGNED_LEN))).isEqualTo(DataFrameDescriptor.framedLength(LENGTH));
        Assertions.assertThat(LogEntryDescriptor.getPosition(unsafeBuffer, ALIGNED_LEN)).isEqualTo(2L);
        Assertions.assertThat(allocate.position()).isEqualTo(2 * ALIGNED_LEN);
        Assertions.assertThat(allocate.limit()).isEqualTo(2 * ALIGNED_LEN);
    }
}
