Hadoop Streaming

The MapReduce framework itself is written in Java, and it primarily supports programs written in Java. However, Hadoop also provides an API that allows writing MapReduce programs in languages other than Java.

Hadoop Streaming is a utility included with the Hadoop distribution that enables users to write MapReduce programs in various programming and scripting languages. Supported languages include Python, PHP, Ruby, Perl, and Bash, among others.

How It Works -

  • The Streaming utility creates a MapReduce job, submits it to the cluster, and monitors its progress until the job completes.
  • When a script is specified for the mappers, each mapper task launches the script as a separate process when it is initialized. As the mapper task runs, it converts its input into lines and feeds those lines to the standard input (stdin) of the process.
  • Meanwhile, the mapper gathers the line-oriented outputs from the standard output (stdout) of the process and converts each line into a key/value pair, which is collected as the output of the mapper. By default, the prefix of a line up to the first tab character is the key, and the remainder of the line (excluding the tab) is the value.
  • If a line contains no tab character, the entire line is treated as the key and the value is null. This splitting behaviour can be customized as needed (see the sketch after this list). The mapper's output key/value pairs are then passed to the reducer tasks.
  • When a script is specified for the reducers, each reducer task likewise launches the script as a separate process, and the reducer is then initialized. As the reducer task runs, it converts its input key/value pairs into lines and feeds them to the standard input (stdin) of the process.
  • Meanwhile, the reducer gathers the line-oriented outputs from the standard output (stdout) of the process and converts each line back into a tab-separated key/value pair. These pairs form the final output of the MapReduce job.
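
The default tab-splitting rule is easy to picture in a few lines of Python. The snippet below is only an illustrative sketch of the convention described above, not Hadoop's actual implementation:

#!/usr/bin/env python
# Illustrative sketch only: how Hadoop Streaming splits a line of
# mapper/reducer output into a key/value pair by default.
def split_streaming_line(line):
    line = line.rstrip('\n')
    if '\t' in line:
        # prefix up to the first tab is the key; the rest is the value
        key, value = line.split('\t', 1)
    else:
        # no tab: the whole line is the key and the value is null
        key, value = line, None
    return key, value

print(split_streaming_line('hello\t1'))  # ('hello', '1')
print(split_streaming_line('hello'))     # ('hello', None)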

Important Commands -

Parameter                                      | Required/Optional | Description
-input directory/file-name                     | Required          | Input location for the mapper.
-output directory-name                         | Required          | Output location for the reducer.
-mapper executable or script or JavaClassName  | Required          | Mapper executable.
-reducer executable or script or JavaClassName | Required          | Reducer executable.
-file file-name                                | Optional          | Makes the mapper, reducer, or combiner executable available locally on the compute nodes.
-inputformat JavaClassName                     | Optional          | If specified, the class should return key/value pairs of Text class. If not specified, TextInputFormat is used as the default.
-outputformat JavaClassName                    | Optional          | If specified, the class should take key/value pairs of Text class. If not specified, TextOutputFormat is used as the default.
-partitioner JavaClassName                     | Optional          | Class that determines which reducer a key is sent to.
-combiner streamingCommand or JavaClassName    | Optional          | Combiner executable for map output.
-inputreader                                   | Optional          | Specifies a record reader class instead of an input format class.
-verbose                                       | Optional          | Verbose output.
-lazyOutput                                    | Optional          | Creates output lazily.
-numReduceTasks                                | Optional          | Specifies the number of reducers.
-mapdebug                                      | Optional          | Script to call when a map task fails.
-reducedebug                                   | Optional          | Script to call when a reduce task fails.
-cmdenv name=value                             | Optional          | Passes an environment variable to streaming commands.
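
As an illustration, the optional parameters slot into the same command line as the required ones. The flags below come from the table above; the paths, the reducer count of 2, and the variable MY_ENV_VAR are placeholder example values only:

$hadoop_home_path/hadoop jar hadoop_streaming_path/hadoop_streaming.jar \
   -input input_dir \
   -output output_dir \
   -mapper <path/mapper.py> \
   -reducer <path/reducer.py> \
   -numReduceTasks 2 \
   -cmdenv MY_ENV_VAR=some_value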

Example Using Python -

The following example describes the word count program in Python, including the mapper phase and reducer phase code -

Mapper Phase Code -

#!/usr/bin/env python
import sys
 
#--- get all lines from stdin ---
for line in sys.stdin:
    #--- remove leading and trailing whitespace---
    line = line.strip()

    #--- split the line into words ---
    words = line.split()

    #--- output tuples [word, 1] in tab-delimited format---
    for word in words:
        print('%s\t%s' % (word, "1"))

Make sure the above file has execute permission:

chmod +x /Hadoop_path/mapper.py
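
Before submitting a job, the mapper can be sanity-checked locally from the shell (the input text here is arbitrary):

echo "apple banana apple" | /Hadoop_path/mapper.py

This should print each word followed by a tab and 1, one pair per line.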

Reducer Phase Code -

#!/usr/bin/env python
import sys
# maps words to their counts
word2count = {} 
# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
 
    # parse the input we got from mapper.py
    try:
        word, count = line.split('\t', 1)
        # convert count (currently a string) to int
        count = int(count)
    except ValueError:
        # skip malformed lines that lack a tab or a numeric count
        continue

    try:
        word2count[word] = word2count[word] + count
    except KeyError:
        # first occurrence of this word
        word2count[word] = count
 
# write the tuples to stdout
# Note: they are unsorted
for word, count in word2count.items():
    print('%s\t%s' % (word, count))

Make sure the above file has execute permission:

chmod +x /Hadoop_path/reducer.py
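
The full pipeline can also be simulated locally before running it on the cluster. The sort command stands in for the shuffle-and-sort step that the framework performs between the map and reduce phases (input.txt is a placeholder for any text file):

cat input.txt | /Hadoop_path/mapper.py | sort | /Hadoop_path/reducer.py

This particular reducer accumulates counts in a dictionary and would work even without the sort, but keeping it mirrors what the framework actually feeds the reducer.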

Execution of the WordCount Program -

$hadoop_home_path/hadoop jar hadoop_streaming_path/hadoop_streaming.jar \
   -input input_dir \
   -output output_dir \
   -mapper <path/mapper.py> \
   -reducer <path/reducer.py>

Where "\" is used for line continuation for clear readability.