MapReduce - Combiners

A Combiner, also known as a semi-reducer, is an optional class that operates by accepting the inputs from the Map class and thereafter passing the output key-value pairs to the Reducer class.

The main function of a Combiner is to summarize the map output records with the same key. The output (key-value collection) of the combiner will be sent over the network to the actual Reducer task as input.

Combiner

The Combiner class is used in between the Map class and the Reduce class to reduce the volume of data transfer between Map and Reduce. Usually, the output of the map task is large and the data transferred to the reduce task is high.

The following MapReduce task diagram shows the COMBINER PHASE.

[Diagram: the Combiner phase sits between the Map phase and the Reduce phase in the MapReduce task flow]

How Does a Combiner Work?

Here is a brief summary of how the MapReduce Combiner works −

  1. A combiner does not have a predefined interface of its own; it must implement the Reducer interface’s reduce() method.

  2. A combiner operates on each map output key. It must have the same output key-value types as the Reducer class.

  3. A combiner can produce summary information from a large dataset, because it replaces the original Map output.

Although the Combiner is optional, it helps segregate the data into multiple groups for the Reduce phase, which makes the data easier to process.
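
Since a Combiner has no dedicated interface and reuses the Reducer API, a standalone Combiner is simply a Reducer subclass. The following is a minimal sketch written in the style of the snippets below; the class name WordCountCombiner is illustrative only and is not part of the example program in this chapter.

public static class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>
{
   private IntWritable partialSum = new IntWritable();

   public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
   {
      // Pre-aggregate the counts produced by a single map task
      int sum = 0;
      for (IntWritable val : values)
      {
         sum += val.get();
      }
      partialSum.set(sum);
      context.write(key, partialSum);
   }
}

Such a class would be wired into the job with job.setCombinerClass(WordCountCombiner.class).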

MapReduce Combiner Implementation

The following example provides a theoretical idea about combiners. Let us assume we have the following input text file named input.txt for MapReduce.

What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance

The important phases of the MapReduce program with Combiner are discussed below.

Record Reader

This is the first phase of MapReduce where the Record Reader reads every line from the input text file as text and yields output as key-value pairs.

Input − Line by line text from the input file.

Output − Forms the key-value pairs. The following is the set of expected key-value pairs.

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>
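
For intuition, here is a plain-Java sketch (not Hadoop code) that mimics what the Record Reader yields for input.txt. Line numbers stand in for the keys here; in a real job the default TextInputFormat supplies the byte offset of each line as the key instead.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class RecordReaderDemo {
   public static void main(String[] args) throws IOException {
      // Read input.txt line by line and print one key-value pair per line
      try (BufferedReader reader = new BufferedReader(new FileReader("input.txt"))) {
         String line;
         int lineNo = 1;
         while ((line = reader.readLine()) != null) {
            System.out.println("<" + lineNo++ + ", " + line + ">");
         }
      }
   }
}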

Map Phase

The Map phase takes input from the Record Reader, processes it, and produces the output as another set of key-value pairs.

Input − The following key-value pairs are the input taken from the Record Reader.

<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>

The Map phase reads each key-value pair, splits the value into individual words using StringTokenizer, and emits each word as a key with a count of 1 as its value. The following code snippet shows the Mapper class and the map function.

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
   // Reusable Writable objects, so new instances are not created for every record
   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();

   public void map(Object key, Text value, Context context) throws IOException, InterruptedException
   {
      // Split the line into words and emit <word, 1> for each token
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens())
      {
         word.set(itr.nextToken());
         context.write(word, one);
      }
   }
}

Output − The expected output is as follows −

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>
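
To see the tokenization behind these pairs, the following plain-Java snippet splits one input line on whitespace exactly as the map function above does.

import java.util.StringTokenizer;

public class TokenizeDemo {
   public static void main(String[] args) {
      // StringTokenizer splits on whitespace by default
      StringTokenizer itr = new StringTokenizer("What do you mean by Object");
      while (itr.hasMoreTokens()) {
         System.out.println("<" + itr.nextToken() + ",1>");
      }
   }
}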

Combiner Phase

The Combiner phase takes each key-value pair from the Map phase, processes it, and produces the output as key-value collection pairs.

Input − The following key-value pairs are the input taken from the Map phase.

<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>

The Combiner phase reads each key-value pair and groups the common words as a key with their values as a collection. (Strictly speaking, the framework groups the values by key and the Combiner’s reduce() method then aggregates them; the grouped collections shown below illustrate this intermediate form.) Usually, the code and operation of a Combiner are similar to those of a Reducer. Following is the code snippet for the Mapper, Combiner and Reducer class declarations.

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

Output − The expected output is as follows −

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
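
The net effect of combining can be seen with a plain-Java sketch (not Hadoop code): pairs that share a key are collapsed into a single summed pair before anything is sent over the network, which is exactly what IntSumReducer does when it runs as the Combiner.

import java.util.LinkedHashMap;
import java.util.Map;

public class CombinerDemo {
   public static void main(String[] args) {
      // A few of the <word,1> pairs emitted by the Map phase
      String[] mapOutputKeys = {"What", "do", "you", "What", "do", "What"};
      Map<String, Integer> combined = new LinkedHashMap<>();
      for (String key : mapOutputKeys) {
         combined.merge(key, 1, Integer::sum); // sum the counts per key
      }
      // Prints <What,3>, <do,2>, <you,1>, one per line
      combined.forEach((word, count) -> System.out.println("<" + word + "," + count + ">"));
   }
}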

Reducer Phase

The Reducer phase takes each key-value collection pair from the Combiner phase, processes it, and passes the output as key-value pairs. Note that the Combiner functionality here is the same as that of the Reducer.

Input − The following key-value pair is the input taken from the Combiner phase.

<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

The Reducer phase reads each key-value pair and sums the values for each key. Following is the code snippet for the Reducer (the same IntSumReducer class also serves as the Combiner).

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
   private IntWritable result = new IntWritable();

   public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
   {
      // Sum all the counts collected for this key
      int sum = 0;
      for (IntWritable val : values)
      {
         sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
   }
}

Output − The expected output from the Reducer phase is as follows −

<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>

Record Writer

This is the last phase of MapReduce where the Record Writer writes every key-value pair from the Reducer phase and sends the output as text.

Input − Each key-value pair from the Reducer phase along with the Output format.

Output − It gives you the key-value pairs in text format. Following is the expected output.

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1
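
The tab-separated layout above comes from TextOutputFormat, the default output format, which writes each pair as the key, a tab character, and the value on its own line. Since it is the default, setting it explicitly is optional; a sketch of the job-configuration line follows.

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// TextOutputFormat (the default) writes one "key<TAB>value" line per pair
job.setOutputFormatClass(TextOutputFormat.class);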

Example Program

The following complete program counts the occurrences of each word in the input file.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
   public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
   {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(Object key, Text value, Context context) throws IOException, InterruptedException
      {
         StringTokenizer itr = new StringTokenizer(value.toString());
         while (itr.hasMoreTokens())
         {
            word.set(itr.nextToken());
            context.write(word, one);
         }
      }
   }

   public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
   {
      private IntWritable result = new IntWritable();
      public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
      {
         int sum = 0;
         for (IntWritable val : values)
         {
            sum += val.get();
         }
         result.set(sum);
         context.write(key, result);
      }
   }

   public static void main(String[] args) throws Exception
   {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word count");

      job.setJarByClass(WordCount.class);
      job.setMapperClass(TokenizerMapper.class);
      // The same IntSumReducer class serves as both the Combiner and the Reducer
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);

      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);

      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}

Save the above program as WordCount.java. The compilation and execution of the program are given below.

Compilation and Execution

Let us assume we are in the home directory of the Hadoop user (for example, /home/hadoop).

Follow the steps given below to compile and execute the above program.

Step 1 − Use the following command to create a directory to store the compiled Java classes.

$ mkdir units

Step 2 − Download hadoop-core-1.2.1.jar, which is used to compile and execute the MapReduce program. You can download the jar from mvnrepository.com.

Let us assume the downloaded folder is /home/hadoop/.

Step 3 − Use the following commands to compile the WordCount.java program and to create a jar for the program.

$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java
$ jar -cvf units.jar -C units/ .

Step 4 − Use the following command to create an input directory in HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5 − Use the following command to copy the input file named input.txt to the input directory of HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/input.txt input_dir

Step 6 − Use the following command to verify the files in the input directory.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7 − Use the following command to run the word count application, taking its input files from the input directory.

$HADOOP_HOME/bin/hadoop jar units.jar WordCount input_dir output_dir

Wait till the job finishes executing. Once it completes, the console output reports the number of input splits, Map tasks, and Reducer tasks.

Step 8 − Use the following command to verify the resultant files in the output folder.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9 − Use the following command to see the output in the part-r-00000 file. This file is generated by the MapReduce job in HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-r-00000

Following is the output generated by the MapReduce program.

What           3
do             2
you            2
mean           1
by             1
Object         1
know           1
about          1
Java           3
is             1
Virtual        1
Machine        1
How            1
enabled        1
High           1
Performance    1