package io.zeebe.logstreams.log;

import io.zeebe.logstreams.impl.log.LoggedEventImpl;
import io.zeebe.logstreams.util.LogStreamReaderRule;
import io.zeebe.logstreams.util.LogStreamRule;
import io.zeebe.logstreams.util.LogStreamWriterRule;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.buffer.DirectBufferWriter;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.testing.ControlledActorSchedulerRule;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:io/zeebe/logstreams/log/LogStreamBatchWriterTest.class */
/**
 * Tests for {@link LogStreamBatchWriter}: every test stages one or more events on a batch writer,
 * writes the batch to the log stream provided by {@link LogStreamRule}, reads the events back
 * through {@link LogStreamReaderRule} and asserts on what was read.
 */
public final class LogStreamBatchWriterTest {
    private static final DirectBuffer EVENT_VALUE_1 = BufferUtil.wrapString("foo");
    private static final DirectBuffer EVENT_VALUE_2 = BufferUtil.wrapString("bar");
    private static final DirectBuffer EVENT_METADATA_1 = BufferUtil.wrapString("foobar");
    private static final DirectBuffer EVENT_METADATA_2 = BufferUtil.wrapString("baz");

    @Rule
    public final ControlledActorSchedulerRule writerScheduler = new ControlledActorSchedulerRule();

    // NOTE(review): no test in this class references this rule; kept so the class's public
    // members stay unchanged.
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    public final LogStreamRule logStreamRule = LogStreamRule.startByDefault(temporaryFolder);
    public final LogStreamReaderRule readerRule = new LogStreamReaderRule(logStreamRule);
    public final LogStreamWriterRule writerRule = new LogStreamWriterRule(logStreamRule);

    // The chain fixes the rule order: folder, then log stream, then writer, then reader.
    @Rule
    public RuleChain ruleChain =
            RuleChain.outerRule(temporaryFolder)
                    .around(logStreamRule)
                    .around(writerRule)
                    .around(readerRule);
    private LogStreamBatchWriter writer;

    /** Creates a fresh batch writer on the rule-managed log stream before each test. */
    @Before
    public void setUp() {
        writer = logStreamRule.getLogStream().newLogStreamBatchWriter();
    }

    /**
     * Reads events from the reader rule until one at (or beyond) the given position was read.
     *
     * <p>Each event is copied into its own buffer before it is collected, so the returned events
     * do not share the buffer of the event handed out by the reader.
     *
     * @param position the position returned by the write; must be greater than zero
     * @return the events read so far, in read order; the last one is located at {@code position}
     */
    private List<LoggedEvent> getWrittenEvents(final long position) {
        final List<LoggedEvent> events = new ArrayList<>();
        Assertions.assertThat(position).isGreaterThan(0L);
        writerRule.waitForPositionToBeAppended(position);

        long lastReadPosition = -1L;
        while (lastReadPosition < position) {
            final LoggedEventImpl nextEvent = readerRule.nextEvent();

            final LoggedEventImpl copiedEvent = new LoggedEventImpl();
            copiedEvent.wrap(
                    BufferUtil.cloneBuffer(nextEvent.getBuffer()), nextEvent.getFragmentOffset());
            events.add(copiedEvent);

            lastReadPosition = nextEvent.getPosition();
        }

        // AssertJ builds the message with String.format, so the placeholder must be %d, not {}.
        Assertions.assertThat(lastReadPosition)
                .withFailMessage("No written event found at position: %d", position)
                .isEqualTo(position);
        return events;
    }

    /** Returns a view on exactly the value bytes of the given event. */
    private DirectBuffer getValueBuffer(final LoggedEvent loggedEvent) {
        return new UnsafeBuffer(
                loggedEvent.getValueBuffer(),
                loggedEvent.getValueOffset(),
                loggedEvent.getValueLength());
    }

    /** Returns a view on exactly the metadata bytes of the given event. */
    private DirectBuffer getMetadataBuffer(final LoggedEvent loggedEvent) {
        return new UnsafeBuffer(
                loggedEvent.getMetadata(),
                loggedEvent.getMetadataOffset(),
                loggedEvent.getMetadataLength());
    }

    /**
     * Lets the consumer stage events on the shared batch writer and then retries
     * {@code tryWrite()} until it reports a position greater than zero.
     *
     * @param consumer stages the events of the batch
     * @return the positive position returned by the successful {@code tryWrite()}
     */
    private long write(final Consumer<LogStreamBatchWriter> consumer) {
        consumer.accept(writer);
        return TestUtil.doRepeatedly(() -> writer.tryWrite()).until(position -> position > 0);
    }

    @Test
    public void shouldReturnPositionOfSingleEvent() {
        final long position = write(batch -> batch.event().keyNull().value(EVENT_VALUE_1).done());

        Assertions.assertThat(position).isGreaterThan(0L);
        final List<LoggedEvent> writtenEvents = getWrittenEvents(position);
        Assertions.assertThat(writtenEvents).hasSize(1);
        Assertions.assertThat(writtenEvents.get(0).getPosition()).isEqualTo(position);
    }

    @Test
    public void shouldReturnPositionOfLastEvent() {
        final long position =
                write(
                        batch ->
                                batch.event().key(1L).value(EVENT_VALUE_1).done()
                                        .event().key(2L).value(EVENT_VALUE_2).done());

        Assertions.assertThat(position).isGreaterThan(0L);
        final List<LoggedEvent> writtenEvents = getWrittenEvents(position);
        Assertions.assertThat(writtenEvents).hasSize(2);
        Assertions.assertThat(writtenEvents.get(1).getPosition()).isEqualTo(position);
    }

    @Test
    public void shouldWriteEventWithValueBuffer() {
        final long position =
                write(
                        batch ->
                                batch.event().key(1L).value(EVENT_VALUE_1).done()
                                        .event().key(2L).value(EVENT_VALUE_2).done());

        final List<LoggedEvent> writtenEvents = getWrittenEvents(position);
        Assertions.assertThat(getValueBuffer(writtenEvents.get(0))).isEqualTo(EVENT_VALUE_1);
        Assertions.assertThat(getValueBuffer(writtenEvents.get(1))).isEqualTo(EVENT_VALUE_2);
    }

    @Test
    public void shouldWriteEventWithValueBufferPartially() {
        final long position =
                write(
                        batch ->
                                batch.event().key(1L).value(EVENT_VALUE_1, 1, 2).done()
                                        .event().key(2L).value(EVENT_VALUE_2, 1, 2).done());

        final List<LoggedEvent> writtenEvents = getWrittenEvents(position);
        Assertions.assertThat(getValueBuffer(writtenEvents.get(0)))
                .isEqualTo(new UnsafeBuffer(EVENT_VALUE_1, 1, 2));
        Assertions.assertThat(getValueBuffer(writtenEvents.get(1)))
                .isEqualTo(new UnsafeBuffer(EVENT_VALUE_2, 1, 2));
    }

    @Test
    public void shouldWriteEventWithValueWriter() {
        final long position =
                write(
                        batch ->
                                batch.event()
                                        .key(1L)
                                        .valueWriter(new DirectBufferWriter().wrap(EVENT_VALUE_1))
                                        .done()
                                        .event()
                                        .key(2L)
                                        .valueWriter(new DirectBufferWriter().wrap(EVENT_VALUE_2))
                                        .done());

        final List<LoggedEvent> writtenEvents = getWrittenEvents(position);
        Assertions.assertThat(getValueBuffer(writtenEvents.get(0))).isEqualTo(EVENT_VALUE_1);
        Assertions.assertThat(getValueBuffer(writtenEvents.get(1))).isEqualTo(EVENT_VALUE_2);
    }

    @Test
    public void shouldWriteEventWithMetadataBuffer() {
        final long position =
                write(
                        batch ->
                                batch.event()
                                        .key(1L)
                                        .value(EVENT_VALUE_1)
                                        .metadata(EVENT_METADATA_1)
                                        .done()
                                        .event()
                                        .key(2L)
                                        .value(EVENT_VALUE_2)
                                        .metadata(EVENT_METADATA_2)
                                        .done());

        final List<LoggedEvent> writtenEvents = getWrittenEvents(position);
        Assertions.assertThat(getMetadataBuffer(writtenEvents.get(0))).isEqualTo(EVENT_METADATA_1);
        Assertions.assertThat(getMetadataBuffer(writtenEvents.get(1))).isEqualTo(EVENT_METADATA_2);
    }

    @Test
    public void shouldWriteEventWithMetadataBufferPartially() {
        final long position =
                write(
                        batch ->
                                batch.event()
                                        .key(1L)
                                        .value(EVENT_VALUE_1)
                                        .metadata(EVENT_METADATA_1, 1, 2)
                                        .done()
                                        .event()
                                        .key(2L)
                                        .value(EVENT_VALUE_2)
                                        .metadata(EVENT_METADATA_2, 1, 2)
                                        .done());

        final List<LoggedEvent> writtenEvents = getWrittenEvents(position);
        Assertions.assertThat(getMetadataBuffer(writtenEvents.get(0)))
                .isEqualTo(new UnsafeBuffer(EVENT_METADATA_1, 1, 2));
        Assertions.assertThat(getMetadataBuffer(writtenEvents.get(1)))
                .isEqualTo(new UnsafeBuffer(EVENT_METADATA_2, 1, 2));
    }

    @Test
    public void shouldWriteEventWithMetadataWriter() {
        final long position =
                write(
                        batch ->
                                batch.event()
                                        .key(1L)
                                        .value(EVENT_VALUE_1)
                                        .metadataWriter(
                                                new DirectBufferWriter().wrap(EVENT_METADATA_1))
                                        .done()
                                        .event()
                                        .key(2L)
                                        .value(EVENT_VALUE_2)
                                        .metadataWriter(
                                                new DirectBufferWriter().wrap(EVENT_METADATA_2))
                                        .done());

        final List<LoggedEvent> writtenEvents = getWrittenEvents(position);
        Assertions.assertThat(getMetadataBuffer(writtenEvents.get(0))).isEqualTo(EVENT_METADATA_1);
        Assertions.assertThat(getMetadataBuffer(writtenEvents.get(1))).isEqualTo(EVENT_METADATA_2);
    }

    @Test
    public void shouldWriteEventWithKey() {
        final long position =
                write(
                        batch ->
                                batch.event().key(123L).value(EVENT_VALUE_1).done()
                                        .event().key(456L).value(EVENT_VALUE_2).done());

        Assertions.assertThat(getWrittenEvents(position))
                .extracting(LoggedEvent::getKey)
                .containsExactly(123L, 456L);
    }

    @Test
    public void shouldWriteEventWithSourceEvent() {
        final long position =
                write(
                        batch ->
                                batch.sourceRecordPosition(123L)
                                        .event().key(1L).value(EVENT_VALUE_1).done()
                                        .event().key(2L).value(EVENT_VALUE_2).done());

        // The source position is set once on the batch and expected on every event in it.
        final List<LoggedEvent> writtenEvents = getWrittenEvents(position);
        Assertions.assertThat(writtenEvents.get(0).getSourceEventPosition()).isEqualTo(123L);
        Assertions.assertThat(writtenEvents.get(1).getSourceEventPosition()).isEqualTo(123L);
    }

    @Test
    public void shouldWriteEventWithoutSourceEvent() {
        final long position =
                write(
                        batch ->
                                batch.event().key(1L).value(EVENT_VALUE_1).done()
                                        .event().key(2L).value(EVENT_VALUE_2).done());

        final List<LoggedEvent> writtenEvents = getWrittenEvents(position);
        Assertions.assertThat(writtenEvents.get(0).getSourceEventPosition()).isEqualTo(-1L);
        Assertions.assertThat(writtenEvents.get(1).getSourceEventPosition()).isEqualTo(-1L);
    }

    @Test
    public void shouldWriteEventWithTimestamp() throws InterruptedException, ExecutionException {
        // Pin the controlled clock to a known value so the expected timestamp is exact.
        final long timestamp = System.currentTimeMillis() + 10;
        writerScheduler.getClock().setCurrentTime(timestamp);

        // The write is submitted to the controlled scheduler and driven to completion by hand.
        final ActorFuture<Long> position =
                writerScheduler.call(
                        () ->
                                write(
                                        batch ->
                                                batch.event().key(1L).value(EVENT_VALUE_1).done()
                                                        .event().key(2L).value(EVENT_VALUE_2)
                                                        .done()));
        writerScheduler.workUntilDone();

        Assertions.assertThat(getWrittenEvents(position.get()))
                .extracting(LoggedEvent::getTimestamp)
                .containsExactly(timestamp, timestamp);
    }

    @Test
    public void shouldNotFailToWriteEventWithoutKey() {
        // First event sets a null key explicitly, second one does not set a key at all.
        final long position =
                write(
                        batch ->
                                batch.event().keyNull().value(EVENT_VALUE_1).done()
                                        .event().value(EVENT_VALUE_2).done());

        Assertions.assertThat(getWrittenEvents(position))
                .extracting(LoggedEvent::getKey)
                .contains(-1L);
    }

    @Test
    public void shouldFailToWriteEventWithoutValue() {
        // The second event is completed without a value.
        Assertions.assertThatThrownBy(
                        () ->
                                writer.event().key(1L).value(EVENT_VALUE_1).done()
                                        .event().key(2L).done()
                                        .tryWrite())
                .isInstanceOf(RuntimeException.class)
                .hasMessage("value must not be null");
    }

    @Test
    public void shouldNotFailToWriteBatchWithoutEvents() {
        Assertions.assertThat(writer.tryWrite()).isEqualTo(0L);
    }

    @Test
    public void shouldFailToWriteOnClosedLogStream() {
        logStreamRule.getLogStream().close();

        final long position = writer.event().key(1L).value(EVENT_VALUE_1).done().tryWrite();

        Assertions.assertThat(position).isEqualTo(-1L);
    }
}
