Pyspark 简明教程

PySpark - RDD

现在,我们已安装并在系统上配置了 PySpark,我们可以使用 Python 在 Apache Spark 上进行编程。但在开始之前,让我们了解 Spark 中的一个基本概念 - RDD。

RDD 代表 Resilient Distributed Dataset ,它们是运行在多个节点上并对其进行操作以在集群上进行并行处理的元素。RDD 是不可变元素,这意味着一旦创建了 RDD,您便无法对其进行修改。RDD 也具有容错能力,因此在发生任何故障时,它们会自动恢复。您可以对这些 RDD 执行多个操作以实现特定任务。

要对这些 RDD 执行操作,有两种方法 −

  1. Transformation and

  2. Action

让我们详细了解这两种方法。

Transformation − 这些操作可应用于 RDD 以创建新的 RDD。Filter、groupBy 和映射都是变换类型的示例。

Action − 这些操作应用于 RDD,指示 Spark 执行计算并将结果发回驱动程序。

在 PySpark 中应用任何操作时,我们需要首先创建一个 PySpark RDD 。以下代码块详细说明了 PySpark RDD 类 −

class pyspark.RDD (
   jrdd,
   ctx,
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

让我们看看如何使用 PySpark 运行一些基本操作。Python 文件中的以下代码创建 RDD words,用于存储一组所提到的单词。

words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)

现在我们将在 words 上运行一些操作。

count()

返回 RDD 中元素的数量。

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

Command − count() 命令为 −

$SPARK_HOME/bin/spark-submit count.py

Output − 以上命令的输出为−

Number of elements in RDD → 8

collect()

返回 RDD 中的所有元素。

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Command − collect() 命令为 −

$SPARK_HOME/bin/spark-submit collect.py

Output − 以上命令的输出为−

Elements in RDD -> [
   'scala',
   'java',
   'hadoop',
   'spark',
   'akka',
   'spark vs hadoop',
   'pyspark',
   'pyspark and spark'
]

foreach(f)

仅返回满足 foreach 中函数条件的那些元素。在以下示例中,我们在 foreach 中调用打印函数,该函数打印 RDD 中的所有元素。

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f)
----------------------------------------foreach.py---------------------------------------

Command − foreach(f) 命令为 −

$SPARK_HOME/bin/spark-submit foreach.py

Output − 以上命令的输出为−

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filter(f)

返回包含满足 filter 中函数要求的元素的新 RDD。在以下示例中,我们过滤掉包含“spark”的字符串。

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Command − filter(f) 命令为 −

$SPARK_HOME/bin/spark-submit filter.py

Output − 以上命令的输出为−

Fitered RDD -> [
   'spark',
   'spark vs hadoop',
   'pyspark',
   'pyspark and spark'
]

map(f, preservesPartitioning = False)

通过将函数应用于 RDD 中的每个元素来返回新 RDD。在以下示例中,我们形成一对键值并使用值 1 映射每个字符串。

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

Command − map(f, preservesPartitioning=False) 命令为 −

$SPARK_HOME/bin/spark-submit map.py

Output − 上述命令的输出为 −

Key value pair -> [
   ('scala', 1),
   ('java', 1),
   ('hadoop', 1),
   ('spark', 1),
   ('akka', 1),
   ('spark vs hadoop', 1),
   ('pyspark', 1),
   ('pyspark and spark', 1)
]

reduce(f)

在执行指定的交换和结合二元操作后,将返回 RDD 中的元素。在以下示例中,我们从运算符导入包 add,并将其应用于“num”以执行简单的加法操作。

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

Command − reduce(f) 命令为 −

$SPARK_HOME/bin/spark-submit reduce.py

Output − 上述命令的输出为 −

Adding all the elements -> 15

join(other, numPartitions = None)

它返回 RDD 以及一对具有匹配键和该特定键的所有值的元素。在以下示例中,在两个不同的 RDD 中有两对元素。在连接这两个 RDD 后,我们将获得一个 RDD,其中包含具有匹配键及其值的元素。

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

Command − join(other, numPartitions = None) 命令为 −

$SPARK_HOME/bin/spark-submit join.py

Output − 以上命令的输出为−

Join RDD -> [
   ('spark', (1, 2)),
   ('hadoop', (4, 5))
]

cache()

使用默认存储级别 (MEMORY_ONLY) 使此 RDD 持久化。您还可以检查 RDD 是否已缓存。

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
words.cache()
caching = words.persist().is_cached
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

Command − cache() 命令为 −

$SPARK_HOME/bin/spark-submit cache.py

Output − 以上程序的输出为 −

Words got cached -> True

以下这些操作是 PySpark RDD 上执行的一些最重要的操作。