package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.util.RuntimeAvailableProcessors;
import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.internal.util.concurrent.IdleStrategy;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.MwCounter;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.internal.util.executor.ExecutorType;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.metrics.MetricTags;
import com.hazelcast.jet.impl.metrics.MetricsImpl;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.NonCompletableFuture;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.executionservice.ExecutionService;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.spi.properties.HazelcastProperty;
import io.micrometer.core.instrument.binder.BaseUnits;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

/* loaded from: input_file:BOOT-INF/lib/hazelcast-5.0.jar:com/hazelcast/jet/impl/execution/TaskletExecutionService.class */
public class TaskletExecutionService {
    public static final String TASKLET_INIT_CLOSE_EXECUTOR_NAME = "jet:tasklet_initClose";
    private final ExecutionService hzExecutionService;
    private final CooperativeWorker[] cooperativeWorkers;
    private final Thread[] cooperativeThreadPool;
    private final String hzInstanceName;
    private final ILogger logger;
    private int cooperativeThreadIndex;
    private volatile boolean isShutdown;
    private final IdleStrategy idlerCooperative;
    private final IdleStrategy idlerNonCooperative;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final ExecutorService blockingTaskletExecutor = Executors.newCachedThreadPool(new BlockingTaskThreadFactory());

    @Probe(name = "blockingWorkerCount")
    private final Counter blockingWorkerCount = MwCounter.newMwCounter();
    private final Object lock = new Object();

    /* loaded from: input_file:BOOT-INF/lib/hazelcast-5.0.jar:com/hazelcast/jet/impl/execution/TaskletExecutionService$BlockingTaskThreadFactory.class */
    private final class BlockingTaskThreadFactory implements ThreadFactory {
        private final AtomicInteger seq;

        private BlockingTaskThreadFactory() {
            this.seq = new AtomicInteger();
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(@Nonnull Runnable runnable) {
            return new Thread(runnable, String.format("hz.%s.jet.blocking.thread-%d", TaskletExecutionService.this.hzInstanceName, Integer.valueOf(this.seq.getAndIncrement())));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/hazelcast-5.0.jar:com/hazelcast/jet/impl/execution/TaskletExecutionService$BlockingWorker.class */
    public final class BlockingWorker implements Runnable {
        private final TaskletTracker tracker;
        private final CountDownLatch startedLatch;

        private BlockingWorker(TaskletTracker taskletTracker, CountDownLatch countDownLatch) {
            this.tracker = taskletTracker;
            this.startedLatch = countDownLatch;
        }

        /* JADX WARN: Type inference failed for: r0v8, types: [com.hazelcast.internal.util.concurrent.IdleStrategy] */
        @Override // java.lang.Runnable
        public void run() {
            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            Tasklet tasklet = this.tracker.tasklet;
            Thread.currentThread().setContextClassLoader(this.tracker.jobClassLoader);
            ?? r0 = TaskletExecutionService.this.idlerNonCooperative;
            MetricsImpl.Container container = MetricsImpl.container();
            try {
                try {
                    TaskletExecutionService.this.blockingWorkerCount.inc();
                    container.setContext(tasklet.getMetricsContext());
                    this.startedLatch.countDown();
                    tasklet.init();
                    long j = 0;
                    do {
                        ProgressState call = tasklet.call();
                        if (call.isMadeProgress()) {
                            j = 0;
                        } else {
                            long j2 = j + 1;
                            j = r0;
                            r0.idle(j2);
                        }
                        if (call.isDone() || this.tracker.executionTracker.executionCompletedExceptionally()) {
                            break;
                        }
                    } while (!TaskletExecutionService.this.isShutdown);
                } catch (Throwable th) {
                    TaskletExecutionService.this.logger.warning("Exception in " + tasklet, th);
                    this.tracker.executionTracker.exception(new JetException("Exception in " + tasklet + ": " + th, th));
                    TaskletExecutionService.this.blockingWorkerCount.inc(-1L);
                    container.setContext(null);
                    Thread.currentThread().setContextClassLoader(contextClassLoader);
                    this.tracker.executionTracker.taskletDone();
                }
            } finally {
                TaskletExecutionService.this.blockingWorkerCount.inc(-1L);
                container.setContext(null);
                Thread.currentThread().setContextClassLoader(contextClassLoader);
                this.tracker.executionTracker.taskletDone();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/hazelcast-5.0.jar:com/hazelcast/jet/impl/execution/TaskletExecutionService$CooperativeWorker.class */
    public final class CooperativeWorker implements Runnable {
        private static final int COOPERATIVE_LOGGING_THRESHOLD = 5;
        private boolean finestLogEnabled;
        private Thread myThread;
        private MetricsImpl.Container userMetricsContextContainer;

        @Probe(name = "iterationCount")
        private final Counter iterationCount = SwCounter.newSwCounter();
        private final ProgressTracker progressTracker = new ProgressTracker();
        private final Consumer<TaskletTracker> runTasklet = this::runTasklet;
        private final Semaphore newTaskletSemaphore = new Semaphore(0);

        @Probe(name = "taskletCount")
        private final CopyOnWriteArrayList<TaskletTracker> trackers = new CopyOnWriteArrayList<>();

        CooperativeWorker() {
        }

        /* JADX WARN: Type inference failed for: r0v4, types: [com.hazelcast.internal.util.concurrent.IdleStrategy] */
        @Override // java.lang.Runnable
        public void run() {
            this.myThread = Thread.currentThread();
            this.userMetricsContextContainer = MetricsImpl.container();
            ?? r0 = TaskletExecutionService.this.idlerCooperative;
            long j = 0;
            while (!TaskletExecutionService.this.isShutdown) {
                this.finestLogEnabled = TaskletExecutionService.this.logger.isFinestEnabled();
                this.progressTracker.reset();
                this.trackers.forEach(this.runTasklet);
                this.iterationCount.inc();
                if (this.progressTracker.isMadeProgress()) {
                    j = 0;
                } else if (this.trackers.isEmpty()) {
                    this.newTaskletSemaphore.drainPermits();
                    if (this.trackers.isEmpty() && !TaskletExecutionService.this.isShutdown) {
                        try {
                            this.newTaskletSemaphore.acquire();
                        } catch (InterruptedException e) {
                            TaskletExecutionService.this.logger.severe("Cooperative worker interrupted", e);
                            return;
                        }
                    }
                } else {
                    long j2 = j + 1;
                    j = r0;
                    r0.idle(j2);
                }
            }
            this.trackers.forEach(taskletTracker -> {
                taskletTracker.executionTracker.taskletDone();
            });
            this.trackers.clear();
        }

        private void runTasklet(TaskletTracker taskletTracker) {
            long j = 0;
            if (this.finestLogEnabled) {
                j = System.nanoTime();
            }
            try {
                try {
                    this.myThread.setContextClassLoader(taskletTracker.jobClassLoader);
                    this.userMetricsContextContainer.setContext(taskletTracker.tasklet.getMetricsContext());
                    ProgressState call = taskletTracker.tasklet.call();
                    if (call.isDone()) {
                        dismissTasklet(taskletTracker);
                    }
                    this.progressTracker.mergeWith(call);
                    this.userMetricsContextContainer.setContext(null);
                } catch (Throwable th) {
                    if (th instanceof CancellationException) {
                        TaskletExecutionService.this.logger.info("Job was cancelled by the user.");
                        taskletTracker.executionTracker.exception((CancellationException) th);
                    } else {
                        TaskletExecutionService.this.logger.warning("Exception in " + taskletTracker.tasklet, th);
                        taskletTracker.executionTracker.exception(new JetException("Exception in " + taskletTracker.tasklet + ": " + th, th));
                    }
                    this.userMetricsContextContainer.setContext(null);
                }
                if (taskletTracker.executionTracker.executionCompletedExceptionally()) {
                    dismissTasklet(taskletTracker);
                }
                if (this.finestLogEnabled) {
                    long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - j);
                    if (millis > 5) {
                        TaskletExecutionService.this.logger.finest("Cooperative tasklet call of '" + taskletTracker.tasklet + "' took more than 5 ms: " + millis + BaseUnits.MILLISECONDS);
                    }
                }
            } catch (Throwable th2) {
                this.userMetricsContextContainer.setContext(null);
                throw th2;
            }
        }

        private void dismissTasklet(TaskletTracker taskletTracker) {
            LoggingUtil.logFinest(TaskletExecutionService.this.logger, "Tasklet %s is done", taskletTracker.tasklet);
            taskletTracker.executionTracker.taskletDone();
            this.trackers.remove(taskletTracker);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/hazelcast-5.0.jar:com/hazelcast/jet/impl/execution/TaskletExecutionService$ExecutionTracker.class */
    public final class ExecutionTracker {
        private final AtomicInteger completionLatch;
        final NonCompletableFuture future = new NonCompletableFuture();
        volatile List<Future<?>> blockingFutures = Collections.emptyList();
        private final AtomicReference<Throwable> executionException = new AtomicReference<>();

        ExecutionTracker(int i, CompletableFuture<Void> completableFuture) {
            this.completionLatch = new AtomicInteger(i);
            completableFuture.whenComplete(ExceptionUtil.withTryCatch(TaskletExecutionService.this.logger, (r5, th) -> {
                if (th == null) {
                    th = new IllegalStateException("cancellationFuture must be completed exceptionally");
                }
                exception(th);
                this.blockingFutures.forEach(future -> {
                    future.cancel(false);
                });
            }));
        }

        void exception(Throwable th) {
            this.executionException.compareAndSet(null, th);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void taskletDone() {
            if (this.completionLatch.decrementAndGet() == 0) {
                Throwable th = this.executionException.get();
                if (th == null) {
                    this.future.internalComplete();
                } else {
                    this.future.internalCompleteExceptionally(th);
                }
            }
        }

        boolean executionCompletedExceptionally() {
            return this.executionException.get() != null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/hazelcast-5.0.jar:com/hazelcast/jet/impl/execution/TaskletExecutionService$TaskletTracker.class */
    public static final class TaskletTracker {
        final Tasklet tasklet;
        final ExecutionTracker executionTracker;
        final ClassLoader jobClassLoader;

        TaskletTracker(Tasklet tasklet, ExecutionTracker executionTracker, ClassLoader classLoader) {
            this.tasklet = tasklet;
            this.executionTracker = executionTracker;
            this.jobClassLoader = classLoader;
        }

        public String toString() {
            return "Tracking " + this.tasklet;
        }
    }

    public TaskletExecutionService(NodeEngineImpl nodeEngineImpl, int i, HazelcastProperties hazelcastProperties) {
        this.hzExecutionService = nodeEngineImpl.getExecutionService();
        this.hzExecutionService.register(TASKLET_INIT_CLOSE_EXECUTOR_NAME, RuntimeAvailableProcessors.get(), Integer.MAX_VALUE, ExecutorType.CACHED);
        this.hzInstanceName = nodeEngineImpl.getHazelcastInstance().getName();
        this.cooperativeWorkers = new CooperativeWorker[i];
        this.cooperativeThreadPool = new Thread[i];
        this.logger = nodeEngineImpl.getLoggingService().getLogger(TaskletExecutionService.class);
        this.idlerCooperative = createIdler(hazelcastProperties, ClusterProperty.JET_IDLE_COOPERATIVE_MIN_MICROSECONDS, ClusterProperty.JET_IDLE_COOPERATIVE_MAX_MICROSECONDS);
        this.idlerNonCooperative = createIdler(hazelcastProperties, ClusterProperty.JET_IDLE_NONCOOPERATIVE_MIN_MICROSECONDS, ClusterProperty.JET_IDLE_NONCOOPERATIVE_MAX_MICROSECONDS);
        Arrays.setAll(this.cooperativeWorkers, i2 -> {
            return new CooperativeWorker();
        });
        Arrays.setAll(this.cooperativeThreadPool, i3 -> {
            return new Thread(this.cooperativeWorkers[i3], String.format("hz.%s.jet.cooperative.thread-%d", this.hzInstanceName, Integer.valueOf(i3)));
        });
        Arrays.stream(this.cooperativeThreadPool).forEach((v0) -> {
            v0.start();
        });
        MetricsRegistry metricsRegistry = nodeEngineImpl.getMetricsRegistry();
        MetricDescriptor withTag = metricsRegistry.newMetricDescriptor().withTag(MetricTags.MODULE, "jet");
        metricsRegistry.registerStaticMetrics(withTag, (MetricDescriptor) this);
        for (int i4 = 0; i4 < this.cooperativeWorkers.length; i4++) {
            metricsRegistry.registerStaticMetrics(withTag.withDiscriminator(MetricTags.COOPERATIVE_WORKER, String.valueOf(i4)), (MetricDescriptor) this.cooperativeWorkers[i4]);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> beginExecute(@Nonnull List<? extends Tasklet> list, @Nonnull CompletableFuture<Void> completableFuture, @Nonnull ClassLoader classLoader) {
        ExecutionTracker executionTracker = new ExecutionTracker(list.size(), completableFuture);
        try {
            Map map = (Map) list.stream().collect(Collectors.partitioningBy(tasklet -> {
                tasklet.getClass();
                return ((Boolean) Util.doWithClassLoader(classLoader, tasklet::isCooperative)).booleanValue();
            }));
            submitCooperativeTasklets(executionTracker, classLoader, (List) map.get(true));
            submitBlockingTasklets(executionTracker, classLoader, (List) map.get(false));
        } catch (Throwable th) {
            executionTracker.future.internalCompleteExceptionally(th);
        }
        return executionTracker.future;
    }

    public void shutdown() {
        this.isShutdown = true;
        Arrays.stream(this.cooperativeWorkers).forEach(cooperativeWorker -> {
            cooperativeWorker.newTaskletSemaphore.release();
        });
        this.blockingTaskletExecutor.shutdownNow();
        this.hzExecutionService.shutdownExecutor(TASKLET_INIT_CLOSE_EXECUTOR_NAME);
    }

    private void submitBlockingTasklets(ExecutionTracker executionTracker, ClassLoader classLoader, List<Tasklet> list) {
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        Stream<R> map = list.stream().map(tasklet -> {
            return new BlockingWorker(new TaskletTracker(tasklet, executionTracker, classLoader), countDownLatch);
        });
        ExecutorService executorService = this.blockingTaskletExecutor;
        executorService.getClass();
        executionTracker.blockingFutures = (List) map.map((v1) -> {
            return r2.submit(v1);
        }).collect(Collectors.toList());
        countDownLatch.getClass();
        Util.uncheckRun(countDownLatch::await);
    }

    private void submitCooperativeTasklets(ExecutionTracker executionTracker, ClassLoader classLoader, List<Tasklet> list) {
        List[] listArr = new List[this.cooperativeWorkers.length];
        Arrays.setAll(listArr, i -> {
            return new ArrayList();
        });
        awaitAll((List) list.stream().map(tasklet -> {
            return this.hzExecutionService.submit(TASKLET_INIT_CLOSE_EXECUTOR_NAME, () -> {
                tasklet.getClass();
                Util.doWithClassLoader(classLoader, tasklet::init);
            });
        }).collect(Collectors.toList()));
        synchronized (this.lock) {
            Iterator<Tasklet> it = list.iterator();
            while (it.hasNext()) {
                listArr[this.cooperativeThreadIndex].add(new TaskletTracker(it.next(), executionTracker, classLoader));
                this.cooperativeThreadIndex = (this.cooperativeThreadIndex + 1) % listArr.length;
            }
        }
        for (int i2 = 0; i2 < listArr.length; i2++) {
            this.cooperativeWorkers[i2].trackers.addAll(listArr[i2]);
            this.cooperativeWorkers[i2].newTaskletSemaphore.release();
        }
        Arrays.stream(this.cooperativeThreadPool).forEach(LockSupport::unpark);
    }

    private void awaitAll(List<? extends Future<?>> list) {
        Throwable th = null;
        int i = 0;
        Iterator<? extends Future<?>> it = list.iterator();
        while (it.hasNext()) {
            try {
                it.next().get();
            } catch (InterruptedException | ExecutionException e) {
                Throwable peel = ExceptionUtil.peel(e);
                this.logger.severe("Tasklet initialization failed", peel);
                th = th != null ? th : peel;
                i++;
            }
        }
        if (th != null) {
            throw new JetException(String.format("%,d of %,d tasklets failed to initialize. One of the failures is attached as the cause and its summary is %s", Integer.valueOf(i), Integer.valueOf(list.size()), th), th);
        }
    }

    public void awaitWorkerTermination() {
        if (!$assertionsDisabled && !this.isShutdown) {
            throw new AssertionError("Not shut down");
        }
        while (!this.blockingTaskletExecutor.awaitTermination(1L, TimeUnit.MINUTES)) {
            try {
                this.logger.warning("Blocking tasklet executor did not terminate in 1 minute");
            } catch (InterruptedException e) {
                ExceptionUtil.sneakyThrow(e);
                return;
            }
        }
        for (Thread thread : this.cooperativeThreadPool) {
            thread.join();
        }
    }

    private BackoffIdleStrategy createIdler(HazelcastProperties hazelcastProperties, HazelcastProperty hazelcastProperty, HazelcastProperty hazelcastProperty2) {
        int integer = hazelcastProperties.getInteger(hazelcastProperty);
        int integer2 = hazelcastProperties.getInteger(hazelcastProperty2);
        String name = hazelcastProperty.getName();
        String name2 = hazelcastProperty2.getName();
        if (integer >= integer2) {
            this.logger.warning(String.format("The property %s must be set less than or equal to %s but current values are: %s=%d, %s=%d. Using minimum value as maximum instead.", name, name2, name, Integer.valueOf(integer), name2, Integer.valueOf(integer2)));
            integer2 = integer;
        }
        LoggingUtil.logFinest(this.logger, "Creating idler with %s=%dµs,%s=%dµs", name, Integer.valueOf(integer), name2, Integer.valueOf(integer2));
        return new BackoffIdleStrategy(0L, 0L, hazelcastProperty.getTimeUnit().toNanos(integer), hazelcastProperty2.getTimeUnit().toNanos(integer2));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 3237136:
                if (implMethodName.equals("init")) {
                    z = false;
                    break;
                }
                break;
            case 93223254:
                if (implMethodName.equals("await")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/RunnableEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("runEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/execution/Tasklet") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    Tasklet tasklet = (Tasklet) serializedLambda.getCapturedArg(0);
                    return tasklet::init;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/RunnableEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("runEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("java/util/concurrent/CountDownLatch") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(0);
                    return countDownLatch::await;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        $assertionsDisabled = !TaskletExecutionService.class.desiredAssertionStatus();
    }
}
