/*
 * Decompiled with CFR 0.152.
 */
package org.heigit.bigspatialdata.oshdb.api.mapreducer.backend;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.compute.ComputeTaskNoResultCache;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.heigit.bigspatialdata.oshdb.api.generic.function.SerializableBinaryOperator;
import org.heigit.bigspatialdata.oshdb.api.generic.function.SerializableSupplier;
import org.heigit.bigspatialdata.oshdb.api.mapreducer.backend.Kernels;

@ComputeTaskNoResultCache
class OSHDBIgniteMapReduceComputeTask<T, R>
extends ComputeTaskAdapter<T, R>
implements Serializable {
    private final CancelableIgniteMapReduceJob job;
    private final SerializableBinaryOperator<R> combiner;
    private final IgniteRunnable onClose;
    private R resultAccumulator;

    public OSHDBIgniteMapReduceComputeTask(CancelableIgniteMapReduceJob job, SerializableSupplier<R> identitySupplier, SerializableBinaryOperator<R> combiner, IgniteRunnable onClose) {
        this.job = job;
        this.combiner = combiner;
        this.resultAccumulator = identitySupplier.get();
        this.onClose = onClose;
    }

    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, T arg) throws IgniteException {
        HashMap map = new HashMap(subgrid.size());
        subgrid.forEach(node -> map.put(new ComputeJob(){
            @IgniteInstanceResource
            private Ignite ignite;

            public void cancel() {
                OSHDBIgniteMapReduceComputeTask.this.job.cancel();
            }

            public Object execute() throws IgniteException {
                Object result = OSHDBIgniteMapReduceComputeTask.this.job.execute(this.ignite);
                OSHDBIgniteMapReduceComputeTask.this.onClose.run();
                return result;
            }
        }, node));
        return map;
    }

    public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
        Object data = res.getData();
        this.resultAccumulator = this.combiner.apply(this.resultAccumulator, data);
        return ComputeJobResultPolicy.WAIT;
    }

    public R reduce(List<ComputeJobResult> results) throws IgniteException {
        return this.resultAccumulator;
    }

    static interface CancelableIgniteMapReduceJob<S>
    extends Serializable,
    Kernels.CancelableProcessStatus {
        public void cancel();

        public S execute(Ignite var1);
    }
}

