package com.github.labai.ted.sys;

import com.github.labai.ted.Ted;
import com.github.labai.ted.sys.Model;
import com.github.labai.ted.sys.Registry;
import com.github.labai.ted.sys.TedDriverImpl;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/github/labai/ted/sys/TaskManager.class */
public class TaskManager {
    // General logger of the task manager.
    private static final Logger logger = LoggerFactory.getLogger(TaskManager.class);
    // Separate "ted-task" logger; receives the full stack trace of exceptions thrown while processing a task.
    private static final Logger taskExceptionLogger = LoggerFactory.getLogger("ted-task");
    // Initial value of ChannelWorkContext.nextSlowLimit.
    static final int SLOW_START_COUNT = 3;
    // Upper bound for the per-channel portion size and for the doubling slow-start limit.
    static final int MAX_TASK_COUNT = 1000;
    // processTasks() skips an iteration when at least this many tasks are already queued in the channel executors.
    private static final int LIMIT_TOTAL_WAIT_TASKS = 20000;
    // Minimal interval between two rare maintenance runs, in milliseconds (10800000 ms = 3 hours).
    private static final long RARE_MAINT_INTERVAL_MILIS = 10800000;
    // How far a task without a registered config is postponed, in milliseconds (120000 ms = 2 minutes).
    private static final long UNKNOWN_TASK_POSTPONE_MS = 120000;
    // Age (by createTs) after which a task without a registered config is marked as ERROR (86400000 ms = 24 hours).
    private static final long UNKNOWN_TASK_CANCEL_AFTER_MS = 86400000;
    // Gives access to the dao, the registry and the config used throughout this class.
    private final TedDriverImpl.TedContext context;
    // Per-channel working state, keyed by channel name; filled lazily by processTasks().
    private Map<String, ChannelWorkContext> channelContextMap = new HashMap();
    // Timestamp (ms) of the last rare maintenance run; 0 means the first processMaintenanceTasks() call will run it.
    private long lastRareMaintExecTimeMilis = 0;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/labai/ted/sys/TaskManager$ChannelWorkContext.class */
    public class ChannelWorkContext {
        final String channelName;
        int nextSlowLimit = TaskManager.SLOW_START_COUNT;
        int lastGotCount = 0;
        int nextPortion = 0;
        boolean foundTask = false;

        ChannelWorkContext(String str) {
            this.channelName = str;
            dropNextSlowLimit();
        }

        void dropNextSlowLimit() {
            this.nextSlowLimit = TaskManager.this.context.registry.getChannel(this.channelName).getSlowStartCount();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/labai/ted/sys/TaskManager$TedRunnable.class */
    /**
     * Runnable that remembers which task(s) it was created for, so that the
     * tasks sitting in an executor queue can be inspected and counted.
     * Exactly one of {@code task} / {@code tasks} is set, depending on the constructor used.
     */
    public static abstract class TedRunnable implements Runnable {
        private final Model.TaskRec task;
        private final List<Model.TaskRec> tasks;

        /**
         * Creates a runnable bound to a single task.
         *
         * @param taskRec the task this runnable will process
         */
        public TedRunnable(Model.TaskRec taskRec) {
            this.tasks = null;
            this.task = taskRec;
        }

        /**
         * Creates a runnable bound to a pack of tasks; the list is copied.
         *
         * @param list tasks this runnable will process
         */
        public TedRunnable(List<Model.TaskRec> list) {
            this.tasks = new ArrayList<Model.TaskRec>(list);
            this.task = null;
        }

        /**
         * @return the pack of tasks, or a single-element list for a single-task runnable
         */
        public List<Model.TaskRec> getTasks() {
            if (this.tasks == null) {
                return Collections.singletonList(this.task);
            }
            return this.tasks;
        }

        /**
         * @return size of the pack, or 1 for a single-task runnable
         */
        public int getTaskCount() {
            return this.tasks == null ? 1 : this.tasks.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a task manager bound to the given context.
     *
     * @param tedContext context whose dao, registry and config are used by this manager
     */
    public TaskManager(TedDriverImpl.TedContext tedContext) {
        this.context = tedContext;
    }

    /**
     * Passes a status change straight to {@code tedDao.setStatusPostponed}.
     *
     * @param j         task id (callers in this class pass {@code taskRec.taskId})
     * @param tedStatus status to set
     * @param str       message to store with the status
     * @param date      time passed on as the postpone time; {@link #changeTaskStatus} passes {@code null}
     */
    public void changeTaskStatusPostponed(long j, Ted.TedStatus tedStatus, String str, Date date) {
        this.context.tedDao.setStatusPostponed(j, tedStatus, str, date);
    }

    /**
     * Changes the status of a task without a postpone time
     * (delegates to {@link #changeTaskStatusPostponed} with a {@code null} date).
     *
     * @param j         task id
     * @param tedStatus status to set
     * @param str       message to store with the status
     */
    public void changeTaskStatus(long j, Ted.TedStatus tedStatus, String str) {
        changeTaskStatusPostponed(j, tedStatus, str, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs the maintenance work: the frequent dao maintenance and the work-timeout
     * check on every call, and the rare dao maintenance only when more than
     * {@code RARE_MAINT_INTERVAL_MILIS} has passed since its previous run.
     */
    public void processMaintenanceTasks() {
        this.context.tedDao.processMaintenanceFrequent();
        processTimeouts();

        long sinceLastRareRun = System.currentTimeMillis() - this.lastRareMaintExecTimeMilis;
        if (sinceLastRareRun <= RARE_MAINT_INTERVAL_MILIS) {
            return;
        }
        logger.debug("Start process rare maintenance tasks");
        this.context.tedDao.processMaintenanceRare(this.context.config.oldTaskArchiveDays());
        // remember the moment the rare maintenance finished, not the moment it started
        this.lastRareMaintExecTimeMilis = System.currentTimeMillis();
    }

    /**
     * Handles the tasks returned by {@code tedDao.getWorkingTooLong()}.
     * <ul>
     *   <li>a task without a registered config is only logged as an error;</li>
     *   <li>a task that has been working for at least its configured
     *       {@code workTimeoutMinutes} is moved to RETRY with the current time as the postpone time;</li>
     *   <li>otherwise the planned work timeout of the task is set to
     *       {@code startTs + workTimeoutMinutes}.</li>
     * </ul>
     */
    private void processTimeouts() {
        List<Model.TaskRec> workingTooLong = this.context.tedDao.getWorkingTooLong();
        long currentTimeMillis = System.currentTimeMillis();
        for (Model.TaskRec taskRec : workingTooLong) {
            // whole minutes elapsed since the task was started
            long time = ((currentTimeMillis - taskRec.startTs.getTime()) / 1000) / 60;
            Registry.TaskConfig taskConfig = this.context.registry.getTaskConfig(taskRec.name);
            if (taskConfig == null) {
                logger.error("Unknown task " + taskRec);
            } else if (taskConfig.workTimeoutMinutes <= time) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Work timeout for task_id=" + taskRec.taskId + " name=" + taskRec.name + " startTs=" + taskRec.startTs + " now=" + dateToStrTs(currentTimeMillis) + " ttl-minutes=" + taskConfig.workTimeoutMinutes);
                }
                changeTaskStatusPostponed(taskRec.taskId.longValue(), Ted.TedStatus.RETRY, "Too long in status [work](3)", new Date());
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Set finishTs for task_id=" + taskRec.taskId + " name=" + taskRec.name + " startTs=" + taskRec.startTs + " now=" + dateToStrTs(currentTimeMillis) + " ttl-minutes=" + taskConfig.workTimeoutMinutes);
                }
                // minutes -> milliseconds, done in long arithmetic so that large
                // timeouts do not overflow int before being added to the start time
                long workTimeoutMillis = taskConfig.workTimeoutMinutes * 60L * 1000L;
                this.context.tedDao.setTaskPlannedWorkTimeout(taskRec.taskId.longValue(), new Date(taskRec.startTs.getTime() + workTimeoutMillis));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * One iteration of task dispatching.
     * <ol>
     *   <li>Skips the iteration when the executors already hold
     *       {@code LIMIT_TOTAL_WAIT_TASKS} or more queued tasks.</li>
     *   <li>Asks the dao which channels have waiting tasks; when there are none,
     *       resets the slow-start limit of every known channel and returns.</li>
     *   <li>For every registered channel with waiting tasks computes the portion to
     *       reserve: free worker slots plus free queue capacity, capped by
     *       {@code MAX_TASK_COUNT} and by the channel's current slow-start limit.</li>
     *   <li>Reserves the tasks through the dao, updates the per-channel counters,
     *       doubles the slow-start limit of channels that had waiting tasks
     *       (resets it for the others) and hands the reserved tasks to the channels.</li>
     * </ol>
     */
    public void processTasks() {
        final int queuedTotal = calcWaitingTaskCountInAllChannels();
        if (queuedTotal >= LIMIT_TOTAL_WAIT_TASKS) {
            logger.warn("Total size of waiting tasks ({}) already exceeded limit ({}), skip this iteration", Integer.valueOf(queuedTotal), Integer.valueOf(LIMIT_TOTAL_WAIT_TASKS));
            return;
        }

        final List<String> channelsWithWaiting = this.context.tedDao.getWaitChannels();
        if (channelsWithWaiting.isEmpty()) {
            logger.trace("no wait tasks");
            for (ChannelWorkContext idle : this.channelContextMap.values()) {
                idle.dropNextSlowLimit();
            }
            return;
        }

        // only a warning here; such tasks are still routed later by sendTaskListToChannels
        for (String waitingChannel : channelsWithWaiting) {
            if (this.context.registry.getChannel(waitingChannel) == null) {
                logger.warn("Channel '" + waitingChannel + "' is not configured, but exists a waiting task with that channel");
            }
        }

        // decide how many tasks each registered channel may take in this iteration
        for (Registry.Channel channel : this.context.registry.getChannels()) {
            ChannelWorkContext ctx = this.channelContextMap.get(channel.name);
            if (ctx == null) {
                ctx = new ChannelWorkContext(channel.name);
                this.channelContextMap.put(channel.name, ctx);
            }
            ctx.foundTask = channelsWithWaiting.contains(channel.name);
            if (!ctx.foundTask) {
                continue;
            }
            int workerCount = channel.workers.getMaximumPoolSize();
            int remainingCapacity = channel.getQueueRemainingCapacity();
            int freeSlots = (workerCount - channel.workers.getActiveCount()) + remainingCapacity;
            int maxCount = Math.max(Math.min(freeSlots, MAX_TASK_COUNT), 0);
            logger.debug(channel.name + " max_count=" + maxCount + " (workerCount=" + workerCount + " activeCount=" + channel.workers.getActiveCount() + " remainingCapacity=" + remainingCapacity + " maxQueue=" + channel.taskBufferSize + ")");
            if (maxCount == 0) {
                logger.debug("Channel " + channel.name + " queue is full");
            }
            ctx.nextPortion = Math.min(maxCount, ctx.nextSlowLimit);
        }

        // request a portion only for channels that have waiting tasks and free room
        Map<String, Integer> portionByChannel = new HashMap<String, Integer>();
        for (ChannelWorkContext ctx : this.channelContextMap.values()) {
            if (ctx.foundTask && ctx.nextPortion > 0) {
                portionByChannel.put(ctx.channelName, Integer.valueOf(ctx.nextPortion));
            }
        }
        List<Model.TaskRec> reserved = this.context.tedDao.reserveTaskPortion(portionByChannel);

        // recount how many tasks every channel actually received
        for (ChannelWorkContext ctx : this.channelContextMap.values()) {
            ctx.lastGotCount = 0;
        }
        for (Model.TaskRec rec : reserved) {
            ChannelWorkContext ctx = this.channelContextMap.get(rec.channel);
            if (ctx == null) {
                ctx = new ChannelWorkContext(rec.channel);
                this.channelContextMap.put(rec.channel, ctx);
            }
            ctx.lastGotCount++;
        }

        // slow start: grow the limit while the channel keeps having work, reset it otherwise
        for (ChannelWorkContext ctx : this.channelContextMap.values()) {
            if (!ctx.foundTask) {
                ctx.dropNextSlowLimit();
                continue;
            }
            ctx.nextSlowLimit = Math.min(ctx.nextSlowLimit * 2, MAX_TASK_COUNT);
            logger.debug("Channel " + ctx.channelName + " nextSlowLimit=" + ctx.nextSlowLimit);
        }

        if (!reserved.isEmpty()) {
            sendTaskListToChannels(reserved);
        } else {
            logger.debug("no tasks (full check)");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Groups the given tasks by task name and submits them to the channel executor.
     * The channel and the task config of a group are taken from the first task of
     * the group. When the channel is not registered, channel "MAIN" is used. Tasks
     * without a registered config go to {@code handleUnknownTasks}; pack-processing
     * tasks are submitted as one runnable per group, all others as one runnable per task.
     *
     * @param list tasks to dispatch
     */
    public void sendTaskListToChannels(List<Model.TaskRec> list) {
        Map<String, List<Model.TaskRec>> tasksByName = new HashMap<String, List<Model.TaskRec>>();
        for (Model.TaskRec rec : list) {
            List<Model.TaskRec> sameName = tasksByName.get(rec.name);
            if (sameName == null) {
                sameName = new ArrayList<Model.TaskRec>();
                tasksByName.put(rec.name, sameName);
            }
            sameName.add(rec);
        }

        for (final List<Model.TaskRec> group : tasksByName.values()) {
            Model.TaskRec first = group.get(0);

            Registry.Channel channel = this.context.registry.getChannel(first.channel);
            if (channel == null) {
                logger.warn("Task channel '" + first.channel + "' not exists. Use channel MAIN (task={} taskId={})", first.name, first.taskId);
                channel = this.context.registry.getChannel("MAIN");
            }

            Registry.TaskConfig taskConfig = this.context.registry.getTaskConfig(first.name);
            if (taskConfig == null) {
                handleUnknownTasks(group);
                continue;
            }

            if (taskConfig.isPackProcessing) {
                logger.debug("got tasks (task={} count={}) for pack processing", first.name, Integer.valueOf(group.size()));
                channel.workers.execute(new TedRunnable(group) {
                    @Override
                    public void run() {
                        TaskManager.this.processTask(group);
                    }
                });
                continue;
            }

            for (final Model.TaskRec single : group) {
                logger.debug("got task: " + single);
                channel.workers.execute(new TedRunnable(single) {
                    @Override
                    public void run() {
                        TaskManager.this.processTask(Collections.singletonList(single));
                    }
                });
            }
        }
    }

    /**
     * Handles tasks for which no task config is registered: a task created more
     * than {@code UNKNOWN_TASK_CANCEL_AFTER_MS} ago is marked as ERROR, any other
     * is returned to status NEW and postponed by {@code UNKNOWN_TASK_POSTPONE_MS}.
     *
     * @param list tasks without a registered config
     */
    private void handleUnknownTasks(List<Model.TaskRec> list) {
        final long nowMs = System.currentTimeMillis();
        final long cancelBeforeMs = nowMs - UNKNOWN_TASK_CANCEL_AFTER_MS;
        for (Model.TaskRec unknown : list) {
            boolean stillFresh = unknown.createTs.getTime() >= cancelBeforeMs;
            if (stillFresh) {
                logger.warn("Task is unknown, mark as new, postpone: {}", unknown);
                changeTaskStatusPostponed(unknown.taskId.longValue(), Ted.TedStatus.NEW, "unknown task. postpone", new Date(nowMs + UNKNOWN_TASK_POSTPONE_MS));
                continue;
            }
            logger.warn("Task is unknown and was not processed during 24 hours, mark as error: {}", unknown);
            changeTaskStatus(unknown.taskId.longValue(), Ted.TedStatus.ERROR, "unknown task");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Processes one task (or one pack of tasks with the same name) on the calling thread.
     * <p>
     * The config is looked up by the name of the first task. For a BATCH task that is
     * not finished yet the task is either failed (batch timeout reached) or postponed
     * with status RETRY, and the processor is not called. Otherwise the registered
     * processor (or pack processor) is invoked and each task's status is stored from
     * its result: a missing result or an unexpected status becomes ERROR, RETRY is
     * postponed according to the retry scheduler (ERROR when the scheduler gives no
     * next time), DONE and ERROR are stored as returned.
     * <p>
     * Any exception is logged and all tasks of the list are marked as ERROR. The thread
     * name is changed for the duration of the processing and always restored.
     *
     * @param list tasks to process; exactly one element unless the task uses pack processing
     * @throws IllegalStateException if {@code list} is null or empty
     */
    public void processTask(List<Model.TaskRec> list) {
        if (list == null || list.isEmpty()) {
            throw new IllegalStateException("taskRecList is empty");
        }
        final Model.TaskRec head = list.get(0);
        final TedDao dao = this.context.tedDao;
        final String originalThreadName = Thread.currentThread().getName();
        try {
            Registry.TaskConfig taskConfig = this.context.registry.getTaskConfig(head.name);

            if (taskConfig.taskType == Registry.TaskType.BATCH) {
                boolean batchFinished = dao.checkIsBatchFinished(head.taskId.longValue());
                String waitMsg = "waiting for finish... [B" + head.taskId + "]";
                if (!batchFinished) {
                    long ageMinutes = ((System.currentTimeMillis() - head.createTs.getTime()) / 1000) / 60;
                    if (ageMinutes >= taskConfig.batchTimeoutMinutes) {
                        logger.warn("Batch timeout for task_id=" + head.taskId + " name=" + head.name + " createTs=" + head.createTs + " now=" + dateToStrTs(System.currentTimeMillis()) + " ttl-minutes=" + taskConfig.batchTimeoutMinutes);
                        changeTaskStatus(head.taskId.longValue(), Ted.TedStatus.ERROR, "Batch processing too long");
                        return;
                    }
                    Date nextCheck = ConfigUtils.BATCH_RETRY_SCHEDULER.getNextRetryTime(head.getTedTask(), head.retries.intValue() + 1, head.startTs);
                    if (nextCheck == null) {
                        nextCheck = new Date(System.currentTimeMillis() + 60000);
                    }
                    dao.setStatusPostponed(head.taskId.longValue(), Ted.TedStatus.RETRY, waitMsg, nextCheck);
                    return;
                }
                // the batch has finished; forget the retries that were spent only on waiting for it
                if (waitMsg.equals(head.msg)) {
                    dao.cleanupRetries(head.taskId, "");
                    head.retries = 0;
                    head.msg = "";
                }
            }

            Thread.currentThread().setName(originalThreadName + "-" + taskConfig.shortLogName + "-" + head.taskId);

            // results keyed by task id; element types are left open because the
            // processor interfaces are declared outside this file
            Map<?, ?> results;
            if (taskConfig.isPackProcessing) {
                Ted.TedPackProcessor packProcessor = taskConfig.tedPackProcessorFactory.getPackProcessor(head.name);
                List tedTasks = new ArrayList();
                for (Model.TaskRec rec : list) {
                    tedTasks.add(rec.getTedTask());
                }
                results = packProcessor.process(tedTasks);
                if (results == null) {
                    results = Collections.emptyMap();
                }
            } else {
                if (list.size() != 1) {
                    throw new IllegalStateException("taskRecList size must by 1");
                }
                Ted.TedResult outcome = taskConfig.tedProcessorFactory.getProcessor(head.name).process(head.getTedTask());
                Map<Object, Object> single = new HashMap<Object, Object>();
                single.put(head.taskId, outcome);
                results = single;
            }

            for (Model.TaskRec rec : list) {
                Ted.TedResult result = (Ted.TedResult) results.get(rec.taskId);
                if (result == null) {
                    changeTaskStatus(rec.taskId.longValue(), Ted.TedStatus.ERROR, "result is null");
                    continue;
                }
                if (result.status == Ted.TedStatus.RETRY) {
                    Date nextRetry = taskConfig.retryScheduler.getNextRetryTime(rec.getTedTask(), rec.retries.intValue() + 1, rec.startTs);
                    if (nextRetry != null) {
                        dao.setStatusPostponed(rec.taskId.longValue(), result.status, result.message, nextRetry);
                    } else {
                        changeTaskStatus(rec.taskId.longValue(), Ted.TedStatus.ERROR, "max retries. " + result.message);
                    }
                    continue;
                }
                boolean finalStatus = result.status == Ted.TedStatus.DONE || result.status == Ted.TedStatus.ERROR;
                if (finalStatus) {
                    changeTaskStatus(rec.taskId.longValue(), result.status, result.message);
                } else {
                    changeTaskStatus(rec.taskId.longValue(), Ted.TedStatus.ERROR, "invalid result status: " + result.status);
                }
            }
        } catch (Exception e) {
            logger.info("Unhandled exception while calling processor for task '{}': {}", head.name, e.getMessage());
            taskExceptionLogger.error("Unhandled exception while calling processor for task '" + head.name + "'", e);
            try {
                for (Model.TaskRec rec : list) {
                    changeTaskStatus(rec.taskId.longValue(), Ted.TedStatus.ERROR, "Catch: " + e.getMessage());
                }
            } catch (Exception e2) {
                logger.warn("Unhandled exception while handling exception for task '{}', statuses will be not changed: {}", head.name, e2.getMessage());
            }
        } finally {
            // the worker thread is reused, so always give it its original name back
            Thread.currentThread().setName(originalThreadName);
        }
    }

    /**
     * Counts the tasks that are currently queued (not yet running) in the executors
     * of all registered channels. A {@link TedRunnable} contributes its task count;
     * any other queued {@code Runnable} is counted as one task instead of failing
     * the whole calculation with a ClassCastException.
     * <p>
     * The result is a best-effort estimate: the queues are read while workers may be
     * taking tasks from them, and if a queue reports a concurrent modification the
     * tasks counted so far for that channel are kept and counting continues with
     * the next channel.
     *
     * @return estimated number of queued tasks in all channels
     */
    private int calcWaitingTaskCountInAllChannels() {
        int total = 0;
        for (Registry.Channel channel : this.context.registry.getChannels()) {
            try {
                for (Runnable queued : channel.workers.getQueue()) {
                    total += (queued instanceof TedRunnable) ? ((TedRunnable) queued).getTaskCount() : 1;
                }
            } catch (ConcurrentModificationException e) {
                // deliberately not propagated: an approximate count is enough for the limit check
                logger.debug("Queue of channel {} was modified while counting waiting tasks, count may be incomplete", channel.name, e);
            }
        }
        return total;
    }

    /**
     * Formats a timestamp for log messages using the pattern
     * {@code yyyy-MM-dd'T'HH:mm:ss.SSS}. A new formatter is created on every
     * call, so no formatter instance is shared between threads.
     *
     * @param j time in milliseconds since the epoch
     * @return the formatted timestamp
     */
    private static String dateToStrTs(long j) {
        SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        Date moment = new Date(j);
        return timestampFormat.format(moment);
    }
}
