/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.verify;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.verify.GridNotIdleException;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyException;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility;
import org.apache.ignite.internal.processors.cache.verify.NoMatchingCachesException;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@GridInternal
public class VerifyBackupPartitionsTaskV2
extends ComputeTaskAdapter<VisorIdleVerifyTaskArg, IdleVerifyResultV2> {
    public static final IgniteProductVersion V2_SINCE_VER = IgniteProductVersion.fromString("2.5.3");
    @LoggerResource
    private IgniteLogger log;
    private static final long serialVersionUID = 0L;

    /**
     * Assigns one verification job to every node of the subgrid.
     *
     * @param subgrid Nodes the task may run on.
     * @param arg Idle verify arguments; the same instance is handed to every job.
     * @return Mapping of each newly created job to the node it is assigned to.
     * @throws IgniteException Declared by the overridden method.
     */
    @Override
    @NotNull
    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, VisorIdleVerifyTaskArg arg) throws IgniteException {
        Map<VerifyBackupPartitionsJobV2, ClusterNode> assignment = new HashMap<>();
        subgrid.forEach(target -> assignment.put(new VerifyBackupPartitionsJobV2(arg), target));
        return assignment;
    }

    /**
     * Merges the per-node job results into a single idle verify result.
     *
     * @param results Results of all jobs of this task.
     * @return Result holding only the collected exceptions when the number of collected
     *      exceptions equals the number of job results; otherwise the outcome of the conflict check.
     * @throws IgniteException Declared by the overridden method.
     */
    @Override
    @Nullable
    public IdleVerifyResultV2 reduce(List<ComputeJobResult> results) throws IgniteException {
        Map<PartitionKeyV2, List<PartitionHashRecordV2>> hashesByPart = new HashMap<>();
        Map<ClusterNode, Exception> failures = new HashMap<>();

        this.reduceResults(results, hashesByPart, failures);

        boolean nothingToCompare = results.size() == failures.size();
        if (nothingToCompare) {
            return new IdleVerifyResultV2(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), failures);
        }
        return this.checkConflicts(hashesByPart, failures);
    }

    /**
     * Decides how the task reacts to a single job result. Failover is never allowed:
     * a {@code FAILOVER} decision of the parent implementation is replaced by {@code WAIT}
     * (with a warning when a logger is available), and an {@link IgniteException} raised
     * while computing the policy also results in {@code WAIT}.
     *
     * @param res Result that has just been received.
     * @param rcvd Results received so far.
     * @return Policy of the parent implementation, or {@code WAIT} in the cases described above.
     */
    @Override
    public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
        try {
            ComputeJobResultPolicy policy = super.result(res, rcvd);
            if (policy != ComputeJobResultPolicy.FAILOVER) {
                return policy;
            }
            // Deny failover: keep waiting for the remaining jobs instead.
            if (this.log != null) {
                String msg = "VerifyBackupPartitionsJobV2 failed on node [consistentId=" + res.getNode().consistentId() + "]";
                this.log.warning(msg, res.getException());
            }
            return ComputeJobResultPolicy.WAIT;
        }
        catch (IgniteException ignored) {
            // Deliberately not rethrown: reduceResults() reads the failure from the job result itself.
            return ComputeJobResultPolicy.WAIT;
        }
    }

    /**
     * Compares the records collected for every partition key and sorts them into conflict groups.
     * <p>
     * MOVING and LOST records are put into their own maps and take no part in the comparison.
     * The first remaining record of a key becomes the baseline; every later record whose update
     * counter or partition hash differs from the baseline marks the whole key as conflicting.
     *
     * @param clusterHashes Records of all nodes grouped by partition key.
     * @param exceptions Job failures, passed through to the result unchanged.
     * @return Result with counter conflicts, hash conflicts, moving partitions, lost partitions and exceptions.
     */
    private IdleVerifyResultV2 checkConflicts(Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes, Map<ClusterNode, Exception> exceptions) {
        Map<PartitionKeyV2, List<PartitionHashRecordV2>> hashConflicts = new HashMap<>();
        Map<PartitionKeyV2, List<PartitionHashRecordV2>> cntrConflicts = new HashMap<>();
        Map<PartitionKeyV2, List<PartitionHashRecordV2>> moving = new HashMap<>();
        Map<PartitionKeyV2, List<PartitionHashRecordV2>> lost = new HashMap<>();

        for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : clusterHashes.entrySet()) {
            PartitionKeyV2 key = entry.getKey();
            List<PartitionHashRecordV2> records = entry.getValue();

            // Baseline values taken from the first record that is neither MOVING nor LOST.
            boolean baselineSet = false;
            int baseHash = 0;
            long baseCntr = 0L;

            for (PartitionHashRecordV2 rec : records) {
                PartitionHashRecordV2.PartitionState state = rec.partitionState();

                if (state == PartitionHashRecordV2.PartitionState.MOVING) {
                    moving.computeIfAbsent(key, k -> new ArrayList<>()).add(rec);
                }
                else if (state == PartitionHashRecordV2.PartitionState.LOST) {
                    lost.computeIfAbsent(key, k -> new ArrayList<>()).add(rec);
                }
                else if (!baselineSet) {
                    baseHash = rec.partitionHash();
                    baseCntr = rec.updateCounter();
                    baselineSet = true;
                }
                else {
                    if (rec.updateCounter() != baseCntr) {
                        cntrConflicts.putIfAbsent(key, records);
                    }
                    if (rec.partitionHash() != baseHash) {
                        hashConflicts.putIfAbsent(key, records);
                    }
                }
            }
        }
        return new IdleVerifyResultV2(cntrConflicts, hashConflicts, moving, lost, exceptions);
    }

    /**
     * Splits job results into failures and partition hash records.
     *
     * @param results Job results to process.
     * @param clusterHashes Output: records of successful jobs, appended to the list of their partition key.
     * @param exceptions Output: exception of every failed job, keyed by the node of that job result.
     */
    private void reduceResults(List<ComputeJobResult> results, Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes, Map<ClusterNode, Exception> exceptions) {
        for (ComputeJobResult jobRes : results) {
            Exception failure = jobRes.getException();

            if (failure == null) {
                Map<PartitionKeyV2, PartitionHashRecordV2> nodeHashes = jobRes.getData();
                nodeHashes.forEach((key, rec) -> clusterHashes.computeIfAbsent(key, k -> new ArrayList<>()).add(rec));
            }
            else {
                exceptions.put(jobRes.getNode(), failure);
            }
        }
    }

    private static class VerifyBackupPartitionsJobV2
    extends ComputeJobAdapter {
        private static final long serialVersionUID = 0L;
        @IgniteInstanceResource
        private IgniteEx ignite;
        @LoggerResource
        private IgniteLogger log;
        private VisorIdleVerifyTaskArg arg;
        private final AtomicInteger completionCntr = new AtomicInteger(0);

        /**
         * Creates a job that verifies local partitions according to the given arguments.
         *
         * @param arg Idle verify arguments (cache name patterns, exclusions, cache filter, CRC flag).
         */
        public VerifyBackupPartitionsJobV2(VisorIdleVerifyTaskArg arg) {
            this.arg = arg;
        }

        /**
         * Calculates hash records for all local partitions of the cache groups selected by the task arguments.
         * <p>
         * When a CRC check is requested and the database manager is a {@link GridCacheDatabaseSharedManager},
         * a checkpoint listener is registered for the duration of the call; it raises a flag as soon as a
         * checkpoint that has pages begins. The listener is always removed before the method exits.
         * <p>
         * Partition hashes are computed asynchronously and awaited in submission order with a 100 ms poll,
         * so that a progress warning can be logged at most once per 3 minutes.
         *
         * @return Hash records of the local partitions keyed by partition key.
         * @throws GridNotIdleException If a CRC check is requested and a checkpoint is in progress at start.
         * @throws IdleVerifyException If some partition calculations failed with an {@link IgniteException}
         *      other than {@link GridNotIdleException}; all such failures are collected first.
         * @throws IgniteInterruptedException If the waiting thread was interrupted.
         * @throws IgniteException Wrapping the cause of any other calculation failure.
         */
        @Override
        public Map<PartitionKeyV2, PartitionHashRecordV2> execute() throws IgniteException {
            Set<Integer> grpIds = this.getGroupIds();
            this.completionCntr.set(0);
            final AtomicBoolean cpFlag = new AtomicBoolean();
            GridCacheDatabaseSharedManager db = null;
            DbCheckpointListener lsnr = null;
            if (this.arg.checkCrc() && this.ignite.context().cache().context().database() instanceof GridCacheDatabaseSharedManager) {
                db = (GridCacheDatabaseSharedManager)this.ignite.context().cache().context().database();
                lsnr = new DbCheckpointListener(){

                    @Override
                    public void onMarkCheckpointBegin(DbCheckpointListener.Context ctx) {
                    }

                    @Override
                    public void onCheckpointBegin(DbCheckpointListener.Context ctx) {
                        // A checkpoint that has pages started while verification is running.
                        if (ctx.hasPages()) {
                            cpFlag.set(true);
                        }
                    }

                    @Override
                    public void beforeCheckpointBegin(DbCheckpointListener.Context ctx) throws IgniteCheckedException {
                    }
                };
                db.addCheckpointListener(lsnr);
            }
            try {
                if (this.arg.checkCrc() && IdleVerifyUtility.isCheckpointNow(db)) {
                    throw new GridNotIdleException("Checkpoint with dirty pages started! Cluster not idle!");
                }
                List<Future<Map<PartitionKeyV2, PartitionHashRecordV2>>> partHashCalcFuts = this.calcPartitionHashAsync(grpIds, cpFlag);
                Map<PartitionKeyV2, PartitionHashRecordV2> res = new HashMap<>();
                List<IgniteException> exceptions = new ArrayList<>();
                long lastProgressLogTs = U.currentTimeMillis();
                int i = 0;
                while (i < partHashCalcFuts.size()) {
                    Future<Map<PartitionKeyV2, PartitionHashRecordV2>> fut = partHashCalcFuts.get(i);
                    try {
                        Map<PartitionKeyV2, PartitionHashRecordV2> partHash = fut.get(100L, TimeUnit.MILLISECONDS);
                        res.putAll(partHash);
                        ++i;
                    }
                    catch (InterruptedException | ExecutionException e) {
                        // Ordinary calculation failures are collected so that all of them get reported together.
                        if (e.getCause() instanceof IgniteException && !(e.getCause() instanceof GridNotIdleException)) {
                            exceptions.add((IgniteException)e.getCause());
                            ++i;
                            continue;
                        }
                        // Fatal for the whole job. Cancellation starts at the current future: after an
                        // interruption it is still pending, and cancelling a completed future is a no-op.
                        for (int j = i; j < partHashCalcFuts.size(); ++j) {
                            partHashCalcFuts.get(j).cancel(false);
                        }
                        if (e instanceof InterruptedException) {
                            throw new IgniteInterruptedException((InterruptedException)e);
                        }
                        throw new IgniteException(e.getCause());
                    }
                    catch (TimeoutException ignored) {
                        // Future is not ready yet: retry the same index, logging progress at most every 3 minutes.
                        if (U.currentTimeMillis() - lastProgressLogTs <= 180000L) continue;
                        lastProgressLogTs = U.currentTimeMillis();
                        this.log.warning("idle_verify is still running, processed " + this.completionCntr.get() + " of " + partHashCalcFuts.size() + " local partitions");
                    }
                }
                if (!F.isEmpty(exceptions)) {
                    throw new IdleVerifyException(exceptions);
                }
                return res;
            }
            finally {
                // Runs on every exit path, normal or exceptional.
                if (db != null && lsnr != null) {
                    db.removeCheckpointListener(lsnr);
                }
            }
        }

        /**
         * Submits an asynchronous hash calculation for every local partition of the given cache groups.
         * Group ids for which no cache group context is found are skipped.
         *
         * @param grpIds Ids of the cache groups to verify.
         * @param cpFlag Flag passed on to every partition calculation.
         * @return Futures of the submitted calculations, in submission order.
         */
        private List<Future<Map<PartitionKeyV2, PartitionHashRecordV2>>> calcPartitionHashAsync(Set<Integer> grpIds, AtomicBoolean cpFlag) {
            List<Future<Map<PartitionKeyV2, PartitionHashRecordV2>>> futs = new ArrayList<>();

            for (Integer id : grpIds) {
                CacheGroupContext grp = this.ignite.context().cache().cacheGroup(id);

                if (grp != null) {
                    for (GridDhtLocalPartition locPart : grp.topology().localPartitions()) {
                        futs.add(this.calculatePartitionHashAsync(grp, locPart, cpFlag));
                    }
                }
            }
            return futs;
        }

        /**
         * Selects the cache groups to verify by applying, in order, the cache name filter,
         * the cache type filter and the exclusion filter to all cache groups of the node.
         *
         * @return Ids of the cache groups that passed every filter; never empty.
         * @throws NoMatchingCachesException If no cache group is left after filtering.
         */
        private Set<Integer> getGroupIds() {
            CachesFiltering filtering = new CachesFiltering(this.ignite.context().cache().cacheGroups());

            filtering.filter(this::filterByCacheNames);
            filtering.filter(this::filterByCacheFilter);
            filtering.filter(this::filterByExcludeCaches);

            Set<Integer> matched = filtering.result();
            if (!F.isEmpty(matched)) {
                return matched;
            }
            throw new NoMatchingCachesException();
        }

        /**
         * Removes every cache group that matches one of the exclusion regular expressions
         * of the task arguments. Does nothing when no exclusions are configured.
         *
         * @param cachesToFilter Cache groups to filter; modified in place.
         */
        private void filterByExcludeCaches(Set<CacheGroupContext> cachesToFilter) {
            if (F.isEmpty(this.arg.excludeCaches())) {
                return;
            }
            Set<Pattern> exclusions = new HashSet<>();
            for (String regex : this.arg.excludeCaches()) {
                exclusions.add(Pattern.compile(regex));
            }
            cachesToFilter.removeIf(candidate -> this.doesGrpMatchOneOfPatterns(candidate, exclusions));
        }

        /**
         * Keeps only the cache groups accepted by {@link #doesGrpMatchFilter(CacheGroupContext)}.
         *
         * @param cachesToFilter Cache groups to filter; modified in place.
         */
        private void filterByCacheFilter(Set<CacheGroupContext> cachesToFilter) {
            cachesToFilter.removeIf(candidate -> !this.doesGrpMatchFilter(candidate));
        }

        /**
         * Keeps only the cache groups that match at least one of the cache name regular expressions
         * of the task arguments. Does nothing when no cache names are configured.
         *
         * @param cachesToFilter Cache groups to filter; modified in place.
         */
        private void filterByCacheNames(Set<CacheGroupContext> cachesToFilter) {
            if (F.isEmpty(this.arg.caches())) {
                return;
            }
            Set<Pattern> wanted = new HashSet<>();
            for (String regex : this.arg.caches()) {
                wanted.add(Pattern.compile(regex));
            }
            cachesToFilter.removeIf(candidate -> !this.doesGrpMatchOneOfPatterns(candidate, wanted));
        }

        /**
         * Tells whether the group contains at least one cache that is not in {@code LOCAL} mode
         * and is accepted by the configured cache filter.
         *
         * @param grp Cache group to check.
         * @return {@code true} if such a cache exists in the group.
         */
        private boolean doesGrpMatchFilter(CacheGroupContext grp) {
            for (GridCacheContext ctx : grp.caches()) {
                // NOTE(review): the descriptor is dereferenced without a null check — confirm it cannot be null here.
                DynamicCacheDescriptor descriptor = this.ignite.context().cache().cacheDescriptor(ctx.name());

                boolean locCache = descriptor.cacheConfiguration().getCacheMode() == CacheMode.LOCAL;
                if (!locCache && this.isCacheMatchFilter(descriptor)) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Tells whether the group name, or the name of any cache of the group, fully matches
         * at least one of the patterns. {@code null} names never match.
         *
         * @param grp Cache group to check.
         * @param patterns Patterns to match names against.
         * @return {@code true} on the first match, {@code false} if nothing matches.
         */
        private boolean doesGrpMatchOneOfPatterns(CacheGroupContext grp, Set<Pattern> patterns) {
            for (Pattern regex : patterns) {
                String grpName = grp.name();
                if (grpName != null && regex.matcher(grpName).matches()) {
                    return true;
                }
                for (GridCacheContext ctx : grp.caches()) {
                    String cacheName = ctx.name();
                    if (cacheName != null && regex.matcher(cacheName).matches()) {
                        return true;
                    }
                }
            }
            return false;
        }

        /**
         * Applies the cache filter of the task arguments to a single cache.
         * <ul>
         * <li>{@code DEFAULT} - user caches, or any cache when cache names were given explicitly;</li>
         * <li>{@code USER} - user caches only;</li>
         * <li>{@code SYSTEM} - everything except user caches;</li>
         * <li>{@code NOT_PERSISTENT} / {@code PERSISTENT} - user caches without / with persistence;</li>
         * <li>{@code ALL} - every cache.</li>
         * </ul>
         *
         * @param desc Descriptor of the cache to check.
         * @return {@code true} if the cache is accepted by the filter.
         * @throws IgniteException If the filter value is none of the above.
         */
        private boolean isCacheMatchFilter(DynamicCacheDescriptor desc) {
            DataStorageConfiguration storageCfg = this.ignite.context().config().getDataStorageConfiguration();
            CacheConfiguration cacheCfg = desc.cacheConfiguration();

            switch (this.arg.cacheFilterEnum()) {
                case DEFAULT:
                    return desc.cacheType().userCache() || !F.isEmpty(this.arg.caches());

                case USER:
                    return desc.cacheType().userCache();

                case SYSTEM:
                    return !desc.cacheType().userCache();

                case NOT_PERSISTENT:
                    return desc.cacheType().userCache() && !GridCacheUtils.isPersistentCache(cacheCfg, storageCfg);

                case PERSISTENT:
                    return desc.cacheType().userCache() && GridCacheUtils.isPersistentCache(cacheCfg, storageCfg);

                case ALL:
                    return true;

                default:
                    throw new IgniteException("Illegal cache filter: " + this.arg.cacheFilterEnum());
            }
        }

        /**
         * Submits the hash calculation of one partition to the common fork-join pool.
         *
         * @param grpCtx Cache group the partition belongs to.
         * @param part Local partition to hash.
         * @param cpFlag Flag passed on to the calculation.
         * @return Future of the calculation result.
         */
        private Future<Map<PartitionKeyV2, PartitionHashRecordV2>> calculatePartitionHashAsync(CacheGroupContext grpCtx, GridDhtLocalPartition part, AtomicBoolean cpFlag) {
            ForkJoinPool pool = ForkJoinPool.commonPool();

            return pool.submit(() -> this.calculatePartitionHash(grpCtx, part, cpFlag));
        }

        /**
         * Calculates the hash record of a single local partition.
         * <p>
         * The partition is reserved for the duration of the calculation. Everything executed after a
         * successful reservation runs inside the {@code try} block, so the reservation is released on
         * every exit path. The partition state is read once so that all decisions below see the same value.
         *
         * @param grpCtx Cache group the partition belongs to.
         * @param part Local partition to hash.
         * @param cpFlag Flag handed to the CRC check when it is enabled.
         * @return Empty map if the partition could not be reserved or is neither OWNING, MOVING nor LOST;
         *      otherwise a single-entry map with the record of the partition. MOVING and LOST partitions
         *      get a record with zero hash and a size of {@code Long.MIN_VALUE} or {@code 0} respectively.
         * @throws GridNotIdleException If the update counter changed while the partition was being hashed.
         * @throws IgniteException If iterating over the partition failed.
         */
        private Map<PartitionKeyV2, PartitionHashRecordV2> calculatePartitionHash(CacheGroupContext grpCtx, GridDhtLocalPartition part, AtomicBoolean cpFlag) {
            if (!part.reserve()) {
                return Collections.emptyMap();
            }
            int partHash = 0;
            long partSize;
            long updateCntrBefore;
            PartitionKeyV2 partKey;
            Object consId;
            boolean isPrimary;
            try {
                updateCntrBefore = part.updateCounter();
                partKey = new PartitionKeyV2(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName());
                consId = this.ignite.context().discovery().localNode().consistentId();
                isPrimary = part.primary(grpCtx.topology().readyTopologyVersion());

                // Single snapshot: repeated reads could observe different states.
                GridDhtPartitionState state = part.state();

                if (state == GridDhtPartitionState.MOVING || state == GridDhtPartitionState.LOST) {
                    boolean moving = state == GridDhtPartitionState.MOVING;
                    PartitionHashRecordV2 notOwnedRec = new PartitionHashRecordV2(partKey, isPrimary, consId, partHash, updateCntrBefore, moving ? Long.MIN_VALUE : 0L, moving ? PartitionHashRecordV2.PartitionState.MOVING : PartitionHashRecordV2.PartitionState.LOST);
                    return Collections.singletonMap(partKey, notOwnedRec);
                }
                if (state != GridDhtPartitionState.OWNING) {
                    return Collections.emptyMap();
                }
                partSize = part.dataStore().fullSize();
                if (this.arg.checkCrc()) {
                    this.checkPartitionCrc(grpCtx, part, cpFlag);
                }
                GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id());
                while (it.hasNextX()) {
                    CacheDataRow row = it.nextX();
                    // Plain int addition: the sum wraps on overflow and does not depend on iteration order.
                    partHash += row.key().hashCode();
                    partHash += Arrays.hashCode(row.value().valueBytes(grpCtx.cacheObjectContext()));
                }
                long updateCntrAfter = part.updateCounter();
                if (updateCntrBefore != updateCntrAfter) {
                    throw new GridNotIdleException("Update counter of partition [grpId=" + grpCtx.groupId() + ", partId=" + part.id() + "] changed during hash calculation [before=" + updateCntrBefore + ", after=" + updateCntrAfter + "]");
                }
            }
            catch (IgniteCheckedException e) {
                U.error(this.log, "Can't calculate partition hash [grpId=" + grpCtx.groupId() + ", partId=" + part.id() + "]", e);
                throw new IgniteException("Can't calculate partition hash [grpId=" + grpCtx.groupId() + ", partId=" + part.id() + "]", e);
            }
            finally {
                part.release();
            }
            PartitionHashRecordV2 partRec = new PartitionHashRecordV2(partKey, isPrimary, consId, partHash, updateCntrBefore, partSize, PartitionHashRecordV2.PartitionState.OWNING);
            // NOTE(review): only OWNING partitions reach this point, so the progress counter excludes skipped, MOVING and LOST ones.
            this.completionCntr.incrementAndGet();
            return Collections.singletonMap(partKey, partRec);
        }

        /**
         * Checks the page CRC sums of a partition of a persistent cache group.
         * Does nothing for groups without persistence or when no page store manager is available.
         *
         * @param grpCtx Cache group the partition belongs to.
         * @param part Local partition to check.
         * @param cpFlag Checkpoint flag; when it is set at the moment of a failure, the failure
         *      is reported as "cluster not idle" instead of a CRC error.
         * @throws GridNotIdleException If the check itself reports it, or a failure coincides with a set flag.
         * @throws IgniteException If the check failed for any other reason.
         */
        private void checkPartitionCrc(CacheGroupContext grpCtx, GridDhtLocalPartition part, AtomicBoolean cpFlag) {
            if (!grpCtx.persistenceEnabled()) {
                return;
            }
            FilePageStore store = null;
            try {
                FilePageStoreManager storeMgr = (FilePageStoreManager)this.ignite.context().cache().context().pageStore();
                if (storeMgr == null) {
                    return;
                }
                store = (FilePageStore)storeMgr.getStore(grpCtx.groupId(), part.id());
                IdleVerifyUtility.checkPartitionsPageCrcSum(store, grpCtx, part.id(), (byte)1, cpFlag);
            }
            catch (GridNotIdleException notIdle) {
                throw notIdle;
            }
            catch (AssertionError | Exception failure) {
                if (cpFlag.get()) {
                    throw new GridNotIdleException("Checkpoint with dirty pages started! Cluster not idle!", (Throwable)failure);
                }
                SB buf = new SB("CRC check of partition: ");
                buf.a(part.id()).a(", for cache group ").a(grpCtx.cacheOrGroupName()).a(" failed.");
                // The file path is known only if the store was resolved before the failure.
                if (store != null) {
                    buf.a(" file: " + store.getFileAbsolutePath());
                }
                String msg = buf.toString();
                this.log.error(msg, (Throwable)failure);
                throw new IgniteException(msg, (Throwable)failure);
            }
        }

        private class CachesFiltering {
            private final Set<CacheGroupContext> filteredCacheGroups;

            public CachesFiltering(Collection<CacheGroupContext> cacheGroups) {
                this.filteredCacheGroups = new HashSet<CacheGroupContext>(cacheGroups);
            }

            public CachesFiltering filter(IgniteInClosure<Set<CacheGroupContext>> closure) {
                closure.apply(this.filteredCacheGroups);
                return this;
            }

            public Set<Integer> result() {
                HashSet<Integer> res = new HashSet<Integer>();
                for (CacheGroupContext cacheGrp : this.filteredCacheGroups) {
                    res.add(cacheGrp.groupId());
                }
                return res;
            }
        }
    }
}

