/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.jobs.service.stream;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URI;
import java.time.ZonedDateTime;
import org.assertj.core.api.Assertions;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.kie.kogito.jobs.service.events.JobDataEvent;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.kie.kogito.jobs.service.model.job.JobDetails;
import org.kie.kogito.jobs.service.model.job.Recipient;
import org.kie.kogito.jobs.service.stream.AbstractJobStreams;
import org.kie.kogito.timer.Trigger;
import org.kie.kogito.timer.impl.PointInTimeTrigger;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.verification.VerificationMode;

@ExtendWith(value={MockitoExtension.class})
abstract class AbstractJobStreamsTest<T extends AbstractJobStreams> {
    protected static final String URL = "http://localhost:8180";
    private static final String SERIALIZED_MESSAGE = "SERIALIZED_MESSAGE";
    private static final String JOB_ID = "JOB_ID";
    private static final String CORRELATION_ID = "CORRELATION_ID";
    private static final JobStatus STATUS = JobStatus.SCHEDULED;
    private static final ZonedDateTime LAST_UPDATE = ZonedDateTime.parse("2022-08-03T18:00:15.001+01:00");
    private static final Integer RETRIES = 1;
    private static final Integer PRIORITY = 1;
    private static final Integer EXECUTION_COUNTER = 1;
    private static final String SCHEDULE_ID = "SCHEDULE_ID";
    private static final Recipient RECIPIENT = new Recipient.HTTPRecipient("http://recipient");
    private static final Trigger TRIGGER = new PointInTimeTrigger();
    private static final JobDetails.Type TYPE = JobDetails.Type.HTTP;
    @Captor
    ArgumentCaptor<Message<String>> messageCaptor;
    @Mock
    ObjectMapper objectMapper;
    @Captor
    ArgumentCaptor<Object> eventCaptor;
    @Mock
    Emitter<String> emitter;
    T jobStreams;

    AbstractJobStreamsTest() {
    }

    @BeforeEach
    void setUp() {
        this.jobStreams = (AbstractJobStreams)Mockito.spy(this.createJobStreams());
    }

    protected abstract T createJobStreams();

    @Test
    void jobStatusChangeWithAck() throws Exception {
        JobDetails job = this.mockJobDetails();
        ((ObjectMapper)Mockito.doReturn((Object)SERIALIZED_MESSAGE).when((Object)this.objectMapper)).writeValueAsString(ArgumentMatchers.any());
        Message<String> message = this.executeStatusChange(job);
        message.ack();
        ((AbstractJobStreams)Mockito.verify(this.jobStreams)).onAck(job);
    }

    @Test
    void jobStatusChangeWithNack() throws Exception {
        JobDetails job = this.mockJobDetails();
        ((ObjectMapper)Mockito.doReturn((Object)SERIALIZED_MESSAGE).when((Object)this.objectMapper)).writeValueAsString(ArgumentMatchers.any());
        Message<String> message = this.executeStatusChange(job);
        Exception error = new Exception("Nack error");
        message.nack((Throwable)error);
        ((AbstractJobStreams)Mockito.verify(this.jobStreams)).onNack((Throwable)error, job);
    }

    private Message<String> executeStatusChange(JobDetails job) throws Exception {
        this.jobStreams.jobStatusChange(job);
        ((ObjectMapper)Mockito.verify((Object)this.objectMapper)).writeValueAsString(this.eventCaptor.capture());
        Assertions.assertThat((Object)this.eventCaptor.getValue()).isInstanceOf(JobDataEvent.class);
        this.assertExpectedEvent((JobDataEvent)this.eventCaptor.getValue());
        ((Emitter)Mockito.verify(this.emitter)).send((Message)this.messageCaptor.capture());
        Message message = (Message)this.messageCaptor.getValue();
        Assertions.assertThat((Object)message).isNotNull();
        Assertions.assertThat((String)((String)message.getPayload())).isEqualTo(SERIALIZED_MESSAGE);
        this.assertExpectedMetadata((Message<String>)message);
        return message;
    }

    @Test
    void jobStatusChangeWithUnexpectedError() throws Exception {
        JobDetails job = this.mockJobDetails();
        this.executeStatusChangeWithUnexpectedError(job);
    }

    @Test
    void jobStatusChangeWithUnexpectedErrorAndContinue() throws Exception {
        JobDetails job = this.mockJobDetails();
        this.executeStatusChangeWithUnexpectedError(job);
        ((ObjectMapper)Mockito.doReturn((Object)SERIALIZED_MESSAGE).when((Object)this.objectMapper)).writeValueAsString(ArgumentMatchers.any());
        this.jobStreams.jobStatusChange(job);
        ((Emitter)Mockito.verify(this.emitter)).send((Message)this.messageCaptor.capture());
        Message message = (Message)this.messageCaptor.getValue();
        Assertions.assertThat((Object)message).isNotNull();
        Assertions.assertThat((String)((String)message.getPayload())).isEqualTo(SERIALIZED_MESSAGE);
        this.assertExpectedMetadata((Message<String>)message);
        message.ack();
        ((AbstractJobStreams)Mockito.verify(this.jobStreams)).onAck(job);
    }

    private void executeStatusChangeWithUnexpectedError(JobDetails job) throws Exception {
        ((ObjectMapper)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("Unexpected error")}).when((Object)this.objectMapper)).writeValueAsString(ArgumentMatchers.any());
        this.jobStreams.jobStatusChange(job);
        ((AbstractJobStreams)Mockito.verify(this.jobStreams, (VerificationMode)Mockito.never())).onAck((JobDetails)ArgumentMatchers.any());
        ((AbstractJobStreams)Mockito.verify(this.jobStreams, (VerificationMode)Mockito.never())).onNack((Throwable)ArgumentMatchers.any(), (JobDetails)ArgumentMatchers.any());
    }

    private JobDetails mockJobDetails() {
        return JobDetails.builder().id(JOB_ID).correlationId(CORRELATION_ID).status(STATUS).lastUpdate(LAST_UPDATE).retries(RETRIES).priority(PRIORITY).executionCounter(EXECUTION_COUNTER).scheduledId(SCHEDULE_ID).recipient(RECIPIENT).trigger(TRIGGER).type(TYPE).build();
    }

    private void assertExpectedEvent(JobDataEvent event) {
        Assertions.assertThat((String)event.getId()).isNotNull();
        Assertions.assertThat((String)event.getType()).isEqualTo("JobEvent");
        Assertions.assertThat((URI)event.getSource()).hasToString("http://localhost:8180/jobs");
        ScheduledJob data = (ScheduledJob)event.getData();
        Assertions.assertThat((Object)data).isNotNull();
        Assertions.assertThat((String)data.getId()).isEqualTo(JOB_ID);
        Assertions.assertThat((String)data.getScheduledId()).isEqualTo(SCHEDULE_ID);
        Assertions.assertThat((Comparable)data.getStatus()).isEqualTo((Object)STATUS);
        Assertions.assertThat((Integer)data.getRetries()).isEqualTo((Object)RETRIES);
        Assertions.assertThat((Integer)data.getExecutionCounter()).isEqualTo((Object)EXECUTION_COUNTER);
        Assertions.assertThat((ZonedDateTime)data.getLastUpdate()).isEqualTo((Object)LAST_UPDATE);
    }

    protected void assertExpectedMetadata(Message<String> message) {
        Assertions.assertThat((Iterable)message.getMetadata()).isEqualTo((Object)Metadata.empty());
    }
}

