/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.jobs.service.stream;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.kie.kogito.jobs.service.events.JobDataEvent;
import org.kie.kogito.jobs.service.model.job.JobDetails;
import org.kie.kogito.jobs.service.model.job.ScheduledJobAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for components that publish job status changes as messages.
 *
 * <p>A status change is wrapped in a {@link JobDataEvent}, serialized to JSON with the
 * configured {@link ObjectMapper} and sent through the configured {@link Emitter}.
 * Publishing only happens while {@code enabled} is {@code true}.
 */
public abstract class AbstractJobStreams {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJobStreams.class);

    /** Mapper used to serialize the outgoing event to a JSON string. */
    protected ObjectMapper objectMapper;
    /** When {@code false}, {@link #jobStatusChange(JobDetails)} does nothing. */
    protected boolean enabled;
    /** Channel through which the serialized events are sent. */
    protected Emitter<String> emitter;
    /** Base URL; {@code "/jobs"} is appended to it to form the event source. */
    protected String url;

    /**
     * Creates an instance with all fields left at their defaults ({@code null} / {@code false}),
     * which leaves publishing disabled.
     */
    protected AbstractJobStreams() {
    }

    /**
     * Creates a fully configured instance.
     *
     * @param objectMapper mapper used to serialize events to JSON
     * @param enabled whether status changes are published
     * @param emitter emitter the serialized events are sent through
     * @param url base URL used to build the event source
     */
    protected AbstractJobStreams(ObjectMapper objectMapper, boolean enabled, Emitter<String> emitter, String url) {
        this.objectMapper = objectMapper;
        this.enabled = enabled;
        this.emitter = emitter;
        this.url = url;
    }

    /**
     * Publishes the given job as a JSON-serialized {@link JobDataEvent} when publishing is enabled.
     *
     * <p>Any exception raised while building, serializing or sending the event is logged at
     * error level and not rethrown.
     *
     * @param job the job whose status changed
     */
    protected void jobStatusChange(JobDetails job) {
        if (!this.enabled) {
            return;
        }
        try {
            JobDataEvent event = JobDataEvent.builder()
                    .source(this.url + "/jobs")
                    .data(ScheduledJobAdapter.of(job))
                    .build();
            String json = this.objectMapper.writeValueAsString(event);
            // Pass the String payload uncast so the message is inferred as a message of String;
            // upcasting the payload to Object would type the whole chain as a message of Object.
            Message<String> message = ContextAwareMessage.of(json)
                    .withAck(() -> this.onAck(job))
                    .withNack(reason -> this.onNack(reason, job));
            this.emitter.send(this.decorate(message));
        } catch (Exception e) {
            // Deliberately broad: a failure to publish is logged rather than propagated to the caller.
            String msg = String.format("An unexpected error was produced while processing a Job status change for the job: %s", job);
            LOGGER.error(msg, e);
        }
    }

    /**
     * Acknowledgement callback attached to each outgoing message; logs the job at debug level.
     *
     * @param job the job whose status change was sent
     * @return an already completed stage
     */
    CompletionStage<Void> onAck(JobDetails job) {
        LOGGER.debug("Job Status change published: {}", job);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Negative-acknowledgement callback attached to each outgoing message; logs the failure at
     * error level.
     *
     * @param reason the cause reported with the negative acknowledgement
     * @param job the job whose status change was sent
     * @return an already completed stage
     */
    CompletionStage<Void> onNack(Throwable reason, JobDetails job) {
        String msg = String.format("An error was produced while publishing a Job status change for the job: %s", job);
        LOGGER.error(msg, reason);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Hook that lets subclasses adjust the outgoing message before it is sent.
     * The default implementation returns the message unchanged.
     *
     * @param message the message about to be sent
     * @return the message to send
     */
    protected Message<String> decorate(Message<String> message) {
        return message;
    }
}

