package io.zeebe.broker.exporter.stream;

import io.zeebe.broker.exporter.ExporterObjectMapper;
import io.zeebe.broker.exporter.record.value.DeploymentRecordValueImpl;
import io.zeebe.broker.exporter.record.value.IncidentRecordValueImpl;
import io.zeebe.broker.exporter.record.value.JobBatchRecordValueImpl;
import io.zeebe.broker.exporter.record.value.JobRecordValueImpl;
import io.zeebe.broker.exporter.record.value.MessageRecordValueImpl;
import io.zeebe.broker.exporter.record.value.MessageSubscriptionRecordValueImpl;
import io.zeebe.broker.exporter.record.value.RaftRecordValueImpl;
import io.zeebe.broker.exporter.record.value.WorkflowInstanceRecordValueImpl;
import io.zeebe.broker.exporter.record.value.WorkflowInstanceSubscriptionRecordValueImpl;
import io.zeebe.broker.exporter.record.value.deployment.DeployedWorkflowImpl;
import io.zeebe.broker.exporter.record.value.deployment.DeploymentResourceImpl;
import io.zeebe.broker.exporter.record.value.job.HeadersImpl;
import io.zeebe.broker.exporter.record.value.raft.RaftMemberImpl;
import io.zeebe.broker.exporter.repo.ExporterDescriptor;
import io.zeebe.broker.exporter.stream.ExporterRecord;
import io.zeebe.broker.exporter.util.ControlledTestExporter;
import io.zeebe.broker.exporter.util.PojoConfigurationExporter;
import io.zeebe.broker.exporter.util.TestJarExporter;
import io.zeebe.broker.incident.data.ErrorType;
import io.zeebe.broker.incident.data.IncidentRecord;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.subscription.message.data.MessageSubscriptionRecord;
import io.zeebe.broker.subscription.message.data.WorkflowInstanceSubscriptionRecord;
import io.zeebe.broker.util.StreamProcessorControl;
import io.zeebe.broker.util.StreamProcessorRule;
import io.zeebe.broker.workflow.gateway.ParallelGatewayStreamProcessorTest;
import io.zeebe.exporter.context.Controller;
import io.zeebe.exporter.record.Record;
import io.zeebe.exporter.record.RecordValue;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.msgpack.value.LongValue;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentResource;
import io.zeebe.protocol.impl.record.value.deployment.ResourceType;
import io.zeebe.protocol.impl.record.value.deployment.Workflow;
import io.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.zeebe.protocol.impl.record.value.job.JobRecord;
import io.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.intent.DeploymentIntent;
import io.zeebe.protocol.intent.ExporterIntent;
import io.zeebe.protocol.intent.IncidentIntent;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.JobBatchIntent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.MessageIntent;
import io.zeebe.protocol.intent.MessageSubscriptionIntent;
import io.zeebe.protocol.intent.RaftIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.protocol.intent.WorkflowInstanceSubscriptionIntent;
import io.zeebe.raft.event.RaftConfigurationEvent;
import io.zeebe.raft.event.RaftConfigurationEventMember;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.buffer.BufferUtil;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/zeebe/broker/exporter/stream/ExporterStreamProcessorTest.class */
public class ExporterStreamProcessorTest {
    private static final int PARTITION_ID = 1;
    private static final ExporterObjectMapper OBJECT_MAPPER = new ExporterObjectMapper();
    private static final Map<String, Object> PAYLOAD = Collections.singletonMap(TypedStreamProcessorTest.STREAM_NAME, TestJarExporter.FOO);
    private static final String PAYLOAD_JSON = OBJECT_MAPPER.toJson(PAYLOAD);
    private static final DirectBuffer PAYLOAD_MSGPACK = new UnsafeBuffer(OBJECT_MAPPER.toMsgpack(PAYLOAD));
    private static final Map<String, Object> CUSTOM_HEADERS = Collections.singletonMap("workerVersion", 42);
    private static final DirectBuffer CUSTOM_HEADERS_MSGPACK = new UnsafeBuffer(OBJECT_MAPPER.toMsgpack(CUSTOM_HEADERS));

    @Rule
    public StreamProcessorRule rule = new StreamProcessorRule();
    private List<ControlledTestExporter> exporters;

    @Test
    public void shouldConfigureAllExportersProperlyOnStart() throws InterruptedException {
        Map[] mapArr = {newConfig(TypedStreamProcessorTest.STREAM_NAME, TestJarExporter.FOO), newConfig(TestJarExporter.FOO, TypedStreamProcessorTest.STREAM_NAME)};
        List<ExporterDescriptor> createMockedExporters = createMockedExporters(mapArr);
        ExporterStreamProcessor exporterStreamProcessor = new ExporterStreamProcessor(PARTITION_ID, createMockedExporters);
        CountDownLatch countDownLatch = new CountDownLatch(this.exporters.size());
        Iterator<ControlledTestExporter> it = this.exporters.iterator();
        while (it.hasNext()) {
            it.next().onOpen(controller -> {
                countDownLatch.countDown();
            });
        }
        this.rule.initStreamProcessor(typedStreamEnvironment -> {
            return exporterStreamProcessor;
        }).start();
        Assertions.assertThat(countDownLatch.await(5L, TimeUnit.SECONDS)).isTrue();
        for (int i = 0; i < this.exporters.size(); i += PARTITION_ID) {
            Assertions.assertThat(this.exporters.get(i).getContext().getConfiguration().getId()).isEqualTo(createMockedExporters.get(i).getId());
            Assertions.assertThat(this.exporters.get(i).getContext().getConfiguration().getArguments()).isEqualTo(mapArr[i]);
            Assertions.assertThat(this.exporters.get(i).getContext().getLogger()).isNotNull();
            Assertions.assertThat(this.exporters.get(i).getController()).isNotNull();
        }
    }

    @Test
    public void shouldInstantiateConfigurationClass() {
        HashMap hashMap = new HashMap();
        hashMap.put(TestJarExporter.FOO, "baz");
        hashMap.put("y", Double.valueOf(32.12d));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(TypedStreamProcessorTest.STREAM_NAME, TestJarExporter.FOO);
        hashMap2.put("x", 123);
        hashMap2.put("nested", hashMap);
        ExporterStreamProcessor exporterStreamProcessor = new ExporterStreamProcessor(PARTITION_ID, Collections.singletonList(new ExporterDescriptor("instantiateConfiguration", PojoConfigurationExporter.class, hashMap2)));
        this.rule.runStreamProcessor(typedStreamEnvironment -> {
            return exporterStreamProcessor;
        });
        PojoConfigurationExporter.PojoExporterConfiguration pojoExporterConfiguration = PojoConfigurationExporter.configuration;
        Assertions.assertThat(pojoExporterConfiguration.foo).isEqualTo(TestJarExporter.FOO);
        Assertions.assertThat(pojoExporterConfiguration.x).isEqualTo(123);
        Assertions.assertThat(pojoExporterConfiguration.nested.bar).isEqualTo("baz");
        Assertions.assertThat(pojoExporterConfiguration.nested.y).isEqualTo(32.12d);
    }

    @Test
    public void shouldCloseAllExportersOnClose() {
        boolean[] zArr = {false, false};
        ExporterStreamProcessor createStreamProcessor = createStreamProcessor(zArr.length);
        this.exporters.get(0).onClose(() -> {
            zArr[0] = PARTITION_ID;
        });
        this.exporters.get(PARTITION_ID).onClose(() -> {
            zArr[PARTITION_ID] = PARTITION_ID;
        });
        StreamProcessorControl initStreamProcessor = this.rule.initStreamProcessor(typedStreamEnvironment -> {
            return createStreamProcessor;
        });
        initStreamProcessor.start();
        for (int i = 0; i < this.exporters.size(); i += PARTITION_ID) {
            Assertions.assertThat(zArr[i]).isFalse();
        }
        initStreamProcessor.close();
        for (int i2 = 0; i2 < this.exporters.size(); i2 += PARTITION_ID) {
            Assertions.assertThat(zArr[i2]).isTrue();
        }
    }

    @Test
    public void shouldRestartEachExporterFromCorrectPosition() {
        ExporterStreamProcessor createStreamProcessor = createStreamProcessor(2);
        StreamProcessorControl initStreamProcessor = this.rule.initStreamProcessor(typedStreamEnvironment -> {
            return createStreamProcessor;
        });
        long writeEvent = writeEvent();
        long writeEvent2 = writeEvent();
        initStreamProcessor.blockAfterEvent(loggedEvent -> {
            return loggedEvent.getPosition() == writeEvent2;
        });
        initStreamProcessor.start();
        initStreamProcessor.getClass();
        TestUtil.waitUntil(initStreamProcessor::isBlocked);
        initStreamProcessor.unblock();
        TestUtil.waitUntil(() -> {
            return this.exporters.get(0).getExportedRecords().size() == 2;
        });
        TestUtil.waitUntil(() -> {
            return this.exporters.get(PARTITION_ID).getExportedRecords().size() == 2;
        });
        this.exporters.get(0).getController().updateLastExportedRecordPosition(writeEvent2);
        this.exporters.get(PARTITION_ID).getController().updateLastExportedRecordPosition(writeEvent);
        initStreamProcessor.close();
        initStreamProcessor.blockAfterEvent(loggedEvent2 -> {
            return loggedEvent2.getPosition() == writeEvent2;
        });
        initStreamProcessor.start();
        initStreamProcessor.getClass();
        TestUtil.waitUntil(initStreamProcessor::isBlocked);
        Assertions.assertThat(this.exporters.get(0).getExportedRecords()).hasSize(2);
        Assertions.assertThat(this.exporters.get(0).getExportedRecords().get(0).getPosition()).isEqualTo(writeEvent);
        Assertions.assertThat(this.exporters.get(0).getExportedRecords().get(PARTITION_ID).getPosition()).isEqualTo(writeEvent2);
        Assertions.assertThat(this.exporters.get(PARTITION_ID).getExportedRecords()).hasSize(3);
        Assertions.assertThat(this.exporters.get(PARTITION_ID).getExportedRecords().get(0).getPosition()).isEqualTo(writeEvent);
        Assertions.assertThat(this.exporters.get(PARTITION_ID).getExportedRecords().get(PARTITION_ID).getPosition()).isEqualTo(writeEvent2);
        Assertions.assertThat(this.exporters.get(PARTITION_ID).getExportedRecords().get(2).getPosition()).isEqualTo(writeEvent2);
    }

    @Test
    public void shouldRecoverPositionsFromLogStream() {
        List<ExporterDescriptor> createMockedExporters = createMockedExporters(PARTITION_ID);
        ExporterStreamProcessor exporterStreamProcessor = new ExporterStreamProcessor(PARTITION_ID, createMockedExporters);
        ExporterStreamProcessorState stateController = exporterStreamProcessor.getStateController();
        StreamProcessorControl initStreamProcessor = this.rule.initStreamProcessor(typedStreamEnvironment -> {
            return exporterStreamProcessor;
        });
        long writeEvent = writeEvent();
        long writeExporterEvent = writeExporterEvent(createMockedExporters.get(0).getId(), writeEvent);
        initStreamProcessor.blockAfterEvent(loggedEvent -> {
            return loggedEvent.getPosition() == writeExporterEvent;
        });
        initStreamProcessor.start();
        initStreamProcessor.getClass();
        TestUtil.waitUntil(initStreamProcessor::isBlocked);
        Assertions.assertThat(stateController.getPosition(createMockedExporters.get(0).getId())).isEqualTo(writeEvent);
    }

    @Test
    public void shouldRetryExportingOnException() {
        ExporterStreamProcessor createStreamProcessor = createStreamProcessor(3);
        AtomicLong atomicLong = new AtomicLong(3L);
        this.exporters.get(PARTITION_ID).onExport(record -> {
            if (atomicLong.getAndDecrement() > 0) {
                throw new RuntimeException("Export failed (expected)");
            }
        });
        StreamProcessorControl initStreamProcessor = this.rule.initStreamProcessor(typedStreamEnvironment -> {
            return createStreamProcessor;
        });
        long writeEvent = writeEvent();
        long writeEvent2 = writeEvent();
        initStreamProcessor.blockAfterEvent(loggedEvent -> {
            return loggedEvent.getPosition() == writeEvent2;
        });
        initStreamProcessor.start();
        initStreamProcessor.getClass();
        TestUtil.waitUntil(initStreamProcessor::isBlocked);
        Iterator<ControlledTestExporter> it = this.exporters.iterator();
        while (it.hasNext()) {
            Assertions.assertThat(it.next().getExportedRecords()).extracting("position").containsExactly(new Object[]{Long.valueOf(writeEvent), Long.valueOf(writeEvent2)});
        }
    }

    @Test
    public void shouldExecuteScheduledTask() throws InterruptedException {
        ExporterStreamProcessor createStreamProcessor = createStreamProcessor(PARTITION_ID);
        CountDownLatch countDownLatch = new CountDownLatch(PARTITION_ID);
        Duration ofSeconds = Duration.ofSeconds(10L);
        ControlledTestExporter controlledTestExporter = this.exporters.get(0);
        controlledTestExporter.onExport(record -> {
            Controller controller = controlledTestExporter.getController();
            countDownLatch.getClass();
            controller.scheduleTask(ofSeconds, countDownLatch::countDown);
        });
        long writeEvent = writeEvent();
        StreamProcessorControl initStreamProcessor = this.rule.initStreamProcessor(typedStreamEnvironment -> {
            return createStreamProcessor;
        });
        initStreamProcessor.blockAfterEvent(loggedEvent -> {
            return loggedEvent.getPosition() == writeEvent;
        });
        initStreamProcessor.start();
        initStreamProcessor.getClass();
        TestUtil.waitUntil(initStreamProcessor::isBlocked);
        this.rule.getClock().addTime(ofSeconds.plusSeconds(10L));
        Assertions.assertThat(countDownLatch.await(1L, TimeUnit.SECONDS)).isTrue();
    }

    @Test
    public void shouldNotExportExporterRecords() {
        List<ExporterDescriptor> createMockedExporters = createMockedExporters(PARTITION_ID);
        ExporterStreamProcessor exporterStreamProcessor = new ExporterStreamProcessor(PARTITION_ID, createMockedExporters);
        StreamProcessorControl initStreamProcessor = this.rule.initStreamProcessor(typedStreamEnvironment -> {
            return exporterStreamProcessor;
        });
        long writeEvent = writeEvent();
        long writeExporterEvent = writeExporterEvent(createMockedExporters.get(0).getId(), writeEvent);
        initStreamProcessor.blockAfterEvent(loggedEvent -> {
            return loggedEvent.getPosition() == writeExporterEvent;
        });
        initStreamProcessor.start();
        initStreamProcessor.getClass();
        TestUtil.waitUntil(initStreamProcessor::isBlocked);
        Assertions.assertThat(this.exporters.get(0).getExportedRecords()).hasSize(PARTITION_ID);
        Assertions.assertThat(this.exporters.get(0).getExportedRecords().get(0).getPosition()).isEqualTo(writeEvent);
    }

    @Test
    public void shouldExportDeploymentEvent() {
        ResourceType resourceType = ResourceType.BPMN_XML;
        DirectBuffer wrapString = BufferUtil.wrapString("contents");
        DeploymentRecord deploymentRecord = new DeploymentRecord();
        ((DeploymentResource) deploymentRecord.resources().add()).setResourceName(BufferUtil.wrapString("resource")).setResourceType(resourceType).setResource(wrapString);
        ((Workflow) deploymentRecord.workflows().add()).setBpmnProcessId(BufferUtil.wrapString("testProcess")).setKey(123L).setResourceName(BufferUtil.wrapString("resource")).setVersion(12);
        assertRecordExported(DeploymentIntent.CREATE, deploymentRecord, new DeploymentRecordValueImpl(OBJECT_MAPPER, Collections.singletonList(new DeployedWorkflowImpl("testProcess", "resource", 123L, 12)), Collections.singletonList(new DeploymentResourceImpl(BufferUtil.bufferAsArray(wrapString), io.zeebe.exporter.record.value.deployment.ResourceType.BPMN_XML, "resource"))));
    }

    @Test
    public void shouldExportIncidentRecord() {
        ErrorType errorType = ErrorType.IO_MAPPING_ERROR;
        assertRecordExported(IncidentIntent.CREATED, new IncidentRecord().setElementInstanceKey(34L).setWorkflowInstanceKey(10L).setFailureEventPosition(12L).setElementId(BufferUtil.wrapString("activity")).setBpmnProcessId(BufferUtil.wrapString(ParallelGatewayStreamProcessorTest.PROCESS_ID)).setErrorMessage("error").setErrorType(errorType).setJobKey(123L).setPayload(PAYLOAD_MSGPACK), new IncidentRecordValueImpl(OBJECT_MAPPER, PAYLOAD_JSON, errorType.name(), "error", ParallelGatewayStreamProcessorTest.PROCESS_ID, "activity", 10L, 34L, 123L));
    }

    @Test
    public void shouldExportJobRecord() {
        JobRecord deadline = new JobRecord().setWorker(BufferUtil.wrapString("myWorker")).setType(BufferUtil.wrapString("myType")).setPayload(PAYLOAD_MSGPACK).setRetries(12).setDeadline(13L);
        deadline.getHeaders().setBpmnProcessId(BufferUtil.wrapString("test-process")).setWorkflowKey(13L).setWorkflowDefinitionVersion(12).setWorkflowInstanceKey(1234L).setElementId(BufferUtil.wrapString("activity")).setElementInstanceKey(123L);
        deadline.setCustomHeaders(CUSTOM_HEADERS_MSGPACK);
        assertRecordExported(JobIntent.CREATED, deadline, new JobRecordValueImpl(OBJECT_MAPPER, PAYLOAD_JSON, "myType", "myWorker", Instant.ofEpochMilli(13L), new HeadersImpl("test-process", "activity", 123L, 1234L, 13L, 12), CUSTOM_HEADERS, 12));
    }

    @Test
    public void shouldExportMessageRecord() {
        assertRecordExported(MessageIntent.PUBLISHED, new MessageRecord().setCorrelationKey(BufferUtil.wrapString("test-key")).setName(BufferUtil.wrapString("test-message")).setPayload(PAYLOAD_MSGPACK).setTimeToLive(12L).setMessageId(BufferUtil.wrapString("test-id")), new MessageRecordValueImpl(OBJECT_MAPPER, PAYLOAD_JSON, "test-message", "test-id", "test-key", 12L));
    }

    @Test
    public void shouldExportMessageSubscriptionRecord() {
        assertRecordExported(MessageSubscriptionIntent.CORRELATE, new MessageSubscriptionRecord().setElementInstanceKey(1L).setMessageName(BufferUtil.wrapString("name")).setWorkflowInstanceKey(1L).setCorrelationKey(BufferUtil.wrapString("key")), new MessageSubscriptionRecordValueImpl(OBJECT_MAPPER, "name", "key", 1L, 1L));
    }

    @Test
    public void shouldExportRaftRecord() {
        List list = (List) IntStream.of(4).boxed().collect(Collectors.toList());
        RaftConfigurationEvent raftConfigurationEvent = new RaftConfigurationEvent();
        list.forEach(num -> {
            ((RaftConfigurationEventMember) raftConfigurationEvent.members().add()).setNodeId(num.intValue());
        });
        assertRecordExported(RaftIntent.MEMBER_ADDED, raftConfigurationEvent, new RaftRecordValueImpl(OBJECT_MAPPER, (List) list.stream().map((v1) -> {
            return new RaftMemberImpl(v1);
        }).collect(Collectors.toList())));
    }

    @Test
    public void shouldExportWorkflowInstanceRecord() {
        assertRecordExported(WorkflowInstanceIntent.ELEMENT_READY, new WorkflowInstanceRecord().setElementId("activity").setPayload(PAYLOAD_MSGPACK).setBpmnProcessId(BufferUtil.wrapString("test-process")).setVersion(12).setWorkflowKey(13L).setWorkflowInstanceKey(1234L).setScopeInstanceKey(123L), new WorkflowInstanceRecordValueImpl(OBJECT_MAPPER, PAYLOAD_JSON, "test-process", "activity", 12, 13L, 1234L, 123L));
    }

    @Test
    public void shouldExportWorkflowInstanceSubscriptionRecord() {
        assertRecordExported(WorkflowInstanceSubscriptionIntent.OPENED, new WorkflowInstanceSubscriptionRecord().setElementInstanceKey(123L).setMessageName(BufferUtil.wrapString("test-message")).setSubscriptionPartitionId(2).setWorkflowInstanceKey(1345L).setPayload(PAYLOAD_MSGPACK), new WorkflowInstanceSubscriptionRecordValueImpl(OBJECT_MAPPER, PAYLOAD_JSON, "test-message", 1345L, 123L));
    }

    @Test
    public void shouldExportJobBatchRecord() {
        JobBatchRecord worker = new JobBatchRecord().setAmount(PARTITION_ID).setTimeout(2L).setType("type").setWorker("worker");
        ((LongValue) worker.jobKeys().add()).setValue(3L);
        JobRecord jobRecord = (JobRecord) worker.jobs().add();
        jobRecord.setWorker(BufferUtil.wrapString("worker")).setType(BufferUtil.wrapString("type")).setPayload(PAYLOAD_MSGPACK).setRetries(3).setDeadline(1000L);
        jobRecord.getHeaders().setBpmnProcessId(BufferUtil.wrapString("test-process")).setWorkflowKey(13L).setWorkflowDefinitionVersion(12).setWorkflowInstanceKey(1234L).setElementId(BufferUtil.wrapString("activity")).setElementInstanceKey(123L);
        assertRecordExported(JobBatchIntent.ACTIVATED, worker, new JobBatchRecordValueImpl(OBJECT_MAPPER, "type", "worker", Duration.ofMillis(2L), PARTITION_ID, Arrays.asList(3L), Arrays.asList(new JobRecordValueImpl(OBJECT_MAPPER, PAYLOAD_JSON, "type", "worker", Instant.ofEpochMilli(1000L), new HeadersImpl("test-process", "activity", 123L, 1234L, 13L, 12), Collections.EMPTY_MAP, 3))));
    }

    private ExporterStreamProcessor createStreamProcessor(int i) {
        return new ExporterStreamProcessor(PARTITION_ID, createMockedExporters(i));
    }

    private List<ExporterDescriptor> createMockedExporters(int i) {
        return createMockedExporters(i, new Map[0]);
    }

    private List<ExporterDescriptor> createMockedExporters(Map... mapArr) {
        return createMockedExporters(mapArr.length, mapArr);
    }

    private List<ExporterDescriptor> createMockedExporters(int i, Map[] mapArr) {
        ArrayList arrayList = new ArrayList(i);
        this.exporters = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2 += PARTITION_ID) {
            Map map = mapArr.length > 0 ? mapArr[i2] : null;
            ControlledTestExporter controlledTestExporter = (ControlledTestExporter) Mockito.spy(new ControlledTestExporter());
            ExporterDescriptor exporterDescriptor = (ExporterDescriptor) Mockito.spy(new ExporterDescriptor(String.valueOf(i2), controlledTestExporter.getClass(), map));
            ((ExporterDescriptor) Mockito.doAnswer(invocationOnMock -> {
                return controlledTestExporter;
            }).when(exporterDescriptor)).newInstance();
            this.exporters.add(controlledTestExporter);
            arrayList.add(exporterDescriptor);
        }
        return arrayList;
    }

    private long writeEvent() {
        return this.rule.writeEvent(DeploymentIntent.CREATED, new DeploymentRecord());
    }

    private long writeExporterEvent(String str, long j) {
        UnpackedObject exporterRecord = new ExporterRecord();
        ExporterRecord.ExporterPosition exporterPosition = (ExporterRecord.ExporterPosition) exporterRecord.getPositions().add();
        exporterPosition.setId(str);
        exporterPosition.setPosition(j);
        return this.rule.writeEvent(ExporterIntent.EXPORTED, exporterRecord);
    }

    private Map<String, Object> newConfig(String... strArr) {
        HashMap hashMap = new HashMap();
        for (int i = 0; i < strArr.length; i += 2) {
            hashMap.put(strArr[i], strArr[i + PARTITION_ID]);
        }
        return hashMap;
    }

    private void assertRecordExported(Intent intent, UnpackedObject unpackedObject, RecordValue recordValue) {
        StreamProcessorControl initStreamProcessor = this.rule.initStreamProcessor(typedStreamEnvironment -> {
            return createStreamProcessor(PARTITION_ID);
        });
        long writeEvent = this.rule.writeEvent(intent, unpackedObject);
        initStreamProcessor.blockAfterEvent(loggedEvent -> {
            return loggedEvent.getPosition() == writeEvent;
        });
        initStreamProcessor.start();
        initStreamProcessor.getClass();
        TestUtil.waitUntil(initStreamProcessor::isBlocked);
        List<Record> exportedRecords = this.exporters.get(0).getExportedRecords();
        Assertions.assertThat(exportedRecords).hasSize(PARTITION_ID);
        Record record = exportedRecords.get(0);
        LoggedEvent withPosition = this.rule.events().withPosition(writeEvent);
        RecordMetadata recordMetadata = new RecordMetadata();
        io.zeebe.exporter.record.Assertions.assertThat(record).hasPosition(withPosition.getPosition()).hasRaftTerm(withPosition.getRaftTerm()).hasSourceRecordPosition(withPosition.getSourceEventPosition()).hasProducerId(withPosition.getProducerId()).hasKey(withPosition.getKey()).hasTimestamp(Instant.ofEpochMilli(withPosition.getTimestamp()));
        withPosition.readMetadata(recordMetadata);
        io.zeebe.exporter.record.Assertions.assertThat(record.getMetadata()).hasIntent(recordMetadata.getIntent()).hasPartitionId(PARTITION_ID).hasRecordType(recordMetadata.getRecordType()).hasRejectionType(recordMetadata.getRejectionType()).hasRejectionReason(BufferUtil.bufferAsString(recordMetadata.getRejectionReason())).hasValueType(recordMetadata.getValueType());
        io.zeebe.exporter.record.Assertions.assertThat(record).hasValue(recordValue);
    }
}
