package io.zeebe.broker.workflow.message;

import io.zeebe.broker.exporter.util.TestJarExporter;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.test.EmbeddedBrokerConfigurator;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.broker.workflow.WorkflowAssert;
import io.zeebe.broker.workflow.gateway.ParallelGatewayStreamProcessorTest;
import io.zeebe.exporter.record.Record;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.DeploymentIntent;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.MessageSubscriptionIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import io.zeebe.protocol.intent.WorkflowInstanceSubscriptionIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.PartitionTestClient;
import io.zeebe.test.util.MsgPackUtil;
import io.zeebe.test.util.record.RecordingExporter;
import java.util.List;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration tests for BPMN elements that wait for a message.
 *
 * <p>The suite runs once per parameter set returned by {@link #parameters()}: a workflow with an
 * intermediate message catch event and a workflow with a receive task. Both workflows use the
 * element id {@code receive-message}, the message name {@code order canceled} and the correlation
 * key expression {@code $.orderId}, so every test applies unchanged to both element types.
 *
 * <p>NOTE(review): the raw {@code Record} locals and the {@code (Record)} casts in the tests below
 * look like decompiler output; they should be parameterized with the matching record value types
 * once those types are confirmed against the exporter API.
 */
@RunWith(Parameterized.class)
public class MessageCatchElementTest {
    /** Number of partitions the embedded broker is configured with and that {@link #init()} waits for. */
    private static final int PARTITION_COUNT = 3;

    // NOTE(review): the process id is borrowed from an unrelated test class; presumably the
    // decompiler attributed an inlined constant to it - confirm and declare a local literal.
    private static final String PROCESS_ID = ParallelGatewayStreamProcessorTest.PROCESS_ID;

    private static final String ELEMENT_ID = "receive-message";
    private static final String SEQUENCE_FLOW_ID = "to-end";
    private static final String MESSAGE_NAME = "order canceled";
    private static final String CORRELATION_KEY_EXPRESSION = "$.orderId";
    private static final String CORRELATION_VARIABLE = "orderId";
    private static final String CORRELATION_KEY = "order-123";

    private static final BpmnModelInstance CATCH_EVENT_WORKFLOW =
        Bpmn.createExecutableProcess(PROCESS_ID)
            .startEvent()
            .intermediateCatchEvent(ELEMENT_ID)
            .message(m -> m.name(MESSAGE_NAME).zeebeCorrelationKey(CORRELATION_KEY_EXPRESSION))
            .sequenceFlowId(SEQUENCE_FLOW_ID)
            .endEvent()
            .done();

    private static final BpmnModelInstance RECEIVE_TASK_WORKFLOW =
        Bpmn.createExecutableProcess(PROCESS_ID)
            .startEvent()
            .receiveTask(ELEMENT_ID)
            .message(m -> m.name(MESSAGE_NAME).zeebeCorrelationKey(CORRELATION_KEY_EXPRESSION))
            .sequenceFlowId(SEQUENCE_FLOW_ID)
            .endEvent()
            .done();

    // The three rule fields are initialized in declaration order: the client rule needs the
    // broker rule, and the chain needs both.
    public EmbeddedBrokerRule brokerRule =
        new EmbeddedBrokerRule(EmbeddedBrokerConfigurator.setPartitionCount(PARTITION_COUNT));
    public ClientApiRule apiRule = new ClientApiRule(brokerRule::getClientAddress);

    /** Applies the broker rule as the outer rule and the client API rule around each test. */
    @Rule
    public RuleChain ruleChain = RuleChain.outerRule(brokerRule).around(apiRule);

    /** Display name of the element type under test; used as the parameterized test name. */
    @Parameterized.Parameter(0)
    public String elementType;

    /** Workflow model deployed before each test. */
    @Parameterized.Parameter(1)
    public BpmnModelInstance workflow;

    private PartitionTestClient testClient;

    /**
     * Supplies the parameter sets.
     *
     * @return one row per element type, each holding the display name and the workflow model
     */
    @Parameterized.Parameters(name = "{0}")
    public static Object[][] parameters() {
        return new Object[][] {
            {"intermediate message catch event", CATCH_EVENT_WORKFLOW},
            {"receive task", RECEIVE_TASK_WORKFLOW}
        };
    }

    /**
     * Waits for the partitions, deploys the workflow under test through the default partition
     * client and waits for the deployment CREATED event on that client and on the clients of
     * partitions 1 and 2.
     */
    @Before
    public void init() {
        apiRule.waitForPartition(PARTITION_COUNT);
        testClient = apiRule.partitionClient();

        final long deploymentKey = testClient.deploy(workflow);

        testClient.receiveFirstDeploymentEvent(DeploymentIntent.CREATED, deploymentKey);
        apiRule.partitionClient(1).receiveFirstDeploymentEvent(DeploymentIntent.CREATED, deploymentKey);
        apiRule.partitionClient(2).receiveFirstDeploymentEvent(DeploymentIntent.CREATED, deploymentKey);
    }

    /**
     * Publishes the message before the instance is created and verifies the intents of the first
     * ten workflow instance records, in order.
     */
    @Test
    public void testWorkflowInstanceLifeCycle() {
        testClient.publishMessage(MESSAGE_NAME, CORRELATION_KEY);
        createWorkflowInstance();

        final List<Intent> intents =
            testClient
                .receiveWorkflowInstances()
                .limit(10L)
                .map(record -> record.getMetadata().getIntent())
                .collect(Collectors.toList());

        Assertions.assertThat(intents)
            .containsExactly(
                WorkflowInstanceIntent.CREATE,
                WorkflowInstanceIntent.ELEMENT_READY,
                WorkflowInstanceIntent.ELEMENT_ACTIVATED,
                WorkflowInstanceIntent.START_EVENT_OCCURRED,
                WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN,
                WorkflowInstanceIntent.ELEMENT_READY,
                WorkflowInstanceIntent.ELEMENT_ACTIVATED,
                WorkflowInstanceIntent.ELEMENT_COMPLETING,
                WorkflowInstanceIntent.ELEMENT_COMPLETED,
                WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN);
    }

    /** Verifies the ELEMENT_ACTIVATED record of the catch element against the created instance. */
    @Test
    public void shouldActivateElement() {
        final long workflowInstanceKey = createWorkflowInstance();

        WorkflowAssert.assertWorkflowInstanceRecord(
            workflowInstanceKey,
            ELEMENT_ID,
            testClient.receiveElementInState(ELEMENT_ID, WorkflowInstanceIntent.ELEMENT_ACTIVATED));
    }

    /** Verifies that a message subscription OPENED event is exported once the element is activated. */
    @Test
    public void shouldOpenMessageSubscription() {
        final long workflowInstanceKey = createWorkflowInstance();
        final Record catchElementActivated =
            testClient.receiveElementInState(ELEMENT_ID, WorkflowInstanceIntent.ELEMENT_ACTIVATED);

        final Record subscription =
            (Record) RecordingExporter.messageSubscriptionRecords(MessageSubscriptionIntent.OPENED).getFirst();

        Assertions.assertThat(subscription.getMetadata().getValueType()).isEqualTo(ValueType.MESSAGE_SUBSCRIPTION);
        Assertions.assertThat(subscription.getMetadata().getRecordType()).isEqualTo(RecordType.EVENT);
        WorkflowAssert.assertMessageSubscription(
            workflowInstanceKey, CORRELATION_KEY, catchElementActivated, subscription);
    }

    /** Verifies that a workflow instance subscription OPENED event is received once the element is activated. */
    @Test
    public void shouldOpenWorkflowInstanceSubscription() {
        final long workflowInstanceKey = createWorkflowInstance();
        final Record catchElementActivated =
            testClient.receiveElementInState(ELEMENT_ID, WorkflowInstanceIntent.ELEMENT_ACTIVATED);

        final Record subscription =
            (Record)
                testClient
                    .receiveWorkflowInstanceSubscriptions()
                    .withIntent(WorkflowInstanceSubscriptionIntent.OPENED)
                    .getFirst();

        Assertions.assertThat(subscription.getMetadata().getValueType())
            .isEqualTo(ValueType.WORKFLOW_INSTANCE_SUBSCRIPTION);
        Assertions.assertThat(subscription.getMetadata().getRecordType()).isEqualTo(RecordType.EVENT);
        WorkflowAssert.assertWorkflowSubscription(workflowInstanceKey, catchElementActivated, subscription);
    }

    /**
     * Publishes a message with a payload after the element is activated and verifies the workflow
     * instance subscription CORRELATED event, including the expected payload JSON.
     */
    @Test
    public void shouldCorrelateWorkflowInstanceSubscription() {
        final long workflowInstanceKey = createWorkflowInstance();
        final Record catchElementActivated =
            testClient.receiveElementInState(ELEMENT_ID, WorkflowInstanceIntent.ELEMENT_ACTIVATED);

        // NOTE(review): the payload key/value come from constants of unrelated test classes;
        // presumably they are "foo"/"bar" (matching the JSON asserted below) - confirm.
        testClient.publishMessage(
            MESSAGE_NAME,
            CORRELATION_KEY,
            MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, TestJarExporter.FOO));

        final Record subscription =
            (Record)
                testClient
                    .receiveWorkflowInstanceSubscriptions()
                    .withIntent(WorkflowInstanceSubscriptionIntent.CORRELATED)
                    .getFirst();

        Assertions.assertThat(subscription.getMetadata().getValueType())
            .isEqualTo(ValueType.WORKFLOW_INSTANCE_SUBSCRIPTION);
        Assertions.assertThat(subscription.getMetadata().getRecordType()).isEqualTo(RecordType.EVENT);
        WorkflowAssert.assertWorkflowSubscription(
            workflowInstanceKey, "{\"foo\":\"bar\"}", catchElementActivated, subscription);
    }

    /**
     * Publishes a message with a payload after the element is activated and verifies the message
     * subscription CORRELATED event.
     */
    @Test
    public void shouldCorrelateMessageSubscription() {
        final long workflowInstanceKey = createWorkflowInstance();
        final Record catchElementActivated =
            testClient.receiveElementInState(ELEMENT_ID, WorkflowInstanceIntent.ELEMENT_ACTIVATED);

        // NOTE(review): see shouldCorrelateWorkflowInstanceSubscription - payload constants are
        // presumably "foo"/"bar"; confirm.
        testClient.publishMessage(
            MESSAGE_NAME,
            CORRELATION_KEY,
            MsgPackUtil.asMsgPack(TypedStreamProcessorTest.STREAM_NAME, TestJarExporter.FOO));

        final Record subscription =
            (Record)
                testClient
                    .receiveMessageSubscriptions()
                    .withIntent(MessageSubscriptionIntent.CORRELATED)
                    .getFirst();

        Assertions.assertThat(subscription.getMetadata().getValueType()).isEqualTo(ValueType.MESSAGE_SUBSCRIPTION);
        Assertions.assertThat(subscription.getMetadata().getRecordType()).isEqualTo(RecordType.EVENT);
        WorkflowAssert.assertMessageSubscription(workflowInstanceKey, catchElementActivated, subscription);
    }

    /**
     * Cancels the instance while it waits at the element and verifies the message subscription
     * CLOSED event; the event is expected to carry an empty message name and correlation key.
     */
    @Test
    public void shouldCloseMessageSubscription() {
        final long workflowInstanceKey = createWorkflowInstance();
        final Record catchElementActivated =
            testClient.receiveElementInState(ELEMENT_ID, WorkflowInstanceIntent.ELEMENT_ACTIVATED);

        testClient.cancelWorkflowInstance(workflowInstanceKey);

        final Record subscription =
            (Record) RecordingExporter.messageSubscriptionRecords(MessageSubscriptionIntent.CLOSED).getFirst();

        Assertions.assertThat(subscription.getMetadata().getRecordType()).isEqualTo(RecordType.EVENT);
        io.zeebe.exporter.record.Assertions.assertThat(subscription.getValue())
            .hasWorkflowInstanceKey(workflowInstanceKey)
            .hasElementInstanceKey(catchElementActivated.getKey())
            .hasMessageName("")
            .hasCorrelationKey("");
    }

    /**
     * Cancels the instance while it waits at the element and verifies the workflow instance
     * subscription CLOSED event; the event is expected to carry an empty message name.
     */
    @Test
    public void shouldCloseWorkflowInstanceSubscription() {
        final long workflowInstanceKey = createWorkflowInstance();
        final Record catchElementActivated =
            testClient.receiveElementInState(ELEMENT_ID, WorkflowInstanceIntent.ELEMENT_ACTIVATED);

        testClient.cancelWorkflowInstance(workflowInstanceKey);

        final Record subscription =
            (Record)
                RecordingExporter.workflowInstanceSubscriptionRecords(WorkflowInstanceSubscriptionIntent.CLOSED)
                    .getFirst();

        Assertions.assertThat(subscription.getMetadata().getRecordType()).isEqualTo(RecordType.EVENT);
        io.zeebe.exporter.record.Assertions.assertThat(subscription.getValue())
            .hasWorkflowInstanceKey(workflowInstanceKey)
            .hasElementInstanceKey(catchElementActivated.getKey())
            .hasMessageName("");
    }

    /**
     * Publishes the message after the instance is created and verifies that the element completes
     * and the outgoing sequence flow is taken.
     */
    @Test
    public void shouldCorrelateMessageAndContinue() {
        createWorkflowInstance();
        testClient.publishMessage(MESSAGE_NAME, CORRELATION_KEY);

        Assertions.assertThat(
                RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.ELEMENT_COMPLETED)
                    .withElementId(ELEMENT_ID)
                    .exists())
            .isTrue();
        Assertions.assertThat(
                RecordingExporter.workflowInstanceRecords(WorkflowInstanceIntent.SEQUENCE_FLOW_TAKEN)
                    .withElementId(SEQUENCE_FLOW_ID)
                    .exists())
            .isTrue();
    }

    /**
     * Creates an instance of the deployed workflow whose payload sets {@code orderId} to
     * {@code order-123}.
     *
     * @return the value returned by the test client for the created instance
     */
    private long createWorkflowInstance() {
        return testClient.createWorkflowInstance(
            PROCESS_ID, MsgPackUtil.asMsgPack(CORRELATION_VARIABLE, CORRELATION_KEY));
    }
}
