PySpark Tutorial
PySpark – Introduction
In this chapter, we will get ourselves acquainted with what Apache Spark is and how was PySpark developed.
Spark – Overview
Apache Spark is a lightning-fast real-time processing framework. It performs in-memory computations to analyze data in real time. It came into the picture because Apache Hadoop MapReduce performed only batch processing and lacked a real-time processing feature. Hence, Apache Spark was introduced, as it can perform stream processing in real time and can also take care of batch processing.
Apart from real-time and batch processing, Apache Spark also supports interactive queries and iterative algorithms. Apache Spark has its own cluster manager, where it can host its applications. It leverages Apache Hadoop for both storage and processing, using HDFS (Hadoop Distributed File System) for storage, and it can run Spark applications on YARN as well.
PySpark – Overview
Apache Spark is written in the Scala programming language. To support Python with Spark, the Apache Spark community released a tool called PySpark. Using PySpark, you can work with RDDs in the Python programming language as well. This is possible because of a library called Py4j.
PySpark offers the PySpark Shell, which links the Python API to the Spark core and initializes the Spark context. The majority of data scientists and analytics experts today use Python because of its rich library set, so integrating Python with Spark is a boon to them.
PySpark - Environment Setup
In this chapter, we will understand the environment setup of PySpark.
Note − This assumes that you have Java and Scala installed on your computer.
Let us now download and set up PySpark with the following steps.
Step 1 − Go to the official Apache Spark download page and download the latest version of Apache Spark available there. In this tutorial, we are using spark-2.1.0-bin-hadoop2.7.
Step 2 − Now, extract the downloaded Spark tar file. By default, it will be downloaded to the Downloads directory.
# tar -xvf Downloads/spark-2.1.0-bin-hadoop2.7.tgz
This will create a directory named spark-2.1.0-bin-hadoop2.7. Before starting PySpark, you need to set the following environment variables to configure the Spark path and the Py4j path.
export SPARK_HOME=/home/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH=$PATH:/home/hadoop/spark-2.1.0-bin-hadoop2.7/bin
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH=$SPARK_HOME/python:$PATH
Or, to set the above environment variables globally, put them in the .bashrc file. Then run the following command for the environment variables to take effect.
# source .bashrc
Now that we have all the environment variables set, let us go to the Spark directory and invoke the PySpark shell by running the following command −
# ./bin/pyspark
This will start your PySpark shell.
Python 2.7.12 (default, Nov 19 2016, 06:48:10)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
>>>
PySpark - SparkContext
SparkContext is the entry point to any Spark functionality. When we run any Spark application, a driver program starts, which has the main function, and your SparkContext gets initiated there. The driver program then runs the operations inside the executors on worker nodes.
SparkContext uses Py4J to launch a JVM and creates a JavaSparkContext. By default, PySpark has the SparkContext available as ‘sc’, so creating a new SparkContext won’t work.
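If your code may run where a SparkContext already exists (such as inside the PySpark shell), SparkContext.getOrCreate() returns the existing context instead of failing. A minimal sketch, assuming a context may or may not already be running −
from pyspark import SparkContext

# Reuse the running context (for example the shell's 'sc') instead of hitting
# "ValueError: Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
print(sc.appName)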

The following code block has the details of the PySpark SparkContext class and the parameters it can take.
class pyspark.SparkContext (
master = None,
appName = None,
sparkHome = None,
pyFiles = None,
environment = None,
batchSize = 0,
serializer = PickleSerializer(),
conf = None,
gateway = None,
jsc = None,
profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)
Parameters
以下是 SparkContext 的参数。
Following are the parameters of a SparkContext.
- master − It is the URL of the cluster it connects to.
- appName − Name of your job.
- sparkHome − Spark installation directory.
- pyFiles − The .zip or .py files to send to the cluster and add to the PYTHONPATH.
- environment − Worker nodes environment variables.
- batchSize − The number of Python objects represented as a single Java object. Set 1 to disable batching, 0 to automatically choose the batch size based on object sizes, or -1 to use an unlimited batch size.
- serializer − RDD serializer.
- conf − An object of SparkConf to set all the Spark properties.
- gateway − Use an existing gateway and JVM, otherwise initialize a new JVM.
- jsc − The JavaSparkContext instance.
- profiler_cls − A class of custom Profiler used to do profiling (the default is pyspark.profiler.BasicProfiler).
Among the above parameters, master and appName are the most commonly used. The first two lines of any PySpark program look as shown below −
from pyspark import SparkContext
sc = SparkContext("local", "First App")
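The remaining constructor parameters listed above can be passed as keyword arguments in the same way. The following is only a sketch; the file name in pyFiles is a hypothetical placeholder, so that line is left commented out −
from pyspark import SparkContext

sc = SparkContext(
   master = "local[2]",                # run locally with two worker threads
   appName = "Parameter demo",
   # pyFiles = ["helpers.py"],         # hypothetical .py/.zip files shipped to executors
   environment = {"MY_ENV_VAR": "1"},  # environment variables for worker processes
   batchSize = 0                       # choose the batch size automatically
)
print("Running %s against master %s" % (sc.appName, sc.master))
sc.stop()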
SparkContext Example – PySpark Shell
Now that you know enough about SparkContext, let us run a simple example on the PySpark shell. In this example, we will count the number of lines with the character 'a' or 'b' in the README.md file. So, say there are 5 lines in a file and 3 of them contain the character 'a'; the output will be → Lines with a: 3. The same will be done for the character 'b'.
Note − We are not creating any SparkContext object in the following example because by default, Spark automatically creates the SparkContext object named sc, when PySpark shell starts. In case you try to create another SparkContext object, you will get the following error – "ValueError: Cannot run multiple SparkContexts at once".

>>> logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
>>> logData = sc.textFile(logFile).cache()
>>> numAs = logData.filter(lambda s: 'a' in s).count()
>>> numBs = logData.filter(lambda s: 'b' in s).count()
>>> print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30
SparkContext Example - Python Program
Let us run the same example using a Python program. Create a Python file called firstapp.py and enter the following code in that file.
----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------------firstapp.py---------------------------------------
Then we will execute the following command in the terminal to run this Python file. We will get the same output as above.
$SPARK_HOME/bin/spark-submit firstapp.py
Output: Lines with a: 62, lines with b: 30
PySpark - RDD
Now that we have installed and configured PySpark on our system, we can program in Python on Apache Spark. However, before doing so, let us understand a fundamental concept in Spark - RDD.
RDD stands for Resilient Distributed Dataset; these are the elements that run and operate on multiple nodes to perform parallel processing on a cluster. RDDs are immutable elements, which means once you create an RDD you cannot change it. RDDs are fault tolerant as well; hence, in case of any failure, they recover automatically. You can apply multiple operations on these RDDs to achieve a certain task.
To apply operations on these RDDs, there are two ways −
- Transformation and
- Action
Let us understand these two ways in detail.
Transformation − These are the operations that are applied on an RDD to create a new RDD. filter, groupBy and map are examples of transformations.
Action − These are the operations that are applied on an RDD and instruct Spark to perform the computation and send the result back to the driver.
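As a quick illustration of the difference, here is a minimal sketch run on a local context: map is a transformation that only builds a new RDD, while collect is an action that actually triggers the computation.
from pyspark import SparkContext

sc = SparkContext("local", "Transformation vs Action")
nums = sc.parallelize([1, 2, 3, 4])     # source RDD
squares = nums.map(lambda x: x * x)     # transformation: lazily defines a new RDD
print(squares.collect())                # action: runs the job -> [1, 4, 9, 16]
sc.stop()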
To apply any operation in PySpark, we need to create a PySpark RDD first. The following code block has the details of the PySpark RDD class −
class pyspark.RDD (
jrdd,
ctx,
jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)
Let us see how to run a few basic operations using PySpark. The following code in a Python file creates the RDD words, which stores the set of words mentioned.
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
We will now run a few operations on words.
count()
Number of elements in the RDD is returned.
----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------
Command − The command for count() is −
$SPARK_HOME/bin/spark-submit count.py
Output − The output for the above command is −
Number of elements in RDD → 8
collect()
All the elements in the RDD are returned.
----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------
Command − The command for collect() is −
$SPARK_HOME/bin/spark-submit collect.py
Output − The output for the above command is −
Elements in RDD -> [
'scala',
'java',
'hadoop',
'spark',
'akka',
'spark vs hadoop',
'pyspark',
'pyspark and spark'
]
foreach(f)
foreach(f) applies the given function to each element of the RDD; it does not return a result and is used for its side effects. In the following example, we call a print function in foreach, which prints all the elements in the RDD.
----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f)
----------------------------------------foreach.py---------------------------------------
Command − The command for foreach(f) is −
$SPARK_HOME/bin/spark-submit foreach.py
Output − The output for the above command is −
scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark
filter(f)
A new RDD is returned containing the elements that satisfy the function inside the filter. In the following example, we keep only the strings containing 'spark'.
----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------
Command − The command for filter(f) is −
$SPARK_HOME/bin/spark-submit filter.py
Output − The output for the above command is −
Filtered RDD -> [
'spark',
'spark vs hadoop',
'pyspark',
'pyspark and spark'
]
map(f, preservesPartitioning = False)
A new RDD is returned by applying a function to each element in the RDD. In the following example, we form a key-value pair and map every string with a value of 1.
----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------
Command − The command for map(f, preservesPartitioning=False) is −
$SPARK_HOME/bin/spark-submit map.py
Output − The output of the above command is −
Key value pair -> [
('scala', 1),
('java', 1),
('hadoop', 1),
('spark', 1),
('akka', 1),
('spark vs hadoop', 1),
('pyspark', 1),
('pyspark and spark', 1)
]
reduce(f)
After performing the specified commutative and associative binary operation, the elements of the RDD are reduced to a single value, which is returned. In the following example, we import the add function from the operator module and apply it on 'nums' to carry out a simple addition operation.
----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------
Command − The command for reduce(f) is −
$SPARK_HOME/bin/spark-submit reduce.py
Output − The output of the above command is −
Adding all the elements -> 15
join(other, numPartitions = None)
It returns an RDD containing pairs of elements with matching keys, along with all the values for that particular key. In the following example, there are two pairs of elements in two different RDDs. After joining these two RDDs, we get an RDD with elements having matching keys and their values.
----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------
Command − The command for join(other, numPartitions = None) is −
$SPARK_HOME/bin/spark-submit join.py
Output − The output for the above command is −
Join RDD -> [
('spark', (1, 2)),
('hadoop', (4, 5))
]
cache()
Persist this RDD with the default storage level (MEMORY_ONLY). You can also check if the RDD is cached or not.
----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words.cache()
caching = words.persist().is_cached
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------
Command − The command for cache() is −
$SPARK_HOME/bin/spark-submit cache.py
Output − The output for the above program is −
Words got cached -> True
These were some of the most important operations that are performed on a PySpark RDD.
PySpark - Broadcast & Accumulator
For parallel processing, Apache Spark uses shared variables. When the driver sends a task to the executors on the cluster, a copy of the shared variable goes to each node of the cluster, so that it can be used for performing tasks.
There are two types of shared variables supported by Apache Spark −
- Broadcast
- Accumulator
Let us understand them in detail.
Broadcast
Broadcast variables are used to keep a copy of data on all nodes. This variable is cached on all the machines rather than being shipped with every task. The following code block has the details of the Broadcast class for PySpark.
class pyspark.Broadcast (
sc = None,
value = None,
pickle_registry = None,
path = None
)
The following example shows how to use a Broadcast variable. A Broadcast variable has an attribute called value, which stores the data and is used to return a broadcasted value.
----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------
Command − The command for a broadcast variable is as follows −
$SPARK_HOME/bin/spark-submit broadcast.py
Output − The output for the above command is given below.
Stored data -> [
'scala',
'java',
'hadoop',
'spark',
'akka'
]
Printing a particular element in RDD -> hadoop
Accumulator
Accumulator variables are used for aggregating the information through associative and commutative operations. For example, you can use an accumulator for a sum operation or counters (in MapReduce). The following code block has the details of an Accumulator class for PySpark.
class pyspark.Accumulator(aid, value, accum_param)
The following example shows how to use an Accumulator variable. An Accumulator variable has an attribute called value, similar to a broadcast variable; it stores the data and is used to return the accumulator’s value, but it is usable only in the driver program.
In this example, an accumulator variable is used by multiple workers and returns an accumulated value.
----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
   global num
   num += x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------
Command − The command for an accumulator variable is as follows −
$SPARK_HOME/bin/spark-submit accumulator.py
Output − The output for the above command is given below.
Accumulated value is -> 150
PySpark - SparkConf
To run a Spark application on the local machine or on a cluster, you need to set a few configurations and parameters; this is what SparkConf helps with. It provides the configurations to run a Spark application. The following code block has the details of the SparkConf class for PySpark.
class pyspark.SparkConf (
loadDefaults = True,
_jvm = None,
_jconf = None
)
Initially, we will create a SparkConf object with SparkConf(), which will also load the values from the spark.* Java system properties. You can then set different parameters using the SparkConf object, and they will take priority over the system properties.
In the SparkConf class, there are setter methods that support chaining. For example, you can write conf.setAppName("PySpark App").setMaster("local"). Once we pass a SparkConf object to Apache Spark, it cannot be modified by any user.
Following are some of the most commonly used attributes of SparkConf; a short sketch using them follows this list −
- set(key, value) − To set a configuration property.
- setMaster(value) − To set the master URL.
- setAppName(value) − To set an application name.
- get(key, defaultValue=None) − To get a configuration value of a key.
- setSparkHome(value) − To set the Spark installation path on worker nodes.
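Here is a small sketch of these setter and getter methods in isolation; the property values used are illustrative only −
from pyspark import SparkConf

conf = SparkConf() \
   .setAppName("Conf demo") \
   .setMaster("local[2]") \
   .set("spark.executor.memory", "1g")   # generic key/value setter

print(conf.get("spark.app.name"))        # -> Conf demo
print(conf.get("spark.executor.memory")) # -> 1g
print(conf.toDebugString())              # dump of all configured properties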
Let us consider the following example of using SparkConf in a PySpark program. In this example, we set the Spark application name to PySpark App and the master URL for the Spark application to → spark://master:7077.
When the following lines are added to a Python file, they set the basic configuration for running a PySpark application.
---------------------------------------------------------------------------------------
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")
sc = SparkContext(conf=conf)
---------------------------------------------------------------------------------------
PySpark - SparkFiles
In Apache Spark, you can upload your files using sc.addFile (sc is your default SparkContext) and get the path on a worker using SparkFiles.get. Thus, SparkFiles resolves the paths to files added through SparkContext.addFile().
SparkFiles contains the following classmethods −
- get(filename)
- getRootDirectory()
Let us understand them in detail.
get(filename)
It specifies the path of the file that is added through SparkContext.addFile().
getRootDirectory()
It specifies the path to the root directory, which contains the file added through SparkContext.addFile().
----------------------------------------sparkfile.py------------------------------------
from pyspark import SparkContext
from pyspark import SparkFiles
finddistance = "/home/hadoop/examples_pyspark/finddistance.R"
finddistancename = "finddistance.R"
sc = SparkContext("local", "SparkFile App")
sc.addFile(finddistance)
print "Absolute Path -> %s" % SparkFiles.get(finddistancename)
----------------------------------------sparkfile.py------------------------------------
Command − The command is as follows −
$SPARK_HOME/bin/spark-submit sparkfile.py
Output − The output for the above command is −
Absolute Path ->
/tmp/spark-f1170149-af01-4620-9805-f61c85fecee4/userFiles-641dfd0f-240b-4264-a650-4e06e7a57839/finddistance.R
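A similar sketch for getRootDirectory(), assuming the same finddistance.R file used above is available; it prints the temporary directory under which files added with addFile are placed −
from pyspark import SparkContext
from pyspark import SparkFiles

sc = SparkContext("local", "SparkFile root dir app")
sc.addFile("/home/hadoop/examples_pyspark/finddistance.R")
print("Root directory -> %s" % SparkFiles.getRootDirectory())
sc.stop()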
PySpark - StorageLevel
StorageLevel decides how an RDD should be stored. In Apache Spark, StorageLevel decides whether the RDD should be stored in memory, on disk, or both. It also decides whether to serialize the RDD and whether to replicate the RDD partitions.
The following code block has the class definition of a StorageLevel −
class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)
Now, to decide the storage of RDD, there are different storage levels, which are given below −
- DISK_ONLY = StorageLevel(True, False, False, False, 1)
- DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
- MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
- MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
- MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)
- MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
- MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
- MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
- MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)
- MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
- OFF_HEAP = StorageLevel(True, True, True, False, 1)
Let us consider the following example of StorageLevel, where we use the storage level MEMORY_AND_DISK_2, which means RDD partitions will have a replication factor of 2.
------------------------------------storagelevel.py-------------------------------------
from pyspark import SparkContext
import pyspark
sc = SparkContext (
"local",
"storagelevel app"
)
rdd1 = sc.parallelize([1,2])
rdd1.persist( pyspark.StorageLevel.MEMORY_AND_DISK_2 )
print(rdd1.getStorageLevel())
------------------------------------storagelevel.py-------------------------------------
Command − The command is as follows −
$SPARK_HOME/bin/spark-submit storagelevel.py
Output − The output for the above command is given below −
Disk Memory Serialized 2x Replicated
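To relate this description to the constructor flags listed above, a custom StorageLevel can be built with the same arguments as MEMORY_AND_DISK_2. A minimal sketch −
import pyspark

# StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
custom = pyspark.StorageLevel(True, True, False, False, 2)
print(custom)   # -> Disk Memory Serialized 2x Replicated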
PySpark - MLlib
Apache Spark offers a Machine Learning API called MLlib. PySpark has this machine learning API in Python as well. It supports different kinds of algorithms, which are mentioned below −
- mllib.classification − The spark.mllib package supports various methods for binary classification, multiclass classification and regression analysis. Some of the most popular algorithms in classification are Random Forest, Naive Bayes, Decision Tree, etc.
- mllib.clustering − Clustering is an unsupervised learning problem, whereby you aim to group subsets of entities with one another based on some notion of similarity.
- mllib.fpm − Frequent pattern matching is mining frequent items, itemsets, subsequences or other substructures that are usually among the first steps to analyze a large-scale dataset. This has been an active research topic in data mining for years.
- mllib.linalg − MLlib utilities for linear algebra (see the short sketch after this list).
- mllib.recommendation − Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix.
- spark.mllib − It currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. spark.mllib uses the Alternating Least Squares (ALS) algorithm to learn these latent factors.
- mllib.regression − Linear regression belongs to the family of regression algorithms. The goal of regression is to find relationships and dependencies between variables. The interface for working with linear regression models and model summaries is similar to the logistic regression case.
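As a small taste of the mllib.linalg utilities mentioned in the list above, the following sketch builds a dense vector and a sparse vector locally and takes their dot product −
from pyspark.mllib.linalg import Vectors

dense = Vectors.dense([1.0, 0.0, 3.0])         # dense vector
sparse = Vectors.sparse(3, {0: 1.0, 2: 3.0})   # sparse vector: size, {index: value}
print(dense.dot(sparse))                       # -> 10.0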
There are other algorithms, classes and functions as part of the mllib package as well. For now, let us walk through a demonstration of pyspark.mllib.
The following example uses the ALS algorithm for collaborative filtering to build a recommendation model and evaluate it on the training data.
Dataset used − test.data
1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0
--------------------------------------recommend.py----------------------------------------
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
if __name__ == "__main__":
   sc = SparkContext(appName="Pspark mllib Example")
   data = sc.textFile("test.data")
   ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
   # Build the recommendation model using Alternating Least Squares
   rank = 10
   numIterations = 10
   model = ALS.train(ratings, rank, numIterations)
   # Evaluate the model on training data
   testdata = ratings.map(lambda p: (p[0], p[1]))
   predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
   ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
   MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
   print("Mean Squared Error = " + str(MSE))
   # Save and load model
   model.save(sc, "target/tmp/myCollaborativeFilter")
   sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
--------------------------------------recommend.py----------------------------------------
Command − The command will be as follows −
$SPARK_HOME/bin/spark-submit recommend.py
Output − The output of the above command will be −
Mean Squared Error = 1.20536041839e-05
PySpark - Serializers
Serialization is used for performance tuning on Apache Spark. All data that is sent over the network or written to the disk or persisted in the memory should be serialized. Serialization plays an important role in costly operations.
PySpark supports custom serializers for performance tuning. The following two serializers are supported by PySpark −
MarshalSerializer
Serializes objects using Python’s Marshal Serializer. This serializer is faster than PickleSerializer, but supports fewer datatypes.
class pyspark.MarshalSerializer
PickleSerializer
Serializes objects using Python’s Pickle Serializer. This serializer supports nearly any Python object, but may not be as fast as more specialized serializers.
class pyspark.PickleSerializer
Let us see an example on PySpark serialization. Here, we serialize the data using MarshalSerializer.
--------------------------------------serializing.py-------------------------------------
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "serialization app", serializer = MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))
sc.stop()
--------------------------------------serializing.py-------------------------------------
Command − The command is as follows −
$SPARK_HOME/bin/spark-submit serializing.py
Output − The output of the above command is −
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
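For comparison, the same program can be run with PickleSerializer by passing it as the serializer instead; this sketch produces the same output as above −
from pyspark.context import SparkContext
from pyspark.serializers import PickleSerializer

sc = SparkContext("local", "pickle serialization app", serializer = PickleSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))
sc.stop()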