/*
 * Decompiled with CFR 0.152.
 */
package wan.ml;

import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Region;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.util.Properties;
import java.util.zip.GZIPInputStream;
import wan.ml.GemFireTradeLoader;
import wan.ml.Trade;

public class GemFireTradeBurstFeeder
extends GemFireTradeLoader {
    Region bridgeTradeCache = null;
    protected int burst_sleep_interval;
    protected int burst_time;

    public static void main(String[] args) {
        Properties p = new Properties();
        try {
            p.put("trade_start_id", args[0]);
            p.put("trades_file_start_line", args[1]);
            p.put("trades_file_end_line", args[2]);
            p.put("trades_per_second", args[3]);
            p.put("trade_file_location", args[4]);
            p.put("BURST_SLEEP_INTERVAL", args[5]);
            p.put("BURST_TIME", args[6]);
        }
        catch (Exception e) {
            System.out.println("Error Extracting Startup Arguments: " + e);
            e.printStackTrace();
            System.exit(0);
        }
        GemFireTradeBurstFeeder app = new GemFireTradeBurstFeeder();
        Cache c = CacheFactory.getAnyInstance();
        try {
            app.bridgeTradeCache = c.getRegion("TRADES");
            System.out.println("Retrieved Bridge Region TRADES");
            System.out.println("TRADES Region Attributes: " + app.bridgeTradeCache.getAttributes().toString());
        }
        catch (Exception e) {
            System.out.println("Could not create TRADES bridge Region: " + e);
            e.printStackTrace();
            System.exit(0);
        }
        app.init(p);
    }

    @Override
    public void initTestParams(Properties p) {
        super.initTestParams(p);
        try {
            this.burst_sleep_interval = Integer.parseInt(p.getProperty("BURST_SLEEP_INTERVAL")) * 1000;
            this.burst_time = Integer.parseInt(p.getProperty("BURST_TIME"));
            System.out.println("burst_sleep_interval: " + this.burst_sleep_interval + " ms");
            System.out.println("burst_time: " + this.burst_time + " sec");
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void easyPumpTrades() {
        int trade_count;
        block13: {
            String xml = null;
            boolean isPureDelayAdjustedForLatency = false;
            trade_count = 0;
            start_time = System.currentTimeMillis();
            long endBatch = 0L;
            long startBatch = 0L;
            int burst = 0;
            int sentInBurst = 0;
            double sleepMs = 0.0;
            double accumulatedSleepMs = 0.0;
            int interval = 1000;
            try {
                LineNumberReader line_reader = new LineNumberReader(new InputStreamReader(new GZIPInputStream(new FileInputStream(file_name))));
                line_reader.setLineNumber(START_ROW);
                sleepMs = 0.0;
                block7: while (true) {
                    System.out.println("Sleeping for " + this.burst_sleep_interval + " ms");
                    Thread.sleep(this.burst_sleep_interval);
                    burst = 0;
                    while (true) {
                        block14: {
                            if (burst >= this.burst_time) continue block7;
                            System.out.println("Sending burst " + burst + 1);
                            long now = System.currentTimeMillis();
                            long end = now + (long)interval;
                            sentInBurst = 0;
                            long startBurst = System.currentTimeMillis();
                            while (now < end && sentInBurst < RATE_PER_SECOND) {
                                now = System.currentTimeMillis();
                                xml = line_reader.readLine();
                                if (null == xml) {
                                    logger.warning("The XML is null at line number " + line_reader.getLineNumber());
                                    long end_time = System.currentTimeMillis();
                                    System.out.println("Processed " + trade_count + " trades in " + (end_time - start_time) + " millies.");
                                    System.exit(0);
                                }
                                if ("<XMLROOT>".equals(xml) || line_reader.getLineNumber() < START_ROW) continue;
                                if ("</XMLROOT>".equals(xml) || line_reader.getLineNumber() > END_ROW) break block13;
                                xml = GemFireTradeBurstFeeder.assignTradeId(xml);
                                long writeStartNanoTime = System.currentTimeMillis();
                                long writeWallStartTime = System.currentTimeMillis();
                                this.vendorPublishingAPI(xml);
                                this.bracketLog(writeWallStartTime, System.currentTimeMillis() - writeStartNanoTime, xml);
                                ++trade_count;
                                accumulatedSleepMs += sleepMs;
                                if (accumulatedSleepMs > 50.0) {
                                    accumulatedSleepMs -= 50.0;
                                    try {
                                        Thread.sleep(50L);
                                    }
                                    catch (InterruptedException e) {
                                        break;
                                    }
                                }
                                ++sentInBurst;
                            }
                            long endBurst = System.currentTimeMillis();
                            System.out.println("Time to send a burst: " + (endBurst - startBurst));
                            System.out.println("Sent " + sentInBurst + " trades in burst " + burst);
                            if (now < end) {
                                sleepMs += (double)(end - now) / ((double)RATE_PER_SECOND * 1.0);
                                accumulatedSleepMs = 0.0;
                                try {
                                    System.out.println("Sleeping for: " + (end - now) + " ms");
                                    Thread.sleep(end - now);
                                    break block14;
                                }
                                catch (InterruptedException e) {
                                    continue block7;
                                }
                            }
                            sleepMs -= (double)(now - end) / ((double)RATE_PER_SECOND * 1.0);
                        }
                        ++burst;
                    }
                    break;
                }
            }
            catch (VirtualMachineError e) {
                SystemFailure.initiateFailure((Error)e);
                throw e;
            }
            catch (Throwable e) {
                e.printStackTrace();
            }
        }
        long end_time = System.currentTimeMillis();
        System.out.println("Processed " + trade_count + " trades in " + (end_time - start_time) + " millies.");
    }

    @Override
    protected void vendorPublishingAPI(String xml) throws Throwable {
        Trade t = new Trade(xml);
        t.setGFLatencyStart();
        this.bridgeTradeCache.put((Object)new Long(t.TradeId), (Object)t);
    }
}

