package io.nflow.engine.internal.dao;

import ch.qos.logback.core.CoreConstants;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.nflow.engine.config.NFlow;
import io.nflow.engine.internal.storage.db.SQLVariants;
import io.nflow.engine.workflow.executor.WorkflowExecutor;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.cxf.staxutils.PropertiesExpandingStreamReader;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;

@Singleton
@SuppressFBWarnings(value = {"SIC_INNER_SHOULD_BE_STATIC_ANON"}, justification = "common jdbctemplate practice")
@Component
/* loaded from: input_file:io/nflow/engine/internal/dao/ExecutorDao.class */
public class ExecutorDao {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) ExecutorDao.class);
    private final JdbcTemplate jdbc;
    final SQLVariants sqlVariants;
    private final int keepaliveIntervalSeconds;
    final String executorGroup;
    private final String executorGroupCondition;
    final int timeoutSeconds;
    private final int hostMaxLength;
    private final AtomicReference<DateTime> nextUpdate = new AtomicReference<>(DateTime.now());
    private int executorId = -1;

    @Inject
    public ExecutorDao(SQLVariants sQLVariants, @NFlow JdbcTemplate jdbcTemplate, Environment environment) {
        this.sqlVariants = sQLVariants;
        this.jdbc = jdbcTemplate;
        this.executorGroup = StringUtils.trimToNull(environment.getRequiredProperty("nflow.executor.group"));
        this.executorGroupCondition = createWhereCondition(this.executorGroup);
        this.timeoutSeconds = ((Integer) environment.getRequiredProperty("nflow.executor.timeout.seconds", Integer.class)).intValue();
        this.keepaliveIntervalSeconds = ((Integer) environment.getRequiredProperty("nflow.executor.keepalive.seconds", Integer.class)).intValue();
        this.hostMaxLength = ((Integer) environment.getProperty("nflow.executor.host.length", Integer.class, -1)).intValue();
    }

    private static String createWhereCondition(String str) {
        return "executor_group = '" + str + "'";
    }

    public boolean tick() {
        if (this.nextUpdate.get().isAfterNow()) {
            return false;
        }
        this.nextUpdate.set(DateTime.now().plusSeconds(this.keepaliveIntervalSeconds));
        updateActiveTimestamp();
        return true;
    }

    public String getExecutorGroup() {
        return this.executorGroup;
    }

    public String getExecutorGroupCondition() {
        return this.executorGroupCondition;
    }

    public synchronized int getExecutorId() {
        if (this.executorId == -1) {
            int i = this.hostMaxLength;
            if (i == -1) {
                i = ((Integer) Optional.ofNullable((Integer) this.jdbc.query("select host from nflow_executor where 1 = 0", DaoUtil.firstColumnLengthExtractor)).orElseThrow(() -> {
                    return new IllegalStateException("Failed to read nflow_executor.host column length from database, please set correct value to nflow.executor.host.length");
                })).intValue();
            }
            this.executorId = allocateExecutorId(i);
        }
        return this.executorId;
    }

    public DateTime getMaxWaitUntil() {
        return this.nextUpdate.get();
    }

    @Transactional
    public boolean isTransactionSupportEnabled() {
        return TransactionSynchronizationManager.isActualTransactionActive();
    }

    @SuppressFBWarnings(value = {"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"}, justification = "npe is unlikely")
    public boolean isAutoCommitEnabled() {
        return ((Boolean) this.jdbc.execute((v0) -> {
            return v0.getAutoCommit();
        })).booleanValue();
    }

    @SuppressFBWarnings(value = {"MDM_INETADDRESS_GETLOCALHOST", "WEM_WEAK_EXCEPTION_MESSAGING", "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"}, justification = "localhost is used for getting host name only, exception message is fine, npe is unlikely")
    private int allocateExecutorId(int i) {
        try {
            final String left = StringUtils.left(InetAddress.getLocalHost().getCanonicalHostName(), i);
            final int parseInt = Integer.parseInt(ManagementFactory.getRuntimeMXBean().getName().split(PropertiesExpandingStreamReader.DELIMITER)[0]);
            logger.info("Joining executor group {}", this.executorGroup);
            GeneratedKeyHolder generatedKeyHolder = new GeneratedKeyHolder();
            this.jdbc.update(new PreparedStatementCreator() { // from class: io.nflow.engine.internal.dao.ExecutorDao.1
                @Override // org.springframework.jdbc.core.PreparedStatementCreator
                @SuppressFBWarnings(value = {"OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE", "SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING"}, justification = "spotbugs does not trust jdbctemplate, sql is constant in practice")
                public PreparedStatement createPreparedStatement(Connection connection) throws SQLException {
                    PreparedStatement prepareStatement = connection.prepareStatement("insert into nflow_executor(host, pid, executor_group, active, expires) values (?, ?, ?, current_timestamp, " + ExecutorDao.this.sqlVariants.currentTimePlusSeconds(ExecutorDao.this.timeoutSeconds) + ")", new String[]{"id"});
                    prepareStatement.setString(1, left);
                    prepareStatement.setInt(2, parseInt);
                    prepareStatement.setString(3, ExecutorDao.this.executorGroup);
                    return prepareStatement;
                }
            }, generatedKeyHolder);
            int intValue = generatedKeyHolder.getKey().intValue();
            logger.info("Joined executor group {} as executor {} running on host {} with process id {}.", this.executorGroup, Integer.valueOf(intValue), left, Integer.valueOf(parseInt));
            return intValue;
        } catch (NumberFormatException | UnknownHostException e) {
            throw new RuntimeException("Failed to obtain host name and pid of running jvm", e);
        }
    }

    public void updateActiveTimestamp() {
        updateWithPreparedStatement("update nflow_executor set active=current_timestamp, recovered=null, expires=" + this.sqlVariants.currentTimePlusSeconds(this.timeoutSeconds) + " where id = " + getExecutorId());
    }

    private void updateWithPreparedStatement(String str) {
        this.jdbc.update(str, (PreparedStatementSetter) null);
    }

    public List<WorkflowExecutor> getExecutors() {
        return this.jdbc.query("select id, host, pid, started, active, expires, stopped, recovered from nflow_executor where " + getExecutorGroupCondition() + " order by id asc", (resultSet, i) -> {
            return new WorkflowExecutor(resultSet.getInt("id"), resultSet.getString("host"), resultSet.getInt("pid"), this.executorGroup, this.sqlVariants.getDateTime(resultSet, "started"), this.sqlVariants.getDateTime(resultSet, "active"), this.sqlVariants.getDateTime(resultSet, "expires"), this.sqlVariants.getDateTime(resultSet, "stopped"), this.sqlVariants.getDateTime(resultSet, "recovered"));
        });
    }

    public void markShutdown(boolean z) {
        String str;
        if (z) {
            str = "set expires=current_timestamp, stopped=current_timestamp, recovered=current_timestamp";
        } else {
            try {
                str = "set expires=" + this.sqlVariants.currentTimePlusSeconds(this.timeoutSeconds) + ", stopped=current_timestamp, recovered=null";
            } catch (DataAccessException e) {
                logger.warn("Failed to mark executor as stopped", (Throwable) e);
                return;
            }
        }
        this.jdbc.update("update nflow_executor " + str + " where id = " + getExecutorId());
    }

    public Collection<Integer> getRecoverableExecutorIds() {
        return this.jdbc.query("select id from nflow_executor where " + getExecutorGroupCondition() + " and id <> " + getExecutorId() + " and recovered is null and " + this.sqlVariants.dateLtEqDiff("expires", "current_timestamp"), (resultSet, i) -> {
            return Integer.valueOf(resultSet.getInt(1));
        });
    }

    public void markRecovered(int i) {
        try {
            this.jdbc.update("update nflow_executor set recovered=current_timestamp where recovered is null and " + this.sqlVariants.dateLtEqDiff("expires", "current_timestamp") + " and " + getExecutorGroupCondition() + " and id = ?", Integer.valueOf(i));
        } catch (DataAccessException e) {
            logger.warn("Failed to mark executor {} as recovered", Integer.valueOf(i), e);
        }
    }

    @SuppressFBWarnings(value = {"OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE", "SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING"}, justification = "spotbugs does not trust jdbctemplate, sql is constant in practice")
    public int deleteExpiredBefore(DateTime dateTime) {
        try {
            return this.jdbc.update("delete from nflow_executor where recovered is not null and " + this.sqlVariants.dateLtEqDiff("expires", CoreConstants.NA) + " and " + getExecutorGroupCondition(), dateTime.toDate());
        } catch (DataAccessException e) {
            logger.warn("Failed to delete expired executors", (Throwable) e);
            return 0;
        }
    }
}
