/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;

import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.SystemProperty;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
import org.apache.ignite.internal.processors.cache.CacheStoppedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.EvictionContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;

public class PartitionsEvictManager
extends GridCacheSharedManagerAdapter {
    private static final int DEFAULT_SHOW_EVICTION_PROGRESS_FREQ_MS = 120000;
    @SystemProperty(value="Eviction progress frequency in milliseconds", type=Long.class, defaults="120000")
    public static final String SHOW_EVICTION_PROGRESS_FREQ = "SHOW_EVICTION_PROGRESS_FREQ";
    private final long evictionProgressFreqMs = IgniteSystemProperties.getLong("SHOW_EVICTION_PROGRESS_FREQ", 120000L);
    private long lastShowProgressTimeNanos = System.nanoTime() - U.millisToNanos(this.evictionProgressFreqMs);
    private final Map<Integer, GroupEvictionContext> evictionGroupsMap = new ConcurrentHashMap<Integer, GroupEvictionContext>();
    private final Map<Integer, Map<Integer, EvictReason>> logEvictPartByGrps = new HashMap<Integer, Map<Integer, EvictReason>>();
    private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
    private final Object mux = new Object();
    private volatile IgniteThreadPoolExecutor executor;
    private static final BiConsumer<EvictReason, CacheMetricsImpl> INCREMENT = new BiConsumer<EvictReason, CacheMetricsImpl>(){

        @Override
        public void accept(EvictReason reason, CacheMetricsImpl cacheMetrics) {
            if (reason == EvictReason.CLEARING) {
                cacheMetrics.incrementRebalanceClearingPartitions();
            } else {
                cacheMetrics.incrementEvictingPartitions();
            }
        }
    };
    private static final BiConsumer<EvictReason, CacheMetricsImpl> DECREMENT = new BiConsumer<EvictReason, CacheMetricsImpl>(){

        @Override
        public void accept(EvictReason reason, CacheMetricsImpl cacheMetrics) {
            if (reason == EvictReason.CLEARING) {
                cacheMetrics.decrementRebalanceClearingPartitions();
            } else {
                cacheMetrics.decrementEvictingPartitions();
            }
        }
    };

    public void onCacheGroupStarted(CacheGroupContext grp) {
    }

    public void onCacheGroupStopped(CacheGroupContext grp) {
        GroupEvictionContext grpEvictionCtx = this.evictionGroupsMap.computeIfAbsent(grp.groupId(), p -> new GroupEvictionContext(grp));
        grpEvictionCtx.stop(new CacheStoppedException(grp.cacheOrGroupName()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public IgniteInternalFuture<?> evictPartitionAsync(CacheGroupContext grp, GridDhtLocalPartition part, GridFutureAdapter<?> finishFut) {
        assert (Objects.nonNull(grp));
        assert (Objects.nonNull(part));
        if (!this.busyLock.readLock().tryLock()) {
            return new GridFinishedFuture(new NodeStoppingException("Node is stopping"));
        }
        try {
            EvictReason reason;
            int grpId = grp.groupId();
            if (this.cctx.cache().cacheGroup(grpId) == null) {
                GridFinishedFuture gridFinishedFuture = new GridFinishedFuture(new CacheStoppedException(grp.cacheOrGroupName()));
                return gridFinishedFuture;
            }
            GroupEvictionContext grpEvictionCtx = this.evictionGroupsMap.computeIfAbsent(grpId, k -> new GroupEvictionContext(grp));
            EvictReason evictReason = reason = part.state() == GridDhtPartitionState.RENTING ? EvictReason.EVICTION : EvictReason.CLEARING;
            if (this.log.isDebugEnabled()) {
                this.log.debug("The partition has been scheduled for clearing [grp=" + grp.cacheOrGroupName() + ", topVer=" + grp.topology().readyTopologyVersion() + ", id=" + part.id() + ", state=" + (Object)((Object)part.state()) + ", fullSize=" + part.fullSize() + ", reason=" + (Object)((Object)reason) + ']');
            }
            Object object = this.mux;
            synchronized (object) {
                PartitionEvictionTask task = new PartitionEvictionTask(part, grpEvictionCtx, reason, finishFut);
                this.logEvictPartByGrps.computeIfAbsent(grpId, i -> new HashMap()).put(part.id(), reason);
                grpEvictionCtx.totalTasks.incrementAndGet();
                this.updateMetrics(grp, reason, INCREMENT);
                this.executor.submit(task);
                this.showProgress();
                grpEvictionCtx.taskScheduled(task);
                GridFutureAdapter gridFutureAdapter = task.finishFut;
                return gridFutureAdapter;
            }
        }
        finally {
            this.busyLock.readLock().unlock();
        }
    }

    private void showProgress() {
        if (U.millisSinceNanos(this.lastShowProgressTimeNanos) >= this.evictionProgressFreqMs) {
            int size = this.executor.getQueue().size();
            if (this.log.isInfoEnabled()) {
                this.log.info("Eviction in progress [groups=" + this.evictionGroupsMap.keySet().size() + ", remainingPartsToEvict=" + size + ']');
                this.evictionGroupsMap.values().forEach(rec$ -> ((GroupEvictionContext)rec$).showProgress());
                if (!this.logEvictPartByGrps.isEmpty()) {
                    StringJoiner evictPartJoiner = new StringJoiner(", ");
                    this.logEvictPartByGrps.forEach((grpId, map) -> {
                        CacheGroupContext grpCtx = this.cctx.cache().cacheGroup((int)grpId);
                        String grpName = Objects.nonNull(grpCtx) ? grpCtx.cacheOrGroupName() : null;
                        evictPartJoiner.add("[grpId=" + grpId + ", grpName=" + grpName + ", " + this.toString((Map<Integer, EvictReason>)map) + ']');
                    });
                    this.log.info("Partitions have been scheduled for eviction: " + evictPartJoiner);
                    this.logEvictPartByGrps.clear();
                }
            }
            this.lastShowProgressTimeNanos = System.nanoTime();
        }
    }

    @Override
    protected void start0() throws IgniteCheckedException {
        super.start0();
        this.executor = (IgniteThreadPoolExecutor)this.cctx.kernalContext().pools().getRebalanceExecutorService();
    }

    @Override
    protected void onKernalStop0(boolean cancel) {
        super.onKernalStop0(cancel);
        this.busyLock.writeLock().lock();
        Collection<GroupEvictionContext> evictionGrps = this.evictionGroupsMap.values();
        NodeStoppingException ex = new NodeStoppingException("Node is stopping");
        for (GroupEvictionContext evictionGrp : evictionGrps) {
            evictionGrp.stop(ex);
        }
        this.executor = null;
    }

    private String toString(Map<Integer, EvictReason> evictParts) {
        assert (Objects.nonNull(evictParts));
        EnumMap<EvictReason, Collection> partByReason = new EnumMap<EvictReason, Collection>(EvictReason.class);
        for (Map.Entry<Integer, EvictReason> entry : evictParts.entrySet()) {
            partByReason.computeIfAbsent(entry.getValue(), b -> new ArrayList()).add(entry.getKey());
        }
        StringJoiner joiner = new StringJoiner(", ");
        partByReason.forEach((reason, partIds) -> joiner.add(reason.toString() + '=' + S.compact(partIds)));
        return joiner.toString();
    }

    public void cleanupRemovedGroup(int grpId) {
        this.evictionGroupsMap.remove(grpId);
    }

    public int total() {
        return this.evictionGroupsMap.values().stream().mapToInt(ctx -> ((GroupEvictionContext)ctx).totalTasks.get()).sum();
    }

    private void updateMetrics(CacheGroupContext grp, EvictReason reason, BiConsumer<EvictReason, CacheMetricsImpl> c) {
        for (GridCacheContext cctx : grp.caches()) {
            if (!cctx.statisticsEnabled()) continue;
            CacheMetricsImpl metrics = cctx.cache().metrics0();
            c.accept(reason, metrics);
        }
    }

    private static enum EvictReason {
        EVICTION,
        CLEARING;


        public String toString() {
            return this.name().toLowerCase();
        }
    }

    private class PartitionEvictionTask
    implements Runnable {
        private final GridDhtLocalPartition part;
        private final EvictReason reason;
        private final GroupEvictionContext grpEvictionCtx;
        private final GridFutureAdapter<?> finishFut;

        private PartitionEvictionTask(GridDhtLocalPartition part, GroupEvictionContext grpEvictionCtx, EvictReason reason, GridFutureAdapter<?> finishFut) {
            this.part = part;
            this.grpEvictionCtx = grpEvictionCtx;
            this.reason = reason;
            this.finishFut = finishFut;
        }

        @Override
        public void run() {
            if (!this.grpEvictionCtx.busyLock.readLock().tryLock()) {
                this.finishFut.onDone((Throwable)this.grpEvictionCtx.stopExRef.get());
                return;
            }
            try {
                long clearedEntities = this.part.clearAll(this.grpEvictionCtx);
                if (PartitionsEvictManager.this.log.isDebugEnabled()) {
                    PartitionsEvictManager.this.log.debug("The partition has been cleared [grp=" + this.part.group().cacheOrGroupName() + ", topVer=" + this.part.group().topology().readyTopologyVersion() + ", id=" + this.part.id() + ", state=" + (Object)((Object)this.part.state()) + ", cleared=" + clearedEntities + ", fullSize=" + this.part.fullSize() + ']');
                }
                this.finishFut.onDone();
            }
            catch (Throwable ex) {
                PartitionsEvictManager.this.updateMetrics(this.grpEvictionCtx.grp, this.reason, DECREMENT);
                this.finishFut.onDone(ex);
                if (PartitionsEvictManager.this.cctx.kernalContext().isStopping()) {
                    LT.warn(PartitionsEvictManager.this.log, ex, "Partition eviction has been cancelled (local node is stopping) [grp=" + this.grpEvictionCtx.grp.cacheOrGroupName() + ", readyVer=" + this.grpEvictionCtx.grp.topology().readyTopologyVersion() + ']', false, true);
                } else {
                    LT.error(PartitionsEvictManager.this.log, ex, "Partition eviction has failed [grp=" + this.grpEvictionCtx.grp.cacheOrGroupName() + ", part=" + this.part.id() + ']');
                    PartitionsEvictManager.this.cctx.kernalContext().failure().process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, ex));
                }
            }
            finally {
                this.grpEvictionCtx.busyLock.readLock().unlock();
            }
        }
    }

    private class GroupEvictionContext
    implements EvictionContext {
        private final CacheGroupContext grp;
        private AtomicReference<Exception> stopExRef = new AtomicReference();
        private AtomicInteger totalTasks = new AtomicInteger();
        private int taskInProgress;
        private ReadWriteLock busyLock = new ReentrantReadWriteLock();

        private GroupEvictionContext(CacheGroupContext grp) {
            this.grp = grp;
        }

        private synchronized void taskScheduled(PartitionEvictionTask task) {
            ++this.taskInProgress;
            GridFutureAdapter fut = task.finishFut;
            fut.listen(f -> {
                GroupEvictionContext groupEvictionContext = this;
                synchronized (groupEvictionContext) {
                    --this.taskInProgress;
                    this.totalTasks.decrementAndGet();
                    PartitionsEvictManager.this.updateMetrics(((PartitionEvictionTask)task).grpEvictionCtx.grp, task.reason, DECREMENT);
                }
            });
        }

        @Override
        public boolean shouldStop() {
            return this.stopExRef.get() != null;
        }

        void stop(Exception ex) {
            if (!this.stopExRef.compareAndSet(null, ex)) {
                return;
            }
            this.busyLock.writeLock().lock();
        }

        private void awaitFinish(Integer part, IgniteInternalFuture<?> fut) {
            block3: {
                try {
                    if (PartitionsEvictManager.this.log.isInfoEnabled()) {
                        PartitionsEvictManager.this.log.info("Await partition evict, grpName=" + this.grp.cacheOrGroupName() + ", grpId=" + this.grp.groupId() + ", partId=" + part);
                    }
                    fut.get();
                }
                catch (IgniteCheckedException e) {
                    if (!PartitionsEvictManager.this.log.isDebugEnabled()) break block3;
                    PartitionsEvictManager.this.log.warning("Failed to await partition eviction during stopping.", e);
                }
            }
        }

        private void showProgress() {
            if (PartitionsEvictManager.this.log.isInfoEnabled()) {
                PartitionsEvictManager.this.log.info("Group eviction in progress [grpName=" + this.grp.cacheOrGroupName() + ", grpId=" + this.grp.groupId() + ", remainingPartsToEvict=" + (this.totalTasks.get() - this.taskInProgress) + ", partsEvictInProgress=" + this.taskInProgress + ", totalParts=" + this.grp.topology().localPartitions().size() + "]");
            }
        }
    }
}

