Map Reduce 简明教程
MapReduce - Introduction
MapReduce 是一种编程模型,用于编写可以在多个节点上并行处理大数据的应用程序。MapReduce 为分析大量复杂数据提供了分析功能。
What is Big Data?
大数据是大量数据集的集合,无法使用传统计算技术进行处理。例如,Facebook 或 YouTube 每天需要收集和管理的数据量,属于大数据的范畴。然而,大数据不仅仅是规模和容量,还涉及以下一个或多个方面:速度、多样性、容量和复杂性。
Why MapReduce?
传统企业系统通常有一个集中式服务器来存储和处理数据。以下插图描绘了传统企业系统的示意图。传统模型显然不适合处理大量的可扩展数据,也无法容纳标准的数据库服务器。此外,集中式系统在同时处理多个文件时会造成太多瓶颈。
谷歌使用一种名为 MapReduce 的算法解决了这个瓶颈问题。MapReduce 将一个任务分成小部分并将其分配给多台计算机。稍后,结果收集到一处并集成形成结果数据集。
How MapReduce Works?
MapReduce 算法包含两项重要任务,即 Map 和 Reduce。
-
Map 任务获取一组数据并将它们转换成另一组数据,其中各个元素被分解成元组(键值对)。
-
Reduce 任务获取 Map 的输出作为输入,并将这些数据元组(键值对)组合成更小的一组元组。
reduce 任务始终在 map 作业之后执行。
现在让我们仔细研究每个阶段并尝试理解它们的意义。
-
Input Phase - 在这里,我们有一个记录阅读器,它转换输入文件中每个记录,并将解析后的数据以键值对的形式发送给映射器。
-
Map - Map 是一个用户定义的函数,它获取一系列键值对并处理其中的每一个,以生成零个或更多键值对。
-
Intermediate Keys - 映射器生成的键值对称为中间键。
-
Combiner - 组合器是一种局部 Reducer,它将 map 阶段中类似的数据分组到可识别的集合中。它获取映射器的中间键作为输入,并应用用户定义的代码在一个映射器的较小范围内聚合值。它不是 MapReduce 算法的主要部分;它是可选的。
-
Shuffle and Sort - Reducer 任务从 Shuffle and Sort 步骤开始。它将分组后的键值对下载到本地机器,Reducer 在本地机器上运行。各个键值对按键排序到更大的数据列表中。数据列表将等效的键分组在一起,以便在 Reducer 任务中轻松迭代它们的值。
-
Reducer - Reducer 获取分组后的键值对数据作为输入,并对其中每一个运行一个 Reducer 函数。在这里,数据可以以多种方式聚合、筛选和组合,并且需要广泛的处理。执行完成后,它会给最后一步提供零个或更多键值对。
-
Output Phase - 在输出阶段,我们有一个输出格式化器,它将 Reducer 函数中的最终键值对转换为最终形式,并使用记录写入器将它们写入文件。
让我们尝试借助一小部分示意图理解 Map 和 Reduce 这两个任务 −
MapReduce - Algorithm
MapReduce 算法包含两项重要任务,即 Map 和 Reduce。
-
Map 任务由映射程序类完成
-
还原任务由还原程序类完成。
映射程序类接受输入,对输入进行标记化、映射和排序。映射程序类的输出作为还原程序类的输入,还原程序类反过来搜索匹配对并减少它们。
MapReduce 实施各种数学算法,将任务划分为较小的部分并将其分配给多个系统。从技术角度而言,MapReduce 算法有助于将 Map 和 Reduce 任务发送到集群中的适当服务器。
这些数学算法可能包括以下内容:
-
Sorting
-
Searching
-
Indexing
-
TF-IDF
Sorting
排序是处理和分析数据的基本 MapReduce 算法之一。MapReduce 实施排序算法,通过键自动对映射程序的输出键值对进行排序。
-
排序方法在映射程序类本身中实现。
-
在混合和排序阶段,在映射程序类中标记值之后, Context 类(用户定义类)将匹配的值键作为集合收集起来。
-
为了收集类似的键值对(中间键),映射程序类借助 RawComparator 类对键值对进行排序。
-
Hadoop 会自动对给定 Reducer 的中级键值对集合进行排序,以形成键值(K2,{V2,V2,…}),然后才会将它们呈交给 Reducer。
Searching
搜索在 MapReduce 算法中扮演着重要的角色。它有助于合并器阶段(可选)和 Reducer 阶段。让我们尝试借助一个示例了解搜索的运作方式。
Example
以下示例展示了 MapReduce 如何采用搜索算法找出给定员工数据集中薪水最高员工的详细信息。
-
假设我们有四个不同文件中的员工数据 - A、B、C 和 D。我们还要假设所有这四个文件中都有重复的员工记录,因为员工数据是从所有数据库表中反复导入的。参见以下图示。
-
The Map phase 处理每个输入文件,并提供键值对形式的员工数据(<k, v>:<员工姓名,工资>)。参见以下图示。
-
The combiner phase (搜索技术)将接受 Map 阶段的输入,作为带有员工姓名和工资的键值对。使用搜索技术,合并器将检查所有员工的工资,以找出每个文件中工资最高的员工。请参阅以下代码片段。
<k: employee name, v: salary>
Max= the salary of an first employee. Treated as max salary
if(v(second employee).salary > Max){
Max = v(salary);
}
else{
Continue checking;
}
预期结果如下:
<satish,26000><gopal,50000><kiran,45000><manisha,45000> |
<satish, 26000> |
<satish, 26000> |
<gopal, 50000> |
<gopal, 50000> |
<kiran, 45000> |
<kiran, 45000> |
<manisha, 45000> |
<manisha, 45000> |
-
Reducer phase − 您将从每个文件中找出工资最高的员工。为了避免冗余,检查所有 <k,v> 对,并消除重复项(如果有)。相同的算法用于四个 <k,v> 对之间,这些对来自四个输入文件。最终输出应如下所示 −
<gopal, 50000>
Indexing
通常,索引用于指向特定数据及其地址。它对特定 Mapper 的输入文件执行批处理索引。
MapReduce 中通常使用的索引技术称为 inverted index. Google 和 Bing 等搜索引擎使用反向索引技术。让我们尝试借助一个简单示例了解索引的运作方式。
Example
以下文本是反向索引的输入。其中 T[0]、T[1] 和 t[2] 是文件名,其内容置于双引号中。
T[0] = "it is what it is"
T[1] = "what is it"
T[2] = "it is a banana"
应用索引算法后,我们得到以下输出 −
"a": {2}
"banana": {2}
"is": {0, 1, 2}
"it": {0, 1, 2}
"what": {0, 1}
此处 "a": {2} 表示术语 "a" 出现在 T[2] 文件中。类似地,"is": {0, 1, 2} 表示术语 "is" 出现在文件 T[0]、T[1] 和 T[2] 中。
TF-IDF
TF-IDF 是一种文本处理算法,是术语频率 - 逆向文件频率的缩写。它是常见的 Web 分析算法之一。在此,术语“频率”是指术语在文档中出现的次数。
Term Frequency (TF)
它测量特定术语在文档中出现的频率。它通过将单词在文档中出现的次数除以该文档中单词总数来计算。
TF(the) = (Number of times term the ‘the’ appears in a document) / (Total number of terms in the document)
MapReduce - Installation
MapReduce仅在类Linux的操作系统上运行,并内置于Hadoop框架中。我们需要执行以下步骤来安装Hadoop框架。
Verifying JAVA Installation
在安装Hadoop之前,必须在系统中安装Java。使用以下命令检查您的系统是否已安装Java。
$ java –version
如果系统上已安装 Java,您将看到以下响应 -
java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)
如果您尚未在系统中安装Java,请按照以下步骤操作。
Installing Java
Step 2
使用以下命令来解压jdk-7u71-linux-x64.gz的内容。
$ cd Downloads/
$ ls
jdk-7u71-linux-x64.gz
$ tar zxf jdk-7u71-linux-x64.gz
$ ls
jdk1.7.0_71 jdk-7u71-linux-x64.gz
Step 3
要向所有用户提供Java,您必须将其移动到“/usr/local/”位置。转到root并键入以下命令:
$ su
password:
# mv jdk1.7.0_71 /usr/local/java
# exit
Step 4
为设置 PATH 和 JAVA_HOME 变量,将以下命令添加到 ~/.bashrc 文件中。
export JAVA_HOME=/usr/local/java
export PATH=$PATH:$JAVA_HOME/bin
将所有更改应用到当前运行的系统。
$ source ~/.bashrc
Step 5
使用以下命令配置Java备用:
# alternatives --install /usr/bin/java java usr/local/java/bin/java 2
# alternatives --install /usr/bin/javac javac usr/local/java/bin/javac 2
# alternatives --install /usr/bin/jar jar usr/local/java/bin/jar 2
# alternatives --set java usr/local/java/bin/java
# alternatives --set javac usr/local/java/bin/javac
# alternatives --set jar usr/local/java/bin/jar
现在使用命令 java -version 从终端验证安装。
Verifying Hadoop Installation
在安装MapReduce之前,必须在您的系统上安装Hadoop。让我们使用以下命令验证Hadoop安装:
$ hadoop version
如果已在您的系统上安装 Hadoop,则会收到以下回复:
Hadoop 2.4.1
--
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768
Compiled by hortonmu on 2013-10-07T06:28Z
Compiled with protoc 2.5.0
From source with checksum 79e53ce7994d1628b240f09af91e1af4
如果您的系统上尚未安装Hadoop,请继续执行以下步骤。
Downloading Hadoop
从Apache软件基金会下载Hadoop 2.4.1,并使用以下命令解压其内容。
$ su
password:
# cd /usr/local
# wget http://apache.claz.org/hadoop/common/hadoop-2.4.1/
hadoop-2.4.1.tar.gz
# tar xzf hadoop-2.4.1.tar.gz
# mv hadoop-2.4.1/* to hadoop/
# exit
Installing Hadoop in Pseudo Distributed mode
以下步骤用于在伪分布模式下安装 Hadoop 2.4.1。
Step 1 − Setting up Hadoop
您可以通过将以下命令附加到~/.bashrc文件来设置Hadoop环境变量。
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
将所有更改应用到当前运行的系统。
$ source ~/.bashrc
Step 2 − Hadoop Configuration
您可以在位置 “$HADOOP_HOME/etc/hadoop” 中找到所有 Hadoop 配置文件。根据您的 Hadoop 基础架构,您需要在这些配置文件中进行适当的更改。
$ cd $HADOOP_HOME/etc/hadoop
为了使用 Java 开发 Hadoop 程序,您必须替换 hadoop-env.sh 文件中的 JAVA_HOME 值为系统中 Java 的位置,以重置 Java 环境变量。
export JAVA_HOME=/usr/local/java
您必须编辑以下文件以配置 Hadoop -
-
core-site.xml
-
hdfs-site.xml
-
yarn-site.xml
-
mapred-site.xml
core-site.xml
core-site.xml 包含以下信息 -
-
Hadoop 实例使用的端口号
-
分配给文件系统的大小
-
存储数据的内存限制
-
Size of Read/Write buffers
打开 core-site.xml,并在 <configuration> 和 </configuration> 标记之间添加以下属性。
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000 </value>
</property>
</configuration>
hdfs-site.xml
hdfs-site.xml 包含以下信息 -
-
Value of replication data
-
The namenode path
-
您本地文件系统的 data 节点路径(您想存储 Hadoop infra 的位置)
让我们假设以下数据。
dfs.replication (data replication value) = 1
(In the following path /hadoop/ is the user name.
hadoopinfra/hdfs/namenode is the directory created by hdfs file system.)
namenode path = //home/hadoop/hadoopinfra/hdfs/namenode
(hadoopinfra/hdfs/datanode is the directory created by hdfs file system.)
datanode path = //home/hadoop/hadoopinfra/hdfs/datanode
打开该文件,并在 <configuration>、</configuration> 标记之间添加以下属性。
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.name.dir</name>
<value>file:///home/hadoop/hadoopinfra/hdfs/namenode</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>file:///home/hadoop/hadoopinfra/hdfs/datanode </value>
</property>
</configuration>
Note − 在上述文件中,所有属性值都是用户定义的,您可以根据 Hadoop 基础架构进行更改。
yarn-site.xml
此文件用于将纱线配置到 Hadoop 中。打开 yarn-site.xml 文件并在 <configuration>,</configuration> 标记之间添加以下属性。
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
mapred-site.xml
此文件用于指定我们正在使用的 MapReduce 框架。默认情况下,Hadoop 包含 yarn-site.xml 的模板。首先,您需要使用以下命令将文件从 mapred-site.xml.template 复制到 mapred-site.xml 文件。
$ cp mapred-site.xml.template mapred-site.xml
打开 mapred-site.xml 文件并在 <configuration>,</configuration> 标记中添加以下属性。
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
Verifying Hadoop Installation
以下步骤用于验证 Hadoop 安装。
Step 1 − Name Node Setup
使用 “hdfs namenode -format” 命令设置 name 节点,如下所示:
$ cd ~
$ hdfs namenode -format
预期结果如下:
10/24/14 21:30:55 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = localhost/192.168.1.11
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 2.4.1
...
...
10/24/14 21:30:56 INFO common.Storage: Storage directory
/home/hadoop/hadoopinfra/hdfs/namenode has been successfully formatted.
10/24/14 21:30:56 INFO namenode.NNStorageRetentionManager: Going to
retain 1 images with txid >= 0
10/24/14 21:30:56 INFO util.ExitUtil: Exiting with status 0
10/24/14 21:30:56 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/192.168.1.11
************************************************************/
Step 2 − Verifying Hadoop dfs
执行以下命令以启动您的 Hadoop 文件系统。
$ start-dfs.sh
预期输出如下所示 −
10/24/14 21:37:56
Starting namenodes on [localhost]
localhost: starting namenode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-namenode-localhost.out
localhost: starting datanode, logging to /home/hadoop/hadoop-
2.4.1/logs/hadoop-hadoop-datanode-localhost.out
Starting secondary namenodes [0.0.0.0]
Step 3 − Verifying Yarn Script
以下命令用于启动 Yarn 脚本。执行此命令将启动您的 Yarn 守护程序。
$ start-yarn.sh
预期输出如下所示 −
starting yarn daemons
starting resourcemanager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-resourcemanager-localhost.out
localhost: starting node manager, logging to /home/hadoop/hadoop-
2.4.1/logs/yarn-hadoop-nodemanager-localhost.out
MapReduce - API
在本课程中,我们将仔细研究涉及 MapReduce 编程操作的类及其方法。我们将主要关注以下内容:
-
JobContext Interface
-
Job Class
-
Mapper Class
-
Reducer Class
JobContext Interface
JobContext 接口是所有类的超级接口,定义 MapReduce 中的不同作业。它为您提供了在任务运行时提供给任务的作业的可读视图。
以下是 JobContext 接口的子接口。
S.No. |
Subinterface Description |
1. |
MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 定义给定给映射器的上下文。 |
2. |
ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>定义传递给 Reducer 的上下文。 |
Job 类是实现 JobContext 接口的主类。
Job Class
Job 类是 MapReduce API 中最重要类。它允许用户配置、提交和控制作业执行,以及查询状态。set 方法只在作业提交前有效,随后它们会抛出 IllegalStateException。
通常,用户会创建应用程序,描述作业的各个方面,然后提交作业并监控其进度。
以下是提交作业的示例:
// Create a new Job
Job job = new Job(new Configuration());
job.setJarByClass(MyJob.class);
// Specify various job-specific parameters
job.setJobName("myjob");
job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));
job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);
// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);
Constructors
以下是 Job 类的构造函数摘要。
S.No |
Constructor Summary |
1 |
Job() |
2 |
Job(Configuration conf) |
3 |
Job(Configuration conf, String jobName) |
Methods
以下是 Job 类中一些重要的函数:
S.No |
Method Description |
1 |
*getJobName()*User-specified job name. |
2 |
*getJobState()*返回作业的当前状态。 |
3 |
*isComplete()*检查作业是否完成。 |
4 |
*setInputFormatClass()*设置作业的 InputFormat。 |
5 |
*setJobName(String name)*设置用户指定的作业名称。 |
6 |
*setOutputFormatClass()*设置作业的输出格式。 |
7 |
*setMapperClass(Class)*设置作业的映射器。 |
8 |
*setReducerClass(Class)*设置作业的化简器。 |
9 |
*setPartitionerClass(Class)*设置作业的分区程序。 |
10 |
*setCombinerClass(Class)*设置作业的合并器。 |
Mapper Class
Mapper 类定义映射作业。将输入键值对映射到一组中间键值对。映射是将输入记录转换为中间记录的单独任务。转换后的中间记录不必与输入记录的类型相同。给定的输入对可以映射到 0 个或任意多个输出对。
Reducer Class
Reducer 类在 MapReduce 中定义了 Reduce 作业。它将一组共享键的中间值减少到更小的一组值。Reducer 实现可以通过 JobContext.getConfiguration() 方法访问作业的配置。Reducer 有三个主要阶段−Shuffle、Sort 和 Reduce。
-
Shuffle − Reducer 使用 HTTP 跨网络从每个 Mapper 复制排序的输出。
-
Sort − 框架按键对 Reducer 输入进行归并排序(由于不同的 Mapper 可能输出相同的键)。Shuffle 和 sort 阶段同时发生,即在获取输出时,对其进行归并。
-
Reduce − 在此阶段,为排序输入中的每个 <key, (值的集合)> 调用 reduce (Object, Iterable, Context) 方法。
MapReduce - Hadoop Implementation
MapReduce 是一个框架,用于编写应用程序,以便在大量商用硬件集群上可靠地处理海量数据。本章将带您了解在 Hadoop 框架中使用 Java 进行 MapReduce 操作。
MapReduce Algorithm
MapReduce 范式通常基于将 map-reduce 程序发送到实际数据所在的计算机。
-
在 MapReduce 作业期间,Hadoop 会将 Map 和 Reduce 任务发送到集群中的相应服务器。
-
该框架会管理数据传递的所有细节,如发出任务、验证任务完成情况,以及在节点间在集群周围复制数据。
-
大多数计算都会在具有本地磁盘数据的节点上进行,这样可以减少网络流量。
-
在完成给定任务后,集群会收集并减少数据以形成适当的结果,并将它发送回 Hadoop 服务器。
Inputs and Outputs (Java Perspective)
MapReduce 框架对键值对进行操作,即该框架将作业的输入视为一组键值对,并将一组键值对作为作业的输出生成,想象中不同类型。
键和值类必须是可序列化框架,因此需要实现 Writable 接口。此外,键类必须实现 WritableComparable 接口,以利于框架进行排序。
MapReduce 作业的输入和输出格式都是键值对形式−
(Input) <k1, v1> → map → <k2, v2>→ reduce → <k3, v3> (Output)。
Input |
Output |
|
Map |
<k1, v1> |
list (<k2, v2>) |
Reduce |
<k2, list(v2)> |
list (<k3, v3>) |
MapReduce Implementation
下表显示了有关组织用电量的资料。该表包括连续五年的每月用电量和年平均值。
Jan |
Feb |
Mar |
Apr |
May |
Jun |
Jul |
Aug |
Sep |
Oct |
Nov |
Dec |
Avg |
|
1979 |
23 |
23 |
2 |
43 |
24 |
25 |
26 |
26 |
26 |
26 |
25 |
26 |
25 |
1980 |
26 |
27 |
28 |
28 |
28 |
30 |
31 |
31 |
31 |
30 |
30 |
30 |
29 |
1981 |
31 |
32 |
32 |
32 |
33 |
34 |
35 |
36 |
36 |
34 |
34 |
34 |
34 |
1984 |
39 |
38 |
39 |
39 |
39 |
41 |
42 |
43 |
40 |
39 |
38 |
38 |
40 |
1985 |
38 |
39 |
39 |
39 |
39 |
41 |
41 |
41 |
00 |
40 |
39 |
39 |
45 |
我们需要编写应用程序,处理给定表中的输入数据,找到用电量最大的年份、用电量最小的年份,等等。对于记录数量有限的程序员来说,这项任务很简单,因为他们只需编写逻辑来生成所需的输出,并将数据传递到编写的应用程序。
现在我们来增加输入数据的规模。假设我们必须分析一个特定州的所有大型产业的用电量。当我们编写应用程序来处理这种大量数据时,
-
它们执行需要很长时间。
-
当我们把数据从源数据移动到网络服务器时,网络流量将会很大。
为了解决这些问题,我们有MapReduce框架。