/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.addons.hbase.example;

import eu.stratosphere.addons.hbase.TableInputFormat;
import eu.stratosphere.addons.hbase.common.HBaseKey;
import eu.stratosphere.addons.hbase.common.HBaseResult;
import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.ProgramDescription;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.io.InputFormat;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.base.GenericDataSinkBase;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.operators.FileDataSink;
import eu.stratosphere.api.java.record.operators.GenericDataSource;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;

public class HBaseReadExample
implements Program,
ProgramDescription {
    public Plan getPlan(String ... args) {
        int numSubTasks = args.length > 0 ? Integer.parseInt(args[0]) : 1;
        String output = args.length > 1 ? args[1] : "";
        GenericDataSource source = new GenericDataSource((InputFormat)new MyTableInputFormat(), "HBase Input");
        source.setParameter("hbase.inputtable", "twitter");
        source.setParameter("hbase.config.location", "/etc/hbase/conf/hbase-site.xml");
        FileDataSink out = new FileDataSink((FileOutputFormat)new CsvOutputFormat(), output, (Operator)source, "HBase String dump");
        ((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)CsvOutputFormat.configureRecordFormat((FileDataSink)out).recordDelimiter('\n')).fieldDelimiter(' ')).field(StringValue.class, 0)).field(StringValue.class, 1)).field(StringValue.class, 2)).field(StringValue.class, 3);
        Plan plan = new Plan((GenericDataSinkBase)out, "HBase access Example");
        plan.setDefaultParallelism(numSubTasks);
        return plan;
    }

    public String getDescription() {
        return "Parameters: [numSubStasks] [input] [output]";
    }

    public static class MyTableInputFormat
    extends TableInputFormat {
        private static final long serialVersionUID = 1L;
        private final byte[] META_FAMILY = "meta".getBytes();
        private final byte[] USER_COLUMN = "user".getBytes();
        private final byte[] TIMESTAMP_COLUMN = "timestamp".getBytes();
        private final byte[] TEXT_FAMILY = "text".getBytes();
        private final byte[] TWEET_COLUMN = "tweet".getBytes();
        StringValue row_string = new StringValue();
        StringValue user_string = new StringValue();
        StringValue timestamp_string = new StringValue();
        StringValue tweet_string = new StringValue();

        @Override
        protected HTable createTable(Configuration parameters) {
            return super.createTable(parameters);
        }

        @Override
        protected Scan createScanner(Configuration parameters) {
            Scan scan = new Scan();
            scan.addColumn(this.META_FAMILY, this.USER_COLUMN);
            scan.addColumn(this.META_FAMILY, this.TIMESTAMP_COLUMN);
            scan.addColumn(this.TEXT_FAMILY, this.TWEET_COLUMN);
            return scan;
        }

        @Override
        public void mapResultToRecord(Record record, HBaseKey key, HBaseResult result) {
            Result res = result.getResult();
            res.getRow();
            record.setField(0, (Value)this.toString(this.row_string, res.getRow()));
            record.setField(1, (Value)this.toString(this.user_string, res.getValue(this.META_FAMILY, this.USER_COLUMN)));
            record.setField(2, (Value)this.toString(this.timestamp_string, res.getValue(this.META_FAMILY, this.TIMESTAMP_COLUMN)));
            record.setField(3, (Value)this.toString(this.tweet_string, res.getValue(this.TEXT_FAMILY, this.TWEET_COLUMN)));
        }

        private final StringValue toString(StringValue string, byte[] bytes) {
            string.setValueAscii(bytes, 0, bytes.length);
            return string;
        }
    }
}

