Apache Spark Tutorial
Advanced Spark Programming
Spark contains two different types of shared variables − one is broadcast variables and the other is accumulators.
- Broadcast variables − used to efficiently distribute large values.
- Accumulators − used to aggregate the information of a particular collection.
Broadcast Variables
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage.
The data broadcast this way is cached in serialized form and is deserialized before each task runs. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data, or when caching the data in deserialized form is important.
Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code given below shows this −
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
Output −
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster, so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast, in order to ensure that all nodes get the same value of the broadcast variable.
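As an illustrative sketch (the lookup map and variable names below are hypothetical, not part of the tutorial's example), a small table can be broadcast once and then read through value inside a transformation −

scala> val lookup = sc.broadcast(Map("US" -> "United States", "IN" -> "India"))
scala> val codes = sc.parallelize(Array("US", "IN", "US"))
scala> codes.map(code => lookup.value.getOrElse(code, "Unknown")).collect()

Because the closure references lookup.value rather than the map itself, the map is transferred to each node once instead of with every task.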
Accumulators
Accumulators are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types. If accumulators are created with a name, they will be displayed in Spark’s UI. This can be useful for understanding the progress of running stages (Note − this is not yet supported in Python).
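For instance, the SparkContext.accumulator call used below also accepts an optional name; a minimal sketch of that overload −

scala> val namedAccum = sc.accumulator(0, "My Accumulator")

The name "My Accumulator" is then what identifies this accumulator in Spark’s UI.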
An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the += operator (in Scala and Python). However, they cannot read its value; only the driver program can read the accumulator’s value, using its value method.
The code given below shows an accumulator being used to add up the elements of an array −
scala> val accum = sc.accumulator(0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
If you want to see the output of the above code, use the following command −
scala> accum.value
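Since the tasks added 1 + 2 + 3 + 4 into the accumulator, the driver sees a value of 10 (the exact res variable printed by the shell will vary).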
Numeric RDD Operations
Spark allows you to do different operations on numeric data, using one of the predefined API methods. Spark’s numeric operations are implemented with a streaming algorithm that allows building the model one element at a time.
These operations are computed and returned as a StatCounter object by calling the stats() method.
The following is a list of the numeric methods available in StatCounter.
| S.No | Methods & Meaning |
| --- | --- |
| 1 | count() − Number of elements in the RDD. |
| 2 | mean() − Average of the elements in the RDD. |
| 3 | sum() − Total value of the elements in the RDD. |
| 4 | max() − Maximum value among all the elements in the RDD. |
| 5 | min() − Minimum value among all the elements in the RDD. |
| 6 | variance() − Variance of the elements. |
| 7 | stdev() − Standard deviation of the elements. |
If you want to use only one of these methods, you can call the corresponding method directly on the RDD.
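A minimal sketch of both styles (the variable names are illustrative) −

scala> val nums = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0))
scala> val stats = nums.stats()   // one pass computes count, mean, sum, max, min, variance, stdev
scala> stats.mean                 // 2.5
scala> nums.variance()            // 1.25, calling a single method directly on the RDD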