Hadoop 简明教程

Hadoop - Streaming

Hadoop 流式处理是 Hadoop 发行版中附带的一个实用工具。此实用工具允许你使用任何可执行文件或脚本作为映射器和/或规约器来创建和运行 Map/Reduce 作业。

Example Using Python

对于 Hadoop 流式处理,我们考虑单词计数问题。Hadoop 中的任何作业都必须具有两个阶段:映射器和规约器。我们已用 Python 脚本为映射器和规约器编写了代码,以便在 Hadoop 下运行它。你也可以用 Perl 和 Ruby 编写相同的代码。

Mapper Phase Code

!/usr/bin/python

import sys

# Input takes from standard input for myline in sys.stdin:
   # Remove whitespace either side
   myline = myline.strip()

   # Break the line into words
   words = myline.split()

   # Iterate the words list
   for myword in words:
      # Write the results to standard output
      print '%s\t%s' % (myword, 1)

确保此文件具有执行权限 (chmod +x /home/ expert/hadoop-1.2.1/mapper.py)。

Reducer Phase Code

#!/usr/bin/python

from operator import itemgetter
import sys

current_word = ""
current_count = 0
word = ""

# Input takes from standard input for myline in sys.stdin:
   # Remove whitespace either side
   myline = myline.strip()

   # Split the input we got from mapper.py word,
   count = myline.split('\t', 1)

   # Convert count variable to integer
   try:
      count = int(count)

   except ValueError:
      # Count was not a number, so silently ignore this line continue

   if current_word == word:
   current_count += count
   else:
      if current_word:
         # Write result to standard output print '%s\t%s' % (current_word, current_count)

      current_count = count
      current_word = word

# Do not forget to output the last word if needed!
if current_word == word:
   print '%s\t%s' % (current_word, current_count)

将映射器和规约器代码保存在 Hadoop 主目录中的 mapper.py 和 reducer.py 中。确保这些文件具有执行权限 (chmod +x mapper.py 和 chmod +x reducer.py)。由于 Python 对缩进很敏感,因此可以通过下面的链接下载相同的代码。

Execution of WordCount Program

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
   -input input_dirs \
   -output output_dir \
   -mapper <path/mapper.py \
   -reducer <path/reducer.py

使用“\”换行,以便于清晰阅读。

For Example,

./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py

How Streaming Works

在上述示例中,映射器和规约器都是从标准输入读取输入并向标准输出发送输出的 Python 脚本。此实用工具将创建一个 Map/Reduce 作业,将作业提交到适当的集群,并在作业完成之前监控作业进度。

当为映射器指定脚本时,每个映射器任务将在映射器初始化时作为单独的进程启动该脚本。随着映射器任务的运行,它会将其输入转换为行,并将这些行馈送给进程的标准输入 (STDIN)。同时,映射器从进程的标准输出 (STDOUT) 收集面向行的输出,并将每行转换成一个键值对,该键值对将作为映射器的输出被收集。默认情况下,一行开头到第一个制表符字符的前缀是键,该行的其余部分(不包括制表符字符)将是值。如果该行中没有制表符字符,则整行被视为键,而值为空。但是,可以根据需要进行自定义。

当为规约器指定脚本时,每个规约器任务都将作为单独的进程启动该脚本,然后初始化规约器。随着规约器任务的运行,它会将其输入键值对转换为行,并将这些行馈送给进程的标准输入 (STDIN)。同时,规约器从进程的标准输出 (STDOUT) 收集面向行的输出,将每行转换成一个键值对,该键值对将作为规约器的输出被收集。默认情况下,一行开头到第一个制表符字符的前缀是键,该行的其余部分(不包括制表符字符)是值。但是,可以根据具体要求进行自定义。

Important Commands

Parameters

Options

Description

-input directory/file-name

Required

Input location for mapper.

-output directory-name

Required

Output location for reducer.

-映射器可执行文件或脚本或 JavaClassName

Required

Mapper executable.

-规约器可执行文件或脚本或 JavaClassName

Required

Reducer executable.

-file file-name

Optional

使映射器、规约器或组合器可执行文件在计算节点上局部可用。

-inputformat JavaClassName

Optional

你提供的类应该返回 Text 类的键值对。如果未指定,则 TextInputFormat 将用作默认值。

-outputformat JavaClassName

Optional

你提供的类应采用文本类的键/值对。如果没有指定,则 TextOutputformat 将用作默认值。

-partitioner JavaClassName

Optional

确定一个键应发送到的减少方式的类。

-combiner streamingCommand or JavaClassName

Optional

用于地图输出的合并程序。

-cmdenv name=value

Optional

将环境变量传递到流式命令。

-inputreader

Optional

向后兼容:指定记录读取器类(而不是输入格式类)。

-verbose

Optional

Verbose output.

-lazyOutput

Optional

延迟创建输出。例如,如果输出格式基于 FileOutputFormat,则仅在首次调用 output.collect(或 Context.write)时才会创建输出文件。

-numReduceTasks

Optional

指定减速器数量。

-mapdebug

Optional

映射任务失败时要调用的脚本。

-reducedebug

Optional

减少任务失败时要调用的脚本。