/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.worker.mesos;

import io.mantisrx.server.core.BaseService;
import io.mantisrx.server.core.WrappedExecuteStageRequest;
import io.mantisrx.server.worker.VirtualMachineWorkerService;
import io.mantisrx.server.worker.mesos.MesosExecutorCallbackHandler;
import io.mantisrx.server.worker.mesos.VirtualMachineTaskStatus;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.mesos.Executor;
import org.apache.mesos.MesosExecutorDriver;
import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;

public class VirualMachineWorkerServiceMesosImpl
extends BaseService
implements VirtualMachineWorkerService {
    private static final Logger logger = LoggerFactory.getLogger(VirualMachineWorkerServiceMesosImpl.class);
    private MesosExecutorDriver mesosDriver;
    private ExecutorService executor;
    private Observer<WrappedExecuteStageRequest> executeStageRequestObserver;
    private Observable<VirtualMachineTaskStatus> vmTaskStatusObservable;

    public VirualMachineWorkerServiceMesosImpl(Observer<WrappedExecuteStageRequest> executeStageRequestObserver, Observable<VirtualMachineTaskStatus> vmTaskStatusObservable) {
        this.executeStageRequestObserver = executeStageRequestObserver;
        this.vmTaskStatusObservable = vmTaskStatusObservable;
        this.executor = Executors.newSingleThreadExecutor(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "vm_worker_mesos_executor_thread");
                t.setDaemon(true);
                return t;
            }
        });
    }

    public void start() {
        logger.info("Registering Mantis Worker with Mesos executor callbacks");
        this.mesosDriver = new MesosExecutorDriver((Executor)new MesosExecutorCallbackHandler(this.executeStageRequestObserver));
        logger.info("launch driver on background thread");
        this.executor.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    VirualMachineWorkerServiceMesosImpl.this.mesosDriver.run();
                }
                catch (Exception e) {
                    logger.error("Failed to register Mantis Worker with Mesos executor callbacks", (Throwable)e);
                }
            }
        });
        logger.info("subscribe to vm task updates on current thread");
        this.vmTaskStatusObservable.subscribe((Action1)new Action1<VirtualMachineTaskStatus>(){

            public void call(VirtualMachineTaskStatus vmTaskStatus) {
                VirtualMachineTaskStatus.TYPE type = vmTaskStatus.getType();
                if (type == VirtualMachineTaskStatus.TYPE.COMPLETED) {
                    Protos.Status status = VirualMachineWorkerServiceMesosImpl.this.mesosDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(Protos.TaskID.newBuilder().setValue(vmTaskStatus.getTaskId()).build()).setState(Protos.TaskState.TASK_FINISHED).build());
                    logger.info("Sent COMPLETED state to mesos, driver status=" + status);
                } else if (type == VirtualMachineTaskStatus.TYPE.STARTED) {
                    Protos.Status status = VirualMachineWorkerServiceMesosImpl.this.mesosDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(Protos.TaskID.newBuilder().setValue(vmTaskStatus.getTaskId()).build()).setState(Protos.TaskState.TASK_RUNNING).build());
                    logger.info("Sent RUNNING state to mesos, driver status=" + status);
                }
            }
        });
    }

    public void shutdown() {
        logger.info("Unregistering Mantis Worker with Mesos executor callbacks");
        this.mesosDriver.stop();
        this.executor.shutdown();
    }

    public void enterActiveMode() {
    }
}

