Apache Spark 简明教程
Apache Spark - Introduction
各行业都在广泛使用 Hadoop 来分析其数据集。原因在于 Hadoop 框架基于一个简单的编程模型(MapReduce),并且它提供了一个可扩展、灵活、容错且具有成本效益的计算解决方案。这里,主要顾虑是在处理大型数据集的速度,即查询之间的等待时间和运行程序的等待时间。
Apache 软件基金会推出了 Spark 来加速 Hadoop 计算软件流程。
与普遍的看法相反, Spark is not a modified version of Hadoop 并不真正依赖于 Hadoop,因为它有自己的集群管理。Hadoop 只是实现 Spark 的方式之一。
Spark 以两种方式使用 Hadoop - 一种是 storage ,第二种是 processing 。由于 Spark 有自己的集群管理计算,所以它只将 Hadoop 用于存储目的。
Apache Spark
Apache Spark 是一种闪电般快速的集群计算技术,专为快速计算而设计。它基于 Hadoop MapReduce,并扩展了 MapReduce 模型以有效地将其用于更多类型的计算,包括交互式查询和流处理。Spark 的主要特征是其 in-memory cluster computing ,它提高了应用程序的处理速度。
Spark 旨在涵盖广泛的工作负载,例如批处理应用程序、迭代算法、交互式查询和流处理。除了在各自系统中支持所有这些工作负载外,它还减轻了维护单独工具的管理负担。
Evolution of Apache Spark
Spark 是 Hadoop 的一个子项目,于 2009 年在加州大学伯克利分校 AMPLab 由 Matei Zaharia 开发。它于 2010 年根据 BSD 许可证开源。它于 2013 年捐赠给 Apache 软件基金会,现在 Apache Spark 已成为自 2014 年 2 月起的一个顶级 Apache 项目。
Features of Apache Spark
Apache Spark 具有以下功能。
-
Speed - Spark 帮助在 Hadoop 集群中运行应用程序,速度在内存中提高多达 100 倍,在磁盘上运行时速度提高 10 倍。通过减少对磁盘的读/写操作次数,可以做到这一点。它将中间处理数据存储在内存中。
-
Supports multiple languages - Spark 在 Java、Scala 或 Python 中提供了内置的 API。因此,你可以使用不同的语言编写应用程序。Spark 提供了 80 个高级运算符用于交互式查询。
-
Advanced Analytics - Spark 不仅支持“映射”和“还原”。它还支持 SQL 查询、流数据、机器学习 (ML) 和图形算法。
Spark Built on Hadoop
下图展示了使用 Hadoop 组件构建 Spark 的三种方法。
如以下说明所示,共有三种 Spark 部署方法。
-
Standalone − Spark 独立部署表示 Spark 占据 HDFS(Hadoop 分布式文件系统)之上的位置,并明确分配空间给 HDFS。在此,Spark 和 MapReduce 将并排运行以覆盖群集上的所有 Spark 作业。
-
Hadoop Yarn − Hadoop Yarn 部署仅仅意味着 Spark 在 Yarn 上运行,无需预安装或 root 访问权限。它有助于将 Spark 集成到 Hadoop 生态系统或 Hadoop 堆栈中。它允许其他组件在堆栈之上运行。
-
Spark in MapReduce (SIMR) − Spark in MapReduce 用于在独立部署之外启动 spark 作业。通过 SIMR,用户可以启动 Spark 并使用其 shell,而无需任何管理访问权限。
Apache Spark - RDD
Resilient Distributed Datasets
弹性分布式数据集 (RDD) 是 Spark 的一个基本数据结构。它是对象的不可变分布式集合。RDD 中的每个数据集都被划分为逻辑分区,这些分区可以在群集的不同节点上计算。RDD 可以包含任何类型的 Python、Java 或 Scala 对象,包括用户定义的类。
形式上,RDD 是只读的分区记录集合。RDD 可以通过对稳定存储器中的数据或其他 RDD 上的确定性操作来创建。RDD 是一个容错元素集合,可以并行对其进行操作。
有两种方法可以创建 RDD − parallelizing 驱动程序程序中的现有集合或 referencing a dataset 位于外部存储系统中,例如共享文件系统、HDFS、HBase 或提供 Hadoop 输入格式的任何数据源。
Spark 利用 RDD 的概念实现了更快速、高效的 MapReduce 操作。让我们首先讨论 MapReduce 操作如何进行以及为什么它们效率不高。
Data Sharing is Slow in MapReduce
MapReduce 被广泛采用,用于在群集上通过并行分布式算法处理和生成大型数据集。它允许用户编写并行计算,使用一组高级运算符,而无需担心工作分布和容错能力。
不幸的是,在大多数当前框架中,在计算之间(例如两个 MapReduce 作业之间)重用数据的唯一方法是将其写入外部稳定存储系统(例如 HDFS)。尽管此框架提供了许多用于访问群集计算资源的抽象,但用户仍然希望获得更多。
Iterative 和 Interactive 应用程序都需要并行作业之间更快的 data 共享。由于 replication, serialization 和 disk IO ,MapReduce 中 data 共享速度很慢。关于存储系统,大多数 Hadoop 应用程序花费超过 90% 的时间执行 HDFS 读写操作。
Iterative Operations on MapReduce
在多阶段应用程序中跨多个计算重用中间结果。下图解释了在 MapReduce 上执行迭代操作时当前框架如何工作。由于复制数据、磁盘 I/O 和序列化,这会产生大量的开销,从而导致系统变慢。
Interactive Operations on MapReduce
用户针对相同数据子集运行即席查询。每个查询将对持久性存储执行磁盘 I/O,这会占据应用执行时间的大部分。
下图说明了当前框架在 MapReduce 上执行交互式查询时如何工作。
Data Sharing using Spark RDD
由于 replication, serialization 和 disk IO ,MapReduce 中的数据共享较慢。大多数 Hadoop 应用在执行 HDFS 读写操作时会花费 90% 以上的时间。
认识到这个问题后,研究人员开发了一个名为 Apache Spark 的专门框架。Spark 的关键思想是 *R*esilient *D*istributed *D*atasets (RDD);它支持内存内处理计算。这意味着它将内存的状态存储为作业间的对象,且此对象可在这些作业中共享。内存内的数据共享比网络和磁盘快 10 到 100 倍。
现在,我们尝试找出 Spark RDD 中迭代和交互式操作如何执行。
Apache Spark - Installation
Spark 是 Hadoop 的子项目。因此,最好将 Spark 安装到基于 Linux 的系统中。以下步骤显示如何安装 Apache Spark。
Step 1: Verifying Java Installation
Java 安装是安装 Spark 的必备事项之一。尝试以下命令以验证 JAVA 版本。
$java -version
如果系统中已安装 Java,您将看到以下响应:
java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)
如果您系统中尚未安装 Java,请在继续下一步操作之前安装 Java。
Step 2: Verifying Scala installation
您应使用 Scala 语言来实现 Spark。因此,让我们使用以下命令验证 Scala 安装。
$scala -version
如果系统中已安装 Scala,您将看到以下响应:
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
如果您系统中尚未安装 Scala,则继续执行下一步操作以安装 Scala。
Step 3: Downloading Scala
下载 Scala 的最新版本,请访问以下链接 Download Scala 。对于本教程,我们使用 scala-2.11.6 版本。下载后,您将在下载文件夹中找到 Scala tar 文件。
Step 4: Installing Scala
按照以下给定的步骤安装 Scala。
Step 5: Downloading Apache Spark
访问以下链接 Download Spark 下载 Spark 最新版本。本教程中,我们使用 spark-1.3.1-bin-hadoop2.6 版本。下载后,你将在下载文件夹中找到 Spark tar 文件。
Step 6: Installing Spark
按照以下步骤执行 Spark 安装。
Step 7: Verifying the Spark Installation
输入以下命令打开 Spark shell。
$spark-shell
如果 Spark 安装成功,你将看到以下输出。
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Apache Spark - Core Programming
Spark Core 是整个项目的底层。它提供了分布式任务分发、调度和基本 I/O 功能。Spark 使用一种称为 RDD(弹性分布式数据集)的特殊基本数据结构,它是分布在多台机器上的数据的逻辑集合。RDD 可以通过两种方式创建:一是引用外部存储系统中的数据集,二是针对现有的 RDD 应用转换(例如,map、filter、reducer、join)。
通过语言集成的 API 公开了 RDD 抽象。这简化了编程复杂度,这是因为应用程序处理 RDD 的方式类似于处理本地数据集合。
Spark Shell
Spark 提供了一个交互式 shell——一种交互式分析数据的强大工具。此工具既可以在 Scala 中使用,也可以在 Python 语言中使用。Spark 的主要抽象是称为弹性分布式数据集 (RDD) 的分布式项集合。RDD 可以从 Hadoop 输入格式(例如,HDFS 文件)或通过转换其他 RDD 创建。
RDD Transformations
RDD 转换返回对新 RDD 的指针,并允许你创建 RDD 之间的依赖关系。依赖关系链(依赖关系串)中的每个 RDD 都具有一个用于计算其数据的函数,并具有对其父 RDD 的指针(依赖关系)。
Spark 比较懒,所以在调用一些将触发作业创建和执行的转换或动作之前,不会执行任何操作。查看单词计数示例的以下代码段。
因此,RDD 转换不是一组数据,而是程序中的一个步骤(可能是唯一的步骤),告诉 Spark 如何获取数据并对其进行处理。
Programming with RDD
我们借助示例,来看看 RDD 编程中一些 RDD 转换和动作的实现。
Example
考虑一个单词计数的示例——它计算一个文档中出现的每个单词。将以下文本视为一个输入,在主目录中将其保存为 input.txt 文件。
input.txt ——输入文件。
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
按照以下给出的过程执行给定的示例。
Open Spark-Shell
使用以下命令打开 Spark Shell。通常,Spark 是使用 Scala 构建的。因此,Spark 程序在 Scala 环境中运行。
$ spark-shell
如果 Spark Shell 顺利打开,那么将会看到以下输出。查看输出的最后一行“Spark context available as sc”,这意味着 Spark 容器已自动创建名为 sc 的 Spark context 对象。在开始程序的第一步之前,应该创建 SparkContext 对象。
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Create an RDD
首先,我们必须使用 Spark-Scala API 读取输入文件并创建一个 RDD。
以下命令用于从给定位置读取文件。此处,使用 inputfile 的名称创建了新的 RDD。在 textFile(“”) 方法中作为参数提供的 String 是输入文件名所在绝对路径。但是,如果只给出了文件名,则表示输入文件位于当前位置。
scala> val inputfile = sc.textFile("input.txt")
Execute Word count Transformation
我们的目标是统计文件中出现的单词。创建一个平面映射来将每行拆分到词组中( flatMap(line ⇒ line.split(“ ”) )。
接下来,使用映射功能( map(word ⇒ (word, 1) )将每个单词作为具有值 ‘1’ 的键读取(<key, value> = <word,1>)。
最后,通过添加类似键的键值来缩减这些键( reduceByKey(+) )。
以下命令用于执行单词计数逻辑。执行此操作之后,不会看到任何输出,因为这不是一个动作,而是一个转换;指向新的 RDD 或告诉 Spark 如何处理给定数据。
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
Caching the Transformations
可以使用 persist() 或 cache() 方法将 RDD 标记为持久化。在动作中首次对其进行计算时,它将保存在节点的内存中。使用以下命令将中间转换存储在内存中。
scala> counts.cache()
Applying the Action
应用一个动作,例如存储所有转换,会生成一个文本文件。saveAsTextFile(“ ”) 方法的 String 参数是输出文件夹的绝对路径。尝试以下命令,将输出存储到文本文件中。在以下示例中,“output”文件夹位于当前位置。
scala> counts.saveAsTextFile("output")
Checking the Output
打开另一个终端进入主目录(在另一个终端中执行 Spark 的地方)。使用以下命令检查输出目录。
[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1
part-00000
part-00001
_SUCCESS
以下命令用于查看 Part-00000 文件的输出。
[hadoop@localhost output]$ cat part-00000
UN Persist the Storage
在取消持久化之前,如果你想查看该应用程序使用的存储空间,请在浏览器中使用以下 URL。
http://localhost:4040
以下屏幕将显示该应用程序使用的存储空间,这些应用程序正在 Spark shell 上运行。
如果你要取消持久化特定 RDD 的存储空间,请使用以下命令。
Scala> counts.unpersist()
你将看到如下输出:
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
要验证浏览器中的存储空间,请使用以下 URL。
http://localhost:4040/
你将看到以下屏幕。它显示了该应用程序使用的存储空间,这些应用程序正在 Spark shell 上运行。
Apache Spark - Deployment
Spark 应用程序使用 spark-submit,这是一个 shell 命令,用于在群集上部署 Spark 应用程序。它通过统一的界面,使用所有各自的群集管理器。因此,你无需为每个管理器配置应用程序。
Example
我们使用 shell 命令,来看看之前使用的单词计数的相同示例。在这里,我们考虑相同的示例作为 Spark 应用程序。
Sample Input
以下文本是输入数据,该文件名为 in.txt 。
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
查看以下程序:
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
/* Map = variables to work nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input = sc.textFile("in.txt")
/* Transform the inputRDD into countRDD */
val count = input.flatMap(line ⇒ line.split(" "))
.map(word ⇒ (word, 1))
.reduceByKey(_ + _)
/* saveAsTextFile method is an action that effects on the RDD */
count.saveAsTextFile("outfile")
System.out.println("OK");
}
}
将上述程序保存在名为 SparkWordCount.scala 的文件中,并将其放置在名为 spark-application 的用户定义目录中。
Note - 将 inputRDD 转换为 countRDD 时,我们使用 flatMap() 对文本文件中的行进行标记化,使用 map() 方法计算单词频率,并使用 reduceByKey() 方法计算每个单词的重复次数。
使用以下步骤提交该应用程序。通过终端执行 spark-application 目录中的所有步骤。
Step 1: Download Spark Ja
编译需要 Spark 核心 jar,因此,请从以下链接下载 spark-core_2.10-1.3.0.jar Spark core jar ,并将 jar 文件从下载目录移动到 spark-application 目录。
Step 2: Compile program
使用以下命令编译上面的程序。该命令应从 spark-application 目录执行。在这里, /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar 是从 Spark 库获取的 Hadoop 支持 jar。
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
Step 3: Create a JAR
使用以下命令创建 spark 应用程序的 jar 文件。在这里, wordcount 是 jar 文件的文件名。
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
Step 4: Submit spark application
使用以下命令提交 spark 应用程序:
spark-submit --class SparkWordCount --master local wordcount.jar
如果执行成功,你将找到给定的以下输出。以下输出中出现的 OK 是为了识别用户,这是程序的最后一行。如果你仔细阅读以下输出,你将发现不同信息,例如:
-
在端口 42954 上成功启动服务 'sparkDriver'
-
MemoryStore 已启动,容量为 267.3 MB
-
Started SparkUI at [role="bare"]http://192.168.1.217:4040
-
Added JAR file:/home/hadoop/piapplication/count.jar
-
ResultStage 1(SparkPi.scala:11 处的 saveAsTextFile)在 0.566 秒内完成
-
已在 [role="bare"] [role="bare"]http://192.168.1.217:4040 处停止 Spark 网络 UI
-
MemoryStore cleared
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Step 5: Checking output
程序成功执行后,你将在 spark-application 目录中找到名为 outfile 的目录。
以下命令用于打开和检查 outfile 目录中的文件列表。
$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS
检查 part-00000 文件中输出的命令为 −
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
检查 part-00001 文件中输出的命令为 −
$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
转到以下部分详细了解 'spark-submit' 命令。