package io.gitlab.klawru.scheduler.service;

import io.gitlab.klawru.scheduler.StartPauseService;
import io.gitlab.klawru.scheduler.config.SchedulerConfiguration;
import io.gitlab.klawru.scheduler.executor.TaskSchedulers;
import io.gitlab.klawru.scheduler.repository.TaskService;
import io.gitlab.klawru.scheduler.util.AlwaysDisposed;
import java.time.Duration;
import java.util.Optional;
import lombok.Generated;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

/* loaded from: input_file:io/gitlab/klawru/scheduler/service/DeadExecutionDetectService.class */
/**
 * Background job that repeatedly asks the {@link TaskService} to reschedule dead executions.
 *
 * <p>{@link #start()} subscribes to the pipeline built by
 * {@link #getRescheduleDeadExecutionFlux()} and {@link #pause()} disposes that subscription.
 * The current subscription is kept in a plain field; this class applies no synchronization
 * around {@code start()} / {@code pause()}.
 */
public class DeadExecutionDetectService implements StartPauseService {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DeadExecutionDetectService.class);

    /** Factor applied to the configured heartbeat interval before it is handed to the task service. */
    private static final long HEARTBEAT_INTERVAL_FACTOR = 4L;

    /** Factor applied to the configured polling interval to obtain the repeat / retry delay. */
    private static final long POLLING_INTERVAL_FACTOR = 2L;

    private final TaskService taskService;
    private final TaskSchedulers schedulers;
    private final SchedulerConfiguration config;

    /** Handle of the running subscription; starts out as an already-disposed placeholder. */
    private Disposable deadExecutionDetectDisposable = AlwaysDisposed.get();

    /**
     * Creates the service; nothing is subscribed until {@link #start()} is called.
     *
     * @param taskService            service whose {@code rescheduleDeadExecutionTask} is invoked
     * @param taskSchedulers         provider of the housekeeper scheduler used by the pipeline
     * @param schedulerConfiguration source of the heartbeat and polling intervals
     */
    public DeadExecutionDetectService(TaskService taskService, TaskSchedulers taskSchedulers, SchedulerConfiguration schedulerConfiguration) {
        this.taskService = taskService;
        this.schedulers = taskSchedulers;
        this.config = schedulerConfiguration;
    }

    /**
     * Subscribes to the detection pipeline unless a live subscription already exists.
     */
    @Override // io.gitlab.klawru.scheduler.StartPauseService
    public void start() {
        if (!deadExecutionDetectDisposable.isDisposed()) {
            // A previous subscription is still active; keep it.
            return;
        }
        log.debug("Start dead execution detect");
        deadExecutionDetectDisposable = getRescheduleDeadExecutionFlux().subscribe();
    }

    /**
     * Assembles the repeating "reschedule dead executions" pipeline.
     *
     * <p>The task service is called with the heartbeat interval multiplied by
     * {@value #HEARTBEAT_INTERVAL_FACTOR} (presumably the age after which an execution counts
     * as dead; confirm against {@code TaskService}). The call is repeated, and retried after a
     * failure, with a delay of the polling interval multiplied by
     * {@value #POLLING_INTERVAL_FACTOR}. Delays and the subscription itself run on the
     * housekeeper scheduler.
     *
     * @return a cold flux; nothing runs until it is subscribed
     */
    @NotNull
    protected Flux<Long> getRescheduleDeadExecutionFlux() {
        Duration deadThreshold = config.getHeartbeatInterval().multipliedBy(HEARTBEAT_INTERVAL_FACTOR);
        Duration pauseBetweenRuns = config.getPollingInterval().multipliedBy(POLLING_INTERVAL_FACTOR);

        // One reschedule pass, re-subscribed after each completion once the delay has elapsed.
        Flux<Long> repeatedSweep = taskService.rescheduleDeadExecutionTask(deadThreshold)
                .doOnError(failure -> log.error("Exception on reschedule dead execution", failure))
                .repeatWhen(completions ->
                        completions.delaySequence(pauseBetweenRuns, schedulers.getHousekeeperScheduler()));

        // The second error hook sits downstream of retryWhen, so it only observes errors
        // that the retry operator itself lets through.
        return repeatedSweep
                .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, pauseBetweenRuns)
                        .scheduler(schedulers.getHousekeeperScheduler()))
                .doOnError(failure ->
                        log.error("Unexpected exception in 'deadExecutionDetect'. Restart subscription", failure))
                .subscribeOn(schedulers.getHousekeeperScheduler());
    }

    /**
     * Disposes the current subscription if there is one and it is still active.
     */
    @Override // io.gitlab.klawru.scheduler.StartPauseService
    public void pause() {
        Disposable current = deadExecutionDetectDisposable;
        if (current == null || current.isDisposed()) {
            // Nothing is running, so there is nothing to stop.
            return;
        }
        log.debug("Stop dead execution detect");
        current.dispose();
    }
}
