Saturday, December 14, 2013

Reading SequenceFile from HDFS in MapReduce

   SequenceFile is now used extensively as an I/O format in MapReduce. It is the fastest way to store and retrieve data in the file system, which is why even the temporary outputs of map tasks are stored in this format. A SequenceFile contains binary key/value pairs. Find more details here.
Below is an example that shows how to read a SequenceFile in a MapReduce job.
First, find out what your key and value formats are. You can do this with either of these two HDFS commands.
hadoop fs -cat path_of_sequence_file | head -c 300         (check the header part only; it names the key and value classes)
OR
hadoop fs -text path_of_sequence_file
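
The header check can also be sketched in plain Java, without Hadoop on the classpath. This is a minimal sketch, not Hadoop's own reader: it assumes the header layout written by current Hadoop versions (the bytes SEQ, one version byte, then the key and value class names, each prefixed by its length) and it assumes each class name is shorter than 128 bytes so that the length prefix fits in one byte. The class name SeqHeaderPeek and its method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: extracts the key and value class names from SequenceFile header bytes. */
class SeqHeaderPeek {

    /** Returns {keyClassName, valueClassName} parsed from the first bytes of a SequenceFile. */
    static String[] readKeyValueClasses(byte[] header) {
        ByteBuffer buf = ByteBuffer.wrap(header);
        if (buf.remaining() < 4 || buf.get() != 'S' || buf.get() != 'E' || buf.get() != 'Q') {
            throw new IllegalArgumentException("Not a SequenceFile: missing SEQ magic bytes");
        }
        buf.get(); // format version byte, not needed for this check
        return new String[] { readShortString(buf), readShortString(buf) };
    }

    // Assumption: the string is shorter than 128 bytes, so its length prefix is a single byte.
    private static String readShortString(ByteBuffer buf) {
        int length = buf.get();
        if (length < 0) {
            throw new IllegalArgumentException("Multi-byte length prefix is not handled in this sketch");
        }
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Builds header bytes in memory so the sketch can be tried without a real file. */
    static byte[] fakeHeader(String keyClass, String valueClass) {
        byte[] key = keyClass.getBytes(StandardCharsets.UTF_8);
        byte[] value = valueClass.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 1 + key.length + 1 + value.length);
        buf.put(new byte[] { 'S', 'E', 'Q', 6 });
        buf.put((byte) key.length).put(key);
        buf.put((byte) value.length).put(value);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] header = fakeHeader("org.apache.hadoop.io.NullWritable", "org.apache.hadoop.io.MapWritable");
        String[] classes = readKeyValueClasses(header);
        System.out.println("key class:   " + classes[0]);
        System.out.println("value class: " + classes[1]);
    }
}
```

For a real file, the bytes could come from the first few hundred bytes of the output of hadoop fs -cat saved to a local file.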
In this example, the key is NullWritable and the value is MapWritable.
 import java.io.IOException;  
 import java.text.ParseException;  
 import java.util.Map.Entry;  
 import org.apache.hadoop.conf.Configuration;  
 import org.apache.hadoop.fs.Path;  
 import org.apache.hadoop.hbase.client.HTable;  
 import org.apache.hadoop.hbase.client.Put;  
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;  
 import org.apache.hadoop.hbase.util.Bytes;  
 import org.apache.hadoop.io.MapWritable;  
 import org.apache.hadoop.io.NullWritable;  
 import org.apache.hadoop.io.Text;  
 import org.apache.hadoop.io.Writable;  
 import org.apache.hadoop.mapreduce.Job;  
 import org.apache.hadoop.mapreduce.Mapper;  
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;  
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
 import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;  
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;  
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
 public class SequenceFileReader {  
      public static class BulkLoadMap extends Mapper<NullWritable , MapWritable, Text, Text> {       
           @Override  
           public void map(NullWritable key, MapWritable value, Context context) throws IOException, InterruptedException {  
             .  
                .  
                .  
                .  
                context.write(opKey,opValue);  
           }  
      }  
      /**
       * Main method
       */
      public static void main(String[] args) throws Exception {
           // args[0] is the input path and args[1] is the output path.
           String inPutPath = args[0];
           String outPutPath = args[1];
           Configuration conf = new Configuration();  
           Job job = new Job(conf, "Read-SequenceFile");  
           // Map output types must match the Mapper's type parameters (Text, Text).
           job.setMapOutputKeyClass(Text.class);
           job.setMapOutputValueClass(Text.class);
           job.setOutputKeyClass(NullWritable.class);  
           job.setOutputValueClass(Text.class);  
           job.setInputFormatClass(SequenceFileInputFormat.class);  
           // The output format is set further down through LazyOutputFormat, so it is not set here.
           job.setSpeculativeExecution(false);  
           job.setReduceSpeculativeExecution(false);  
           job.setMapperClass(SequenceFileReader.BulkLoadMap.class);
           job.setJarByClass(SequenceFileReader.class);
           MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, NullWritable.class, Text.class);  
           LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);  
           FileInputFormat.setInputPaths(job,inPutPath);  
           FileOutputFormat.setOutputPath(job, new Path(outPutPath));  
           System.exit(job.waitForCompletion(true) ? 0 : 1);  
      }  
 }  

Friday, November 15, 2013

Bulk load data in HBase table with HFileOutputFormat

             Do you have a lot of data that needs to be loaded into an HBase table in a very short time? Here are some findings that will help you with this task. There are many ways to load data into HBase; some of them are listed below.

1.       With the Pig HBaseStorage() function:

This is the easiest way to load data into HBase, but it is slow because behind the scenes it sends multiple Put requests to HBase.

 STORE hbase_data INTO 'hbase_table' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('cf1:col1,cf1:col2');  

2.       Import TSV:

It's a three-step process:

                    I.            Prepare your data to be loaded in the format Key\tCol1\tCol2.

                  II.            Convert it into a StoreFile with the ImportTsv command. (http://hbase.apache.org/book/ops_mgt.html#importtsv)

                III.            Complete the bulk load of the StoreFile into the HBase table. (http://hbase.apache.org/book/ops_mgt.html#completebulkload)
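
Step I can be sketched in plain Java as follows. This is only an illustration of the Key\tCol1\tCol2 layout: the class TsvLineBuilder and its method are hypothetical, and the sketch assumes that field values never contain a tab or a newline (it rejects them instead of escaping them).

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: formats one record as a tab-separated line, row key first. */
class TsvLineBuilder {

    /** Returns "rowKey<TAB>col1<TAB>col2..." for the given row key and column values. */
    static String toTsvLine(String rowKey, List<String> columnValues) {
        StringBuilder line = new StringBuilder();
        appendField(line, rowKey);
        for (String value : columnValues) {
            line.append('\t');
            appendField(line, value);
        }
        return line.toString();
    }

    // Assumption: fields with tabs or newlines are rejected, because they would corrupt the layout.
    private static void appendField(StringBuilder line, String field) {
        if (field.indexOf('\t') >= 0 || field.indexOf('\n') >= 0) {
            throw new IllegalArgumentException("Field contains a tab or newline: " + field);
        }
        line.append(field);
    }

    public static void main(String[] args) {
        // Sample values are made up for illustration.
        System.out.println(toTsvLine("row1", Arrays.asList("value1", "value2")));
    }
}
```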

3.       MR job to load data into an HBase table:

With the help of HFileOutputFormat, you can write MapReduce code that writes data directly into HBase.

4.       MR job to prepare a StoreFile:

This is the fastest way to do a bulk load. It involves two steps:

                    I.            Write a Java MR job to prepare a StoreFile (sample code given below).

                  II.            Complete the bulk load of the StoreFile into the HBase table. (http://hbase.apache.org/book/ops_mgt.html#completebulkload)

5.       You will find some more options here:

                                I.            http://blog.cloudera.com/blog/2013/09/how-to-use-hbase-bulk-loading-and-why/
                              II.            http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-1.3.2/bk_user-guide/content/user-guide-hbase-import-2.html
                            III.            http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-1.3.2/bk_user-guide/content/user-guide-hbase-import-1.html


Important notes (only if you are going with option 3 or 4)
HFileOutputFormat runs a reducer of its own, even if you set the number of reducers to zero. This reducer sorts and merges the data to be bulk loaded into HBase. One reducer runs per region, so make sure that the table has multiple regions; otherwise all the data goes to a single reducer, which makes the job run very slowly. You can set the number of regions when creating the HBase table. See the example below.
hbase org.apache.hadoop.hbase.util.RegionSplitter -c 10 -f  <Column_family> <HBase_Table>
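
The effect of the region count on the reducers can be illustrated with a small plain-Java sketch. It is not HBase code: the class RegionReducerSketch is hypothetical, and it only models the idea that each row key falls into exactly one region, with regions delimited by sorted split keys compared byte by byte. With no split keys (a single region), every row maps to index 0, which corresponds to the single slow reducer described above.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical model of how row keys map to regions (and so to reducers, one per region). */
class RegionReducerSketch {

    /** Unsigned lexicographic comparison of two byte arrays. */
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    /**
     * splitKeys holds the sorted start keys of every region except the first one.
     * Returns the index of the region whose key range contains rowKey.
     */
    static int regionIndexFor(byte[] rowKey, byte[][] splitKeys) {
        int index = 0;
        while (index < splitKeys.length && compareBytes(rowKey, splitKeys[index]) >= 0) {
            index++;
        }
        return index;
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Made-up split keys: three regions covering [..g), [g..p), [p..).
        byte[][] threeRegions = { bytes("g"), bytes("p") };
        byte[][] oneRegion = {};
        for (String row : new String[] { "apple", "grape", "zebra" }) {
            System.out.println(row + " -> region " + regionIndexFor(bytes(row), threeRegions)
                    + " of 3, region " + regionIndexFor(bytes(row), oneRegion) + " of 1");
        }
    }
}
```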


Source code for option 4 : 

 package com.sample.bulkload.hbase;  
 import java.io.IOException;  
 import java.util.HashMap;  
 import java.util.Iterator;  
 import java.util.Map;  
 import java.util.Map.Entry;  
 import org.apache.hadoop.conf.Configuration;  
 import org.apache.hadoop.fs.Path;  
 import org.apache.hadoop.hbase.HBaseConfiguration;  
 import org.apache.hadoop.hbase.client.HTable;  
 import org.apache.hadoop.hbase.client.Put;  
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;  
 import org.apache.hadoop.hbase.util.Bytes;  
 import org.apache.hadoop.io.LongWritable;  
 import org.apache.hadoop.io.Text;  
 import org.apache.hadoop.mapreduce.Job;  
 import org.apache.hadoop.mapreduce.Mapper;  
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
 public class HBaseBulkLoad {  
      public static class BulkLoadMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {       
           public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
                String line = value.toString();       
                String rowKey = .......  
                .  
                .  
                .  
                ImmutableBytesWritable HKey = new ImmutableBytesWritable(Bytes.toBytes(rowKey));  
                Put HPut = new Put(Bytes.toBytes(rowKey));  
                HPut.add(cf, col1, val1);  
                HPut.add(cf, col2, val2);  
                context.write(HKey,HPut);  
           }   
      }  
      public static void main(String[] args) throws Exception {  
           Configuration conf = HBaseConfiguration.create();  
           // args[0] is the input path, args[1] the output path, args[2] the HBase table name.
           String inputPath = args[0];
           String outputPath = args[1];
           HTable hTable = new HTable(conf, args[2]);
           Job job = new Job(conf,"HBase_Bulk_loader");        
           job.setMapOutputKeyClass(ImmutableBytesWritable.class);  
           job.setMapOutputValueClass(Put.class);  
           job.setSpeculativeExecution(false);  
           job.setReduceSpeculativeExecution(false);  
           job.setInputFormatClass(TextInputFormat.class);  
           job.setOutputFormatClass(HFileOutputFormat.class);  
           job.setJarByClass(HBaseBulkLoad.class);  
           job.setMapperClass(HBaseBulkLoad.BulkLoadMap.class);  
           FileInputFormat.setInputPaths(job, inputPath);  
           FileOutputFormat.setOutputPath(job,new Path(outputPath));             
           HFileOutputFormat.configureIncrementalLoad(job, hTable);  
           System.exit(job.waitForCompletion(true) ? 0 : 1);  
      }  
 }  

Thursday, October 3, 2013

How to delete _$folder$ files from AWS S3 directories?

The _$folder$ files get created in S3 directory structures when tools such as S3Fox are used to interact with the S3 file system.
The files are visible only in the AWS S3 console or with s3cmd from the CLI.
The files cause no harm to the file system, but if you want to delete them you can choose either of the following ways.
1. Delete them from the AWS S3 console.
2. Delete them from the CLI with s3cmd:
 s3cmd del s3://<s3_bucket_name>/<dir_name>/_\$folder\$

I deleted these files recursively from the S3 directories with the following script:

 dir_list=`hadoop fs -ls s3://<s3_bucket_name>/<dir_name>/*/| cut -d' ' -f17 `  
  for dir in $dir_list  
  do  
     file_list=`s3cmd ls s3://<s3_bucket_name>${dir}/* | grep folder | cut -d' ' -f14`  
     for file in $file_list  
        do  
            s3cmd del `echo ${file} |sed -n 1'p' | tr '\$' '\\\$'`  
        done  
  done  
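
The selection step of the script above can also be sketched in plain Java. This is a hypothetical helper, not part of s3cmd: it takes a list of S3 URLs (for example, collected from a listing), keeps only those ending in _$folder$, and builds one s3cmd del command line per marker. The URL is wrapped in single quotes so that the shell does not expand the $ characters; the sketch assumes the URLs themselves contain no single quotes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: builds s3cmd delete commands for _$folder$ marker files. */
class FolderMarkerCleanup {

    static final String MARKER_SUFFIX = "_$folder$";

    /** Returns one "s3cmd del '<url>'" command for every URL that ends with the marker suffix. */
    static List<String> deleteCommands(List<String> s3Urls) {
        List<String> commands = new ArrayList<>();
        for (String url : s3Urls) {
            if (url.endsWith(MARKER_SUFFIX)) {
                // Single quotes keep the shell from treating $folder as a variable.
                commands.add("s3cmd del '" + url + "'");
            }
        }
        return commands;
    }

    public static void main(String[] args) {
        // Made-up bucket and directory names for illustration.
        List<String> listing = Arrays.asList(
                "s3://example-bucket/logs/_$folder$",
                "s3://example-bucket/logs/part-00000",
                "s3://example-bucket/logs/2013_$folder$");
        for (String command : deleteCommands(listing)) {
            System.out.println(command);
        }
    }
}
```

The commands are only printed, so they can be reviewed before being run in a shell.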

Tuesday, October 1, 2013

How to create AWS job flow for HBase from CLI?

1. Download the Amazon Elastic MapReduce CLI from the location below
wget http://elasticmapreduce.s3.amazonaws.com/elastic-mapreduce-ruby.zip
2. Unzip it
unzip elastic-mapreduce-ruby.zip
3. Create a shell script with the following code (create_jf.sh)
 ruby elastic-mapreduce \
 -v \
 --create \
 --alive \
 --region "us-east-1" \
 --access-id <your_access_id> \
 --private-key <your_private_key> \
 --key-pair <your_key_pair> \
 --ami-version latest \
 --visible-to-all-users \
 --hbase \
 --name "HBASE from CLI" \
 --instance-group MASTER \
 --instance-count 1 \
 --instance-type m1.large \
 --instance-group CORE \
 --instance-count 1 \
 --instance-type m1.large \
 --pig-interactive \
 --pig-versions latest \
 --hive-interactive \
 --hive-versions latest \
 --bootstrap-action "s3://elasticmapreduce/bootstrap-actions/configure-hadoop" \
 --args "-m,mapred.tasktracker.map.tasks.maximum=6,-m,mapred.tasktracker.reduce.tasks.maximum=2"
4. Create the job flow
bash create_jf.sh

5. Monitor the job flow from the AWS console
Copy the job flow ID that create_jf.sh prints after a successful run and search for it in the AWS EMR console.