/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.worker;

import com.sampullara.cli.Args;
import com.sampullara.cli.Argument;
import io.mantisrx.common.metrics.netty.MantisNettyEventsListenerFactory;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.loader.ClassLoaderHandle;
import io.mantisrx.runtime.loader.SinkSubscriptionStateHandler;
import io.mantisrx.runtime.loader.config.WorkerConfiguration;
import io.mantisrx.server.core.BaseService;
import io.mantisrx.server.core.CoreConfiguration;
import io.mantisrx.server.core.Service;
import io.mantisrx.server.core.WrappedExecuteStageRequest;
import io.mantisrx.server.master.client.HighAvailabilityServices;
import io.mantisrx.server.master.client.HighAvailabilityServicesUtil;
import io.mantisrx.server.master.client.MantisMasterGateway;
import io.mantisrx.server.worker.RuntimeTaskImpl;
import io.mantisrx.server.worker.config.ConfigurationFactory;
import io.mantisrx.server.worker.config.StaticPropertiesConfigurationFactory;
import io.mantisrx.shaded.com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.time.Clock;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.metrics.MetricEventsListenerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Subscription;
import rx.subjects.PublishSubject;

public class MantisWorker
extends BaseService {
    private static final Logger logger = LoggerFactory.getLogger(MantisWorker.class);
    @Argument(alias="p", description="Specify a configuration file", required=false)
    private static String propFile = "worker.properties";
    private CountDownLatch blockUntilShutdown = new CountDownLatch(1);
    private List<Service> mantisServices = new LinkedList<Service>();

    public MantisWorker(ConfigurationFactory configFactory, io.mantisrx.server.master.client.config.ConfigurationFactory coreConfigFactory) {
        this(configFactory, Optional.empty());
    }

    public MantisWorker(ConfigurationFactory configFactory, final Optional<Job> jobToRun) {
        System.setProperty("rx.ring-buffer.size", "1024");
        final WorkerConfiguration config = configFactory.getConfig();
        final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtil.createHAServices((CoreConfiguration)config);
        this.mantisServices.add(new Service(){

            public void start() {
                highAvailabilityServices.startAsync().awaitRunning();
            }

            public void shutdown() {
                highAvailabilityServices.stopAsync().awaitTerminated();
            }

            public void enterActiveMode() {
            }

            public String toString() {
                return "HighAvailabilityServices Service";
            }
        });
        final MantisMasterGateway gateway = highAvailabilityServices.getMasterClientApi();
        Thread t = new Thread(){

            @Override
            public void run() {
                MantisWorker.this.shutdown();
            }
        };
        t.setDaemon(true);
        Runtime.getRuntime().addShutdownHook(t);
        final PublishSubject executeStageSubject = PublishSubject.create();
        this.mantisServices.add(new Service(){
            private RuntimeTaskImpl runtimeTaskImpl;
            private Subscription vmStatusSubscription;

            public void start() {
                ClassLoader classLoader;
                if (Thread.currentThread().getContextClassLoader() == null) {
                    classLoader = ClassLoader.getSystemClassLoader();
                    logger.info("Choosing system classloader {}", (Object)classLoader);
                } else {
                    classLoader = Thread.currentThread().getContextClassLoader();
                    logger.info("Choosing current thread classloader {}", (Object)classLoader);
                }
                executeStageSubject.asObservable().first().subscribe(wrappedRequest -> {
                    try {
                        this.runtimeTaskImpl = new RuntimeTaskImpl();
                        this.runtimeTaskImpl.initialize((WrappedExecuteStageRequest)wrappedRequest, config, gateway, ClassLoaderHandle.fixed((ClassLoader)classLoader).getOrResolveClassLoader((Collection)ImmutableList.of(), (Collection)ImmutableList.of()), SinkSubscriptionStateHandler.Factory.forEphemeralJobsThatNeedToBeKilledInAbsenceOfSubscriber((MantisMasterGateway)gateway, (Clock)Clock.systemDefaultZone()));
                        this.runtimeTaskImpl.setJob(jobToRun);
                        this.runtimeTaskImpl.startAsync();
                    }
                    catch (Exception ex) {
                        logger.error("Failed to start task, request: {}", wrappedRequest, (Object)ex);
                        throw new RuntimeException("Failed to start task", ex);
                    }
                });
            }

            public void shutdown() {
                if (this.runtimeTaskImpl != null) {
                    try {
                        this.runtimeTaskImpl.stopAsync().awaitTerminated();
                    }
                    finally {
                        this.vmStatusSubscription.unsubscribe();
                    }
                }
            }

            public void enterActiveMode() {
            }

            public String toString() {
                return "TaskService";
            }
        });
    }

    private static Properties loadProperties(String propFile) {
        Properties props = new Properties();
        try (InputStream in = MantisWorker.findResourceAsStream(propFile);){
            props.load(in);
        }
        catch (IOException e) {
            throw new RuntimeException(String.format("Can't load properties from the given property file %s: %s", propFile, e.getMessage()), e);
        }
        return props;
    }

    private static InputStream findResourceAsStream(String resourceName) throws FileNotFoundException {
        File resource = new File(resourceName);
        if (resource.exists()) {
            return new FileInputStream(resource);
        }
        InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(resourceName);
        if (is == null) {
            throw new FileNotFoundException(String.format("Can't find property file %s. Make sure the property file is either in your path or in your classpath ", resourceName));
        }
        return is;
    }

    public static void main(String[] args) {
        try {
            Args.parse(MantisWorker.class, (String[])args);
        }
        catch (IllegalArgumentException e) {
            Args.usage(MantisWorker.class);
            System.exit(1);
        }
        try {
            StaticPropertiesConfigurationFactory workerConfigFactory = new StaticPropertiesConfigurationFactory(MantisWorker.loadProperties(propFile));
            io.mantisrx.server.master.client.config.StaticPropertiesConfigurationFactory coreConfigFactory = new io.mantisrx.server.master.client.config.StaticPropertiesConfigurationFactory(MantisWorker.loadProperties(propFile));
            MantisWorker worker = new MantisWorker((ConfigurationFactory)workerConfigFactory, (io.mantisrx.server.master.client.config.ConfigurationFactory)coreConfigFactory);
            worker.start();
        }
        catch (Exception e) {
            logger.error("Unexpected error: " + e.getMessage(), (Throwable)e);
            System.exit(2);
        }
    }

    public void start() {
        this.startUp();
        this.awaitTerminated();
    }

    public void startUp() {
        logger.info("Starting Mantis Worker");
        RxNetty.useMetricListenersFactory((MetricEventsListenerFactory)new MantisNettyEventsListenerFactory());
        for (Service service : this.mantisServices) {
            logger.info("Starting service: " + service);
            try {
                service.start();
            }
            catch (Throwable e) {
                logger.error(String.format("Failed to start service %s: %s", service, e.getMessage()), e);
                throw e;
            }
            logger.info("Started service: " + service);
        }
        logger.info("Started Mantis Worker successfully");
    }

    public void awaitTerminated() {
        try {
            this.blockUntilShutdown.await();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void shutdown() {
        logger.info("Shutting down Mantis Worker");
        for (Service service : this.mantisServices) {
            service.shutdown();
        }
        this.blockUntilShutdown.countDown();
    }

    public void enterActiveMode() {
    }
}

