package io.zeebe.broker.workflow.processor;

import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.subscription.command.SubscriptionCommandSender;
import io.zeebe.broker.subscription.message.data.WorkflowInstanceSubscriptionRecord;
import io.zeebe.broker.subscription.message.processor.MessageStreamProcessor;
import io.zeebe.broker.util.StreamProcessorControl;
import io.zeebe.broker.util.StreamProcessorRule;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.protocol.intent.WorkflowInstanceSubscriptionIntent;
import io.zeebe.test.util.MsgPackUtil;
import io.zeebe.test.util.TestUtil;
import io.zeebe.util.buffer.BufferUtil;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/zeebe/broker/workflow/processor/WorkflowInstanceStreamProcessorTest.class */
public class WorkflowInstanceStreamProcessorTest {
    /** BPMN process id used by every workflow model deployed in this test class. */
    private static final String PROCESS_ID = "process";

    /** start "start" -> flow1 -> service task "task" (job type "taskType") -> flow2 -> end "end". */
    private static final BpmnModelInstance SERVICE_TASK_WORKFLOW =
            Bpmn.createExecutableProcess(PROCESS_ID)
                    .startEvent("start")
                    .sequenceFlowId("flow1")
                    .serviceTask("task", task -> task.zeebeTaskType("taskType"))
                    .sequenceFlowId("flow2")
                    .endEvent("end")
                    .done();

    /** start -> embedded sub process "subProcess" (start -> service task "task" -> end) -> end. */
    private static final BpmnModelInstance SUB_PROCESS_WORKFLOW =
            Bpmn.createExecutableProcess(PROCESS_ID)
                    .startEvent()
                    .subProcess("subProcess")
                    .embeddedSubProcess()
                    .startEvent()
                    .serviceTask("task", task -> task.zeebeTaskType("taskType"))
                    .endEvent()
                    .subProcessDone()
                    .endEvent()
                    .done();

    /**
     * start -> intermediate catch event "catch-event" bound to the message "order canceled" with
     * correlation key "$.orderId".
     */
    private static final BpmnModelInstance MESSAGE_CATCH_EVENT_WORKFLOW =
            Bpmn.createExecutableProcess(PROCESS_ID)
                    .startEvent()
                    .intermediateCatchEvent(
                            "catch-event",
                            catchEvent ->
                                    catchEvent.message(
                                            message -> message.name("order canceled").zeebeCorrelationKey("$.orderId")))
                    .done();

    /** Stream processor test environment; outer rule of {@link #chain}. */
    public StreamProcessorRule envRule = new StreamProcessorRule();

    /** Workflow-instance specific rule built on top of {@link #envRule}. */
    public WorkflowInstanceStreamProcessorRule streamProcessorRule = new WorkflowInstanceStreamProcessorRule(envRule);

    /** Applies {@link #envRule} first and wraps {@link #streamProcessorRule} inside it. */
    @Rule
    public RuleChain chain = RuleChain.outerRule(envRule).around(streamProcessorRule);

    /** Control handle of the processor under test; assigned in {@code setUp()}. */
    private StreamProcessorControl streamProcessor;

    /** Fetches the stream processor control from the rule before each test. */
    @Before
    public void setUp() {
        streamProcessor = streamProcessorRule.getStreamProcessor();
    }

    /**
     * Writes two CANCEL commands for the same workflow instance while the processor is blocked and
     * expects the first workflow-instance rejection to belong to the second command.
     */
    @Test
    public void shouldRejectCancellationInDirectSuccession() {
        // given: a created instance, with the processor asked to block on a job CREATE record
        streamProcessorRule.deploy(SERVICE_TASK_WORKFLOW);
        streamProcessor.blockAfterJobEvent(job -> job.getMetadata().getIntent() == JobIntent.CREATE);
        final TypedRecord<WorkflowInstanceRecord> instance = streamProcessorRule.createWorkflowInstance(PROCESS_ID);
        TestUtil.waitUntil(() -> streamProcessor.isBlocked());

        // when: the same cancel command is written twice before the processor continues
        envRule.writeCommand(instance.getKey(), WorkflowInstanceIntent.CANCEL, instance.getValue());
        final long secondCancelPosition =
                envRule.writeCommand(instance.getKey(), WorkflowInstanceIntent.CANCEL, instance.getValue());
        streamProcessor.unblock();

        // then: the process terminates with a complete lifecycle ...
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_TERMINATED);
        final List processLifecycle = (List) envRule.events().onlyStatesOf(PROCESS_ID).collect(Collectors.toList());
        LifecycleAssert.assertThat(processLifecycle).compliesWithCompleteLifecycle();

        // ... and the rejection points at the second cancel command
        final TypedRecord rejection =
                (TypedRecord) envRule.events().onlyWorkflowInstanceRecords().onlyRejections().findFirst().get();
        Assertions.assertThat(rejection.getMetadata().getIntent()).isEqualTo(WorkflowInstanceIntent.CANCEL);
        Assertions.assertThat(rejection.getSourcePosition()).isEqualTo(secondCancelPosition);
        final String rejectionReason = BufferUtil.bufferAsString(rejection.getMetadata().getRejectionReason());
        Assertions.assertThat(rejectionReason).isEqualTo("Workflow instance is not running");
    }

    /**
     * Cancels the workflow instance while the processor is blocked on a record of the "start"
     * element and checks the resulting lifecycles of the process and the task.
     */
    @Test
    public void shouldCancelActivityInStateReady() {
        // given
        streamProcessorRule.deploy(SERVICE_TASK_WORKFLOW);
        streamProcessor.blockAfterWorkflowInstanceRecord(isForElement("start"));
        final TypedRecord<WorkflowInstanceRecord> instance = streamProcessorRule.createWorkflowInstance(PROCESS_ID);
        TestUtil.waitUntil(() -> streamProcessor.isBlocked());

        // when
        envRule.writeCommand(instance.getKey(), WorkflowInstanceIntent.CANCEL, instance.getValue());
        streamProcessor.unblock();

        // then
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_TERMINATED);
        final List allRecords = (List) envRule.events().onlyWorkflowInstanceRecords().collect(Collectors.toList());
        final List processLifecycle = (List) envRule.events().onlyStatesOf(PROCESS_ID).collect(Collectors.toList());
        final List taskLifecycle = (List) envRule.events().onlyStatesOf("task").collect(Collectors.toList());

        LifecycleAssert.assertThat(processLifecycle)
                .compliesWithCompleteLifecycle()
                .endsWith(WorkflowInstanceIntent.ELEMENT_TERMINATED, new WorkflowInstanceIntent[0]);
        LifecycleAssert.assertThat(taskLifecycle).compliesWithCompleteLifecycle();
        WorkflowInstanceAssert.assertThat(allRecords).doesNotEvaluateFlowAfterTerminatingElement(PROCESS_ID);
    }

    /**
     * Completes the first job, cancels the workflow instance while the processor is blocked on an
     * ELEMENT_COMPLETING record, and checks the resulting lifecycles.
     */
    @Test
    public void shouldCancelScopeBeforeTakingSequenceFlow() {
        // given
        streamProcessorRule.deploy(SERVICE_TASK_WORKFLOW);
        streamProcessor.blockAfterWorkflowInstanceRecord(
                element -> element.getMetadata().getIntent() == WorkflowInstanceIntent.ELEMENT_COMPLETING);
        final TypedRecord<WorkflowInstanceRecord> instance = streamProcessorRule.createWorkflowInstance(PROCESS_ID);
        streamProcessorRule.completeFirstJob();
        TestUtil.waitUntil(() -> streamProcessor.isBlocked());

        // when
        envRule.writeCommand(instance.getKey(), WorkflowInstanceIntent.CANCEL, instance.getValue());
        streamProcessor.unblock();

        // then
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_TERMINATED);
        final List allRecords = (List) envRule.events().onlyWorkflowInstanceRecords().collect(Collectors.toList());
        final List processLifecycle = (List) envRule.events().onlyStatesOf(PROCESS_ID).collect(Collectors.toList());
        final List taskLifecycle = (List) envRule.events().onlyStatesOf("task").collect(Collectors.toList());

        LifecycleAssert.assertThat(processLifecycle)
                .compliesWithCompleteLifecycle()
                .endsWith(WorkflowInstanceIntent.ELEMENT_TERMINATED, new WorkflowInstanceIntent[0]);
        LifecycleAssert.assertThat(taskLifecycle).compliesWithCompleteLifecycle();
        WorkflowInstanceAssert.assertThat(allRecords).doesNotEvaluateFlowAfterTerminatingElement(PROCESS_ID);
    }

    /**
     * Completes the first job, cancels the workflow instance while the processor is blocked on a job
     * COMPLETED record, and checks the resulting lifecycles.
     */
    @Test
    public void shouldCancelActivityInStateCompleting() {
        // given
        streamProcessorRule.deploy(SERVICE_TASK_WORKFLOW);
        streamProcessor.blockAfterJobEvent(job -> job.getMetadata().getIntent() == JobIntent.COMPLETED);
        final TypedRecord<WorkflowInstanceRecord> instance = streamProcessorRule.createWorkflowInstance(PROCESS_ID);
        streamProcessorRule.completeFirstJob();
        TestUtil.waitUntil(() -> streamProcessor.isBlocked());

        // when
        envRule.writeCommand(instance.getKey(), WorkflowInstanceIntent.CANCEL, instance.getValue());
        streamProcessor.unblock();

        // then
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_TERMINATED);
        final List allRecords = (List) envRule.events().onlyWorkflowInstanceRecords().collect(Collectors.toList());
        final List processLifecycle = (List) envRule.events().onlyStatesOf(PROCESS_ID).collect(Collectors.toList());
        final List taskLifecycle = (List) envRule.events().onlyStatesOf("task").collect(Collectors.toList());

        LifecycleAssert.assertThat(processLifecycle)
                .compliesWithCompleteLifecycle()
                .endsWith(WorkflowInstanceIntent.ELEMENT_TERMINATED, new WorkflowInstanceIntent[0]);
        LifecycleAssert.assertThat(taskLifecycle).compliesWithCompleteLifecycle();
        WorkflowInstanceAssert.assertThat(allRecords).doesNotEvaluateFlowAfterTerminatingElement(PROCESS_ID);
    }

    /**
     * Writes a CANCEL command and completes the first job while the processor is blocked, then
     * checks the lifecycles of the process, the sub process and the task.
     */
    @Test
    public void shouldCancelAndCompleteJobConcurrentlyInSubProcess() {
        // given
        streamProcessorRule.deploy(SUB_PROCESS_WORKFLOW);
        streamProcessor.blockAfterJobEvent(job -> job.getMetadata().getIntent() == JobIntent.CREATE);
        final TypedRecord<WorkflowInstanceRecord> instance = streamProcessorRule.createWorkflowInstance(PROCESS_ID);
        TestUtil.waitUntil(() -> streamProcessor.isBlocked());

        // when: cancel and job completion are both issued before the processor continues
        envRule.writeCommand(instance.getKey(), WorkflowInstanceIntent.CANCEL, instance.getValue());
        streamProcessorRule.completeFirstJob();
        streamProcessor.unblock();

        // then
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_TERMINATED);
        final List processLifecycle = (List) envRule.events().onlyStatesOf(PROCESS_ID).collect(Collectors.toList());
        final List subProcessLifecycle = (List) envRule.events().onlyStatesOf("subProcess").collect(Collectors.toList());
        final List taskLifecycle = (List) envRule.events().onlyStatesOf("task").collect(Collectors.toList());

        LifecycleAssert.assertThat(processLifecycle)
                .compliesWithCompleteLifecycle()
                .endsWith(WorkflowInstanceIntent.ELEMENT_TERMINATED, new WorkflowInstanceIntent[0]);
        LifecycleAssert.assertThat(subProcessLifecycle).compliesWithCompleteLifecycle();
        LifecycleAssert.assertThat(taskLifecycle).compliesWithCompleteLifecycle();
    }

    /**
     * Writes an UPDATE_PAYLOAD command, restarts the processor once it is blocked on the
     * UPDATE_PAYLOAD record, and expects the completed process to carry the updated payload.
     */
    @Test
    public void shouldUpdateStateOnCommands() {
        // given: an instance whose service task has been activated
        streamProcessorRule.deploy(SERVICE_TASK_WORKFLOW);
        final TypedRecord<WorkflowInstanceRecord> instance = streamProcessorRule.createWorkflowInstance(PROCESS_ID);
        streamProcessorRule.awaitElementInState("task", WorkflowInstanceIntent.ELEMENT_ACTIVATED);
        streamProcessor.blockAfterWorkflowInstanceRecord(
                element -> element.getMetadata().getIntent() == WorkflowInstanceIntent.UPDATE_PAYLOAD);

        // Typed as WorkflowInstanceRecord (not the UnpackedObject base type) because setPayload is
        // called on it below.
        final WorkflowInstanceRecord updatedValue = instance.getValue();
        updatedValue.setPayload(MsgPackUtil.asMsgPack("key", "val"));
        envRule.writeCommand(instance.getKey(), WorkflowInstanceIntent.UPDATE_PAYLOAD, updatedValue);
        TestUtil.waitUntil(() -> streamProcessor.isBlocked());

        // when: the processor is restarted and the job is completed afterwards
        streamProcessor.restart();
        streamProcessorRule.completeFirstJob();

        // then: the completed process exposes the payload written by the command
        final TypedRecord<WorkflowInstanceRecord> completedProcess =
                streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_COMPLETED);
        MsgPackUtil.assertEquality(completedProcess.getValue().getPayload(), "{'key': 'val'}");
    }

    /**
     * Advances the clock by the subscription check interval plus the subscription timeout and
     * expects the open-message-subscription command to have been sent twice.
     */
    @Test
    public void shouldRetryToOpenMessageSubscription() {
        // given: the processor is blocked on the ELEMENT_ACTIVATED record of the catch event
        streamProcessorRule.deploy(MESSAGE_CATCH_EVENT_WORKFLOW);
        streamProcessor.blockAfterWorkflowInstanceRecord(
                isForElement("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED));
        streamProcessorRule.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", "order-123"));
        TestUtil.waitUntil(() -> streamProcessor.isBlocked());
        final TypedRecord<WorkflowInstanceRecord> catchEvent =
                streamProcessorRule.awaitElementInState("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED);

        // when
        envRule.getClock()
                .addTime(MessageStreamProcessor.SUBSCRIPTION_CHECK_INTERVAL.plus(MessageStreamProcessor.SUBSCRIPTION_TIMEOUT));
        streamProcessor.unblock();

        // then: two invocations are expected within five seconds
        final SubscriptionCommandSender verifiedSender =
                (SubscriptionCommandSender)
                        Mockito.verify(
                                streamProcessorRule.getMockSubscriptionCommandSender(),
                                Mockito.timeout(5000L).times(2));
        verifiedSender.openMessageSubscription(
                catchEvent.getValue().getWorkflowInstanceKey(),
                catchEvent.getKey(),
                BufferUtil.wrapString("order canceled"),
                BufferUtil.wrapString("order-123"));
    }

    /**
     * Writes the same subscription OPEN command twice while the processor is blocked and expects
     * the first subscription rejection to belong to the second command.
     */
    @Test
    public void shouldRejectDuplicatedOpenWorkflowInstanceSubscription() {
        // given
        streamProcessorRule.deploy(MESSAGE_CATCH_EVENT_WORKFLOW);
        streamProcessor.blockAfterWorkflowInstanceRecord(
                isForElement("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED));
        streamProcessorRule.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", "order-123"));
        TestUtil.waitUntil(() -> streamProcessor.isBlocked());
        final UnpackedObject subscription =
                subscriptionRecordForEvent(
                        streamProcessorRule.awaitElementInState("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED));

        // when
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.OPEN, subscription);
        final long secondOpenPosition = envRule.writeCommand(WorkflowInstanceSubscriptionIntent.OPEN, subscription);
        streamProcessor.unblock();

        // then
        final TypedRecord<WorkflowInstanceSubscriptionRecord> rejection =
                streamProcessorRule.awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(rejection.getMetadata().getIntent()).isEqualTo(WorkflowInstanceSubscriptionIntent.OPEN);
        Assertions.assertThat(rejection.getSourcePosition()).isEqualTo(secondOpenPosition);
        final String rejectionReason = BufferUtil.bufferAsString(rejection.getMetadata().getRejectionReason());
        Assertions.assertThat(rejectionReason).isEqualTo("subscription is already open");
    }

    /**
     * Writes the same subscription CORRELATE command twice while the processor is blocked on the
     * OPENED subscription event, and expects the first subscription rejection to belong to the
     * second command and the correlate command to have been sent twice.
     */
    @Test
    public void shouldRejectDuplicatedCorrelateWorkflowInstanceSubscription() {
        // given
        streamProcessorRule.deploy(MESSAGE_CATCH_EVENT_WORKFLOW);
        streamProcessorRule.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", "order-123"));
        // Typed as WorkflowInstanceSubscriptionRecord (not the UnpackedObject base type) because the
        // verification below reads its subscription-specific getters.
        final WorkflowInstanceSubscriptionRecord subscription =
                subscriptionRecordForEvent(
                        streamProcessorRule.awaitElementInState("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED));
        streamProcessor.blockAfterWorkflowInstanceSubscriptionEvent(
                event -> event.getMetadata().getIntent() == WorkflowInstanceSubscriptionIntent.OPENED);
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.OPEN, subscription);
        TestUtil.waitUntil(() -> streamProcessor.isBlocked());

        // when
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.CORRELATE, subscription);
        final long secondCorrelatePosition =
                envRule.writeCommand(WorkflowInstanceSubscriptionIntent.CORRELATE, subscription);
        streamProcessor.unblock();

        // then: the rejection points at the second correlate command
        final TypedRecord<WorkflowInstanceSubscriptionRecord> rejection =
                streamProcessorRule.awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(rejection.getMetadata().getIntent())
                .isEqualTo(WorkflowInstanceSubscriptionIntent.CORRELATE);
        Assertions.assertThat(rejection.getSourcePosition()).isEqualTo(secondCorrelatePosition);
        final String rejectionReason = BufferUtil.bufferAsString(rejection.getMetadata().getRejectionReason());
        Assertions.assertThat(rejectionReason).isEqualTo("subscription is already correlated");

        // and: two correlate invocations are expected within five seconds
        final SubscriptionCommandSender verifiedSender =
                (SubscriptionCommandSender)
                        Mockito.verify(
                                streamProcessorRule.getMockSubscriptionCommandSender(),
                                Mockito.timeout(5000L).times(2));
        verifiedSender.correlateMessageSubscription(
                ArgumentMatchers.eq(subscription.getSubscriptionPartitionId()),
                ArgumentMatchers.eq(subscription.getWorkflowInstanceKey()),
                ArgumentMatchers.eq(subscription.getElementInstanceKey()),
                (DirectBuffer) ArgumentMatchers.any());
    }

    /**
     * Cancels the workflow instance after its subscription was opened, then writes a CORRELATE
     * command and expects it to be rejected.
     */
    @Test
    public void shouldRejectCorrelateWorkflowInstanceSubscription() {
        // given: an opened subscription, with the processor blocked on the OPENED event
        streamProcessorRule.deploy(MESSAGE_CATCH_EVENT_WORKFLOW);
        final TypedRecord<WorkflowInstanceRecord> instance =
                streamProcessorRule.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", "order-123"));
        final TypedRecord<WorkflowInstanceRecord> catchEvent =
                streamProcessorRule.awaitElementInState("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED);
        final UnpackedObject subscription = subscriptionRecordForEvent(catchEvent);
        streamProcessor.blockAfterWorkflowInstanceSubscriptionEvent(
                event -> event.getMetadata().getIntent() == WorkflowInstanceSubscriptionIntent.OPENED);
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.OPEN, subscription);
        TestUtil.waitUntil(() -> streamProcessor.isBlocked());

        // and: the instance is cancelled and reaches ELEMENT_TERMINATED
        envRule.writeCommand(instance.getKey(), WorkflowInstanceIntent.CANCEL, catchEvent.getValue());
        streamProcessor.unblock();
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_TERMINATED);

        // when
        final long correlatePosition = envRule.writeCommand(WorkflowInstanceSubscriptionIntent.CORRELATE, subscription);

        // then
        final TypedRecord<WorkflowInstanceSubscriptionRecord> rejection =
                streamProcessorRule.awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(rejection.getMetadata().getIntent())
                .isEqualTo(WorkflowInstanceSubscriptionIntent.CORRELATE);
        Assertions.assertThat(rejection.getSourcePosition()).isEqualTo(correlatePosition);
        final String rejectionReason = BufferUtil.bufferAsString(rejection.getMetadata().getRejectionReason());
        Assertions.assertThat(rejectionReason).isEqualTo("activity is not active anymore");
    }

    /**
     * Cancels the workflow instance, advances the clock by the subscription check interval plus the
     * subscription timeout, and expects the close-message-subscription command to have been sent
     * twice.
     */
    @Test
    public void shouldRetryToCloseMessageSubscription() {
        // given: a terminated instance that had reached its catch event
        streamProcessorRule.deploy(MESSAGE_CATCH_EVENT_WORKFLOW);
        final TypedRecord<WorkflowInstanceRecord> instance =
                streamProcessorRule.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", "order-123"));
        final TypedRecord<WorkflowInstanceRecord> catchEvent =
                streamProcessorRule.awaitElementInState("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED);
        final WorkflowInstanceSubscriptionRecord subscription = subscriptionRecordForEvent(catchEvent);
        envRule.writeCommand(instance.getKey(), WorkflowInstanceIntent.CANCEL, catchEvent.getValue());
        streamProcessorRule.awaitElementInState(PROCESS_ID, WorkflowInstanceIntent.ELEMENT_TERMINATED);

        // when
        envRule.getClock()
                .addTime(MessageStreamProcessor.SUBSCRIPTION_CHECK_INTERVAL.plus(MessageStreamProcessor.SUBSCRIPTION_TIMEOUT));
        streamProcessor.unblock();

        // then: two invocations are expected within five seconds
        final SubscriptionCommandSender verifiedSender =
                (SubscriptionCommandSender)
                        Mockito.verify(
                                streamProcessorRule.getMockSubscriptionCommandSender(),
                                Mockito.timeout(5000L).times(2));
        verifiedSender.closeMessageSubscription(
                subscription.getSubscriptionPartitionId(),
                subscription.getWorkflowInstanceKey(),
                subscription.getElementInstanceKey());
    }

    /**
     * Writes the same subscription CLOSE command twice while the processor is blocked on the OPENED
     * subscription event and expects the first subscription rejection to belong to the second
     * command.
     */
    @Test
    public void shouldRejectDuplicatedCloseWorkflowInstanceSubscription() {
        // given
        streamProcessorRule.deploy(MESSAGE_CATCH_EVENT_WORKFLOW);
        streamProcessorRule.createWorkflowInstance(PROCESS_ID, MsgPackUtil.asMsgPack("orderId", "order-123"));
        final UnpackedObject subscription =
                subscriptionRecordForEvent(
                        streamProcessorRule.awaitElementInState("catch-event", WorkflowInstanceIntent.ELEMENT_ACTIVATED));
        streamProcessor.blockAfterWorkflowInstanceSubscriptionEvent(
                event -> event.getMetadata().getIntent() == WorkflowInstanceSubscriptionIntent.OPENED);
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.OPEN, subscription);
        TestUtil.waitUntil(() -> streamProcessor.isBlocked());

        // when
        envRule.writeCommand(WorkflowInstanceSubscriptionIntent.CLOSE, subscription);
        final long secondClosePosition = envRule.writeCommand(WorkflowInstanceSubscriptionIntent.CLOSE, subscription);
        streamProcessor.unblock();

        // then
        final TypedRecord<WorkflowInstanceSubscriptionRecord> rejection =
                streamProcessorRule.awaitAndGetFirstSubscriptionRejection();
        Assertions.assertThat(rejection.getMetadata().getIntent()).isEqualTo(WorkflowInstanceSubscriptionIntent.CLOSE);
        Assertions.assertThat(rejection.getSourcePosition()).isEqualTo(secondClosePosition);
        final String rejectionReason = BufferUtil.bufferAsString(rejection.getMetadata().getRejectionReason());
        Assertions.assertThat(rejectionReason).isEqualTo("subscription is already closed");
    }

    /**
     * Builds a predicate that accepts workflow instance records whose element id equals the given
     * id.
     *
     * @param str the element id to match
     * @return predicate comparing the wrapped id against the record value's element id
     */
    private Predicate<TypedRecord<WorkflowInstanceRecord>> isForElement(String str) {
        return candidate -> BufferUtil.wrapString(str).equals(candidate.getValue().getElementId());
    }

    /**
     * Builds a predicate that accepts workflow instance records matching both the given element id
     * and the given intent.
     *
     * @param str the element id to match
     * @param workflowInstanceIntent the intent the record metadata must carry
     * @return conjunction of the element-id predicate and the intent check
     */
    private Predicate<TypedRecord<WorkflowInstanceRecord>> isForElement(String str, WorkflowInstanceIntent workflowInstanceIntent) {
        final Predicate<TypedRecord<WorkflowInstanceRecord>> matchesElement = isForElement(str);
        final Predicate<TypedRecord<WorkflowInstanceRecord>> matchesIntent =
                candidate -> candidate.getMetadata().getIntent() == workflowInstanceIntent;
        return matchesElement.and(matchesIntent);
    }

    /**
     * Creates a subscription record for the given catch event record: partition id 0, the workflow
     * instance key taken from the record value, the record key as element instance key, and the
     * message name "order canceled".
     *
     * @param typedRecord the workflow instance record of the catch event
     * @return the populated subscription record
     */
    private WorkflowInstanceSubscriptionRecord subscriptionRecordForEvent(TypedRecord<WorkflowInstanceRecord> typedRecord) {
        return new WorkflowInstanceSubscriptionRecord()
                .setSubscriptionPartitionId(0)
                .setWorkflowInstanceKey(typedRecord.getValue().getWorkflowInstanceKey())
                .setElementInstanceKey(typedRecord.getKey())
                .setMessageName(BufferUtil.wrapString("order canceled"));
    }
}
