/*
 * Decompiled with CFR 0.152.
 */
package org.heigit.bigspatialdata.oshdb.api.mapreducer.backend;

import com.google.common.collect.Streams;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.compute.ComputeTaskTimeoutException;
import org.apache.ignite.lang.IgniteFutureTimeoutException;
import org.apache.ignite.lang.IgniteRunnable;
import org.heigit.bigspatialdata.oshdb.api.db.OSHDBDatabase;
import org.heigit.bigspatialdata.oshdb.api.db.OSHDBIgnite;
import org.heigit.bigspatialdata.oshdb.api.generic.function.SerializableBiFunction;
import org.heigit.bigspatialdata.oshdb.api.generic.function.SerializableBinaryOperator;
import org.heigit.bigspatialdata.oshdb.api.generic.function.SerializableFunction;
import org.heigit.bigspatialdata.oshdb.api.generic.function.SerializableSupplier;
import org.heigit.bigspatialdata.oshdb.api.mapreducer.MapReducer;
import org.heigit.bigspatialdata.oshdb.api.mapreducer.backend.Kernels;
import org.heigit.bigspatialdata.oshdb.api.mapreducer.backend.OSHDBIgniteMapReduceComputeTask;
import org.heigit.bigspatialdata.oshdb.api.object.OSHDBMapReducible;
import org.heigit.bigspatialdata.oshdb.api.object.OSMContribution;
import org.heigit.bigspatialdata.oshdb.api.object.OSMEntitySnapshot;
import org.heigit.bigspatialdata.oshdb.grid.GridOSHEntity;
import org.heigit.bigspatialdata.oshdb.index.XYGridTree;
import org.heigit.bigspatialdata.oshdb.util.CellId;
import org.heigit.bigspatialdata.oshdb.util.OSHDBBoundingBox;
import org.heigit.bigspatialdata.oshdb.util.OSHDBTimestamp;
import org.heigit.bigspatialdata.oshdb.util.TableNames;
import org.heigit.bigspatialdata.oshdb.util.celliterator.CellIterator;
import org.heigit.bigspatialdata.oshdb.util.exceptions.OSHDBTimeoutException;
import org.heigit.bigspatialdata.oshdb.util.taginterpreter.TagInterpreter;
import org.jetbrains.annotations.NotNull;
import org.locationtech.jts.geom.Geometry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MapReducerIgniteLocalPeek<X>
extends MapReducer<X> {
    /**
     * Creates a map-reducer bound to the given database and object type.
     *
     * <p>Both arguments are handed unchanged to the {@link MapReducer} super constructor.
     * The query methods of this class cast the database to {@link OSHDBIgnite}, so running a
     * query on any other database type fails with a {@link ClassCastException}.
     *
     * @param oshdb the database handle passed on to the super class
     * @param forClass the {@link OSHDBMapReducible} type passed on to the super class
     */
    public MapReducerIgniteLocalPeek(
            OSHDBDatabase oshdb,
            Class<? extends OSHDBMapReducible> forClass) {
        super(oshdb, forClass);
    }

    /**
     * Copy constructor; hands the source instance to the {@link MapReducer} copy constructor.
     *
     * <p>The parameter is parameterized with {@code X} instead of being a raw type, so the
     * only caller ({@link #copy()}) is checked by the compiler.
     *
     * @param obj the map-reducer to copy
     */
    private MapReducerIgniteLocalPeek(MapReducerIgniteLocalPeek<X> obj) {
        super(obj);
    }

    /**
     * Produces a new {@code MapReducerIgniteLocalPeek} built from this instance through the
     * private copy constructor.
     *
     * @return the newly created copy
     */
    @Override
    @NotNull
    protected MapReducer<X> copy() {
        MapReducerIgniteLocalPeek<X> duplicate = new MapReducerIgniteLocalPeek<X>(this);
        return duplicate;
    }

    private List<String> cacheNames(String prefix) {
        return this.typeFilter.stream().map(TableNames::forOSMType).filter(Optional::isPresent).map(Optional::get).map((? super T tn) -> tn.toString(prefix)).collect(Collectors.toList());
    }

    /**
     * Reports whether queries of this backend can be canceled.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isCancelable() {
        final boolean cancelable = true;
        return cancelable;
    }

    /**
     * Runs a map-reduce over {@link OSMContribution} objects by submitting a
     * {@link MapReduceCellsOSMContributionOnIgniteCacheComputeJob} via
     * {@code mapReduceOnIgniteCache}.
     *
     * <p>The job is configured from this map-reducer's tag interpreter, cache names, cell id
     * ranges, timestamps, bounding box, polygon filter, pre-filter and filter. The database is
     * cast to {@link OSHDBIgnite}.
     *
     * @param mapper mapping function handed to the compute job
     * @param identitySupplier supplier of the reduction's identity value
     * @param accumulator accumulator function handed to the compute job
     * @param combiner combiner used to merge partial results
     * @return the result produced by the submitted compute task
     * @throws Exception declared by the overridden method; a configured timeout that elapses
     *         surfaces as {@link OSHDBTimeoutException}
     */
    @Override
    protected <R, S> S mapReduceCellsOSMContribution(SerializableFunction<OSMContribution, R> mapper, SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) throws Exception {
        // The diamond keeps the job parameterized; a raw instantiation would force an unchecked
        // conversion and erase the return type of mapReduceOnIgniteCache to Object.
        return MapReducerIgniteLocalPeek.mapReduceOnIgniteCache(
            (OSHDBIgnite) this.oshdb,
            identitySupplier,
            combiner,
            new MapReduceCellsOSMContributionOnIgniteCacheComputeJob<>(
                this.getTagInterpreter(),
                this.cacheNames(this.oshdb.prefix()),
                this.getCellIdRanges(),
                (SortedSet<OSHDBTimestamp>) this.tstamps.get(),
                this.bboxFilter,
                this.getPolyFilter(),
                this.getPreFilter(),
                this.getFilter(),
                mapper,
                identitySupplier,
                accumulator,
                combiner));
    }

    /**
     * Runs a flat-map-reduce over lists of {@link OSMContribution} objects by submitting a
     * {@link FlatMapReduceCellsOSMContributionOnIgniteCacheComputeJob} via
     * {@code mapReduceOnIgniteCache}.
     *
     * <p>The job is configured from this map-reducer's tag interpreter, cache names, cell id
     * ranges, timestamps, bounding box, polygon filter, pre-filter and filter. The database is
     * cast to {@link OSHDBIgnite}.
     *
     * @param mapper function from a list of contributions to an iterable of mapped values,
     *        handed to the compute job
     * @param identitySupplier supplier of the reduction's identity value
     * @param accumulator accumulator function handed to the compute job
     * @param combiner combiner used to merge partial results
     * @return the result produced by the submitted compute task
     * @throws Exception declared by the overridden method; a configured timeout that elapses
     *         surfaces as {@link OSHDBTimeoutException}
     */
    @Override
    protected <R, S> S flatMapReduceCellsOSMContributionGroupedById(SerializableFunction<List<OSMContribution>, Iterable<R>> mapper, SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) throws Exception {
        // The diamond keeps the job parameterized; a raw instantiation would force an unchecked
        // conversion and erase the return type of mapReduceOnIgniteCache to Object.
        return MapReducerIgniteLocalPeek.mapReduceOnIgniteCache(
            (OSHDBIgnite) this.oshdb,
            identitySupplier,
            combiner,
            new FlatMapReduceCellsOSMContributionOnIgniteCacheComputeJob<>(
                this.getTagInterpreter(),
                this.cacheNames(this.oshdb.prefix()),
                this.getCellIdRanges(),
                (SortedSet<OSHDBTimestamp>) this.tstamps.get(),
                this.bboxFilter,
                this.getPolyFilter(),
                this.getPreFilter(),
                this.getFilter(),
                mapper,
                identitySupplier,
                accumulator,
                combiner));
    }

    /**
     * Runs a map-reduce over {@link OSMEntitySnapshot} objects by submitting a
     * {@link MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob} via
     * {@code mapReduceOnIgniteCache}.
     *
     * <p>The job is configured from this map-reducer's tag interpreter, cache names, cell id
     * ranges, timestamps, bounding box, polygon filter, pre-filter and filter. The database is
     * cast to {@link OSHDBIgnite}.
     *
     * @param mapper mapping function handed to the compute job
     * @param identitySupplier supplier of the reduction's identity value
     * @param accumulator accumulator function handed to the compute job
     * @param combiner combiner used to merge partial results
     * @return the result produced by the submitted compute task
     * @throws Exception declared by the overridden method; a configured timeout that elapses
     *         surfaces as {@link OSHDBTimeoutException}
     */
    @Override
    protected <R, S> S mapReduceCellsOSMEntitySnapshot(SerializableFunction<OSMEntitySnapshot, R> mapper, SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) throws Exception {
        // The diamond keeps the job parameterized; a raw instantiation would force an unchecked
        // conversion and erase the return type of mapReduceOnIgniteCache to Object.
        return MapReducerIgniteLocalPeek.mapReduceOnIgniteCache(
            (OSHDBIgnite) this.oshdb,
            identitySupplier,
            combiner,
            new MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob<>(
                this.getTagInterpreter(),
                this.cacheNames(this.oshdb.prefix()),
                this.getCellIdRanges(),
                (SortedSet<OSHDBTimestamp>) this.tstamps.get(),
                this.bboxFilter,
                this.getPolyFilter(),
                this.getPreFilter(),
                this.getFilter(),
                mapper,
                identitySupplier,
                accumulator,
                combiner));
    }

    /**
     * Runs a flat-map-reduce over lists of {@link OSMEntitySnapshot} objects by submitting a
     * {@link FlatMapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob} via
     * {@code mapReduceOnIgniteCache}.
     *
     * <p>The job is configured from this map-reducer's tag interpreter, cache names, cell id
     * ranges, timestamps, bounding box, polygon filter, pre-filter and filter. The database is
     * cast to {@link OSHDBIgnite}.
     *
     * @param mapper function from a list of snapshots to an iterable of mapped values,
     *        handed to the compute job
     * @param identitySupplier supplier of the reduction's identity value
     * @param accumulator accumulator function handed to the compute job
     * @param combiner combiner used to merge partial results
     * @return the result produced by the submitted compute task
     * @throws Exception declared by the overridden method; a configured timeout that elapses
     *         surfaces as {@link OSHDBTimeoutException}
     */
    @Override
    protected <R, S> S flatMapReduceCellsOSMEntitySnapshotGroupedById(SerializableFunction<List<OSMEntitySnapshot>, Iterable<R>> mapper, SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) throws Exception {
        // The diamond keeps the job parameterized; a raw instantiation would force an unchecked
        // conversion and erase the return type of mapReduceOnIgniteCache to Object.
        return MapReducerIgniteLocalPeek.mapReduceOnIgniteCache(
            (OSHDBIgnite) this.oshdb,
            identitySupplier,
            combiner,
            new FlatMapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob<>(
                this.getTagInterpreter(),
                this.cacheNames(this.oshdb.prefix()),
                this.getCellIdRanges(),
                (SortedSet<OSHDBTimestamp>) this.tstamps.get(),
                this.bboxFilter,
                this.getPolyFilter(),
                this.getPreFilter(),
                this.getFilter(),
                mapper,
                identitySupplier,
                accumulator,
                combiner));
    }

    /**
     * Wraps the given compute job in an {@link OSHDBIgniteMapReduceComputeTask}, submits it
     * asynchronously to the Ignite compute API and waits for its result.
     *
     * <p>If the database reports a timeout, the wait is bounded by it; when the wait times
     * out, the task future is canceled and an {@link OSHDBTimeoutException} is thrown (the
     * original Ignite exception is not attached). Without a timeout the call waits without a
     * time bound.
     *
     * @param oshdb the Ignite database providing the Ignite instance, the optional on-close
     *        callback and the optional timeout
     * @param identitySupplier identity supplier handed to the compute task
     * @param combiner combiner handed to the compute task
     * @param computeJob the job to wrap in the compute task
     * @return the value delivered by the task future
     */
    private static <V, R, M, S, P extends Geometry> S mapReduceOnIgniteCache(OSHDBIgnite oshdb, SerializableSupplier<S> identitySupplier, SerializableBinaryOperator<S> combiner, MapReduceCellsOnIgniteCacheComputeJob<V, R, M, S, P> computeJob) {
        IgniteCompute igniteCompute = oshdb.getIgnite().compute();
        // a no-op runnable stands in when the database has no on-close callback
        ComputeTaskFuture pendingResult = igniteCompute.executeAsync(
            new OSHDBIgniteMapReduceComputeTask(
                computeJob,
                identitySupplier,
                combiner,
                oshdb.onClose().orElse((IgniteRunnable & Serializable) () -> {})),
            null);
        if (oshdb.timeoutInMilliseconds().isPresent()) {
            try {
                return (S) pendingResult.get(oshdb.timeoutInMilliseconds().getAsLong());
            } catch (ComputeTaskTimeoutException | IgniteFutureTimeoutException timedOut) {
                pendingResult.cancel();
                throw new OSHDBTimeoutException();
            }
        }
        return (S) pendingResult.get();
    }

    private static class FlatMapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob<R, S, P extends Geometry>
    extends MapReduceCellsOnIgniteCacheComputeJob<List<OSMEntitySnapshot>, R, Iterable<R>, S, P> {
        FlatMapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob(TagInterpreter tagInterpreter, List<String> cacheNames, Iterable<XYGridTree.CellIdRange> cellIdRanges, SortedSet<OSHDBTimestamp> tstamps, OSHDBBoundingBox bbox, P poly, CellIterator.OSHEntityFilter preFilter, CellIterator.OSMEntityFilter filter, SerializableFunction<List<OSMEntitySnapshot>, Iterable<R>> mapper, SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) {
            super(tagInterpreter, cacheNames, cellIdRanges, tstamps, bbox, poly, preFilter, filter, mapper, identitySupplier, accumulator, combiner);
        }

        @Override
        public S execute(Ignite node) {
            return super.execute(node, Kernels.getOSMEntitySnapshotGroupingCellReducer(this.mapper, this.identitySupplier, this.accumulator, this));
        }
    }

    private static class MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob<R, S, P extends Geometry>
    extends MapReduceCellsOnIgniteCacheComputeJob<OSMEntitySnapshot, R, R, S, P> {
        MapReduceCellsOSMEntitySnapshotOnIgniteCacheComputeJob(TagInterpreter tagInterpreter, List<String> cacheNames, Iterable<XYGridTree.CellIdRange> cellIdRanges, SortedSet<OSHDBTimestamp> tstamps, OSHDBBoundingBox bbox, P poly, CellIterator.OSHEntityFilter preFilter, CellIterator.OSMEntityFilter filter, SerializableFunction<OSMEntitySnapshot, R> mapper, SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) {
            super(tagInterpreter, cacheNames, cellIdRanges, tstamps, bbox, poly, preFilter, filter, mapper, identitySupplier, accumulator, combiner);
        }

        @Override
        public S execute(Ignite node) {
            return super.execute(node, Kernels.getOSMEntitySnapshotCellReducer(this.mapper, this.identitySupplier, this.accumulator, this));
        }
    }

    private static class FlatMapReduceCellsOSMContributionOnIgniteCacheComputeJob<R, S, P extends Geometry>
    extends MapReduceCellsOnIgniteCacheComputeJob<List<OSMContribution>, R, Iterable<R>, S, P> {
        FlatMapReduceCellsOSMContributionOnIgniteCacheComputeJob(TagInterpreter tagInterpreter, List<String> cacheNames, Iterable<XYGridTree.CellIdRange> cellIdRanges, SortedSet<OSHDBTimestamp> tstamps, OSHDBBoundingBox bbox, P poly, CellIterator.OSHEntityFilter preFilter, CellIterator.OSMEntityFilter filter, SerializableFunction<List<OSMContribution>, Iterable<R>> mapper, SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) {
            super(tagInterpreter, cacheNames, cellIdRanges, tstamps, bbox, poly, preFilter, filter, mapper, identitySupplier, accumulator, combiner);
        }

        @Override
        public S execute(Ignite node) {
            return super.execute(node, Kernels.getOSMContributionGroupingCellReducer(this.mapper, this.identitySupplier, this.accumulator, this));
        }
    }

    private static class MapReduceCellsOSMContributionOnIgniteCacheComputeJob<R, S, P extends Geometry>
    extends MapReduceCellsOnIgniteCacheComputeJob<OSMContribution, R, R, S, P> {
        MapReduceCellsOSMContributionOnIgniteCacheComputeJob(TagInterpreter tagInterpreter, List<String> cacheNames, Iterable<XYGridTree.CellIdRange> cellIdRanges, SortedSet<OSHDBTimestamp> tstamps, OSHDBBoundingBox bbox, P poly, CellIterator.OSHEntityFilter preFilter, CellIterator.OSMEntityFilter filter, SerializableFunction<OSMContribution, R> mapper, SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) {
            super(tagInterpreter, cacheNames, cellIdRanges, tstamps, bbox, poly, preFilter, filter, mapper, identitySupplier, accumulator, combiner);
        }

        @Override
        public S execute(Ignite node) {
            return super.execute(node, Kernels.getOSMContributionCellReducer(this.mapper, this.identitySupplier, this.accumulator, this));
        }
    }

    /**
     * Base class of the cancelable compute jobs of this backend.
     *
     * <p>A job holds the cache names and cell id ranges to visit, a {@link CellIterator}
     * built from the query's filters, and the user supplied map/reduce functions. Its
     * {@link #execute(Ignite, Kernels.CellProcessor)} method looks up every requested cell key
     * in every requested cache with {@code localPeek}, feeds the cells that were found to a
     * cell processor and reduces the per-cell results with the combiner.
     *
     * @param <V> input type of the mapper
     * @param <R> value type consumed by the accumulator
     * @param <M> output type of the mapper
     * @param <S> result type of the reduction
     * @param <P> type of the polygon filter geometry
     */
    private static abstract class MapReduceCellsOnIgniteCacheComputeJob<V, R, M, S, P extends Geometry>
    implements OSHDBIgniteMapReduceComputeTask.CancelableIgniteMapReduceJob<S> {
        private static final Logger LOG = LoggerFactory.getLogger(MapReduceCellsOnIgniteCacheComputeJob.class);

        /**
         * Cleared by {@link #cancel()} and polled by the worker threads of the parallel stream
         * in {@code execute}; volatile so that a cancel request becomes visible to them.
         */
        private volatile boolean notCanceled = true;
        final List<String> cacheNames;
        final Iterable<XYGridTree.CellIdRange> cellIdRanges;
        final OSHDBBoundingBox bbox;
        final CellIterator cellIterator;
        final SerializableFunction<V, M> mapper;
        final SerializableSupplier<S> identitySupplier;
        final SerializableBiFunction<S, R, S> accumulator;
        final SerializableBinaryOperator<S> combiner;

        /**
         * Stores the job configuration and creates the {@link CellIterator} used for all cells.
         *
         * @param tagInterpreter tag interpreter handed to the cell iterator
         * @param cacheNames names of the caches to read cells from
         * @param cellIdRanges ranges of cell ids to visit
         * @param tstamps timestamps handed to the cell iterator
         * @param bbox bounding box handed to the cell iterator (also kept in a field)
         * @param poly polygon filter handed to the cell iterator
         * @param preFilter entity pre-filter handed to the cell iterator
         * @param filter entity filter handed to the cell iterator
         * @param mapper user supplied mapping function
         * @param identitySupplier supplier of the reduction's identity value
         * @param accumulator user supplied accumulator function
         * @param combiner user supplied combiner function
         */
        MapReduceCellsOnIgniteCacheComputeJob(TagInterpreter tagInterpreter, List<String> cacheNames, Iterable<XYGridTree.CellIdRange> cellIdRanges, SortedSet<OSHDBTimestamp> tstamps, OSHDBBoundingBox bbox, P poly, CellIterator.OSHEntityFilter preFilter, CellIterator.OSMEntityFilter filter, SerializableFunction<V, M> mapper, SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) {
            this.cacheNames = cacheNames;
            this.cellIdRanges = cellIdRanges;
            this.bbox = bbox;
            this.cellIterator = new CellIterator(tstamps, bbox, poly, tagInterpreter, preFilter, filter, false);
            this.mapper = mapper;
            this.identitySupplier = identitySupplier;
            this.accumulator = accumulator;
            this.combiner = combiner;
        }

        /** Logs the cancellation and marks the job as no longer active. */
        @Override
        public void cancel() {
            LOG.info("compute job canceled");
            this.notCanceled = false;
        }

        /**
         * Tells whether the job has not been canceled yet.
         *
         * @return {@code true} until {@link #cancel()} has been called
         */
        @Override
        public boolean isActive() {
            return this.notCanceled;
        }

        /**
         * Processes all requested cells that {@code localPeek} finds on the given node and
         * reduces the per-cell results.
         *
         * <p>Cell keys are streamed in parallel. Once the job is canceled, the
         * {@code isActive()} filters drop all remaining elements, so the result then only
         * covers the cells processed so far.
         *
         * @param node the Ignite instance whose caches are read
         * @param cellProcessor computes the partial result of a single cell
         * @return the partial results combined with {@code combiner}, starting from the value
         *         of {@code identitySupplier}
         */
        S execute(Ignite node, Kernels.CellProcessor<S> cellProcessor) {
            Set<IgniteCache<Long, GridOSHEntity>> caches = this.cacheNames.stream()
                .map(cacheName -> node.<Long, GridOSHEntity>cache(cacheName))
                .collect(Collectors.toSet());
            return Streams.stream(new CellKeysIterator(this.cellIdRanges))
                .parallel()
                .filter(ignored -> this.isActive())
                .flatMap(cellKey -> caches.stream()
                    .filter(ignored -> this.isActive())
                    .map(cache -> cache.localPeek(cellKey)))
                // localPeek yields null when the key has no entry available on this node
                .filter(Objects::nonNull)
                .filter(ignored -> this.isActive())
                .map(cell -> cellProcessor.apply(cell, this.cellIterator))
                .reduce(this.identitySupplier.get(), this.combiner);
        }

        /**
         * Iterates over the level-ids of all cells in the given cell id ranges.
         *
         * <p>Keys are pulled from the underlying ranges in chunks of up to {@code bufferSize}
         * entries; each chunk is shuffled before being handed out (presumably to spread the
         * keys of neighbouring cells over the stream's worker threads - confirm). Iteration
         * ends early once the enclosing job is canceled and the current chunk is used up.
         */
        private class CellKeysIterator
        implements Iterator<Long> {
            private final Iterator<Long> cellIds;
            private final List<Long> buffer;
            private final int bufferSize = 102400 * ForkJoinPool.commonPool().getParallelism();

            /**
             * Sets up the lazy stream of cell keys; nothing is buffered until the first call
             * to {@link #hasNext()} or {@link #next()}.
             *
             * @param cellIdRanges the ranges whose ids (start to end, inclusive) are expanded
             */
            CellKeysIterator(Iterable<XYGridTree.CellIdRange> cellIdRanges) {
                this.cellIds = Streams.stream(cellIdRanges)
                    .filter(ignored -> MapReduceCellsOnIgniteCacheComputeJob.this.isActive())
                    .flatMap(cellIdRange -> {
                        // the zoom level is taken from the start of the range for all of its ids
                        int level = cellIdRange.getStart().getZoomLevel();
                        long fromId = cellIdRange.getStart().getId();
                        long toId = cellIdRange.getEnd().getId();
                        return LongStream.rangeClosed(fromId, toId)
                            .map(id -> CellId.getLevelId(level, id))
                            .boxed();
                    })
                    .iterator();
                this.buffer = new ArrayList<>(this.bufferSize);
            }

            /** Refills the buffer with the next chunk of keys and shuffles it. */
            private void fillBuffer() {
                this.buffer.clear();
                while (this.buffer.size() < this.bufferSize && this.cellIds.hasNext()) {
                    this.buffer.add(this.cellIds.next());
                }
                Collections.shuffle(this.buffer);
            }

            /**
             * Returns {@code true} if a buffered key is available, or if the job is still
             * active and another chunk could be loaded.
             */
            @Override
            public boolean hasNext() {
                if (!this.buffer.isEmpty()) {
                    return true;
                }
                if (MapReduceCellsOnIgniteCacheComputeJob.this.isActive() && this.cellIds.hasNext()) {
                    this.fillBuffer();
                    return true;
                }
                return false;
            }

            /**
             * Returns the next cell key, loading a new chunk first if necessary.
             *
             * @throws NoSuchElementException if no further key is available
             */
            @Override
            public Long next() {
                // going through hasNext() refills an empty buffer and honours the Iterator
                // contract instead of failing on remove(-1)
                if (!this.hasNext()) {
                    throw new NoSuchElementException();
                }
                return this.buffer.remove(this.buffer.size() - 1);
            }
        }
    }
}

