/*
 * Decompiled with CFR 0.152.
 */
package org.heigit.bigspatialdata.oshdb.api.mapreducer.backend;

import com.google.common.collect.Streams;
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.io.Serializable;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteFutureTimeoutException;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.heigit.bigspatialdata.oshdb.api.db.OSHDBDatabase;
import org.heigit.bigspatialdata.oshdb.api.db.OSHDBIgnite;
import org.heigit.bigspatialdata.oshdb.api.generic.function.SerializableBiFunction;
import org.heigit.bigspatialdata.oshdb.api.generic.function.SerializableBinaryOperator;
import org.heigit.bigspatialdata.oshdb.api.generic.function.SerializableFunction;
import org.heigit.bigspatialdata.oshdb.api.generic.function.SerializableSupplier;
import org.heigit.bigspatialdata.oshdb.api.mapreducer.MapReducer;
import org.heigit.bigspatialdata.oshdb.api.mapreducer.backend.Kernels;
import org.heigit.bigspatialdata.oshdb.api.mapreducer.backend.MapReducerIgniteScanQuery;
import org.heigit.bigspatialdata.oshdb.api.object.OSHDBMapReducible;
import org.heigit.bigspatialdata.oshdb.api.object.OSMContribution;
import org.heigit.bigspatialdata.oshdb.api.object.OSMEntitySnapshot;
import org.heigit.bigspatialdata.oshdb.grid.GridOSHEntity;
import org.heigit.bigspatialdata.oshdb.index.XYGridTree;
import org.heigit.bigspatialdata.oshdb.osm.OSMType;
import org.heigit.bigspatialdata.oshdb.util.CellId;
import org.heigit.bigspatialdata.oshdb.util.TableNames;
import org.heigit.bigspatialdata.oshdb.util.celliterator.CellIterator;
import org.heigit.bigspatialdata.oshdb.util.exceptions.OSHDBTimeoutException;
import org.jetbrains.annotations.NotNull;
import org.json.simple.parser.ParseException;

public class MapReducerIgniteAffinityCall<X>
extends MapReducer<X>
implements Kernels.CancelableProcessStatus {
    private long executionStartTimeMillis;

    public MapReducerIgniteAffinityCall(OSHDBDatabase oshdb, Class<? extends OSHDBMapReducible> forClass) {
        super(oshdb, forClass);
    }

    private MapReducerIgniteAffinityCall(MapReducerIgniteAffinityCall obj) {
        super(obj);
    }

    @Override
    @NotNull
    protected MapReducer<X> copy() {
        return new MapReducerIgniteAffinityCall<X>(this);
    }

    @Override
    public boolean isCancelable() {
        return true;
    }

    @Override
    public boolean isActive() {
        if (this.timeout != null && System.currentTimeMillis() - this.executionStartTimeMillis > this.timeout) {
            throw new OSHDBTimeoutException();
        }
        return true;
    }

    @Nonnull
    private static SerializableFunction<XYGridTree.CellIdRange, LongStream> cellIdRangeToCellIds() {
        return cellIdRange -> {
            int level = cellIdRange.getStart().getZoomLevel();
            long from = CellId.getLevelId((int)level, (long)cellIdRange.getStart().getId());
            long to = CellId.getLevelId((int)level, (long)cellIdRange.getEnd().getId());
            return LongStream.rangeClosed(from, to);
        };
    }

    private static <T> T asyncGetHandleTimeouts(IgniteFuture<T> async, Long timeout) throws OSHDBTimeoutException {
        try {
            if (timeout == null) {
                return (T)async.get();
            }
            return (T)async.get(timeout.longValue());
        }
        catch (IgniteFutureTimeoutException e) {
            throw new OSHDBTimeoutException();
        }
        catch (IgniteException e) {
            if (e.getCause().getCause() instanceof OSHDBTimeoutException) {
                throw (OSHDBTimeoutException)e.getCause().getCause();
            }
            throw e;
        }
    }

    private <S> S reduce(Kernels.CellProcessor<S> cellProcessor, SerializableSupplier<S> identitySupplier, SerializableBinaryOperator<S> combiner) throws ParseException, SQLException, IOException {
        this.executionStartTimeMillis = System.currentTimeMillis();
        CellIterator cellIterator = new CellIterator(this.tstamps.get(), this.bboxFilter, this.getPolyFilter(), this.getTagInterpreter(), this.getPreFilter(), this.getFilter(), false);
        Iterable<XYGridTree.CellIdRange> cellIdRanges = this.getCellIdRanges();
        OSHDBIgnite oshdb = (OSHDBIgnite)this.oshdb;
        Ignite ignite = oshdb.getIgnite();
        IgniteCompute compute = ignite.compute();
        IgniteRunnable onClose = oshdb.onClose().orElse((IgniteRunnable & Serializable)() -> {});
        return (S)this.typeFilter.stream().map((? super T osmType) -> {
            assert (TableNames.forOSMType((OSMType)osmType).isPresent());
            String cacheName = ((TableNames)TableNames.forOSMType((OSMType)osmType).get()).toString(this.oshdb.prefix());
            IgniteCache cache = ignite.cache(cacheName);
            return Streams.stream((Iterable)cellIdRanges).flatMapToLong(MapReducerIgniteAffinityCall.cellIdRangeToCellIds()).parallel().filter((long ignored) -> this.isActive()).mapToObj(cellLongId -> MapReducerIgniteAffinityCall.asyncGetHandleTimeouts(compute.affinityCallAsync(cacheName, (Object)cellLongId, (IgniteCallable & Serializable)() -> {
                GridOSHEntity oshEntityCell = (GridOSHEntity)cache.localPeek((Object)cellLongId, new CachePeekMode[0]);
                Object ret = oshEntityCell == null ? identitySupplier.get() : cellProcessor.apply(oshEntityCell, cellIterator);
                onClose.run();
                return ret;
            }), this.timeout)).reduce(identitySupplier.get(), combiner);
        }).reduce(identitySupplier.get(), combiner);
    }

    private Stream<X> stream(Kernels.CellProcessor<Stream<X>> cellProcessor) throws ParseException, SQLException, IOException {
        this.executionStartTimeMillis = System.currentTimeMillis();
        CellIterator cellIterator = new CellIterator(this.tstamps.get(), this.bboxFilter, this.getPolyFilter(), this.getTagInterpreter(), this.getPreFilter(), this.getFilter(), false);
        Iterable<XYGridTree.CellIdRange> cellIdRanges = this.getCellIdRanges();
        OSHDBIgnite oshdb = (OSHDBIgnite)this.oshdb;
        Ignite ignite = oshdb.getIgnite();
        IgniteCompute compute = ignite.compute();
        IgniteRunnable onClose = oshdb.onClose().orElse((IgniteRunnable & Serializable)() -> {});
        Stream result = Stream.empty();
        for (OSMType osmType : this.typeFilter) {
            assert (TableNames.forOSMType((OSMType)osmType).isPresent());
            String cacheName = ((TableNames)TableNames.forOSMType((OSMType)osmType).get()).toString(this.oshdb.prefix());
            IgniteCache cache = ignite.cache(cacheName);
            int maxNumCells = 0;
            for (XYGridTree.CellIdRange cellIdRange : cellIdRanges) {
                maxNumCells = (int)((long)maxNumCells + (cellIdRange.getEnd().getId() - cellIdRange.getStart().getId()));
            }
            GetMatchingKeysPreflight preflight = maxNumCells > cache.size(new CachePeekMode[0]) ? new GetMatchingKeysPreflightScanQuery(cacheName, MapReducerIgniteAffinityCall.cellIdRangeToCellIds(), cellIdRanges, cellProcessor, cellIterator) : new GetMatchingKeysPreflightLocalPeek(cacheName, MapReducerIgniteAffinityCall.cellIdRangeToCellIds(), cellIdRanges, cellProcessor, cellIterator);
            List cellsWithData = ((Collection)MapReducerIgniteAffinityCall.asyncGetHandleTimeouts(compute.broadcastAsync((IgniteCallable)preflight), this.timeout)).stream().flatMap(Collection::stream).collect(Collectors.toList());
            Collections.shuffle(cellsWithData);
            Stream resultForType = cellsWithData.parallelStream().filter((? super T ignored) -> this.isActive()).map((? super T cellLongId) -> (Collection)MapReducerIgniteAffinityCall.asyncGetHandleTimeouts(compute.affinityCallAsync(cacheName, cellLongId, (IgniteCallable & Serializable)() -> {
                GridOSHEntity oshEntityCell = (GridOSHEntity)cache.localPeek(cellLongId, new CachePeekMode[0]);
                Collection<Object> ret = oshEntityCell == null ? Collections.emptyList() : (Collection)((Stream)cellProcessor.apply(oshEntityCell, cellIterator)).collect(Collectors.toList());
                onClose.run();
                return ret;
            }), this.timeout)).flatMap(Collection::stream);
            result = Stream.concat(result, resultForType);
        }
        return result;
    }

    @Override
    protected <R, S> S mapReduceCellsOSMContribution(SerializableFunction<OSMContribution, R> mapper, SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) throws Exception {
        return this.reduce(Kernels.getOSMContributionCellReducer(mapper, identitySupplier, accumulator, this), identitySupplier, combiner);
    }

    @Override
    protected <R, S> S flatMapReduceCellsOSMContributionGroupedById(SerializableFunction<List<OSMContribution>, Iterable<R>> mapper, SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) throws Exception {
        return this.reduce(Kernels.getOSMContributionGroupingCellReducer(mapper, identitySupplier, accumulator, this), identitySupplier, combiner);
    }

    @Override
    protected <R, S> S mapReduceCellsOSMEntitySnapshot(SerializableFunction<OSMEntitySnapshot, R> mapper, SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) throws Exception {
        return this.reduce(Kernels.getOSMEntitySnapshotCellReducer(mapper, identitySupplier, accumulator, this), identitySupplier, combiner);
    }

    @Override
    protected <R, S> S flatMapReduceCellsOSMEntitySnapshotGroupedById(SerializableFunction<List<OSMEntitySnapshot>, Iterable<R>> mapper, SerializableSupplier<S> identitySupplier, SerializableBiFunction<S, R, S> accumulator, SerializableBinaryOperator<S> combiner) throws Exception {
        return this.reduce(Kernels.getOSMEntitySnapshotGroupingCellReducer(mapper, identitySupplier, accumulator, this), identitySupplier, combiner);
    }

    @Override
    protected Stream<X> mapStreamCellsOSMContribution(SerializableFunction<OSMContribution, X> mapper) throws Exception {
        return this.stream(Kernels.getOSMContributionCellStreamer(mapper, this));
    }

    @Override
    protected Stream<X> flatMapStreamCellsOSMContributionGroupedById(SerializableFunction<List<OSMContribution>, Iterable<X>> mapper) throws Exception {
        return this.stream(Kernels.getOSMContributionGroupingCellStreamer(mapper, this));
    }

    @Override
    protected Stream<X> mapStreamCellsOSMEntitySnapshot(SerializableFunction<OSMEntitySnapshot, X> mapper) throws Exception {
        return this.stream(Kernels.getOSMEntitySnapshotCellStreamer(mapper, this));
    }

    @Override
    protected Stream<X> flatMapStreamCellsOSMEntitySnapshotGroupedById(SerializableFunction<List<OSMEntitySnapshot>, Iterable<X>> mapper) throws Exception {
        return this.stream(Kernels.getOSMEntitySnapshotGroupingCellStreamer(mapper, this));
    }

    static class GetMatchingKeysPreflightScanQuery
    extends GetMatchingKeysPreflight {
        private final Map<Integer, TreeMap<Long, XYGridTree.CellIdRange>> cellIdRangesByLevel = new HashMap<Integer, TreeMap<Long, XYGridTree.CellIdRange>>();

        GetMatchingKeysPreflightScanQuery(String cacheName, Function<XYGridTree.CellIdRange, LongStream> cellIdRangeToCellIds, Iterable<XYGridTree.CellIdRange> cellIdRanges, Kernels.CellProcessor<? extends Stream<?>> cellProcessor, CellIterator cellIterator) {
            super(cacheName, cellIdRangeToCellIds, cellIdRanges, cellProcessor, cellIterator);
            for (XYGridTree.CellIdRange cellIdRange : cellIdRanges) {
                int level = cellIdRange.getStart().getZoomLevel();
                if (!this.cellIdRangesByLevel.containsKey(level)) {
                    this.cellIdRangesByLevel.put(level, new TreeMap());
                }
                this.cellIdRangesByLevel.get(level).put(cellIdRange.getStart().getId(), cellIdRange);
            }
        }

        public Collection<Long> call() {
            IgniteCache localCache = this.ignite.cache(this.cacheName).withKeepBinary();
            List myPartitions = Ints.asList((int[])this.ignite.affinity(this.cacheName).primaryPartitions(this.ignite.cluster().localNode()));
            Collections.shuffle(myPartitions);
            return myPartitions.parallelStream().map(part -> {
                try (QueryCursor cursor = localCache.query((Query)new ScanQuery((IgniteBiPredicate & Serializable)(key, cell) -> MapReducerIgniteScanQuery.cellKeyInRange(key, this.cellIdRangesByLevel)).setPartition(part), (IgniteClosure & Serializable)cacheEntry -> {
                    Object data = cacheEntry.getValue();
                    GridOSHEntity oshEntityCell = data instanceof BinaryObject ? (GridOSHEntity)((BinaryObject)data).deserialize() : (GridOSHEntity)data;
                    Stream cellStream = (Stream)this.cellProcessor.apply(oshEntityCell, this.cellIterator);
                    if (cellStream.anyMatch(ignored -> true)) {
                        return Optional.of(cacheEntry.getKey());
                    }
                    return Optional.empty();
                });){
                    LinkedList acc = new LinkedList();
                    for (Optional entry : cursor) {
                        entry.ifPresent(acc::add);
                    }
                    LinkedList linkedList = acc;
                    return linkedList;
                }
            }).flatMap(Collection::stream).collect(Collectors.toList());
        }
    }

    static class GetMatchingKeysPreflightLocalPeek
    extends GetMatchingKeysPreflight {
        GetMatchingKeysPreflightLocalPeek(String cacheName, Function<XYGridTree.CellIdRange, LongStream> cellIdRangeToCellIds, Iterable<XYGridTree.CellIdRange> cellIdRanges, Kernels.CellProcessor<? extends Stream<?>> cellProcessor, CellIterator cellIterator) {
            super(cacheName, cellIdRangeToCellIds, cellIdRanges, cellProcessor, cellIterator);
        }

        public Collection<Long> call() {
            IgniteCache localCache = this.ignite.cache(this.cacheName);
            return Streams.stream((Iterable)this.cellIdRanges).flatMapToLong(this.cellIdRangeToCellIds).parallel().filter(cellLongId -> {
                GridOSHEntity cell = (GridOSHEntity)localCache.localPeek((Object)cellLongId, new CachePeekMode[0]);
                return cell != null && ((Stream)this.cellProcessor.apply(cell, this.cellIterator)).anyMatch(ignored -> true);
            }).boxed().collect(Collectors.toList());
        }
    }

    static abstract class GetMatchingKeysPreflight
    implements IgniteCallable<Collection<Long>> {
        @IgniteInstanceResource
        Ignite ignite;
        final String cacheName;
        final Function<XYGridTree.CellIdRange, LongStream> cellIdRangeToCellIds;
        final Iterable<XYGridTree.CellIdRange> cellIdRanges;
        final Kernels.CellProcessor<? extends Stream<?>> cellProcessor;
        final CellIterator cellIterator;

        private GetMatchingKeysPreflight() {
            throw new IllegalStateException("utility class");
        }

        GetMatchingKeysPreflight(String cacheName, Function<XYGridTree.CellIdRange, LongStream> cellIdRangeToCellIds, Iterable<XYGridTree.CellIdRange> cellIdRanges, Kernels.CellProcessor<? extends Stream<?>> cellProcessor, CellIterator cellIterator) {
            this.cacheName = cacheName;
            this.cellIdRangeToCellIds = cellIdRangeToCellIds;
            this.cellIdRanges = cellIdRanges;
            this.cellProcessor = cellProcessor;
            this.cellIterator = cellIterator;
        }
    }
}

