PySpark Tutorial

PySpark - RDD

Now that we have installed and configured PySpark on our system, we can program in Python on Apache Spark. However, before doing so, let us understand a fundamental concept in Spark - RDD.

RDD stands for Resilient Distributed Dataset; these are elements that run and operate on multiple nodes to perform parallel processing on a cluster. RDDs are immutable elements, which means that once you create an RDD you cannot change it. RDDs are fault tolerant as well, hence in case of any failure they recover automatically. You can apply multiple operations on these RDDs to achieve a certain task.

To apply operations on these RDDs, there are two ways −

  1. Transformation

  2. Action

Let us understand these two ways in detail.

Transformation − These are operations that are applied on an RDD to create a new RDD. filter, groupBy and map are examples of transformations.

Action − These are operations that are applied on an RDD and instruct Spark to perform the computation and send the result back to the driver.
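
Transformations are evaluated lazily − Spark only runs the computation when an action is called on the resulting RDD. The following minimal sketch (not part of the original examples; the file and app names are illustrative) chains a filter transformation with a count action to show the difference −

----------------------------------------lazy_example.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Lazy evaluation sketch")
nums = sc.parallelize([1, 2, 3, 4, 5])
# filter() is a transformation: it only describes a new RDD, nothing runs yet.
evens = nums.filter(lambda n: n % 2 == 0)
# count() is an action: it triggers the actual computation and returns a result.
print("Even numbers in RDD -> %i" % (evens.count()))
----------------------------------------lazy_example.py---------------------------------------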

To apply any operation in PySpark, we need to create a PySpark RDD first. The following code block shows the details of the PySpark RDD class −

class pyspark.RDD (
   jrdd,
   ctx,
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)
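
In practice, you rarely construct pyspark.RDD directly. RDDs are normally obtained from a SparkContext, for example with parallelize() (used throughout this chapter) or textFile(). The following is only a minimal sketch − the app name is illustrative and the file path is hypothetical −

----------------------------------------create_rdd_sketch.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Create RDD sketch")
# An RDD from an in-memory Python collection.
from_list = sc.parallelize([1, 2, 3])
# An RDD from a text file, one element per line ("data.txt" is a hypothetical path).
# from_file = sc.textFile("data.txt")
print("First element -> %s" % (from_list.first()))
----------------------------------------create_rdd_sketch.py---------------------------------------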

Let us see how to run a few basic operations using PySpark. The following code in a Python file creates the RDD words, which stores the set of words mentioned below. It assumes an existing SparkContext sc, as provided by the PySpark shell; the standalone scripts later in this chapter create one explicitly.

words = sc.parallelize(
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)

We will now run a few operations on words.

count()

Number of elements in the RDD is returned.

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize(
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
# count() is an action that returns the number of elements in the RDD.
counts = words.count()
print("Number of elements in RDD -> %i" % (counts))
----------------------------------------count.py---------------------------------------

Command − The command for count() is −

$SPARK_HOME/bin/spark-submit count.py

Output − The output for the above command is −

Number of elements in RDD -> 8

collect()

All the elements in the RDD are returned.

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize(
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
# collect() is an action that returns all the elements of the RDD to the driver.
coll = words.collect()
print("Elements in RDD -> %s" % (coll))
----------------------------------------collect.py---------------------------------------

Command − The command for collect() is −

$SPARK_HOME/bin/spark-submit collect.py

Output − The output for the above command is −

Elements in RDD -> [
   'scala',
   'java',
   'hadoop',
   'spark',
   'akka',
   'spark vs hadoop',
   'pyspark',
   'pyspark and spark'
]
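
Note that collect() brings every element of the RDD back to the driver, which can be expensive for a large dataset. As a small additional sketch (take() is a standard RDD action, although it is not used in the original examples), a few elements can be previewed instead −

----------------------------------------take_sketch.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Take sketch")
words = sc.parallelize(["scala", "java", "hadoop", "spark"])
# take(n) returns only the first n elements instead of the whole RDD.
print("First two elements -> %s" % (words.take(2)))
----------------------------------------take_sketch.py---------------------------------------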

foreach(f)

Applies the given function to each element of the RDD and returns nothing to the driver. In the following example, we call a print function in foreach, which prints all the elements in the RDD.

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize(
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
# foreach() runs f on every element for its side effect; it returns no value.
words.foreach(f)
----------------------------------------foreach.py---------------------------------------

Command − The command for foreach(f) is −

$SPARK_HOME/bin/spark-submit foreach.py

Output − The output for the above command is −

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filter(f)

Returns a new RDD containing only the elements that satisfy the function passed to filter. In the following example, we keep only the strings that contain "spark".

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize(
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
# filter() keeps only the elements for which the lambda returns True.
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Filtered RDD -> %s" % (filtered))
----------------------------------------filter.py----------------------------------------

Command − The command for filter(f) is −

$SPARK_HOME/bin/spark-submit filter.py

Output − The output for the above command is −

Filtered RDD -> [
   'spark',
   'spark vs hadoop',
   'pyspark',
   'pyspark and spark'
]

map(f, preservesPartitioning = False)

Returns a new RDD by applying a function to each element of the RDD. In the following example, we map every string to a key-value pair with the value 1.

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize(
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
# map() transforms each word into a (word, 1) key-value pair.
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print("Key value pair -> %s" % (mapping))
----------------------------------------map.py---------------------------------------

Command − The command for map(f, preservesPartitioning=False) is −

$SPARK_HOME/bin/spark-submit map.py

Output − The output of the above command is −

Key value pair -> [
   ('scala', 1),
   ('java', 1),
   ('hadoop', 1),
   ('spark', 1),
   ('akka', 1),
   ('spark vs hadoop', 1),
   ('pyspark', 1),
   ('pyspark and spark', 1)
]

reduce(f)

Reduces the elements of the RDD using the specified commutative and associative binary operation and returns the result. In the following example, we import add from the operator module and apply it on 'nums' to carry out a simple addition operation.

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
# reduce() folds the elements pairwise with the given binary function (here, addition).
adding = nums.reduce(add)
print("Adding all the elements -> %i" % (adding))
----------------------------------------reduce.py---------------------------------------

Command − The command for reduce(f) is −

$SPARK_HOME/bin/spark-submit reduce.py

Output − The output of the above command is −

Adding all the elements -> 15
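
reduce(f) accepts any commutative and associative two-argument function, not just operator.add. As a minimal sketch (the file and app names are illustrative), the same sum can be written with a lambda −

----------------------------------------reduce_lambda_sketch.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Reduce lambda sketch")
nums = sc.parallelize([1, 2, 3, 4, 5])
# A lambda works as the binary function too; this is equivalent to reduce(add).
adding = nums.reduce(lambda a, b: a + b)
print("Adding all the elements -> %i" % (adding))
----------------------------------------reduce_lambda_sketch.py---------------------------------------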

join(other, numPartitions = None)

Returns an RDD of (key, (value1, value2)) pairs for every key that has matching entries in both RDDs. In the following example, there are two pairs of elements in two different RDDs. After joining these two RDDs, we get an RDD whose elements have the matching keys and their values paired together.

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
# join() pairs up the values of matching keys from both RDDs.
joined = x.join(y)
final = joined.collect()
print("Join RDD -> %s" % (final))
----------------------------------------join.py---------------------------------------

Command − The command for join(other, numPartitions = None) is −

$SPARK_HOME/bin/spark-submit join.py

Output − The output for the above command is −

Join RDD -> [
   ('spark', (1, 2)),
   ('hadoop', (4, 5))
]
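
join() behaves like an inner join − keys that appear in only one of the RDDs are dropped from the result. The following minimal sketch (the extra ("akka", 7) pair is illustrative, not from the original example) shows this −

----------------------------------------inner_join_sketch.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Inner join sketch")
x = sc.parallelize([("spark", 1), ("hadoop", 4), ("akka", 7)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
# "akka" exists only in x, so it does not appear in the joined RDD.
print("Join RDD -> %s" % (x.join(y).collect()))
----------------------------------------inner_join_sketch.py---------------------------------------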

cache()

Persist this RDD with the default storage level (MEMORY_ONLY). You can also check if the RDD is cached or not.

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize(
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
# cache() marks the RDD for persistence with the default storage level (MEMORY_ONLY).
words.cache()
# is_cached reports whether the RDD has been marked as cached.
caching = words.is_cached
print("Words got cached -> %s" % (caching))
----------------------------------------cache.py---------------------------------------

Command − The command for cache() is −

$SPARK_HOME/bin/spark-submit cache.py

Output − The output for the above program is −

Words got cached -> True
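
As an additional sketch (getStorageLevel() is a standard RDD method, not used in the original example), you can also inspect the storage level that cache() assigned −

----------------------------------------storage_level_sketch.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Storage level sketch")
words = sc.parallelize(["scala", "java", "hadoop", "spark"])
words.cache()
# getStorageLevel() reports how the RDD is persisted; cache() uses MEMORY_ONLY by default.
print("Storage level -> %s" % (words.getStorageLevel()))
----------------------------------------storage_level_sketch.py---------------------------------------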

These are some of the most important operations that are performed on a PySpark RDD.