/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.store;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.cache.Cache;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.util.deque.FastSizeDeque;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;

public class GridCacheWriteBehindStore<K, V>
implements CacheStore<K, V>,
LifecycleAware {
    public static final int DFLT_INITIAL_CAPACITY = 1024;
    public static final float CACHE_OVERFLOW_RATIO = 1.5f;
    public static final int DFLT_CONCUR_LVL = 64;
    private int initCap = 1024;
    private int concurLvl = 64;
    private int cacheMaxSize = 10240;
    private int cacheCriticalSize;
    private int flushThreadCnt = 1;
    private boolean flushThreadCntIsPowerOfTwo;
    private long cacheFlushFreq = 5000L;
    private int batchSize = 512;
    private final String igniteInstanceName;
    private final String cacheName;
    private final CacheStore<K, V> store;
    private ConcurrentLinkedHashMap<K, StatefulValue<K, V>> writeCache;
    private Flusher[] flushThreads;
    private boolean writeCoalescing = true;
    private AtomicBoolean stopping = new AtomicBoolean(true);
    private AtomicInteger cacheTotalOverflowCntr = new AtomicInteger();
    private AtomicInteger cacheOverflowCntr = new AtomicInteger();
    private AtomicInteger retryEntriesCnt = new AtomicInteger();
    private final IgniteLogger log;
    private final CacheStoreManager storeMgr;
    private final Lock flushLock = new ReentrantLock();
    private Condition canFlush = this.flushLock.newCondition();

    public GridCacheWriteBehindStore(CacheStoreManager storeMgr, String igniteInstanceName, String cacheName, IgniteLogger log, CacheStore<K, V> store) {
        this.storeMgr = storeMgr;
        this.igniteInstanceName = igniteInstanceName;
        this.cacheName = cacheName;
        this.log = log;
        this.store = store;
    }

    public void setInitialCapacity(int initCap) {
        this.initCap = initCap;
    }

    public void setConcurrencyLevel(int concurLvl) {
        this.concurLvl = concurLvl;
    }

    public void setFlushSize(int cacheMaxSize) {
        this.cacheMaxSize = cacheMaxSize;
    }

    public int getWriteBehindFlushSize() {
        return this.cacheMaxSize;
    }

    public void setFlushThreadCount(int flushThreadCnt) {
        this.flushThreadCnt = flushThreadCnt;
        this.flushThreadCntIsPowerOfTwo = U.isPow2(flushThreadCnt);
    }

    public int getWriteBehindFlushThreadCount() {
        return this.flushThreadCnt;
    }

    public void setWriteCoalescing(boolean writeCoalescing) {
        this.writeCoalescing = writeCoalescing;
    }

    public boolean getWriteCoalescing() {
        return this.writeCoalescing;
    }

    public void setFlushFrequency(long cacheFlushFreq) {
        this.cacheFlushFreq = cacheFlushFreq;
    }

    public long getWriteBehindFlushFrequency() {
        return this.cacheFlushFreq;
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    public int getWriteBehindStoreBatchSize() {
        return this.batchSize;
    }

    public int getWriteBehindBufferSize() {
        if (this.writeCoalescing) {
            return this.writeCache.sizex();
        }
        int size = 0;
        for (Flusher f : this.flushThreads) {
            size += f.size();
        }
        return size;
    }

    public CacheStore<K, V> store() {
        return this.store;
    }

    @Override
    public void start() {
        assert (this.cacheFlushFreq != 0L || this.cacheMaxSize != 0);
        if (this.stopping.compareAndSet(true, false)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Starting write-behind store for cache '" + this.cacheName + '\'');
            }
            this.cacheCriticalSize = (int)((float)this.cacheMaxSize * 1.5f);
            if (this.cacheCriticalSize == 0) {
                this.cacheCriticalSize = 16384;
            }
            this.flushThreads = new Flusher[this.flushThreadCnt];
            if (this.writeCoalescing) {
                this.writeCache = new ConcurrentLinkedHashMap(this.initCap, 0.75f, this.concurLvl);
            }
            for (int i = 0; i < this.flushThreads.length; ++i) {
                this.flushThreads[i] = new Flusher(this.igniteInstanceName, "flusher-" + i, this.log);
                this.flushThreads[i].start();
            }
        }
    }

    public int getWriteBehindTotalCriticalOverflowCount() {
        return this.cacheTotalOverflowCntr.get();
    }

    public int getWriteBehindCriticalOverflowCount() {
        return this.cacheOverflowCntr.get();
    }

    public int getWriteBehindErrorRetryCount() {
        return this.retryEntriesCnt.get();
    }

    @Override
    public void stop() {
        if (this.stopping.compareAndSet(false, true)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Stopping write-behind store for cache '" + this.cacheName + '\'');
            }
            for (Flusher f : this.flushThreads) {
                if (f.isEmpty()) continue;
                f.wakeUp();
            }
            boolean graceful = true;
            for (Flusher worker : this.flushThreads) {
                graceful &= U.join(worker, this.log);
            }
            if (!graceful) {
                this.log.warning("Write behind store shutdown was aborted.");
            }
        }
    }

    public void forceFlush() throws IgniteCheckedException {
        for (Flusher f : this.flushThreads) {
            if (f.isEmpty()) continue;
            f.wakeUp();
        }
    }

    @Override
    public void loadCache(IgniteBiInClosure<K, V> clo, Object ... args) {
        this.store.loadCache(clo, args);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<K, V> loadAll(Iterable<? extends K> keys) {
        Map loaded0;
        if (this.log.isDebugEnabled()) {
            this.log.debug(S.toString("Store load all", "keys", keys, true));
        }
        HashMap loaded = new HashMap();
        ArrayList<K> remaining = null;
        for (K key : keys) {
            StatefulValue val = this.writeCoalescing ? this.writeCache.get(key) : (StatefulValue)this.flusher(key).flusherWriteMap.get(key);
            if (val != null) {
                val.readLock().lock();
                try {
                    Object value;
                    StoreOperation op;
                    if (this.writeCoalescing && val.nextOperation() != null) {
                        op = val.nextOperation();
                        value = op == StoreOperation.PUT ? (Object)val.nextEntry().getValue() : null;
                    } else {
                        op = val.operation();
                        Object v = value = op == StoreOperation.PUT ? (Object)val.entry().getValue() : null;
                    }
                    if (op == StoreOperation.PUT) {
                        loaded.put(key, value);
                        continue;
                    }
                    assert (op == StoreOperation.RMV) : op;
                    continue;
                }
                finally {
                    val.readLock().unlock();
                    continue;
                }
            }
            if (remaining == null) {
                remaining = new ArrayList<K>();
            }
            remaining.add(key);
        }
        if (remaining != null && !remaining.isEmpty() && (loaded0 = this.store.loadAll(remaining)) != null) {
            loaded.putAll(loaded0);
        }
        return loaded;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public V load(K key) {
        StatefulValue val;
        if (this.log.isDebugEnabled()) {
            this.log.debug(S.toString("Store load", "key", key, true));
        }
        if ((val = this.writeCoalescing ? this.writeCache.get(key) : (StatefulValue)this.flusher(key).flusherWriteMap.get(key)) != null) {
            val.readLock().lock();
            try {
                StoreOperation op;
                V value = this.writeCoalescing && val.nextOperation() != null ? ((op = val.nextOperation()) == StoreOperation.PUT ? (V)val.nextEntry().getValue() : null) : ((op = val.operation()) == StoreOperation.PUT ? (V)val.entry().getValue() : null);
                switch (op) {
                    case PUT: {
                        V v = value;
                        return v;
                    }
                    case RMV: {
                        V v = null;
                        return v;
                    }
                }
                assert (false) : "Unexpected operation: " + (Object)((Object)StatefulValue.access$500(val));
            }
            finally {
                val.readLock().unlock();
            }
        }
        return this.store.load(key);
    }

    @Override
    public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) {
        for (Cache.Entry<K, V> entry : entries) {
            this.write(entry);
        }
    }

    @Override
    public void write(Cache.Entry<? extends K, ? extends V> entry) {
        try {
            if (this.log.isDebugEnabled()) {
                this.log.debug(S.toString("Store put", "key", entry.getKey(), true, "val", entry.getValue(), true));
            }
            this.updateCache(entry.getKey(), entry, StoreOperation.PUT);
        }
        catch (IgniteInterruptedCheckedException e) {
            throw new CacheWriterException(U.convertExceptionNoWrap(e));
        }
    }

    @Override
    public void deleteAll(Collection<?> keys) {
        for (Object key : keys) {
            this.delete(key);
        }
    }

    @Override
    public void delete(Object key) {
        try {
            if (this.log.isDebugEnabled()) {
                this.log.debug(S.toString("Store remove", "key", key, true));
            }
            this.updateCache(key, null, StoreOperation.RMV);
        }
        catch (IgniteInterruptedCheckedException e) {
            throw new CacheWriterException(U.convertExceptionNoWrap(e));
        }
    }

    @Override
    public void sessionEnd(boolean commit) {
    }

    public String toString() {
        return S.toString(GridCacheWriteBehindStore.class, this);
    }

    private void updateCache(K key, @Nullable Cache.Entry<? extends K, ? extends V> val, StoreOperation operation) throws IgniteInterruptedCheckedException {
        StatefulValue newVal = new StatefulValue(val, operation);
        if (this.writeCoalescing) {
            this.putToWriteCache(key, newVal);
        } else {
            this.flusher(key).putToFlusherWriteCache(key, newVal);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void putToWriteCache(K key, StatefulValue<K, V> newVal) throws IgniteInterruptedCheckedException {
        int cacheSize;
        StatefulValue<K, V> prev;
        assert (this.writeCoalescing) : "Unexpected write coalescing.";
        while ((prev = this.writeCache.putIfAbsent(key, newVal)) != null) {
            prev.writeLock().lock();
            try {
                if (((StatefulValue)prev).status() == ValueStatus.PENDING || ((StatefulValue)prev).status() == ValueStatus.PENDING_AND_UPDATED) {
                    ((StatefulValue)prev).setNext(((StatefulValue)newVal).val, ((StatefulValue)newVal).storeOperation);
                    ((StatefulValue)prev).status(ValueStatus.PENDING_AND_UPDATED);
                    break;
                }
                if (((StatefulValue)prev).status() == ValueStatus.FLUSHED) continue;
                if (((StatefulValue)prev).status() == ValueStatus.RETRY) {
                    this.retryEntriesCnt.decrementAndGet();
                }
                assert (((StatefulValue)prev).status() == ValueStatus.NEW || ((StatefulValue)prev).status() == ValueStatus.RETRY);
                ((StatefulValue)prev).update(((StatefulValue)newVal).val, ((StatefulValue)newVal).operation(), ValueStatus.NEW);
                break;
            }
            finally {
                prev.writeLock().unlock();
            }
        }
        if ((cacheSize = this.getWriteBehindBufferSize()) > this.cacheCriticalSize) {
            this.flushSingleValue();
        } else if (this.cacheMaxSize > 0 && cacheSize > this.cacheMaxSize) {
            this.wakeUp();
        }
    }

    private Flusher flusher(K key) {
        return this.flushThreads[this.resolveFlusherByKeyHash(key.hashCode())];
    }

    int resolveFlusherByKeyHash(int hash) {
        int h = hash ^ hash >>> 16;
        return this.flushThreadCntIsPowerOfTwo ? h & this.flushThreadCnt - 1 : U.hashToIndex(h, this.flushThreadCnt);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void flushSingleValue() {
        this.cacheOverflowCntr.incrementAndGet();
        try {
            for (Map.Entry<K, StatefulValue<K, V>> e : this.writeCache.entrySet()) {
                Map<K, StatefulValue<K, V>> batch;
                StatefulValue<K, V> val = e.getValue();
                val.writeLock().lock();
                try {
                    ValueStatus status = ((StatefulValue)val).status();
                    if (this.acquired(status)) continue;
                    if (((StatefulValue)val).status() == ValueStatus.RETRY) {
                        this.retryEntriesCnt.decrementAndGet();
                    }
                    assert (this.retryEntriesCnt.get() >= 0);
                    ((StatefulValue)val).status(ValueStatus.PENDING);
                    batch = Collections.singletonMap(e.getKey(), val);
                }
                finally {
                    val.writeLock().unlock();
                    continue;
                }
                if (batch.isEmpty()) continue;
                this.applyBatch(batch, false, null);
                this.cacheTotalOverflowCntr.incrementAndGet();
                return;
            }
        }
        finally {
            this.cacheOverflowCntr.decrementAndGet();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes, Flusher flusher) {
        assert (valMap.size() <= this.batchSize);
        assert (!valMap.isEmpty());
        StoreOperation operation = null;
        LinkedHashMap<K, Cache.Entry> batch = U.newLinkedHashMap(valMap.size());
        for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
            StatefulValue<K, V> statefulValue = e.getValue();
            statefulValue.readLock().lock();
            try {
                if (operation == null) {
                    operation = ((StatefulValue)statefulValue).operation();
                }
                assert (operation == ((StatefulValue)statefulValue).operation());
                assert (((StatefulValue)statefulValue).status() == ValueStatus.PENDING || ((StatefulValue)statefulValue).status() == ValueStatus.PENDING_AND_UPDATED);
                batch.put(e.getKey(), ((StatefulValue)statefulValue).entry());
            }
            finally {
                statefulValue.readLock().unlock();
            }
        }
        boolean result = this.updateStore(operation, batch, initSes, flusher);
        if (result) {
            for (Map.Entry entry : valMap.entrySet()) {
                StatefulValue val = (StatefulValue)entry.getValue();
                val.writeLock().lock();
                try {
                    if (this.writeCoalescing) {
                        if (val.status() == ValueStatus.PENDING_AND_UPDATED) {
                            val.update(val.nextEntry(), val.nextOperation(), ValueStatus.NEW);
                            val.setNext(null, null);
                        } else {
                            val.status(ValueStatus.FLUSHED);
                            StatefulValue<K, V> prev = this.writeCache.remove(entry.getKey());
                            assert (prev == val) : "Map value for key " + entry.getKey() + " was updated during flush";
                        }
                        val.signalFlushed();
                        continue;
                    }
                    val.status(ValueStatus.FLUSHED);
                    Flusher f = this.flusher(entry.getKey());
                    f.flusherWriteMap.remove(entry.getKey(), entry.getValue());
                    val.signalFlushed();
                }
                finally {
                    val.writeLock().unlock();
                }
            }
        } else {
            for (StatefulValue statefulValue : valMap.values()) {
                statefulValue.writeLock().lock();
                try {
                    if (statefulValue.status() == ValueStatus.PENDING_AND_UPDATED) {
                        statefulValue.update(statefulValue.nextEntry(), statefulValue.nextOperation(), ValueStatus.NEW);
                        statefulValue.setNext(null, null);
                    } else {
                        statefulValue.status(ValueStatus.RETRY);
                        this.retryEntriesCnt.incrementAndGet();
                    }
                    statefulValue.signalFlushed();
                }
                finally {
                    statefulValue.writeLock().unlock();
                }
            }
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean updateStore(StoreOperation operation, Map<K, Cache.Entry<? extends K, ? extends V>> vals, boolean initSes, Flusher flusher) {
        boolean bl;
        block18: {
            if (this.storeMgr != null) {
                if (initSes) {
                    this.storeMgr.writeBehindSessionInit();
                } else {
                    this.storeMgr.writeBehindCacheStoreSessionListenerStart();
                }
            }
            boolean threwEx = true;
            try {
                switch (operation) {
                    case PUT: {
                        this.store.writeAll(vals.values());
                        break;
                    }
                    case RMV: {
                        this.store.deleteAll(vals.keySet());
                        break;
                    }
                    default: {
                        assert (false) : "Unexpected operation: " + (Object)((Object)operation);
                        break;
                    }
                }
                threwEx = false;
                bl = true;
                if (!initSes || this.storeMgr == null) break block18;
            }
            catch (Throwable throwable) {
                try {
                    if (initSes && this.storeMgr != null) {
                        this.storeMgr.writeBehindSessionEnd(threwEx);
                    }
                    throw throwable;
                }
                catch (Exception e) {
                    boolean overflow;
                    LT.warn(this.log, e, "Unable to update underlying store: " + this.store, false, false);
                    if (this.writeCoalescing) {
                        overflow = this.writeCache.sizex() > this.cacheCriticalSize || this.stopping.get();
                    } else {
                        boolean bl2 = overflow = flusher.isOverflowed() || this.stopping.get();
                    }
                    if (overflow) {
                        for (Map.Entry<K, Cache.Entry<K, V>> entry : vals.entrySet()) {
                            Object val = entry.getValue() != null ? entry.getValue().getValue() : null;
                            this.log.error("Failed to update store (value will be lost as current buffer size is greater than 'cacheCriticalSize' or node has been stopped before store was repaired) [" + (GridToStringBuilder.includeSensitive() ? "key=" + entry.getKey() + ", val=" + val + ", " : "") + "op=" + (Object)((Object)operation) + "]");
                        }
                        return true;
                    }
                    return false;
                }
            }
            this.storeMgr.writeBehindSessionEnd(threwEx);
        }
        return bl;
    }

    private void wakeUp() {
        this.flushLock.lock();
        try {
            this.canFlush.signalAll();
        }
        finally {
            this.flushLock.unlock();
        }
    }

    Map<K, StatefulValue<K, V>> writeCache() {
        return this.writeCache;
    }

    Map<K, StatefulValue<K, V>>[] flusherMaps() {
        Map[] result = new Map[this.flushThreadCnt];
        for (int i = 0; i < this.flushThreadCnt; ++i) {
            result[i] = this.flushThreads[i].flusherWriteMap;
        }
        return result;
    }

    private boolean acquired(ValueStatus status) {
        return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED || status == ValueStatus.PENDING_AND_UPDATED;
    }

    private static class StatefulValue<K, V>
    extends ReentrantReadWriteLock {
        private static final long serialVersionUID = 0L;
        @GridToStringInclude(sensitive=true)
        private Cache.Entry<? extends K, ? extends V> val;
        @GridToStringInclude(sensitive=true)
        private Cache.Entry<? extends K, ? extends V> nextVal;
        private StoreOperation storeOperation;
        private StoreOperation nextStoreOperation;
        private ValueStatus valStatus;
        private Condition flushCond = this.writeLock().newCondition();

        private StatefulValue(Cache.Entry<? extends K, ? extends V> val, StoreOperation storeOperation) {
            assert (storeOperation == StoreOperation.PUT || storeOperation == StoreOperation.RMV);
            this.val = val;
            this.storeOperation = storeOperation;
            this.valStatus = ValueStatus.NEW;
        }

        private Cache.Entry<? extends K, ? extends V> nextEntry() {
            return this.nextVal;
        }

        private StoreOperation nextOperation() {
            return this.nextStoreOperation;
        }

        private Cache.Entry<? extends K, ? extends V> entry() {
            return this.val;
        }

        private StoreOperation operation() {
            return this.storeOperation;
        }

        private ValueStatus status() {
            return this.valStatus;
        }

        private void status(ValueStatus valStatus) {
            this.valStatus = valStatus;
        }

        private void update(@Nullable Cache.Entry<? extends K, ? extends V> val, StoreOperation storeOperation, ValueStatus valStatus) {
            this.val = val;
            this.storeOperation = storeOperation;
            this.valStatus = valStatus;
        }

        private void setNext(@Nullable Cache.Entry<? extends K, ? extends V> val, StoreOperation storeOperation) {
            this.nextVal = val;
            this.nextStoreOperation = storeOperation;
        }

        private void waitForFlush() throws IgniteInterruptedCheckedException {
            U.await(this.flushCond);
        }

        private void signalFlushed() {
            this.flushCond.signalAll();
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof StatefulValue)) {
                return false;
            }
            StatefulValue other = (StatefulValue)o;
            return F.eq(this.val, other.val) && F.eq((Object)this.valStatus, (Object)other.valStatus);
        }

        public int hashCode() {
            int res = this.val != null ? this.val.hashCode() : 0;
            res = 31 * res + this.valStatus.hashCode();
            return res;
        }

        @Override
        public String toString() {
            return S.toString(StatefulValue.class, this);
        }
    }

    private static enum BatchingResult {
        ADDED,
        SKIPPED,
        NEW_BATCH;

    }

    private static enum ValueStatus {
        NEW,
        PENDING,
        PENDING_AND_UPDATED,
        RETRY,
        FLUSHED;

    }

    private static enum StoreOperation {
        PUT,
        RMV;

    }

    private class Flusher
    extends GridWorker {
        private final FastSizeDeque<IgniteBiTuple<K, StatefulValue<K, V>>> queue;
        private final ConcurrentHashMap<K, StatefulValue<K, V>> flusherWriteMap;
        private final int flusherCacheCriticalSize;
        private volatile boolean parked;
        protected Thread thread;
        protected long cacheFlushFreqNanos;
        private final Lock flusherWriterLock;
        private Condition flusherWriterCanWrite;

        protected Flusher(String igniteInstanceName, String name, IgniteLogger log) {
            super(igniteInstanceName, name, log);
            this.cacheFlushFreqNanos = GridCacheWriteBehindStore.this.cacheFlushFreq * 1000L * 1000L;
            this.flusherWriterLock = new ReentrantLock();
            this.flusherWriterCanWrite = this.flusherWriterLock.newCondition();
            this.flusherCacheCriticalSize = GridCacheWriteBehindStore.this.cacheCriticalSize / GridCacheWriteBehindStore.this.flushThreadCnt;
            assert (this.flusherCacheCriticalSize > GridCacheWriteBehindStore.this.batchSize);
            if (GridCacheWriteBehindStore.this.writeCoalescing) {
                this.queue = null;
                this.flusherWriteMap = null;
            } else {
                this.queue = new FastSizeDeque(new ConcurrentLinkedDeque());
                this.flusherWriteMap = new ConcurrentHashMap(GridCacheWriteBehindStore.this.initCap, 0.75f, GridCacheWriteBehindStore.this.concurLvl);
            }
        }

        protected void start() {
            this.thread = new IgniteThread(this);
            this.thread.start();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void putToFlusherWriteCache(K key, StatefulValue<K, V> newVal) throws IgniteInterruptedCheckedException {
            assert (!GridCacheWriteBehindStore.this.writeCoalescing) : "Unexpected write coalescing.";
            if (this.queue.sizex() > this.flusherCacheCriticalSize) {
                while (this.queue.sizex() > this.flusherCacheCriticalSize) {
                    this.wakeUp();
                    this.flusherWriterLock.lock();
                    try {
                        while (this.queue.sizex() >= this.flusherCacheCriticalSize && !GridCacheWriteBehindStore.this.stopping.get()) {
                            if (GridCacheWriteBehindStore.this.cacheFlushFreq > 0L) {
                                this.flusherWriterCanWrite.await(GridCacheWriteBehindStore.this.cacheFlushFreq, TimeUnit.MILLISECONDS);
                                continue;
                            }
                            this.flusherWriterCanWrite.await();
                        }
                        GridCacheWriteBehindStore.this.cacheTotalOverflowCntr.incrementAndGet();
                    }
                    catch (InterruptedException e) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Caught interrupted exception: " + e);
                        }
                        Thread.currentThread().interrupt();
                    }
                    finally {
                        this.flusherWriterLock.unlock();
                    }
                }
                GridCacheWriteBehindStore.this.cacheTotalOverflowCntr.incrementAndGet();
            }
            this.queue.add(F.t(key, newVal));
            this.flusherWriteMap.put(key, newVal);
        }

        public boolean isOverflowed() {
            if (GridCacheWriteBehindStore.this.writeCoalescing) {
                return GridCacheWriteBehindStore.this.writeCache.sizex() > GridCacheWriteBehindStore.this.cacheCriticalSize;
            }
            return this.queue.sizex() > this.flusherCacheCriticalSize;
        }

        public int size() {
            return GridCacheWriteBehindStore.this.writeCoalescing ? GridCacheWriteBehindStore.this.writeCache.sizex() : this.queue.sizex();
        }

        public boolean isEmpty() {
            return GridCacheWriteBehindStore.this.writeCoalescing ? GridCacheWriteBehindStore.this.writeCache.isEmpty() : this.queue.isEmpty();
        }

        @Override
        protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
            if (GridCacheWriteBehindStore.this.writeCoalescing) {
                while (!GridCacheWriteBehindStore.this.stopping.get() || GridCacheWriteBehindStore.this.writeCache.sizex() > 0) {
                    this.awaitOperationsAvailableCoalescing();
                    this.flushCacheCoalescing();
                }
            } else {
                while (!GridCacheWriteBehindStore.this.stopping.get() || this.queue.sizex() > 0) {
                    this.awaitOperationsAvailableNonCoalescing();
                    this.flushCacheNonCoalescing();
                }
            }
        }

        private void awaitOperationsAvailableCoalescing() throws InterruptedException {
            GridCacheWriteBehindStore.this.flushLock.lock();
            try {
                do {
                    if (GridCacheWriteBehindStore.this.writeCache.sizex() > GridCacheWriteBehindStore.this.cacheMaxSize && GridCacheWriteBehindStore.this.cacheMaxSize != 0) continue;
                    if (GridCacheWriteBehindStore.this.cacheFlushFreq > 0L) {
                        GridCacheWriteBehindStore.this.canFlush.await(GridCacheWriteBehindStore.this.cacheFlushFreq, TimeUnit.MILLISECONDS);
                        continue;
                    }
                    GridCacheWriteBehindStore.this.canFlush.await();
                } while (GridCacheWriteBehindStore.this.writeCache.sizex() == 0 && !GridCacheWriteBehindStore.this.stopping.get());
            }
            finally {
                GridCacheWriteBehindStore.this.flushLock.unlock();
            }
        }

        private void awaitOperationsAvailableNonCoalescing() throws InterruptedException {
            if (this.queue.sizex() >= GridCacheWriteBehindStore.this.batchSize) {
                return;
            }
            this.parked = true;
            try {
                do {
                    if (this.queue.sizex() >= GridCacheWriteBehindStore.this.batchSize) {
                        return;
                    }
                    if (GridCacheWriteBehindStore.this.cacheFlushFreq > 0L) {
                        LockSupport.parkNanos(this.cacheFlushFreqNanos);
                    } else {
                        LockSupport.park();
                    }
                    if (this.queue.sizex() > 0) {
                        return;
                    }
                    if (!Thread.interrupted()) continue;
                    throw new InterruptedException();
                } while (!GridCacheWriteBehindStore.this.stopping.get());
                return;
            }
            finally {
                this.parked = false;
            }
        }

        public void wakeUp() {
            if (this.parked) {
                LockSupport.unpark(this.thread);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Unable to fully structure code
         */
        private void flushCacheCoalescing() {
            prevOperation = null;
            pending = U.newLinkedHashMap(GridCacheWriteBehindStore.access$1700(GridCacheWriteBehindStore.this));
lbl3:
            // 7 sources

            block7: for (Map.Entry<K, V> e : GridCacheWriteBehindStore.access$2300(GridCacheWriteBehindStore.this).entrySet()) {
                val = (StatefulValue)e.getValue();
                if (!val.writeLock().tryLock()) continue;
                try {
                    addRes = this.tryAddStatefulValue(pending, prevOperation, e.getKey(), val);
                    switch (1.$SwitchMap$org$apache$ignite$internal$processors$cache$store$GridCacheWriteBehindStore$BatchingResult[addRes.ordinal()]) {
                        case 1: {
                            StatefulValue.access$1100(val, ValueStatus.PENDING);
                            val.writeLock().unlock();
                            GridCacheWriteBehindStore.access$2700(GridCacheWriteBehindStore.this, pending, true, null);
                            pending = U.newLinkedHashMap(GridCacheWriteBehindStore.access$1700(GridCacheWriteBehindStore.this));
                            pending.put(e.getKey(), val);
                            prevOperation = StatefulValue.access$300(val);
                            ** break;
                        }
                        case 2: {
                            prevOperation = StatefulValue.access$300(val);
                            ** break;
                        }
                        default: {
                            if (!Flusher.$assertionsDisabled && addRes != BatchingResult.SKIPPED) {
                                throw new AssertionError((Object)("Unexpected result: " + (Object)addRes));
                            }
                            continue block7;
                        }
                    }
                }
                finally {
                    if (!val.writeLock().isHeldByCurrentThread()) continue;
                    val.writeLock().unlock();
                }
            }
            if (!pending.isEmpty()) {
                GridCacheWriteBehindStore.access$2700(GridCacheWriteBehindStore.this, pending, true, null);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void flushCacheNonCoalescing() {
            while (!this.queue.isEmpty()) {
                IgniteBiTuple tuple;
                LinkedHashMap pending = U.newLinkedHashMap(GridCacheWriteBehindStore.this.batchSize);
                StoreOperation prevOperation = null;
                boolean needNewBatch = false;
                block9: while (!needNewBatch && (tuple = this.queue.peek()) != null) {
                    BatchingResult addRes = this.tryAddStatefulValue(pending, prevOperation, tuple.getKey(), tuple.getValue());
                    switch (addRes) {
                        case ADDED: {
                            prevOperation = tuple.getValue().operation();
                            this.queue.poll();
                            continue block9;
                        }
                        case SKIPPED: {
                            assert (false) : "Unexpected result: " + (Object)((Object)addRes);
                            continue block9;
                        }
                        case NEW_BATCH: {
                            needNewBatch = true;
                            prevOperation = null;
                            continue block9;
                        }
                    }
                    assert (false) : "Unexpected result: " + (Object)((Object)addRes);
                }
                boolean applied = GridCacheWriteBehindStore.this.applyBatch(pending, true, this);
                if (applied) {
                    this.flusherWriterLock.lock();
                    try {
                        this.flusherWriterCanWrite.signalAll();
                        continue;
                    }
                    finally {
                        this.flusherWriterLock.unlock();
                        continue;
                    }
                }
                ArrayList pendingList = new ArrayList(pending.entrySet());
                for (int i = pendingList.size() - 1; i >= 0; --i) {
                    this.queue.addFirst(F.t(pendingList.get(i).getKey(), pendingList.get(i).getValue()));
                }
            }
        }

        public BatchingResult tryAddStatefulValue(Map<K, StatefulValue<K, V>> pending, StoreOperation prevOperation, K key, StatefulValue<K, V> val) {
            ValueStatus status = val.status();
            assert (!pending.isEmpty() || prevOperation == null) : "prev operation cannot be " + (Object)((Object)prevOperation) + " if prev map is empty!";
            if (GridCacheWriteBehindStore.this.acquired(status)) {
                return BatchingResult.SKIPPED;
            }
            if (!GridCacheWriteBehindStore.this.writeCoalescing && pending.containsKey(key)) {
                return BatchingResult.NEW_BATCH;
            }
            if (status == ValueStatus.RETRY) {
                GridCacheWriteBehindStore.this.retryEntriesCnt.decrementAndGet();
            }
            assert (GridCacheWriteBehindStore.this.retryEntriesCnt.get() >= 0);
            if (pending.size() == GridCacheWriteBehindStore.this.batchSize) {
                return BatchingResult.NEW_BATCH;
            }
            if (prevOperation != val.operation() && prevOperation != null) {
                return BatchingResult.NEW_BATCH;
            }
            val.status(ValueStatus.PENDING);
            pending.put(key, val);
            return BatchingResult.ADDED;
        }
    }
}

