Now a day’s SequenceFile is extensively used in MapReduce as I/O formats. This is the fasted
way to store and extract the data in file system. That’s why even temporary outputs
of maps are stored in this format. A SequenceFile contain binary key/value pairs.
Find more details here.
Below is
the example which shows how to read a SequenceFile in a MapReduce job.
First you need to find what your key
and value format is. You can do it with the help of these two HDFS commands.
hadoop fs -ls path_of_sequence_file (check for header part only)
OR
hadoop fs -text path_of_sequence_file
Now in this example my key is NullWritable and value is MapWritable.
import java.io.IOException;
import java.text.ParseException;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class SequenceFileReader {
public static class BulkLoadMap extends Mapper<NullWritable , MapWritable, Text, Text> {
@Override
public void map(NullWritable key, MapWritable value, Context context) throws IOException, InterruptedException {
.
.
.
.
context.write(opKey,opValue);
}
}
/**
* Main Class
*/
public static void main(String[] args) throws Exception {
String inPutPath = args[1];
String outPutPath = args[2];
Configuration conf = new Configuration();
Job job = new Job(conf, "Read-SequenceFile");
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat.class);
job.setSpeculativeExecution(false);
job.setReduceSpeculativeExecution(false);
job.setMapperClass(InSightBulkLoad.BulkLoadMap.class);
job.setJarByClass(InSightBulkLoad.class);
MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, NullWritable.class, Text.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
FileInputFormat.setInputPaths(job,inPutPath);
FileOutputFormat.setOutputPath(job, new Path(outPutPath));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}