package ru.taskurotta.service.hz.queue;

import com.hazelcast.core.DistributedObject;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.taskurotta.hazelcast.queue.CachedQueue;
import ru.taskurotta.hazelcast.queue.delay.CachedDelayQueue;
import ru.taskurotta.hazelcast.queue.delay.QueueFactory;
import ru.taskurotta.hazelcast.util.ClusterUtils;
import ru.taskurotta.service.console.model.GenericPage;
import ru.taskurotta.service.console.model.QueueStatVO;
import ru.taskurotta.service.console.retriever.QueueInfoRetriever;
import ru.taskurotta.service.hz.console.HzQueueStatTask;
import ru.taskurotta.service.queue.QueueService;
import ru.taskurotta.service.queue.TaskQueueItem;
import ru.taskurotta.transport.utils.TransportUtils;
import ru.taskurotta.util.ActorUtils;
import ru.taskurotta.util.StringUtils;

/* loaded from: input_file:ru/taskurotta/service/hz/queue/HzQueueService.class */
public class HzQueueService implements QueueService, QueueInfoRetriever {
    // Timeout, in milliseconds, passed to the blocking queue poll in poll().
    private long pollDelay;
    // Factory used by getQueue() to create a queue the first time a name is requested.
    protected QueueFactory queueFactory;
    protected HazelcastInstance hazelcastInstance;
    // Prefix applied to queue names; also used to filter distributed objects by name.
    protected String queueNamePrefix;
    // Name of the Hazelcast executor service that getQueuesStatsPage() submits HzQueueStatTask to.
    protected static final String HZ_QUEUE_INFO_EXECUTOR_SERVICE = "hzQueueInfoExecutorService";
    // Name of the Hazelcast map that StatisticsMerger reconciles with lastPolledTaskEnqueueTimes.
    private static final String LAST_POLLED_TASK_ENQUEUE_TIME = "lastPolledTaskEnqueueTimes";
    // Hazelcast lock held by StatisticsMerger while it merges the local and shared maps.
    private ILock synchLock;
    // Obtained in the constructor; not used anywhere else in the visible code.
    private ILock drainLock;
    // Incremented on every enqueueItem() call.
    public static AtomicInteger pushedTaskToQueue = new AtomicInteger();
    // Incremented by enqueueItem() only when the computed delay is positive.
    public static AtomicInteger pushedTaskToQueueWithDelay = new AtomicInteger();
    private static final Logger logger = LoggerFactory.getLogger(HzQueueService.class);
    // NOTE(review): "SINCH" looks like a typo for "SYNCH", but this string is the runtime lock name —
    // renaming it would change which Hazelcast lock is used; confirm before touching.
    private static final String SYNCH_LOCK_NAME = HzQueueService.class.getName().concat("#SINCH_LOCK");
    private static final String DRAIN_LOCK_NAME = HzQueueService.class.getName().concat("#DRAIN_LOCK");
    // Local (per-instance) lock guarding creation and removal of entries in queueMap.
    private final transient ReentrantLock lock = new ReentrantLock();
    // Per-queue value recorded by updateQueueEffectiveTime() after each poll, keyed by full queue name.
    protected final ConcurrentHashMap<String, Long> lastPolledTaskEnqueueTimes = new ConcurrentHashMap<>();
    // Queues already created through queueFactory, keyed by the name passed to getQueue().
    private Map<String, CachedDelayQueue<TaskQueueItem>> queueMap = new ConcurrentHashMap();
    // Not referenced anywhere in the visible code.
    private AtomicInteger cnt = new AtomicInteger(0);

    /* loaded from: input_file:ru/taskurotta/service/hz/queue/HzQueueService$StatisticsMerger.class */
    /**
     * Periodic task that reconciles this node's {@code lastPolledTaskEnqueueTimes} with the
     * Hazelcast map named {@code LAST_POLLED_TASK_ENQUEUE_TIME}.
     *
     * <p>For every queue name present on either side, the larger value wins and is written to the
     * side holding the smaller (or missing) one. The whole merge runs under {@code synchLock}.
     */
    class StatisticsMerger implements Runnable {
        StatisticsMerger() {
        }

        /**
         * Runs one merge iteration. Never propagates an exception: this task is scheduled with
         * {@code scheduleAtFixedRate}, which stops rescheduling a task whose run throws.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                HzQueueService.this.synchLock.lock();
                try {
                    mergeTimes();
                } finally {
                    // Single unlock point: the lock is released exactly once whether or not the merge failed.
                    HzQueueService.this.synchLock.unlock();
                }
            } catch (Throwable th) {
                HzQueueService.logger.error("StatisticsMerger iteration failed", th);
            }
        }

        /** Copies the newer value of each entry between the shared map and the local map. */
        private void mergeTimes() {
            IMap<String, Long> sharedTimes = HzQueueService.this.hazelcastInstance.getMap(HzQueueService.LAST_POLLED_TASK_ENQUEUE_TIME);
            ConcurrentHashMap<String, Long> localTimes = HzQueueService.this.lastPolledTaskEnqueueTimes;
            HashSet<String> queueNames = new HashSet<String>(sharedTimes.keySet());
            queueNames.addAll(localTimes.keySet());
            for (String queueName : queueNames) {
                Long shared = sharedTimes.get(queueName);
                Long local = localTimes.get(queueName);
                if (shared == null) {
                    // The entry may have vanished from both sides since the key snapshot; skip it then.
                    if (local != null) {
                        sharedTimes.set(queueName, local);
                    }
                } else if (local == null || local.longValue() <= shared.longValue()) {
                    localTimes.put(queueName, shared);
                } else {
                    sharedTimes.set(queueName, local);
                }
            }
        }
    }

    public HzQueueService(QueueFactory queueFactory, HazelcastInstance hazelcastInstance, String str, long j, long j2) {
        this.synchLock = null;
        this.queueFactory = queueFactory;
        this.hazelcastInstance = hazelcastInstance;
        this.queueNamePrefix = str;
        this.pollDelay = j2;
        this.synchLock = hazelcastInstance.getLock(SYNCH_LOCK_NAME);
        this.drainLock = hazelcastInstance.getLock(DRAIN_LOCK_NAME);
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new StatisticsMerger(), 0L, j, TimeUnit.MILLISECONDS);
    }

    /**
     * Finds the first element of {@code list} whose name equals {@code str}.
     *
     * @param list candidates to search; may be {@code null} or empty
     * @param str  name to look for; a blank name never matches
     * @return the first matching element, or {@code null} when there is none
     */
    private static QueueStatVO getItemByName(List<QueueStatVO> list, String str) {
        if (list == null || list.isEmpty() || StringUtils.isBlank(str)) {
            return null;
        }
        for (QueueStatVO candidate : list) {
            if (str.equals(candidate.getName())) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Returns the value last recorded in {@code lastPolledTaskEnqueueTimes} for the given queue.
     *
     * @param str queue name; it is prefixed with {@code queueNamePrefix} before the lookup
     * @return the recorded time, or {@code -1} when nothing is recorded for that queue
     */
    public long getLastPolledTaskEnqueueTime(String str) {
        String prefixedName = ActorUtils.toPrefixed(str, this.queueNamePrefix);
        Long recorded = this.lastPolledTaskEnqueueTimes.get(prefixedName);
        return recorded != null ? recorded.longValue() : -1L;
    }

    /**
     * Clears the queue with the given name, creating it first if it is not cached yet.
     *
     * @param str queue name; it is prefixed with {@code queueNamePrefix} before the lookup
     */
    public void clearQueue(String str) {
        String prefixedName = ActorUtils.toPrefixed(str, this.queueNamePrefix);
        CachedDelayQueue<TaskQueueItem> queue = getQueue(prefixedName);
        queue.clear();
    }

    /**
     * Removes the queue with the given name from the local cache and destroys it.
     *
     * <p>The cache is keyed by the prefixed name, so the entry is removed by that same key.
     * (Removing by the raw {@code str} would leave the destroyed queue cached.)
     *
     * @param str queue name; it is prefixed with {@code queueNamePrefix} before the lookup
     */
    public void removeQueue(String str) {
        String prefixedName = ActorUtils.toPrefixed(str, this.queueNamePrefix);
        ReentrantLock guard = this.lock;
        guard.lock();
        try {
            CachedDelayQueue<TaskQueueItem> removed = this.queueMap.remove(prefixedName);
            logger.debug("Removing queue with name [{}], cached queue is [{}]", str, removed);
            if (removed != null) {
                removed.destroy();
            }
        } finally {
            guard.unlock();
        }
    }

    /**
     * Returns the size of an already-cached queue without creating it.
     *
     * @param str queue name; it is prefixed with {@code queueNamePrefix} before the lookup
     * @return the queue size, or {@code -1} when no queue with that name is cached locally
     */
    public long getQueueStorageCount(String str) {
        CachedDelayQueue<TaskQueueItem> queue = this.queueMap.get(ActorUtils.toPrefixed(str, this.queueNamePrefix));
        if (queue != null) {
            return queue.size();
        }
        return -1L;
    }

    /**
     * Polls the queue built from the two name parts, waiting up to {@code pollDelay} milliseconds.
     *
     * <p>After a completed poll (whether or not it yielded an item) the queue's entry in
     * {@code lastPolledTaskEnqueueTimes} is updated. If the thread is interrupted while waiting,
     * the interrupt flag is restored, the error is logged and {@code null} is returned.
     *
     * @param str  first part of the queue name (presumably the actor id — confirm against callers)
     * @param str2 second part of the queue name (presumably the task list — confirm against callers)
     * @return the polled item, or {@code null} when nothing arrived in time or the wait was interrupted
     */
    public TaskQueueItem poll(String str, String str2) {
        String queueName = createQueueName(str, str2);
        CachedDelayQueue<TaskQueueItem> queue = getQueue(queueName);
        TaskQueueItem item = null;
        try {
            item = (TaskQueueItem) queue.poll(this.pollDelay, TimeUnit.MILLISECONDS);
            if (logger.isDebugEnabled()) {
                logger.debug("poll() returns taskQueueItem [{}]. [{}].size: {}", new Object[]{item, queueName, Integer.valueOf(queue.size())});
            }
            updateQueueEffectiveTime(queueName, item);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("Queue poll operation interrupted", e);
        }
        return item;
    }

    /**
     * Records the effective time of the latest poll for a queue.
     *
     * @param str           full queue name used as the map key
     * @param taskQueueItem the polled item; when {@code null} the current system time is recorded
     *                      instead of the item's enqueue time
     */
    private void updateQueueEffectiveTime(String str, TaskQueueItem taskQueueItem) {
        final long effectiveTime;
        if (taskQueueItem == null) {
            effectiveTime = System.currentTimeMillis();
        } else {
            effectiveTime = taskQueueItem.getEnqueueTime();
        }
        this.lastPolledTaskEnqueueTimes.put(str, Long.valueOf(effectiveTime));
        logger.debug("lastPolledTaskEnqueueTimes updated for queue[{}] with new value [{}]", str, Long.valueOf(effectiveTime));
    }

    /**
     * Builds a {@link TaskQueueItem} and offers it to the target queue, delayed until its start time.
     *
     * @param str   first part of the queue name (presumably the actor id — confirm against callers)
     * @param uuid  task id stored on the item
     * @param uuid2 process id stored on the item
     * @param j     start time in epoch milliseconds; a value {@code <= 0} means "now"
     * @param str2  task list stored on the item and used as the second part of the queue name
     * @return the result of the queue's {@code delayOffer} call
     * @throws IllegalStateException if the thread is interrupted while offering; the interrupt
     *                               flag is restored before the exception is thrown
     */
    public boolean enqueueItem(String str, UUID uuid, UUID uuid2, long j, String str2) {
        pushedTaskToQueue.incrementAndGet();
        long now = System.currentTimeMillis();
        long startTime = j <= 0 ? now : j;

        TaskQueueItem item = new TaskQueueItem();
        item.setTaskId(uuid);
        item.setProcessId(uuid2);
        item.setStartTime(startTime);
        item.setEnqueueTime(now);
        item.setTaskList(str2);

        CachedDelayQueue<TaskQueueItem> queue = getQueue(createQueueName(str, str2));
        long delay = startTime - now;
        if (delay > 0) {
            pushedTaskToQueueWithDelay.incrementAndGet();
        }
        try {
            return queue.delayOffer(item, delay, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Wrapping alone would hide the interruption from the rest of the thread.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    public boolean isTaskInQueue(String str, String str2, UUID uuid, UUID uuid2) {
        final String reason = "Only for all-in-memory backend";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Builds the full queue name by delegating to {@code TransportUtils.createQueueName} with
     * this service's {@code queueNamePrefix}.
     *
     * @param str  first name part passed through to {@code TransportUtils}
     * @param str2 second name part passed through to {@code TransportUtils}
     * @return the queue name produced by {@code TransportUtils}
     */
    public String createQueueName(String str, String str2) {
        final String prefix = this.queueNamePrefix;
        return TransportUtils.createQueueName(str, str2, prefix);
    }

    /**
     * Returns one page of queue names (with the service prefix stripped).
     *
     * <p>Page bounds are clamped to the available range, so a page past the end, a page number
     * below 1 or a non-positive page size yields an empty page instead of an exception.
     *
     * @param i  1-based page number
     * @param i2 page size
     * @return the requested page; its total count is the number of all matching queue names
     */
    public GenericPage<String> getQueueList(int i, int i2) {
        List<String> queueNames = getTaskQueueNamesByPrefix(this.queueNamePrefix, null, false);
        logger.debug("Stored queue names for queue service are [{}]", queueNames);
        int total = queueNames.size();
        // long arithmetic avoids int overflow for large page numbers/sizes before clamping
        int from = (int) Math.min(Math.max(((long) i - 1L) * (long) i2, 0L), (long) total);
        int to = (int) Math.min(Math.max((long) i * (long) i2, (long) from), (long) total);
        List<String> page = new ArrayList<String>(queueNames.subList(from, to));
        return new GenericPage<>(prefixStrip(page), i, i2, total);
    }

    /**
     * Returns the size of the named queue, creating the queue first if it is not cached yet.
     *
     * @param str queue name; it is prefixed with {@code queueNamePrefix} before the lookup
     * @return the value of the queue's {@code size()}
     */
    public int getQueueTaskCount(String str) {
        String prefixedName = ActorUtils.toPrefixed(str, this.queueNamePrefix);
        CachedDelayQueue<TaskQueueItem> queue = getQueue(prefixedName);
        return queue.size();
    }

    /**
     * Returns one page of the items currently held by the named queue.
     *
     * <p>Page bounds are clamped to the available range, so a page past the end, a page number
     * below 1 or a non-positive page size yields an empty page instead of an exception.
     *
     * @param str queue name; it is prefixed with {@code queueNamePrefix} before the lookup
     * @param i   1-based page number
     * @param i2  page size
     * @return the requested page; its total count is the length of the queue snapshot
     */
    public GenericPage<TaskQueueItem> getQueueContent(String str, int i, int i2) {
        CachedDelayQueue<TaskQueueItem> queue = getQueue(ActorUtils.toPrefixed(str, this.queueNamePrefix));
        TaskQueueItem[] snapshot = (TaskQueueItem[]) queue.toArray(new TaskQueueItem[queue.size()]);
        int total = snapshot.length;
        // long arithmetic avoids int overflow for large page numbers/sizes before clamping
        int from = (int) Math.min(Math.max(((long) i - 1L) * (long) i2, 0L), (long) total);
        int to = (int) Math.min(Math.max((long) i * (long) i2, (long) from), (long) total);
        List<TaskQueueItem> page = new ArrayList<TaskQueueItem>(Arrays.asList(snapshot).subList(from, to));
        return new GenericPage<>(page, i, i2, total);
    }

    /**
     * Not implemented here: the argument is ignored and the result is always {@code null}.
     *
     * @param f unused
     * @return always {@code null}
     */
    public Map<String, Integer> getHoveringCount(float f) {
        return null;
    }

    /**
     * Collects statistics for one page of queues from every cluster member and merges them by
     * queue name.
     *
     * <p>An {@link HzQueueStatTask} is submitted to all members through the executor service named
     * {@code HZ_QUEUE_INFO_EXECUTOR_SERVICE}; each member's answer is awaited for up to 5 seconds.
     * Members that fail or time out are logged and skipped. Page bounds are clamped, so a page
     * past the end yields {@code null} instead of an exception.
     *
     * @param i   1-based page number
     * @param i2  page size
     * @param str optional filter: only queue names (prefix stripped) starting with it are included
     * @return the merged page, or {@code null} when there are no matching queues, the page is
     *         empty, or no member returned any statistics
     */
    public GenericPage<QueueStatVO> getQueuesStatsPage(int i, int i2, String str) {
        List<String> queueNames = getTaskQueueNamesByPrefix(this.queueNamePrefix, str, true);
        if (queueNames == null || queueNames.isEmpty()) {
            return null;
        }
        int total = queueNames.size();
        // long arithmetic avoids int overflow for large page numbers/sizes before clamping
        int from = (int) Math.min(Math.max(((long) i - 1L) * (long) i2, 0L), (long) total);
        int to = (int) Math.min(Math.max((long) i * (long) i2, (long) from), (long) total);
        List<String> pageNames = queueNames.subList(from, to);
        if (pageNames.isEmpty()) {
            return null;
        }

        Map<?, ?> pendingByMember = this.hazelcastInstance.getExecutorService(HZ_QUEUE_INFO_EXECUTOR_SERVICE).submitToAllMembers(new HzQueueStatTask(new ArrayList<String>(pageNames), this.queueNamePrefix));
        List<QueueStatVO> merged = new ArrayList<QueueStatVO>();
        int respondedNodes = 0;
        for (Object pending : pendingByMember.values()) {
            try {
                @SuppressWarnings("unchecked") // the submitted task is expected to yield a list of QueueStatVO
                List<QueueStatVO> nodeStats = (List<QueueStatVO>) ((Future<?>) pending).get(5L, TimeUnit.SECONDS);
                mergeByQueueName(merged, nodeStats);
                respondedNodes++;
            } catch (InterruptedException e) {
                // Restore the flag and stop waiting: every further get() would fail the same way.
                Thread.currentThread().interrupt();
                logger.warn("Cannot obtain QueueStatVO data from node", e);
                break;
            } catch (Exception e) {
                logger.warn("Cannot obtain QueueStatVO data from node", e);
            }
        }
        if (merged.isEmpty()) {
            return null;
        }
        for (QueueStatVO stat : merged) {
            stat.setNodes(respondedNodes);
            stat.setLocal(ClusterUtils.isLocalCachedQueue(this.hazelcastInstance, this.queueNamePrefix + stat.getName()));
        }
        return new GenericPage<>(merged, i, i2, total);
    }

    /**
     * Lists the full (still prefixed) names of all cached-queue distributed objects whose name
     * starts with {@code queueNamePrefix}.
     *
     * @return the matching queue names; empty when there are none
     */
    public List<String> getQueueNames() {
        final String prefix = this.queueNamePrefix;
        return getTaskQueueNamesByPrefix(prefix, null, false);
    }

    /**
     * Returns the queue cached under {@code str}, creating and caching it on first use.
     *
     * <p>The first lookup is done without locking; on a miss the lookup is repeated while holding
     * {@code lock}, and the queue is created through {@code queueFactory} only if still absent.
     *
     * @param str full queue name used as the cache key
     * @return the cached or newly created queue
     */
    private CachedDelayQueue<TaskQueueItem> getQueue(String str) {
        CachedDelayQueue<TaskQueueItem> existing = this.queueMap.get(str);
        if (existing != null) {
            return existing;
        }
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            CachedDelayQueue<TaskQueueItem> queue = this.queueMap.get(str);
            if (queue == null) {
                queue = this.queueFactory.create(str);
                this.queueMap.put(str, queue);
            }
            return queue;
        } finally {
            guard.unlock();
        }
    }

    /**
     * Collects the names of all {@link CachedQueue} distributed objects that start with a prefix.
     *
     * @param str  required name prefix; {@code null} is treated as "no prefix", matching how
     *             {@code prefixStrip} treats a {@code null} {@code queueNamePrefix}
     * @param str2 optional filter applied to the (possibly stripped) name; blank means no filter
     * @param z    when {@code true} the prefix is stripped from each name before filtering and
     *             before it is added to the result
     * @return the matching names; never {@code null}
     */
    private List<String> getTaskQueueNamesByPrefix(String str, String str2, boolean z) {
        String prefix = str == null ? "" : str;
        boolean noFilter = StringUtils.isBlank(str2);
        List<String> result = new ArrayList<String>();
        for (DistributedObject distributedObject : this.hazelcastInstance.getDistributedObjects()) {
            if (!(distributedObject instanceof CachedQueue)) {
                continue;
            }
            String name = distributedObject.getName();
            if (!name.startsWith(prefix)) {
                continue;
            }
            String candidate = z ? name.substring(prefix.length()) : name;
            if (noFilter || candidate.startsWith(str2)) {
                result.add(candidate);
            }
        }
        return result;
    }

    /**
     * Removes the leading {@code queueNamePrefix} from every name in the list.
     *
     * <p>Each name is cut at the prefix length; names are expected to already start with the
     * prefix. A {@code null} or empty input produces an empty list rather than {@code null}, so
     * the caller never wraps a {@code null} item list in a page.
     *
     * @param list prefixed queue names
     * @return the input list itself when no prefix is configured; otherwise a new list of
     *         stripped names
     */
    private List<String> prefixStrip(List<String> list) {
        if (this.queueNamePrefix == null) {
            return list;
        }
        if (list == null || list.isEmpty()) {
            return new ArrayList<String>();
        }
        int prefixLength = this.queueNamePrefix.length();
        List<String> stripped = new ArrayList<String>(list.size());
        for (String name : list) {
            stripped.add(name.substring(prefixLength));
        }
        return stripped;
    }

    /**
     * Folds the statistics in {@code list2} into the accumulator {@code list}.
     *
     * <p>An incoming entry whose name already exists in the accumulator is summed into the
     * existing entry; otherwise the incoming entry itself is appended.
     *
     * @param list  accumulator, modified in place
     * @param list2 statistics to merge in; {@code null} or empty is a no-op
     */
    private void mergeByQueueName(List<QueueStatVO> list, List<QueueStatVO> list2) {
        if (list2 != null && !list2.isEmpty()) {
            for (QueueStatVO incoming : list2) {
                QueueStatVO existing = getItemByName(list, incoming.getName());
                if (existing == null) {
                    list.add(incoming);
                } else {
                    existing.sumValuesWith(incoming);
                }
            }
        }
    }
}
