Hadoop 简明教程

Hadoop - Quick Guide

Hadoop - Big Data Overview

由于新技术、设备以及像社交网站那样的通信手段的出现,人类制作的数据量每年都在快速增长。我们从时间开始到 2003 年产生数据的数量为 50 亿千兆字节。如果将数据以磁盘的形式堆叠起来,它可以填满整个足球场。同等数量的数据在 2011 中每两天产生一次,在 2013 中每十分钟产生一次。这个比率仍在剧烈增长。尽管产生的所有这些信息都是有意义的,并且在处理时可能是有用的,但它却被忽视了。

What is Big Data?

Big data 是大量数据集的集合,无法使用传统计算技术进行处理。它不是单一技术或工具,而是一个完整的学科,其包括各种工具、技术和框架。

What Comes Under Big Data?

大数据涉及由不同设备和应用程序生成的数据。以下是属于大数据保护范围的一些领域。

  1. Black Box Data − 它是直升飞机、飞机和喷气式飞机等的组件。它捕捉飞行员的声音、麦克风和耳机的录音以及飞机的性能信息。

  2. Social Media Data − Facebook 和 Twitter 等社交媒体包含全球数百万用户发布的信息和观点。

  3. Stock Exchange Data − 股票交易所数据包含客户针对不同公司的股票做出的“买入”和“卖出”决策信息。

  4. Power Grid Data − 电网数据包含特定节点相对于基站消耗的信息。

  5. Transport Data − 交通数据包括车辆的型号、容量、距离和可用性。

  6. Search Engine Data − 搜索引擎从不同数据库检索大量数据。

big data

因此,大数据包含海量、高速和可扩展性多种数据。其中数据有三种类型。

  1. Structured data − 关系数据。

  2. Semi Structured data − XML 数据。

  3. Unstructured data − Word、PDF、文本、媒体日志。

Benefits of Big Data

  1. 营销机构会使用 Facebook 等社交网络中保存的信息,了解其活动、促销和其他广告媒体的响应。

  2. 产品公司和零售组织会使用社交媒体中的信息,例如消费者的偏好和产品认知,来规划其生产。

  3. 医院会使用患者既往病史相关数据来提供更好、更快的服务。

Big Data Technologies

大数据技术对于提供更准确的分析非常重要,这可能导致更具体决策,从而提高运营效率、降低成本,并降低业务风险。

为了利用大数据的强大优势,你需要一个基础结构,该基础结构可以实时管理和处理海量的结构化和非结构化数据,并且可以保护数据隐私和安全性。

市场上有许多来自不同供应商(包括 Amazon、IBM、Microsoft 等)的各种技术来处理大数据。在研究用于处理大数据的技术时,我们会研究以下两类技术 −

Operational Big Data

这包括诸如 MongoDB 之类的系统,这些系统提供了可针对实际、交互式工作负载(其中数据会首先捕获和存储)的操作功能。

NoSQL 大数据系统被设计用于利用在过去十年间出现的新的云计算架构,以实现经济高效地运行大规模计算。这让在操作大数据工作负载时更容易管理、成本更低、实施起来更快。

一些 NoSQL 系统能够基于实时数据以最少编码并无需数据科学家和额外基础结构来洞察模式和趋势。

Analytical Big Data

这些包括诸如海量并行处理 (MPP) 数据库系统和 MapReduce 之类的系统,这些系统为可能涉及大多数或所有数据的回顾和复杂分析提供了分析功能。

MapReduce 提供了一种新型数据分析方法,它补充了 SQL 所提供的功能,并提供了基于 MapReduce 的系统,该系统可以从单台服务器扩展到数千台高端和低端机器。

这两类技术是互补的,经常一起部署。

Operational vs. Analytical Systems

Operational

Analytical

Latency

1 毫秒 - 100 毫秒

1 分钟 - 100 分钟

Concurrency

1000 - 100,000

1 - 10

Access Pattern

Writes and Reads

Reads

Queries

Selective

Unselective

Data Scope

Operational

Retrospective

End User

Customer

Data Scientist

Technology

NoSQL

MapReduce, MPP Database

Big Data Challenges

大数据相关的重大挑战如下 −

  1. Capturing data

  2. Curation

  3. Storage

  4. Searching

  5. Sharing

  6. Transfer

  7. Analysis

  8. Presentation

为了应对上述挑战,组织通常会借助企业服务器。

Hadoop - Big Data Solutions

Traditional Approach

与此方法类似,一家企业将备有一台计算机来存储和处理海量数据。出于存储的目的,程序员将借助他们的数据库供应商(比如甲骨文和 IBM 等)的选择。在此方法中,用户与应用程序交互,应用程序再处理数据存储和分析部分。

traditional approach

Limitation

此方法对于那些处理少量的标准数据库服务器即可容纳或达到正在处理数据的处理器限制的数据的应用程序来说效果很好。但当处理大量的可扩展数据时,通过单一数据库瓶颈处理此类数据是一件繁重的工作。

Google’s Solution

Google 使用一种称为 MapReduce 的算法解决了此问题。该算法将任务分成小块,并将它们分配给多台计算机,并从它们那里收集结果,这些结果在集成后形成结果数据集。

mapreduce

Hadoop

使用Google提供的解决方案, Doug Cutting 和他的团队开发了一个名为 HADOOP 的开源项目。

Hadoop 使用MapReduce算法运行应用程序,其中数据与其他数据并行处理。简而言之,Hadoop 用于开发可以在海量数据上执行完整统计分析的应用程序。

hadoop framework

Hadoop - Introduction

Hadoop 是一个 Apache 开源框架,使用 Java 编写,它允许跨计算机集群使用简单的编程模型对大型数据集进行分布式处理。Hadoop 框架应用程序在提供跨计算机集群的分布式存储和计算的环境中运行。Hadoop 被设计为从单个服务器扩展到数千台机器,每台机器提供本地计算和存储。

Hadoop Architecture

Hadoop 的核心有两个主要层,即:

  1. Processing/Computation layer (MapReduce), and

  2. 存储层(Hadoop 分布式文件系统)。

hadoop architecture

MapReduce

MapReduce 是一种并行编程模型,用于编写分布式应用程序,由 Google 设计,用于在大量商品硬件集群(数千个节点)上可靠且容错地高效处理大量数据(多太字节数据集)。MapReduce 程序在 Hadoop 上运行,Hadoop 是一个 Apache 开源框架。

Hadoop Distributed File System

Hadoop 分布式文件系统(HDFS)基于 Google 文件系统(GFS),并提供了一个分布式文件系统,该文件系统设计为在商品硬件上运行。它与现有的分布式文件系统有许多相似之处。但是,与其他分布式文件系统的不同之处是很明显的。它具有极高的容错性,并且设计为部署在低成本硬件上。它提供对应用程序数据的高速访问,并且适用于具有大型数据集的应用程序。

除了上述两个核心组件之外,Hadoop 框架还包括以下两个模块:

  1. Hadoop Common − 这些是由其他 Hadoop 模块需要的 Java 库和实用工具。

  2. Hadoop YARN − 这是一个用于作业调度和集群资源管理的框架。

How Does Hadoop Work?

构建具有繁重配置来处理大规模处理的大型服务器非常昂贵,但是作为一种替代方案,你可以将许多具有单个 CPU 的商品计算机连接在一起,作为一个单一的功能分布式系统,实际上,集群机器可以并行读取数据集并提供更高的吞吐量。而且,它比一台高端服务器便宜。因此这是使用 Hadoop 背后的第一个激励因素,因为它可以在集群且低成本的机器上运行。

Hadoop 在一组计算机之间运行代码。这个过程包括 Hadoop 执行的以下几个核心任务 -

  1. 数据最初分为目录和文件。文件分为 128M 和 64M(最好为 128M)的统一大小块。

  2. 然后将这些文件分发到不同的集群节点以进行进一步处理。

  3. HDFS 作为本地文件系统的上层,监督处理。

  4. 复制块以处理硬件故障。

  5. 检查代码是否成功执行。

  6. 执行在 map 和 reduce 阶段之间进行的排序。

  7. 将排序好的数据发送到某台计算机。

  8. 为每个作业编写调试日志。

Advantages of Hadoop

  1. Hadoop 框架允许用户快速编写和测试分布式系统。它高效,并自动在机器之间分配数据和工作,进而利用 CPU 核心的底层并行性。

  2. Hadoop 不依赖于硬件来提供容错和高可用性 (FTHA),而是 Hadoop 库本身被设计为在应用程序层检测和处理故障。

  3. 服务器可以动态地从集群中添加或删除,并且 Hadoop 继续不间断地运行。

  4. Hadoop 的另一个一大优势是,除了是开源的,它还可以兼容所有基于 Java 的平台。

Hadoop - Enviornment Setup

Hadoop 受 GNU/Linux 平台及其变体支持。因此,我们必须安装 Linux 操作系统以设置 Hadoop 环境。如果你有除 Linux 以外的其他操作系统,则可以在其中安装 Virtualbox 软件,并在 Virtualbox 内使用 Linux。

Pre-installation Setup

在将 Hadoop 安装到 Linux 环境之前,我们需要使用 ` ssh `(安全 Shell)设置 Linux。按照以下步骤设置 Linux 环境。

Creating a User

开始时,建议为 Hadoop 创建一个单独的用户,以将 Hadoop 文件系统与 Unix 文件系统隔离。按照以下步骤创建用户:

  1. 使用命令“su”打开 root。

  2. 使用命令“useradd username”从 root 帐户创建用户。

  3. 现在可以使用命令“su username”打开一个现有用户帐户。

打开 Linux 终端并输入以下命令以创建用户。

$ su
   password:
# useradd hadoop
# passwd hadoop
   New passwd:
   Retype new passwd

SSH Setup and Key Generation

SSH 设置需要对集群执行不同的操作,例如启动、停止、分布式守护程序 shell 操作。要对不同 Hadoop 用户进行身份验证,需要为 Hadoop 用户提供公钥/私钥对,并与不同用户共享。

以下命令用于使用 SSH 生成密钥值对。将公钥窗体 id_rsa.pub 复制到 authorized_keys,并分别向所有者授予 authorized_keys 文件的读写权限。

$ ssh-keygen -t rsa
$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
$ chmod 0600 ~/.ssh/authorized_keys

Installing Java

Java 是 Hadoop 的主要先决条件。首先,您应该使用 “java -version” 命令验证系统中 java 的存在。java 版本命令的语法如下。

$ java -version

如果一切正常,它将为您提供以下输出。

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

如果您的系统中未安装 java,请按照以下步骤安装 java。

Step 1

访问以下链接下载 java (JDK <最新版本> - X64.tar.gz):{s0}

然后 jdk-7u71-linux-x64.tar.gz 将下载到您的系统中。

Step 2

通常,您将在“下载”文件夹中找到下载的 java 文件。使用以下命令对其进行验证并解压缩 jdk-7u71-linux-x64.gz 文件。

$ cd Downloads/
$ ls
jdk-7u71-linux-x64.gz

$ tar zxf jdk-7u71-linux-x64.gz
$ ls
jdk1.7.0_71   jdk-7u71-linux-x64.gz

Step 3

为了让所有用户都可以使用 java,您必须将其移动到 “/usr/local/” 位置。打开 root,并键入以下命令。

$ su
password:
# mv jdk1.7.0_71 /usr/local/
# exit

Step 4

为设置 PATHJAVA_HOME 变量,将以下命令添加到 ~/.bashrc 文件。

export JAVA_HOME=/usr/local/jdk1.7.0_71
export PATH=$PATH:$JAVA_HOME/bin

现在将所有更改应用到当前正在运行的系统中。

$ source ~/.bashrc

Step 5

使用以下命令配置 java 替代项 −

# alternatives --install /usr/bin/java java usr/local/java/bin/java 2
# alternatives --install /usr/bin/javac javac usr/local/java/bin/javac 2
# alternatives --install /usr/bin/jar jar usr/local/java/bin/jar 2

# alternatives --set java usr/local/java/bin/java
# alternatives --set javac usr/local/java/bin/javac
# alternatives --set jar usr/local/java/bin/jar

现在,如上所述,从终端验证 java -version 命令。

Downloading Hadoop

使用以下命令从 Apache 软件基金会下载并解压 Hadoop 2.4.1。

$ su
password:
# cd /usr/local
# wget http://apache.claz.org/hadoop/common/hadoop-2.4.1/
hadoop-2.4.1.tar.gz
# tar xzf hadoop-2.4.1.tar.gz
# mv hadoop-2.4.1/* to hadoop/
# exit

Hadoop Operation Modes

下载 Hadoop 后,您可以在三种受支持模式中操作您的 Hadoop 集群 −

  1. Local/Standalone Mode − 在系统中下载 Hadoop 后,默认情况下,它配置为独立模式,并且可以作为单个 java 进程运行。

  2. Pseudo Distributed Mode − 它是单个机器上的分布式仿真。每个 Hadoop 守护进程,例如 hdfs、yarn、MapReduce 等,都将作为单独的 java 进程运行。此模式适用于开发。

  3. Fully Distributed Mode − 此模式分布广泛,至少有两台或多台机器作为集群。我们将在后面的章节中详细了解此模式。

Installing Hadoop in Standalone Mode

这里我们将讨论在独立模式下安装 Hadoop 2.4.1

没有正在运行的守护进程,所有内容都在单个 JVM 中运行。独立模式适用于在开发过程中运行 MapReduce 程序,因为它易于测试和调试。

Setting Up Hadoop

您可以通过将以下命令追加到 ~/.bashrc 文件来设置 Hadoop 环境变量。

export HADOOP_HOME=/usr/local/hadoop

在继续之前,您需要确保 Hadoop 正常工作。只需发出以下命令 −

$ hadoop version

如果您的设置一切正常,那么您应该看到以下结果 −

Hadoop 2.4.1
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768
Compiled by hortonmu on 2013-10-07T06:28Z
Compiled with protoc 2.5.0
From source with checksum 79e53ce7994d1628b240f09af91e1af4

这意味着您的 Hadoop 独立模式设置正常工作。默认情况下,Hadoop 配置为在单台机器上以非分布式方式运行。

Example

让我们查看一个 Hadoop 的简单示例。Hadoop 安装提供了以下示例 MapReduce jar 文件,它提供了 MapReduce 的基本功能,且可用于计算,如 Pi 值、给定文件列表中的单词计数等。

$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar

让我们有一个输入目录,我们将向其中推送几个文件,我们的要求是对这些文件中的单词总数进行计数。要计算单词总数,我们无需编写自己的 MapReduce,前提是 .jar 文件包含单词计数的实现。您可以使用同一个 .jar 文件尝试其他示例;仅需发出以下命令,即可检查 hadoop-mapreduce-examples-2.2.0.jar 文件支持的 MapReduce 函数程序。

$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduceexamples-2.2.0.jar

Step 1

在输入目录中创建临时内容文件。您可以在希望工作的任何位置创建这个输入目录。

$ mkdir input
$ cp $HADOOP_HOME/*.txt input
$ ls -l input

它将在您的输入目录中给出以下文件 −

total 24
-rw-r--r-- 1 root root 15164 Feb 21 10:14 LICENSE.txt
-rw-r--r-- 1 root root   101 Feb 21 10:14 NOTICE.txt
-rw-r--r-- 1 root root  1366 Feb 21 10:14 README.txt

这些文件已从 Hadoop 安装主目录中复制。在您的实验中,您可以拥有不同且较大型的文件集。

Step 2

让我们开始 Hadoop 进程,以计算输入目录中所有可用文件中的单词总数,如下所示 −

$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduceexamples-2.2.0.jar  wordcount input output

Step 3

步骤 2 将执行所需的处理,并将输出保存在 output/part-r00000 文件中,您可使用 − 进行检查

$cat output/*

它将列出输入目录中所有可用文件中的所有单词及其总计数。

"AS      4
"Contribution" 1
"Contributor" 1
"Derivative 1
"Legal 1
"License"      1
"License");     1
"Licensor"      1
"NOTICE”        1
"Not      1
"Object"        1
"Source”        1
"Work”    1
"You"     1
"Your")   1
"[]"      1
"control"       1
"printed        1
"submitted"     1
(50%)     1
(BIS),    1
(C)       1
(Don't)   1
(ECCN)    1
(INCLUDING      2
(INCLUDING,     2
.............

Installing Hadoop in Pseudo Distributed Mode

按照以下步骤在伪分布模式下安装 Hadoop 2.4.1。

Step 1 − Setting Up Hadoop

您可以通过将以下命令追加到 ~/.bashrc 文件来设置 Hadoop 环境变量。

export HADOOP_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME

export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
export HADOOP_INSTALL=$HADOOP_HOME

现在将所有更改应用到当前正在运行的系统中。

$ source ~/.bashrc

Step 2 − Hadoop Configuration

您可以在 “$HADOOP_HOME/etc/hadoop” 位置找到所有 Hadoop 配置文件。根据您的 Hadoop 基础架构,需要更改这些配置文件中的内容。

$ cd $HADOOP_HOME/etc/hadoop

为使用 java 开发 Hadoop 程序,您必须在 hadoop-env.sh 文件中通过将 JAVA_HOME 值替换为系统中 java 的位置来重置 java 环境变量。

export JAVA_HOME=/usr/local/jdk1.7.0_71

以下是要编辑以配置 Hadoop 的文件列表。

core-site.xml

core-site.xml 文件包含信息,例如用于 Hadoop 实例的端口号、分配给文件系统内存、用于存储数据的内存限制和读/写缓冲区的大小。

打开 core-site.xml,并在 <configuration>、</configuration> 标记之间添加以下属性。

<configuration>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://localhost:9000</value>
   </property>
</configuration>

hdfs-site.xml

hdfs-site.xml 文件包含有关复制数据的值、名称节点路径和本地文件系统的 DataNode 路径的信息。也就是说,您想将 Hadoop 基础架构存储在什么位置。

让我们假设以下数据。

dfs.replication (data replication value) = 1

(In the below given path /hadoop/ is the user name.
hadoopinfra/hdfs/namenode is the directory created by hdfs file system.)
namenode path = //home/hadoop/hadoopinfra/hdfs/namenode

(hadoopinfra/hdfs/datanode is the directory created by hdfs file system.)
datanode path = //home/hadoop/hadoopinfra/hdfs/datanode

打开此文件,并在该文件中的 <configuration> </configuration> 标记之间添加以下属性。

<configuration>
   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>

   <property>
      <name>dfs.name.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/namenode </value>
   </property>

   <property>
      <name>dfs.data.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/datanode </value>
   </property>
</configuration>

Note − 在上述文件中,所有属性值都是用户定义的,您可以根据 Hadoop 基础架构进行更改。

yarn-site.xml

此文件用于将 Yarn 配置到 Hadoop 中。打开 yarn-site.xml 文件并在该文件中的 <configuration>、</configuration> 标记之间添加以下属性。

<configuration>
   <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
   </property>
</configuration>

mapred-site.xml

此文件用于指定我们使用的 MapReduce 框架。默认情况下,Hadoop 包含 yarn-site.xml 的模板。首先,需要使用以下命令将文件从 mapred-site.xml.template 复制到 * mapred-site.xml* 文件。

$ cp mapred-site.xml.template mapred-site.xml

打开 mapred-site.xml 文件,并在该文件中的 <configuration>、</configuration> 标记之间添加以下属性。

<configuration>
   <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
   </property>
</configuration>

Verifying Hadoop Installation

以下步骤用于验证 Hadoop 安装。

Step 1 − Name Node Setup

使用命令 “hdfs namenode -format” 设置名称节点,如下所示。

$ cd ~
$ hdfs namenode -format

预期结果如下所示。

10/24/14 21:30:55 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = localhost/192.168.1.11
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 2.4.1
...
...
10/24/14 21:30:56 INFO common.Storage: Storage directory
/home/hadoop/hadoopinfra/hdfs/namenode has been successfully formatted.
10/24/14 21:30:56 INFO namenode.NNStorageRetentionManager: Going to
retain 1 images with txid >= 0
10/24/14 21:30:56 INFO util.ExitUtil: Exiting with status 0
10/24/14 21:30:56 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/192.168.1.11
************************************************************/

Step 2 − Verifying Hadoop dfs

以下命令用于启动 DFS。执行此命令将启动您的 Hadoop 文件系统。

$ start-dfs.sh

预期输出如下所示 −

10/24/14 21:37:56
Starting namenodes on [localhost]
localhost: starting namenode, logging to /home/hadoop/hadoop
2.4.1/logs/hadoop-hadoop-namenode-localhost.out
localhost: starting datanode, logging to /home/hadoop/hadoop
2.4.1/logs/hadoop-hadoop-datanode-localhost.out
Starting secondary namenodes [0.0.0.0]

Step 3 − Verifying Yarn Script

以下命令用于启动 Yarn 脚本。执行此命令将启动您的 Yarn 守护程序。

$ start-yarn.sh

预期输出如下所示 −

starting yarn daemons
starting resourcemanager, logging to /home/hadoop/hadoop
2.4.1/logs/yarn-hadoop-resourcemanager-localhost.out
localhost: starting nodemanager, logging to /home/hadoop/hadoop
2.4.1/logs/yarn-hadoop-nodemanager-localhost.out

Step 4 − Accessing Hadoop on Browser

访问 Hadoop 的默认端口号为 50070。使用以下网址在浏览器上获取 Hadoop 服务。

http://localhost:50070/
hadoop on browser

Step 5 − Verify All Applications for Cluster

访问集群所有应用程序的默认端口号为 8088。使用以下网址访问此服务。

http://localhost:8088/
hadoop application cluster

Hadoop - HDFS Overview

Hadoop 文件系统是使用分布式文件系统设计开发的。它在商品硬件上运行。与其他分布式系统不同,HDFS 的容错性很高,并且设计为使用低成本硬件。

HDFS 拥有非常大量的数据,并且提供了更轻松的访问方式。为了存储如此庞大的数据,这些文件被存储在多台机器上。这些文件以冗余的方式存储,以在发生故障时防止系统出现可能的数据丢失。HDFS 还会使应用程序可用于并行处理。

Features of HDFS

  1. 它适用于分布式存储和处理。

  2. Hadoop 提供了一个命令界面来与 HDFS 进行交互。

  3. 名称节点和数据节点的内置服务器帮助用户轻松检查集群状态。

  4. 流式访问文件系统数据。

  5. HDFS 提供文件权限和身份验证。

HDFS Architecture

下面给出了 Hadoop 文件系统的体系结构。

hdfs architecture

HDFS 遵循主从架构,它具有以下元素。

Namenode

名称节点是包含 GNU/Linux 操作系统和名称节点软件的商品硬件。它是一种可以在商品硬件上运行的软件。拥有名称节点的系统充当主服务器,并执行以下任务:

  1. 管理文件系统命名空间。

  2. 规范客户端对文件的访问。

  3. 同样,它执行文件系统操作,诸如重命名、关闭以及打开文件和目录。

Datanode

数据节点是使用 GNU/Linux 操作系统和数据节点软件的商品硬件。在集群中的每个节点(商品硬件/系统)中,将有一个数据节点。这些节点管理其系统的存储数据。

  1. 数据节点根据客户端请求在文件系统上执行读写操作。

  2. 它们还按照名称节点的说明执行诸如块创建、删除和复制的操作。

Block

通常,用户数据存储在 HDFS 的文件中。文件系统中的文件将被划分为一个或多个段或者存储在各个数据节点中。这些文件段被称为块。换句话说,HDFS 可以读取或写入的最小数据量称为一个块。默认块大小是 64MB,但是可以根据需要增加 HDFS 配置中的块大小。

Goals of HDFS

Fault detection and recovery − 由于 HDFS 包含大量商品硬件,所以组件故障经常出现。因此,HDFS 应具备快速的自动故障检测和恢复机制。

Huge datasets − HDFS 每集群应包含数百个节点来管理具有庞大数据集的应用程序。

Hardware at data − 当计算在数据附近发生时,可以高效地完成请求的任务。尤其是在涉及巨大数据集时,它可以减少网络通信并增加吞吐量。

Hadoop - HDFS Operations

Starting HDFS

一开始,您必须格式化配置好的 HDFS 文件系统,打开名称节点(HDFS 服务器),然后执行以下命令。

$ hadoop namenode -format

格式化 HDFS 后,启动分布式文件系统。以下命令将启动名称节点以及作为集群的数据节点。

$ start-dfs.sh

Listing Files in HDFS

将信息加载到服务器后,我们可以使用 ‘ls’ 查找目录中的文件列表、文件状态。以下是 ls 的语法,您可以将其作为参数传递给目录或文件名。

$ $HADOOP_HOME/bin/hadoop fs -ls <args>

Inserting Data into HDFS

假设我们在本地系统中名为 file.txt 的文件中具有数据,该数据应该保存在 hdfs 文件系统中。按照以下步骤将所需文件插入到 Hadoop 文件系统中。

Step 1

您必须创建一个输入目录。

$ $HADOOP_HOME/bin/hadoop fs -mkdir /user/input

Step 2

使用 put 命令将数据文件从本地系统传输并存储到 Hadoop 文件系统。

$ $HADOOP_HOME/bin/hadoop fs -put /home/file.txt /user/input

Step 3

您可以使用 ls 命令验证文件。

$ $HADOOP_HOME/bin/hadoop fs -ls /user/input

Retrieving Data from HDFS

假设我们在 HDFS 中有一个名为 outfile 的文件。以下是从 Hadoop 文件系统中检索所需文件的简单演示。

Step 1

最初,使用 cat 命令从 HDFS 查看数据。

$ $HADOOP_HOME/bin/hadoop fs -cat /user/output/outfile

Step 2

使用 get 命令将文件从 HDFS 获取到本地文件系统。

$ $HADOOP_HOME/bin/hadoop fs -get /user/output/ /home/hadoop_tp/

Shutting Down the HDFS

您可以使用以下命令关闭 HDFS。

$ stop-dfs.sh

Hadoop - Command Reference

"$HADOOP_HOME/bin/hadoop fs" 中还有更多命令未在此处演示,尽管这些基本操作将帮助您入门。不带任何附加参数运行 ./bin/hadoop dfs 将列出 FsShell 系统可以运行的所有命令。此外,如果您遇到困难,*$HADOOP_HOME/bin/hadoop fs -help * commandName 命令将显示有关所讨论操作的简短使用方法摘要。

所有操作的表格如下所示。针对参数使用了以下约定 -

"<path>" means any file or directory name.
"<path>..." means one or more file or directory names.
"<file>" means any filename.
"<src>" and "<dest>" are path names in a directed operation.
"<localSrc>" and "<localDest>" are paths as above, but on the local file system.

所有其他文件和路径名称指的是 HDFS 内的对象。

Sr.No

Command & Description

1

-ls &lt;path&gt; 列出 path 指定的目录的内容,显示每个条目的名称、权限、所有者、大小和修改日期。

2

-lsr &lt;path&gt; 类似于 -ls,但递归显示 path 所有子目录中的条目。

3

-du &lt;path&gt; 以字节为单位显示与 path 匹配的所有文件的磁盘使用情况;文件名使用完整 HDFS 协议前缀报告。

4

-dus &lt;path&gt; 像 -du,但会输出路径中所有文件/目录的磁盘使用情况摘要。

5

-mv &lt;src&gt;&lt;dest&gt; 在 HDFS 中将源表示的文件或目录移到目标。

6

-cp &lt;src&gt; &lt;dest&gt; 在 HDFS 中将源标识的文件或目录复制到目标。

7

-rm &lt;path&gt; 删除路径标识的文件或空目录。

8

-rmr &lt;path&gt; 删除路径标识的文件或目录。递归删除任何子项(即路径的文件或子目录)。

9

-put &lt;localSrc&gt; &lt;dest&gt; 将本地文件系统中 localSrc 标识的文件或目录复制到 DFS 内的 dest。

10

-copyFromLocal &lt;localSrc&gt; &lt;dest&gt; 与 -put 相同

11

-moveFromLocal &lt;localSrc&gt; &lt;dest&gt; 与 -put 相同,但会在成功后删除本地副本。

12

-get [-crc] &lt;src&gt; &lt;localDest&gt; 将 HDFS 中 src 标识的文件或目录复制到 localDest 标识的本地文件系统路径。

13

-getmerge &lt;src&gt; &lt;localDest&gt; 检索与 HDFS 中路径 src 匹配的所有文件,并将它们复制到本地文件系统中 localDest 标识的单个合并文件中。

14

-cat &lt;filen-ame&gt; 将 filename 的内容显示在 stdout 上。

15

-copyToLocal &lt;src&gt; &lt;localDest&gt; 与 get 相同

16

-moveToLocal &lt;src&gt; &lt;localDest&gt; 与 get 类似,但在成功后会删除 HDFS 副本。

17

-mkdir &lt;path&gt; 在 HDFS 中创建一个名为路径的目录。在路径中创建任何不存在的父目录(例如,Linux 中的 mkdir -p)。

18

-setrep [-R] [-w] rep &lt;path&gt; 将路径标识的文件的目标复制因子设置为 rep。(实际的复制因子将随着时间的推移向目标移动)

19

-touchz &lt;path&gt; 在包含当前时间的路径处创建一个文件,作为时间戳。如果路径处已存在文件,则失败,除非文件大小为 0。

20

-test -[ezd] &lt;path&gt; 如果存在路径、长度为零、是目录,则返回 1;否则返回 0。

21

-stat [format] &lt;path&gt; 打印有关路径的信息。格式是一个字符串,接受以块为单位的文件大小(%b)、文件名(%n)、块大小(%o)、复制(%r)和修改日期(%y,%Y)。

22

-tail [-f] &lt;file2name&gt; 在 stdout 上显示文件最后 1KB。

23

-chmod [-R] mode,mode,&#8230;&#8203; &lt;path&gt;&#8230;&#8203; 更改由路径标识的一个或多个对象相关联的文件权限…通过 R 递归执行更改。模式为 3 位八进制模式或 {augo}+/-{rwxX}。在未指定范围且不应用 umask的情况下,它会假定。

24

-chown [-R] [owner][:[group]] &lt;path&gt;&#8230;&#8203; 设置由路径标识的文件或目录的所有用户和/或组…如果指定 -R,则递归设置所有者。

25

-chgrp [-R] group &lt;path&gt;&#8230;&#8203; 设置由路径标识的文件或目录的所有组…如果指定 -R,则递归设置组。

26

-help &lt;cmd-name&gt; 返回上面列出的其中一个命令的使用信息。您必须忽略 cmd 中的前导'-'字符。

Hadoop - MapReduce

MapReduce是一个框架,我们可以使用该框架编写应用程序来并行处理海量数据,在大规模商品硬件集群上,以可靠的方式处理数据。

What is MapReduce?

MapReduce是一种基于java的分布式计算的处理技术和程序模型。MapReduce算法包含两个重要任务,即Map和Reduce。Map获取一组数据并将它转换为另一组数据,其中各个元素被分解成元组(键/值对)。其次是reduce任务,它将map的输出作为输入,并将这些数据元组组合成更小的一组元组。正如MapReduce名称的顺序所暗示的那样,reduce任务总是在map作业之后执行。

MapReduce的主要优势在于,可以在多个计算节点上轻松扩展数据处理。在MapReduce模型中,数据处理原语称为映射器和还原器。将数据处理应用程序分解成映射器和还原器有时并非易事。但是,一旦我们在MapReduce形式中编写了一个应用程序,将应用程序扩展到在一个集群中成百上千甚至数万台机器上运行仅仅是一次配置更改。这种简单的可扩展性吸引了许多程序员使用MapReduce模型。

The Algorithm

  1. 通常,MapReduce范例基于将计算机发送到数据所在的位置!

  2. MapReduce程序执行三个阶段,分别是map阶段、shuffle阶段和reduce阶段。 Map stage - map或mapper的任务是处理输入数据。通常,输入数据以文件或目录的形式存在,并存储在Hadoop文件系统(HDFS)中。输入文件按行传递到mapper函数。mapper处理数据并创建许多小块数据。 Reduce stage - 此阶段是 * Shuffle * 阶段和 Reduce 阶段的组合。Reducer的任务是处理来自mapper的数据。处理后,它会生成一组新输出,将存储在HDFS中。

  3. 在 MapReduce 作业期间,Hadoop 会将映射和缩减任务发送到集群中的适当服务器。

  4. 该框架管理数据传递的所有详细信息,诸如发布任务、验证任务完成和在节点之间复制集群中的数据。

  5. 大多数计算发生在本地磁盘上有数据的节点上,这减少了网络流量。

  6. 在给定任务完成后,集群会收集并缩减数据以形成适当的结果,然后将其发送回 Hadoop 服务器。

mapreduce algorithm

Inputs and Outputs (Java Perspective)

MapReduce 框架使用 <key, value> 对工作,即,该框架将作业的输入视为一组 <key, value> 对,并且将一组 <key, value> 对作为作业的输出,可能是不同类型的。

密钥和值类应该是通过该框架串行化的,因此,需要实现 Writable 接口。此外,关键类必须实现 Writable-Comparable 接口以方便框架进行排序。 MapReduce job 的输入和输出类型——(输入) <k1, v1> → 映射 → <k2, v2> → 缩减 → <k3, v3>(输出)。

Input

Output

Map

<k1, v1>

list (<k2, v2>)

Reduce

<k2, list(v2)>

list (<k3, v3>)

Terminology

  1. PayLoad - 应用程序实现映射和缩减函数,并构成作业的核心。

  2. Mapper - 映射器将输入键/值对映射到一组中间键/值对。

  3. NamedNode - 管理 Hadoop 分布式文件系统 (HDFS) 的节点。

  4. DataNode - 在进行任何处理之前,提前提供数据的节点。

  5. MasterNode - JobTracker 运行并接受来自客户端的作业请求的节点。

  6. SlaveNode - 映射和缩减程序运行的节点。

  7. JobTracker - 调度作业并跟踪将作业分配给任务跟踪器。

  8. Task Tracker - 跟踪任务并将状态报告给 JobTracker。

  9. Job - 程序是在数据集上执行映射器和缩减器。

  10. Task - 在数据切片上执行映射器或缩减器的执行。

  11. Task Attempt - 在 SlaveNode 上尝试执行任务的特定实例。

Example Scenario

下面提供了一个组织的用电量数据。它包含月用电量和各个年份的年平均用电量。

Jan

Feb

Mar

Apr

May

Jun

Jul

Aug

Sep

Oct

Nov

Dec

Avg

1979

23

23

2

43

24

25

26

26

26

26

25

26

25

1980

26

27

28

28

28

30

31

31

31

30

30

30

29

1981

31

32

32

32

33

34

35

36

36

34

34

34

34

1984

39

38

39

39

39

41

42

43

40

39

38

38

40

1985

38

39

39

39

39

41

41

41

00

40

39

39

45

如果输入了上述数据,我们必须编写应用程序来处理它并产生结果,例如找到使用量最大的年份、使用量最小的年份,等等。这对具有有限数量记录的程序员来说是小菜一碟。他们只需编写逻辑以生成所需输出,并将数据传递给编写的应用程序。

但请想到自成立以来某一特定州的所有大规模行业的用电量所代表的数据。

当我们编写应用程序来处理此类数据时,

  1. 它们执行需要很长时间。

  2. 当我们将数据从源移动到网络服务器时会造成大量的网络流量。

为了解决这些问题,我们有MapReduce框架。

Input Data

上述数据保存为*sample.txt*并作为输入提供。输入文件如下图所示。

1979   23   23   2   43   24   25   26   26   26   26   25   26  25
1980   26   27   28  28   28   30   31   31   31   30   30   30  29
1981   31   32   32  32   33   34   35   36   36   34   34   34  34
1984   39   38   39  39   39   41   42   43   40   39   38   38  40
1985   38   39   39  39   39   41   41   41   00   40   39   39  45

Example Program

下面提供了使用MapReduce框架处理示例数据的程序。

package hadoop;

import java.util.*;

import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits {
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable ,/*Input key Type */
   Text,                /*Input value Type*/
   Text,                /*Output key Type*/
   IntWritable>        /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output,

      Reporter reporter) throws IOException {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();

         while(s.hasMoreTokens()) {
            lasttoken = s.nextToken();
         }
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }

   //Reducer class
   public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {

      //Reduce function
      public void reduce( Text key, Iterator <IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
         int maxavg = 30;
         int val = Integer.MIN_VALUE;

         while (values.hasNext()) {
            if((val = values.next().get())>maxavg) {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }

   //Main function
   public static void main(String args[])throws Exception {
      JobConf conf = new JobConf(ProcessUnits.class);

      conf.setJobName("max_eletricityunits");
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);

      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));

      JobClient.runJob(conf);
   }
}

将上述程序保存为 ProcessUnits.java. 程序的编译和执行在下面进行了说明。

Compilation and Execution of Process Units Program

让我们假设我们处于Hadoop用户的home目录下(如/home/hadoop)。

按照以下步骤编译和执行上述程序。

Step 1

以下命令用于创建存储已编译Java类的目录。

$ mkdir units

Step 2

下载用于编译和执行MapReduce程序的 Hadoop-core-1.2.1.jar, 。访问以下链接 mvnrepository.com 下载jar包。我们假设下载的文件夹是 /home/hadoop/.

Step 3

以下命令用于编译 ProcessUnits.java 程序并创建该程序的jar包。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

Step 4

以下命令用于在HDFS中创建输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5

以下命令用于将名为*sample.txt*的输入文件复制到HDFS的输入目录中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6

以下命令用于验证输入目录中的文件。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7

以下命令用于通过从输入目录中获取输入文件来运行Eleunit_max应用程序。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

等待一段时间,直到文件执行完成。在执行之后,结果将包含输入拆分的数量、Map任务的数量、reducer任务的数量等,如下所示。

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
   File System Counters

FILE: Number of bytes read = 61
FILE: Number of bytes written = 279400
FILE: Number of read operations = 0
FILE: Number of large read operations = 0
FILE: Number of write operations = 0
HDFS: Number of bytes read = 546
HDFS: Number of bytes written = 40
HDFS: Number of read operations = 9
HDFS: Number of large read operations = 0
HDFS: Number of write operations = 2 Job Counters


   Launched map tasks = 2
   Launched reduce tasks = 1
   Data-local map tasks = 2
   Total time spent by all maps in occupied slots (ms) = 146137
   Total time spent by all reduces in occupied slots (ms) = 441
   Total time spent by all map tasks (ms) = 14613
   Total time spent by all reduce tasks (ms) = 44120
   Total vcore-seconds taken by all map tasks = 146137
   Total vcore-seconds taken by all reduce tasks = 44120
   Total megabyte-seconds taken by all map tasks = 149644288
   Total megabyte-seconds taken by all reduce tasks = 45178880

Map-Reduce Framework

   Map input records = 5
   Map output records = 5
   Map output bytes = 45
   Map output materialized bytes = 67
   Input split bytes = 208
   Combine input records = 5
   Combine output records = 5
   Reduce input groups = 5
   Reduce shuffle bytes = 6
   Reduce input records = 5
   Reduce output records = 5
   Spilled Records = 10
   Shuffled Maps  = 2
   Failed Shuffles = 0
   Merged Map outputs = 2
   GC time elapsed (ms) = 948
   CPU time spent (ms) = 5160
   Physical memory (bytes) snapshot = 47749120
   Virtual memory (bytes) snapshot = 2899349504
   Total committed heap usage (bytes) = 277684224

File Output Format Counters

   Bytes Written = 40

Step 8

以下命令用于验证输出文件夹中的结果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9

以下命令可用于查看 * Part-00000 * 文件中的输出。该文件由 HDFS 生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下是 MapReduce 程序生成的输出。

1981    34
1984    40
1985    45

Step 10

以下命令可用于将输出文件夹从 HDFS 复制到本地文件系统进行分析。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop

Important Commands

所有 Hadoop 命令都可使用 $HADOOP_HOME/bin/hadoop 命令调用。在不带任何参数的情况下运行 Hadoop 脚本将打印所有命令的描述。

Usage − hadoop [--config confdir] COMMAND

下表列出了可用选项及其说明。

Sr.No.

Option & Description

1

namenode -format 格式化 DFS 文件系统。

2

secondarynamenode 运行 DFS 次要 NameNode。

3

namenode Runs the DFS namenode.

4

datanode Runs a DFS datanode.

5

dfsadmin 运行 DFS 管理客户机。

6

mradmin 运行 Map-Reduce 管理客户机。

7

fsck 运行 DFS 文件系统检查实用程序。

8

fs 运行通用文件系统用户客户端。

9

balancer 运行集群平衡实用程序。

10

oiv 将离线 fsimage 查看器应用于 fsimage。

11

fetchdt 从 NameNode 提取委托标记。

12

jobtracker 运行 MapReduce 作业追踪器节点。

13

pipes Runs a Pipes job.

14

tasktracker 运行 MapReduce 任务追踪器节点。

15

historyserver 将作业历史记录服务器作为独立守护程序运行。

16

job Manipulates the MapReduce jobs.

17

queue Gets information regarding JobQueues.

18

version Prints the version.

19

jar &lt;jar&gt; 运行 jar 文件。

20

distcp &lt;srcurl&gt; &lt;desturl&gt; 递归复制文件或目录。

21

distcp2 &lt;srcurl&gt; &lt;desturl&gt; DistCp 版本 2。

22

archive -archiveName NAME -p &lt;parent path&gt; &lt;src&gt; <dest>*创建 hadoop 存档。

23

classpath 打印获取 Hadoop jar 和所需库所需的类路径。

24

daemonlog 获取/设置每个守护程序的日志级别

How to Interact with MapReduce Jobs

用法 - hadoop job [GENERIC_OPTIONS]

以下是 Hadoop 作业中可用的通用选项。

Sr.No.

GENERIC_OPTION & Description

1

-submit <job-file> Submits the job.

2

-status &lt;job-id&gt; 打印映射和归约完成百分比以及所有作业计数器。

3

-counter &lt;job-id&gt; &lt;group-name&gt; &lt;countername&gt; 打印计数器值。

4

-kill <job-id> Kills the job.

5

-events &lt;job-id&gt; &lt;fromevent-&gt; &lt;-of-events&gt; 打印作业跟踪器在给定范围内收到的事件详细信息。

6

-history [all] &lt;jobOutputDir&gt; - history &lt; jobOutputDir&gt; 打印作业详细信息、失败和已终止提示详细信息。可以通过指定 [all] 选项查看作业的更多详细信息,例如每个任务的成功任务和任务尝试。

7

-list[all] 显示所有作业。-list 仅显示尚未完成的作业。

8

-kill-task &lt;task-id&gt; 终止任务。已终止的任务不会计入失败的尝试。

9

-fail-task &lt;task-id&gt; 使任务失败。失败的任务会算入失败的尝试。

10

-set-priority &lt;job-id&gt; &lt;priority&gt; 更改作业的优先级。允许的优先级值为 VERY_HIGH、HIGH、NORMAL、LOW、VERY_LOW

To see the status of job

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004

To see the history of job output-dir

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME>
e.g.
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output

To kill the job

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID>
e.g.
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004

Hadoop - Streaming

Hadoop 流式处理是 Hadoop 发行版中附带的一个实用工具。此实用工具允许你使用任何可执行文件或脚本作为映射器和/或规约器来创建和运行 Map/Reduce 作业。

Example Using Python

对于 Hadoop 流式处理,我们考虑单词计数问题。Hadoop 中的任何作业都必须具有两个阶段:映射器和规约器。我们已用 Python 脚本为映射器和规约器编写了代码,以便在 Hadoop 下运行它。你也可以用 Perl 和 Ruby 编写相同的代码。

Mapper Phase Code

!/usr/bin/python

import sys

# Input takes from standard input for myline in sys.stdin:
   # Remove whitespace either side
   myline = myline.strip()

   # Break the line into words
   words = myline.split()

   # Iterate the words list
   for myword in words:
      # Write the results to standard output
      print '%s\t%s' % (myword, 1)

确保此文件具有执行权限 (chmod +x /home/ expert/hadoop-1.2.1/mapper.py)。

Reducer Phase Code

#!/usr/bin/python

from operator import itemgetter
import sys

current_word = ""
current_count = 0
word = ""

# Input takes from standard input for myline in sys.stdin:
   # Remove whitespace either side
   myline = myline.strip()

   # Split the input we got from mapper.py word,
   count = myline.split('\t', 1)

   # Convert count variable to integer
   try:
      count = int(count)

   except ValueError:
      # Count was not a number, so silently ignore this line continue

   if current_word == word:
   current_count += count
   else:
      if current_word:
         # Write result to standard output print '%s\t%s' % (current_word, current_count)

      current_count = count
      current_word = word

# Do not forget to output the last word if needed!
if current_word == word:
   print '%s\t%s' % (current_word, current_count)

将映射器和规约器代码保存在 Hadoop 主目录中的 mapper.py 和 reducer.py 中。确保这些文件具有执行权限 (chmod +x mapper.py 和 chmod +x reducer.py)。由于 Python 对缩进很敏感,因此可以通过下面的链接下载相同的代码。

Execution of WordCount Program

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
   -input input_dirs \
   -output output_dir \
   -mapper <path/mapper.py \
   -reducer <path/reducer.py

使用“\”换行,以便于清晰阅读。

For Example,

./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py

How Streaming Works

在上述示例中,映射器和规约器都是从标准输入读取输入并向标准输出发送输出的 Python 脚本。此实用工具将创建一个 Map/Reduce 作业,将作业提交到适当的集群,并在作业完成之前监控作业进度。

当为映射器指定脚本时,每个映射器任务将在映射器初始化时作为单独的进程启动该脚本。随着映射器任务的运行,它会将其输入转换为行,并将这些行馈送给进程的标准输入 (STDIN)。同时,映射器从进程的标准输出 (STDOUT) 收集面向行的输出,并将每行转换成一个键值对,该键值对将作为映射器的输出被收集。默认情况下,一行开头到第一个制表符字符的前缀是键,该行的其余部分(不包括制表符字符)将是值。如果该行中没有制表符字符,则整行被视为键,而值为空。但是,可以根据需要进行自定义。

当为规约器指定脚本时,每个规约器任务都将作为单独的进程启动该脚本,然后初始化规约器。随着规约器任务的运行,它会将其输入键值对转换为行,并将这些行馈送给进程的标准输入 (STDIN)。同时,规约器从进程的标准输出 (STDOUT) 收集面向行的输出,将每行转换成一个键值对,该键值对将作为规约器的输出被收集。默认情况下,一行开头到第一个制表符字符的前缀是键,该行的其余部分(不包括制表符字符)是值。但是,可以根据具体要求进行自定义。

Important Commands

Parameters

Options

Description

-input directory/file-name

Required

Input location for mapper.

-output directory-name

Required

Output location for reducer.

-映射器可执行文件或脚本或 JavaClassName

Required

Mapper executable.

-规约器可执行文件或脚本或 JavaClassName

Required

Reducer executable.

-file file-name

Optional

使映射器、规约器或组合器可执行文件在计算节点上局部可用。

-inputformat JavaClassName

Optional

你提供的类应该返回 Text 类的键值对。如果未指定,则 TextInputFormat 将用作默认值。

-outputformat JavaClassName

Optional

你提供的类应采用文本类的键/值对。如果没有指定,则 TextOutputformat 将用作默认值。

-partitioner JavaClassName

Optional

确定一个键应发送到的减少方式的类。

-combiner streamingCommand or JavaClassName

Optional

用于地图输出的合并程序。

-cmdenv name=value

Optional

将环境变量传递到流式命令。

-inputreader

Optional

向后兼容:指定记录读取器类(而不是输入格式类)。

-verbose

Optional

Verbose output.

-lazyOutput

Optional

延迟创建输出。例如,如果输出格式基于 FileOutputFormat,则仅在首次调用 output.collect(或 Context.write)时才会创建输出文件。

-numReduceTasks

Optional

指定减速器数量。

-mapdebug

Optional

映射任务失败时要调用的脚本。

-reducedebug

Optional

减少任务失败时要调用的脚本。

Hadoop - Multi-Node Cluster

本章介绍了在分布式环境中设置 Hadoop 多节点群集。

由于无法演示整个集群,我们使用三个系统(一个主设备和两个从设备)解释 Hadoop 集群环境;以下是它们的 IP 地址。

  1. Hadoop Master: 192.168.1.15 (hadoop-master)

  2. Hadoop Slave: 192.168.1.16 (hadoop-slave-1)

  3. Hadoop Slave: 192.168.1.17 (hadoop-slave-2)

按照以下步骤设置 Hadoop 多节点集群。

Installing Java

Java 是 Hadoop 的主要先决条件。首先,您应该使用“java -version”验证您的系统中是否存在 java。java 版本命令的语法如下。

$ java -version

如果一切正常,它将为您提供以下输出。

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

如果您的系统中未安装 java,请按照以下步骤安装 java。

Step 1

访问以下链接下载 java (JDK <最新版本> - X64.tar.gz): www.oracle.com

然后 jdk-7u71-linux-x64.tar.gz 将下载到您的系统中。

Step 2

通常,您将在“下载”文件夹中找到下载的 java 文件。使用以下命令对其进行验证并解压缩 jdk-7u71-linux-x64.gz 文件。

$ cd Downloads/
$ ls
jdk-7u71-Linux-x64.gz

$ tar zxf jdk-7u71-Linux-x64.gz
$ ls
jdk1.7.0_71 jdk-7u71-Linux-x64.gz

Step 3

要使所有用户都能使用 java,您必须将其移至 “/usr/local/”的位置。打开 root 并键入以下命令。

$ su
password:
# mv jdk1.7.0_71 /usr/local/
# exit

Step 4

为设置 PATHJAVA_HOME 变量,将以下命令添加到 ~/.bashrc 文件。

export JAVA_HOME=/usr/local/jdk1.7.0_71
export PATH=PATH:$JAVA_HOME/bin

现在,如上所述,从终端验证 java -version 命令。按照上述过程在所有集群节点中安装 java。

Creating User Account

在主设备和从设备上创建一个系统用户帐户以使用 Hadoop 安装。

# useradd hadoop
# passwd hadoop

Mapping the nodes

您必须在所有节点上的 /etc/ 文件夹中编辑 hosts 文件,指定每个系统的 IP 地址以及它们的主机名。

# vi /etc/hosts
enter the following lines in the /etc/hosts file.

192.168.1.109 hadoop-master
192.168.1.145 hadoop-slave-1
192.168.56.1 hadoop-slave-2

Configuring Key Based Login

在每个节点中设置 ssh,以便它们无需任何密码提示即可相互通信。

# su hadoop
$ ssh-keygen -t rsa
$ ssh-copy-id -i ~/.ssh/id_rsa.pub tutorialspoint@hadoop-master
$ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop_tp1@hadoop-slave-1
$ ssh-copy-id -i ~/.ssh/id_rsa.pub hadoop_tp2@hadoop-slave-2
$ chmod 0600 ~/.ssh/authorized_keys
$ exit

Installing Hadoop

在主设备服务器中,使用以下命令下载并安装 Hadoop。

# mkdir /opt/hadoop
# cd /opt/hadoop/
# wget http://apache.mesi.com.ar/hadoop/common/hadoop-1.2.1/hadoop-1.2.0.tar.gz
# tar -xzf hadoop-1.2.0.tar.gz
# mv hadoop-1.2.0 hadoop
# chown -R hadoop /opt/hadoop
# cd /opt/hadoop/hadoop/

Configuring Hadoop

您必须通过进行如下所示的更改将 Hadoop 服务器进行配置。

core-site.xml

打开 core-site.xml 文件,并按如下所示进行编辑。

<configuration>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://hadoop-master:9000/</value>
   </property>
   <property>
      <name>dfs.permissions</name>
      <value>false</value>
   </property>
</configuration>

hdfs-site.xml

打开 hdfs-site.xml 文件,并按如下所示进行编辑。

<configuration>
   <property>
      <name>dfs.data.dir</name>
      <value>/opt/hadoop/hadoop/dfs/name/data</value>
      <final>true</final>
   </property>

   <property>
      <name>dfs.name.dir</name>
      <value>/opt/hadoop/hadoop/dfs/name</value>
      <final>true</final>
   </property>

   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>
</configuration>

mapred-site.xml

打开 mapred-site.xml 文件,并按如下所示进行编辑。

<configuration>
   <property>
      <name>mapred.job.tracker</name>
      <value>hadoop-master:9001</value>
   </property>
</configuration>

hadoop-env.sh

打开 hadoop-env.sh 文件,并按如下所示编辑 JAVA_HOME、HADOOP_CONF_DIR 和 HADOOP_OPTS。

Note − 根据您的系统配置设置 JAVA_HOME。

export JAVA_HOME=/opt/jdk1.7.0_17
export HADOOP_OPTS=-Djava.net.preferIPv4Stack=true
export HADOOP_CONF_DIR=/opt/hadoop/hadoop/conf

Installing Hadoop on Slave Servers

按照提供的命令,在所有从属服务器上安装Hadoop。

# su hadoop
$ cd /opt/hadoop
$ scp -r hadoop hadoop-slave-1:/opt/hadoop
$ scp -r hadoop hadoop-slave-2:/opt/hadoop

Configuring Hadoop on Master Server

打开主服务器并按照给定的命令配置它。

# su hadoop
$ cd /opt/hadoop/hadoop

Configuring Master Node

$ vi etc/hadoop/masters

hadoop-master

Configuring Slave Node

$ vi etc/hadoop/slaves

hadoop-slave-1
hadoop-slave-2

Format Name Node on Hadoop Master

# su hadoop
$ cd /opt/hadoop/hadoop
$ bin/hadoop namenode –format
11/10/14 10:58:07 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = hadoop-master/192.168.1.109
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 1.2.0
STARTUP_MSG: build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.2 -r 1479473;
compiled by 'hortonfo' on Mon May 6 06:59:37 UTC 2013
STARTUP_MSG: java = 1.7.0_71

************************************************************/
11/10/14 10:58:08 INFO util.GSet: Computing capacity for map BlocksMap
editlog=/opt/hadoop/hadoop/dfs/name/current/edits
………………………………………………….
………………………………………………….
………………………………………………….
11/10/14 10:58:08 INFO common.Storage: Storage directory
/opt/hadoop/hadoop/dfs/name has been successfully formatted.
11/10/14 10:58:08 INFO namenode.NameNode:
SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at hadoop-master/192.168.1.15
************************************************************/

Starting Hadoop Services

以下命令用于在Hadoop主服务器上启动所有Hadoop服务。

$ cd $HADOOP_HOME/sbin
$ start-all.sh

Adding a New DataNode in the Hadoop Cluster

以下是为Hadoop集群添加新节点的步骤。

Networking

为现有Hadoop集群添加新节点,并使用一些适当的网络配置。假设以下网络配置。

对于新节点配置−

IP address : 192.168.1.103
netmask : 255.255.255.0
hostname : slave3.in

Adding User and SSH Access

Add a User

在新节点上,添加“hadoop”用户并使用以下命令将Hadoop用户的密码设置为“hadoop123”或您想要的任何密码。

useradd hadoop
passwd hadoop

建立从主节点到新从属节点的非密码连接。

Execute the following on the master

mkdir -p $HOME/.ssh
chmod 700 $HOME/.ssh
ssh-keygen -t rsa -P '' -f $HOME/.ssh/id_rsa
cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys
chmod 644 $HOME/.ssh/authorized_keys
Copy the public key to new slave node in hadoop user $HOME directory
scp $HOME/.ssh/id_rsa.pub hadoop@192.168.1.103:/home/hadoop/

Execute the following on the slaves

登录到Hadoop。如果没有,请登录到Hadoop用户。

su hadoop ssh -X hadoop@192.168.1.103

将公钥的内容复制到文件 "$HOME/.ssh/authorized_keys" 中,然后通过执行以下命令更改相同文件的权限。

cd $HOME
mkdir -p $HOME/.ssh
chmod 700 $HOME/.ssh
cat id_rsa.pub >>$HOME/.ssh/authorized_keys
chmod 644 $HOME/.ssh/authorized_keys

从主计算机检查ssh登录。现在,检查是否可以从主节点通过ssh访问新节点,而无需密码。

ssh hadoop@192.168.1.103 or hadoop@slave3

Set Hostname of New Node

您可以在文件 /etc/sysconfig/network 中设置主机名。

On new slave3 machine

NETWORKING = yes
HOSTNAME = slave3.in

为了使更改生效,请重新启动计算机或使用各自的主机名对新计算机运行主机名命令(重新启动是一个好选择)。

在slave3节点计算机上−

主机名slave3.in

使用以下行更新群集所有计算机上的 /etc/hosts

192.168.1.102 slave3.in slave3

现在尝试使用主机名ping计算机,以检查是否解析为IP。

在新节点计算机上−

ping master.in

Start the DataNode on New Node

使用 $HADOOP_HOME/bin/hadoop-daemon.sh script 手动启动dataNode守护程序。它将自动联系主(NameNode)并加入群集。我们还应该将新节点添加到主服务器中的conf/slaves文件中。基于脚本的命令将识别新的节点。

Login to new node

su hadoop or ssh -X hadoop@192.168.1.103

Start HDFS on a newly added slave node by using the following command

./bin/hadoop-daemon.sh start datanode

Check the output of jps command on a new node. It looks as follows.

$ jps
7141 DataNode
10312 Jps

Removing a DataNode from the Hadoop Cluster

在运行时,我们可以在不丢失任何数据的情况下从集群中删除一个节点。HDFS 提供了一个解除配置的功能,它确保以安全的方式删除节点。要使用该功能,请按照以下步骤操作:

Step 1 − Login to master

登录到已安装 Hadoop 的主计算机用户。

$ su hadoop

Step 2 − Change cluster configuration

必须在启动集群之前配置一个排除文件。向我们的 ` $HADOOP_HOME/etc/hadoop/hdfs-site.xml ` 文件中添加一个名为 dfs.hosts.exclude 的键。与此键关联的值提供了一个文件在 NameNode 本地文件系统上的完整路径,此文件包含不被允许连接到 HDFS 的计算机列表。

例如,将这些行添加到 ` etc/hadoop/hdfs-site.xml ` 文件。

<property>
   <name>dfs.hosts.exclude</name>
   <value>/home/hadoop/hadoop-1.2.1/hdfs_exclude.txt</value>
   <description>DFS exclude</description>
</property>

Step 3 − Determine hosts to decommission

每个需要解除配置的计算机都应添加到 hdfs_exclude.txt 标识的文件中,每行一个域名。这会阻止它们连接到 NameNode。如果你想删除 DataNode2,` "/home/hadoop/hadoop-1.2.1/hdfs_exclude.txt" ` 文件的内容如下所示:

slave2.in

Step 4 − Force configuration reload

在不加引号的情况下运行命令 ` "$HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes" `。

$ $HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes

这将强制 NameNode 重新读取其配置,包括新更新的“排除”文件。它会在一段时间内解除配置节点,从而为每个节点的块复制到计划保持活动状态的计算机上提供时间。

在 ` slave2.in ` 中,检查 jps 命令输出。一段时间后,你将看到 DataNode 进程自动关闭。

Step 5 − Shutdown nodes

解除配置过程完成后,可以安全地关闭已解除配置的硬件以进行维护。运行 dfsadmin 报告命令以检查解除配置的状态。以下命令将描述解除配置节点的状态和连接到该集群的节点。

$ $HADOOP_HOME/bin/hadoop dfsadmin -report

Step 6 − Edit excludes file again

机器解除配置后,可以从“排除”文件中删除它们。再次运行 ` "$HADOOP_HOME/bin/hadoop dfsadmin -refreshNodes" ` 将把排除文件读回到 NameNode 中;允许 DataNode 在维护完成后重新加入该集群,或者在该集群再次需要额外容量时重新加入该集群,等等。

` Special Note ` - 如果遵循以上过程,并且 tasktracker 进程仍在该节点上运行,则需要将其关闭。一种方法是像在上述步骤中所做的那样断开计算机连接。主计算机将自动识别该进程并宣布该进程已死亡。无需遵循相同过程来删除 tasktracker,因为它不如 DataNode 那样重要。DataNode 包含你要在不丢失任何数据的情况下安全删除的数据。

可以通过在任何时间点使用以下命令动态运行/关闭 tasktracker。

$ $HADOOP_HOME/bin/hadoop-daemon.sh stop tasktracker
$HADOOP_HOME/bin/hadoop-daemon.sh start tasktracker