PySpark Tutorial

PySpark - Broadcast & Accumulator

For parallel processing, Apache Spark uses shared variables. When the driver sends a task to the executors on the cluster, a copy of the shared variable goes to each node of the cluster, so that it can be used for performing tasks.

There are two types of shared variables supported by Apache Spark −

  1. Broadcast

  2. Accumulator

Let us understand them in detail.

Broadcast

Broadcast variables are used to keep a read-only copy of data on every node. The variable is cached on all the machines rather than being shipped to the executors along with each task. The following code block has the details of the Broadcast class for PySpark.

class pyspark.Broadcast (
   sc = None,
   value = None,
   pickle_registry = None,
   path = None
)

The following example shows how to use a Broadcast variable. A Broadcast variable has an attribute called value, which stores the data and is used to return the broadcast value.

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Broadcast app")

# Broadcast the word list once; its cached copy is available on every node.
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print("Stored data -> %s" % (data))
elem = words_new.value[2]
print("Printing a particular element in the broadcast variable -> %s" % (elem))
----------------------------------------broadcast.py--------------------------------------

Command − The command to run the above example is as follows −

$SPARK_HOME/bin/spark-submit broadcast.py

Output − The output for the above command is given below.

Stored data -> [
   'scala',
   'java',
   'hadoop',
   'spark',
   'akka'
]
Printing a particular element in the broadcast variable -> hadoop
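
The example above reads the broadcast value only in the driver. As a minimal, hedged sketch (not part of the original tutorial; the file name and word list are illustrative), the following shows the more common pattern of consulting a broadcast variable inside an executor task −

----------------------------------------broadcast_filter.py-------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Broadcast filter app")

# Broadcast a lookup set once; every executor reuses the cached copy.
keywords = sc.broadcast({"spark", "hadoop"})
lines = sc.parallelize(["spark is fast", "java is verbose", "hadoop stores data"])

# Tasks read keywords.value instead of shipping the set with each task.
matches = lines.filter(lambda line: any(w in keywords.value for w in line.split()))
print(matches.collect())
----------------------------------------broadcast_filter.py-------------------------------

Run with $SPARK_HOME/bin/spark-submit broadcast_filter.py; it should print ['spark is fast', 'hadoop stores data'].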

Accumulator

Accumulator variables are used for aggregating information through associative and commutative operations. For example, you can use an accumulator for a sum operation or for counters (as in MapReduce). The following code block has the details of the Accumulator class for PySpark.

class pyspark.Accumulator(aid, value, accum_param)

The following example shows how to use an Accumulator variable. An Accumulator variable has an attribute called value, similar to a broadcast variable. It stores the data and is used to return the accumulator's value, but it is usable only in the driver program.

In this example, an accumulator variable is used by multiple workers and returns an accumulated value.

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Accumulator app")

# Start the accumulator at 10; each worker adds its elements to it.
num = sc.accumulator(10)

def f(x):
   global num
   num += x

rdd = sc.parallelize([20, 30, 40, 50])
rdd.foreach(f)
final = num.value
print("Accumulated value is -> %i" % (final))
----------------------------------------accumulator.py------------------------------------

Command − The command to run the above example is as follows −

$SPARK_HOME/bin/spark-submit accumulator.py

Output − The output for the above command is given below.

Accumulated value is -> 150
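
Beyond simple sums, an accumulator is often used as a side counter while an action processes the data. The sketch below (not part of the original tutorial; the file name and sample data are illustrative) counts blank lines with foreach. Note that the value attribute can be read only in the driver, and updates are most reliable inside actions such as foreach, since updates made inside transformations may be re-applied if a task is retried.

----------------------------------------accumulator_counter.py----------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Accumulator counter app")

# Count blank lines as a side statistic while iterating over the data.
blank_lines = sc.accumulator(0)

def count_blanks(line):
   global blank_lines
   if line.strip() == "":
      blank_lines += 1

lines = sc.parallelize(["spark", "", "hadoop", "", ""])
lines.foreach(count_blanks)   # updates happen on the executors

# Only the driver can read the accumulated value.
print("Blank lines -> %i" % blank_lines.value)
----------------------------------------accumulator_counter.py----------------------------

Running it with $SPARK_HOME/bin/spark-submit accumulator_counter.py should print Blank lines -> 3.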