/*
 * Decompiled with CFR 0.152.
 */
package hdfs;

import com.gemstone.gemfire.Statistics;
import com.gemstone.gemfire.StatisticsFactory;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheListener;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.SequenceFileHoplog;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
import hdfs.HDFSBlackboard;
import hdfs.HDFSPrms;
import hdfs.HDFSUtilBB;
import hydra.CacheHelper;
import hydra.ConfigPrms;
import hydra.DistributedSystemHelper;
import hydra.FileUtil;
import hydra.HDFSStoreDescription;
import hydra.HDFSStoreHelper;
import hydra.HadoopDescription;
import hydra.HadoopHelper;
import hydra.HostDescription;
import hydra.HydraRuntimeException;
import hydra.HydraTimeoutException;
import hydra.Log;
import hydra.MasterController;
import hydra.PortHelper;
import hydra.Prms;
import hydra.ProcessMgr;
import hydra.RemoteTestModule;
import hydra.TestConfig;
import hydra.blackboard.SharedMap;
import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.rmi.RemoteException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import util.SummaryLogListener;
import util.TestException;
import util.TestHelper;
import util.ValueHolder;

public class HDFSUtil {
    public static final String HDFS_RESULT_REGION = "hdfsResultRegion";
    public static boolean serverAlive = true;

    public static void startMapReduceCluster() {
        HadoopHelper.startResourceManager(ConfigPrms.getHadoopConfig());
        HadoopHelper.startNodeManagers(ConfigPrms.getHadoopConfig());
    }

    public static void stopMapReduceCluster() {
        HadoopHelper.stopNodeManagers(ConfigPrms.getHadoopConfig());
        HadoopHelper.stopResourceManager(ConfigPrms.getHadoopConfig());
    }

    public static void startCluster() {
        if (HDFSPrms.useExistingCluster()) {
            if (HDFSPrms.manageMapReduceComponents()) {
                HDFSUtil.startMapReduceCluster();
            }
        } else {
            HadoopHelper.startCluster(ConfigPrms.getHadoopConfig());
        }
    }

    public static void stopCluster() {
        if (HDFSPrms.useExistingCluster()) {
            if (HDFSPrms.manageMapReduceComponents()) {
                HDFSUtil.stopMapReduceCluster();
            }
        } else {
            HadoopHelper.stopCluster(ConfigPrms.getHadoopConfig());
        }
    }

    public static void startHDFSCluster() {
        HadoopHelper.startHDFSCluster(ConfigPrms.getHadoopConfig());
    }

    public static void stopHDFSCluster() {
        HadoopHelper.stopHDFSCluster(ConfigPrms.getHadoopConfig());
    }

    public static void configureHDFSTask() {
        HadoopHelper.configureHDFS(ConfigPrms.getHadoopConfig());
    }

    public static void configureHadoopTask() {
        HadoopHelper.configureHadoop(ConfigPrms.getHadoopConfig());
    }

    public static void formatNameNodesTask() {
        HadoopHelper.formatNameNodes(ConfigPrms.getHadoopConfig());
    }

    public static void startNameNodesTask() {
        HadoopHelper.startNameNodes(ConfigPrms.getHadoopConfig());
    }

    public static void startDataNodesTask() {
        HadoopHelper.startDataNodes(ConfigPrms.getHadoopConfig());
    }

    public static synchronized void restartCluster() {
        String hadoopConfig = ConfigPrms.getHadoopConfig();
        MasterController.sleepForMs(HDFSPrms.hadoopStartWaitSec() * 1000);
        HadoopHelper.startNameNodes(hadoopConfig);
        HadoopHelper.startDataNodes(hadoopConfig);
        HadoopHelper.startResourceManager(hadoopConfig);
        HadoopHelper.startNodeManagers(hadoopConfig);
        MasterController.sleepForMs(HDFSPrms.hadoopReturnWaitSec() * 1000);
    }

    public static void stopNameNodesTask() {
        HadoopHelper.stopNameNodes(ConfigPrms.getHadoopConfig());
    }

    public static void stopDataNodesTask() {
        HadoopHelper.stopDataNodes(ConfigPrms.getHadoopConfig());
    }

    public static void configureAndStartHadoop() {
        String hadoopConfig = ConfigPrms.getHadoopConfig();
        HadoopHelper.configureHadoop(hadoopConfig);
        HadoopHelper.startCluster(hadoopConfig);
    }

    public static void stopStartDataNodes() {
        String hadoopConfig = ConfigPrms.getHadoopConfig();
        MasterController.sleepForMs(HDFSPrms.hadoopStopWaitSec() * 1000);
        HDFSUtilBB.getBB().getSharedCounters().increment(HDFSUtilBB.recycleInProgress);
        HadoopHelper.stopDataNodes(hadoopConfig);
        MasterController.sleepForMs(HDFSPrms.hadoopStartWaitSec() * 1000);
        HadoopHelper.startDataNodes(hadoopConfig);
        MasterController.sleepForMs(HDFSPrms.hadoopReturnWaitSec() * 1000);
        HDFSUtilBB.getBB().getSharedCounters().zero(HDFSUtilBB.recycleInProgress);
    }

    public static void stopStartNameNodes() {
        String hadoopConfig = ConfigPrms.getHadoopConfig();
        MasterController.sleepForMs(HDFSPrms.hadoopStopWaitSec() * 1000);
        HDFSUtilBB.getBB().getSharedCounters().increment(HDFSUtilBB.recycleInProgress);
        HadoopHelper.stopNameNodes(hadoopConfig);
        MasterController.sleepForMs(HDFSPrms.hadoopStartWaitSec() * 1000);
        HadoopHelper.startNameNodes(hadoopConfig);
        MasterController.sleepForMs(HDFSPrms.hadoopReturnWaitSec() * 1000);
        HDFSUtilBB.getBB().getSharedCounters().zero(HDFSUtilBB.recycleInProgress);
    }

    public static void recycleDataNode() {
        Integer pid;
        String hadoopConfig = ConfigPrms.getHadoopConfig();
        HadoopDescription hdd = HadoopHelper.getHadoopDescription(hadoopConfig);
        List<HadoopDescription.DataNodeDescription> dnds = hdd.getDataNodeDescriptions();
        if (dnds.isEmpty()) {
            String s = "Cannot recycle data node, dataNodeDescriptions is empty";
            throw new HydraRuntimeException(s);
        }
        HadoopDescription.DataNodeDescription dnd = dnds.get(0);
        HostDescription hd = dnd.getHostDescription();
        String host = dnd.getHostName();
        String pidfn = dnd.getPIDDir() + "/hadoop-" + System.getProperty("user.name") + "-datanode.pid";
        if (hdd.isSecure()) {
            Log.getLogWriter().info("Using secure mode (kerberos) as root");
            pidfn = dnd.getPIDDir() + "/hadoop_secure_dn.pid";
        }
        if (!ProcessMgr.processExists(host, pid = HDFSUtil.getPid(pidfn))) {
            String s = pid + " for data node is not running on " + host + " see " + dnd.getLogDir() + " for output";
            throw new HydraRuntimeException(s);
        }
        Log.getLogWriter().info("Killing data node on host " + host + " pid = " + pid + ", see " + dnd.getLogDir() + " for output");
        MasterController.sleepForMs(HDFSPrms.hadoopStopWaitSec() * 1000);
        Log.getLogWriter().info("Killing data node on host " + host + " pid = " + pid + ", see " + dnd.getLogDir() + " for output");
        HDFSUtilBB.getBB().getSharedCounters().increment(HDFSUtilBB.recycleInProgress);
        if (hdd.isSecure()) {
            ProcessMgr.shutdownProcess(host, pid);
        } else {
            ProcessMgr.killProcess(host, pid);
        }
        Log.getLogWriter().info("Killed data node on host " + host + " pid = " + pid + ", see " + dnd.getLogDir() + " for output");
        MasterController.sleepForMs(HDFSPrms.hadoopStartWaitSec() * 1000);
        Log.getLogWriter().info("Restarting data node on host " + host + " see " + dnd.getLogDir() + " for output");
        HadoopHelper.startDataNode(hadoopConfig, host);
        Log.getLogWriter().info("Restarted data node on host " + host + " see " + dnd.getLogDir() + " for output");
        MasterController.sleepForMs(HDFSPrms.hadoopReturnWaitSec() * 1000);
        HDFSUtilBB.getBB().getSharedCounters().zero(HDFSUtilBB.recycleInProgress);
    }

    public static void recycleNameNode() {
        String pidfn;
        Integer pid;
        String hadoopConfig = ConfigPrms.getHadoopConfig();
        HadoopDescription hd = HadoopHelper.getHadoopDescription(hadoopConfig);
        List<HadoopDescription.NameNodeDescription> nnds = hd.getNameNodeDescriptions();
        if (nnds.isEmpty()) {
            String s = "Cannot recycle name node, nameNodeDescriptions is empty";
            throw new HydraRuntimeException(s);
        }
        HadoopDescription.NameNodeDescription nnd = nnds.get(0);
        String host = nnd.getHostName();
        if (!ProcessMgr.processExists(host, pid = HDFSUtil.getPid(pidfn = nnd.getPIDDir() + "/hadoop-" + System.getProperty("user.name") + "-namenode.pid"))) {
            String s = pid + " for name node is not running on " + host + " see " + nnd.getLogDir() + " for output";
            throw new HydraRuntimeException(s);
        }
        Log.getLogWriter().info("Killing name node on host " + host + " pid = " + pid + ", see " + nnd.getLogDir() + " for output");
        MasterController.sleepForMs(HDFSPrms.hadoopStopWaitSec() * 1000);
        HDFSUtilBB.getBB().getSharedCounters().increment(HDFSUtilBB.recycleInProgress);
        ProcessMgr.killProcess(host, pid);
        Log.getLogWriter().info("Killed name node on host " + host + " pid = " + pid + ", see " + nnd.getLogDir() + " for output");
        MasterController.sleepForMs(HDFSPrms.hadoopStartWaitSec() * 1000);
        Log.getLogWriter().info("Restarting name node on host " + host + " see " + nnd.getLogDir() + " for output");
        HadoopHelper.startNameNode(hadoopConfig, host);
        Log.getLogWriter().info("Restarted name node on host " + host + " see " + nnd.getLogDir() + " for output");
        MasterController.sleepForMs(HDFSPrms.hadoopReturnWaitSec() * 1000);
        HDFSUtilBB.getBB().getSharedCounters().zero(HDFSUtilBB.recycleInProgress);
    }

    private static Integer getPid(String pidfn) {
        Integer pid = null;
        try {
            pid = new Integer(FileUtil.getContents(pidfn));
        }
        catch (NumberFormatException e) {
            String s = "Cannot read PID from file: " + pidfn;
            throw new HydraRuntimeException(s, e);
        }
        catch (RemoteException e) {
            String s = "Cannot read PID from file: " + pidfn;
            throw new HydraRuntimeException(s, e);
        }
        catch (IOException e) {
            String s = "Cannot read PID file: " + pidfn;
            throw new HydraRuntimeException(s, e);
        }
        return pid;
    }

    public static void loadDataFromHDFS(Region aRegion, String regionName) {
        FileSystem fs;
        Cache aCache = CacheHelper.getCache();
        GemFireCacheImpl cacheImpl = (GemFireCacheImpl)aCache;
        String hdfsStoreName = aRegion.getAttributes().getHDFSStoreName();
        HDFSStoreImpl storeImpl = cacheImpl.findHDFSStore(hdfsStoreName);
        Log.getLogWriter().info("validate() invoked for " + hdfsStoreName);
        try {
            fs = storeImpl.getFileSystem();
        }
        catch (IOException e1) {
            throw new HDFSIOException(e1.getMessage(), (Throwable)e1);
        }
        Log.getLogWriter().info("creating region based on files in HDFS directory: " + storeImpl.getHomeDir());
        try {
            Path basePath = new Path(storeImpl.getHomeDir());
            String pathPattern = regionName + "/*/*.*";
            String sep = File.separator;
            String hadoopCmd = " fs -ls " + basePath + sep + pathPattern;
            int vmid = RemoteTestModule.getMyVmid();
            String clientName = RemoteTestModule.getMyClientName();
            String host = RemoteTestModule.getMyHost();
            HostDescription hd = TestConfig.getInstance().getClientDescription(clientName).getVmDescription().getHostDescription();
            String logfn = hd.getUserDir() + sep + "vm_" + vmid + "_" + clientName + "_" + host + "_loadDataFromHDFS.log";
            HadoopDescription hdd = HadoopHelper.getHadoopDescription(ConfigPrms.getHadoopConfig());
            if (hdd.isSecure()) {
                Log.getLogWriter().info("HDFSUtil.loadDataFromHDFS()-execute kinit in secure mode");
                String userKinit = "/export/gcm/where/java/hadoop/hadoop-secure-keytabs/gfxd-secure.keytab gfxd-secure@GEMSTONE.COM";
                HadoopHelper.executeKinitCommand(host, userKinit, 120);
            }
            HadoopHelper.runHadoopCommand(ConfigPrms.getHadoopConfig(), hadoopCmd, logfn);
            Path regionPath = new Path(basePath, pathPattern);
            FileStatus[] sortedListOfHoplogs = fs.globStatus(regionPath);
            for (int i = 0; i < sortedListOfHoplogs.length; ++i) {
                Path hoplogPath = sortedListOfHoplogs[i].getPath();
                Log.getLogWriter().fine("Processing hoplog " + hoplogPath);
                try {
                    HDFSUtil.loadSequenceFile(aRegion, fs, hoplogPath);
                    continue;
                }
                catch (FileNotFoundException fnfe) {
                    if (hoplogPath.toString().endsWith(".tmp")) {
                        Path newHoplogPath = HDFSUtil.getBaseHoplogPath(hoplogPath);
                        Log.getLogWriter().info("loadDataFromHDFS caught FileNotFoundException while processing " + hoplogPath + ", retrying with base filename " + newHoplogPath);
                        HDFSUtil.loadSequenceFile(aRegion, fs, newHoplogPath);
                        continue;
                    }
                    throw new TestException("loadDataFromHDFS caught unexpected Exception for SequenceHoplog which is not a tmp file" + fnfe + "\n" + TestHelper.getStackTrace(fnfe));
                }
            }
        }
        catch (TestException e) {
            throw e;
        }
        catch (Exception e) {
            throw new TestException("loadDataFromHDFS caught unexpected Exception " + e + ": " + TestHelper.getStackTrace(e));
        }
        finally {
            HadoopDescription hdd = HadoopHelper.getHadoopDescription(ConfigPrms.getHadoopConfig());
            if (hdd.getSecurityAuthentication().equals("kerberos")) {
                HadoopHelper.executeKdestroyCommand(RemoteTestModule.getMyHost(), 120);
            }
        }
    }

    private static Path getBaseHoplogPath(Path hoplogPath) {
        String originalFilename = hoplogPath.toString();
        int tmpExtIndex = originalFilename.lastIndexOf(".tmp");
        String trimmedFilename = originalFilename.substring(0, tmpExtIndex);
        return new Path(trimmedFilename);
    }

    public static void loadSequenceFile(Region aRegion, FileSystem inputFS, Path sequenceFileName) throws Exception {
        SequenceFileHoplog hoplog = new SequenceFileHoplog(inputFS, sequenceFileName, null);
        HoplogSetReader.HoplogIterator iter = hoplog.getReader().scan();
        boolean isSerialExecution = TestConfig.tab().booleanAt(Prms.serialExecution);
        while (iter.hasNext()) {
            iter.next();
            UnsortedHoplogPersistedEvent te = UnsortedHoplogPersistedEvent.fromBytes((byte[])((byte[])iter.getValue()));
            String stringkey = (String)CacheServerHelper.deserialize((byte[])((byte[])iter.getKey()));
            Operation op = te.getOperation();
            Object value = null;
            ValueHolder displayVal = null;
            if (te != null) {
                displayVal = (ValueHolder)te.getDeserializedValue();
            }
            Log.getLogWriter().fine("loadSequenceFile record: " + op.toString() + ": key = " + stringkey + ", value = " + displayVal);
            try {
                if (op.isCreate()) {
                    aRegion.put((Object)stringkey, (Object)((ValueHolder)te.getDeserializedValue()));
                    continue;
                }
                if (op.isUpdate()) {
                    aRegion.put((Object)stringkey, (Object)((ValueHolder)te.getDeserializedValue()));
                    continue;
                }
                if (op.isDestroy()) {
                    aRegion.destroy((Object)stringkey);
                    continue;
                }
                Log.getLogWriter().info("StreamingValidator.loadSequenceFile(): Unexpected operation " + op.toString() + " : " + TestHelper.getStackTrace());
            }
            catch (EntryNotFoundException e) {
                if (op.isDestroy() && te.isPossibleDuplicate()) {
                    Log.getLogWriter().info("loadSequenceFile caught EntryNotFoundException for " + stringkey);
                    continue;
                }
                if (!isSerialExecution) continue;
                Log.getLogWriter().info("loadSequenceFile caught " + (Object)((Object)e) + ":" + TestHelper.getStackTrace(e));
                throw new TestException("loadSequenceFile caught unexpected Exception " + (Object)((Object)e) + ":" + TestHelper.getStackTrace(e));
            }
        }
        iter.close();
        hoplog.close();
    }

    public static void execMapReduceJob() {
        String extraArgs = null;
        HDFSUtil.execMapReduceJob(extraArgs);
    }

    public static void execMapReduceJob(String extraArgs) {
        String mapReduceClassName = HDFSPrms.getMapReduceClassName();
        try {
            HDFSUtil.execMapReduceJob(mapReduceClassName, extraArgs);
        }
        finally {
            HadoopDescription hdd = HadoopHelper.getHadoopDescription(ConfigPrms.getHadoopConfig());
            if (hdd.getSecurityAuthentication().equals("kerberos")) {
                HadoopHelper.executeKdestroyCommand(RemoteTestModule.getMyHost(), 120);
            }
        }
    }

    public static void execMapReduceJob(String mapReduceClassName, String extraArgs) {
        List endpoints = DistributedSystemHelper.getSystemEndpoints();
        DistributedSystemHelper.Endpoint endpoint = (DistributedSystemHelper.Endpoint)endpoints.get(0);
        String locatorHost = endpoint.getHost();
        int locatorPort = endpoint.getPort();
        String sep = File.separator;
        String jtests = System.getProperty("JTESTS");
        String MRJarPath = jtests + sep + ".." + sep + "extraJars" + sep + "mapreduce.jar";
        String gemfireJarPath = jtests + sep + ".." + sep + ".." + sep + "product" + sep + "lib" + sep + "gemfire.jar";
        String gemfireLibPath = jtests + sep + ".." + sep + ".." + sep + "product" + sep + "lib";
        String hbaseJarPath = HDFSUtil.getHbaseJar(gemfireLibPath);
        String hdfsStoreConfig = ConfigPrms.getHDFSStoreConfig();
        HDFSStoreDescription hsd = HDFSStoreHelper.getHDFSStoreDescription(hdfsStoreConfig);
        String hdfsStoreName = hsd.getName();
        HadoopDescription hdd = HadoopHelper.getHadoopDescription(ConfigPrms.getHadoopConfig());
        String confDir = hdd.getResourceManagerDescription().getConfDir();
        String hdfsHomeDir = HDFSStoreHelper.getHDFSStore(hdfsStoreName).getHomeDir();
        String cmd = "env CLASSPATH=" + System.getProperty("java.class.path") + " ";
        cmd = cmd + "env HADOOP_CLASSPATH=" + System.getProperty("java.class.path") + " ";
        cmd = cmd + hdd.getHadoopDist() + sep + "bin" + sep + "yarn ";
        cmd = cmd + "--config " + confDir + " ";
        cmd = cmd + "jar " + MRJarPath + " ";
        cmd = cmd + mapReduceClassName + " ";
        cmd = cmd + " -libjars  " + MRJarPath + "," + gemfireJarPath + "," + hbaseJarPath + " ";
        cmd = cmd + locatorHost + " " + locatorPort + " " + hdfsHomeDir + " ";
        if (extraArgs != null) {
            cmd = cmd + extraArgs;
        }
        Log.getLogWriter().info("Executing " + cmd + "...");
        int vmid = RemoteTestModule.getMyVmid();
        String clientName = RemoteTestModule.getMyClientName();
        String host = RemoteTestModule.getMyHost();
        HostDescription hd = TestConfig.getInstance().getClientDescription(clientName).getVmDescription().getHostDescription();
        String logfn = hd.getUserDir() + sep + "vm_" + vmid + "_" + clientName + "_" + host + "_" + mapReduceClassName + ".log";
        if (hdd.isSecure()) {
            String userKinit = "/export/gcm/where/java/hadoop/hadoop-secure-keytabs/gfxd-secure.keytab gfxd-secure@GEMSTONE.COM";
            HadoopHelper.executeKinitCommand(host, userKinit, 120);
        }
        int pid = ProcessMgr.bgexec(cmd, hd.getUserDir(), logfn);
        try {
            RemoteTestModule.Master.recordHDFSPIDNoDumps(hd, pid, false);
        }
        catch (RemoteException e) {
            String s = "Failed to record PID: " + pid;
            throw new HydraRuntimeException(s, e);
        }
        int maxWaitSec = (int)TestConfig.tab().longAt(Prms.maxResultWaitSec);
        if (!ProcessMgr.waitForDeath(host, pid, maxWaitSec)) {
            String s = "Waited more than " + maxWaitSec + " seconds for MapReduce Job";
            throw new HydraTimeoutException(s);
        }
        try {
            RemoteTestModule.Master.removeHDFSPIDNoDumps(hd, pid, false);
        }
        catch (RemoteException e) {
            Log.getLogWriter().info("execMapReduceJob caught " + e + ": " + TestHelper.getStackTrace(e));
            String s = "Failed to remove PID: " + pid;
            throw new HydraRuntimeException(s, e);
        }
        Log.getLogWriter().info("Completed MapReduce job  on host " + host + " using command: " + cmd + ", see " + logfn + " for output");
    }

    private static String getHbaseJar(String gemfireLibPath) {
        class HbaseJarFilter
        implements FileFilter {
            HbaseJarFilter() {
            }

            @Override
            public boolean accept(File fn) {
                return fn.getName().startsWith("hbase") && fn.getName().contains("gemfire") && fn.getName().endsWith(".jar");
            }
        }
        List hbaseJars = FileUtil.getFiles(new File(gemfireLibPath), new HbaseJarFilter(), false);
        if (hbaseJars.size() != 1) {
            throw new TestException("TestException: cannot uniquely identify gemfire/lib/hbase*gemfire*.jar, please check for a change in hbase jar file naming");
        }
        File matchingJarFile = (File)hbaseJars.get(0);
        return matchingJarFile.getAbsolutePath();
    }

    public static synchronized void getAllHDFSEventsForKey(String keys) {
        Cache cache = CacheHelper.getCache();
        if (cache.getRegion(HDFS_RESULT_REGION) == null) {
            Region hdfsResultRegion = cache.createRegionFactory(RegionShortcut.REPLICATE).addCacheListener((CacheListener)new SummaryLogListener()).create(HDFS_RESULT_REGION);
            Log.getLogWriter().info("Created hdfsResultRegion " + hdfsResultRegion.getFullPath() + " for dumping HDFS updates for " + keys);
            CacheServer cs = cache.addCacheServer();
            int port = PortHelper.getRandomPort();
            cs.setPort(port);
            try {
                cs.start();
                Log.getLogWriter().info("Started server listenening on port " + port);
            }
            catch (IOException e) {
                throw new TestException("getAllHDSEventsForKey caught " + e + " while starting a CacheServer " + TestHelper.getStackTrace(e));
            }
        }
        HDFSUtil.execMapReduceJob("hdfs.mapreduce.GetAllHDFSEventsForKey", keys);
    }

    public static void dumpHDFSResultRegion() {
        StringBuffer hdfsOps = new StringBuffer();
        Region hdfsResultRegion = CacheHelper.getCache().getRegion(HDFS_RESULT_REGION);
        if (hdfsResultRegion == null) {
            return;
        }
        TreeSet keySet = new TreeSet(hdfsResultRegion.keySet());
        for (String key : keySet) {
            Object o = hdfsResultRegion.get((Object)key);
            hdfsOps.append("  " + key + ": " + o.toString() + "\n");
        }
        Log.getLogWriter().info("Contents of HDFSResultRegion = \n" + hdfsOps.toString());
    }

    public static String getHDFSStoreName(Region aRegion) {
        RegionAttributes attrs = aRegion.getAttributes();
        return attrs.getHDFSStoreName();
    }

    public static synchronized void HydraTask_startQueueMonitor() {
        DistributedSystem statFactory = DistributedSystemHelper.getDistributedSystem();
        String key = "AsyncEventQueueStatistics: eventQueueSize for vm_" + RemoteTestModule.getMyVmid();
        Log.getLogWriter().info("Starting event queue monitor with key: " + key);
        SharedMap bb = HDFSBlackboard.getInstance().getSharedMap();
        Thread queueMonitor = new Thread(new Runnable((StatisticsFactory)statFactory, key, bb){
            final /* synthetic */ StatisticsFactory val$statFactory;
            final /* synthetic */ String val$key;
            final /* synthetic */ SharedMap val$bb;
            {
                this.val$statFactory = statisticsFactory;
                this.val$key = string;
                this.val$bb = sharedMap;
            }

            @Override
            public void run() {
                Statistics[] qStats = this.val$statFactory.findStatisticsByType(this.val$statFactory.findType("AsyncEventQueueStatistics"));
                while (serverAlive) {
                    SystemFailure.checkFailure();
                    long qSize = 0L;
                    for (int i = 0; i < qStats.length; ++i) {
                        qSize += (long)qStats[i].getInt(AsyncEventQueueStats.getEventQueueSizeId());
                    }
                    Log.getLogWriter().fine("Updating " + this.val$key + " with eventQueueSize = " + qSize);
                    this.val$bb.put(this.val$key, new Long(qSize));
                    if (!serverAlive) continue;
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }

            protected void finalize() throws Throwable {
                Object o = this.val$bb.remove(this.val$key);
                Log.getLogWriter().severe("Removing HDFSBlackboard key " + this.val$key + " with value (HDFS AEQ size) = " + o);
                super.finalize();
            }
        }, "HDFS AEQ Monitor");
        queueMonitor.setDaemon(true);
        queueMonitor.start();
    }

    public static synchronized void HydraTask_waitForQueuesToDrain() {
        SharedMap bb = HDFSBlackboard.getInstance().getSharedMap();
        long startTime = System.currentTimeMillis();
        HDFSStoreDescription hsd = HDFSStoreHelper.getHDFSStoreDescription(ConfigPrms.getHDFSStoreConfig());
        long maxWait = hsd.getBatchTimeInterval() + 180000;
        long entriesLeft = 0L;
        while (System.currentTimeMillis() - startTime < maxWait) {
            Set keySet = bb.getMap().keySet();
            Iterator kIt = keySet.iterator();
            boolean pass = true;
            while (kIt.hasNext()) {
                String k = (String)kIt.next();
                if (!k.startsWith("AsyncEventQueueStatistics: eventQueueSize")) continue;
                Long value = (Long)bb.get(k);
                Log.getLogWriter().info("Checking event queue key: " + k + " found eventQueueSize = " + value);
                if (value <= 0L) continue;
                pass = false;
                entriesLeft = value;
                Log.getLogWriter().warning("Still waiting for " + k + " to drain. Current value is " + value);
                break;
            }
            if (pass) {
                entriesLeft = 0L;
                break;
            }
            try {
                Thread.sleep(1000L);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (entriesLeft > 0L) {
            throw new TestException("TIMED OUT waiting for HDFS AEQ to drain.  eventQueueSize =  " + entriesLeft);
        }
        Log.getLogWriter().info("QUEUES ARE DRAINED");
    }
}

