package io.gitlab.klawru.scheduler;

import io.gitlab.klawru.scheduler.config.SchedulerConfiguration;
import io.gitlab.klawru.scheduler.executor.Execution;
import io.gitlab.klawru.scheduler.executor.TaskExecutor;
import io.gitlab.klawru.scheduler.executor.TaskSchedulers;
import io.gitlab.klawru.scheduler.repository.TaskService;
import io.gitlab.klawru.scheduler.service.DeadExecutionDetectService;
import io.gitlab.klawru.scheduler.service.DeleteUnresolvedTaskService;
import io.gitlab.klawru.scheduler.service.TaskFetchService;
import io.gitlab.klawru.scheduler.service.UpdateHeartbeatService;
import io.gitlab.klawru.scheduler.stats.SchedulerClientStatus;
import io.gitlab.klawru.scheduler.stats.SchedulerMetricsRegistry;
import io.gitlab.klawru.scheduler.task.instance.TaskInstance;
import io.gitlab.klawru.scheduler.task.instance.TaskInstanceId;
import io.gitlab.klawru.scheduler.util.Clock;
import io.gitlab.klawru.scheduler.util.DataHolder;
import java.io.IOException;
import java.time.Instant;
import java.util.Objects;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/gitlab/klawru/scheduler/DefaultSchedulerClient.class */
/**
 * Default {@link SchedulerClient} implementation.
 *
 * <p>Scheduling, rescheduling, cancelling and querying are delegated to the {@link TaskService};
 * the lifecycle methods ({@link #start()}, {@link #pause()}, {@link #close()}) drive the four
 * background services created in the constructor and track the client state in a
 * {@link SchedulerClientStatus}. A new client starts in {@code PAUSED}; {@code STOPPED} is terminal.
 */
public class DefaultSchedulerClient implements SchedulerClient, StartPauseService {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultSchedulerClient.class);
    private final TaskService taskService;
    private final TaskExecutor executor;
    private final TaskSchedulers schedulers;
    private final SchedulerConfiguration config;
    private final Clock clock;
    private final SchedulerMetricsRegistry schedulerMetricsRegistry;
    private final TaskFetchService taskFetchService;
    private final UpdateHeartbeatService updateHeartbeatService;
    private final DeadExecutionDetectService deadExecutionDetectService;
    private final DeleteUnresolvedTaskService deleteUnresolvedTaskService;
    // NOTE(review): plain (non-volatile) field; if getCurrentStatus() is read from scheduler
    // threads, visibility of updates is not guaranteed - confirm against the callers.
    private SchedulerClientStatus status = SchedulerClientStatus.PAUSED;

    /**
     * Creates a client in the {@code PAUSED} state and wires up its background services.
     *
     * @param taskService              persistence facade used for all task operations
     * @param taskExecutor             executor whose queue size is reported and which is stopped on close
     * @param taskSchedulers           schedulers shared with the background services; closed on close
     * @param schedulerMetricsRegistry metrics registry exposed through the getters
     * @param schedulerConfiguration   configuration (scheduler name, shutdown wait, ...)
     * @param clock                    clock handed to the startup hooks
     */
    public DefaultSchedulerClient(TaskService taskService, TaskExecutor taskExecutor, TaskSchedulers taskSchedulers, SchedulerMetricsRegistry schedulerMetricsRegistry, SchedulerConfiguration schedulerConfiguration, Clock clock) {
        this.taskService = taskService;
        this.executor = taskExecutor;
        this.schedulers = taskSchedulers;
        this.schedulerMetricsRegistry = schedulerMetricsRegistry;
        this.config = schedulerConfiguration;
        this.clock = clock;
        // The fetch service receives a reference to this client before construction has finished;
        // every field it could reach through that reference is already assigned above.
        this.taskFetchService = new TaskFetchService(this, this.taskService, this.executor, this.schedulers, this.config);
        this.updateHeartbeatService = new UpdateHeartbeatService(this.taskService, this.executor, this.schedulers, this.config);
        this.deadExecutionDetectService = new DeadExecutionDetectService(this.taskService, this.schedulers, this.config);
        this.deleteUnresolvedTaskService = new DeleteUnresolvedTaskService(this.taskService, this.schedulers, this.config);
    }

    /**
     * Creates the task if it does not exist, using the instance's own
     * {@code nextExecutionTime} computation and its own data.
     *
     * @throws NullPointerException if {@code taskInstance} is {@code null}
     */
    @Override
    public <T> Mono<Void> schedule(TaskInstance<T> taskInstance) {
        // A bound method reference null-checks its receiver when it is evaluated,
        // so no explicit Objects.requireNonNull is needed here.
        return this.taskService.createIfNotExists(taskInstance, taskInstance::nextExecutionTime, DataHolder.of(taskInstance.getData()));
    }

    /**
     * Creates the task if it does not exist, fixed at the given execution time and
     * carrying the instance's own data.
     */
    @Override
    public <T> Mono<Void> schedule(TaskInstance<T> taskInstance, Instant instant) {
        return this.taskService.createIfNotExists(taskInstance, ignored -> instant, DataHolder.of(taskInstance.getData()));
    }

    /**
     * Creates the task if it does not exist, using the instance's
     * {@code getNextExecutionTime()} strategy and the supplied data.
     */
    @Override
    public <T> Mono<Void> schedule(TaskInstance<T> taskInstance, T t) {
        return this.taskService.createIfNotExists(taskInstance, taskInstance.getNextExecutionTime(), DataHolder.of(t));
    }

    /**
     * Creates the task if it does not exist, fixed at the given execution time and
     * carrying the supplied data.
     */
    @Override
    public <T> Mono<Void> schedule(TaskInstance<T> taskInstance, Instant instant, T t) {
        return this.taskService.createIfNotExists(taskInstance, ignored -> instant, DataHolder.of(t));
    }

    /**
     * Reschedules the task with the instance's {@code getNextExecutionTime()} strategy,
     * passing an empty {@link DataHolder}.
     */
    @Override
    public <T> Mono<Void> reschedule(TaskInstance<T> taskInstance) {
        return this.taskService.reschedule(taskInstance, taskInstance.getNextExecutionTime(), DataHolder.empty());
    }

    /**
     * Reschedules the task to the given execution time, passing an empty {@link DataHolder}.
     */
    @Override
    public <T> Mono<Void> reschedule(TaskInstance<T> taskInstance, Instant instant) {
        return this.taskService.reschedule(taskInstance, ignored -> instant, DataHolder.empty());
    }

    /**
     * Reschedules the task to the given execution time with the supplied data.
     */
    @Override
    public <T> Mono<Void> reschedule(TaskInstance<T> taskInstance, Instant instant, T t) {
        return this.taskService.reschedule(taskInstance, ignored -> instant, DataHolder.of(t));
    }

    /**
     * Reschedules the task with the instance's {@code getNextExecutionTime()} strategy
     * and the supplied data.
     */
    @Override
    public <T> Mono<Void> reschedule(TaskInstance<T> taskInstance, T t) {
        return this.taskService.reschedule(taskInstance, taskInstance.getNextExecutionTime(), DataHolder.of(t));
    }

    /**
     * Removes the task identified by {@code taskInstanceId} via the task service.
     */
    @Override
    public Mono<Void> cancel(TaskInstanceId taskInstanceId) {
        return this.taskService.remove(taskInstanceId);
    }

    /**
     * Switches the client to {@code RUNNING}, runs the startup hooks (blocking) and then
     * starts the four background services.
     *
     * @throws IllegalStateException if the client has already been closed
     */
    @Override
    public void start() {
        log.info("Starting scheduler '{}'", this.config.getSchedulerName());
        if (this.status == SchedulerClientStatus.STOPPED) {
            throw new IllegalStateException("Unable to start a stopped Scheduler");
        }
        this.status = SchedulerClientStatus.RUNNING;
        startTask();
        this.taskFetchService.start();
        this.updateHeartbeatService.start();
        this.deadExecutionDetectService.start();
        this.deleteUnresolvedTaskService.start();
    }

    /**
     * Invokes {@code onStartup(this, clock)} on every element supplied by
     * {@code TaskService#scheduleOnStartUp}, one after another (prefetch 1) with errors
     * delayed until the sequence ends, subscribing on the task scheduler. Blocks the
     * calling thread until the whole sequence has terminated.
     */
    private void startTask() {
        Flux.fromStream(this.taskService::scheduleOnStartUp)
                .concatMapDelayError(hook -> hook.onStartup(this, this.clock), 1)
                .subscribeOn(this.schedulers.getTaskScheduler())
                .then()
                .block();
    }

    /**
     * Pauses the four background services and marks the client {@code PAUSED}.
     *
     * <p>A client that has already been closed stays {@code STOPPED}: overwriting the
     * terminal state here would let {@link #start()} run against closed resources.
     */
    @Override
    public void pause() {
        log.info("Stop scheduler '{}'", this.config.getSchedulerName());
        if (this.status != SchedulerClientStatus.STOPPED) {
            this.status = SchedulerClientStatus.PAUSED;
        }
        this.taskFetchService.pause();
        this.updateHeartbeatService.pause();
        this.deadExecutionDetectService.pause();
        this.deleteUnresolvedTaskService.pause();
    }

    /**
     * Asks the fetch service to fetch tasks.
     */
    @Override
    public void fetchTask() {
        this.taskFetchService.fetchTask();
    }

    /**
     * Finds executions matching the example via the task service.
     */
    @Override
    public <T> Flux<Execution<T>> findExecutions(TaskExample<T> taskExample) {
        return this.taskService.findExecutions(taskExample);
    }

    /**
     * Counts executions matching the example via the task service.
     */
    @Override
    public <T> Mono<Long> countExecution(TaskExample<T> taskExample) {
        return this.taskService.countExecution(taskExample);
    }

    /**
     * @return the executor's count of tasks that are queued or being processed
     */
    @Override
    public int getCountProcessingTask() {
        return this.executor.getNumberInQueueOrProcessing();
    }

    /**
     * @return the current lifecycle status of this client
     */
    @Override
    public SchedulerClientStatus getCurrentStatus() {
        return this.status;
    }

    /**
     * @return the configuration this client was created with
     */
    @Override
    public SchedulerConfiguration getConfig() {
        return this.config;
    }

    /**
     * @return the metrics registry this client was created with
     */
    @Override
    public SchedulerMetricsRegistry getMetricsRegistry() {
        return this.schedulerMetricsRegistry;
    }

    /**
     * Pauses the background services, marks the client {@code STOPPED}, then stops the
     * executor (waiting up to the configured shutdown time) and closes the task service
     * and the schedulers.
     *
     * <p>Each resource is released even if releasing an earlier one throws. Calling this
     * method on an already stopped client is a no-op. An {@link IOException} from the task
     * service is logged and not rethrown.
     */
    @Override
    public void close() {
        if (this.status == SchedulerClientStatus.STOPPED) {
            return; // already closed; keep close() idempotent
        }
        pause();
        this.status = SchedulerClientStatus.STOPPED;
        try {
            this.executor.stop(this.config.getShutdownMaxWait());
        } finally {
            try {
                this.taskService.close();
            } catch (IOException e) {
                log.error("Error on close task service", e);
            } finally {
                this.schedulers.close();
            }
        }
    }

    /**
     * @return the metrics registry this client was created with (same object as
     *         {@link #getMetricsRegistry()})
     */
    @Generated
    public SchedulerMetricsRegistry getSchedulerMetricsRegistry() {
        return this.schedulerMetricsRegistry;
    }
}
