Saturday, December 14, 2013

Reading SequenceFile from HDFS in MapReduce

   Nowadays, SequenceFile is extensively used in MapReduce as an I/O format. It is one of the fastest ways to store and retrieve data in the file system, which is why even the temporary outputs of map tasks are stored in this format. A SequenceFile contains binary key/value pairs. Find more details here.
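As background, here is roughly how such a file gets written in the first place. This is a minimal sketch using the classic SequenceFile.createWriter API; the output path, class name, and record contents are made up for illustration.

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;

 public class SequenceFileWriterDemo {
     public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();
         FileSystem fs = FileSystem.get(conf);
         Path path = new Path("/tmp/demo.seq"); // hypothetical output location
         // The writer records the key and value class names in the file header.
         SequenceFile.Writer writer = SequenceFile.createWriter(
                 fs, conf, path, NullWritable.class, MapWritable.class);
         try {
             MapWritable record = new MapWritable();
             record.put(new Text("field"), new Text("some value")); // made-up record
             // Each append stores one binary key/value pair.
             writer.append(NullWritable.get(), record);
         } finally {
             writer.close();
         }
     }
 }

The key and value class names written into the header are exactly what the commands below will show you.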
Below is an example that shows how to read a SequenceFile in a MapReduce job.
First, you need to find out what your key and value types are. You can do that with either of these two HDFS commands.
hadoop fs -cat path_of_sequence_file | head         (check the header only; it names the key and value classes)
OR
hadoop fs -text path_of_sequence_file         (deserializes and prints the records as plain text)
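If you'd rather check programmatically, a SequenceFile.Reader reports the same information straight from the file header. A minimal sketch, assuming the file path is passed as the first argument:

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;

 public class SequenceFileInspector {
     public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();
         FileSystem fs = FileSystem.get(conf);
         SequenceFile.Reader reader =
                 new SequenceFile.Reader(fs, new Path(args[0]), conf);
         try {
             // Both names are read straight from the file header.
             System.out.println("Key class:   " + reader.getKeyClassName());
             System.out.println("Value class: " + reader.getValueClassName());
         } finally {
             reader.close();
         }
     }
 }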
In this example, the key is NullWritable and the value is MapWritable.
 import java.io.IOException;
 import java.util.Map.Entry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

 public class SequenceFileReader {

     public static class SequenceFileMap extends Mapper<NullWritable, MapWritable, Text, Text> {

         @Override
         public void map(NullWritable key, MapWritable value, Context context)
                 throws IOException, InterruptedException {
             // Process the record here; as a simple illustration,
             // emit every entry of the MapWritable as a text pair.
             for (Entry<Writable, Writable> entry : value.entrySet()) {
                 Text opKey = new Text(entry.getKey().toString());
                 Text opValue = new Text(entry.getValue().toString());
                 context.write(opKey, opValue);
             }
         }
     }

     /**
      * Driver: configures and submits the job.
      */
     public static void main(String[] args) throws Exception {
         String inputPath = args[0];
         String outputPath = args[1];
         Configuration conf = new Configuration();
         Job job = new Job(conf, "Read-SequenceFile");
         job.setJarByClass(SequenceFileReader.class);
         job.setMapperClass(SequenceFileMap.class);
         // The declared output types must match what the mapper actually emits.
         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(Text.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(Text.class);
         // SequenceFileInputFormat deserializes the binary key/value pairs,
         // so the mapper receives them as ready-to-use Writable objects.
         job.setInputFormatClass(SequenceFileInputFormat.class);
         job.setOutputFormatClass(TextOutputFormat.class);
         job.setNumReduceTasks(0); // map-only job: no reduce step is needed
         FileInputFormat.setInputPaths(job, new Path(inputPath));
         FileOutputFormat.setOutputPath(job, new Path(outputPath));
         System.exit(job.waitForCompletion(true) ? 0 : 1);
     }
 }
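Assuming the class is packaged into a jar (the jar name below is hypothetical), the job can be run like this:

 hadoop jar sequencefile-reader.jar SequenceFileReader /path/to/input.seq /path/to/output

Note that the output directory must not already exist; FileOutputFormat will fail the job on startup if it does.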