Friday, November 15, 2013

Bulk load data in HBase table with HFileOutputFormat

Do you have lots of data that needs to be loaded into an HBase table, and in a very short time? Here are some findings that will help you with this task. There are many ways to load data into HBase; some of them are mentioned below…

1.       With the Pig HBaseStorage() function:

This is the easiest way to load data into HBase, but it is slow, since behind the scenes it sends individual Put() requests to HBase.

 STORE hbase_data INTO 'hbase_table' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('cf1:col1,cf1:col2');  

2.       ImportTsv:

It is a three-step process:

    I. Prepare your data in the format Key\tCol1\tCol2.

    II. Convert it into StoreFiles with the help of the ImportTsv command. (http://hbase.apache.org/book/ops_mgt.html#importtsv)

    III. Bulk load the StoreFiles into the HBase table with completebulkload. (http://hbase.apache.org/book/ops_mgt.html#completebulkload) Sample commands for steps II and III are shown below.
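
For reference, steps II and III boil down to two commands along these lines. The paths, table name and the cf1:col1/cf1:col2 column layout are placeholders here, so adjust them to your data:

 hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
   -Dimporttsv.columns=HBASE_ROW_KEY,cf1:col1,cf1:col2 \
   -Dimporttsv.bulk.output=/user/<you>/storefile_output \
   <HBase_Table> /user/<you>/input_tsv

 hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /user/<you>/storefile_output <HBase_Table>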

3.       MR to load data directly into the HBase table:

You can write a MapReduce job that writes data directly into the HBase table by emitting Put objects from the job (in contrast to option 4, which uses HFileOutputFormat to prepare StoreFiles first).
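
The following is not from the original post, just a minimal sketch of this approach: a map-only job that parses tab-separated lines (row key first) and sends Puts straight to the table through TableOutputFormat, wired up with TableMapReduceUtil. The class name, argument order and the cf1:col1/cf1:col2 layout are assumptions for illustration only.

 package com.sample.directload.hbase;
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 public class HBaseDirectLoad {
      // Map-only job: every input line becomes a Put that is written straight to the table.
      public static class DirectLoadMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
           public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // Assumes tab-separated lines: rowKey \t col1Value \t col2Value
                String[] fields = value.toString().split("\t");
                Put hPut = new Put(Bytes.toBytes(fields[0]));
                hPut.add(Bytes.toBytes("cf1"), Bytes.toBytes("col1"), Bytes.toBytes(fields[1]));
                hPut.add(Bytes.toBytes("cf1"), Bytes.toBytes("col2"), Bytes.toBytes(fields[2]));
                context.write(new ImmutableBytesWritable(Bytes.toBytes(fields[0])), hPut);
           }
      }
      public static void main(String[] args) throws Exception {
           Configuration conf = HBaseConfiguration.create();
           Job job = new Job(conf, "HBase_Direct_Loader");
           job.setJarByClass(HBaseDirectLoad.class);
           job.setMapperClass(DirectLoadMap.class);
           job.setInputFormatClass(TextInputFormat.class);
           FileInputFormat.setInputPaths(job, args[0]);   // input directory
           // Wires up TableOutputFormat for the target table and adds the HBase jars to the job.
           TableMapReduceUtil.initTableReducerJob(args[1], null, job);
           job.setNumReduceTasks(0);                      // map-only: Puts go straight to HBase
           System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
 }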

4.       MR to prepare StoreFile:

This is the fastest way to do a bulk load; it involves two steps.

    I. Write a Java MR job to prepare the StoreFiles (sample code given below).

    II. Bulk load the StoreFiles into the HBase table with completebulkload. (http://hbase.apache.org/book/ops_mgt.html#completebulkload)

5.       You will also find some more options here:

    I. http://blog.cloudera.com/blog/2013/09/how-to-use-hbase-bulk-loading-and-why/
    II. http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-1.3.2/bk_user-guide/content/user-guide-hbase-import-2.html
    III. http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-1.3.2/bk_user-guide/content/user-guide-hbase-import-1.html


Important Notes (only if you are going with option 3 or 4)
HFileOutputFormat runs a reducer of its own even if you set the number of reducers to zero; this reducer sorts and merges the data to be bulk loaded into HBase. One reducer runs per region of the target table, so make sure the table has multiple regions, otherwise all the data will go through a single reducer and the job will run very slowly. You can set the number of regions (i.e. pre-split the table) when creating the HBase table, as in the example below (HexStringSplit is one of the bundled split algorithms; choose one that matches your row-key format):
hbase org.apache.hadoop.hbase.util.RegionSplitter <HBase_Table> HexStringSplit -c 10 -f <Column_Family>


Source code for option 4:

 package com.sample.bulkload.hbase;  
 import java.io.IOException;  
 import org.apache.hadoop.conf.Configuration;  
 import org.apache.hadoop.fs.Path;  
 import org.apache.hadoop.hbase.HBaseConfiguration;  
 import org.apache.hadoop.hbase.client.HTable;  
 import org.apache.hadoop.hbase.client.Put;  
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;  
 import org.apache.hadoop.hbase.util.Bytes;  
 import org.apache.hadoop.io.LongWritable;  
 import org.apache.hadoop.io.Text;  
 import org.apache.hadoop.mapreduce.Job;  
 import org.apache.hadoop.mapreduce.Mapper;  
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
 public class HBaseBulkLoad {  
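      // Mapper: turns each input line into a Put keyed by its row key; the sort reducer
      // configured by configureIncrementalLoad then writes these out as HFiles.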
      public static class BulkLoadMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {       
           public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
                String line = value.toString();
                // Derive the row key and column values from the input line. This sketch
                // assumes tab-separated input (rowKey \t col1Value \t col2Value);
                // adapt the parsing to your own data format.
                String[] fields = line.split("\t");
                String rowKey = fields[0];
                ImmutableBytesWritable HKey = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
                Put HPut = new Put(Bytes.toBytes(rowKey));
                // Column family/qualifiers follow the cf1:col1, cf1:col2 layout used in option 1.
                HPut.add(Bytes.toBytes("cf1"), Bytes.toBytes("col1"), Bytes.toBytes(fields[1]));
                HPut.add(Bytes.toBytes("cf1"), Bytes.toBytes("col2"), Bytes.toBytes(fields[2]));
                context.write(HKey,HPut);  
           }   
      }  
      public static void main(String[] args) throws Exception {  
           Configuration conf = HBaseConfiguration.create();  
           String inputPath = args[0];
           String outputPath = args[1];
           // args[2] is the name of the target HBase table
           HTable hTable = new HTable(conf, args[2]);
           Job job = new Job(conf,"HBase_Bulk_loader");        
           job.setMapOutputKeyClass(ImmutableBytesWritable.class);  
           job.setMapOutputValueClass(Put.class);  
           job.setSpeculativeExecution(false);  
           job.setReduceSpeculativeExecution(false);  
           job.setInputFormatClass(TextInputFormat.class);  
           job.setOutputFormatClass(HFileOutputFormat.class);  
           job.setJarByClass(HBaseBulkLoad.class);  
           job.setMapperClass(HBaseBulkLoad.BulkLoadMap.class);  
           FileInputFormat.setInputPaths(job, inputPath);  
           FileOutputFormat.setOutputPath(job,new Path(outputPath));             
           // Sets up the TotalOrderPartitioner, the sort reducer and one reduce task
           // per region, based on the table's current region boundaries.
           HFileOutputFormat.configureIncrementalLoad(job, hTable);
           System.exit(job.waitForCompletion(true) ? 0 : 1);  
      }  
 }  
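
A typical run for option 4 would then look something like this (jar name, paths and table name are placeholders; run with the HBase jars on the job classpath). The MR job writes the StoreFiles to the output path, and completebulkload moves them into the table:

 hadoop jar hbase-bulkload.jar com.sample.bulkload.hbase.HBaseBulkLoad /user/<you>/input /user/<you>/storefile_output <HBase_Table>
 hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /user/<you>/storefile_output <HBase_Table>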
