Apache Spark 简明教程

Apache Spark - Core Programming

Spark Core 是整个项目的底层。它提供了分布式任务分发、调度和基本 I/O 功能。Spark 使用一种称为 RDD(弹性分布式数据集)的特殊基本数据结构,它是分布在多台机器上的数据的逻辑集合。RDD 可以通过两种方式创建:一是引用外部存储系统中的数据集,二是针对现有的 RDD 应用转换(例如,map、filter、reducer、join)。

通过语言集成的 API 公开了 RDD 抽象。这简化了编程复杂度,这是因为应用程序处理 RDD 的方式类似于处理本地数据集合。

Spark Shell

Spark 提供了一个交互式 shell——一种交互式分析数据的强大工具。此工具既可以在 Scala 中使用,也可以在 Python 语言中使用。Spark 的主要抽象是称为弹性分布式数据集 (RDD) 的分布式项集合。RDD 可以从 Hadoop 输入格式(例如,HDFS 文件)或通过转换其他 RDD 创建。

Open Spark Shell

以下命令用于打开 Spark shell。

$ spark-shell

Create simple RDD

让我们从文本文件中创建一个简单的 RDD。使用以下命令创建简单的 RDD。

scala> val inputfile = sc.textFile(“input.txt”)

上述命令的输出为

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

Spark RDD API 引入了少数 Transformations 和少数 Actions 来操作 RDD。

RDD Transformations

RDD 转换返回对新 RDD 的指针,并允许你创建 RDD 之间的依赖关系。依赖关系链(依赖关系串)中的每个 RDD 都具有一个用于计算其数据的函数,并具有对其父 RDD 的指针(依赖关系)。

Spark 比较懒,所以在调用一些将触发作业创建和执行的转换或动作之前,不会执行任何操作。查看单词计数示例的以下代码段。

因此,RDD 转换不是一组数据,而是程序中的一个步骤(可能是唯一的步骤),告诉 Spark 如何获取数据并对其进行处理。

以下是 RDD 转换的列表。

S.No

Transformations & Meaning

1

map(func) 通过函数 func 传递源的每个元素形成一个新的分布式数据集。

2

filter(func) 返回通过选择源中 func 返回 true 的那些元素形成的新数据集。

3

flatMap(func) 类似于 map,但每个输入项可以映射到 0 个或更多个输出项(所以 func 应该返回一个 Seq 而不是单个项)。

4

mapPartitions(func) 类似于 map,但在 RDD 的每个分区(块)上单独运行,所以当在类型为 T 的 RDD 上运行时, func 必须为 Iterator<T> ⇒ Iterator<U> 类型。

5

mapPartitionsWithIndex(func) 类似于 map 分区,但还向 func 提供表示分区索引的整数值,所以当在类型为 T 的 RDD 上运行时, func 必须为 (Int, Iterator<T>) ⇒ Iterator<U> 类型。

6

sample(withReplacement, fraction, seed) 使用给定的随机数生成器种子有放回或无放回地对 fraction 的数据进行抽样。

7

union(otherDataset) 返回一个新数据集,其中包含源数据集中元素的并集和参数。

8

intersection(otherDataset) 返回一个新 RDD,其中包含源数据集中元素和参数的交集。

9

distinct([numTasks]) 返回一个新数据集,其中包含源数据集的各异元素。

10

groupByKey([numTasks]) 针对 (K, V) 对数据集调用时,返回 (K, Iterable<V>) 对数据集。 Note − 如果你按分组对每个键执行聚合(例如求和或平均数),使用 reduceByKey 或 aggregateByKey 将能产生更好的性能。

11

reduceByKey(func, [numTasks]) 针对 (K, V) 对数据集调用时,返回 (K, V) 对数据集,其中每个键的值使用给定 reduce 函数 func 进行聚合,类型必须为 (V, V) ⇒ V。与 groupByKey 一样,可以通过可选的第二个参数配置 reduce 任务的数量。

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 针对 (K, V) 对数据集调用时,返回 (K, U) 对数据集,其中每个键的值使用给定组合函数和一个中立的“零”值进行聚合。允许聚合值类型不同于输入值类型,同时避免了不必要的分配。与 groupByKey 一样,可以通过可选的第二个参数配置 reduce 任务的数量。

13

sortByKey([ascending], [numTasks]) 针对 K 实现 Ordered 的 (K, V) 对数据集调用时,返回按升序或降序(如布尔 ascending 参数中指定的)按键排序的 (K, V) 对数据集。

14

join(otherDataset, [numTasks]) 针对类型为 (K, V) 和 (K, W) 的数据集调用时,返回 (K, (V, W)) 对数据集,其中包含每个键的所有元素对。可通过 leftOuterJoin、rightOuterJoin 和 fullOuterJoin 来支持外部连接。

15

cogroup(otherDataset, [numTasks]) 针对类型为 (K, V) 和 (K, W) 的数据集调用时,返回 (K, (Iterable<V>, Iterable<W>)) 元组数据集。此操作也称为按组 With。

16

cartesian(otherDataset) 针对类型为 T 和 U 的数据集调用时,返回 (T, U) 对数据集(所有元素对)。

17

pipe(command, [envVars]) 通过 shell 命令(例如 Perl 或 bash 脚本)传送 RDD 的各个分区。RDD 元素将写入进程的标准输入,输出到其标准输出的行将作为字符串 RDD 返回。

18

coalesce(numPartitions) 将 RDD 中的分区数量减少到 numPartitions。这对于在大数据集上过滤后再高效地运行操作非常有用。

19

repartition(numPartitions) 随机地重新分配 RDD 中的数据以创建更多或更少的分区,并在分区之间平衡数据。此操作将始终在网络中混洗所有数据。

20

repartitionAndSortWithinPartitions(partitioner) 根据给定分区器对 RDD 进行重新分区,并在每个产生的分区内按键对记录进行排序。这比调用 repartition 然后在每个分区内进行排序更有效,因为它可以将排序推送到混洗机制中。

Actions

下表提供了操作(返回值)的列表。

S.No

Action & Meaning

1

reduce(func) 使用一个函数(它获取两个参数并返回一个参数)聚合数据集的元素。该函数应为交换律和结合律,以便可以并行正确计算。

2

func 在驱动程序中以数组形式返回数据集的所有元素。这在执行筛选或其他仅返回数据的一个充分小的子集的操作后通常非常有用。

3

collect() 返回数据集中的元素数量。

4

count() 返回数据集中的第一个元素(类似于 take (1))。

5

first() 返回一个包含数据集前 n 个元素的数组。

6

takeSample (withReplacement,num, [seed]) 返回一个包含数据集 num 个随机采样元素(可带或不带替换)的数组,或者选择性地预先指定一个随机数生成器种子。

7

takeOrdered(n, [ordering]) 使用它们的自然顺序或自定义比较器返回 RDD 的前 n 个元素。

8

saveAsTextFile(path) 以文本文件(或一组文本文件)的形式将数据集元素写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定目录。Spark 在每个元素上调用 toString,以将其转换为文件中的文本行。

9

saveAsSequenceFile(path) (Java and Scala) 以 Hadoop SequenceFile 的形式将数据集元素写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定路径。这适用于实现 Hadoop 的 Writable 接口的键值对 RDD。在 Scala 中,它也适用于隐式可转换为 Writable 的类型(Spark 包含基本类型(如 Int、Double、String 等)的转换)。

10

saveAsObjectFile(path) (Java and Scala) 使用 Java 序列化以简单格式写入数据集元素,然后可以使用 SparkContext.objectFile() 加载。

11

countByKey() 仅适用于类型为 (K, V) 的 RDD。返回一个哈希映射的 (K, Int) 对,其中包含每个键的数量。

12

foreach(func) 对数据集的每个元素运行函数 func 。通常情况下,这样做是为了产生副作用,例如更新 Accumulator 或与外部存储系统交互。 Note − 修改 foreach() 之外的除了 Accumulator 之外的变量可能会导致未定义的行为。有关更多详细信息,请参阅了解闭包。

Programming with RDD

我们借助示例,来看看 RDD 编程中一些 RDD 转换和动作的实现。

Example

考虑一个单词计数的示例——它计算一个文档中出现的每个单词。将以下文本视为一个输入,在主目录中将其保存为 input.txt 文件。

input.txt ——输入文件。

people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful  as they love,
as they care as they share.

按照以下给出的过程执行给定的示例。

Open Spark-Shell

使用以下命令打开 Spark Shell。通常,Spark 是使用 Scala 构建的。因此,Spark 程序在 Scala 环境中运行。

$ spark-shell

如果 Spark Shell 顺利打开,那么将会看到以下输出。查看输出的最后一行“Spark context available as sc”,这意味着 Spark 容器已自动创建名为 sc 的 Spark context 对象。在开始程序的第一步之前,应该创建 SparkContext 对象。

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

Create an RDD

首先,我们必须使用 Spark-Scala API 读取输入文件并创建一个 RDD。

以下命令用于从给定位置读取文件。此处,使用 inputfile 的名称创建了新的 RDD。在 textFile(“”) 方法中作为参数提供的 String 是输入文件名所在绝对路径。但是,如果只给出了文件名,则表示输入文件位于当前位置。

scala> val inputfile = sc.textFile("input.txt")

Execute Word count Transformation

我们的目标是统计文件中出现的单词。创建一个平面映射来将每行拆分到词组中( flatMap(line ⇒ line.split(“ ”) )。

接下来,使用映射功能( map(word ⇒ (word, 1) )将每个单词作为具有值 ‘1’ 的键读取(<key, value> = <word,1>)。

最后,通过添加类似键的键值来缩减这些键( reduceByKey(+) )。

以下命令用于执行单词计数逻辑。执行此操作之后,不会看到任何输出,因为这不是一个动作,而是一个转换;指向新的 RDD 或告诉 Spark 如何处理给定数据。

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

Current RDD

在使用 RDD 时,如果想要了解当前 RDD,那么使用以下命令。它将显示有关当前 RDD 的描述及用于调试的依赖项。

scala> counts.toDebugString

Caching the Transformations

可以使用 persist() 或 cache() 方法将 RDD 标记为持久化。在动作中首次对其进行计算时,它将保存在节点的内存中。使用以下命令将中间转换存储在内存中。

scala> counts.cache()

Applying the Action

应用一个动作,例如存储所有转换,会生成一个文本文件。saveAsTextFile(“ ”) 方法的 String 参数是输出文件夹的绝对路径。尝试以下命令,将输出存储到文本文件中。在以下示例中,“output”文件夹位于当前位置。

scala> counts.saveAsTextFile("output")

Checking the Output

打开另一个终端进入主目录(在另一个终端中执行 Spark 的地方)。使用以下命令检查输出目录。

[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1

part-00000
part-00001
_SUCCESS

以下命令用于查看 Part-00000 文件的输出。

[hadoop@localhost output]$ cat part-00000

Output

(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)

以下命令用于查看 Part-00001 文件的输出。

[hadoop@localhost output]$ cat part-00001

Output

(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)

UN Persist the Storage

在取消持久化之前,如果你想查看该应用程序使用的存储空间,请在浏览器中使用以下 URL。

http://localhost:4040

以下屏幕将显示该应用程序使用的存储空间,这些应用程序正在 Spark shell 上运行。

storage space

如果你要取消持久化特定 RDD 的存储空间,请使用以下命令。

Scala> counts.unpersist()

你将看到如下输出:

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

要验证浏览器中的存储空间,请使用以下 URL。

http://localhost:4040/

你将看到以下屏幕。它显示了该应用程序使用的存储空间,这些应用程序正在 Spark shell 上运行。

storage space for application