/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.worker;

import io.mantisrx.common.JsonSerializer;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.loader.RuntimeTask;
import io.mantisrx.runtime.loader.SinkSubscriptionStateHandler;
import io.mantisrx.runtime.loader.config.MetricsCollector;
import io.mantisrx.runtime.loader.config.WorkerConfiguration;
import io.mantisrx.runtime.loader.config.WorkerConfigurationUtils;
import io.mantisrx.runtime.loader.config.WorkerConfigurationWritable;
import io.mantisrx.server.agent.metrics.cgroups.CgroupsMetricsCollector;
import io.mantisrx.server.core.CoreConfiguration;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.Service;
import io.mantisrx.server.core.Status;
import io.mantisrx.server.core.WrappedExecuteStageRequest;
import io.mantisrx.server.core.metrics.MetricsFactory;
import io.mantisrx.server.master.client.HighAvailabilityServices;
import io.mantisrx.server.master.client.HighAvailabilityServicesUtil;
import io.mantisrx.server.master.client.MantisMasterGateway;
import io.mantisrx.server.master.client.TaskStatusUpdateHandler;
import io.mantisrx.server.worker.ExecuteStageRequestService;
import io.mantisrx.server.worker.WorkerExecutionOperationsNetworkStage;
import io.mantisrx.server.worker.client.WorkerMetricsClient;
import io.mantisrx.server.worker.mesos.VirtualMachineTaskStatus;
import io.mantisrx.shaded.com.google.common.util.concurrent.AbstractIdleService;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.util.UserCodeClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

public class RuntimeTaskImpl
extends AbstractIdleService
implements RuntimeTask {
    private static final Logger log = LoggerFactory.getLogger(RuntimeTaskImpl.class);
    private WrappedExecuteStageRequest wrappedExecuteStageRequest;
    private WorkerConfiguration config;
    private final List<Service> mantisServices = new ArrayList<Service>();
    private HighAvailabilityServices highAvailabilityServices;
    private TaskStatusUpdateHandler taskStatusUpdateHandler;
    private MantisMasterGateway masterMonitor;
    private UserCodeClassLoader userCodeClassLoader;
    private SinkSubscriptionStateHandler.Factory sinkSubscriptionStateHandlerFactory;
    private final PublishSubject<Observable<Status>> tasksStatusSubject;
    private final PublishSubject<VirtualMachineTaskStatus> vmTaskStatusSubject = PublishSubject.create();
    private Optional<Job> mantisJob = Optional.empty();
    private ExecuteStageRequest executeStageRequest;

    public RuntimeTaskImpl() {
        this.tasksStatusSubject = PublishSubject.create();
    }

    public RuntimeTaskImpl(PublishSubject<Observable<Status>> tasksStatusSubject) {
        this.tasksStatusSubject = tasksStatusSubject;
    }

    public void initialize(String executeStageRequestString, String workerConfigurationString, UserCodeClassLoader userCodeClassLoader) {
        try {
            log.info("Creating runtimeTaskImpl.");
            log.info("runtimeTaskImpl workerConfigurationString: {}", (Object)workerConfigurationString);
            log.info("runtimeTaskImpl executeStageRequestString: {}", (Object)executeStageRequestString);
            JsonSerializer ser = new JsonSerializer();
            WorkerConfigurationWritable configWritable = WorkerConfigurationUtils.stringToWorkerConfiguration((String)workerConfigurationString);
            this.config = configWritable;
            ExecuteStageRequest executeStageRequest = (ExecuteStageRequest)ser.fromJSON(executeStageRequestString, ExecuteStageRequest.class);
            this.wrappedExecuteStageRequest = new WrappedExecuteStageRequest(PublishSubject.create(), executeStageRequest);
            log.info("Picking Cgroups metrics collector.");
            configWritable.setMetricsCollector((MetricsCollector)CgroupsMetricsCollector.valueOf((Properties)System.getProperties()));
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.highAvailabilityServices = HighAvailabilityServicesUtil.createHAServices((CoreConfiguration)this.config);
        this.executeStageRequest = this.wrappedExecuteStageRequest.getRequest();
        this.masterMonitor = this.highAvailabilityServices.getMasterClientApi();
        this.userCodeClassLoader = userCodeClassLoader;
        this.sinkSubscriptionStateHandlerFactory = SinkSubscriptionStateHandler.Factory.forEphemeralJobsThatNeedToBeKilledInAbsenceOfSubscriber((MantisMasterGateway)this.highAvailabilityServices.getMasterClientApi(), (Clock)Clock.systemDefaultZone());
        this.taskStatusUpdateHandler = TaskStatusUpdateHandler.forReportingToGateway((MantisMasterGateway)this.masterMonitor);
        this.getStatus().observeOn(Schedulers.io()).subscribe(status -> this.taskStatusUpdateHandler.onStatusUpdate(status));
    }

    protected void initialize(WrappedExecuteStageRequest wrappedExecuteStageRequest, WorkerConfiguration config, MantisMasterGateway masterMonitor, UserCodeClassLoader userCodeClassLoader, SinkSubscriptionStateHandler.Factory sinkSubscriptionStateHandlerFactory) {
        log.info("initialize RuntimeTaskImpl on injected ExecuteStageRequest: {}", (Object)wrappedExecuteStageRequest.getRequest());
        this.wrappedExecuteStageRequest = wrappedExecuteStageRequest;
        this.executeStageRequest = wrappedExecuteStageRequest.getRequest();
        this.config = config;
        this.masterMonitor = masterMonitor;
        this.userCodeClassLoader = userCodeClassLoader;
        this.sinkSubscriptionStateHandlerFactory = sinkSubscriptionStateHandlerFactory;
        this.taskStatusUpdateHandler = TaskStatusUpdateHandler.forReportingToGateway((MantisMasterGateway)masterMonitor);
        this.getStatus().observeOn(Schedulers.io()).subscribe(status -> this.taskStatusUpdateHandler.onStatusUpdate(status));
    }

    public void setJob(Optional<Job> job) {
        this.mantisJob = job;
    }

    protected void startUp() throws Exception {
        try {
            log.info("Starting current task {}", (Object)this);
            if (this.highAvailabilityServices != null && !this.highAvailabilityServices.isRunning()) {
                this.highAvailabilityServices.startAsync().awaitRunning();
            }
            this.doRun();
        }
        catch (Exception e) {
            log.error("Failed executing the task {}", (Object)this.executeStageRequest, (Object)e);
            throw e;
        }
    }

    private void doRun() throws Exception {
        PublishSubject executeStageSubject = PublishSubject.create();
        this.mantisServices.add((Service)MetricsFactory.newMetricsServer((CoreConfiguration)this.config, (ExecuteStageRequest)this.executeStageRequest));
        WorkerMetricsClient workerMetricsClient = new WorkerMetricsClient(this.masterMonitor);
        this.mantisServices.add((Service)new ExecuteStageRequestService((Observable<WrappedExecuteStageRequest>)executeStageSubject, (Observer<Observable<Status>>)this.tasksStatusSubject, new WorkerExecutionOperationsNetworkStage((Observer<VirtualMachineTaskStatus>)this.vmTaskStatusSubject, this.masterMonitor, this.config, workerMetricsClient, this.sinkSubscriptionStateHandlerFactory, this.userCodeClassLoader.asClassLoader()), this.getJobProviderClass(), this.userCodeClassLoader, this.mantisJob));
        log.info("Starting Mantis Worker for task {}", (Object)this);
        for (Service service : this.mantisServices) {
            log.info("Starting service: " + service.getClass().getName());
            try {
                service.start();
            }
            catch (Throwable e) {
                log.error(String.format("Failed to start service %s: %s", service, e.getMessage()), e);
                throw e;
            }
        }
        executeStageSubject.onNext((Object)this.wrappedExecuteStageRequest);
    }

    protected void shutDown() {
        log.info("Attempting to cancel task {}", (Object)this);
        for (Service service : this.mantisServices) {
            log.info("Stopping service: " + service.getClass().getName());
            try {
                service.shutdown();
            }
            catch (Throwable e) {
                log.error(String.format("Failed to stop service %s: %s", service, e.getMessage()), e);
                throw e;
            }
        }
    }

    private Optional<String> getJobProviderClass() {
        return this.executeStageRequest.getNameOfJobProviderClass();
    }

    protected Observable<Status> getStatus() {
        return this.tasksStatusSubject.flatMap(status -> status);
    }

    public Observable<VirtualMachineTaskStatus> getVMStatus() {
        return this.vmTaskStatusSubject;
    }

    public String getWorkerId() {
        return this.executeStageRequest.getWorkerId().getId();
    }
}

