Hadoop Tutorial
Hadoop - Streaming
Hadoop streaming is a utility that comes with the Hadoop distribution. This utility allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.
Example Using Python
For Hadoop streaming, we consider the word-count problem. Any job in Hadoop must have two phases: a mapper and a reducer. We have written the mapper and the reducer as Python scripts to run under Hadoop. One can also write the same in Perl or Ruby.
Mapper Phase Code
#!/usr/bin/python

import sys

# Input comes from standard input
for myline in sys.stdin:
    # Remove whitespace from either side
    myline = myline.strip()
    # Break the line into words
    words = myline.split()
    # Iterate over the words list
    for myword in words:
        # Write the result to standard output as "word<TAB>1"
        print '%s\t%s' % (myword, 1)
Make sure this file has execution permission (chmod +x /home/expert/hadoop-1.2.1/mapper.py).
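As a quick local sanity check (an illustrative test, not part of the job submission itself), you can pipe sample text into the mapper and inspect its output:

$ echo "car car river deer car bear" | ./mapper.py
car	1
car	1
river	1
deer	1
car	1
bear	1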
Reducer Phase Code
#!/usr/bin/python

import sys

current_word = ""
current_count = 0
word = ""

# Input comes from standard input
for myline in sys.stdin:
    # Remove whitespace from either side
    myline = myline.strip()
    # Split the input we got from mapper.py into a word and a count
    word, count = myline.split('\t', 1)
    # Convert the count variable to an integer
    try:
        count = int(count)
    except ValueError:
        # Count was not a number, so silently ignore this line
        continue
    # Hadoop sorts map output by key, so all lines for the same word
    # arrive consecutively and can be summed in a single pass
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # Write the result to standard output
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# Do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
Save the mapper and reducer code in mapper.py and reducer.py in the Hadoop home directory. Make sure these files have execution permission (chmod +x mapper.py and chmod +x reducer.py). As Python is indentation-sensitive, take care to preserve the indentation shown above when copying the code.
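Because both scripts just read STDIN and write STDOUT, the whole pipeline can be simulated locally before submitting the job to Hadoop. Here sort -k1,1 stands in for Hadoop's shuffle-and-sort phase (a rough approximation; the sample input is illustrative):

$ echo "car car river deer car bear" | ./mapper.py | sort -k1,1 | ./reducer.py
bear	1
car	3
deer	1
river	1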
Execution of WordCount Program
$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
   -input input_dirs \
   -output output_dir \
   -mapper <path>/mapper.py \
   -reducer <path>/reducer.py
Here "\" is used for line continuation, for clear readability, and <path> stands for the directory containing the two scripts.
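For completeness, staging the input and inspecting the result typically looks like the following (the file name sample.txt is illustrative, and the exact output file name, such as part-00000, depends on the job configuration):

$ $HADOOP_HOME/bin/hadoop fs -mkdir input_dirs
$ $HADOOP_HOME/bin/hadoop fs -put sample.txt input_dirs
$ $HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000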
How Streaming Works
In the above example, both the mapper and the reducer are Python scripts that read the input from standard input and emit the output to standard output. The utility creates a Map/Reduce job, submits the job to an appropriate cluster, and monitors the progress of the job until it completes.
When a script is specified for mappers, each mapper task will launch the script as a separate process when the mapper is initialized. As the mapper task runs, it converts its inputs into lines and feeds the lines to the standard input (STDIN) of the process. In the meantime, the mapper collects the line-oriented outputs from the standard output (STDOUT) of the process and converts each line into a key/value pair, which is collected as the output of the mapper. By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value. If there is no tab character in the line, then the entire line is considered the key and the value is null. However, this can be customized as per one's needs.
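For instance, the separator and the number of fields treated as the key can be changed through streaming configuration properties (a sketch; the stream.* property names are the standard streaming settings, while the values here are illustrative):

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
   -D stream.map.output.field.separator=. \
   -D stream.num.map.output.key.fields=4 \
   -input input_dirs \
   -output output_dir \
   -mapper <path>/mapper.py \
   -reducer <path>/reducer.py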
When a script is specified for reducers, each reducer task will launch the script as a separate process when the reducer is initialized. As the reducer task runs, it converts its input key/value pairs into lines and feeds the lines to the standard input (STDIN) of the process. In the meantime, the reducer collects the line-oriented outputs from the standard output (STDOUT) of the process and converts each line into a key/value pair, which is collected as the output of the reducer. By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value. However, this can be customized as per specific requirements.
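To make this concrete, for the word-count job each reducer process sees sorted, tab-separated key/value lines on its STDIN, for example (sample data matching the local test above):

bear	1
car	1
car	1
car	1
deer	1
river	1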
Important Commands
| Parameter | Required/Optional | Description |
| --- | --- | --- |
| -input directory/file-name | Required | Input location for the mapper. |
| -output directory-name | Required | Output location for the reducer. |
| -mapper executable or script or JavaClassName | Required | Mapper executable. |
| -reducer executable or script or JavaClassName | Required | Reducer executable. |
| -file file-name | Optional | Makes the mapper, reducer, or combiner executable available locally on the compute nodes. |
| -inputformat JavaClassName | Optional | The class you supply should return key/value pairs of Text class. If not specified, TextInputFormat is used as the default. |
| -outputformat JavaClassName | Optional | The class you supply should take key/value pairs of Text class. If not specified, TextOutputFormat is used as the default. |
| -partitioner JavaClassName | Optional | Class that determines which reducer a key is sent to. |
| -combiner streamingCommand or JavaClassName | Optional | Combiner executable for map output. |
| -cmdenv name=value | Optional | Passes the environment variable to streaming commands. |
| -inputreader | Optional | For backwards-compatibility: specifies a record reader class (instead of an input format class). |
| -verbose | Optional | Verbose output. |
| -lazyOutput | Optional | Creates output lazily. For example, if the output format is based on FileOutputFormat, the output file is created only on the first call to output.collect (or Context.write). |
| -numReduceTasks | Optional | Specifies the number of reducers. |
| -mapdebug | Optional | Script to call when a map task fails. |
| -reducedebug | Optional | Script to call when a reduce task fails. |
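Putting several of these options together, a fuller invocation might look like the following (a sketch: -file ships the scripts to the compute nodes, and reusing reducer.py as the combiner works for word count because the aggregation is associative and commutative):

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
   -input input_dirs \
   -output output_dir \
   -mapper mapper.py \
   -reducer reducer.py \
   -file mapper.py \
   -file reducer.py \
   -combiner reducer.py \
   -numReduceTasks 2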