/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.jobs.service.messaging;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
import io.smallrye.mutiny.Uni;
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.kie.kogito.jobs.api.Job;
import org.kie.kogito.jobs.api.event.CancelJobRequestEvent;
import org.kie.kogito.jobs.api.event.CreateProcessInstanceJobRequestEvent;
import org.kie.kogito.jobs.api.event.JobCloudEvent;
import org.kie.kogito.jobs.api.event.serialization.JobCloudEventDeserializer;
import org.kie.kogito.jobs.service.exception.JobServiceException;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.model.ScheduledJob;
import org.kie.kogito.jobs.service.model.job.ScheduledJobAdapter;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.scheduler.impl.TimerDelegateJobScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ReactiveMessagingEventConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveMessagingEventConsumer.class);
    protected TimerDelegateJobScheduler scheduler;
    protected ReactiveJobRepository jobRepository;
    protected ObjectMapper objectMapper;
    protected JobCloudEventDeserializer deserializer;

    protected ReactiveMessagingEventConsumer() {
        this.scheduler = null;
        this.jobRepository = null;
        this.objectMapper = null;
        this.deserializer = null;
    }

    protected ReactiveMessagingEventConsumer(TimerDelegateJobScheduler scheduler, ReactiveJobRepository jobRepository, ObjectMapper objectMapper) {
        this.scheduler = scheduler;
        this.jobRepository = jobRepository;
        this.objectMapper = objectMapper;
        this.deserializer = new JobCloudEventDeserializer(objectMapper);
    }

    protected Uni<Void> onKogitoServiceRequest(Message<CloudEvent> message) {
        JobCloudEvent jobCloudEvent = this.deserializer.deserialize((CloudEvent)message.getPayload());
        switch (jobCloudEvent.getType()) {
            case "CreateProcessInstanceJobRequest": {
                return this.handleEvent(message, (CreateProcessInstanceJobRequestEvent)jobCloudEvent);
            }
            case "CancelJobRequest": {
                return this.handleEvent(message, (CancelJobRequestEvent)jobCloudEvent);
            }
        }
        LOGGER.error("Unexpected job request type: {}, for the cloud event: {}", (Object)jobCloudEvent.getType(), (Object)jobCloudEvent);
        return Uni.createFrom().completionStage(message.nack((Throwable)new JobServiceException("Unexpected job request type: " + jobCloudEvent.getType())));
    }

    protected Uni<Void> handleEvent(Message<?> message, CreateProcessInstanceJobRequestEvent event) {
        return Uni.createFrom().completionStage(this.jobRepository.get(((Job)event.getData()).getId())).flatMap(existingJob -> {
            if (existingJob == null || existingJob.getStatus() == JobStatus.SCHEDULED) {
                return Uni.createFrom().publisher(this.scheduler.schedule(ScheduledJobAdapter.to(ScheduledJob.builder().job((Job)event.getData()).build())));
            }
            LOGGER.info("A Job in status: {} already exists for the job id: {}, no processing will be done fot the event: {}.", new Object[]{existingJob.getStatus(), existingJob.getId(), event});
            return Uni.createFrom().item(existingJob);
        }).onItem().transformToUni(createdJob -> {
            if (createdJob == null) {
                return Uni.createFrom().failure((Throwable)new JobServiceException("An internal scheduler error was produced during Job scheduling"));
            }
            return Uni.createFrom().completionStage(message.ack());
        }).onFailure().recoverWithUni(throwable -> {
            String msg = String.format("An error was produced during Job scheduling for the event: %s", event);
            LOGGER.error(msg, throwable);
            return Uni.createFrom().completionStage(message.nack((Throwable)new JobServiceException("An error was produced during Job scheduling: " + throwable.getMessage(), (Throwable)throwable)));
        });
    }

    protected Uni<Void> handleEvent(Message<?> message, CancelJobRequestEvent event) {
        return Uni.createFrom().completionStage((CompletionStage)this.scheduler.cancel(((CancelJobRequestEvent.JobId)event.getData()).getId())).onItemOrFailure().transformToUni((cancelledJob, throwable) -> {
            if (throwable != null) {
                String msg = String.format("An error was produced during Job cancelling for the event: %s", event);
                LOGGER.error(msg, throwable);
                return Uni.createFrom().completionStage(message.nack((Throwable)new JobServiceException("An error was produced during Job cancelling: " + throwable.getMessage(), (Throwable)throwable)));
            }
            if (cancelledJob == null) {
                LOGGER.info("No Job exists for the job id: {} or it was already cancelled", (Object)((CancelJobRequestEvent.JobId)event.getData()).getId());
            }
            return Uni.createFrom().completionStage(message.ack());
        });
    }
}

