/*
 * Decompiled with CFR 0.152.
 */
package io.littlehorse.sdk.worker.internal;

import io.grpc.stub.StreamObserver;
import io.littlehorse.sdk.common.LHLibUtil;
import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.common.exception.InputVarSubstitutionError;
import io.littlehorse.sdk.common.exception.LHSerdeError;
import io.littlehorse.sdk.common.proto.LHHostInfo;
import io.littlehorse.sdk.common.proto.LHPublicApiGrpc;
import io.littlehorse.sdk.common.proto.RegisterTaskWorkerRequest;
import io.littlehorse.sdk.common.proto.RegisterTaskWorkerResponse;
import io.littlehorse.sdk.common.proto.ReportTaskRun;
import io.littlehorse.sdk.common.proto.ScheduledTask;
import io.littlehorse.sdk.common.proto.TaskDef;
import io.littlehorse.sdk.common.proto.TaskStatus;
import io.littlehorse.sdk.common.proto.VariableType;
import io.littlehorse.sdk.common.proto.VariableValue;
import io.littlehorse.sdk.worker.WorkerContext;
import io.littlehorse.sdk.worker.internal.LHServerConnection;
import io.littlehorse.sdk.worker.internal.util.ReportTaskObserver;
import io.littlehorse.sdk.worker.internal.util.VariableMapping;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LHServerConnectionManager
implements StreamObserver<RegisterTaskWorkerResponse>,
Closeable {
    private static final Logger log = LoggerFactory.getLogger(LHServerConnectionManager.class);
    public Object executable;
    public Method taskMethod;
    public LHConfig config;
    public List<VariableMapping> mappings;
    public TaskDef taskDef;
    private boolean running;
    private List<LHServerConnection> runningConnections;
    private LHPublicApiGrpc.LHPublicApiStub bootstrapStub;
    private ExecutorService threadPool;
    private Semaphore workerSemaphore;
    private Thread rebalanceThread;
    private static final int TOTAL_RETRIES = 5;

    public LHServerConnectionManager(Method taskMethod, TaskDef taskDef, LHConfig config, List<VariableMapping> mappings, Object executable) throws IOException {
        this.executable = executable;
        this.taskMethod = taskMethod;
        taskMethod.setAccessible(true);
        this.config = config;
        this.mappings = mappings;
        this.taskDef = taskDef;
        this.bootstrapStub = config.getAsyncStub();
        this.running = false;
        this.runningConnections = new ArrayList<LHServerConnection>();
        this.workerSemaphore = new Semaphore(config.getWorkerThreads());
        this.threadPool = Executors.newFixedThreadPool(config.getWorkerThreads());
        this.rebalanceThread = new Thread(() -> {
            while (this.running) {
                this.doHeartbeat();
                try {
                    Thread.sleep(5000L);
                }
                catch (Exception exception) {}
            }
        });
    }

    public void submitTaskForExecution(ScheduledTask scheduledTask, LHPublicApiGrpc.LHPublicApiStub specificStub) {
        try {
            this.workerSemaphore.acquire();
        }
        catch (InterruptedException exn) {
            throw new RuntimeException(exn);
        }
        this.threadPool.submit(() -> this.doTask(scheduledTask, specificStub));
    }

    private void doTask(ScheduledTask scheduledTask, LHPublicApiGrpc.LHPublicApiStub specificStub) {
        ReportTaskRun result = this.executeTask(scheduledTask, LHLibUtil.fromProtoTs(scheduledTask.getCreatedAt()));
        this.workerSemaphore.release();
        String wfRunId = LHLibUtil.getWfRunId(scheduledTask.getSource());
        try {
            log.debug("Going to report task for wfRun {}", (Object)wfRunId);
            specificStub.reportTask(result, new ReportTaskObserver(this, result, 5));
            log.debug("Successfully contacted LHServer on reportTask for wfRun {}", (Object)wfRunId);
        }
        catch (Exception exn) {
            log.warn("Failed to report task for wfRun {}: {}", (Object)wfRunId, (Object)exn.getMessage());
            this.retryReportTask(result, 5);
        }
    }

    public void onNext(RegisterTaskWorkerResponse next) {
        for (LHHostInfo host : next.getYourHostsList()) {
            if (this.isAlreadyRunning(host)) continue;
            try {
                this.runningConnections.add(new LHServerConnection(this, host));
                log.info("Adding connection to: {}:{} for taskdef {}", new Object[]{host.getHost(), host.getPort(), this.taskDef.getName()});
            }
            catch (IOException exn) {
                log.error("Yikes, caught IOException in onNext", (Throwable)exn);
                throw new RuntimeException(exn);
            }
        }
        for (int i = this.runningConnections.size() - 1; i >= 0; --i) {
            LHServerConnection runningThread = this.runningConnections.get(i);
            if (this.shouldBeRunning(runningThread, next.getYourHostsList())) continue;
            log.info("Stopping worker thread for host {}:{}", (Object)runningThread.getHostInfo().getHost(), (Object)runningThread.getHostInfo().getPort());
            runningThread.close();
            this.runningConnections.remove(i);
        }
    }

    private boolean shouldBeRunning(LHServerConnection ssc, List<LHHostInfo> hosts) {
        for (LHHostInfo h : hosts) {
            if (!ssc.isSameAs(h)) continue;
            return true;
        }
        return false;
    }

    private boolean isAlreadyRunning(LHHostInfo host) {
        for (LHServerConnection ssc : this.runningConnections) {
            if (!ssc.isSameAs(host)) continue;
            return true;
        }
        return false;
    }

    public void onError(Throwable t) {
        log.error("Failed contacting bootstrap host {}:{}", new Object[]{this.config.getApiBootstrapHost(), this.config.getApiBootstrapPort(), t});
        this.running = false;
    }

    public void onCompleted() {
    }

    private void doHeartbeat() {
        this.bootstrapStub.registerTaskWorker(RegisterTaskWorkerRequest.newBuilder().setTaskDefName(this.taskDef.getName()).setClientId(this.config.getClientId()).setListenerName(this.config.getConnectListener()).build(), this);
    }

    public void retryReportTask(ReportTaskRun result, int retriesLeft) {
        this.threadPool.submit(() -> {
            log.debug("Retrying reportTask rpc on taskRun {}", (Object)LHLibUtil.taskRunIdToString(result.getTaskRunId()));
            try {
                Thread.sleep(500L);
            }
            catch (Exception exception) {
                // empty catch block
            }
            this.bootstrapStub.reportTask(result, new ReportTaskObserver(this, result, retriesLeft - 1));
        });
    }

    public void onConnectionClosed(LHServerConnection connection) {
        this.runningConnections.removeIf(thing -> thing == connection);
    }

    public void start() {
        this.running = true;
        this.rebalanceThread.start();
    }

    @Override
    public void close() {
        this.running = false;
    }

    private ReportTaskRun executeTask(ScheduledTask scheduledTask, Date scheduleTime) {
        ReportTaskRun.Builder taskResult = ReportTaskRun.newBuilder().setTaskRunId(scheduledTask.getTaskRunId()).setAttemptNumber(scheduledTask.getAttemptNumber());
        WorkerContext wc = new WorkerContext(scheduledTask, scheduleTime);
        try {
            Object rawResult = this.invoke(scheduledTask, wc);
            VariableValue serialized = LHLibUtil.objToVarVal(rawResult);
            taskResult.setOutput(serialized.toBuilder()).setStatus(TaskStatus.TASK_SUCCESS);
            if (wc.getLogOutput() != null) {
                taskResult.setLogOutput(VariableValue.newBuilder().setStr(wc.getLogOutput()));
            }
        }
        catch (InputVarSubstitutionError exn) {
            log.error("Failed calculating task input variables", (Throwable)exn);
            taskResult.setLogOutput(this.exnToVarVal(exn, wc));
            taskResult.setStatus(TaskStatus.TASK_INPUT_VAR_SUB_ERROR);
        }
        catch (LHSerdeError exn) {
            log.error("Failed serializing Task Output", (Throwable)exn);
            taskResult.setLogOutput(this.exnToVarVal(exn, wc));
            taskResult.setStatus(TaskStatus.TASK_OUTPUT_SERIALIZING_ERROR);
        }
        catch (InvocationTargetException exn) {
            log.error("Task Method threw an exception", exn.getCause());
            taskResult.setLogOutput(this.exnToVarVal(exn.getCause(), wc));
            taskResult.setStatus(TaskStatus.TASK_FAILED);
        }
        catch (Exception exn) {
            log.error("Unexpected exception during task execution", (Throwable)exn);
            taskResult.setLogOutput(this.exnToVarVal(exn, wc));
            taskResult.setStatus(TaskStatus.TASK_FAILED);
        }
        taskResult.setTime(LHLibUtil.fromDate(new Date()));
        return taskResult.build();
    }

    private Object invoke(ScheduledTask scheduledTask, WorkerContext context) throws InputVarSubstitutionError, Exception {
        ArrayList<Object> inputs = new ArrayList<Object>();
        for (VariableMapping mapping : this.mappings) {
            inputs.add(mapping.assign(scheduledTask, context));
        }
        return this.taskMethod.invoke(this.executable, inputs.toArray());
    }

    private VariableValue.Builder exnToVarVal(Throwable exn, WorkerContext ctx) {
        StringWriter sw = new StringWriter();
        PrintWriter pw = new PrintWriter(sw);
        exn.printStackTrace(pw);
        Object output = sw.toString();
        if (ctx.getLogOutput() != null) {
            output = (String)output + "\n\n\n\n" + ctx.getLogOutput();
        }
        return VariableValue.newBuilder().setStr((String)output).setType(VariableType.STR);
    }

    public int getNumThreads() {
        return this.config.getWorkerThreads();
    }
}

