/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.mvcc;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.internal.processors.cache.mvcc.MvccTxEntry;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxKey;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;

public class MvccCachingManager
extends GridCacheSharedManagerAdapter {
    public static final int DFLT_MVCC_TX_SIZE_CACHING_THRESHOLD = 20000;
    public static final int TX_SIZE_THRESHOLD = IgniteSystemProperties.getInteger("IGNITE_MVCC_TX_SIZE_CACHING_THRESHOLD", 20000);
    private final Map<GridCacheVersion, EnlistBuffer> enlistCache = new ConcurrentHashMap<GridCacheVersion, EnlistBuffer>();
    private final Map<TxKey, AtomicInteger> cntrs = new ConcurrentHashMap<TxKey, AtomicInteger>();

    public void addEnlisted(KeyCacheObject key, @Nullable CacheObject val, long ttl, long expireTime, GridCacheVersion ver, CacheObject oldVal, boolean primary, AffinityTopologyVersion topVer, MvccVersion mvccVer, int cacheId, IgniteInternalTx tx, IgniteUuid futId, int batchNum) throws IgniteCheckedException {
        assert (key != null);
        assert (mvccVer != null);
        assert (tx != null);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Added entry to mvcc cache: [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ", primary=" + primary + ", mvccVer=" + mvccVer + ", cacheId=" + cacheId + ", ver=" + ver + ']');
        }
        if (!tx.txState().useMvccCaching(cacheId)) {
            return;
        }
        AtomicInteger cntr = this.cntrs.computeIfAbsent(new TxKey(mvccVer.coordinatorVersion(), mvccVer.counter()), v -> new AtomicInteger());
        if (cntr.incrementAndGet() > TX_SIZE_THRESHOLD) {
            throw new IgniteCheckedException("Transaction is too large. Consider reducing transaction size or turning off continuous queries and datacenter replication [size=" + cntr.get() + ", txXid=" + ver + ']');
        }
        MvccTxEntry e = new MvccTxEntry(key, val, ttl, expireTime, ver, oldVal, primary, topVer, mvccVer, cacheId);
        EnlistBuffer cached = this.enlistCache.computeIfAbsent(ver, v -> new EnlistBuffer());
        cached.add(primary ? null : futId, primary ? -1 : batchNum, e);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTxFinished(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException {
        Collection<PartitionUpdateCountersMessage> cntrsColl;
        if (this.log.isDebugEnabled()) {
            this.log.debug("Transaction finished: [commit=" + commit + ", tx=" + tx + ']');
        }
        if (tx.system() || tx.internal() || tx.mvccSnapshot() == null) {
            return;
        }
        this.cntrs.remove(new TxKey(tx.mvccSnapshot().coordinatorVersion(), tx.mvccSnapshot().counter()));
        EnlistBuffer buf = this.enlistCache.remove(tx.xidVersion());
        Map<Integer, Map<KeyCacheObject, MvccTxEntry>> allCached = buf == null ? null : buf.getCached();
        TxCounters txCntrs = tx.txCounters(false);
        Collection<PartitionUpdateCountersMessage> collection = cntrsColl = txCntrs == null ? null : txCntrs.updateCounters();
        if (txCntrs == null || F.isEmpty(cntrsColl)) {
            return;
        }
        GridIntList cacheIds = tx.txState().cacheIds();
        assert (cacheIds != null);
        for (int i = 0; i < cacheIds.size(); ++i) {
            int cacheId = cacheIds.get(i);
            GridCacheContext ctx0 = this.cctx.cacheContext(cacheId);
            assert (ctx0 != null);
            ctx0.group().listenerLock().readLock().lock();
            try {
                boolean hasListeners = ctx0.hasContinuousQueryListeners(tx);
                boolean drEnabled = ctx0.isDrEnabled();
                if (!hasListeners && !drEnabled) continue;
                Map<KeyCacheObject, MvccTxEntry> cached = allCached == null ? null : allCached.get(cacheId);
                Map<Integer, Map<Integer, T2<AtomicLong, Long>>> cntrsMap = this.countersPerPartition(cntrsColl);
                Map<Integer, T2<AtomicLong, Long>> cntrPerCache = cntrsMap.get(cacheId);
                if (F.isEmpty(cntrPerCache)) continue;
                boolean fakeEntries = false;
                if (F.isEmpty(cached)) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Transaction updates were not cached fully (this can happen when listener started during the transaction execution). [tx=" + tx + ']');
                    }
                    if (!hasListeners) continue;
                    cached = this.createFakeCachedEntries(cntrPerCache, tx, cacheId);
                    fakeEntries = true;
                }
                if (F.isEmpty(cached)) continue;
                for (Map.Entry<KeyCacheObject, MvccTxEntry> entry : cached.entrySet()) {
                    Map<UUID, CacheContinuousQueryListener> lsnrCol;
                    MvccTxEntry e = entry.getValue();
                    assert (e.key().partition() != -1);
                    assert (cntrPerCache != null);
                    assert (e.cacheId() == cacheId);
                    T2<AtomicLong, Long> cntr = cntrPerCache.get(e.key().partition());
                    long resCntr = ((AtomicLong)cntr.getKey()).incrementAndGet();
                    assert (resCntr <= (Long)cntr.getValue());
                    e.updateCounter(resCntr);
                    if (ctx0.group().sharedGroup()) {
                        ctx0.group().onPartitionCounterUpdate(cacheId, e.key().partition(), resCntr, tx.topologyVersion(), tx.local());
                    }
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Process cached entry:" + e);
                    }
                    if (ctx0.isDrEnabled() && !fakeEntries) {
                        ctx0.dr().replicate(e.key(), e.value(), e.ttl(), e.expireTime(), e.version(), tx.local() ? GridDrType.DR_PRIMARY : GridDrType.DR_BACKUP, e.topologyVersion());
                    }
                    CacheContinuousQueryManager contQryMgr = ctx0.continuousQueries();
                    if (!ctx0.continuousQueries().notifyContinuousQueries(tx) || F.isEmpty(lsnrCol = this.continuousQueryListeners(ctx0, tx))) continue;
                    contQryMgr.onEntryUpdated(lsnrCol, e.key(), commit ? e.value() : null, commit ? e.oldValue() : null, false, e.key().partition(), tx.local(), false, e.updateCounter(), null, e.topologyVersion());
                }
                continue;
            }
            finally {
                ctx0.group().listenerLock().readLock().unlock();
            }
        }
    }

    private Map<Integer, Map<Integer, T2<AtomicLong, Long>>> countersPerPartition(Collection<PartitionUpdateCountersMessage> cntrsColl) {
        HashMap<Integer, Map<Integer, T2<AtomicLong, Long>>> cntrsMap = new HashMap<Integer, Map<Integer, T2<AtomicLong, Long>>>();
        for (PartitionUpdateCountersMessage msg : cntrsColl) {
            for (int i = 0; i < msg.size(); ++i) {
                Map cntrPerPart = cntrsMap.computeIfAbsent(msg.cacheId(), k -> new HashMap());
                T2<AtomicLong, Long> prev = cntrPerPart.put(msg.partition(i), new T2<AtomicLong, Long>(new AtomicLong(msg.initialCounter(i)), msg.initialCounter(i) + msg.updatesCount(i)));
                assert (prev == null);
            }
        }
        return cntrsMap;
    }

    private Map<KeyCacheObject, MvccTxEntry> createFakeCachedEntries(Map<Integer, T2<AtomicLong, Long>> cntrPerCache, IgniteInternalTx tx, int cacheId) {
        HashMap<KeyCacheObject, MvccTxEntry> fakeCached = new HashMap<KeyCacheObject, MvccTxEntry>();
        for (Map.Entry<Integer, T2<AtomicLong, Long>> e : cntrPerCache.entrySet()) {
            int part = e.getKey();
            long startCntr = ((AtomicLong)e.getValue().get1()).get();
            long endCntr = ((AtomicLong)e.getValue().get1()).get() + (Long)e.getValue().get2();
            for (long i = startCntr; i < endCntr; ++i) {
                KeyCacheObjectImpl fakeKey = new KeyCacheObjectImpl("", null, part);
                MvccTxEntry fakeEntry = new MvccTxEntry(fakeKey, null, 0L, 0L, tx.xidVersion(), null, tx.local(), tx.topologyVersion(), tx.mvccSnapshot(), cacheId);
                fakeCached.put(fakeKey, fakeEntry);
            }
        }
        return fakeCached;
    }

    public Map<UUID, CacheContinuousQueryListener> continuousQueryListeners(GridCacheContext ctx0, @Nullable IgniteInternalTx tx) {
        return ctx0.continuousQueries().notifyContinuousQueries(tx) ? ctx0.continuousQueries().updateListeners(!ctx0.userCache(), false) : null;
    }

    private static class EnlistBuffer {
        private IgniteUuid lastFutId;
        @GridToStringInclude
        private Map<Integer, Map<KeyCacheObject, MvccTxEntry>> cached = new TreeMap<Integer, Map<KeyCacheObject, MvccTxEntry>>();
        @GridToStringInclude
        private SortedMap<Integer, Map<KeyCacheObject, MvccTxEntry>> pending;

        private EnlistBuffer() {
        }

        synchronized void add(IgniteUuid futId, int batchNum, MvccTxEntry e) {
            KeyCacheObject key = e.key();
            if (batchNum >= 0) {
                MvccTxEntry prev;
                if (this.lastFutId != null && !this.lastFutId.equals(futId)) {
                    this.lastFutId = futId;
                    this.flushPending();
                }
                if (this.pending == null) {
                    this.pending = new TreeMap<Integer, Map<KeyCacheObject, MvccTxEntry>>();
                }
                if ((prev = this.pending.computeIfAbsent(batchNum, k -> new LinkedHashMap()).put(key, e)) != null && prev.oldValue() != null) {
                    e.oldValue(prev.oldValue());
                }
            } else {
                assert (batchNum == -1);
                Map entriesForCache = this.cached.computeIfAbsent(e.cacheId(), k -> new LinkedHashMap());
                MvccTxEntry prev = entriesForCache.put(key, e);
                if (prev != null && prev.oldValue() != null) {
                    e.oldValue(prev.oldValue());
                }
            }
        }

        synchronized Map<Integer, Map<KeyCacheObject, MvccTxEntry>> getCached() {
            this.flushPending();
            return this.cached;
        }

        private void flushPending() {
            if (F.isEmpty(this.pending)) {
                return;
            }
            for (Map.Entry<Integer, Map<KeyCacheObject, MvccTxEntry>> entry : this.pending.entrySet()) {
                Map<KeyCacheObject, MvccTxEntry> vals = entry.getValue();
                for (Map.Entry<KeyCacheObject, MvccTxEntry> e : vals.entrySet()) {
                    Map entriesForCache = this.cached.computeIfAbsent(e.getValue().cacheId(), k -> new LinkedHashMap());
                    MvccTxEntry prev = entriesForCache.put(e.getKey(), e.getValue());
                    if (prev == null || prev.oldValue() == null) continue;
                    e.getValue().oldValue(prev.oldValue());
                }
            }
            this.pending.clear();
        }

        public String toString() {
            return S.toString(EnlistBuffer.class, this);
        }
    }
}

