Pyspark 简明教程

PySpark - Broadcast & Accumulator

对于并行处理,Apache Spark 使用共享变量。当驱动程序将任务发送到集群上的执行程序时,共享变量的副本将转到集群的每个节点,以便可以将其用于执行任务。

Apache Spark 支持两种类型的共享变量−

  1. Broadcast

  2. Accumulator

我们详细了解它们。

Broadcast

广播变量用于在所有节点之间保存数据的副本。这个变量缓存在所有机器上,不发送给执行任务的机器。下面的代码块详细介绍了 PySpark 的 Broadcast 类的详细信息。

class pyspark.Broadcast (
   sc = None,
   value = None,
   pickle_registry = None,
   path = None
)

下面的示例展示了如何使用 Broadcast 变量。Broadcast 变量有一个名为 value 的属性,它存储数据并用于返回广播值。

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command − 广播变量的命令如下 −

$SPARK_HOME/bin/spark-submit broadcast.py

Output − 以下命令的输出如下所示。

Stored data -> [
   'scala',
   'java',
   'hadoop',
   'spark',
   'akka'
]
Printing a particular element in RDD -> hadoop

Accumulator

累加器变量用于通过关联和交换运算汇聚信息。例如,你可以使用一个累加器进行求和运算或计数(在 MapReduce 中)。下面的代码块详细介绍了 PySpark 的 Accumulator 类的详细信息。

class pyspark.Accumulator(aid, value, accum_param)

下面的示例展示了如何使用累加器变量。累加器变量有一个名为 value 的属性,它类似于广播变量所具有的属性。它存储数据并用于返回累加器的值,但仅在驱动程序中可用。

在这个示例中,累加器变量被多个工作人员使用,并返回一个累加值。

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
   global num
   num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command − 累加器变量的命令如下 −

$SPARK_HOME/bin/spark-submit accumulator.py

Output − 以上命令的输出如下所示。

Accumulated value is -> 150