package org.usergrid.mq.cassandra;

import com.fasterxml.uuid.UUIDComparator;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
import me.prettyprint.cassandra.serializers.BytesArraySerializer;
import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
import me.prettyprint.cassandra.serializers.LongSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.serializers.UUIDSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.AbstractComposite;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.CounterRow;
import me.prettyprint.hector.api.beans.CounterRows;
import me.prettyprint.hector.api.beans.CounterSlice;
import me.prettyprint.hector.api.beans.DynamicComposite;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.HCounterColumn;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.beans.Rows;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.MultigetSliceCounterQuery;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceCounterQuery;
import me.prettyprint.hector.api.query.SliceQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.usergrid.mq.CounterQuery;
import org.usergrid.mq.Message;
import org.usergrid.mq.Query;
import org.usergrid.mq.QueryProcessor;
import org.usergrid.mq.Queue;
import org.usergrid.mq.QueueManager;
import org.usergrid.mq.QueuePosition;
import org.usergrid.mq.QueueQuery;
import org.usergrid.mq.QueueResults;
import org.usergrid.mq.QueueSet;
import org.usergrid.mq.cassandra.QueueIndexUpdate;
import org.usergrid.persistence.AggregateCounter;
import org.usergrid.persistence.AggregateCounterSet;
import org.usergrid.persistence.CounterResolution;
import org.usergrid.persistence.Results;
import org.usergrid.persistence.Schema;
import org.usergrid.persistence.cassandra.ApplicationCF;
import org.usergrid.persistence.cassandra.CassandraPersistenceUtils;
import org.usergrid.persistence.cassandra.CassandraService;
import org.usergrid.persistence.cassandra.CounterUtils;
import org.usergrid.utils.CompositeUtils;
import org.usergrid.utils.ConversionUtils;
import org.usergrid.utils.IndexUtils;
import org.usergrid.utils.MapUtils;
import org.usergrid.utils.NumberUtils;
import org.usergrid.utils.UUIDUtils;

/* loaded from: input_file:usergrid-core-0.0.12.jar:org/usergrid/mq/cassandra/QueueManagerImpl.class */
public class QueueManagerImpl implements QueueManager {
    // Dictionary names (presumably for the subscriber/message index dictionaries — TODO confirm usage outside this chunk).
    public static final String DICTIONARY_SUBSCRIBER_INDEXES = "subscriber_indexes";
    public static final String DICTIONARY_MESSAGE_INDEXES = "message_indexes";
    // 86,400,000 ms = 24 hours; the shard-rounding code in this class uses the same value for inbox/index row keys.
    public static final int QUEUE_SHARD_INTERVAL = 86400000;
    public static final int INDEX_ENTRY_LIST_COUNT = 1000;
    // Same value as the page size / multi-slice result cap used by the search code in this class.
    public static final int DEFAULT_SEARCH_COUNT = 10000;
    // Column-count limit passed to slice queries that are meant to read every column of a row.
    public static final int ALL_COUNT = 100000000;
    // Application whose keyspace this manager operates on; assigned in init().
    UUID applicationId;
    // Owning factory; assigned in init().
    QueueManagerFactoryImpl mmf;
    // Cassandra access (keyspaces, timestamps); assigned in init().
    CassandraService cass;
    // Helper used to add counter mutations when posting messages; assigned in init().
    CounterUtils counterUtils;
    public static final Logger logger = LoggerFactory.getLogger(QueueManagerImpl.class);
    // Shared Hector serializer instances: String, ByteBuffer, UUID, byte[], DynamicComposite and Long.
    public static final StringSerializer se = new StringSerializer();
    public static final ByteBufferSerializer be = new ByteBufferSerializer();
    public static final UUIDSerializer ue = new UUIDSerializer();
    public static final BytesArraySerializer bae = new BytesArraySerializer();
    public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
    public static final LongSerializer le = new LongSerializer();

    /* loaded from: input_file:usergrid-core-0.0.12.jar:org/usergrid/mq/cassandra/QueueManagerImpl$QueueBounds.class */
    /**
     * Immutable holder for the ids of the oldest and newest message of a queue.
     * Either id may be {@code null}. Two instances are equal when both ids match.
     */
    public static final class QueueBounds {
        private final UUID oldest;
        private final UUID newest;

        /**
         * @param uuid  id of the oldest message (may be {@code null})
         * @param uuid2 id of the newest message (may be {@code null})
         */
        public QueueBounds(UUID uuid, UUID uuid2) {
            oldest = uuid;
            newest = uuid2;
        }

        /** @return id of the oldest message, or {@code null} */
        public UUID getOldest() {
            return oldest;
        }

        /** @return id of the newest message, or {@code null} */
        public UUID getNewest() {
            return newest;
        }

        @Override
        public int hashCode() {
            // Standard 31-based accumulation: newest first, then oldest.
            int hash = 1;
            hash = 31 * hash + (newest != null ? newest.hashCode() : 0);
            hash = 31 * hash + (oldest != null ? oldest.hashCode() : 0);
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            QueueBounds other = (QueueBounds) obj;
            return sameId(newest, other.newest) && sameId(oldest, other.oldest);
        }

        /** Null-safe equality of two ids. */
        private static boolean sameId(UUID left, UUID right) {
            return left == null ? right == null : left.equals(right);
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("QueueBounds [oldest=");
            text.append(oldest).append(", newest=").append(newest).append("]");
            return text.toString();
        }
    }

    /**
     * Stores the collaborators this manager works with.
     *
     * @param queueManagerFactoryImpl owning factory
     * @param cassandraService        Cassandra access service
     * @param counterUtils            counter helper
     * @param uuid                    id of the application this manager serves
     * @return this instance, to allow call chaining
     */
    public QueueManagerImpl init(QueueManagerFactoryImpl queueManagerFactoryImpl, CassandraService cassandraService, CounterUtils counterUtils, UUID uuid) {
        applicationId = uuid;
        this.counterUtils = counterUtils;
        cass = cassandraService;
        mmf = queueManagerFactoryImpl;
        return this;
    }

    /**
     * Reads every column of the message's row in the message-properties column family
     * and deserializes them into a {@link Message}.
     *
     * @param uuid id of the message (row key)
     * @return whatever {@code CassandraMQUtils.deserializeMessage} produces for the columns read
     */
    @Override // org.usergrid.mq.QueueManager
    public Message getMessage(UUID uuid) {
        SliceQuery<UUID, String, ByteBuffer> propertiesQuery = HFactory.createSliceQuery(this.cass.getApplicationKeyspace(this.applicationId), ue, se, be);
        propertiesQuery.setColumnFamily(QueuesCF.MESSAGE_PROPERTIES.getColumnFamily());
        propertiesQuery.setKey(uuid);
        // Unbounded range with a very large count: fetch all properties of the message.
        propertiesQuery.setRange(null, null, false, ALL_COUNT);
        ColumnSlice<String, ByteBuffer> properties = propertiesQuery.execute().get();
        return CassandraMQUtils.deserializeMessage(properties.getColumns());
    }

    /**
     * Adds to {@code mutator} every mutation needed to post {@code message} to one queue.
     * Nothing is executed here; the caller runs the batch.
     *
     * <p>Mutations added, in order: the message itself, its inbox entry in the day shard of the
     * message timestamp, the queue's oldest/newest markers, the entry of this queue under the
     * root queue ("/"), the root queue counter, the message index entries, the message counters
     * and the queue's created/modified properties.
     *
     * @param mutator            batch to append to
     * @param str                queue path (normalized here)
     * @param message            message to post; {@code sync()} is called on it first
     * @param messageIndexUpdate index update to reuse, or {@code null} to build one from the message
     * @param j                  timestamp used as the clock of the mutations
     * @return the same {@code message} instance
     */
    public Message batchPostToQueue(Mutator<ByteBuffer> mutator, String str, Message message, MessageIndexUpdate messageIndexUpdate, long j) {
        String queuePath = Queue.normalizeQueuePath(str);
        UUID queueId = Queue.getQueueId(queuePath);
        message.sync();
        CassandraMQUtils.addMessageToMutator(mutator, message, j);

        // Row keys are plain ByteBuffers. The decompiler had cast them to Mutator<ByteBuffer>,
        // which does not type-check against Mutator<ByteBuffer>.addInsertion.
        UUID messageId = message.getUuid();
        long messageMicros = UUIDUtils.getTimestampInMicros(messageId);
        // 86400000 ms = one day (same value as QUEUE_SHARD_INTERVAL).
        long shard = NumberUtils.roundLong(message.getTimestamp(), 86400000L);

        mutator.addInsertion(CassandraMQUtils.getQueueShardRowKey(queueId, shard), QueuesCF.QUEUE_INBOX.getColumnFamily(), HFactory.createColumn(messageId, ByteBuffer.allocate(0), j, ue, be));
        // The "oldest" marker uses an inverted clock so that an older message id wins the write.
        mutator.addInsertion(ConversionUtils.bytebuffer(queueId), QueuesCF.QUEUE_PROPERTIES.getColumnFamily(), HFactory.createColumn(Queue.QUEUE_OLDEST, messageId, Long.MAX_VALUE - messageMicros, se, ue));
        mutator.addInsertion(ConversionUtils.bytebuffer(queueId), QueuesCF.QUEUE_PROPERTIES.getColumnFamily(), HFactory.createColumn(Queue.QUEUE_NEWEST, messageId, messageMicros, se, ue));
        mutator.addInsertion(ConversionUtils.bytebuffer(Queue.getQueueId("/")), QueuesCF.QUEUE_SUBSCRIBERS.getColumnFamily(), HFactory.createColumn(queuePath, queueId, j, se, ue));
        this.counterUtils.batchIncrementQueueCounter(mutator, Queue.getQueueId("/"), queuePath, 1L, j, this.applicationId);

        if (messageIndexUpdate == null) {
            messageIndexUpdate = new MessageIndexUpdate(message);
        }
        messageIndexUpdate.addToMutation(mutator, queueId, shard, j);
        this.counterUtils.addMessageCounterMutations(mutator, this.applicationId, queueId, message, j);

        // "created" uses an inverted clock so the first write is kept; "modified" keeps the latest.
        mutator.addInsertion(ConversionUtils.bytebuffer(queueId), QueuesCF.QUEUE_PROPERTIES.getColumnFamily(), HFactory.createColumn("created", Long.valueOf(j / 1000), Long.MAX_VALUE - j, se, le));
        mutator.addInsertion(ConversionUtils.bytebuffer(queueId), QueuesCF.QUEUE_PROPERTIES.getColumnFamily(), HFactory.createColumn("modified", Long.valueOf(j / 1000), j, se, le));
        return message;
    }

    /**
     * Posts a message to a queue and then to every subscriber of that queue.
     * The queue's own batch is executed first; subscribers are then read in pages of 1000,
     * with one batch executed per page.
     *
     * @param str     queue path
     * @param message message to post
     * @return the same {@code message} instance
     */
    @Override // org.usergrid.mq.QueueManager
    public Message postToQueue(String str, Message message) {
        long timestamp = this.cass.createTimestamp();
        Mutator<ByteBuffer> queueBatch = HFactory.createMutator(this.cass.getApplicationKeyspace(this.applicationId), be);
        String queuePath = Queue.normalizeQueuePath(str);
        MessageIndexUpdate indexUpdate = new MessageIndexUpdate(message);
        batchPostToQueue(queueBatch, queuePath, message, indexUpdate, timestamp);
        CassandraPersistenceUtils.batchExecute(queueBatch, 5);

        // Path of the last subscriber handled so far; the next page resumes from it.
        String lastSubscriberPath = null;
        while (true) {
            QueueSet page = getSubscribers(queuePath, lastSubscriberPath, 1000);
            if (page.getQueues().isEmpty()) {
                break;
            }
            Mutator<ByteBuffer> pageBatch = HFactory.createMutator(this.cass.getApplicationKeyspace(this.applicationId), be);
            for (QueueSet.QueueInfo subscriber : page.getQueues()) {
                batchPostToQueue(pageBatch, subscriber.getPath(), message, indexUpdate, timestamp);
                lastSubscriberPath = subscriber.getPath();
            }
            CassandraPersistenceUtils.batchExecute(pageBatch, 5);
            if (!page.hasMore()) {
                break;
            }
        }
        return message;
    }

    /**
     * Posts each message of the list, in list order, through the single-message overload.
     *
     * @param str  queue path
     * @param list messages to post
     * @return the same {@code list} instance
     */
    @Override // org.usergrid.mq.QueueManager
    public List<Message> postToQueue(String str, List<Message> list) {
        for (Message pending : list) {
            postToQueue(str, pending);
        }
        return list;
    }

    /**
     * Registers a new consumer: generates a time UUID and writes a "timestamp" column for it
     * into the consumers column family.
     *
     * <p>The previous version always returned {@code null}, which made the generated id
     * unreachable for the caller; the new id is now returned.
     *
     * @param uuid id passed to {@code getApplicationKeyspace} to select the keyspace
     *             (NOTE(review): other methods use {@code this.applicationId} here — confirm intent)
     * @return the id of the newly registered consumer
     */
    public UUID getNewClient(UUID uuid) {
        UUID clientId = UUIDUtils.newTimeUUID();
        long timestamp = this.cass.createTimestamp();
        Mutator<UUID> mutator = HFactory.createMutator(this.cass.getApplicationKeyspace(uuid), ue);
        // The row key is the client id itself (the decompiler had cast it to Mutator, which is illegal for UUID).
        mutator.addInsertion(clientId, QueuesCF.CONSUMERS.getColumnFamily(), HFactory.createColumn("timestamp", Long.valueOf(timestamp), timestamp, se, le));
        CassandraPersistenceUtils.batchExecute(mutator, 5);
        return clientId;
    }

    /**
     * Looks up a single column in the consumers column family and returns its value.
     *
     * @param keyspace keyspace to query
     * @param uuid     row key
     * @param uuid2    column name
     * @return the stored UUID, or {@code null} when the column is absent or the lookup fails
     *         (failures are logged, not rethrown)
     */
    public UUID getConsumerQueuePosition(Keyspace keyspace, UUID uuid, UUID uuid2) {
        try {
            HColumn<UUID, UUID> position = HFactory.createColumnQuery(keyspace, ue, ue, ue)
                    .setKey(uuid)
                    .setName(uuid2)
                    .setColumnFamily(QueuesCF.CONSUMERS.getColumnFamily())
                    .execute()
                    .get();
            return position == null ? null : position.getValue();
        } catch (Exception e) {
            logger.error("Error getting next session cursor", (Throwable) e);
            return null;
        }
    }

    /**
     * Reads the oldest and newest message ids recorded in the queue's properties row.
     *
     * <p>When either marker column is missing, {@code null} is returned directly. The previous
     * version dereferenced the missing column and relied on the resulting
     * {@link NullPointerException} being caught, which logged an error with a stack trace.
     *
     * @param keyspace keyspace to query
     * @param uuid     queue id (row key)
     * @return the bounds, or {@code null} when they are not recorded or the lookup fails
     *         (failures are logged, not rethrown)
     */
    public QueueBounds getQueueBounds(Keyspace keyspace, UUID uuid) {
        try {
            ColumnSlice<String, UUID> slice = HFactory.createSliceQuery(keyspace, ue, se, ue)
                    .setKey(uuid)
                    .setColumnNames(Queue.QUEUE_NEWEST, Queue.QUEUE_OLDEST)
                    .setColumnFamily(QueuesCF.QUEUE_PROPERTIES.getColumnFamily())
                    .execute()
                    .get();
            if (slice == null) {
                return null;
            }
            // A queue with no recorded markers has no bounds; this is not an error.
            if (slice.getColumnByName(Queue.QUEUE_OLDEST) == null || slice.getColumnByName(Queue.QUEUE_NEWEST) == null) {
                return null;
            }
            return new QueueBounds(slice.getColumnByName(Queue.QUEUE_OLDEST).getValue(), slice.getColumnByName(Queue.QUEUE_NEWEST).getValue());
        } catch (Exception e) {
            logger.error("Error getting oldest queue message ID", (Throwable) e);
            return null;
        }
    }

    /**
     * Collects message ids from the queue inbox, walking the day shards between a start and a
     * finish message id, and adds them to {@code treeSet}.
     *
     * <p>Missing start/finish ids are taken from {@code queueBounds}: forward scans default to
     * oldest..newest, reversed scans to newest..oldest. The scan stops as soon as
     * {@code treeSet} holds at least {@code i} ids (ids already in the set count towards that).
     *
     * <p>Fix: a reversed scan used to start at the finish shard and then require the shard to
     * be at or above the start shard, so it produced nothing whenever start and finish were in
     * different shards. It now starts at the start shard and steps down to the finish shard,
     * using MAX..MIN as the slice range for the shards in between. Forward scans and
     * single-shard scans behave as before. Column names are also read with their real type
     * instead of a raw {@code Object}.
     *
     * @param keyspace    keyspace to query
     * @param uuid        queue id
     * @param queueBounds oldest/newest ids of the queue; may be {@code null} only when both
     *                    {@code uuid2} and {@code uuid3} are given
     * @param treeSet     set receiving the ids; must not be {@code null}
     * @param uuid2       id to start from, or {@code null} to use the bounds
     * @param uuid3       id to finish at, or {@code null} to use the bounds
     * @param z           {@code true} to scan from newer to older messages
     * @param i           column limit per shard query and target size of {@code treeSet}
     * @return {@code treeSet}; errors are logged and end the scan early
     */
    public TreeSet<UUID> getQueueRange(Keyspace keyspace, UUID uuid, QueueBounds queueBounds, TreeSet<UUID> treeSet, UUID uuid2, UUID uuid3, boolean z, int i) {
        if (queueBounds == null && (uuid2 == null || uuid3 == null)) {
            logger.error("Necessary queue bounds not found");
            return treeSet;
        }
        if (uuid2 == null) {
            uuid2 = z ? queueBounds.getNewest() : queueBounds.getOldest();
        }
        if (uuid2 == null) {
            logger.error("No first message in queue");
            return treeSet;
        }
        if (uuid3 == null) {
            uuid3 = z ? queueBounds.getOldest() : queueBounds.getNewest();
        }
        if (uuid3 == null) {
            logger.error("No last message in queue");
            return treeSet;
        }
        // 86400000 ms = one day (same value as QUEUE_SHARD_INTERVAL).
        long startShard = NumberUtils.roundLong(UUIDUtils.getTimestampInMillis(uuid2), 86400000L);
        long finishShard = NumberUtils.roundLong(UUIDUtils.getTimestampInMillis(uuid3), 86400000L);
        long shard = startShard;
        // Forward: startShard <= shard <= finishShard. Reversed: finishShard <= shard <= startShard.
        while (z ? (shard <= startShard && shard >= finishShard) : (shard >= startShard && shard <= finishShard)) {
            // Shards strictly between start and finish are read in full, in scan direction.
            UUID sliceStart = z ? UUIDUtils.MAX_TIME_UUID : UUIDUtils.MIN_TIME_UUID;
            UUID sliceFinish = z ? UUIDUtils.MIN_TIME_UUID : UUIDUtils.MAX_TIME_UUID;
            if (shard == startShard) {
                sliceStart = uuid2;
            }
            if (shard == finishShard) {
                sliceFinish = uuid3;
            }
            try {
                SliceQuery<ByteBuffer, UUID, ByteBuffer> inboxQuery = HFactory.createSliceQuery(keyspace, be, ue, be);
                inboxQuery.setColumnFamily(QueuesCF.QUEUE_INBOX.getColumnFamily());
                inboxQuery.setKey(CassandraMQUtils.getQueueShardRowKey(uuid, shard));
                inboxQuery.setRange(sliceStart, sliceFinish, z, i);
                for (HColumn<UUID, ByteBuffer> column : inboxQuery.execute().get().getColumns()) {
                    treeSet.add(column.getName());
                    if (treeSet.size() >= i) {
                        return treeSet;
                    }
                }
            } catch (Exception e) {
                logger.error("Error getting previous messages", (Throwable) e);
                return treeSet;
            }
            shard = z ? shard - 86400000 : shard + 86400000;
        }
        return treeSet;
    }

    /**
     * Scans the property index of a queue for one query slice and collects the ids of the
     * matching messages that lie between a start and a finish message id.
     *
     * <p>For every day shard visited, the index row keyed by (queue id, shard, property name)
     * is read in pages of 10000 columns; the message id is taken from component 2 of each
     * column's composite name and handed to {@code add(...)} when it lies within
     * [{@code uuid2}, {@code uuid3}].
     *
     * @param keyspace    keyspace to query
     * @param uuid        queue id
     * @param queueBounds oldest/newest ids of the queue; may be {@code null} only when both
     *                    {@code uuid2} and {@code uuid3} are given
     * @param treeSet     set receiving the ids, or {@code null} to create a new one
     * @param uuid2       lower message id, or {@code null} to use the bounds
     * @param uuid3       upper message id, or {@code null} to use the bounds
     * @param z           reversed flag; passed on to {@code add(...)} and used for shard stepping
     * @param querySlice  slice supplying property name, cursor and start/finish values
     * @param i           maximum number of ids to keep
     * @return the set of collected ids (never {@code null})
     */
    public TreeSet<UUID> searchQueueRange(Keyspace keyspace, UUID uuid, QueueBounds queueBounds, TreeSet<UUID> treeSet, UUID uuid2, UUID uuid3, boolean z, QueryProcessor.QuerySlice querySlice, int i) {
        if (treeSet == null) {
            treeSet = new TreeSet<>(new UUIDComparator());
        }
        if (queueBounds == null && (uuid2 == null || uuid3 == null)) {
            logger.error("Necessary queue bounds not found");
            return treeSet;
        }
        // NOTE(review): the default start is always the oldest id, whereas getQueueRange picks
        // the newest id for reversed scans. With z == true and both ids defaulted this yields
        // the range oldest..oldest — confirm whether that is intended.
        if (uuid2 == null) {
            uuid2 = queueBounds.getOldest();
        }
        if (uuid2 == null) {
            logger.error("No first message in queue");
            return treeSet;
        }
        if (uuid3 == null) {
            uuid3 = z ? queueBounds.getOldest() : queueBounds.getNewest();
        }
        if (uuid3 == null) {
            logger.error("No last message in queue");
            return treeSet;
        }
        // Day shards (86400000 ms) containing the start and finish ids.
        long roundLong = NumberUtils.roundLong(UUIDUtils.getTimestampInMillis(uuid2), 86400000L);
        long roundLong2 = NumberUtils.roundLong(UUIDUtils.getTimestampInMillis(uuid3), 86400000L);
        long j = roundLong;
        if (z) {
            j = roundLong2;
        }
        // Start of the index column range: the slice cursor if present, else the slice start value.
        ByteBuffer byteBuffer = null;
        if (querySlice.getCursor() != null) {
            byteBuffer = querySlice.getCursor();
        } else if (querySlice.getStart() != null) {
            DynamicComposite dynamicComposite = new DynamicComposite(Byte.valueOf(querySlice.getStart().getCode()), querySlice.getStart().getValue());
            // An exclusive start gets the GREATER_THAN_EQUAL flag so the range begins after the start value.
            if (!querySlice.getStart().isInclusive()) {
                CompositeUtils.setEqualityFlag(dynamicComposite, AbstractComposite.ComponentEquality.GREATER_THAN_EQUAL);
            }
            byteBuffer = dynamicComposite.serialize();
        }
        // End of the index column range, from the slice finish value.
        ByteBuffer byteBuffer2 = null;
        if (querySlice.getFinish() != null) {
            DynamicComposite dynamicComposite2 = new DynamicComposite(Byte.valueOf(querySlice.getFinish().getCode()), querySlice.getFinish().getValue());
            // An inclusive finish gets the GREATER_THAN_EQUAL flag so entries with the finish value are included.
            if (querySlice.getFinish().isInclusive()) {
                CompositeUtils.setEqualityFlag(dynamicComposite2, AbstractComposite.ComponentEquality.GREATER_THAN_EQUAL);
            }
            byteBuffer2 = dynamicComposite2.serialize();
        }
        // NOTE(review): for z == true the loop starts at the finish shard and steps downward, so
        // it can only leave the first shard when start and finish share a shard — same pattern as
        // in getQueueRange; verify multi-shard reversed searches.
        while (j >= roundLong && j <= roundLong2 && treeSet.size() < i) {
            // Page through the index row of this shard, 10000 columns at a time.
            while (true) {
                List columns = ((ColumnSlice) HFactory.createSliceQuery(keyspace, be, be, be).setColumnFamily(QueuesCF.PROPERTY_INDEX.getColumnFamily()).setKey(ConversionUtils.bytebuffer(CassandraPersistenceUtils.key(uuid, Long.valueOf(j), querySlice.getPropertyName()))).setRange(byteBuffer, byteBuffer2, false, 10000).execute().get()).getColumns();
                Iterator it = columns.iterator();
                while (it.hasNext()) {
                    // Component 2 of the composite column name is the message id.
                    UUID uuid4 = (UUID) DynamicComposite.fromByteBuffer(((ByteBuffer) ((HColumn) it.next()).getName()).duplicate()).get(2, ue);
                    if (UUIDComparator.staticCompare(uuid4, uuid2) >= 0 && UUIDComparator.staticCompare(uuid4, uuid3) <= 0) {
                        add(treeSet, uuid4, z, i);
                    }
                }
                // A short page means the row is exhausted.
                if (columns.size() < 10000) {
                    break;
                }
                // Next page starts at the last column seen (that column is read again; the set absorbs the duplicate).
                // NOTE(review): this overwrites the range start, and the overwritten value is also
                // used for the following shards — confirm that is intended.
                byteBuffer = ((ByteBuffer) ((HColumn) columns.get(columns.size() - 1)).getName()).duplicate();
            }
            j = z ? j - 86400000 : j + 86400000;
        }
        return treeSet;
    }

    /**
     * Adds one id to a size-bounded set.
     *
     * <p>While the set holds fewer than {@code i} ids, the id is simply added. Once the set is
     * full: with {@code z == true} the id replaces the smallest element if it is greater than
     * it; otherwise it replaces the largest element if it is smaller than it.
     *
     * @param treeSet target set, or {@code null} to create a new one ordered by {@link UUIDComparator}
     * @param uuid    id to add; {@code null} is ignored
     * @param z       {@code true} to keep the largest ids, {@code false} to keep the smallest
     * @param i       capacity of the set
     * @return the target set (the new one when {@code treeSet} was {@code null})
     */
    static TreeSet<UUID> add(TreeSet<UUID> treeSet, UUID uuid, boolean z, int i) {
        TreeSet<UUID> target = treeSet;
        if (target == null) {
            target = new TreeSet<UUID>(new UUIDComparator());
        }
        if (uuid == null) {
            return target;
        }
        if (target.size() < i) {
            target.add(uuid);
            return target;
        }
        if (z) {
            // Full set, keeping the largest ids: evict the smallest when the newcomer beats it.
            boolean beatsSmallest = UUIDComparator.staticCompare(uuid, target.first()) > 0;
            if (beatsSmallest) {
                target.pollFirst();
                target.add(uuid);
            }
            return target;
        }
        // Full set, keeping the smallest ids: evict the largest when the newcomer is below it.
        boolean belowLargest = UUIDComparator.staticCompare(uuid, target.last()) < 0;
        if (belowLargest) {
            target.pollLast();
            target.add(uuid);
        }
        return target;
    }

    /**
     * Adds every id of {@code treeSet2} to {@code treeSet} through the single-id overload, so
     * the same capacity rule applies to each element.
     *
     * @param treeSet  target set; may be {@code null}, in which case the single-id overload creates one
     * @param treeSet2 ids to add; when {@code null}, {@code treeSet} is returned untouched
     * @param z        {@code true} to keep the largest ids, {@code false} to keep the smallest
     * @param i        capacity of the target set
     * @return the target set
     */
    static TreeSet<UUID> add(TreeSet<UUID> treeSet, TreeSet<UUID> treeSet2, boolean z, int i) {
        if (treeSet2 == null) {
            return treeSet;
        }
        TreeSet<UUID> target = treeSet;
        for (UUID candidate : treeSet2) {
            target = add(target, candidate, z, i);
        }
        return target;
    }

    /**
     * Union of two id sets, bounded to {@code i} elements.
     *
     * <p>When exactly one argument is {@code null}, the other is returned as-is (same instance,
     * not trimmed). When both are {@code null}, a new empty set is returned.
     *
     * @param treeSet  first set, may be {@code null}
     * @param treeSet2 second set, may be {@code null}
     * @param z        {@code true} to keep the largest ids, {@code false} to keep the smallest
     * @param i        capacity of the merged set
     * @return the merged set
     */
    static TreeSet<UUID> mergeOr(TreeSet<UUID> treeSet, TreeSet<UUID> treeSet2, boolean z, int i) {
        if (treeSet == null) {
            return treeSet2 != null ? treeSet2 : new TreeSet<UUID>(new UUIDComparator());
        }
        if (treeSet2 == null) {
            return treeSet;
        }
        TreeSet<UUID> union = new TreeSet<UUID>(new UUIDComparator());
        add(union, treeSet, z, i);
        add(union, treeSet2, z, i);
        return union;
    }

    /**
     * Intersection of two id sets, bounded to {@code i} elements.
     *
     * <p>Fix: membership was tested with {@code treeSet.contains(treeSet2)} — the whole second
     * set instead of the element being visited. That never matches an id, and on a non-empty
     * comparator-ordered set it fails with a {@link ClassCastException}. The current element is
     * now tested.
     *
     * @param treeSet  first set, may be {@code null}
     * @param treeSet2 second set, may be {@code null}
     * @param z        {@code true} to keep the largest ids, {@code false} to keep the smallest
     * @param i        capacity of the result
     * @return a new set with the ids present in both arguments; empty when either is {@code null}
     */
    static TreeSet<UUID> mergeAnd(TreeSet<UUID> treeSet, TreeSet<UUID> treeSet2, boolean z, int i) {
        TreeSet<UUID> intersection = new TreeSet<UUID>(new UUIDComparator());
        if (treeSet == null || treeSet2 == null) {
            return intersection;
        }
        for (UUID candidate : treeSet2) {
            if (treeSet.contains(candidate)) {
                add(intersection, candidate, z, i);
            }
        }
        return intersection;
    }

    /**
     * Reads messages from a queue according to {@code queueQuery} and, when the query asks for
     * it, stores the new read position.
     *
     * <p>Steps: resolve the starting message id from the query position, collect candidate
     * message ids (inbox range scan, or property-index search when the query has filter
     * predicates), load the messages, sort them with {@code Message.sortReversed}, and for
     * updating queries write the id of the first message of the sorted list (or the starting
     * id when nothing was read) as the position.
     *
     * <p>Fixes relative to the previous (decompiled) version, none of which change the
     * data-level logic:
     * <ul>
     *   <li>the second search loop wrapped only the count computation in try/catch, leaving the
     *       count not definitely assigned and the search call unprotected; the search call is
     *       now inside the try, as in the first loop;</li>
     *   <li>row keys were cast to {@code Mutator}, which is illegal for {@code UUID};</li>
     *   <li>the id list was reassigned from {@code subList} into an {@code ArrayList} variable.</li>
     * </ul>
     *
     * <p>NOTE(review): in the filter-predicate branch the per-slice search results are never
     * copied into the result set and the return value of {@code mergeAnd} is discarded, so
     * filtered queries return no messages. This is left unchanged because the intended merge
     * semantics cannot be confirmed from this method alone.
     *
     * @param str        queue path; normalized, with {@code "/"} used when normalization yields {@code null}
     * @param queueQuery query options, or {@code null} for defaults
     * @return the messages read together with the queue path/id, the last message id and the consumer id
     */
    @Override // org.usergrid.mq.QueueManager
    public QueueResults getFromQueue(String str, QueueQuery queueQuery) {
        if (queueQuery == null) {
            queueQuery = new QueueQuery();
        }
        Keyspace applicationKeyspace = this.cass.getApplicationKeyspace(this.applicationId);
        String normalizeQueuePath = Queue.normalizeQueuePath(str);
        if (normalizeQueuePath == null) {
            normalizeQueuePath = "/";
        }
        logger.info("QueueManagerFactoryImpl.getFromQueue: {}", normalizeQueuePath);
        UUID queueId = Queue.getQueueId(normalizeQueuePath);
        UUID consumerId = queueQuery.getConsumerId();
        if (consumerId == null && queueQuery.getPosition() == QueuePosition.CONSUMER) {
            consumerId = UUIDUtils.newTimeUUID();
        }

        // skipFirst: the start id came from a stored position, i.e. that message was already delivered.
        boolean skipFirst = false;
        UUID lastMessageId = queueQuery.getLastMessageId();
        int previousCount = queueQuery.getPreviousCount();
        int nextCount = queueQuery.getNextCount();
        int limit = queueQuery.getLimit();
        if (queueQuery.getPosition() == QueuePosition.LAST) {
            if (queueQuery.getLastTimestamp() == 0 && lastMessageId == null) {
                // Queue-wide position: stored under the queue's own id.
                lastMessageId = getConsumerQueuePosition(applicationKeyspace, queueId, queueId);
                skipFirst = lastMessageId != null;
            }
        } else if (queueQuery.getPosition() == QueuePosition.CONSUMER) {
            if (queueQuery.getLastTimestamp() == 0 && lastMessageId == null) {
                // Per-consumer position: stored under the consumer id.
                lastMessageId = getConsumerQueuePosition(applicationKeyspace, consumerId, queueId);
                skipFirst = lastMessageId != null;
            }
        } else if (queueQuery.getPosition() == QueuePosition.START) {
            lastMessageId = null;
            nextCount = Math.max(nextCount, limit);
        } else if (queueQuery.getPosition() == QueuePosition.END) {
            lastMessageId = null;
            previousCount = Math.max(previousCount, limit);
        }

        QueueBounds queueBounds = getQueueBounds(applicationKeyspace, queueId);
        TreeSet<UUID> results = new TreeSet<UUID>(new UUIDComparator());
        if (queueQuery.hasFilterPredicates()) {
            List<QueryProcessor.QuerySlice> slices = new QueryProcessor(queueQuery).getSlices();

            TreeSet<UUID> previousMatches = null;
            if (previousCount > 0) {
                // With several slices each one is searched with the default search count (10000).
                if (slices.size() > 1) {
                    previousCount = 10000;
                }
                for (QueryProcessor.QuerySlice slice : slices) {
                    TreeSet<UUID> sliceMatches = null;
                    try {
                        sliceMatches = searchQueueRange(applicationKeyspace, queueId, queueBounds, null, lastMessageId, null, true, slice, previousCount);
                    } catch (Exception e) {
                        logger.error("Error during search", (Throwable) e);
                    }
                    if (previousMatches != null) {
                        // NOTE(review): result discarded — see method comment.
                        mergeAnd(previousMatches, sliceMatches, true, 10000);
                    } else {
                        previousMatches = sliceMatches;
                    }
                }
            }

            TreeSet<UUID> nextMatches = null;
            if (slices.size() > 1) {
                nextCount = 10000;
            }
            for (QueryProcessor.QuerySlice slice : slices) {
                TreeSet<UUID> sliceMatches = null;
                try {
                    // One extra id is requested when the start id itself will be dropped afterwards.
                    sliceMatches = searchQueueRange(applicationKeyspace, queueId, queueBounds, null, lastMessageId, null, false, slice, skipFirst ? nextCount + 1 : nextCount);
                } catch (Exception e) {
                    logger.error("Error during search", (Throwable) e);
                }
                if (nextMatches != null) {
                    // NOTE(review): result discarded — see method comment.
                    mergeAnd(nextMatches, sliceMatches, true, 10000);
                } else {
                    nextMatches = sliceMatches;
                }
            }
        } else {
            if (previousCount > 0) {
                results = getQueueRange(applicationKeyspace, queueId, queueBounds, results, lastMessageId, null, true, previousCount);
            }
            results = getQueueRange(applicationKeyspace, queueId, queueBounds, results, lastMessageId, null, false, skipFirst ? nextCount + 1 : nextCount);
        }

        // The message at the stored position was already delivered; do not return it again.
        if (skipFirst && queueQuery.isUpdate()) {
            results.remove(lastMessageId);
        }

        List<UUID> messageIds = new ArrayList<UUID>(results);
        UUIDUtils.sortReversed(messageIds);
        int resultLimit = queueQuery.getLimit();
        if (resultLimit > 0 && resultLimit < messageIds.size()) {
            messageIds = messageIds.subList(0, resultLimit);
        }

        Rows<UUID, String, ByteBuffer> rows = HFactory.createMultigetSliceQuery(applicationKeyspace, ue, se, be)
                .setColumnFamily(QueuesCF.MESSAGE_PROPERTIES.getColumnFamily())
                .setKeys(messageIds)
                .setRange(null, null, false, ALL_COUNT)
                .execute()
                .get();
        List<Message> messages = new ArrayList<Message>();
        for (Row<UUID, String, ByteBuffer> row : rows) {
            Message message = CassandraMQUtils.deserializeMessage(row.getColumnSlice().getColumns());
            if (message != null) {
                messages.add(message);
            }
        }
        Message.sortReversed(messages);

        // Position to report/store: first message of the sorted list, else the start id (may be null).
        UUID lastId = null;
        if (messages.size() > 0) {
            lastId = messages.get(0).getUuid();
        } else if (lastMessageId != null) {
            lastId = lastMessageId;
        }

        if (lastId != null && queueQuery.isUpdate()) {
            Mutator<UUID> positionUpdate = HFactory.createMutator(applicationKeyspace, ue);
            if (queueQuery.getPosition() == QueuePosition.LAST) {
                positionUpdate.addInsertion(queueId, QueuesCF.CONSUMERS.getColumnFamily(), HFactory.createColumn(queueId, lastId, this.cass.createTimestamp(), ue, ue));
            } else if (queueQuery.getPosition() == QueuePosition.CONSUMER) {
                positionUpdate.addInsertion(consumerId, QueuesCF.CONSUMERS.getColumnFamily(), HFactory.createColumn(queueId, lastId, this.cass.createTimestamp(), ue, ue));
            }
            CassandraPersistenceUtils.batchExecute(positionUpdate, 5);
        }
        return new QueueResults(normalizeQueuePath, queueId, messages, lastId, consumerId);
    }

    /**
     * Adds the two mutations that record a subscription: the subscriber under the publisher's
     * subscribers row, and the publisher under the subscriber's subscriptions row.
     * Nothing is executed here; the caller runs the batch.
     *
     * @param mutator batch to append to
     * @param str     publisher queue path
     * @param uuid    publisher queue id
     * @param str2    subscriber queue path
     * @param uuid2   subscriber queue id
     * @param j       timestamp used as the clock of both columns
     */
    public void batchSubscribeToQueue(Mutator<ByteBuffer> mutator, String str, UUID uuid, String str2, UUID uuid2, long j) {
        // Row keys are plain ByteBuffers (the decompiler had cast them to Mutator<ByteBuffer>,
        // which does not type-check against addInsertion).
        mutator.addInsertion(ConversionUtils.bytebuffer(uuid), QueuesCF.QUEUE_SUBSCRIBERS.getColumnFamily(), HFactory.createColumn(str2, uuid2, j, se, ue));
        mutator.addInsertion(ConversionUtils.bytebuffer(uuid2), QueuesCF.QUEUE_SUBSCRIPTIONS.getColumnFamily(), HFactory.createColumn(str, uuid, j, se, ue));
    }

    /**
     * Subscribes one queue to another and, when the subscriber queue exists, indexes its
     * properties under the publisher. Index failures are logged and do not prevent the
     * subscription from being written.
     *
     * @param str  publisher queue path
     * @param str2 subscriber queue path
     * @return a set containing the subscriber queue
     */
    @Override // org.usergrid.mq.QueueManager
    public QueueSet subscribeToQueue(String str, String str2) {
        String publisherPath = Queue.normalizeQueuePath(str);
        UUID publisherId = Queue.getQueueId(publisherPath);
        String subscriberPath = Queue.normalizeQueuePath(str2);
        UUID subscriberId = Queue.getQueueId(subscriberPath);

        // One time UUID supplies both the mutation clock and the index timestamp.
        UUID timestampUuid = UUIDUtils.newTimeUUID();
        long timestamp = UUIDUtils.getTimestampInMicros(timestampUuid);

        Mutator<ByteBuffer> batch = HFactory.createMutator(this.cass.getApplicationKeyspace(this.applicationId), be);
        batchSubscribeToQueue(batch, publisherPath, publisherId, subscriberPath, subscriberId, timestamp);
        try {
            Queue subscriber = getQueue(subscriberPath, subscriberId);
            if (subscriber != null) {
                batchUpdateQueuePropertiesIndexes(batch, publisherId, subscriberPath, subscriberId, subscriber.getProperties(), timestampUuid);
            }
        } catch (Exception e) {
            logger.error("Unable to update index", (Throwable) e);
        }
        CassandraPersistenceUtils.batchExecute(batch, 5);

        QueueSet subscribed = new QueueSet();
        return subscribed.addQueue(subscriberPath, subscriberId);
    }

    /**
     * Adds the two deletions that remove a subscription: the subscriber column from the
     * publisher's subscribers row, and the publisher column from the subscriber's
     * subscriptions row. Nothing is executed here; the caller runs the batch.
     *
     * @param mutator batch to append to
     * @param str     publisher queue path
     * @param uuid    publisher queue id
     * @param str2    subscriber queue path
     * @param uuid2   subscriber queue id
     * @param j       timestamp used as the clock of both deletions
     */
    public void batchUnsubscribeFromQueue(Mutator<ByteBuffer> mutator, String str, UUID uuid, String str2, UUID uuid2, long j) {
        ByteBuffer publisherKey = ConversionUtils.bytebuffer(uuid);
        mutator.addDeletion(publisherKey, QueuesCF.QUEUE_SUBSCRIBERS.getColumnFamily(), str2, se, j);

        ByteBuffer subscriberKey = ConversionUtils.bytebuffer(uuid2);
        mutator.addDeletion(subscriberKey, QueuesCF.QUEUE_SUBSCRIPTIONS.getColumnFamily(), str, se, j);
    }

    /**
     * Removes the subscription of one queue to another and, when the subscriber queue exists,
     * updates the property indexes with an empty-valued copy of its properties.
     * Index failures are logged and do not prevent the unsubscription from being written.
     *
     * <p>A missing subscriber queue is now checked explicitly, as in {@code subscribeToQueue},
     * instead of surfacing as a caught {@link NullPointerException} logged as an error.
     *
     * @param str  publisher queue path
     * @param str2 subscriber queue path
     * @return a set containing the subscriber queue
     */
    @Override // org.usergrid.mq.QueueManager
    public QueueSet unsubscribeFromQueue(String str, String str2) {
        String publisherPath = Queue.normalizeQueuePath(str);
        UUID publisherId = Queue.getQueueId(publisherPath);
        String subscriberPath = Queue.normalizeQueuePath(str2);
        UUID subscriberId = Queue.getQueueId(subscriberPath);
        // One time UUID supplies both the mutation clock and the index timestamp.
        UUID timestampUuid = UUIDUtils.newTimeUUID();
        long timestamp = UUIDUtils.getTimestampInMicros(timestampUuid);
        Mutator<ByteBuffer> batch = HFactory.createMutator(this.cass.getApplicationKeyspace(this.applicationId), be);
        batchUnsubscribeFromQueue(batch, publisherPath, publisherId, subscriberPath, subscriberId, timestamp);
        try {
            Queue subscriber = getQueue(subscriberPath, subscriberId);
            if (subscriber != null) {
                batchUpdateQueuePropertiesIndexes(batch, publisherId, subscriberPath, subscriberId, MapUtils.emptyMapWithKeys(subscriber.getProperties()), timestampUuid);
            }
        } catch (Exception e) {
            logger.error("Unable to update index", (Throwable) e);
        }
        CassandraPersistenceUtils.batchExecute(batch, 5);
        return new QueueSet().addQueue(subscriberPath, subscriberId);
    }

    /**
     * Returns one page of the subscribers of a queue.
     *
     * <p>When {@code str2} is given, the read starts at that path and its column (the first
     * one returned) is skipped, so the page continues after it. One column more than the page
     * size is requested to detect whether further subscribers exist.
     *
     * @param str  queue path
     * @param str2 path to resume from, or {@code null} for the first page
     * @param i    page size
     * @return the subscribers of the page, flagged with {@code setMore(true)} when more remain
     */
    @Override // org.usergrid.mq.QueueManager
    public QueueSet getSubscribers(String str, String str2, int i) {
        UUID queueId = Queue.getQueueId(str);
        Keyspace keyspace = this.cass.getApplicationKeyspace(this.applicationId);
        boolean resuming = str2 != null;
        // When resuming, the first column is the resume marker itself, so read one extra.
        int window = resuming ? i + 1 : i;
        List<HColumn<String, UUID>> columns = HFactory.createSliceQuery(keyspace, ue, se, ue)
                .setKey(queueId)
                .setColumnFamily(QueuesCF.QUEUE_SUBSCRIBERS.getColumnFamily())
                .setRange(Queue.normalizeQueuePath(str2), null, false, window + 1)
                .execute()
                .get()
                .getColumns();
        QueueSet page = new QueueSet();
        int end = Math.min(window, columns.size());
        for (int index = resuming ? 1 : 0; index < end; index++) {
            HColumn<String, UUID> column = columns.get(index);
            page.addQueue(column.getName(), column.getValue());
        }
        if (columns.size() > window) {
            page.setMore(true);
        }
        return page;
    }

    /**
     * Returns one page of the queues a queue is subscribed to.
     *
     * <p>When {@code str2} is given, the read starts at that path and its column (the first
     * one returned) is skipped, so the page continues after it. One column more than the page
     * size is requested to detect whether further subscriptions exist.
     *
     * @param str  queue path
     * @param str2 path to resume from, or {@code null} for the first page
     * @param i    page size
     * @return the subscriptions of the page, flagged with {@code setMore(true)} when more remain
     */
    @Override // org.usergrid.mq.QueueManager
    public QueueSet getSubscriptions(String str, String str2, int i) {
        UUID queueId = Queue.getQueueId(str);
        Keyspace keyspace = this.cass.getApplicationKeyspace(this.applicationId);
        boolean resuming = str2 != null;
        // When resuming, the first column is the resume marker itself, so read one extra.
        int window = resuming ? i + 1 : i;
        List<HColumn<String, UUID>> columns = HFactory.createSliceQuery(keyspace, ue, se, ue)
                .setKey(queueId)
                .setColumnFamily(QueuesCF.QUEUE_SUBSCRIPTIONS.getColumnFamily())
                .setRange(Queue.normalizeQueuePath(str2), null, false, window + 1)
                .execute()
                .get()
                .getColumns();
        QueueSet page = new QueueSet();
        int end = Math.min(window, columns.size());
        for (int index = resuming ? 1 : 0; index < end; index++) {
            HColumn<String, UUID> column = columns.get(index);
            page.addQueue(column.getName(), column.getValue());
        }
        if (columns.size() > window) {
            page.setMore(true);
        }
        return page;
    }

    /**
     * Subscribes every queue path in {@code list} to the queue {@code str} in a single batch.
     *
     * <p>For each listed queue the subscription mutation is queued, then the property indexes
     * are updated from that queue's current properties. An index failure is logged and does
     * not stop the remaining work; the queue is still added to the returned set.
     *
     * @param str  path of the queue being subscribed to
     * @param list paths of the queues to add as subscribers
     * @return the set of subscriber queues processed (normalized path and id)
     */
    @Override // org.usergrid.mq.QueueManager
    public QueueSet addSubscribersToQueue(String str, List<String> list) {
        final String publisherPath = Queue.normalizeQueuePath(str);
        final UUID publisherId = Queue.getQueueId(publisherPath);
        final UUID timestampUuid = UUIDUtils.newTimeUUID();
        final long timestamp = UUIDUtils.getTimestampInMicros(timestampUuid);
        final Mutator<ByteBuffer> batch = HFactory.createMutator(this.cass.getApplicationKeyspace(this.applicationId), be);
        final QueueSet subscribers = new QueueSet();
        for (String rawSubscriberPath : list) {
            final String subscriberPath = Queue.normalizeQueuePath(rawSubscriberPath);
            final UUID subscriberId = Queue.getQueueId(subscriberPath);
            batchSubscribeToQueue(batch, publisherPath, publisherId, subscriberPath, subscriberId, timestamp);
            try {
                final Queue subscriber = getQueue(subscriberPath, subscriberId);
                batchUpdateQueuePropertiesIndexes(batch, publisherId, subscriberPath, subscriberId, subscriber.getProperties(), timestampUuid);
            } catch (Exception e) {
                // Best effort: the subscription itself is still written.
                logger.error("Unable to update index", (Throwable) e);
            }
            subscribers.addQueue(subscriberPath, subscriberId);
        }
        CassandraPersistenceUtils.batchExecute(batch, 5);
        return subscribers;
    }

    /**
     * Unsubscribes every queue path in {@code list} from the queue {@code str} in a single batch.
     *
     * <p>For each listed queue the unsubscribe mutation is queued, then the property indexes
     * are updated using {@code MapUtils.emptyMapWithKeys} of that queue's properties
     * (presumably the same keys with the values blanked, so existing index entries are
     * cleared; confirm against MapUtils). An index failure is logged and does not stop the
     * remaining work.
     *
     * @param str  path of the queue being unsubscribed from
     * @param list paths of the subscriber queues to remove
     * @return the set of subscriber queues processed (normalized path and id)
     */
    @Override // org.usergrid.mq.QueueManager
    public QueueSet removeSubscribersFromQueue(String str, List<String> list) {
        final String publisherPath = Queue.normalizeQueuePath(str);
        final UUID publisherId = Queue.getQueueId(publisherPath);
        final UUID timestampUuid = UUIDUtils.newTimeUUID();
        final long timestamp = UUIDUtils.getTimestampInMicros(timestampUuid);
        final Mutator<ByteBuffer> batch = HFactory.createMutator(this.cass.getApplicationKeyspace(this.applicationId), be);
        final QueueSet removed = new QueueSet();
        for (String rawSubscriberPath : list) {
            final String subscriberPath = Queue.normalizeQueuePath(rawSubscriberPath);
            final UUID subscriberId = Queue.getQueueId(subscriberPath);
            batchUnsubscribeFromQueue(batch, publisherPath, publisherId, subscriberPath, subscriberId, timestamp);
            try {
                final Queue subscriber = getQueue(subscriberPath, subscriberId);
                batchUpdateQueuePropertiesIndexes(batch, publisherId, subscriberPath, subscriberId, MapUtils.emptyMapWithKeys(subscriber.getProperties()), timestampUuid);
            } catch (Exception e) {
                // Best effort: the unsubscribe itself is still written.
                logger.error("Unable to update index", (Throwable) e);
            }
            removed.addQueue(subscriberPath, subscriberId);
        }
        CassandraPersistenceUtils.batchExecute(batch, 5);
        return removed;
    }

    /**
     * Subscribes the queue {@code str} to every queue path in {@code list} in a single batch.
     *
     * <p>This mirrors {@code addSubscribersToQueue} with the roles swapped: each listed queue
     * is passed as the first queue of the subscription and {@code str} as the second. The
     * property indexes are updated from the properties of {@code str}, re-read on every
     * iteration. An index failure is logged and does not stop the remaining work.
     *
     * @param str  path of the subscribing queue
     * @param list paths of the queues to subscribe to
     * @return the set of listed queues processed (normalized path and id)
     */
    @Override // org.usergrid.mq.QueueManager
    public QueueSet subscribeToQueues(String str, List<String> list) {
        final String subscriberPath = Queue.normalizeQueuePath(str);
        final UUID subscriberId = Queue.getQueueId(subscriberPath);
        final UUID timestampUuid = UUIDUtils.newTimeUUID();
        final long timestamp = UUIDUtils.getTimestampInMicros(timestampUuid);
        final Mutator<ByteBuffer> batch = HFactory.createMutator(this.cass.getApplicationKeyspace(this.applicationId), be);
        final QueueSet publishers = new QueueSet();
        for (String rawPublisherPath : list) {
            final String publisherPath = Queue.normalizeQueuePath(rawPublisherPath);
            final UUID publisherId = Queue.getQueueId(publisherPath);
            batchSubscribeToQueue(batch, publisherPath, publisherId, subscriberPath, subscriberId, timestamp);
            try {
                final Queue subscriber = getQueue(subscriberPath, subscriberId);
                batchUpdateQueuePropertiesIndexes(batch, publisherId, subscriberPath, subscriberId, subscriber.getProperties(), timestampUuid);
            } catch (Exception e) {
                // Best effort: the subscription itself is still written.
                logger.error("Unable to update index", (Throwable) e);
            }
            publishers.addQueue(publisherPath, publisherId);
        }
        CassandraPersistenceUtils.batchExecute(batch, 5);
        return publishers;
    }

    /**
     * Unsubscribes the queue {@code str} from every queue path in {@code list} in a single batch.
     *
     * <p>This mirrors {@code removeSubscribersFromQueue} with the roles swapped: each listed
     * queue is passed as the first queue of the unsubscribe and {@code str} as the second.
     * The property indexes are updated using {@code MapUtils.emptyMapWithKeys} of the
     * properties of {@code str}, re-read on every iteration. An index failure is logged and
     * does not stop the remaining work.
     *
     * @param str  path of the subscribing queue
     * @param list paths of the queues to unsubscribe from
     * @return the set of listed queues processed (normalized path and id)
     */
    @Override // org.usergrid.mq.QueueManager
    public QueueSet unsubscribeFromQueues(String str, List<String> list) {
        final String subscriberPath = Queue.normalizeQueuePath(str);
        final UUID subscriberId = Queue.getQueueId(subscriberPath);
        final UUID timestampUuid = UUIDUtils.newTimeUUID();
        final long timestamp = UUIDUtils.getTimestampInMicros(timestampUuid);
        final Mutator<ByteBuffer> batch = HFactory.createMutator(this.cass.getApplicationKeyspace(this.applicationId), be);
        final QueueSet publishers = new QueueSet();
        for (String rawPublisherPath : list) {
            final String publisherPath = Queue.normalizeQueuePath(rawPublisherPath);
            final UUID publisherId = Queue.getQueueId(publisherPath);
            batchUnsubscribeFromQueue(batch, publisherPath, publisherId, subscriberPath, subscriberId, timestamp);
            try {
                final Queue subscriber = getQueue(subscriberPath, subscriberId);
                batchUpdateQueuePropertiesIndexes(batch, publisherId, subscriberPath, subscriberId, MapUtils.emptyMapWithKeys(subscriber.getProperties()), timestampUuid);
            } catch (Exception e) {
                // Best effort: the unsubscribe itself is still written.
                logger.error("Unable to update index", (Throwable) e);
            }
            publishers.addQueue(publisherPath, publisherId);
        }
        CassandraPersistenceUtils.batchExecute(batch, 5);
        return publishers;
    }

    /**
     * Queues an aggregate-counter increment for the queue at {@code str} and executes the batch.
     *
     * @param str  path of the queue; converted to its queue id
     * @param str2 forwarded unchanged to {@code batchIncrementAggregateCounters}
     * @param str3 forwarded unchanged to {@code batchIncrementAggregateCounters}
     * @param j    forwarded unchanged (presumably the increment amount; confirm against CounterUtils)
     */
    @Override // org.usergrid.mq.QueueManager
    public void incrementAggregateQueueCounters(String str, String str2, String str3, long j) {
        final long timestamp = this.cass.createTimestamp();
        final Mutator<ByteBuffer> batch = HFactory.createMutator(this.cass.getApplicationKeyspace(this.applicationId), be);
        final UUID queueId = Queue.getQueueId(str);
        this.counterUtils.batchIncrementAggregateCounters(batch, this.applicationId, null, null, queueId, str2, str3, j, timestamp);
        CassandraPersistenceUtils.batchExecute(batch, 5);
    }

    /**
     * Reads one aggregate-counter row and returns its values between two timestamps.
     *
     * <p>Both timestamps are rounded with {@code counterResolution.round}. When {@code z} is
     * set and the resolution is not {@code ALL}, every interval in the rounded range that has
     * no stored column is filled with a zero-valued counter.
     *
     * @param uuid              id passed to the row-key builder and to the returned set
     * @param str               category passed to the row-key builder and to the returned set
     * @param str2              counter name passed to the row-key builder and to the returned set
     * @param counterResolution resolution used for rounding, stepping and row selection
     * @param j                 start timestamp (rounded before use)
     * @param j2                finish timestamp (rounded before use)
     * @param z                 whether to pad missing intervals with zero counters
     * @return the counters read, padded if requested
     */
    public AggregateCounterSet getAggregateCounters(UUID uuid, String str, String str2, CounterResolution counterResolution, long j, long j2, boolean z) {
        final long start = counterResolution.round(j);
        final long finish = counterResolution.round(j2);
        final boolean padGaps = z && counterResolution != CounterResolution.ALL;

        SliceCounterQuery<String, Long> query = HFactory.createCounterSliceQuery(this.cass.getApplicationKeyspace(this.applicationId), se, le);
        query.setColumnFamily(ApplicationCF.APPLICATION_AGGREGATE_COUNTERS.getColumnFamily());
        query.setRange(Long.valueOf(start), Long.valueOf(finish), false, ALL_COUNT);
        QueryResult<CounterSlice<Long>> result = query.setKey(this.counterUtils.getAggregateCounterRow(str2, null, null, uuid, str, counterResolution)).execute();

        List<AggregateCounter> counters = new ArrayList<AggregateCounter>();
        long expected = start;
        for (HCounterColumn<Long> column : result.get().getColumns()) {
            AggregateCounter counter = new AggregateCounter(column.getName().longValue(), column.getValue().longValue());
            if (padGaps) {
                // Compare with '<' rather than '!=' so a column that is not on the expected
                // interval grid cannot make this loop run forever.
                while (expected < counter.getTimestamp()) {
                    counters.add(new AggregateCounter(expected, 0L));
                    expected = counterResolution.next(expected);
                }
                if (expected == counter.getTimestamp()) {
                    expected = counterResolution.next(expected);
                }
            }
            counters.add(counter);
        }
        if (padGaps) {
            // Fill the tail of the range after the last stored column.
            while (expected <= finish) {
                counters.add(new AggregateCounter(expected, 0L));
                expected = counterResolution.next(expected);
            }
        }
        return new AggregateCounterSet(str2, uuid, str, counters);
    }

    /**
     * Reads the aggregate-counter rows selected by the filters of {@code counterQuery}.
     *
     * <p>Defaults: a missing resolution becomes {@code ALL}; a missing or non-positive start
     * time becomes 0; a finish time that is missing, non-positive or before the start becomes
     * the current time. When padding is requested and the resolution is not {@code ALL}, the
     * range is capped at 1000 intervals and every interval without a stored column is filled
     * with a zero-valued counter.
     *
     * @param uuid         id used to build each row key and attached to each returned set
     * @param counterQuery resolution, time range, padding flag and counter filters
     * @return one set per row returned by the multiget, sorted by name, or {@code null} when
     *         the query has no counter filters
     * @throws Exception declared for callers; nothing in this method throws a checked exception directly
     */
    public List<AggregateCounterSet> getAggregateCounters(UUID uuid, CounterQuery counterQuery) throws Exception {
        CounterResolution resolution = counterQuery.getResolution();
        if (resolution == null) {
            resolution = CounterResolution.ALL;
        }
        long requestedStart = counterQuery.getStartTime() != null ? counterQuery.getStartTime().longValue() : 0L;
        long requestedFinish = counterQuery.getFinishTime() != null ? counterQuery.getFinishTime().longValue() : 0L;
        final boolean pad = counterQuery.isPad();
        if (requestedStart <= 0) {
            requestedStart = 0;
        }
        if (requestedFinish <= 0 || requestedFinish < requestedStart) {
            requestedFinish = System.currentTimeMillis();
        }
        final long start = resolution.round(requestedStart);
        long finish = resolution.round(requestedFinish);
        final boolean padGaps = pad && resolution != CounterResolution.ALL;
        // Bound the amount of zero padding that a wide time range can generate.
        if (padGaps && (finish - start) / resolution.interval() > 1000) {
            finish = resolution.round(start + (resolution.interval() * 1000));
        }

        List<Query.CounterFilterPredicate> counterFilters = counterQuery.getCounterFilters();
        if (counterFilters == null) {
            return null;
        }

        // Row key -> selection, so each returned row can be mapped back to its name and category.
        Map<String, CounterUtils.AggregateCounterSelection> selections = new HashMap<String, CounterUtils.AggregateCounterSelection>();
        Keyspace keyspace = this.cass.getApplicationKeyspace(this.applicationId);
        for (Query.CounterFilterPredicate filter : counterFilters) {
            CounterUtils.AggregateCounterSelection selection = new CounterUtils.AggregateCounterSelection(filter.getName(), null, null, uuid, filter.getCategory());
            selections.put(selection.getRow(resolution), selection);
        }

        MultigetSliceCounterQuery<String, Long> query = HFactory.createMultigetSliceCounterQuery(keyspace, se, le);
        query.setColumnFamily(ApplicationCF.APPLICATION_AGGREGATE_COUNTERS.getColumnFamily());
        query.setRange(Long.valueOf(start), Long.valueOf(finish), false, ALL_COUNT);
        QueryResult<CounterRows<String, Long>> result = query.setKeys(selections.keySet()).execute();

        List<AggregateCounterSet> counterSets = new ArrayList<AggregateCounterSet>();
        for (CounterRow<String, Long> row : result.get()) {
            long expected = start;
            List<AggregateCounter> counters = new ArrayList<AggregateCounter>();
            for (HCounterColumn<Long> column : row.getColumnSlice().getColumns()) {
                AggregateCounter counter = new AggregateCounter(column.getName().longValue(), column.getValue().longValue());
                if (padGaps) {
                    // Compare with '<' rather than '!=' so a column that is not on the
                    // expected interval grid cannot make this loop run forever.
                    while (expected < counter.getTimestamp()) {
                        counters.add(new AggregateCounter(expected, 0L));
                        expected = resolution.next(expected);
                    }
                    if (expected == counter.getTimestamp()) {
                        expected = resolution.next(expected);
                    }
                }
                counters.add(counter);
            }
            if (padGaps) {
                // Fill the tail of the range after the last stored column.
                while (expected <= finish) {
                    counters.add(new AggregateCounter(expected, 0L));
                    expected = resolution.next(expected);
                }
            }
            CounterUtils.AggregateCounterSelection selection = selections.get(row.getKey());
            counterSets.add(new AggregateCounterSet(selection.getName(), uuid, selection.getCategory(), counters));
        }

        Collections.sort(counterSets, new Comparator<AggregateCounterSet>() {
            @Override // java.util.Comparator
            public int compare(AggregateCounterSet left, AggregateCounterSet right) {
                return left.getName().compareTo(right.getName());
            }
        });
        return counterSets;
    }

    /**
     * Resolves {@code str} to a queue id, reads that queue's aggregate counters through
     * {@code getAggregateCounters}, and wraps the outcome with {@code Results.fromCounters}.
     * All other arguments are forwarded unchanged.
     */
    @Override // org.usergrid.mq.QueueManager
    public Results getAggregateQueueCounters(String str, String str2, String str3, CounterResolution counterResolution, long j, long j2, boolean z) {
        final UUID queueId = Queue.getQueueId(str);
        return Results.fromCounters(getAggregateCounters(queueId, str2, str3, counterResolution, j, j2, z));
    }

    /**
     * Resolves {@code str} to a queue id, runs {@code counterQuery} against it through
     * {@code getAggregateCounters}, and wraps the outcome with {@code Results.fromCounters}.
     *
     * @throws Exception propagated from {@code getAggregateCounters}
     */
    @Override // org.usergrid.mq.QueueManager
    public Results getAggregateQueueCounters(String str, CounterQuery counterQuery) throws Exception {
        final UUID queueId = Queue.getQueueId(str);
        return Results.fromCounters(getAggregateCounters(queueId, counterQuery));
    }

    /**
     * Queues increments for several counters of the queue at {@code str} and executes the batch.
     *
     * @param str path of the queue; converted to its queue id
     * @param map counter name to value, forwarded unchanged to {@code batchIncrementQueueCounters}
     */
    @Override // org.usergrid.mq.QueueManager
    public void incrementQueueCounters(String str, Map<String, Long> map) {
        final long timestamp = this.cass.createTimestamp();
        final Mutator<ByteBuffer> batch = HFactory.createMutator(this.cass.getApplicationKeyspace(this.applicationId), be);
        final UUID queueId = Queue.getQueueId(str);
        this.counterUtils.batchIncrementQueueCounters(batch, queueId, map, timestamp, this.applicationId);
        CassandraPersistenceUtils.batchExecute(batch, 5);
    }

    /**
     * Queues an increment for a single counter of the queue at {@code str} and executes the batch.
     *
     * @param str  path of the queue; converted to its queue id
     * @param str2 counter name, forwarded unchanged to {@code batchIncrementQueueCounter}
     * @param j    value forwarded unchanged to {@code batchIncrementQueueCounter}
     */
    @Override // org.usergrid.mq.QueueManager
    public void incrementQueueCounter(String str, String str2, long j) {
        final long timestamp = this.cass.createTimestamp();
        final Mutator<ByteBuffer> batch = HFactory.createMutator(this.cass.getApplicationKeyspace(this.applicationId), be);
        final UUID queueId = Queue.getQueueId(str);
        this.counterUtils.batchIncrementQueueCounter(batch, queueId, str2, j, timestamp, this.applicationId);
        CassandraPersistenceUtils.batchExecute(batch, 5);
    }

    /**
     * Reads every counter column of the COUNTERS row keyed by {@code uuid}.
     *
     * @param uuid row key (a queue id when called from the path-based overload)
     * @return counter name to current value; empty when the row has no columns
     * @throws Exception declared for callers; nothing in this method throws a checked exception directly
     */
    public Map<String, Long> getQueueCounters(UUID uuid) throws Exception {
        Map<String, Long> counters = new HashMap<String, Long>();
        SliceCounterQuery<UUID, String> query = HFactory.createCounterSliceQuery(this.cass.getApplicationKeyspace(this.applicationId), ue, se);
        query.setColumnFamily(QueuesCF.COUNTERS.getColumnFamily());
        query.setRange(null, null, false, ALL_COUNT);
        for (HCounterColumn<String> column : query.setKey(uuid).execute().get().getColumns()) {
            counters.put(column.getName(), column.getValue());
        }
        return counters;
    }

    /**
     * Path-based variant: resolves {@code str} to a queue id and delegates to
     * {@code getQueueCounters(UUID)}.
     */
    @Override // org.usergrid.mq.QueueManager
    public Map<String, Long> getQueueCounters(String str) throws Exception {
        final UUID queueId = Queue.getQueueId(str);
        return getQueueCounters(queueId);
    }

    /**
     * Returns the column names of the counters dictionary row for the queue at {@code str}.
     *
     * <p>The row lives in QUEUE_DICTIONARIES and is keyed by the string form of
     * {@code key(queueId, Schema.DICTIONARY_COUNTERS)}.
     *
     * @param str path of the queue
     * @return the counter names found; empty when the row has no columns
     * @throws Exception declared for callers; nothing in this method throws a checked exception directly
     */
    @Override // org.usergrid.mq.QueueManager
    public Set<String> getQueueCounterNames(String str) throws Exception {
        Set<String> names = new HashSet<String>();
        SliceQuery<String, String, ByteBuffer> query = HFactory.createSliceQuery(this.cass.getApplicationKeyspace(this.applicationId), se, se, be);
        query.setColumnFamily(QueuesCF.QUEUE_DICTIONARIES.toString());
        query.setKey(CassandraPersistenceUtils.key(Queue.getQueueId(str), Schema.DICTIONARY_COUNTERS).toString());
        query.setRange(null, null, false, ALL_COUNT);
        for (HColumn<String, ByteBuffer> column : query.execute().get().getColumns()) {
            names.add(column.getName());
        }
        return names;
    }

    /**
     * Loads the QUEUE_PROPERTIES row keyed by {@code uuid} and deserializes it into a queue.
     *
     * @param str  queue path; not used by this method
     * @param uuid queue id used as the row key
     * @return whatever {@code CassandraMQUtils.deserializeQueue} builds from the row's columns
     */
    public Queue getQueue(String str, UUID uuid) {
        ColumnSlice properties = (ColumnSlice) HFactory.createSliceQuery(this.cass.getApplicationKeyspace(this.applicationId), ue, se, be)
                .setColumnFamily(QueuesCF.QUEUE_PROPERTIES.getColumnFamily())
                .setKey(uuid)
                .setRange(null, null, false, ALL_COUNT)
                .execute()
                .get();
        return CassandraMQUtils.deserializeQueue(properties.getColumns());
    }

    /**
     * Path-based variant: resolves {@code str} to a queue id and delegates to
     * {@code getQueue(String, UUID)}.
     */
    @Override // org.usergrid.mq.QueueManager
    public Queue getQueue(String str) {
        final UUID queueId = Queue.getQueueId(str);
        return getQueue(str, queueId);
    }

    /**
     * Writes {@code queue} under the path {@code str} and refreshes its property indexes.
     *
     * <p>The queue's path is set to {@code str}, its properties are added to a batch, the
     * property indexes are updated (a failure there is logged and does not abort the write),
     * and "created" and "modified" columns are written before the batch is executed.
     *
     * @param str   path to store the queue under
     * @param queue queue to persist; its path is overwritten with {@code str}
     * @return the same {@code queue} instance
     */
    @Override // org.usergrid.mq.QueueManager
    public Queue updateQueue(String str, Queue queue) {
        queue.setPath(str);
        UUID timestampUuid = UUIDUtils.newTimeUUID();
        long timestamp = UUIDUtils.getTimestampInMicros(timestampUuid);
        Mutator<ByteBuffer> batch = HFactory.createMutator(this.cass.getApplicationKeyspace(this.applicationId), be);
        CassandraMQUtils.addQueueToMutator(batch, queue, timestamp);
        try {
            batchUpdateQueuePropertiesIndexes(batch, str, queue.getUuid(), queue.getProperties(), timestampUuid);
        } catch (Exception e) {
            // Best effort: the queue itself is still written.
            logger.error("Unable to update queue", (Throwable) e);
        }
        // "created" is written with the clock Long.MAX_VALUE - timestamp, so a later call
        // carries a smaller clock than an earlier one (presumably so that the first write
        // wins when Cassandra reconciles by highest timestamp; TODO confirm).
        batch.addInsertion(ConversionUtils.bytebuffer(queue.getUuid()), QueuesCF.QUEUE_PROPERTIES.getColumnFamily(), HFactory.createColumn("created", Long.valueOf(timestamp / 1000), Long.MAX_VALUE - timestamp, se, le));
        batch.addInsertion(ConversionUtils.bytebuffer(queue.getUuid()), QueuesCF.QUEUE_PROPERTIES.getColumnFamily(), HFactory.createColumn("modified", Long.valueOf(timestamp / 1000), timestamp, se, le));
        CassandraPersistenceUtils.batchExecute(batch, 5);
        return queue;
    }

    /**
     * Map-based variant: builds a {@code Queue} from {@code map} and delegates to
     * {@code updateQueue(String, Queue)}.
     */
    @Override // org.usergrid.mq.QueueManager
    public Queue updateQueue(String str, Map<String, Object> map) {
        final Queue queue = new Queue(map);
        return updateQueue(str, queue);
    }

    /**
     * Queues index updates for a queue's properties against every queue returned by
     * {@code getSubscriptions} for {@code str}.
     *
     * <p>Properties whose key is present in {@code Queue.QUEUE_PROPERTIES} are skipped. For
     * each remaining property one index update is started and then applied once per
     * subscription.
     *
     * @param mutator batch the mutations are added to
     * @param str     path of the queue whose properties are indexed
     * @param uuid    id of that queue
     * @param map     property name to value
     * @param uuid2   time UUID supplying the mutation timestamp
     * @throws Exception propagated from the index helpers
     */
    public void batchUpdateQueuePropertiesIndexes(Mutator<ByteBuffer> mutator, String str, UUID uuid, Map<String, Object> map, UUID uuid2) throws Exception {
        final QueueSet subscriptions = getSubscriptions(str, null, ALL_COUNT);
        if (subscriptions == null) {
            return;
        }
        for (Map.Entry<String, Object> property : map.entrySet()) {
            if (Queue.QUEUE_PROPERTIES.containsKey(property.getKey())) {
                continue;
            }
            final QueueIndexUpdate indexUpdate = batchStartQueueIndexUpdate(mutator, str, uuid, property.getKey(), property.getValue(), uuid2);
            for (QueueSet.QueueInfo subscription : subscriptions.getQueues()) {
                batchUpdateQueueIndex(indexUpdate, subscription.getUuid());
            }
        }
    }

    /**
     * Queues index updates for a queue's properties against a single target queue.
     *
     * <p>Properties whose key is present in {@code Queue.QUEUE_PROPERTIES} are skipped. For
     * each remaining property an index update is started for ({@code str}, {@code uuid2}) and
     * applied to {@code uuid}.
     *
     * @param mutator batch the mutations are added to
     * @param uuid    id of the queue whose index rows receive the entries
     * @param str     path of the queue whose properties are indexed
     * @param uuid2   id of the queue whose properties are indexed
     * @param map     property name to value
     * @param uuid3   time UUID supplying the mutation timestamp
     * @throws Exception propagated from the index helpers
     */
    public void batchUpdateQueuePropertiesIndexes(Mutator<ByteBuffer> mutator, UUID uuid, String str, UUID uuid2, Map<String, Object> map, UUID uuid3) throws Exception {
        for (Map.Entry<String, Object> property : map.entrySet()) {
            final String propertyName = property.getKey();
            if (Queue.QUEUE_PROPERTIES.containsKey(propertyName)) {
                continue;
            }
            final QueueIndexUpdate indexUpdate = batchStartQueueIndexUpdate(mutator, str, uuid2, propertyName, property.getValue(), uuid3);
            batchUpdateQueueIndex(indexUpdate, uuid);
        }
    }

    /**
     * Applies a prepared index update to the index rows of the queue {@code uuid}.
     *
     * <p>Three groups of mutations are added to the update's batch, all with the update's
     * timestamp: a PROPERTY_INDEX deletion for each previous entry that has a value, a
     * PROPERTY_INDEX insertion (empty value) for each new entry, and a QUEUE_DICTIONARIES
     * insertion (empty value) under the DICTIONARY_SUBSCRIBER_INDEXES key for each index name.
     *
     * @param queueIndexUpdate prepared update carrying the batch, entries and timestamp
     * @param uuid             id of the queue whose index rows are modified
     * @return the same {@code queueIndexUpdate} instance
     * @throws Exception declared for callers; nothing in this method throws a checked exception directly
     */
    public QueueIndexUpdate batchUpdateQueueIndex(QueueIndexUpdate queueIndexUpdate, UUID uuid) throws Exception {
        logger.info("batchUpdateQueueIndex");
        Mutator<ByteBuffer> batch = queueIndexUpdate.getBatch();
        // NOTE(review): the result of this call is discarded; it looks like dead code, but it
        // is kept because key() is not visible here and may not be side-effect free.
        CassandraPersistenceUtils.key(uuid, queueIndexUpdate.getEntryName());

        for (QueueIndexUpdate.QueueIndexEntry previous : queueIndexUpdate.getPrevEntries()) {
            if (previous.getValue() != null) {
                batch.addDeletion(ConversionUtils.bytebuffer(CassandraPersistenceUtils.key(uuid, previous.getPath())), QueuesCF.PROPERTY_INDEX.getColumnFamily(), previous.getIndexComposite(), dce, queueIndexUpdate.getTimestamp());
            } else {
                logger.error("Unexpected condition - deserialized property value is null");
            }
        }

        for (QueueIndexUpdate.QueueIndexEntry added : queueIndexUpdate.getNewEntries()) {
            batch.addInsertion(ConversionUtils.bytebuffer(CassandraPersistenceUtils.key(uuid, added.getPath())), QueuesCF.PROPERTY_INDEX.getColumnFamily(), HFactory.createColumn(added.getIndexComposite(), ByteBuffer.allocate(0), queueIndexUpdate.getTimestamp(), dce, be));
        }

        for (String indexName : queueIndexUpdate.getIndexesSet()) {
            batch.addInsertion(ConversionUtils.bytebuffer(CassandraPersistenceUtils.key(uuid, DICTIONARY_SUBSCRIBER_INDEXES)), QueuesCF.QUEUE_DICTIONARIES.getColumnFamily(), HFactory.createColumn(indexName, ByteBuffer.allocate(0), queueIndexUpdate.getTimestamp(), se, be));
        }
        return queueIndexUpdate;
    }

    /**
     * Prepares an index update for one property of a queue.
     *
     * <p>Up to 1000 existing PROPERTY_INDEX_ENTRIES columns for the property are read from
     * the row keyed by {@code uuid}. Each one is recorded on the update as a previous entry
     * and queued for deletion. If the new value is indexable (or JSON), it is flattened into
     * key/value pairs; each indexable pair is recorded as a new entry and a fresh
     * PROPERTY_INDEX_ENTRIES column is queued for every pair.
     *
     * <p>Entry column names are dynamic composites laid out as: property name, value code,
     * value, timestamp UUID, and an optional sub-path relative to the property name.
     *
     * @param mutator batch the mutations are added to
     * @param str     queue path stored on the update
     * @param uuid    queue id; row key of the entries row
     * @param str2    property name
     * @param obj     new property value
     * @param uuid2   time UUID supplying the mutation timestamp
     * @return the populated update, to be applied with {@code batchUpdateQueueIndex}
     * @throws Exception declared for callers; nothing in this method throws a checked exception directly
     */
    public QueueIndexUpdate batchStartQueueIndexUpdate(Mutator<ByteBuffer> mutator, String str, UUID uuid, String str2, Object obj, UUID uuid2) throws Exception {
        long timestamp = UUIDUtils.getTimestampInMicros(uuid2);
        QueueIndexUpdate indexUpdate = new QueueIndexUpdate(mutator, str, uuid, str2, obj, uuid2);

        // Existing entries for this property: every column whose composite name starts with str2.
        List<HColumn> previousColumns = ((ColumnSlice) HFactory.createSliceQuery(this.cass.getApplicationKeyspace(this.applicationId), ue, be, be).setColumnFamily(QueuesCF.PROPERTY_INDEX_ENTRIES.getColumnFamily()).setKey(uuid).setRange(DynamicComposite.toByteBuffer(str2), CompositeUtils.setGreaterThanEqualityFlag(new DynamicComposite(str2)).serialize(), false, 1000).execute().get()).getColumns();
        if (logger.isInfoEnabled()) {
            logger.info("Found {} previous index entries for {} of entity {}", new Object[]{Integer.valueOf(previousColumns.size()), str2, uuid});
        }
        for (HColumn previousColumn : previousColumns) {
            // duplicate() leaves the position of the column's own buffer untouched.
            DynamicComposite composite = DynamicComposite.fromByteBuffer(((ByteBuffer) previousColumn.getName()).duplicate());
            Object previousValue = composite.get(2);
            UUID previousTimestampUuid = (UUID) composite.get(3);
            String subPath = composite.size() > 4 ? (String) composite.get(4) : null;
            if (previousValue != null) {
                String entryPath = str2;
                if (subPath != null && subPath.length() > 0) {
                    entryPath = str2 + "." + subPath;
                }
                indexUpdate.addPrevEntry(entryPath, previousValue, previousTimestampUuid);
                mutator.addDeletion(ConversionUtils.bytebuffer(uuid), QueuesCF.PROPERTY_INDEX_ENTRIES.getColumnFamily(), ((ByteBuffer) previousColumn.getName()).duplicate(), be, timestamp);
            } else {
                logger.error("Unexpected condition - deserialized property value is null");
            }
        }

        if (QueueIndexUpdate.validIndexableValueOrJson(obj)) {
            List<Map.Entry<String, Object>> keyValues = IndexUtils.getKeyValueList(str2, obj, false);
            for (Map.Entry<String, Object> keyValue : keyValues) {
                if (QueueIndexUpdate.validIndexableValue(keyValue.getValue())) {
                    indexUpdate.addNewEntry(keyValue.getKey(), QueueIndexUpdate.toIndexableValue(keyValue.getValue()));
                }
            }
            final String dottedPrefix = str2 + ".";
            for (Map.Entry<String, Object> keyValue : keyValues) {
                // Store the key relative to the property name.
                String relativeKey = keyValue.getKey();
                if (relativeKey.startsWith(dottedPrefix)) {
                    relativeKey = relativeKey.substring(str2.length() + 1);
                } else if (relativeKey.startsWith(str2)) {
                    relativeKey = relativeKey.substring(str2.length());
                }
                mutator.addInsertion(ConversionUtils.bytebuffer(uuid), QueuesCF.PROPERTY_INDEX_ENTRIES.getColumnFamily(), HFactory.createColumn(DynamicComposite.toByteBuffer(str2, Byte.valueOf(QueueIndexUpdate.indexValueCode(obj)), QueueIndexUpdate.toIndexableValue(obj), indexUpdate.getTimestampUuid(), relativeKey), ByteBuffer.allocate(0), timestamp, be, be));
                indexUpdate.addIndex(keyValue.getKey());
            }
            indexUpdate.addIndex(str2);
        }
        return indexUpdate;
    }

    /**
     * Runs one query slice against the PROPERTY_INDEX row for ({@code uuid}, property name).
     *
     * <p>The range start is the slice's cursor if present, otherwise a composite built from
     * the slice's start bound; the range end is built from the finish bound. The
     * GREATER_THAN_EQUAL flag is set on a non-inclusive start and on an inclusive finish.
     * For a reversed slice with both bounds present, the bounds are swapped.
     *
     * @param uuid       queue id used to build the index row key
     * @param querySlice slice supplying the property name, bounds, cursor and direction
     * @param i          maximum number of columns to read
     * @return one queue per column: path from composite component 3, id from component 2
     * @throws Exception declared for callers; nothing in this method throws a checked exception directly
     */
    public QueueSet searchQueueIndex(UUID uuid, QueryProcessor.QuerySlice querySlice, int i) throws Exception {
        ByteBuffer rangeStart = null;
        if (querySlice.getCursor() != null) {
            rangeStart = querySlice.getCursor();
        } else if (querySlice.getStart() != null) {
            DynamicComposite startBound = new DynamicComposite(Byte.valueOf(querySlice.getStart().getCode()), querySlice.getStart().getValue());
            if (!querySlice.getStart().isInclusive()) {
                CompositeUtils.setEqualityFlag(startBound, AbstractComposite.ComponentEquality.GREATER_THAN_EQUAL);
            }
            rangeStart = startBound.serialize();
        }

        ByteBuffer rangeFinish = null;
        if (querySlice.getFinish() != null) {
            DynamicComposite finishBound = new DynamicComposite(Byte.valueOf(querySlice.getFinish().getCode()), querySlice.getFinish().getValue());
            if (querySlice.getFinish().isInclusive()) {
                CompositeUtils.setEqualityFlag(finishBound, AbstractComposite.ComponentEquality.GREATER_THAN_EQUAL);
            }
            rangeFinish = finishBound.serialize();
        }

        // A reversed slice reads from the far end, so the two bounds trade places.
        if (querySlice.isReversed() && rangeStart != null && rangeFinish != null) {
            ByteBuffer swapped = rangeStart;
            rangeStart = rangeFinish;
            rangeFinish = swapped;
        }

        List columns = ((ColumnSlice) HFactory.createSliceQuery(this.cass.getApplicationKeyspace(this.applicationId), be, be, be)
                .setColumnFamily(QueuesCF.PROPERTY_INDEX.getColumnFamily())
                .setKey(ConversionUtils.bytebuffer(CassandraPersistenceUtils.key(uuid, querySlice.getPropertyName())))
                .setRange(rangeStart, rangeFinish, querySlice.isReversed(), i)
                .execute()
                .get()).getColumns();

        QueueSet matches = new QueueSet();
        for (Object element : columns) {
            HColumn column = (HColumn) element;
            DynamicComposite composite = DynamicComposite.fromByteBuffer((ByteBuffer) column.getName());
            matches.addQueue((String) composite.get(3, se), (UUID) composite.get(2, ue));
        }
        return matches;
    }

    /**
     * Finds subscribers of the queue at {@code str} that match {@code query}.
     *
     * <p>A {@code null} query is treated as an empty one. Without filter or sort predicates
     * this is a plain {@code getSubscribers} listing. Otherwise each slice produced by the
     * query processor is searched in the property index and the per-slice results are
     * combined with {@code QueueSet.and}. A slice whose search throws is logged and skipped.
     *
     * <p>A composite cursor string is assembled from the per-slice cursors, but this method
     * never attaches it to the returned set.
     *
     * @param str   path of the queue whose subscribers are searched
     * @param query search criteria, or {@code null}
     * @return the combined result, or {@code null} when no slice produced a result
     */
    @Override // org.usergrid.mq.QueueManager
    public QueueSet searchSubscribers(String str, Query query) {
        if (query == null) {
            query = new Query();
        }
        final String queuePath = Queue.normalizeQueuePath(str);
        final UUID queueId = Queue.getQueueId(queuePath);
        if (!(query.hasFilterPredicates() || query.hasSortPredicates())) {
            return getSubscribers(queuePath, null, query.getLimit());
        }

        QueueSet combined = null;
        String compositeCursor = null;
        final List<QueryProcessor.QuerySlice> slices = new QueryProcessor(query).getSlices();
        // One extra row reveals whether a single slice has more results; with several slices
        // a fixed large limit is used instead.
        final int singleSliceLimit = query.getLimit() + 1;
        final int sliceLimit = slices.size() > 1 ? 10000 : singleSliceLimit;

        for (QueryProcessor.QuerySlice slice : slices) {
            QueueSet sliceResult = null;
            try {
                sliceResult = searchQueueIndex(queueId, slice, sliceLimit);
            } catch (Exception e) {
                logger.error("Error during search", (Throwable) e);
            }
            if (sliceResult == null) {
                continue;
            }
            if (sliceResult.size() > query.getLimit()) {
                sliceResult.setCursorToLastResult();
            }
            if (sliceResult.getCursor() != null) {
                final String prefix = compositeCursor == null ? "" : compositeCursor + "|";
                final int sliceHash = slice.hashCode();
                logger.info("Cursor hash code: {} ", Integer.valueOf(sliceHash));
                compositeCursor = prefix + sliceHash + ":" + sliceResult.getCursor();
            }
            if (combined == null) {
                combined = sliceResult;
            } else {
                combined.and(sliceResult);
            }
        }
        return combined;
    }

    /**
     * Lists queues by reading the subscribers of the path "/".
     *
     * @param str path to start the listing from, or {@code null}; see {@code getSubscribers}
     * @param i   maximum number of queues to return
     * @return the result of {@code getSubscribers("/", str, i)}
     */
    @Override // org.usergrid.mq.QueueManager
    public QueueSet getQueues(String str, int i) {
        final String rootPath = "/";
        return getSubscribers(rootPath, str, i);
    }

    /**
     * Not implemented: ignores all arguments and always returns {@code null}.
     */
    @Override // org.usergrid.mq.QueueManager
    public QueueSet getChildQueues(String str, String str2, int i) {
        return null;
    }

    /**
     * Not implemented: always returns {@code null} rather than generating an id.
     */
    @Override // org.usergrid.mq.QueueManager
    public UUID getNewConsumerId() {
        return null;
    }
}
