package com.netflix.conductor.client.automator;

import com.netflix.appinfo.InstanceInfo;
import com.netflix.conductor.client.config.PropertyFactory;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.telemetry.MetricsContainer;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.discovery.EurekaClient;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
import com.netflix.spectator.api.patterns.ThreadPoolMonitor;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.Thread;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netflix/conductor/client/automator/TaskPollExecutor.class */
class TaskPollExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskPollExecutor.class);
    private static final Registry REGISTRY = Spectator.globalRegistry();
    private final EurekaClient eurekaClient;
    private final TaskClient taskClient;
    private final int updateRetryCount;
    private final ExecutorService executorService;
    private final Map<String, String> taskToDomain;
    private static final String DOMAIN = "domain";
    private static final String OVERRIDE_DISCOVERY = "pollOutOfDiscovery";
    private static final String ALL_WORKERS = "all";
    private static final int LEASE_EXTEND_RETRY_COUNT = 3;
    private static final double LEASE_EXTEND_DURATION_FACTOR = 0.8d;
    private ScheduledExecutorService leaseExtendExecutorService;
    Map<String, ScheduledFuture<?>> leaseExtendMap = new HashMap();
    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (thread, th) -> {
        MetricsContainer.incrementUncaughtExceptionCount();
        LOGGER.error("Uncaught exception. Thread {} will exit now", thread, th);
    };
    private final Map<String, PollingSemaphore> pollingSemaphoreMap = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public TaskPollExecutor(EurekaClient eurekaClient, TaskClient taskClient, int i, Map<String, String> map, String str, Map<String, Integer> map2) {
        this.eurekaClient = eurekaClient;
        this.taskClient = taskClient;
        this.updateRetryCount = i;
        this.taskToDomain = map;
        int i2 = 0;
        for (Map.Entry<String, Integer> entry : map2.entrySet()) {
            String key = entry.getKey();
            int intValue = entry.getValue().intValue();
            i2 += intValue;
            this.pollingSemaphoreMap.put(key, new PollingSemaphore(intValue));
        }
        LOGGER.info("Initialized the TaskPollExecutor with {} threads", Integer.valueOf(i2));
        this.executorService = Executors.newFixedThreadPool(i2, new BasicThreadFactory.Builder().namingPattern(str).uncaughtExceptionHandler(this.uncaughtExceptionHandler).build());
        ThreadPoolMonitor.attach(REGISTRY, (ThreadPoolExecutor) this.executorService, str);
        LOGGER.info("Initialized the task lease extend executor");
        this.leaseExtendExecutorService = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("workflow-lease-extend-%d").daemon(true).uncaughtExceptionHandler(this.uncaughtExceptionHandler).build());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void pollAndExecute(Worker worker) {
        Boolean bool = (Boolean) Optional.ofNullable(PropertyFactory.getBoolean(worker.getTaskDefName(), OVERRIDE_DISCOVERY, null)).orElseGet(() -> {
            return PropertyFactory.getBoolean(ALL_WORKERS, OVERRIDE_DISCOVERY, false);
        });
        if (this.eurekaClient != null && !this.eurekaClient.getInstanceRemoteStatus().equals(InstanceInfo.InstanceStatus.UP) && !bool.booleanValue()) {
            LOGGER.debug("Instance is NOT UP in discovery - will not poll");
            return;
        }
        if (worker.paused()) {
            MetricsContainer.incrementTaskPausedCount(worker.getTaskDefName());
            LOGGER.debug("Worker {} has been paused. Not polling anymore!", worker.getClass());
            return;
        }
        String taskDefName = worker.getTaskDefName();
        PollingSemaphore pollingSemaphore = getPollingSemaphore(taskDefName);
        int availableSlots = pollingSemaphore.availableSlots();
        if (availableSlots <= 0 || !pollingSemaphore.acquireSlots(availableSlots)) {
            return;
        }
        int i = 0;
        try {
            String str = (String) Optional.ofNullable(PropertyFactory.getString(taskDefName, DOMAIN, null)).orElseGet(() -> {
                return (String) Optional.ofNullable(PropertyFactory.getString(ALL_WORKERS, DOMAIN, null)).orElse(this.taskToDomain.get(taskDefName));
            });
            LOGGER.debug("Polling task of type: {} in domain: '{}'", taskDefName, str);
            List<Task> list = (List) MetricsContainer.getPollTimer(taskDefName).record(() -> {
                return this.taskClient.batchPollTasksInDomain(taskDefName, str, worker.getIdentity(), availableSlots, worker.getBatchPollTimeoutInMS());
            });
            i = list.size();
            for (Task task : list) {
                if (Objects.nonNull(task) && StringUtils.isNotBlank(task.getTaskId())) {
                    MetricsContainer.incrementTaskPollCount(taskDefName, 1);
                    LOGGER.debug("Polled task: {} of type: {} in domain: '{}', from worker: {}", new Object[]{task.getTaskId(), taskDefName, str, worker.getIdentity()});
                    CompletableFuture<Task> supplyAsync = CompletableFuture.supplyAsync(() -> {
                        return processTask(task, worker, pollingSemaphore);
                    }, this.executorService);
                    if (task.getResponseTimeoutSeconds() > 0 && worker.leaseExtendEnabled()) {
                        this.leaseExtendMap.put(task.getTaskId(), this.leaseExtendExecutorService.scheduleWithFixedDelay(extendLease(task, supplyAsync), Math.round(task.getResponseTimeoutSeconds() * LEASE_EXTEND_DURATION_FACTOR), Math.round(task.getResponseTimeoutSeconds() * LEASE_EXTEND_DURATION_FACTOR), TimeUnit.SECONDS));
                    }
                    supplyAsync.whenComplete(this::finalizeTask);
                } else {
                    pollingSemaphore.complete(1);
                }
            }
        } catch (Exception e) {
            MetricsContainer.incrementTaskPollErrorCount(worker.getTaskDefName(), e);
            LOGGER.error("Error when polling for tasks", e);
        }
        pollingSemaphore.complete(availableSlots - i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown(int i) {
        shutdownAndAwaitTermination(this.executorService, i);
        shutdownAndAwaitTermination(this.leaseExtendExecutorService, i);
        this.leaseExtendMap.clear();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdownAndAwaitTermination(ExecutorService executorService, int i) {
        try {
            executorService.shutdown();
            if (executorService.awaitTermination(i, TimeUnit.SECONDS)) {
                LOGGER.debug("tasks completed, shutting down");
            } else {
                LOGGER.warn(String.format("forcing shutdown after waiting for %s second", Integer.valueOf(i)));
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            LOGGER.warn("shutdown interrupted, invoking shutdownNow");
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    private Task processTask(Task task, Worker worker, PollingSemaphore pollingSemaphore) {
        LOGGER.debug("Executing task: {} of type: {} in worker: {} at {}", new Object[]{task.getTaskId(), task.getTaskDefName(), worker.getClass().getSimpleName(), worker.getIdentity()});
        try {
            try {
                executeTask(worker, task);
                pollingSemaphore.complete(1);
            } catch (Throwable th) {
                task.setStatus(Task.Status.FAILED);
                handleException(th, new TaskResult(task), worker, task);
                pollingSemaphore.complete(1);
            }
            return task;
        } catch (Throwable th2) {
            pollingSemaphore.complete(1);
            throw th2;
        }
    }

    private void executeTask(Worker worker, Task task) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        TaskResult taskResult = null;
        try {
            try {
                LOGGER.debug("Executing task: {} in worker: {} at {}", new Object[]{task.getTaskId(), worker.getClass().getSimpleName(), worker.getIdentity()});
                taskResult = worker.execute(task);
                taskResult.setWorkflowInstanceId(task.getWorkflowInstanceId());
                taskResult.setTaskId(task.getTaskId());
                taskResult.setWorkerId(worker.getIdentity());
                stopWatch.stop();
                MetricsContainer.getExecutionTimer(worker.getTaskDefName()).record(stopWatch.getTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                LOGGER.error("Unable to execute task: {} of type: {}", new Object[]{task.getTaskId(), task.getTaskDefName(), e});
                if (taskResult == null) {
                    task.setStatus(Task.Status.FAILED);
                    taskResult = new TaskResult(task);
                }
                handleException(e, taskResult, worker, task);
                stopWatch.stop();
                MetricsContainer.getExecutionTimer(worker.getTaskDefName()).record(stopWatch.getTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            }
            LOGGER.debug("Task: {} executed by worker: {} at {} with status: {}", new Object[]{task.getTaskId(), worker.getClass().getSimpleName(), worker.getIdentity(), taskResult.getStatus()});
            updateTaskResult(this.updateRetryCount, task, taskResult, worker);
        } catch (Throwable th) {
            stopWatch.stop();
            MetricsContainer.getExecutionTimer(worker.getTaskDefName()).record(stopWatch.getTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            throw th;
        }
    }

    private void finalizeTask(Task task, Throwable th) {
        if (th != null) {
            LOGGER.error("Error processing task: {} of type: {}", new Object[]{task.getTaskId(), task.getTaskType(), th});
            MetricsContainer.incrementTaskExecutionErrorCount(task.getTaskType(), th);
            return;
        }
        LOGGER.debug("Task:{} of type:{} finished processing with status:{}", new Object[]{task.getTaskId(), task.getTaskDefName(), task.getStatus()});
        String taskId = task.getTaskId();
        ScheduledFuture<?> scheduledFuture = this.leaseExtendMap.get(taskId);
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
            this.leaseExtendMap.remove(taskId);
        }
    }

    private void updateTaskResult(int i, Task task, TaskResult taskResult, Worker worker) {
        try {
            Optional optional = (Optional) retryOperation(taskResult2 -> {
                return upload(taskResult2, task.getTaskType());
            }, i, taskResult, "evaluateAndUploadLargePayload");
            if (optional.isPresent()) {
                taskResult.setExternalOutputPayloadStoragePath((String) optional.get());
                taskResult.setOutputData((Map) null);
            }
            retryOperation(taskResult3 -> {
                this.taskClient.updateTask(taskResult3);
                return null;
            }, i, taskResult, "updateTask");
        } catch (Exception e) {
            worker.onErrorUpdate(task);
            MetricsContainer.incrementTaskUpdateErrorCount(worker.getTaskDefName(), e);
            LOGGER.error(String.format("Failed to update result: %s for task: %s in worker: %s", taskResult.toString(), task.getTaskDefName(), worker.getIdentity()), e);
        }
    }

    private Optional<String> upload(TaskResult taskResult, String str) {
        try {
            return this.taskClient.evaluateAndUploadLargePayload(taskResult.getOutputData(), str);
        } catch (IllegalArgumentException e) {
            taskResult.setReasonForIncompletion(e.getMessage());
            taskResult.setOutputData((Map) null);
            taskResult.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
            return Optional.empty();
        }
    }

    private <T, R> R retryOperation(Function<T, R> function, int i, T t, String str) {
        int i2 = 0;
        while (i2 < i) {
            try {
                return function.apply(t);
            } catch (Exception e) {
                i2++;
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e2) {
                    LOGGER.error("Retry interrupted", e2);
                }
            }
        }
        throw new RuntimeException("Exhausted retries performing " + str);
    }

    private void handleException(Throwable th, TaskResult taskResult, Worker worker, Task task) {
        LOGGER.error(String.format("Error while executing task %s", task.toString()), th);
        MetricsContainer.incrementTaskExecutionErrorCount(worker.getTaskDefName(), th);
        taskResult.setStatus(TaskResult.Status.FAILED);
        taskResult.setReasonForIncompletion("Error while executing the task: " + th);
        StringWriter stringWriter = new StringWriter();
        th.printStackTrace(new PrintWriter(stringWriter));
        taskResult.log(stringWriter.toString());
        updateTaskResult(this.updateRetryCount, task, taskResult, worker);
    }

    private PollingSemaphore getPollingSemaphore(String str) {
        return this.pollingSemaphoreMap.get(str);
    }

    private Runnable extendLease(Task task, CompletableFuture<Task> completableFuture) {
        return () -> {
            if (completableFuture.isDone()) {
                LOGGER.warn("Task processing for {} completed, but its lease extend was not cancelled", task.getTaskId());
                return;
            }
            LOGGER.info("Attempting to extend lease for {}", task.getTaskId());
            try {
                TaskResult taskResult = new TaskResult(task);
                taskResult.setExtendLease(true);
                retryOperation(taskResult2 -> {
                    this.taskClient.updateTask(taskResult2);
                    return null;
                }, LEASE_EXTEND_RETRY_COUNT, taskResult, "extend lease");
                MetricsContainer.incrementTaskLeaseExtendCount(task.getTaskDefName(), 1);
            } catch (Exception e) {
                MetricsContainer.incrementTaskLeaseExtendErrorCount(task.getTaskDefName(), e);
                LOGGER.error("Failed to extend lease for {}", task.getTaskId(), e);
            }
        };
    }
}
