package io.gitlab.klawru.scheduler.executor;

import io.gitlab.klawru.scheduler.ExecutionOperations;
import io.gitlab.klawru.scheduler.exception.AbstractSchedulerException;
import io.gitlab.klawru.scheduler.executor.execution.state.AbstractExecutionState;
import io.gitlab.klawru.scheduler.executor.execution.state.EnqueuedState;
import io.gitlab.klawru.scheduler.executor.execution.state.ExecutionStateName;
import io.gitlab.klawru.scheduler.stats.SchedulerMetricsRegistry;
import io.gitlab.klawru.scheduler.task.ExecutionContext;
import io.gitlab.klawru.scheduler.task.instance.TaskInstance;
import io.gitlab.klawru.scheduler.util.MapperUtil;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;
import lombok.Generated;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.util.Pair;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

/* loaded from: input_file:io/gitlab/klawru/scheduler/executor/TaskExecutorService.class */
/**
 * {@link TaskExecutor} implementation that runs task executions as Reactor pipelines on the
 * scheduler supplied by {@link TaskSchedulers}.
 *
 * <p>Every submitted execution is registered in {@link #currentlyProcessing} under a freshly
 * generated id together with the subscriber driving its pipeline, so that it can be cancelled
 * later. The entry is removed and the in-flight counter is decremented when the subscriber
 * terminates (see {@link ExecutionSubscriber#hookFinally(SignalType)}).
 */
public class TaskExecutorService implements TaskExecutor {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(TaskExecutorService.class);

    private final TaskSchedulers schedulers;
    private final SchedulerMetricsRegistry registry;

    /** Number of executions that were submitted and whose subscriber has not terminated yet. */
    private final AtomicInteger currentlyInQueueOrProcessing = new AtomicInteger(0);

    /** In-flight executions keyed by the id handed to {@code Execution#enqueued(UUID)}. */
    private final ConcurrentHashMap<UUID, Pair<Execution<?>, ExecutionSubscriber>> currentlyProcessing = new ConcurrentHashMap<>();

    /**
     * Creates the executor.
     *
     * @param taskSchedulers           provides the Reactor scheduler and the queue limits
     * @param schedulerMetricsRegistry notified after each execution leaves the in-flight map
     */
    @Generated
    public TaskExecutorService(TaskSchedulers taskSchedulers, SchedulerMetricsRegistry schedulerMetricsRegistry) {
        this.schedulers = taskSchedulers;
        this.registry = schedulerMetricsRegistry;
    }

    /**
     * Subscriber attached to a single execution pipeline. When the subscription ends it passes
     * its id to the supplied callback.
     */
    public static class ExecutionSubscriber extends BaseSubscriber<Void> {
        private final UUID id;
        private final Consumer<UUID> hookFinally;

        /**
         * @param uuid     id under which the owning execution was registered
         * @param consumer callback invoked with {@code uuid} from {@link #hookFinally(SignalType)}
         */
        public ExecutionSubscriber(UUID uuid, Consumer<UUID> consumer) {
            this.id = uuid;
            this.hookFinally = consumer;
        }

        /** Delegates to the base hook and then notifies the callback with this subscriber's id. */
        @Override
        protected void hookFinally(@NotNull SignalType signalType) {
            super.hookFinally(signalType);
            this.hookFinally.accept(this.id);
        }
    }

    /**
     * Registers the execution as in-flight and subscribes its pipeline on the task scheduler.
     *
     * <p>The pipeline marks the execution as processed, runs the task, and routes the outcome to
     * {@link #catchComplete} or, on error, to {@link #catchFailure}.
     *
     * @param execution           execution to run
     * @param executionContext    context passed to the task
     * @param executionOperations operations handed to the completion/failure handlers
     * @param <T>                 task data type
     */
    @Override // io.gitlab.klawru.scheduler.executor.TaskExecutor
    public <T> void addToQueue(Execution<T> execution, ExecutionContext<T> executionContext, ExecutionOperations executionOperations) {
        TaskInstance<T> taskInstance = execution.getTaskInstance();
        log.debug("Add a task {} to the queue", taskInstance);
        Mono<Void> pipeline = Mono.defer(() -> {
                    execution.processed();
                    // Typed local instead of a raw Mono cast so the flatMap lambda sees Optional<T>.
                    Mono<Optional<T>> result = taskInstance.getTask()
                            .execute(taskInstance, executionContext)
                            .as(MapperUtil::mapToOptional);
                    return result.flatMap(value -> catchComplete(execution, value, executionOperations));
                })
                .onErrorResume(th -> catchFailure(execution, executionOperations, th))
                .subscribeOn(this.schedulers.getTaskScheduler());
        // Registration happens before subscribe() so the entry exists when the pipeline starts.
        pipeline.subscribe(addCurrentlyProcessing(execution));
    }

    /**
     * Cancels the given execution if it is still registered as in-flight.
     *
     * @param execution execution to cancel
     */
    @Override // io.gitlab.klawru.scheduler.executor.TaskExecutor
    public void removeFromQueue(Execution<?> execution) {
        stopExecution(execution);
    }

    /**
     * Looks up the subscriber by the id stored in the execution's last ENQUEUED state and cancels
     * it. Does nothing (besides logging) when the state is missing or the execution is no longer
     * registered, e.g. because it already terminated.
     */
    private void stopExecution(Execution<?> execution) {
        Optional<AbstractExecutionState> lastState = execution.getLastState(ExecutionStateName.ENQUEUED);
        if (!lastState.isPresent()) {
            log.info("Cannot stop execution={} not found state ENQUEUED", execution.getTaskInstance());
            return;
        }
        UUID enqueuedId = ((EnqueuedState) lastState.get()).getEnqueuedId();
        Pair<Execution<?>, ExecutionSubscriber> entry = this.currentlyProcessing.get(enqueuedId);
        if (entry == null) {
            // The subscriber may already have terminated and removed itself; nothing to cancel.
            log.info("Cannot stop execution={} it is no longer being processed", execution.getTaskInstance());
            return;
        }
        entry.getSecond().cancel();
    }

    /**
     * Error handler of the execution pipeline. Logs the failure; for anything other than an
     * {@link AbstractSchedulerException} it marks the execution as failed and delegates to
     * {@code Execution#onFailure}. An {@link AbstractSchedulerException} only completes empty.
     */
    private <T> Mono<Void> catchFailure(Execution<T> execution, ExecutionOperations executionOperations, Throwable th) {
        return Mono.defer(() -> {
            log.error("Task '{}' failure", execution.getTaskInstance().getTaskNameId(), th);
            if (th instanceof AbstractSchedulerException) {
                return Mono.empty();
            }
            execution.failed(th);
            return execution.onFailure(executionOperations);
        });
    }

    /**
     * Success handler of the execution pipeline: marks the execution complete and delegates to
     * {@code Execution#onComplete} with the task result.
     */
    private <T> Mono<Void> catchComplete(Execution<T> execution, Optional<T> optional, ExecutionOperations executionOperations) {
        return Mono.defer(() -> {
            log.debug("Task '{}' complete", execution.getTaskInstance().getTaskNameId());
            execution.complete();
            return execution.onComplete(optional, executionOperations);
        });
    }

    /**
     * Registers the execution under a new random id, increments the in-flight counter, records
     * the id on the execution and returns the subscriber that will unregister it on termination.
     */
    private <T> ExecutionSubscriber addCurrentlyProcessing(Execution<T> execution) {
        UUID enqueuedId = UUID.randomUUID();
        ExecutionSubscriber subscriber = new ExecutionSubscriber(enqueuedId, this::removeCurrentlyProcessing);
        this.currentlyInQueueOrProcessing.incrementAndGet();
        this.currentlyProcessing.put(enqueuedId, Pair.of(execution, subscriber));
        execution.enqueued(enqueuedId);
        return subscriber;
    }

    /**
     * Unregisters the execution with the given id, reports it to the metrics registry and
     * releases its slot in the in-flight counter.
     */
    private void removeCurrentlyProcessing(UUID uuid) {
        try {
            Pair<Execution<?>, ExecutionSubscriber> removed = this.currentlyProcessing.remove(uuid);
            if (removed == null) {
                log.warn("Released execution was not found in collection of executions currently being processed. Should never happen. Execution-id: {}", uuid);
            } else {
                this.registry.afterExecution(removed.getFirst());
            }
        } finally {
            // Always release the slot, even if the metrics callback throws; otherwise the
            // reported free capacity would shrink permanently.
            this.currentlyInQueueOrProcessing.decrementAndGet();
        }
    }

    /**
     * Cancels every execution currently registered as in-flight.
     *
     * @param duration currently not used by this implementation; cancellation is requested
     *                 immediately and the method does not wait for executions to finish
     */
    @Override // io.gitlab.klawru.scheduler.executor.TaskExecutor
    public void stop(Duration duration) {
        this.currentlyProcessing.values().stream()
                .map(Pair::getFirst)
                .forEach(this::stopExecution);
    }

    /** @return number of executions submitted whose subscriber has not terminated yet */
    @Override // io.gitlab.klawru.scheduler.executor.TaskExecutor
    public int getNumberInQueueOrProcessing() {
        return this.currentlyInQueueOrProcessing.get();
    }

    /** @return {@link #taskUpperLimit()} minus {@link #getNumberInQueueOrProcessing()} */
    @Override // io.gitlab.klawru.scheduler.executor.TaskExecutor
    public int getFreePlaceInQueue() {
        return taskUpperLimit() - getNumberInQueueOrProcessing();
    }

    /** @return the upper task limit reported by the configured {@link TaskSchedulers} */
    @Override // io.gitlab.klawru.scheduler.executor.TaskExecutor
    public int taskUpperLimit() {
        return this.schedulers.getTaskUpperLimit();
    }

    /** @return the lower task limit reported by the configured {@link TaskSchedulers} */
    @Override // io.gitlab.klawru.scheduler.executor.TaskExecutor
    public int taskLowerLimit() {
        return this.schedulers.getTaskLowerLimit();
    }

    /** @return a stream over the executions currently registered as in-flight */
    @Override // io.gitlab.klawru.scheduler.executor.TaskExecutor
    public Stream<Execution<?>> currentlyExecuting() {
        return this.currentlyProcessing.values().stream().map(Pair::getFirst);
    }
}
