/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.yarn;

import com.google.common.base.Preconditions;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.jobmanager.JobManager;
import eu.stratosphere.yarn.Utils;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.util.Records;

public class ApplicationMaster {
    private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
    private static final int HEAP_LIMIT_CAP = 500;

    private void run() throws Exception {
        String line;
        Configuration conf = Utils.initializeYarnConfiguration();
        FileSystem fs = FileSystem.get((Configuration)conf);
        Map<String, String> envs = System.getenv();
        String currDir = envs.get(ApplicationConstants.Environment.PWD.key());
        String logDirs = envs.get(ApplicationConstants.Environment.LOG_DIRS.key());
        String ownHostname = envs.get(ApplicationConstants.Environment.NM_HOST.key());
        String appId = envs.get("_APP_ID");
        String clientHomeDir = envs.get("_CLIENT_HOME_DIR");
        String applicationMasterHost = envs.get(ApplicationConstants.Environment.NM_HOST.key());
        String remoteStratosphereJarPath = envs.get("_STRATOSPHERE_JAR_PATH");
        String shipListString = envs.get("_CLIENT_SHIP_FILES");
        String yarnClientUsername = envs.get("_CLIENT_USERNAME");
        int taskManagerCount = Integer.valueOf(envs.get("_CLIENT_TM_COUNT"));
        int memoryPerTaskManager = Integer.valueOf(envs.get("_CLIENT_TM_MEMORY"));
        int coresPerTaskManager = Integer.valueOf(envs.get("_CLIENT_TM_CORES"));
        int heapLimit = (int)((double)memoryPerTaskManager * 0.85);
        if (memoryPerTaskManager - heapLimit > 500) {
            heapLimit = memoryPerTaskManager - 500;
        }
        if (currDir == null) {
            throw new RuntimeException("Current directory unknown");
        }
        if (ownHostname == null) {
            throw new RuntimeException("Own hostname (" + ApplicationConstants.Environment.NM_HOST + ") not set.");
        }
        LOG.info((Object)("Working directory " + currDir));
        Utils.getStratosphereConfiguration(currDir);
        String localWebInterfaceDir = currDir + "/resources/" + "web-docs-infoserver";
        FileInputStream fis = new FileInputStream(currDir + "/stratosphere-conf.yaml");
        BufferedReader br = new BufferedReader(new InputStreamReader(fis));
        BufferedWriter output = new BufferedWriter(new FileWriter(currDir + "/stratosphere-conf-modified.yaml"));
        while ((line = br.readLine()) != null) {
            if (line.contains("jobmanager.rpc.address")) {
                output.append("jobmanager.rpc.address: " + ownHostname + "\n");
                continue;
            }
            if (line.contains("jobmanager.web.rootpath")) {
                output.append("jobmanager.web.rootpath: \n");
                continue;
            }
            output.append(line + "\n");
        }
        output.append("jobmanager.rpc.address: " + ownHostname + "\n");
        output.append("jobmanager.web.rootpath: " + localWebInterfaceDir + "\n");
        output.append("jobmanager.web.logpath: " + logDirs + "\n");
        ((Writer)output).close();
        br.close();
        File newConf = new File(currDir + "/stratosphere-conf-modified.yaml");
        if (!newConf.exists()) {
            LOG.warn((Object)"modified yaml does not exist!");
        }
        Utils.copyJarContents("resources/web-docs-infoserver", ApplicationMaster.class.getProtectionDomain().getCodeSource().getLocation().getPath());
        String pathToNepheleConfig = currDir + "/stratosphere-conf-modified.yaml";
        String[] args = new String[]{"-executionMode", "cluster", "-configDir", pathToNepheleConfig};
        JobManager jm = JobManager.initialize((String[])args);
        jm.startInfoServer();
        AMRMClient rmClient = AMRMClient.createAMRMClient();
        rmClient.init(conf);
        rmClient.start();
        NMClient nmClient = NMClient.createNMClient();
        nmClient.init(conf);
        nmClient.start();
        LOG.info((Object)"registering ApplicationMaster");
        rmClient.registerApplicationMaster(applicationMasterHost, 0, "http://" + applicationMasterHost + ":" + GlobalConfiguration.getString((String)"jobmanager.web.port", (String)"undefined"));
        Priority priority = (Priority)Records.newRecord(Priority.class);
        priority.setPriority(0);
        Resource capability = (Resource)Records.newRecord(Resource.class);
        capability.setMemory(memoryPerTaskManager);
        capability.setVirtualCores(coresPerTaskManager);
        for (int i = 0; i < taskManagerCount; ++i) {
            AMRMClient.ContainerRequest containerAsk = new AMRMClient.ContainerRequest(capability, null, null, priority);
            LOG.info((Object)("Requesting TaskManager container " + i));
            rmClient.addContainerRequest(containerAsk);
        }
        LocalResource stratosphereJar = (LocalResource)Records.newRecord(LocalResource.class);
        LocalResource stratosphereConf = (LocalResource)Records.newRecord(LocalResource.class);
        Path remoteJarPath = new Path(remoteStratosphereJarPath);
        Utils.registerLocalResource(fs, remoteJarPath, stratosphereJar);
        Path remoteConfPath = Utils.setupLocalResource(conf, fs, appId, new Path("file://" + currDir + "/stratosphere-conf-modified.yaml"), stratosphereConf, new Path(clientHomeDir));
        LOG.info((Object)("Prepared localresource for modified yaml: " + stratosphereConf));
        boolean hasLog4j = new File(currDir + "/log4j.properties").exists();
        LocalResource[] remoteShipRsc = null;
        String[] remoteShipPaths = shipListString.split(",");
        if (!shipListString.isEmpty()) {
            remoteShipRsc = new LocalResource[remoteShipPaths.length];
            int i = 0;
            for (String remoteShipPathStr : remoteShipPaths) {
                if (remoteShipPathStr == null || remoteShipPathStr.isEmpty()) continue;
                remoteShipRsc[i] = (LocalResource)Records.newRecord(LocalResource.class);
                Path remoteShipPath = new Path(remoteShipPathStr);
                Utils.registerLocalResource(fs, remoteShipPath, remoteShipRsc[i]);
                ++i;
            }
        }
        String javaOpts = GlobalConfiguration.getString((String)"env.java.opts", (String)"");
        int allocatedContainers = 0;
        int completedContainers = 0;
        while (allocatedContainers < taskManagerCount) {
            AllocateResponse response = rmClient.allocate(0.0f);
            for (Container container : response.getAllocatedContainers()) {
                LOG.info((Object)("Got new Container for TM " + container.getId() + " on host " + container.getNodeId().getHost()));
                ++allocatedContainers;
                ContainerLaunchContext ctx = (ContainerLaunchContext)Records.newRecord(ContainerLaunchContext.class);
                String tmCommand = "$JAVA_HOME/bin/java -Xmx" + heapLimit + "m " + javaOpts;
                if (hasLog4j) {
                    tmCommand = tmCommand + " -Dlog.file=\"<LOG_DIR>/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
                }
                tmCommand = tmCommand + " eu.stratosphere.yarn.YarnTaskManagerRunner -configDir .  1><LOG_DIR>/taskmanager-stdout.log 2><LOG_DIR>/taskmanager-stderr.log";
                ctx.setCommands(Collections.singletonList(tmCommand));
                LOG.info((Object)("Starting TM with command=" + tmCommand));
                HashMap<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
                localResources.put("stratosphere.jar", stratosphereJar);
                localResources.put("stratosphere-conf.yaml", stratosphereConf);
                if (!shipListString.isEmpty()) {
                    Preconditions.checkNotNull((Object)remoteShipRsc);
                    for (int i = 0; i < remoteShipPaths.length; ++i) {
                        localResources.put(new Path(remoteShipPaths[i]).getName(), remoteShipRsc[i]);
                    }
                }
                ctx.setLocalResources(localResources);
                HashMap<String, String> containerEnv = new HashMap<String, String>();
                Utils.setupEnv(conf, containerEnv);
                containerEnv.put("_CLIENT_USERNAME", yarnClientUsername);
                ctx.setEnvironment(containerEnv);
                UserGroupInformation user = UserGroupInformation.getCurrentUser();
                try {
                    Credentials credentials = user.getCredentials();
                    DataOutputBuffer dob = new DataOutputBuffer();
                    credentials.writeTokenStorageToStream((DataOutputStream)dob);
                    ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
                    ctx.setTokens(securityTokens);
                }
                catch (IOException e) {
                    LOG.warn((Object)("Getting current user info failed when trying to launch the container" + e.getMessage()));
                }
                LOG.info((Object)("Launching container " + allocatedContainers));
                nmClient.startContainer(container, ctx);
            }
            for (ContainerStatus status : response.getCompletedContainersStatuses()) {
                LOG.info((Object)("Completed container (while allocating) " + status.getContainerId() + ". Total Completed:" + ++completedContainers));
                LOG.info((Object)("Diagnostics " + status.getDiagnostics()));
            }
            Thread.sleep(100L);
        }
        while (completedContainers < taskManagerCount) {
            AllocateResponse response = rmClient.allocate((float)(completedContainers / taskManagerCount));
            for (ContainerStatus status : response.getCompletedContainersStatuses()) {
                LOG.info((Object)("Completed container " + status.getContainerId() + ". Total Completed:" + ++completedContainers));
                LOG.info((Object)("Diagnostics " + status.getDiagnostics()));
            }
            Thread.sleep(5000L);
        }
        LOG.info((Object)"Shutting down JobManager");
        jm.shutdown();
        rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
    }

    public static void main(String[] args) throws Exception {
        String yarnClientUsername = System.getenv("_CLIENT_USERNAME");
        LOG.info((Object)("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() + "' setting" + " user to execute Stratosphere ApplicationMaster/JobManager to '" + yarnClientUsername + "'"));
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser((String)yarnClientUsername);
        for (Token toks : UserGroupInformation.getCurrentUser().getTokens()) {
            ugi.addToken(toks);
        }
        ugi.doAs((PrivilegedAction)new PrivilegedAction<Object>(){

            @Override
            public Object run() {
                try {
                    new ApplicationMaster().run();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            }
        });
    }
}

