Hadoop 简明教程
Hadoop - MapReduce
MapReduce是一个框架,我们可以使用该框架编写应用程序来并行处理海量数据,在大规模商品硬件集群上,以可靠的方式处理数据。
What is MapReduce?
MapReduce是一种基于java的分布式计算的处理技术和程序模型。MapReduce算法包含两个重要任务,即Map和Reduce。Map获取一组数据并将它转换为另一组数据,其中各个元素被分解成元组(键/值对)。其次是reduce任务,它将map的输出作为输入,并将这些数据元组组合成更小的一组元组。正如MapReduce名称的顺序所暗示的那样,reduce任务总是在map作业之后执行。
MapReduce的主要优势在于,可以在多个计算节点上轻松扩展数据处理。在MapReduce模型中,数据处理原语称为映射器和还原器。将数据处理应用程序分解成映射器和还原器有时并非易事。但是,一旦我们在MapReduce形式中编写了一个应用程序,将应用程序扩展到在一个集群中成百上千甚至数万台机器上运行仅仅是一次配置更改。这种简单的可扩展性吸引了许多程序员使用MapReduce模型。
The Algorithm
-
通常,MapReduce范例基于将计算机发送到数据所在的位置!
-
MapReduce程序执行三个阶段,分别是map阶段、shuffle阶段和reduce阶段。 Map stage - map或mapper的任务是处理输入数据。通常,输入数据以文件或目录的形式存在,并存储在Hadoop文件系统(HDFS)中。输入文件按行传递到mapper函数。mapper处理数据并创建许多小块数据。 Reduce stage - 此阶段是 * Shuffle * 阶段和 Reduce 阶段的组合。Reducer的任务是处理来自mapper的数据。处理后,它会生成一组新输出,将存储在HDFS中。
-
在 MapReduce 作业期间,Hadoop 会将映射和缩减任务发送到集群中的适当服务器。
-
该框架管理数据传递的所有详细信息,诸如发布任务、验证任务完成和在节点之间复制集群中的数据。
-
大多数计算发生在本地磁盘上有数据的节点上,这减少了网络流量。
-
在给定任务完成后,集群会收集并缩减数据以形成适当的结果,然后将其发送回 Hadoop 服务器。
Inputs and Outputs (Java Perspective)
MapReduce 框架使用 <key, value> 对工作,即,该框架将作业的输入视为一组 <key, value> 对,并且将一组 <key, value> 对作为作业的输出,可能是不同类型的。
密钥和值类应该是通过该框架串行化的,因此,需要实现 Writable 接口。此外,关键类必须实现 Writable-Comparable 接口以方便框架进行排序。 MapReduce job 的输入和输出类型——(输入) <k1, v1> → 映射 → <k2, v2> → 缩减 → <k3, v3>(输出)。
Input |
Output |
|
Map |
<k1, v1> |
list (<k2, v2>) |
Reduce |
<k2, list(v2)> |
list (<k3, v3>) |
Terminology
-
PayLoad - 应用程序实现映射和缩减函数,并构成作业的核心。
-
Mapper - 映射器将输入键/值对映射到一组中间键/值对。
-
NamedNode - 管理 Hadoop 分布式文件系统 (HDFS) 的节点。
-
DataNode - 在进行任何处理之前,提前提供数据的节点。
-
MasterNode - JobTracker 运行并接受来自客户端的作业请求的节点。
-
SlaveNode - 映射和缩减程序运行的节点。
-
JobTracker - 调度作业并跟踪将作业分配给任务跟踪器。
-
Task Tracker - 跟踪任务并将状态报告给 JobTracker。
-
Job - 程序是在数据集上执行映射器和缩减器。
-
Task - 在数据切片上执行映射器或缩减器的执行。
-
Task Attempt - 在 SlaveNode 上尝试执行任务的特定实例。
Example Scenario
下面提供了一个组织的用电量数据。它包含月用电量和各个年份的年平均用电量。
Jan |
Feb |
Mar |
Apr |
May |
Jun |
Jul |
Aug |
Sep |
Oct |
Nov |
Dec |
Avg |
|
1979 |
23 |
23 |
2 |
43 |
24 |
25 |
26 |
26 |
26 |
26 |
25 |
26 |
25 |
1980 |
26 |
27 |
28 |
28 |
28 |
30 |
31 |
31 |
31 |
30 |
30 |
30 |
29 |
1981 |
31 |
32 |
32 |
32 |
33 |
34 |
35 |
36 |
36 |
34 |
34 |
34 |
34 |
1984 |
39 |
38 |
39 |
39 |
39 |
41 |
42 |
43 |
40 |
39 |
38 |
38 |
40 |
1985 |
38 |
39 |
39 |
39 |
39 |
41 |
41 |
41 |
00 |
40 |
39 |
39 |
45 |
如果输入了上述数据,我们必须编写应用程序来处理它并产生结果,例如找到使用量最大的年份、使用量最小的年份,等等。这对具有有限数量记录的程序员来说是小菜一碟。他们只需编写逻辑以生成所需输出,并将数据传递给编写的应用程序。
但请想到自成立以来某一特定州的所有大规模行业的用电量所代表的数据。
当我们编写应用程序来处理此类数据时,
-
它们执行需要很长时间。
-
当我们将数据从源移动到网络服务器时会造成大量的网络流量。
为了解决这些问题,我们有MapReduce框架。
Input Data
上述数据保存为*sample.txt*并作为输入提供。输入文件如下图所示。
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45
Example Program
下面提供了使用MapReduce框架处理示例数据的程序。
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits {
//Mapper class
public static class E_EMapper extends MapReduceBase implements
Mapper<LongWritable ,/*Input key Type */
Text, /*Input value Type*/
Text, /*Output key Type*/
IntWritable> /*Output value Type*/
{
//Map function
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens()) {
lasttoken = s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
//Reduce function
public void reduce( Text key, Iterator <IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int maxavg = 30;
int val = Integer.MIN_VALUE;
while (values.hasNext()) {
if((val = values.next().get())>maxavg) {
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception {
JobConf conf = new JobConf(ProcessUnits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
将上述程序保存为 ProcessUnits.java. 程序的编译和执行在下面进行了说明。
Compilation and Execution of Process Units Program
让我们假设我们处于Hadoop用户的home目录下(如/home/hadoop)。
按照以下步骤编译和执行上述程序。
Step 2
下载用于编译和执行MapReduce程序的 Hadoop-core-1.2.1.jar, 。访问以下链接 mvnrepository.com 下载jar包。我们假设下载的文件夹是 /home/hadoop/.
Step 3
以下命令用于编译 ProcessUnits.java 程序并创建该程序的jar包。
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .
Step 5
以下命令用于将名为*sample.txt*的输入文件复制到HDFS的输入目录中。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
Step 7
以下命令用于通过从输入目录中获取输入文件来运行Eleunit_max应用程序。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
等待一段时间,直到文件执行完成。在执行之后,结果将包含输入拆分的数量、Map任务的数量、reducer任务的数量等,如下所示。
INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read = 61
FILE: Number of bytes written = 279400
FILE: Number of read operations = 0
FILE: Number of large read operations = 0
FILE: Number of write operations = 0
HDFS: Number of bytes read = 546
HDFS: Number of bytes written = 40
HDFS: Number of read operations = 9
HDFS: Number of large read operations = 0
HDFS: Number of write operations = 2 Job Counters
Launched map tasks = 2
Launched reduce tasks = 1
Data-local map tasks = 2
Total time spent by all maps in occupied slots (ms) = 146137
Total time spent by all reduces in occupied slots (ms) = 441
Total time spent by all map tasks (ms) = 14613
Total time spent by all reduce tasks (ms) = 44120
Total vcore-seconds taken by all map tasks = 146137
Total vcore-seconds taken by all reduce tasks = 44120
Total megabyte-seconds taken by all map tasks = 149644288
Total megabyte-seconds taken by all reduce tasks = 45178880
Map-Reduce Framework
Map input records = 5
Map output records = 5
Map output bytes = 45
Map output materialized bytes = 67
Input split bytes = 208
Combine input records = 5
Combine output records = 5
Reduce input groups = 5
Reduce shuffle bytes = 6
Reduce input records = 5
Reduce output records = 5
Spilled Records = 10
Shuffled Maps = 2
Failed Shuffles = 0
Merged Map outputs = 2
GC time elapsed (ms) = 948
CPU time spent (ms) = 5160
Physical memory (bytes) snapshot = 47749120
Virtual memory (bytes) snapshot = 2899349504
Total committed heap usage (bytes) = 277684224
File Output Format Counters
Bytes Written = 40
Important Commands
所有 Hadoop 命令都可使用 $HADOOP_HOME/bin/hadoop 命令调用。在不带任何参数的情况下运行 Hadoop 脚本将打印所有命令的描述。
Usage − hadoop [--config confdir] COMMAND
下表列出了可用选项及其说明。
Sr.No. |
Option & Description |
1 |
namenode -format 格式化 DFS 文件系统。 |
2 |
secondarynamenode 运行 DFS 次要 NameNode。 |
3 |
namenode Runs the DFS namenode. |
4 |
datanode Runs a DFS datanode. |
5 |
dfsadmin 运行 DFS 管理客户机。 |
6 |
mradmin 运行 Map-Reduce 管理客户机。 |
7 |
fsck 运行 DFS 文件系统检查实用程序。 |
8 |
fs 运行通用文件系统用户客户端。 |
9 |
balancer 运行集群平衡实用程序。 |
10 |
oiv 将离线 fsimage 查看器应用于 fsimage。 |
11 |
fetchdt 从 NameNode 提取委托标记。 |
12 |
jobtracker 运行 MapReduce 作业追踪器节点。 |
13 |
pipes Runs a Pipes job. |
14 |
tasktracker 运行 MapReduce 任务追踪器节点。 |
15 |
historyserver 将作业历史记录服务器作为独立守护程序运行。 |
16 |
job Manipulates the MapReduce jobs. |
17 |
queue Gets information regarding JobQueues. |
18 |
version Prints the version. |
19 |
jar <jar> 运行 jar 文件。 |
20 |
distcp <srcurl> <desturl> 递归复制文件或目录。 |
21 |
distcp2 <srcurl> <desturl> DistCp 版本 2。 |
22 |
archive -archiveName NAME -p <parent path> <src> <dest>*创建 hadoop 存档。 |
23 |
classpath 打印获取 Hadoop jar 和所需库所需的类路径。 |
24 |
daemonlog 获取/设置每个守护程序的日志级别 |
How to Interact with MapReduce Jobs
用法 - hadoop job [GENERIC_OPTIONS]
以下是 Hadoop 作业中可用的通用选项。
Sr.No. |
GENERIC_OPTION & Description |
1 |
-submit <job-file> Submits the job. |
2 |
-status <job-id> 打印映射和归约完成百分比以及所有作业计数器。 |
3 |
-counter <job-id> <group-name> <countername> 打印计数器值。 |
4 |
-kill <job-id> Kills the job. |
5 |
-events <job-id> <fromevent-> <-of-events> 打印作业跟踪器在给定范围内收到的事件详细信息。 |
6 |
-history [all] <jobOutputDir> - history < jobOutputDir> 打印作业详细信息、失败和已终止提示详细信息。可以通过指定 [all] 选项查看作业的更多详细信息,例如每个任务的成功任务和任务尝试。 |
7 |
-list[all] 显示所有作业。-list 仅显示尚未完成的作业。 |
8 |
-kill-task <task-id> 终止任务。已终止的任务不会计入失败的尝试。 |
9 |
-fail-task <task-id> 使任务失败。失败的任务会算入失败的尝试。 |
10 |
-set-priority <job-id> <priority> 更改作业的优先级。允许的优先级值为 VERY_HIGH、HIGH、NORMAL、LOW、VERY_LOW |
To see the status of job
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004