/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.visor.consistency;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.ReadRepairStrategy;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheConsistencyViolationEvent;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.near.consistency.IgniteIrreparableConsistencyViolationException;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.consistency.AbstractConsistencyTask;
import org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTaskArg;
import org.apache.ignite.internal.visor.consistency.VisorConsistencyStatusTask;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.LoggerResource;

public class VisorConsistencyRepairTask
extends AbstractConsistencyTask<VisorConsistencyRepairTaskArg, String> {
    private static final long serialVersionUID = 0L;
    public static final String NOTHING_FOUND = "Consistency violations were NOT found";
    public static final String CONSISTENCY_VIOLATIONS_FOUND = "Consistency violations were FOUND";
    public static final String CONSISTENCY_VIOLATIONS_RECORDED = "Cache consistency violations recorded.";

    @Override
    protected VisorJob<VisorConsistencyRepairTaskArg, String> job(VisorConsistencyRepairTaskArg arg) {
        return new VisorConsistencyRepairJob(arg, this.debug);
    }

    private static class VisorConsistencyRepairJob
    extends VisorJob<VisorConsistencyRepairTaskArg, String> {
        private static final long serialVersionUID = 0L;
        @LoggerResource
        protected IgniteLogger log;
        private final Set<CacheConsistencyViolationEvent> evts = new GridConcurrentHashSet<CacheConsistencyViolationEvent>();

        protected VisorConsistencyRepairJob(VisorConsistencyRepairTaskArg arg, boolean debug) {
            super(arg, debug);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected String run(VisorConsistencyRepairTaskArg arg) throws IgniteException {
            GridCacheContext cacheCtx;
            String cacheOrGrpName = arg.cacheOrGroupName();
            ReadRepairStrategy strategy = arg.strategy();
            int p = arg.part();
            int batchSize = 128;
            int statusDelay = 60000;
            int cacheOrGrpId = CU.cacheId(cacheOrGrpName);
            CacheGroupContext grpCtx = this.ignite.context().cache().cacheGroup(cacheOrGrpId);
            if (grpCtx == null && (cacheCtx = this.ignite.context().cache().context().cacheContext(cacheOrGrpId)) != null) {
                grpCtx = cacheCtx.group();
            }
            if (grpCtx == null) {
                if (this.ignite.context().cache().cacheGroupDescriptor(cacheOrGrpId) != null || this.ignite.context().cache().cacheDescriptor(cacheOrGrpName) != null) {
                    return null;
                }
                throw new IgniteException("Cache (or cache group) not found [name=" + cacheOrGrpName + "]");
            }
            if (!this.ignite.context().event().isRecordable(135)) {
                throw new UnsupportedOperationException("Consistency violation events recording is disabled on cluster.");
            }
            GridDhtLocalPartition part = grpCtx.topology().localPartition(p);
            if (part == null) {
                return null;
            }
            this.log.info("Consistency check started [grp=" + grpCtx.cacheOrGroupName() + ", part=" + p + ", strategy=" + (Object)((Object)strategy) + "]");
            String statusKey = "[node=" + this.ignite.localNode() + ", cacheGroup=" + grpCtx.cacheOrGroupName() + ", part=" + p + "]";
            if (VisorConsistencyStatusTask.MAP.putIfAbsent(statusKey, "0/" + part.fullSize()) != null) {
                throw new IllegalStateException("Consistency check already started [grp=" + grpCtx.cacheOrGroupName() + ", part=" + p + "]");
            }
            long processed = 0L;
            long checked = 0L;
            long statusTs = 0L;
            part.reserve();
            try {
                CacheConsistencyViolationEventListener lsnr = new CacheConsistencyViolationEventListener(grpCtx.caches().stream().map(GridCacheContext::name).collect(Collectors.toSet()));
                this.ignite.events().localListen(lsnr, 135);
                try {
                    HashMap<Integer, PerCacheBatch> batches = new HashMap<Integer, PerCacheBatch>();
                    GridCursor<? extends CacheDataRow> cursor = grpCtx.offheap().dataStore(part).cursor();
                    while (cursor.next() && !this.isCancelled()) {
                        CacheDataRow row = cursor.get();
                        ++processed;
                        PerCacheBatch batch = batches.computeIfAbsent(row.cacheId(), cacheId -> {
                            String cacheName = cacheId != 0 ? this.ignite.context().cache().cacheDescriptor((int)cacheId).cacheName() : cacheOrGrpName;
                            return new PerCacheBatch(this.ignite.cache(cacheName).withKeepBinary().withReadRepair(strategy));
                        });
                        batch.keys.add(row.key());
                        if (batch.keys.size() == batchSize) {
                            this.repair(batch.cache, batch.keys);
                            batch.keys.clear();
                            VisorConsistencyStatusTask.MAP.put(statusKey, (checked += (long)batch.keys.size()) + "/" + part.fullSize());
                        }
                        assert (batch.keys.size() < batchSize);
                        if (System.currentTimeMillis() < statusTs) continue;
                        statusTs = System.currentTimeMillis() + (long)statusDelay;
                        this.log.info("Consistency check progress [grp=" + grpCtx.cacheOrGroupName() + ", caches=" + batches.values().stream().map(b -> ((PerCacheBatch)b).cache.getName()).collect(Collectors.toList()) + ", part=" + p + ", checked=" + checked + ", processed =" + processed + "/" + part.fullSize() + "]");
                    }
                    for (PerCacheBatch batch : batches.values()) {
                        assert (batch.keys.size() < batchSize);
                        this.repair(batch.cache, batch.keys);
                        checked += (long)batch.keys.size();
                        batch.keys.clear();
                    }
                    this.log.info("Consistency check " + (this.isCancelled() ? "cancelled" : "finished") + "[grp=" + grpCtx.cacheOrGroupName() + ", caches=" + batches.values().stream().map(b -> ((PerCacheBatch)b).cache.getName()).collect(Collectors.toList()) + ", part=" + p + ", checked=" + checked + ", processed =" + processed + "/" + part.fullSize() + "]");
                }
                finally {
                    this.ignite.events().stopLocalListen(lsnr, new int[0]);
                }
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException("Partition repair attempt failed.", e);
            }
            finally {
                part.release();
                VisorConsistencyStatusTask.MAP.remove(statusKey);
            }
            if (!this.evts.isEmpty()) {
                return this.processEvents(p, checked);
            }
            return "Consistency violations were NOT found [processed=" + checked + "]\n";
        }

        private void repair(IgniteCache<Object, Object> cache, Set<Object> keys) {
            block2: {
                try {
                    cache.getAll(keys);
                }
                catch (CacheException e) {
                    if (e.getCause() instanceof IgniteIrreparableConsistencyViolationException || this.isCancelled()) break block2;
                    throw new IgniteException("Read repair attempt failed.", e);
                }
            }
        }

        private String processEvents(int part, long cnt) {
            int found = 0;
            int repaired = 0;
            StringBuilder sb = new StringBuilder();
            for (CacheConsistencyViolationEvent evt : this.evts) {
                for (Map.Entry<Object, CacheConsistencyViolationEvent.EntriesInfo> entry : evt.getEntries().entrySet()) {
                    Object key = entry.getKey();
                    if (entry.getValue().partition() != part) continue;
                    ++found;
                    sb.append("Key: ").append(key).append(" (cache: ").append(evt.getCacheName()).append(", partition: ").append(entry.getValue().partition()).append(", strategy: ").append((Object)evt.getStrategy()).append(", id: ").append(evt.id()).append(", timestamp: ").append(evt.timestamp()).append(", node: ").append(evt.node()).append(")").append("\n");
                    if (evt.getRepairedEntries().containsKey(key)) {
                        sb.append(" Repaired: ").append(evt.getRepairedEntries().get(key)).append("\n");
                    } else {
                        sb.append(" [Was NOT repaired!]").append("\n");
                    }
                    for (Map.Entry<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> mapping : entry.getValue().getMapping().entrySet()) {
                        ClusterNode node = mapping.getKey();
                        CacheConsistencyViolationEvent.EntryInfo info = mapping.getValue();
                        sb.append("  Node: ").append(node).append("\n").append("    Value: ").append(info.getValue()).append("\n").append("    Version: ").append(info.getVersion()).append("\n").append("    On primary: ").append(info.isPrimary()).append("\n");
                        if (info.getVersion() != null) {
                            sb.append("    Other cluster version: ").append(info.getVersion().otherClusterVersion()).append("\n");
                        }
                        if (!info.isCorrect()) continue;
                        sb.append("    [CORRECT value!]").append("\n");
                    }
                    if (!evt.getRepairedEntries().containsKey(key)) continue;
                    ++repaired;
                }
            }
            String res = sb.toString();
            if (!res.isEmpty()) {
                this.log.warning("Cache consistency violations recorded.\n" + res);
                return "Consistency violations were FOUND [found=" + found + ", repaired=" + repaired + ", processed=" + cnt + "]";
            }
            return "Consistency violations were NOT found [processed=" + cnt + "]\n";
        }

        private static class PerCacheBatch {
            private final IgniteCache<Object, Object> cache;
            private final Set<Object> keys;

            public PerCacheBatch(IgniteCache<Object, Object> cache) {
                this.cache = cache;
                this.keys = new HashSet<Object>();
            }
        }

        private class CacheConsistencyViolationEventListener
        implements IgnitePredicate<CacheConsistencyViolationEvent> {
            private static final long serialVersionUID = 0L;
            private final Set<String> cacheNames;

            private CacheConsistencyViolationEventListener(Set<String> cacheNames) {
                this.cacheNames = cacheNames;
            }

            @Override
            public boolean apply(CacheConsistencyViolationEvent evt) {
                assert (evt instanceof CacheConsistencyViolationEvent);
                if (!this.cacheNames.contains(evt.getCacheName())) {
                    return true;
                }
                VisorConsistencyRepairJob.this.evts.add(evt);
                return true;
            }
        }
    }
}

