Summary -
This topic describes the Hadoop Partitioner with a detailed example.
The Partitioner controls how the outputs of the map stage are distributed to the reducers. It partitions the keys of the intermediate map output; the key, or a subset of the key, is used to derive the partition, typically with a hash function.
The total number of partitions is the same as the number of reduce tasks for the job. The Partitioner runs on the same machine where the mapper completed its execution, and the entire mapper output is sent to the Partitioner.
The Partitioner groups the mapper output into one group per reduce task. By default, the Hadoop framework uses a hash-based partitioner, which partitions the key space using the key's hash code.
Default hash partitioner implementation -
public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
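To see what this formula does in practice, the small standalone snippet below (a minimal sketch; the class name HashPartitionDemo is illustrative, and it assumes hadoop-common is on the classpath) prints which of two reduce tasks the default hash partitioner would pick for a few sample words:

import org.apache.hadoop.io.Text;

public class HashPartitionDemo {
    public static void main(String[] args) {
        int numReduceTasks = 2;
        for (String word : new String[] { "aa", "bb", "cc", "dd", "ee", "ff" }) {
            Text key = new Text(word);
            // Same computation as the default hash partitioner shown above
            int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            System.out.println(word + " -> partition " + partition);
        }
    }
}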
Hadoop also allows you to plug in a custom partitioner. The Partitioner abstract class, which declares a single abstract method, can be extended to write a custom partitioner.
public abstract class Partitioner<KEY, VALUE> {
    public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}
Parameters -
- key - the key to be partitioned.
- value - the entry value.
- numPartitions - the total number of partitions.
Uses -
- Writing a custom partitioner.
- Optimizing MapReduce programs so that the given problem is executed as efficiently as possible.
- Dividing the output into files as required (one output file per reduce task).
Example - Word count with a custom partitioner
Input - aa bb cc dd ee aa ff bb cc dd ee ff
Save the input as input.txt and upload it to HDFS:
$vim input.txt
aa bb cc dd ee aa ff bb cc dd ee ff
$hadoop fs -mkdir -p /user/user-name/wc/input
$hadoop fs -put input.txt /user/user-name/wc/input/
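To confirm the upload, the file can be read back from HDFS:
$hadoop fs -cat /user/user-name/wc/input/input.txt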
Program:
package org.example.wordcount;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordcountExample extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new WordcountExample(), args));
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("Usage: [input] [output]");
            System.exit(-1);
        }
        Job jobex = Job.getInstance(getConf());
        jobex.setJobName("wordcount");
        jobex.setJarByClass(WordcountExample.class);
        jobex.setOutputKeyClass(Text.class);
        jobex.setOutputValueClass(IntWritable.class);
        jobex.setMapperClass(WordcountExampleMapper.class);
        jobex.setCombinerClass(WordcountExampleReducer.class);
        jobex.setPartitionerClass(WordcountExamplePartitioner.class);
        jobex.setNumReduceTasks(2);
        jobex.setReducerClass(WordcountExampleReducer.class);
        jobex.setInputFormatClass(TextInputFormat.class);
        jobex.setOutputFormatClass(TextOutputFormat.class);
        Path inputFilePath = new Path(args[0]);
        Path outputFilePath = new Path(args[1]);
        /* Accept the input directory recursively */
        FileInputFormat.setInputDirRecursive(jobex, true);
        FileInputFormat.addInputPath(jobex, inputFilePath);
        FileOutputFormat.setOutputPath(jobex, outputFilePath);
        /* Delete the output path if it already exists */
        FileSystem fs = FileSystem.newInstance(getConf());
        if (fs.exists(outputFilePath)) {
            fs.delete(outputFilePath, true);
        }
        return jobex.waitForCompletion(true) ? 0 : 1;
    }
}
public class WordcountExampleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}
public class WordcountExamplePartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (numPartitions == 2) {
            // Words starting with a letter after 'b' go to reducer 0,
            // all other words go to reducer 1.
            String partitionKey = key.toString();
            if (partitionKey.charAt(0) > 'b')
                return 0;
            else
                return 1;
        } else if (numPartitions == 1) {
            return 0;
        } else {
            System.err.println("WordcountExamplePartitioner can only handle either 1 or 2 partitions");
            return 0;
        }
    }
}
public class WordcountExampleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable totalWordCount = new IntWritable();

    public void reduce(final Text key, final Iterable<IntWritable> values, final Context context)
            throws IOException, InterruptedException {
        int totalcount = 0;
        Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()) {
            totalcount += iterator.next().get();
        }
        totalWordCount.set(totalcount);
        context.write(key, totalWordCount);
    }
}
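To run the job, compile the classes and package them into a jar (the name wordcount.jar below is only illustrative), then submit it with the input directory created earlier and an output directory of your choice; the per-reducer results can be inspected with hadoop fs -cat:
$hadoop jar wordcount.jar org.example.wordcount.WordcountExample /user/user-name/wc/input /user/user-name/wc/output
$hadoop fs -cat /user/user-name/wc/output/part-r-00000
$hadoop fs -cat /user/user-name/wc/output/part-r-00001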
Expected Result:
Each word appears twice in the input, and with two reduce tasks the counts are split across two output files according to the custom partitioner.
Partition 0 (part-r-00000, words starting after 'b'):
cc 2
dd 2
ee 2
ff 2
Partition 1 (part-r-00001, words starting with 'a' or 'b'):
aa 2
bb 2