Hadoop Tutorial
Hadoop - MapReduce
MapReduce is a framework with which we can write applications that process huge amounts of data in parallel, on large clusters of commodity hardware, in a reliable manner.
What is MapReduce?
MapReduce is a processing technique and a programming model for distributed computing based on Java. The MapReduce algorithm contains two important tasks, namely Map and Reduce. Map takes a set of data and converts it into another set of data, where individual elements are broken down into tuples (key/value pairs). The reduce task takes the output from a map as its input and combines those data tuples into a smaller set of tuples. As the name MapReduce implies, the reduce task is always performed after the map job.
The major advantage of MapReduce is that it is easy to scale data processing over multiple computing nodes. Under the MapReduce model, the data processing primitives are called mappers and reducers. Decomposing a data processing application into mappers and reducers is sometimes nontrivial. But once we write an application in the MapReduce form, scaling the application to run over hundreds, thousands, or even tens of thousands of machines in a cluster is merely a configuration change. This simple scalability is what has attracted many programmers to the MapReduce model.
The Algorithm
- Generally, the MapReduce paradigm is based on sending the computation to where the data resides.
- A MapReduce program executes in three stages, namely the map stage, the shuffle stage, and the reduce stage (a minimal sketch of this data flow follows the list).
- Map stage − The map or mapper's job is to process the input data. Generally the input data is in the form of a file or directory and is stored in the Hadoop Distributed File System (HDFS). The input file is passed to the mapper function line by line. The mapper processes the data and creates several small chunks of data.
- Reduce stage − This stage is the combination of the Shuffle stage and the Reduce stage. The reducer's job is to process the data that comes from the mapper. After processing, it produces a new set of output, which is stored in HDFS.
- During a MapReduce job, Hadoop sends the Map and Reduce tasks to the appropriate servers in the cluster.
- The framework manages all the details of data passing, such as issuing tasks, verifying task completion, and copying data around the cluster between the nodes.
- Most of the computing takes place on nodes with data on local disks, which reduces the network traffic.
- After completion of the given tasks, the cluster collects and reduces the data to form an appropriate result, and sends it back to the Hadoop server.
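To make these stages concrete, the following standalone sketch simulates the map → shuffle → reduce flow in memory on two records of the sample data used later in this chapter. It is plain Java with no Hadoop dependencies, and the class name MapReduceFlowDemo is purely illustrative; it only mimics what the framework does behind the scenes.

import java.util.*;

// A toy, in-memory illustration of the MapReduce data flow.
// It is NOT Hadoop code; it only imitates what the framework does for us.
public class MapReduceFlowDemo {
   public static void main(String[] args) {
      List<String> inputLines = Arrays.asList(
         "1979 23 23 2 43 24 25 26 26 26 26 25 26 25",
         "1980 26 27 28 28 28 30 31 31 31 30 30 30 29");

      // Map stage: each input line is turned into a (year, annual average) pair.
      List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
      for (String line : inputLines) {
         String[] tokens = line.trim().split("\\s+");
         String year = tokens[0];
         int avg = Integer.parseInt(tokens[tokens.length - 1]);
         mapped.add(new AbstractMap.SimpleEntry<>(year, avg));
      }

      // Shuffle stage: the framework groups all intermediate values by key.
      Map<String, List<Integer>> grouped = new TreeMap<>();
      for (Map.Entry<String, Integer> pair : mapped) {
         grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
      }

      // Reduce stage: each key's value list is folded into a single result (here, the maximum).
      for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
         int max = Collections.max(entry.getValue());
         System.out.println(entry.getKey() + "\t" + max);
      }
   }
}

The map loop emits one (key, value) pair per line, the grouping step plays the role of the shuffle, and the final loop is the reduce step that folds each group into one output record.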

Inputs and Outputs (Java Perspective)
The MapReduce framework operates on <key, value> pairs; that is, the framework views the input to a job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job, conceivably of different types.
The key and value classes must be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework. The input and output types of a MapReduce job are: (Input) <k1, v1> → map → <k2, v2> → reduce → <k3, v3> (Output).
       | Input          | Output
Map    | <k1, v1>       | list (<k2, v2>)
Reduce | <k2, list(v2)> | list (<k3, v3>)
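As a concrete illustration of the WritableComparable requirement, here is a minimal sketch of a custom key class. The class name YearKey is hypothetical and used only for this example; it shows what any key type must provide so the framework can serialize it, sort it, and group by it. In the example program later in this chapter, the built-in Text and IntWritable classes already play this role, so no custom key is needed there.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// A minimal custom key: a year that Hadoop can serialize, sort, and group on.
public class YearKey implements WritableComparable<YearKey> {
   private int year;

   public YearKey() { }                    // no-arg constructor required for deserialization
   public YearKey(int year) { this.year = year; }

   @Override
   public void write(DataOutput out) throws IOException {
      out.writeInt(year);                  // how the framework serializes the key
   }

   @Override
   public void readFields(DataInput in) throws IOException {
      year = in.readInt();                 // how the framework deserializes the key
   }

   @Override
   public int compareTo(YearKey other) {
      return Integer.compare(year, other.year);   // ordering used during the sort/shuffle
   }

   @Override
   public int hashCode() { return year; }          // used by the default partitioner

   @Override
   public boolean equals(Object o) {
      return (o instanceof YearKey) && ((YearKey) o).year == year;
   }

   @Override
   public String toString() { return Integer.toString(year); }
}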
Terminology
- PayLoad − Applications implement the Map and Reduce functions, and form the core of the job.
- Mapper − Maps the input key/value pairs to a set of intermediate key/value pairs.
- NameNode − Node that manages the Hadoop Distributed File System (HDFS).
- DataNode − Node where data is presented in advance before any processing takes place.
- MasterNode − Node where the JobTracker runs and which accepts job requests from clients.
- SlaveNode − Node where the Map and Reduce programs run.
- JobTracker − Schedules jobs and tracks the assigned jobs to the Task Tracker.
- Task Tracker − Tracks the tasks and reports status to the JobTracker.
- Job − An execution of a Mapper and Reducer across a dataset.
- Task − An execution of a Mapper or a Reducer on a slice of data.
- Task Attempt − A particular instance of an attempt to execute a task on a SlaveNode.
Example Scenario
Given below is the electrical consumption data of an organization. It contains the monthly electrical consumption and the annual average for various years.
     | Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec | Avg
1979 | 23  | 23  | 2   | 43  | 24  | 25  | 26  | 26  | 26  | 26  | 25  | 26  | 25
1980 | 26  | 27  | 28  | 28  | 28  | 30  | 31  | 31  | 31  | 30  | 30  | 30  | 29
1981 | 31  | 32  | 32  | 32  | 33  | 34  | 35  | 36  | 36  | 34  | 34  | 34  | 34
1984 | 39  | 38  | 39  | 39  | 39  | 41  | 42  | 43  | 40  | 39  | 38  | 38  | 40
1985 | 38  | 39  | 39  | 39  | 39  | 41  | 41  | 41  | 00  | 40  | 39  | 39  | 45
If the above data is given as input, we have to write applications to process it and produce results such as finding the year of maximum usage, the year of minimum usage, and so on. This is easy for programmers when the number of records is limited: they simply write the logic to produce the required output and pass the data to the application.
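For a small file such as the one above, that single-machine logic is only a few lines long. The sketch below is plain Java without Hadoop; the class name MaxUsageYear is illustrative, and it assumes the data file path is passed as the first command-line argument. It simply scans every record and remembers the year with the highest annual average.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// A plain, single-machine solution: scan every record and remember the maximum.
// This works fine for a handful of records but does not scale to bulk data.
public class MaxUsageYear {
   public static void main(String[] args) throws IOException {
      List<String> lines = Files.readAllLines(Paths.get(args[0]));   // e.g. sample.txt

      String maxYear = null;
      int maxAvg = Integer.MIN_VALUE;

      for (String line : lines) {
         String[] tokens = line.trim().split("\\s+");
         if (tokens.length < 2) continue;                            // skip malformed lines
         int avg = Integer.parseInt(tokens[tokens.length - 1]);      // last column: annual average
         if (avg > maxAvg) {
            maxAvg = avg;
            maxYear = tokens[0];                                     // first column: the year
         }
      }
      System.out.println("Year of maximum usage: " + maxYear + " (" + maxAvg + ")");
   }
}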
But think of the data representing the electrical consumption of all the large-scale industries of a particular state since its formation.
When we write applications to process such bulk data,
- they will take a lot of time to execute;
- there will be heavy network traffic when we move data from the source to the network server, and so on.
To solve these problems, we have the MapReduce framework.
Input Data
The above data is saved as sample.txt and given as input. The input file looks as shown below.
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45
Example Program
Given below is the program that processes the sample data using the MapReduce framework.
package hadoop;

import java.util.*;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits {

   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                 /*Input value Type*/
   Text,                 /*Output key Type*/
   IntWritable>          /*Output value Type*/
   {
      //Map function: emits (year, annual average) for every input line
      public void map(LongWritable key, Text value,
         OutputCollector<Text, IntWritable> output,
         Reporter reporter) throws IOException {
         String line = value.toString();
         String lasttoken = null;
         // Split on whitespace (spaces or tabs); the sample file shown above is space-separated.
         StringTokenizer s = new StringTokenizer(line, " \t");
         String year = s.nextToken();

         while (s.hasMoreTokens()) {
            lasttoken = s.nextToken();   // after the loop this holds the last column (the annual average)
         }
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }

   //Reducer class
   public static class E_EReduce extends MapReduceBase implements
   Reducer<Text, IntWritable, Text, IntWritable> {

      //Reduce function: emits only the values above the threshold of 30 units
      public void reduce(Text key, Iterator<IntWritable> values,
         OutputCollector<Text, IntWritable> output,
         Reporter reporter) throws IOException {
         int maxavg = 30;
         int val = Integer.MIN_VALUE;

         while (values.hasNext()) {
            if ((val = values.next().get()) > maxavg) {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }

   //Main function
   public static void main(String args[]) throws Exception {
      JobConf conf = new JobConf(ProcessUnits.class);

      conf.setJobName("max_eletricityunits");
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);   // the reducer doubles as a combiner here
      conf.setReducerClass(E_EReduce.class);
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);

      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));

      JobClient.runJob(conf);
   }
}
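The program above uses the classic org.apache.hadoop.mapred API, which matches the hadoop-core-1.2.1.jar used in the steps below. For reference only, a roughly equivalent sketch written against the newer org.apache.hadoop.mapreduce API (available in Hadoop 2.x and later) is shown here; the class ProcessUnitsNewApi and its member names are illustrative, and the compilation and execution steps that follow still refer to ProcessUnits.java.

package hadoop;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Rough equivalent of ProcessUnits using the newer mapreduce API (sketch only).
public class ProcessUnitsNewApi {

   public static class UnitMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
      @Override
      protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
         String[] tokens = value.toString().trim().split("\\s+");
         String year = tokens[0];
         int avg = Integer.parseInt(tokens[tokens.length - 1]);   // last column: annual average
         context.write(new Text(year), new IntWritable(avg));
      }
   }

   public static class UnitReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
         for (IntWritable value : values) {
            if (value.get() > 30) {                               // same threshold as in E_EReduce
               context.write(key, value);
            }
         }
      }
   }

   public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "max_eletricityunits");
      job.setJarByClass(ProcessUnitsNewApi.class);
      job.setMapperClass(UnitMapper.class);
      job.setCombinerClass(UnitReducer.class);
      job.setReducerClass(UnitReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}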
Save the above program as ProcessUnits.java. The compilation and execution of the program are explained below.
Compilation and Execution of Process Units Program
Let us assume we are in the home directory of a Hadoop user (e.g. /home/hadoop).
Follow the steps given below to compile and execute the above program.
Step 1
The following command is used to create a directory to store the compiled Java classes.
$ mkdir units
Step 2
Download hadoop-core-1.2.1.jar, which is used to compile and execute the MapReduce program. Visit the following link mvnrepository.com to download the jar. Let us assume the downloaded folder is /home/hadoop/.
Step 3
The following commands are used to compile the ProcessUnits.java program and create a jar for the program.
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
Step 4
The following command is used to create an input directory in HDFS.
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
Step 5
The following command is used to copy the input file named sample.txt into the input directory of HDFS.
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
Step 6
The following command is used to verify the files in the input directory.
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
Step 7
The following command is used to run the application (the hadoop.ProcessUnits class in units.jar) by taking the input files from the input directory.
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
Wait for a while until the job finishes. After execution, as shown below, the output will contain the number of input splits, the number of Map tasks, the number of reducer tasks, and so on.
INFO mapreduce.Job: Job job_1414748220717_0002 completed successfully
14/10/31 06:02:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read = 61
FILE: Number of bytes written = 279400
FILE: Number of read operations = 0
FILE: Number of large read operations = 0
FILE: Number of write operations = 0
HDFS: Number of bytes read = 546
HDFS: Number of bytes written = 40
HDFS: Number of read operations = 9
HDFS: Number of large read operations = 0
HDFS: Number of write operations = 2
Job Counters
Launched map tasks = 2
Launched reduce tasks = 1
Data-local map tasks = 2
Total time spent by all maps in occupied slots (ms) = 146137
Total time spent by all reduces in occupied slots (ms) = 441
Total time spent by all map tasks (ms) = 14613
Total time spent by all reduce tasks (ms) = 44120
Total vcore-seconds taken by all map tasks = 146137
Total vcore-seconds taken by all reduce tasks = 44120
Total megabyte-seconds taken by all map tasks = 149644288
Total megabyte-seconds taken by all reduce tasks = 45178880
Map-Reduce Framework
Map input records = 5
Map output records = 5
Map output bytes = 45
Map output materialized bytes = 67
Input split bytes = 208
Combine input records = 5
Combine output records = 5
Reduce input groups = 5
Reduce shuffle bytes = 6
Reduce input records = 5
Reduce output records = 5
Spilled Records = 10
Shuffled Maps = 2
Failed Shuffles = 0
Merged Map outputs = 2
GC time elapsed (ms) = 948
CPU time spent (ms) = 5160
Physical memory (bytes) snapshot = 47749120
Virtual memory (bytes) snapshot = 2899349504
Total committed heap usage (bytes) = 277684224
File Output Format Counters
Bytes Written = 40
Step 8
The following command is used to verify the resultant files in the output folder.
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
Step 9
The following command is used to see the output in the Part-00000 file, which the job writes to HDFS.
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Below is the output generated by the MapReduce program. Only the years whose value exceeds the reducer's threshold of 30 units appear; 1979 and 1980 are filtered out.
1981 34
1984 40
1985 45
Important Commands
All Hadoop commands are invoked by the $HADOOP_HOME/bin/hadoop command. Running the Hadoop script without any arguments prints the description for all commands.
Usage − hadoop [--config confdir] COMMAND
The following table lists the options available and their descriptions.
Sr.No. | Option | Description
1  | namenode -format | Formats the DFS filesystem.
2  | secondarynamenode | Runs the DFS secondary namenode.
3  | namenode | Runs the DFS namenode.
4  | datanode | Runs a DFS datanode.
5  | dfsadmin | Runs a DFS admin client.
6  | mradmin | Runs a Map-Reduce admin client.
7  | fsck | Runs a DFS filesystem checking utility.
8  | fs | Runs a generic filesystem user client.
9  | balancer | Runs a cluster balancing utility.
10 | oiv | Applies the offline fsimage viewer to an fsimage.
11 | fetchdt | Fetches a delegation token from the NameNode.
12 | jobtracker | Runs the MapReduce JobTracker node.
13 | pipes | Runs a Pipes job.
14 | tasktracker | Runs a MapReduce TaskTracker node.
15 | historyserver | Runs job history servers as a standalone daemon.
16 | job | Manipulates the MapReduce jobs.
17 | queue | Gets information regarding JobQueues.
18 | version | Prints the version.
19 | jar <jar> | Runs a jar file.
20 | distcp <srcurl> <desturl> | Copies a file or directories recursively.
21 | distcp2 <srcurl> <desturl> | DistCp version 2.
22 | archive -archiveName NAME -p <parent path> <src>* <dest> | Creates a hadoop archive.
23 | classpath | Prints the class path needed to get the Hadoop jar and the required libraries.
24 | daemonlog | Gets/sets the log level for each daemon.
How to Interact with MapReduce Jobs
Usage − hadoop job [GENERIC_OPTIONS]
The following are the generic options available in a Hadoop job.
Sr.No. | GENERIC_OPTION | Description
1  | -submit <job-file> | Submits the job.
2  | -status <job-id> | Prints the map and reduce completion percentage and all job counters.
3  | -counter <job-id> <group-name> <countername> | Prints the counter value.
4  | -kill <job-id> | Kills the job.
5  | -events <job-id> <fromevent-#> <#-of-events> | Prints the events' details received by the jobtracker for the given range.
6  | -history [all] <jobOutputDir> | Prints job details, failed and killed tip details. More details about the job, such as successful tasks and task attempts made for each task, can be viewed by specifying the [all] option.
7  | -list [all] | Displays all jobs. -list displays only jobs which are yet to complete.
8  | -kill-task <task-id> | Kills the task. Killed tasks are NOT counted against failed attempts.
9  | -fail-task <task-id> | Fails the task. Failed tasks are counted against failed attempts.
10 | -set-priority <job-id> <priority> | Changes the priority of the job. Allowed priority values are VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW.
To see the status of a job
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
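The same status information can also be retrieved programmatically with the classic client API. The following sketch is illustrative (the class name JobStatusCheck is hypothetical) and assumes the cluster configuration files are on the classpath and the job ID is passed as the first command-line argument; it reports roughly what hadoop job -status prints.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;

// Queries the status of a submitted job, similar to `hadoop job -status <JOB-ID>`.
public class JobStatusCheck {
   public static void main(String[] args) throws Exception {
      // Cluster settings are picked up from the configuration files on the classpath.
      JobConf conf = new JobConf(new Configuration());
      JobClient client = new JobClient(conf);

      // e.g. args[0] = "job_201310191043_0004"
      RunningJob job = client.getJob(JobID.forName(args[0]));
      if (job == null) {
         System.out.println("No such job: " + args[0]);
         return;
      }

      System.out.println("Map progress    : " + (job.mapProgress() * 100) + " %");
      System.out.println("Reduce progress : " + (job.reduceProgress() * 100) + " %");
      System.out.println("Complete        : " + job.isComplete());
      if (job.isComplete()) {
         System.out.println("Successful      : " + job.isSuccessful());
      }
   }
}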