package io.gitlab.klawru.scheduler.repository.postgres;

import io.gitlab.klawru.scheduler.TaskExample;
import io.gitlab.klawru.scheduler.exception.RepositoryException;
import io.gitlab.klawru.scheduler.r2dbc.R2dbcClient;
import io.gitlab.klawru.scheduler.repository.ExecutionEntity;
import io.gitlab.klawru.scheduler.repository.TaskRepository;
import io.gitlab.klawru.scheduler.task.instance.TaskInstanceId;
import io.gitlab.klawru.scheduler.util.DataHolder;
import io.r2dbc.postgresql.codec.Json;
import java.time.Instant;
import java.util.Collection;
import java.util.Optional;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/gitlab/klawru/scheduler/repository/postgres/PostgresTaskRepository.class */
public class PostgresTaskRepository implements TaskRepository {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(PostgresTaskRepository.class);
    private final R2dbcClient r2dbcClient;
    private final String tableName;

    @Override // io.gitlab.klawru.scheduler.repository.TaskRepository
    public Mono<Void> createIfNotExists(TaskInstanceId taskInstanceId, Instant instant, DataHolder<byte[]> dataHolder) {
        return findExecution(taskInstanceId).map(executionEntity -> {
            return false;
        }).switchIfEmpty(create(taskInstanceId, instant, dataHolder)).flatMap(bool -> {
            return Boolean.TRUE.equals(bool) ? Mono.empty() : Mono.error(new RepositoryException("Failed to save the task to the database. Perhaps such a task has already been created", taskInstanceId));
        });
    }

    protected Mono<Boolean> create(TaskInstanceId taskInstanceId, Instant instant, DataHolder<byte[]> dataHolder) {
        return this.r2dbcClient.execute("INSERT INTO " + this.tableName + " (task_name, task_instance, " + (dataHolder.isPresent() ? "task_data, " : " ") + "execution_time, picked, version) VALUES (:taskName, :taskInstance, " + (dataHolder.isPresent() ? ":taskData, " : " ") + ":executionTime, :picked, :version)", bindTarget -> {
            bindTarget.bind("taskName", taskInstanceId.getTaskName()).bind("taskInstance", taskInstanceId.getId()).bind("executionTime", instant).bind("picked", (Object) false).bind("version", (Object) 1L);
            if (dataHolder.isPresent()) {
                bindTarget.bind("taskData", (String) Optional.ofNullable((byte[]) dataHolder.getData()).map(Json::of).orElse(null), (Class<String>) Json.class);
            }
            return bindTarget;
        }).map(num -> {
            if (num.intValue() != 1) {
                throw new RepositoryException("Expected one task to be created, but create " + num + ". Indicates a bug.", taskInstanceId);
            }
            return true;
        });
    }

    @Override // io.gitlab.klawru.scheduler.repository.TaskRepository
    public Flux<ExecutionEntity> lockAndGetDue(String str, Instant instant, int i, Collection<String> collection) {
        return this.r2dbcClient.queryMany("UPDATE " + this.tableName + " ut SET picked = true, picked_by = :pickedBy, last_heartbeat = :lastHeartbeat, version = version + 1  WHERE (ut.task_name, ut.task_instance) IN (   SELECT st.task_name, st.task_instance FROM " + this.tableName + " st    WHERE picked = false and execution_time <= :execution_time AND task_name <> all ( :task_name )   ORDER BY execution_time FOR UPDATE SKIP LOCKED LIMIT :limit )  RETURNING ut.* ", bindTarget -> {
            bindTarget.bind("pickedBy", str).bind("lastHeartbeat", instant);
            bindTarget.bind("execution_time", instant).bind("task_name", collection.toArray(i2 -> {
                return new String[i2];
            })).bind("limit", Integer.valueOf(i));
            return bindTarget;
        }, ExecutionResultSetMapper.EXECUTION_MAPPER);
    }

    @Override // io.gitlab.klawru.scheduler.repository.TaskRepository
    public Mono<Void> remove(TaskInstanceId taskInstanceId) {
        return this.r2dbcClient.execute("DELETE FROM " + this.tableName + " WHERE task_name = :taskName AND task_instance = :taskInstance ", bindTarget -> {
            return bindTarget.bind("taskName", taskInstanceId.getTaskName()).bind("taskInstance", taskInstanceId.getId());
        }).map(num -> {
            if (num.intValue() != 1) {
                throw new RepositoryException("Expected one execution to be removed, but removed " + num + ". Indicates a bug.", taskInstanceId);
            }
            return true;
        }).then();
    }

    @Override // io.gitlab.klawru.scheduler.repository.TaskRepository
    public Mono<Integer> removeAllExecutions(String str) {
        return this.r2dbcClient.execute("DELETE FROM " + this.tableName + " WHERE task_name = :taskName", bindTarget -> {
            return bindTarget.bind("taskName", str);
        }).doOnNext(num -> {
            log.trace("removed by removeAllByName count={}", num);
        });
    }

    @Override // io.gitlab.klawru.scheduler.repository.TaskRepository
    public Mono<Integer> updateHeartbeat(TaskInstanceId taskInstanceId, long j, Instant instant) {
        return this.r2dbcClient.execute("UPDATE " + this.tableName + " SET last_heartbeat = :lastHeartbeat WHERE  task_name = :taskName AND task_instance = :taskInstance AND version = :version ", bindTarget -> {
            return bindTarget.bind("lastHeartbeat", instant).bind("taskName", taskInstanceId.getTaskName()).bind("taskInstance", taskInstanceId.getId()).bind("version", Long.valueOf(j));
        });
    }

    @Override // io.gitlab.klawru.scheduler.repository.TaskRepository
    public Mono<ExecutionEntity> findExecution(TaskInstanceId taskInstanceId) {
        String taskName = taskInstanceId.getTaskName();
        String id = taskInstanceId.getId();
        return this.r2dbcClient.query("SELECT * FROM " + this.tableName + " WHERE task_name = :taskName AND task_instance = :taskInstance", bindTarget -> {
            return bindTarget.bind("taskName", taskName).bind("taskInstance", id);
        }, ExecutionResultSetMapper.EXECUTION_MAPPER).onErrorMap(IncorrectResultSizeDataAccessException.class, incorrectResultSizeDataAccessException -> {
            return new RepositoryException("More than one task found that matches the combination of taskName='" + taskName + "' and taskId='" + id + "'", taskName, id);
        });
    }

    @Override // io.gitlab.klawru.scheduler.repository.TaskRepository
    public <T> Flux<ExecutionEntity> findExecutions(TaskExample<T> taskExample) {
        return this.r2dbcClient.queryMany("SELECT * FROM " + this.tableName + " WHERE (:taskName IS NULL OR task_name = :taskName) AND (:taskInstance IS NULL OR  task_instance = :taskInstance) AND (:picked IS NULL OR picked = :picked)", bindTarget -> {
            return bindTarget.bind("taskName", taskExample.getName(), (Class<String>) String.class).bind("taskInstance", taskExample.getId(), (Class<String>) String.class).bind("picked", (String) taskExample.getPicked(), (Class<String>) Boolean.class);
        }, ExecutionResultSetMapper.EXECUTION_MAPPER);
    }

    @Override // io.gitlab.klawru.scheduler.repository.TaskRepository
    public <T> Mono<Long> countExecutions(TaskExample<T> taskExample) {
        return this.r2dbcClient.query("SELECT count(*) FROM " + this.tableName + " WHERE (:taskName IS NULL OR task_name = :taskName) AND (:taskInstance IS NULL OR  task_instance = :taskInstance) AND (:picked IS NULL OR picked = :picked)", bindTarget -> {
            return bindTarget.bind("taskName", taskExample.getName(), (Class<String>) String.class).bind("taskInstance", taskExample.getId(), (Class<String>) String.class).bind("picked", (String) taskExample.getPicked(), (Class<String>) Boolean.class);
        }, row -> {
            return (Long) row.get(0, Long.class);
        });
    }

    @Override // io.gitlab.klawru.scheduler.repository.TaskRepository
    public Mono<Integer> removeOldUnresolvedTask(Collection<String> collection, Instant instant) {
        return this.r2dbcClient.execute("DELETE FROM " + this.tableName + " WHERE execution_time <= :before AND task_name = any( :taskName )", bindTarget -> {
            return bindTarget.bind("before", instant).bind("taskName", collection.toArray(i -> {
                return new String[i];
            }));
        });
    }

    @Override // io.gitlab.klawru.scheduler.repository.TaskRepository
    public Flux<ExecutionEntity> getDeadExecution(Instant instant) {
        return this.r2dbcClient.queryMany("SELECT * FROM " + this.tableName + " WHERE last_heartbeat < :lastHeartbeat", bindTarget -> {
            return bindTarget.bind("lastHeartbeat", instant);
        }, ExecutionResultSetMapper.EXECUTION_MAPPER);
    }

    @Override // io.gitlab.klawru.scheduler.repository.TaskRepository
    public Mono<Void> reschedule(TaskInstanceId taskInstanceId, long j, Instant instant, DataHolder<byte[]> dataHolder, Instant instant2, Instant instant3, int i) {
        return this.r2dbcClient.execute("UPDATE " + this.tableName + " SET picked = :picked, picked_by = :picked_by, last_heartbeat = :lastHeartbeat, last_success = :lastSuccess, last_failure = :lastFailure, consecutive_failures = :consecutiveFailures, execution_time = :executionTime, " + (dataHolder.isPresent() ? "task_data = :taskData, " : "") + "version = version + 1 WHERE task_name = :taskName AND task_instance = :taskInstance AND version = :version", bindTarget -> {
            bindTarget.bind("picked", (Object) false).bind("picked_by", (String) null, (Class<String>) String.class).bind("lastHeartbeat", (String) null, (Class<String>) Instant.class).bind("lastSuccess", (String) instant2, (Class<String>) Instant.class).bind("lastFailure", (String) instant3, (Class<String>) Instant.class).bind("consecutiveFailures", Integer.valueOf(i)).bind("executionTime", instant).bind("taskName", taskInstanceId.getTaskName()).bind("taskInstance", taskInstanceId.getId()).bind("version", Long.valueOf(j));
            if (dataHolder.isPresent()) {
                bindTarget.bind("taskData", (String) Optional.ofNullable((byte[]) dataHolder.getData()).map(Json::of).orElse(null), (Class<String>) Json.class);
            }
            return bindTarget;
        }).flatMap(num -> {
            return num.intValue() != 1 ? Mono.error(() -> {
                return new RepositoryException("Expected one execution to be updated, but updated " + num + ". Indicates a bug.", taskInstanceId);
            }) : Mono.empty();
        });
    }

    @Generated
    public PostgresTaskRepository(R2dbcClient r2dbcClient, String str) {
        this.r2dbcClient = r2dbcClient;
        this.tableName = str;
    }
}
