/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.gds.core.concurrency;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import java.util.stream.BaseStream;
import java.util.stream.LongStream;
import org.jetbrains.annotations.Nullable;
import org.neo4j.gds.api.BatchNodeIterable;
import org.neo4j.gds.api.Graph;
import org.neo4j.gds.core.concurrency.Pools;
import org.neo4j.gds.core.concurrency.RunWithConcurrency;
import org.neo4j.gds.core.loading.HugeParallelGraphImporter;
import org.neo4j.gds.core.utils.BiLongConsumer;
import org.neo4j.gds.core.utils.LazyMappingCollection;
import org.neo4j.gds.core.utils.TerminationFlag;
import org.neo4j.gds.core.utils.collection.primitive.PrimitiveLongIterable;
import org.neo4j.gds.mem.BitUtil;
import org.neo4j.gds.utils.ExceptionUtil;
import org.neo4j.gds.utils.StringFormatting;

public final class ParallelUtil {
    /** Batch size constant with the value 10000. */
    public static final int DEFAULT_BATCH_SIZE = 10000;

    /** Private so that this class, whose members are all static, cannot be instantiated from outside. */
    private ParallelUtil() {
    }

    /**
     * Applies {@code fn} to the parallel view of {@code data} from within a fork-join pool
     * obtained via {@code Pools.createForkJoinPool(concurrency)} and blocks until the result
     * is available. The pool is shut down before this method returns, whether it succeeds or fails.
     *
     * @param data        the stream to process; {@code data.parallel()} is what is handed to {@code fn}
     * @param concurrency passed to {@code Pools.createForkJoinPool}
     * @param fn          the function producing the result from the parallel stream
     * @return whatever {@code fn} returned
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *                          interrupted while waiting (the interrupt flag is restored), or wrapping
     *                          the {@link ExecutionException} if {@code fn} failed
     */
    public static <T extends BaseStream<?, T>, R> R parallelStream(T data, int concurrency, Function<T, R> fn) {
        ForkJoinPool pool = Pools.createForkJoinPool(concurrency);
        try {
            // Keep the task typed so that no raw type and no unchecked cast of the result is needed.
            ForkJoinTask<R> pending = pool.submit(() -> fn.apply(data.parallel()));
            return pending.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    /**
     * Consumes the parallel view of {@code data} via {@link #parallelStream}, after first
     * calling {@code terminationFlag.assertRunning()} inside the submitted function.
     *
     * @param data            the stream to process
     * @param concurrency     forwarded to {@code parallelStream}
     * @param terminationFlag checked once, right before {@code consumer} is invoked
     * @param consumer        receives the parallel stream
     */
    public static <T extends BaseStream<?, T>> void parallelStreamConsume(T data, int concurrency, TerminationFlag terminationFlag, Consumer<T> consumer) {
        Function<T, Void> checkedConsume = parallel -> {
            terminationFlag.assertRunning();
            consumer.accept(parallel);
            return null;
        };
        ParallelUtil.parallelStream(data, concurrency, checkedConsume);
    }

    /**
     * Same as the overload taking a {@link TerminationFlag}, using
     * {@code TerminationFlag.RUNNING_TRUE} as the flag.
     *
     * @param data        the stream to process
     * @param concurrency forwarded unchanged
     * @param consumer    receives the parallel stream
     */
    public static <T extends BaseStream<?, T>> void parallelStreamConsume(T data, int concurrency, Consumer<T> consumer) {
        TerminationFlag flag = TerminationFlag.RUNNING_TRUE;
        ParallelUtil.parallelStreamConsume(data, concurrency, flag, consumer);
    }

    /**
     * Invokes {@code consumer} for every value in {@code [0, graph.nodeCount())}, using
     * {@code TerminationFlag.RUNNING_TRUE} as the termination flag.
     *
     * @param graph       supplies the node count
     * @param concurrency forwarded unchanged
     * @param consumer    called once per value
     */
    public static void parallelForEachNode(Graph graph, int concurrency, LongConsumer consumer) {
        long nodeCount = graph.nodeCount();
        ParallelUtil.parallelForEachNode(nodeCount, concurrency, TerminationFlag.RUNNING_TRUE, consumer);
    }

    /**
     * Invokes {@code consumer} for every value in {@code [0, nodeCount)}, using
     * {@code TerminationFlag.RUNNING_TRUE} as the termination flag.
     *
     * @param nodeCount   exclusive upper bound of the range
     * @param concurrency forwarded unchanged
     * @param consumer    called once per value
     */
    public static void parallelForEachNode(long nodeCount, int concurrency, LongConsumer consumer) {
        TerminationFlag flag = TerminationFlag.RUNNING_TRUE;
        ParallelUtil.parallelForEachNode(nodeCount, concurrency, flag, consumer);
    }

    /**
     * Invokes {@code consumer} for every value of {@code LongStream.range(0, nodeCount)},
     * delegating the parallel execution to {@code parallelStreamConsume}.
     *
     * @param nodeCount       exclusive upper bound of the range
     * @param concurrency     forwarded unchanged
     * @param terminationFlag forwarded unchanged
     * @param consumer        called once per value
     */
    public static void parallelForEachNode(long nodeCount, int concurrency, TerminationFlag terminationFlag, LongConsumer consumer) {
        LongStream nodeIds = LongStream.range(0L, nodeCount);
        ParallelUtil.parallelStreamConsume(nodeIds, concurrency, terminationFlag, ids -> ids.forEach(consumer));
    }

    /**
     * {@code int} variant of {@link #threadCount(long, long)}.
     *
     * @param batchSize    number of elements per batch; must be positive
     * @param elementCount total number of elements
     * @return the result of the {@code long} overload, narrowed with {@link Math#toIntExact}
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public static int threadCount(int batchSize, int elementCount) {
        long count = ParallelUtil.threadCount((long) batchSize, (long) elementCount);
        return Math.toIntExact(count);
    }

    /**
     * Returns {@code 1} when a single batch covers all elements, otherwise
     * {@code BitUtil.ceilDiv(elementCount, batchSize)}.
     *
     * @param batchSize    number of elements per batch; must be positive
     * @param elementCount total number of elements
     * @return the number of batches as described above
     * @throws IllegalArgumentException if {@code batchSize} is zero or negative
     */
    public static long threadCount(long batchSize, long elementCount) {
        if (batchSize < 1L) {
            throw new IllegalArgumentException("Invalid batch size: " + batchSize);
        }
        return elementCount <= batchSize
            ? 1L
            : BitUtil.ceilDiv(elementCount, batchSize);
    }

    /**
     * Computes a batch size as {@code threadCount(concurrency, nodeCount)}, but never
     * smaller than {@code minBatchSize}. A non-positive {@code concurrency} is replaced
     * by {@code nodeCount} before the computation.
     *
     * @param nodeCount    total number of nodes
     * @param concurrency  desired concurrency; values {@code <= 0} fall back to {@code nodeCount}
     * @param minBatchSize lower bound for the result
     * @return the larger of {@code minBatchSize} and the computed target size
     */
    public static int adjustedBatchSize(int nodeCount, int concurrency, int minBatchSize) {
        int effectiveConcurrency = concurrency > 0 ? concurrency : nodeCount;
        int target = ParallelUtil.threadCount(effectiveConcurrency, nodeCount);
        return target > minBatchSize ? target : minBatchSize;
    }

    /**
     * Same as {@link #adjustedBatchSize(int, int, int)} with {@link #DEFAULT_BATCH_SIZE}
     * as the minimum batch size.
     *
     * @param nodeCount   total number of nodes
     * @param concurrency desired concurrency
     * @return the adjusted batch size
     */
    public static int adjustedBatchSize(int nodeCount, int concurrency) {
        // Reference the named constant instead of repeating its literal value.
        return ParallelUtil.adjustedBatchSize(nodeCount, concurrency, ParallelUtil.DEFAULT_BATCH_SIZE);
    }

    /**
     * {@code long} variant: computes {@code threadCount(concurrency, nodeCount)} and returns
     * it unless {@code minBatchSize} is larger. A non-positive {@code concurrency} is replaced
     * by {@code nodeCount}, capped at {@link Integer#MAX_VALUE}.
     *
     * @param nodeCount    total number of nodes
     * @param concurrency  desired concurrency; values {@code <= 0} fall back to the capped node count
     * @param minBatchSize lower bound for the result
     * @return the larger of {@code minBatchSize} and the computed target size
     */
    public static long adjustedBatchSize(long nodeCount, int concurrency, long minBatchSize) {
        int effectiveConcurrency = concurrency;
        if (effectiveConcurrency <= 0) {
            effectiveConcurrency = (int) Math.min(nodeCount, Integer.MAX_VALUE);
        }
        long target = ParallelUtil.threadCount((long) effectiveConcurrency, nodeCount);
        return target > minBatchSize ? target : minBatchSize;
    }

    /**
     * Like {@link #adjustedBatchSize(long, int, long)}, with the result additionally capped
     * at {@code maxBatchSize}.
     *
     * @param nodeCount    total number of nodes
     * @param concurrency  desired concurrency
     * @param minBatchSize lower bound applied first
     * @param maxBatchSize upper bound applied last
     * @return the smaller of {@code maxBatchSize} and the three-argument result
     */
    public static long adjustedBatchSize(long nodeCount, int concurrency, long minBatchSize, long maxBatchSize) {
        long uncapped = ParallelUtil.adjustedBatchSize(nodeCount, concurrency, minBatchSize);
        return Math.min(maxBatchSize, uncapped);
    }

    /**
     * Passes {@code batchSize} (or {@code 1} if it is not positive) through
     * {@code BitUtil.nextHighestPowerOfTwo} and then doubles the result for as long as
     * {@code (nodeCount + batchSize + 1) / batchSize} exceeds {@link Integer#MAX_VALUE}.
     *
     * @param nodeCount total number of nodes
     * @param batchSize requested batch size; non-positive values are treated as {@code 1}
     * @return the adjusted batch size
     */
    public static long adjustedBatchSize(long nodeCount, long batchSize) {
        long adjusted = BitUtil.nextHighestPowerOfTwo(batchSize <= 0L ? 1L : batchSize);
        while ((nodeCount + adjusted + 1L) / adjusted > Integer.MAX_VALUE) {
            adjusted = adjusted << 1;
        }
        return adjusted;
    }

    /**
     * Tells whether {@code executor} can accept work.
     *
     * @param executor the executor to inspect; may be {@code null}
     * @return {@code true} only if {@code executor} is non-null and reports neither
     *         {@code isShutdown()} nor {@code isTerminated()}
     */
    public static boolean canRunInParallel(@Nullable ExecutorService executor) {
        if (executor == null) {
            return false;
        }
        return !(executor.isShutdown() || executor.isTerminated());
    }

    /**
     * Creates one importer task per batch returned by {@code idMap.batchIterables(batchSize)}
     * and runs them. The tasks run one after another on the calling thread when the executor
     * is unusable or there is exactly one batch; otherwise they are handed to
     * {@code RunWithConcurrency}. Each importer receives a node offset that advances by
     * {@code batchSize} per batch.
     *
     * @param concurrency passed to the {@code RunWithConcurrency} builder
     * @param batchSize   batch size used both for splitting and for advancing the node offset
     * @param idMap       source of the batches
     * @param executor    executor for the parallel case
     * @param importer    factory creating a task for a given offset and batch
     */
    @Deprecated(forRemoval=true)
    public static <T extends Runnable> void readParallel(int concurrency, int batchSize, BatchNodeIterable idMap, ExecutorService executor, HugeParallelGraphImporter<T> importer) {
        Collection<PrimitiveLongIterable> batches = idMap.batchIterables(batchSize);
        int batchCount = batches.size();
        boolean parallel = ParallelUtil.canRunInParallel(executor) && batchCount != 1;
        if (parallel) {
            // Offsets are handed out as the lazily mapped collection creates each importer.
            AtomicLong nodeOffset = new AtomicLong();
            Collection<Runnable> importers = LazyMappingCollection.of(batches, it -> importer.newImporter(nodeOffset.getAndAdd(batchSize), (PrimitiveLongIterable)it));
            RunWithConcurrency.builder().concurrency(concurrency).tasks(importers).executor(executor).run();
            return;
        }
        long offset = 0L;
        for (PrimitiveLongIterable batch : batches) {
            T importTask = importer.newImporter(offset, batch);
            importTask.run();
            offset += (long) batchSize;
        }
    }

    /**
     * Splits {@code [0, size)} into consecutive ranges of
     * {@code threadCount(concurrency, size)} elements and calls {@code task.apply(start, end)}
     * for each of them. The ranges are processed in order on the calling thread when the
     * executor is unusable or {@code concurrency} is 1; otherwise one runnable per range is
     * handed to {@link #run(Collection, ExecutorService)}.
     *
     * @param concurrency determines the range length; must be positive
     * @param size        exclusive upper bound of the overall range
     * @param executor    executor for the parallel case
     * @param task        receives the inclusive start and exclusive end of each range
     */
    public static void readParallel(int concurrency, long size, ExecutorService executor, BiLongConsumer task) {
        long batchSize = ParallelUtil.threadCount((long) concurrency, size);
        boolean sequential = !ParallelUtil.canRunInParallel(executor) || concurrency == 1;
        if (sequential) {
            long from = 0L;
            while (from < size) {
                task.apply(from, Math.min(size, from + batchSize));
                from += batchSize;
            }
            return;
        }
        Collection<Runnable> rangeTasks = new ArrayList<>(concurrency);
        for (long from = 0L; from < size; from += batchSize) {
            long rangeStart = from;
            long rangeEnd = Math.min(size, from + batchSize);
            rangeTasks.add(() -> task.apply(rangeStart, rangeEnd));
        }
        ParallelUtil.run(rangeTasks, executor);
    }

    /**
     * Builds a collection of tasks by calling {@code newTask.get()} {@code concurrency} times.
     *
     * @param concurrency number of tasks to create; zero or negative values yield an empty collection
     * @param newTask     supplier invoked once per task
     * @return the created tasks, in creation order
     */
    public static Collection<Runnable> tasks(int concurrency, Supplier<? extends Runnable> newTask) {
        Collection<Runnable> created = new ArrayList<>();
        int remaining = concurrency;
        while (remaining-- > 0) {
            created.add(newTask.get());
        }
        return created;
    }

    /**
     * Builds a collection of tasks by calling {@code newTask.apply(i)} for
     * {@code i = 0 .. concurrency - 1}.
     *
     * @param concurrency number of tasks to create; zero or negative values yield an empty collection
     * @param newTask     factory receiving the zero-based index of the task to create
     * @return the created tasks, ordered by index
     */
    public static Collection<Runnable> tasks(int concurrency, Function<Integer, ? extends Runnable> newTask) {
        Collection<Runnable> created = new ArrayList<>();
        int index = 0;
        while (index < concurrency) {
            created.add(newTask.apply(index));
            index++;
        }
        return created;
    }

    /**
     * Submits {@code task} to {@code executor} and blocks until it has finished, using
     * {@link #awaitTermination(Collection)} for the waiting and error handling.
     *
     * @param task     the task to run
     * @param executor the executor the task is submitted to
     */
    public static void run(Runnable task, ExecutorService executor) {
        Future<?> pending = executor.submit(task);
        ParallelUtil.awaitTermination(Collections.singleton(pending));
    }

    /**
     * Runs all {@code tasks} and waits for them, without supplying a collection to
     * hold the futures ({@code null} is passed to the three-argument overload).
     *
     * @param tasks    the tasks to run
     * @param executor the executor to use
     */
    public static void run(Collection<? extends Runnable> tasks, ExecutorService executor) {
        Collection<Future<?>> noFutures = null;
        ParallelUtil.run(tasks, executor, noFutures);
    }

    /**
     * Starts all {@code tasks} with synchronous execution allowed and then waits for
     * the returned futures via {@link #awaitTermination(Collection)}.
     *
     * @param tasks    the tasks to run
     * @param executor the executor to use
     * @param futures  collection handed on to the four-argument overload; may be {@code null}
     */
    public static void run(Collection<? extends Runnable> tasks, ExecutorService executor, Collection<Future<?>> futures) {
        Collection<Future<?>> started = ParallelUtil.run(tasks, true, executor, futures);
        ParallelUtil.awaitTermination(started);
    }

    /**
     * Starts all {@code tasks} and returns the futures of the submitted ones without waiting.
     * <p>
     * If {@code allowSynchronousRun} is set and there is either exactly one task or no usable
     * executor, the tasks are run on the calling thread and an empty list is returned.
     *
     * @param tasks               the tasks to run
     * @param allowSynchronousRun whether running on the calling thread is permitted
     * @param executor            the executor to submit to
     * @param futures             optional collection to fill; it is cleared before use, and a new
     *                            list is created when it is {@code null}
     * @return the futures of the submitted tasks, or an empty list after a synchronous run
     * @throws IllegalStateException if the executor is unusable and synchronous execution is not allowed
     */
    public static Collection<Future<?>> run(Collection<? extends Runnable> tasks, boolean allowSynchronousRun, ExecutorService executor, Collection<Future<?>> futures) {
        boolean noExecutor = !ParallelUtil.canRunInParallel(executor);
        if (allowSynchronousRun && (tasks.size() == 1 || noExecutor)) {
            tasks.forEach(Runnable::run);
            return Collections.emptyList();
        }
        if (noExecutor) {
            throw new IllegalStateException("No running executor provided and synchronous execution is not allowed");
        }
        if (futures == null) {
            // Typed list instead of a raw ArrayList.
            futures = new ArrayList<>(tasks.size());
        } else {
            futures.clear();
        }
        for (Runnable task : tasks) {
            futures.add(executor.submit(task));
        }
        return futures;
    }

    /**
     * Unpacks {@code params} and forwards its values to the private
     * {@code runWithConcurrency} implementation.
     *
     * @param params the bundled run configuration
     */
    static void runWithConcurrency(RunWithConcurrency params) {
        ParallelUtil.runWithConcurrency(
            params.concurrency(),
            params.tasks(),
            params.forceUsageOfExecutor(),
            params.waitNanos(),
            params.maxWaitRetries(),
            params.mayInterruptIfRunning(),
            params.terminationFlag(),
            params.executor()
        );
    }

    /**
     * Runs the given tasks, keeping at most {@code concurrency} of them submitted at a time.
     * <p>
     * When the executor is unusable, or {@code concurrency <= 1} and the executor is not
     * forced, the tasks are run one after another on the calling thread, with the termination
     * flag checked before each task.
     * <p>
     * Otherwise tasks are submitted through a {@code CompletionService}: first up to
     * {@code concurrency} tasks, then one more whenever a submission succeeds after a completed
     * task has been awaited. If nothing can be submitted and nothing is in flight, the method
     * parks for {@code waitNanos} and retries, giving up after {@code maxWaitRetries} attempts.
     * Errors thrown by tasks are chained together and rethrown at the end.
     * <p>
     * Whichever way the parallel section is left, including through an exception from the
     * termination flag or the retry limit, all outstanding tasks are cancelled. If task errors
     * have been collected by then, they are what is thrown.
     *
     * @param concurrency           maximum number of tasks submitted up front
     * @param tasks                 the tasks to run
     * @param forceUsageOfExecutor  use the executor even when {@code concurrency <= 1}
     * @param waitNanos             park time between submission retries
     * @param maxWaitRetries        number of fruitless retries after which the run is aborted
     * @param mayInterruptIfRunning passed to the cancellation of outstanding tasks
     * @param terminationFlag       checked repeatedly while running
     * @param executor              the executor to use; may be {@code null}
     * @throws IllegalThreadStateException if the retry limit is reached
     */
    private static void runWithConcurrency(int concurrency, Iterator<? extends Runnable> tasks, boolean forceUsageOfExecutor, long waitNanos, long maxWaitRetries, boolean mayInterruptIfRunning, TerminationFlag terminationFlag, @Nullable ExecutorService executor) {
        if (!ParallelUtil.canRunInParallel(executor) || concurrency <= 1 && !forceUsageOfExecutor) {
            while (tasks.hasNext()) {
                Runnable task = tasks.next();
                terminationFlag.assertRunning();
                task.run();
            }
            return;
        }
        CompletionService completionService = new CompletionService(executor, concurrency);
        PushbackIterator<Runnable> ts = new PushbackIterator<>(tasks);
        Throwable error = null;
        try {
            // Submit the first batch of up to `concurrency` tasks.
            int i = concurrency;
            while (i-- > 0 && terminationFlag.running() && completionService.trySubmit(ts)) {
                // all work happens in the loop condition
            }
            terminationFlag.assertRunning();

            // Submit the remaining tasks as capacity frees up.
            int tries = 0;
            while (ts.hasNext()) {
                if (completionService.hasTasks()) {
                    try {
                        if (!completionService.awaitOrFail()) {
                            // Nothing completed within the poll timeout; check again.
                            continue;
                        }
                    } catch (ExecutionException e) {
                        error = ExceptionUtil.chain(error, e.getCause());
                    } catch (CancellationException ignored) {
                        // a cancelled task is not treated as an error
                    }
                }
                terminationFlag.assertRunning();
                if (!completionService.trySubmit(ts) && !completionService.hasTasks()) {
                    if ((long) (++tries) >= maxWaitRetries) {
                        throw new IllegalThreadStateException(StringFormatting.formatWithLocale("Attempted to submit tasks for %d times with a %d nanosecond delay (%d milliseconds) between each attempt, but ran out of time", tries, waitNanos, TimeUnit.NANOSECONDS.toMillis(waitNanos)));
                    }
                    LockSupport.parkNanos(waitNanos);
                }
            }

            // Wait for everything that is still in flight.
            while (completionService.hasTasks()) {
                terminationFlag.assertRunning();
                try {
                    completionService.awaitOrFail();
                } catch (ExecutionException e) {
                    error = ExceptionUtil.chain(error, e.getCause());
                } catch (CancellationException ignored) {
                    // a cancelled task is not treated as an error
                }
            }
        } catch (InterruptedException e) {
            error = error == null ? e : ExceptionUtil.chain(e, error);
            Thread.currentThread().interrupt();
        } finally {
            // Must run on every exit path so that no submitted task is left behind.
            ParallelUtil.finishRunWithConcurrency(mayInterruptIfRunning, completionService, error);
        }
    }

    /**
     * Cancels everything still tracked by {@code completionService} and then rethrows
     * {@code error}, if there is one.
     *
     * @param mayInterruptIfRunning passed to {@code cancelAll}
     * @param completionService     the service whose tasks are cancelled
     * @param error                 the collected error, or {@code null} if there was none
     * @throws RuntimeException wrapping {@code error} when {@code ExceptionUtil.throwIfUnchecked}
     *                          did not already throw it
     */
    private static void finishRunWithConcurrency(boolean mayInterruptIfRunning, CompletionService completionService, @Nullable Throwable error) {
        completionService.cancelAll(mayInterruptIfRunning);
        if (error == null) {
            return;
        }
        ExceptionUtil.throwIfUnchecked(error);
        throw new RuntimeException(error);
    }

    /**
     * Waits for every future in {@code futures} to complete.
     * <p>
     * Failures reported through {@link ExecutionException} are chained together (a cause
     * identical to the error collected so far is not chained again), cancelled futures are
     * skipped, and the combined error is thrown after the waiting is over. If the waiting
     * does not reach the end of the collection, for example because the calling thread is
     * interrupted, every future is cancelled without interrupting running tasks; on
     * interruption the interrupt flag is restored.
     *
     * @param futures the futures to wait for
     * @throws RuntimeException wrapping the collected error when
     *                          {@code ExceptionUtil.throwIfUnchecked} did not already throw it
     */
    public static void awaitTermination(Collection<Future<?>> futures) {
        Throwable failure = null;
        boolean allAwaited = false;
        try {
            for (Future<?> pending : futures) {
                try {
                    pending.get();
                } catch (ExecutionException failed) {
                    Throwable cause = failed.getCause();
                    if (failure != cause) {
                        failure = ExceptionUtil.chain(failure, cause);
                    }
                } catch (CancellationException ignored) {
                    // a cancelled future is not treated as an error
                }
            }
            allAwaited = true;
        } catch (InterruptedException interrupted) {
            failure = ExceptionUtil.chain(interrupted, failure);
            Thread.currentThread().interrupt();
        } finally {
            if (!allAwaited) {
                for (Future<?> pending : futures) {
                    pending.cancel(false);
                }
            }
        }
        if (failure == null) {
            return;
        }
        ExceptionUtil.throwIfUnchecked(failure);
        throw new RuntimeException(failure);
    }

    /**
     * Iterator wrapper that can take back a single element, which is then returned by the
     * following call to {@link #next()}. A {@code null} slot means that nothing is pushed back.
     */
    private static final class PushbackIterator<T>
    implements Iterator<T> {
        private final Iterator<? extends T> delegate;
        /** The element waiting to be replayed, or {@code null} if there is none. */
        private T pushedElement;

        private PushbackIterator(Iterator<? extends T> delegate) {
            this.delegate = delegate;
        }

        /** @return {@code true} if an element is pushed back or the delegate has more elements */
        @Override
        public boolean hasNext() {
            if (this.pushedElement != null) {
                return true;
            }
            return this.delegate.hasNext();
        }

        /** @return the pushed back element if there is one (clearing the slot), else the delegate's next element */
        @Override
        public T next() {
            if (this.pushedElement == null) {
                return this.delegate.next();
            }
            T replayed = this.pushedElement;
            this.pushedElement = null;
            return replayed;
        }

        /**
         * Stores {@code element} so that it is returned by the next call to {@link #next()}.
         *
         * @throws IllegalArgumentException if an element is already pushed back
         */
        void pushBack(T element) {
            boolean slotTaken = this.pushedElement != null;
            if (slotTaken) {
                throw new IllegalArgumentException("Cannot push back twice");
            }
            this.pushedElement = element;
        }
    }

    /**
     * Submits tasks to an executor and tracks them from submission until their result has
     * been collected: a task is in {@code running} until it is done, and non-cancelled tasks
     * are placed in {@code completionQueue} once they are done.
     * <p>
     * When the executor is a {@link ThreadPoolExecutor}, submission is refused while the
     * pool's active count has reached its core pool size; any other executor always accepts.
     */
    private static final class CompletionService {
        /** How long {@link #awaitOrFail()} waits for a completed task. */
        private static final int AWAIT_TIMEOUT_MILLIS = 100;
        private final Executor executor;
        /** The executor as a thread pool, or {@code null} if it is of another type. */
        private final ThreadPoolExecutor pool;
        private final int availableConcurrency;
        private final Set<Future<Void>> running;
        private final BlockingQueue<Future<Void>> completionQueue;

        /**
         * @param executor          the executor to submit to
         * @param targetConcurrency used, together with the core pool size, to size the completion queue
         * @throws IllegalArgumentException if {@code executor} cannot run tasks
         */
        CompletionService(ExecutorService executor, int targetConcurrency) {
            if (!ParallelUtil.canRunInParallel(executor)) {
                throw new IllegalArgumentException("executor already terminated or not usable");
            }
            if (executor instanceof ThreadPoolExecutor) {
                this.pool = (ThreadPoolExecutor) executor;
                this.availableConcurrency = this.pool.getCorePoolSize();
                int capacity = Math.max(targetConcurrency, this.availableConcurrency) + 1;
                this.completionQueue = new ArrayBlockingQueue<>(capacity);
            } else {
                this.pool = null;
                this.availableConcurrency = Integer.MAX_VALUE;
                this.completionQueue = new LinkedBlockingQueue<>();
            }
            this.executor = executor;
            this.running = Collections.newSetFromMap(new ConcurrentHashMap<>());
        }

        /**
         * Takes the next task from {@code tasks} and submits it; if the submission is refused
         * the task is pushed back onto the iterator.
         *
         * @return {@code true} if a task was submitted
         */
        boolean trySubmit(PushbackIterator<Runnable> tasks) {
            if (tasks.hasNext()) {
                Runnable next = tasks.next();
                if (this.submit(next)) {
                    return true;
                }
                tasks.pushBack(next);
            }
            return false;
        }

        /**
         * Hands {@code task} to the executor if there is capacity.
         *
         * @return {@code true} if the task was handed over, {@code false} if there was no capacity
         * @throws NullPointerException if {@code task} is {@code null}
         */
        boolean submit(Runnable task) {
            Objects.requireNonNull(task);
            if (this.canSubmit()) {
                this.executor.execute(new QueueingFuture(task));
                return true;
            }
            return false;
        }

        /** @return {@code true} while any task is still running or waiting to be collected */
        boolean hasTasks() {
            return !this.running.isEmpty() || !this.completionQueue.isEmpty();
        }

        /**
         * Waits up to {@link #AWAIT_TIMEOUT_MILLIS} for a completed task and collects its outcome.
         *
         * @return {@code false} if no task completed within the timeout, {@code true} otherwise
         * @throws ExecutionException   if the collected task failed
         * @throws InterruptedException if interrupted while waiting
         */
        boolean awaitOrFail() throws InterruptedException, ExecutionException {
            Future<Void> task = this.completionQueue.poll(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            if (task == null) {
                return false;
            }
            task.get();
            return true;
        }

        /** Cancels all tracked tasks, running ones as well as completed ones not yet collected. */
        void cancelAll(boolean mayInterruptIfRunning) {
            this.stopFuturesAndStopScheduling(this.running, mayInterruptIfRunning);
            this.stopFutures(this.completionQueue, mayInterruptIfRunning);
        }

        private boolean canSubmit() {
            return this.pool == null || this.pool.getActiveCount() < this.availableConcurrency;
        }

        private void stopFutures(Collection<Future<Void>> futures, boolean mayInterruptIfRunning) {
            for (Future<Void> future : futures) {
                future.cancel(mayInterruptIfRunning);
            }
            futures.clear();
        }

        /** Like {@link #stopFutures}, but also removes the futures from the pool's work queue. */
        private void stopFuturesAndStopScheduling(Collection<Future<Void>> futures, boolean mayInterruptIfRunning) {
            if (this.pool == null) {
                this.stopFutures(futures, mayInterruptIfRunning);
                return;
            }
            for (Future<Void> future : futures) {
                if (future instanceof Runnable) {
                    this.pool.remove((Runnable) future);
                }
                future.cancel(mayInterruptIfRunning);
            }
            futures.clear();
            this.pool.purge();
        }

        /** Future that registers itself as running on creation and moves to the completion queue when done. */
        private class QueueingFuture
        extends FutureTask<Void> {
            QueueingFuture(Runnable runnable) {
                super(runnable, null);
                CompletionService.this.running.add(this);
            }

            @Override
            protected void done() {
                if (!this.isCancelled()) {
                    // Retry until the queue accepts the future.
                    while (!CompletionService.this.completionQueue.offer(this)) {
                    }
                }
                // Removed only after queueing, so hasTasks() never misses a finished task.
                CompletionService.this.running.remove(this);
            }
        }
    }
}

