Hadoop Streaming
The Hadoop MapReduce framework is itself written in Java and primarily supports programs written in Java. However, Hadoop also provides an API that allows MapReduce programs to be written in languages other than Java.
Hadoop Streaming is a utility that ships with the Hadoop distribution and enables users to write MapReduce programs in a variety of programming and scripting languages, including Python, PHP, Ruby, Perl, and Bash.
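The simplest way to see Streaming in action is to use ordinary Unix utilities as the mapper and reducer: /bin/cat echoes each input line, and /usr/bin/wc counts lines, words, and bytes. The exact path and version suffix of the streaming jar vary between Hadoop releases, so treat the jar path below as a placeholder:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /usr/bin/wc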
Working:
- The Streaming utility creates a MapReduce job, submits it to the appropriate cluster, and monitors the job's progress until it completes.
- When a script is specified for the mappers, each mapper task launches the script as a separate process when the mapper is initialized. As the mapper task runs, it converts its input into lines and feeds those lines to the standard input (stdin) of the process.
- The mapper collects the line-oriented outputs from the standard output (stdout) of the process and converts each line into a key/value pair, which becomes the output of the mapper. By default, the prefix of a line up to the first tab character is the key, and the remainder of the line (excluding the tab) is the value.
- If a line contains no tab character, the entire line is treated as the key and the value is null. This default can be customized as needed; a sketch of the splitting rule follows this list. The mapper's output key/value pairs are then passed to the reducer tasks.
- When a script is specified for the reducers, each reducer task launches the script as a separate process when the reducer is initialized. As the reducer task runs, it converts its input key/value pairs into lines and feeds those lines to the standard input (stdin) of the process.
- The reducer collects the line-oriented outputs from the standard output (stdout) of the process and converts each line into a key/value pair, which becomes the output of the reducer. The output of the reducer is the final output of the MapReduce job.
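The default splitting rule is easy to express in code. The function below is only an illustration of what the framework does with each output line; it is not part of the Streaming API:
# Illustration of Hadoop Streaming's default key/value split.
def split_line(line):
    # The prefix up to the first tab character is the key,
    # and the rest of the line is the value.
    if '\t' in line:
        key, value = line.split('\t', 1)
        return key, value
    # A line without a tab becomes the key; the value is null.
    return line, None

print(split_line('hello\tworld'))  # ('hello', 'world')
print(split_line('no tab here'))   # ('no tab here', None)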
Important Commands:
Parameter | Required/Optional | Description
---|---|---
-input directory/file-name | Required | Input location for the mapper.
-output directory-name | Required | Output location for the reducer.
-mapper executable or script or JavaClassName | Required | Mapper executable.
-reducer executable or script or JavaClassName | Required | Reducer executable.
-file file-name | Optional | Makes the mapper, reducer, or combiner executable available locally on the compute nodes.
-inputformat JavaClassName | Optional | If specified, it should return key/value pairs of the Text class. If not specified, TextInputFormat is used as the default.
-outputformat JavaClassName | Optional | If specified, it should take key/value pairs of the Text class. If not specified, TextOutputFormat is used as the default.
-partitioner JavaClassName | Optional | Class that determines which reducer a key is sent to.
-combiner streamingCommand or JavaClassName | Optional | Combiner executable for map output.
-inputreader | Optional | Specifies a record reader class instead of an input format class.
-verbose | Optional | Verbose output.
-lazyOutput | Optional | Creates output lazily.
-numReduceTasks | Optional | Specifies the number of reducers.
-mapdebug | Optional | Script to call when a map task fails.
-reducedebug | Optional | Script to call when a reduce task fails.
-cmdenv name=value | Optional | Passes an environment variable to the streaming commands.
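Several of the optional parameters are often combined in a single invocation. A hypothetical sketch (the jar location, the script paths, and the APP_MODE variable are placeholders):
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input myInput \
    -output myOutput \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py \
    -numReduceTasks 2 \
    -cmdenv APP_MODE=production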
Example Using Python:
The following example shows the word count program in Python, covering both the mapper phase and the reducer phase.
Mapper Phase Code:
#!/usr/bin/env python
import sys

# --- get all lines from stdin ---
for line in sys.stdin:
    # --- remove leading and trailing whitespace ---
    line = line.strip()
    # --- split the line into words ---
    words = line.split()
    # --- output tuples [word, 1] in tab-delimited format ---
    for word in words:
        print('%s\t%s' % (word, 1))
Make sure the above program has execute permission:
chmod +x /Hadoop_path/mapper.py
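Because the mapper is just a program that reads stdin and writes stdout, it can be tested locally before touching the cluster. A quick sanity check (the sample sentence is arbitrary):
echo "the quick brown fox jumps over the lazy dog" | /Hadoop_path/mapper.py
Each word should appear on its own line, followed by a tab and the count 1.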
Reducer Phase Code:
#!/usr/bin/env python
import sys

# maps words to their counts
word2count = {}

# input comes from stdin
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int;
    # silently discard lines whose count is not a number
    try:
        count = int(count)
    except ValueError:
        continue
    # accumulate the count for this word
    word2count[word] = word2count.get(word, 0) + count

# write the (word, count) tuples to stdout
# Note: they are unsorted
for word in word2count:
    print('%s\t%s' % (word, word2count[word]))
Make sure the above program has execute permission:
chmod +x /Hadoop_path/reducer.py
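The two scripts can also be chained together locally to simulate the whole job. Hadoop sorts the mapper output by key before it reaches the reducer, so a sort step is placed between the two stages (input.txt stands for any local text file); this particular reducer accumulates counts in a dictionary, so it would produce correct results even without the sort:
cat input.txt | /Hadoop_path/mapper.py | sort -k1,1 | /Hadoop_path/reducer.py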
Execution of the WordCount Program:
$hadoop_home_path/hadoop jar hadoop_streaming_path/hadoop_streaming.jar \
    -input input_dir \
    -output output_dir \
    -mapper <path/mapper.py> \
    -reducer <path/reducer.py>
Where "\" is used for line continuation for clear readability.