package io.gitlab.klawru.scheduler.service;

import io.gitlab.klawru.scheduler.DefaultExecutionOperations;
import io.gitlab.klawru.scheduler.DefaultSchedulerClient;
import io.gitlab.klawru.scheduler.StartPauseService;
import io.gitlab.klawru.scheduler.config.SchedulerConfiguration;
import io.gitlab.klawru.scheduler.executor.Execution;
import io.gitlab.klawru.scheduler.executor.TaskExecutor;
import io.gitlab.klawru.scheduler.executor.TaskSchedulers;
import io.gitlab.klawru.scheduler.repository.TaskService;
import io.gitlab.klawru.scheduler.task.DefaultExecutionContext;
import io.gitlab.klawru.scheduler.util.Trigger;
import java.util.Optional;
import lombok.Generated;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

/* loaded from: input_file:io/gitlab/klawru/scheduler/service/TaskFetchService.class */
public class TaskFetchService implements StartPauseService {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(TaskFetchService.class);
    private final DefaultSchedulerClient client;
    private final TaskExecutor executor;
    private final TaskService taskService;
    private final SchedulerConfiguration config;
    private final TaskSchedulers schedulers;
    Disposable taskFetchStream = startTaskFetch();
    private final Trigger triggerFetch = new Trigger();

    public TaskFetchService(DefaultSchedulerClient defaultSchedulerClient, TaskService taskService, TaskExecutor taskExecutor, TaskSchedulers taskSchedulers, SchedulerConfiguration schedulerConfiguration) {
        this.client = defaultSchedulerClient;
        this.executor = taskExecutor;
        this.taskService = taskService;
        this.config = schedulerConfiguration;
        this.schedulers = taskSchedulers;
    }

    @NotNull
    protected Disposable startTaskFetch() {
        log.debug("Start TaskFetchService on '{}'", this.config.getSchedulerName());
        return getFetchTaskFlux().subscribe(this::addToExecutor);
    }

    @NotNull
    private Flux<Execution<?>> getFetchTaskFlux() {
        return Flux.defer(() -> {
            return this.taskService.lockAndGetDue(this.executor.getFreePlaceInQueue());
        }).publishOn(this.schedulers.getHousekeeperScheduler()).doOnNext(execution -> {
            log.debug("task fetch '{}'", execution.getTaskInstance().getTaskNameId());
        }).doOnError(th -> {
            log.error("Exception on task fetch", th);
        }).repeatWhen(flux -> {
            return flux.delayUntil(l -> {
                return Flux.firstWithSignal(new Publisher[]{Mono.delay(this.config.getPollingInterval(), this.schedulers.getHousekeeperScheduler()), this.triggerFetch.getFlux()});
            });
        }).retryWhen(Retry.fixedDelay(Long.MAX_VALUE, this.config.getPollingInterval()).scheduler(this.schedulers.getHousekeeperScheduler())).doOnError(th2 -> {
            log.error("Unexpected exception on task fetch", th2);
        }).subscribeOn(this.schedulers.getHousekeeperScheduler());
    }

    protected <T> void addToExecutor(Execution<T> execution) {
        this.executor.addToQueue(execution, DefaultExecutionContext.of(execution, this.client), DefaultExecutionOperations.of(this.taskService, this::getTriggerFetchCallback));
    }

    protected void getTriggerFetchCallback() {
        if (this.executor.getNumberInQueueOrProcessing() <= this.executor.taskLowerLimit()) {
            fetchTask();
        }
    }

    public void fetchTask() {
        this.triggerFetch.emit();
    }

    @Override // io.gitlab.klawru.scheduler.StartPauseService
    public void pause() {
        Optional.ofNullable(this.taskFetchStream).filter(disposable -> {
            return !disposable.isDisposed();
        }).ifPresent((v0) -> {
            v0.dispose();
        });
        this.taskFetchStream = null;
    }

    @Override // io.gitlab.klawru.scheduler.StartPauseService
    public void start() {
        if (this.taskFetchStream == null) {
            startTaskFetch();
        }
    }
}
