Apache Spark 简明教程

Advanced Spark Programming

Spark 包含两种不同类型的共享变量 − 一种是 broadcast variables ,另一种是 accumulators

  1. Broadcast variables − 用于高效地分发大量值。

  2. Accumulators − 用于聚合特定收集的信息。

Broadcast Variables

广播变量允许程序员将只读变量缓存到每台计算机上,而不是随任务一起发送其副本。例如,它们可以用于以有效的方式向每个节点提供大型输入数据集的副本。Spark 还尝试使用高效广播算法分发广播变量,以减少通信成本。

Spark 操作通过一系列阶段执行,阶段之间由分布式“改组”操作分隔。Spark 自动广播每个阶段内任务所需的通用数据。

通过这种方式广播的数据以序列化形式缓存,并在运行每个任务之前反序列化。这意味着显式创建广播变量仅在跨多个阶段的任务需要相同数据时或以反序列化形式缓存数据很重要时才有用。

广播变量从变量 v 创建,方法是调用 SparkContext.broadcast(v) 。广播变量是 v 周围的包装器,可以通过调用 value 方法访问其值。下面给出的代码显示了这一点 −

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

Output

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

创建广播变量后,应在群集上运行的任何函数中使用它,而不是值 v ,以便 v 不被多次发送到节点。此外,对象 v 在广播后不应被修改,以确保所有节点获取广播变量的相同值。

Accumulators

Accumulator 是只能通过关联运算“添加”但因此可有效地并行支持的变量。它们可用于实现计数器(如在 MapReduce 中)或和。Spark 本机支持数字类型的 Accumulator,而程序员可以添加对新类型支持。如果 Accumulator 是使用名称创建的,则它们将显示在 Spark’s UI 。这对于理解正在运行的阶段的进度非常有用(注意 − 这尚不受 Python 支持)。

通过调用 SparkContext.accumulator(v) ,从初始值 v 创建 Accumulator。然后,群集上运行的任务可以使用 add 方法或 += 运算符(在 Scala 和 Python 中)向其添加。但是,它们无法读取其值。只有驱动程序才能使用其 value 方法读取 Accumulator 的值。

下面给出的代码显示了 Accumulator 用于对数组元素求和 −

scala> val accum = sc.accumulator(0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

如果您想查看以上代码的输出,请使用以下命令 −

scala> accum.value

Output

res2: Int = 10

Numeric RDD Operations

Spark 允许您使用一个预定义的 API 方法对数字数据执行不同的操作。Spark 的数字运算通过流算法实现,该算法允许一次一个元素地构建模型。

这些操作按 StatusCounter 对象计算,通过调用 status() 方法返回。

以下是 StatusCounter 中可用的数字方法列表。

S.No

Methods & Meaning

1

count() RDD 中的元素数量。

2

Mean() RDD 中元素的平均值。

3

Sum() RDD 中元素的总值。

4

Max() RDD 中所有元素中的最大值。

5

Min() RDD 中所有元素中的最小值。

6

Variance() Variance of the elements.

7

Stdev() Standard deviation.

如果您只想使用其中一种方法,您可以直接在 RDD 中调用相应的方法。