Pyspark 简明教程

PySpark - Quick Guide

PySpark – Introduction

在本章中,我们将了解什么是 Apache Spark 以及 PySpark 是如何开发的。

Spark – Overview

Apache Spark 是一个闪电般快速的实时处理框架。它执行内存计算以实时分析数据。它作为一个角色出现,因为 Apache Hadoop MapReduce 仅执行批处理并且缺乏实时处理功能。因此,引入了 Apache Spark,因为它可以实时执行流处理,并且还可以处理批处理。

除了实时和批处理之外,Apache Spark 还支持交互式查询和迭代算法。Apache Spark 有自己的集群管理器,它可以在其中托管其应用程序。它利用 Apache Hadoop 进行存储和处理。它使用 HDFS (Hadoop 分布式文件系统)进行存储,并且它也可以在 YARN 上运行 Spark 应用程序。

PySpark – Overview

Apache Spark 是用 Scala programming language 编写的。为了在 Spark 中支持 Python,Apache Spark 社区发布了一个工具 PySpark。使用 PySpark,您还可以使用 Python 编程语言来处理 RDDs 。这是因为一个名为 Py4j 的库,它可以实现此功能。

PySpark 提供 PySpark Shell ,它将 Python API 链接到 Spark 核心并初始化 Spark 上下文。当今大多数数据科学家和分析专家由于其丰富的库集而使用 Python。将 Python 与 Spark 集成对他们来说是一个福音。

PySpark - Environment Setup

在本章中,我们将了解 PySpark 的环境设置。

Note − 这里考虑的是在计算机上安装 Java 和 Scala。

现在,让我们按照以下步骤下载并设置 PySpark。

Step 1 − 转到官方 Apache Spark download 页面并下载那里提供的最新版本的 Apache Spark。在本教程中,我们使用 spark-2.1.0-bin-hadoop2.7

Step 2 − 现在,解压下载的 Spark tar 文件。默认情况下,它将被下载到下载目录。

# tar -xvf Downloads/spark-2.1.0-bin-hadoop2.7.tgz

它将创建一个目录 spark-2.1.0-bin-hadoop2.7 。在启动 PySpark 之前,需要设置以下环境以设置 Spark 路径和 Py4j path

export SPARK_HOME = /home/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH = $PATH:/home/hadoop/spark-2.1.0-bin-hadoop2.7/bin
export PYTHONPATH = $SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH = $SPARK_HOME/python:$PATH

或者,要在全局范围内设置以上环境,请将它们放在 .bashrc file 中。然后运行以下命令以使环境生效。

# source .bashrc

现在,我们已设置所有环境,让我们转到 Spark 目录并通过运行以下命令调用 PySpark shell −

# ./bin/pyspark

这将启动 PySpark shell。

Python 2.7.12 (default, Nov 19 2016, 06:48:10)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
<<<

PySpark - SparkContext

SparkContext 是进入任何 spark 功能的入口点。当我们运行任何 Spark 应用程序时,将启动一个驱动程序,它有 main 函数,并且 SparkContext 在此初始化。然后,驱动程序在工作节点上的 executor 中运行操作。

SparkContext 使用 Py4J 启动 JVM 并创建一个 JavaSparkContext 。默认情况下,PySpark 将 SparkContext 设置为 ‘sc’ ,因此创建新的 SparkContext 将不起作用。

sparkcontext

以下代码块包含 PySpark 类和其他参数的详细信息,SparkContext 可以采用。

class pyspark.SparkContext (
   master = None,
   appName = None,
   sparkHome = None,
   pyFiles = None,
   environment = None,
   batchSize = 0,
   serializer = PickleSerializer(),
   conf = None,
   gateway = None,
   jsc = None,
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

Parameters

以下是 SparkContext 的参数。

  1. Master − 这是其连接到的集群的 URL。

  2. appName − 工作的名称。

  3. sparkHome − Spark 安装目录。

  4. pyFiles − 发送到集群并添加到 PYTHONPATH 中的 .zip 或 .py 文件。

  5. Environment − 工作器节点的环境变量。

  6. batchSize − 表示为单个 Java 对象的 Python 对象数。设置为 1 以禁用批处理,设置为 0 以根据对象大小自动选择批量大小,或设置为 -1 以使用无限批量大小。

  7. Serializer − RDD serializer.

  8. Conf − L{SparkConf} 的一个对象,以设置所有 Spark 属性。

  9. Gateway − 使用现有的网关和 JVM,否则初始化新的 JVM。

  10. JSC − JavaSparkContext 实例。

  11. profiler_cls − 用于进行分析的自定义 Profiler 类(默认值为 pyspark.profiler.BasicProfiler)。

在上述参数中, masterappname 使用最为频繁。PySpark 程序的前两行看起来如下所示 −

from pyspark import SparkContext
sc = SparkContext("local", "First App")

SparkContext Example – PySpark Shell

现在,您对 SparkContext 已有了足够的了解,让我们在 PySpark Shell 中运行一个简单的示例。在此示例中,我们将统计 README.md 文件中包含字符“a”或“b”的行数。因此,假设一个文件中共有 5 行,其中 3 行包含字符“a”,则输出将为 → Line with a: 3 。针对字符“b”也将执行相同操作。

Note − 我们不会在以下示例中创建任何 SparkContext 对象,因为默认情况下,PySpark shell 启动时,Spark 将自动创建一个名为 sc 的 SparkContext 对象。如果您尝试创建另一个 SparkContext 对象,您将收到以下错误 – "ValueError: Cannot run multiple SparkContexts at once".

pyspark shell
<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
<<< logData = sc.textFile(logFile).cache()
<<< numAs = logData.filter(lambda s: 'a' in s).count()
<<< numBs = logData.filter(lambda s: 'b' in s).count()
<<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30

SparkContext Example - Python Program

让我们使用 Python 程序运行相同的示例。创建一个名为 firstapp.py 的 Python 文件,然后在该文件中输入以下代码。

----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------------firstapp.py---------------------------------------

然后,我们在终端中执行以下命令以运行该 Python 文件。我们将得到与上面相同的输出。

$SPARK_HOME/bin/spark-submit firstapp.py
Output: Lines with a: 62, lines with b: 30

PySpark - RDD

现在,我们已安装并在系统上配置了 PySpark,我们可以使用 Python 在 Apache Spark 上进行编程。但在开始之前,让我们了解 Spark 中的一个基本概念 - RDD。

RDD 代表 Resilient Distributed Dataset ,它们是运行在多个节点上并对其进行操作以在集群上进行并行处理的元素。RDD 是不可变元素,这意味着一旦创建了 RDD,您便无法对其进行修改。RDD 也具有容错能力,因此在发生任何故障时,它们会自动恢复。您可以对这些 RDD 执行多个操作以实现特定任务。

要对这些 RDD 执行操作,有两种方法 −

  1. Transformation and

  2. Action

让我们详细了解这两种方法。

Transformation − 这些操作可应用于 RDD 以创建新的 RDD。Filter、groupBy 和映射都是变换类型的示例。

Action − 这些操作应用于 RDD,指示 Spark 执行计算并将结果发回驱动程序。

在 PySpark 中应用任何操作时,我们需要首先创建一个 PySpark RDD 。以下代码块详细说明了 PySpark RDD 类 −

class pyspark.RDD (
   jrdd,
   ctx,
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

让我们看看如何使用 PySpark 运行一些基本操作。Python 文件中的以下代码创建 RDD words,用于存储一组所提到的单词。

words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)

现在我们将在 words 上运行一些操作。

count()

返回 RDD 中元素的数量。

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

Command − count() 命令为 −

$SPARK_HOME/bin/spark-submit count.py

Output − 以上命令的输出为−

Number of elements in RDD → 8

collect()

返回 RDD 中的所有元素。

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Command − collect() 命令为 −

$SPARK_HOME/bin/spark-submit collect.py

Output − 以上命令的输出为−

Elements in RDD -> [
   'scala',
   'java',
   'hadoop',
   'spark',
   'akka',
   'spark vs hadoop',
   'pyspark',
   'pyspark and spark'
]

foreach(f)

仅返回满足 foreach 中函数条件的那些元素。在以下示例中,我们在 foreach 中调用打印函数,该函数打印 RDD 中的所有元素。

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f)
----------------------------------------foreach.py---------------------------------------

Command − foreach(f) 命令为 −

$SPARK_HOME/bin/spark-submit foreach.py

Output − 以上命令的输出为−

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filter(f)

返回包含满足 filter 中函数要求的元素的新 RDD。在以下示例中,我们过滤掉包含“spark”的字符串。

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Command − filter(f) 命令为 −

$SPARK_HOME/bin/spark-submit filter.py

Output − 以上命令的输出为−

Fitered RDD -> [
   'spark',
   'spark vs hadoop',
   'pyspark',
   'pyspark and spark'
]

map(f, preservesPartitioning = False)

通过将函数应用于 RDD 中的每个元素来返回新 RDD。在以下示例中,我们形成一对键值并使用值 1 映射每个字符串。

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

Command − map(f, preservesPartitioning=False) 命令为 −

$SPARK_HOME/bin/spark-submit map.py

Output − 上述命令的输出为 −

Key value pair -> [
   ('scala', 1),
   ('java', 1),
   ('hadoop', 1),
   ('spark', 1),
   ('akka', 1),
   ('spark vs hadoop', 1),
   ('pyspark', 1),
   ('pyspark and spark', 1)
]

reduce(f)

在执行指定的交换和结合二元操作后,将返回 RDD 中的元素。在以下示例中,我们从运算符导入包 add,并将其应用于“num”以执行简单的加法操作。

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

Command − reduce(f) 命令为 −

$SPARK_HOME/bin/spark-submit reduce.py

Output − 上述命令的输出为 −

Adding all the elements -> 15

join(other, numPartitions = None)

它返回 RDD 以及一对具有匹配键和该特定键的所有值的元素。在以下示例中,在两个不同的 RDD 中有两对元素。在连接这两个 RDD 后,我们将获得一个 RDD,其中包含具有匹配键及其值的元素。

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

Command − join(other, numPartitions = None) 命令为 −

$SPARK_HOME/bin/spark-submit join.py

Output − 以上命令的输出为−

Join RDD -> [
   ('spark', (1, 2)),
   ('hadoop', (4, 5))
]

cache()

使用默认存储级别 (MEMORY_ONLY) 使此 RDD 持久化。您还可以检查 RDD 是否已缓存。

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
words.cache()
caching = words.persist().is_cached
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

Command − cache() 命令为 −

$SPARK_HOME/bin/spark-submit cache.py

Output − 以上程序的输出为 −

Words got cached -> True

以下这些操作是 PySpark RDD 上执行的一些最重要的操作。

PySpark - Broadcast & Accumulator

对于并行处理,Apache Spark 使用共享变量。当驱动程序将任务发送到集群上的执行程序时,共享变量的副本将转到集群的每个节点,以便可以将其用于执行任务。

Apache Spark 支持两种类型的共享变量−

  1. Broadcast

  2. Accumulator

我们详细了解它们。

Broadcast

广播变量用于在所有节点之间保存数据的副本。这个变量缓存在所有机器上,不发送给执行任务的机器。下面的代码块详细介绍了 PySpark 的 Broadcast 类的详细信息。

class pyspark.Broadcast (
   sc = None,
   value = None,
   pickle_registry = None,
   path = None
)

下面的示例展示了如何使用 Broadcast 变量。Broadcast 变量有一个名为 value 的属性,它存储数据并用于返回广播值。

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command − 广播变量的命令如下 −

$SPARK_HOME/bin/spark-submit broadcast.py

Output − 以下命令的输出如下所示。

Stored data -> [
   'scala',
   'java',
   'hadoop',
   'spark',
   'akka'
]
Printing a particular element in RDD -> hadoop

Accumulator

累加器变量用于通过关联和交换运算汇聚信息。例如,你可以使用一个累加器进行求和运算或计数(在 MapReduce 中)。下面的代码块详细介绍了 PySpark 的 Accumulator 类的详细信息。

class pyspark.Accumulator(aid, value, accum_param)

下面的示例展示了如何使用累加器变量。累加器变量有一个名为 value 的属性,它类似于广播变量所具有的属性。它存储数据并用于返回累加器的值,但仅在驱动程序中可用。

在这个示例中,累加器变量被多个工作人员使用,并返回一个累加值。

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
   global num
   num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command − 累加器变量的命令如下 −

$SPARK_HOME/bin/spark-submit accumulator.py

Output − 以上命令的输出如下所示。

Accumulated value is -> 150

PySpark - SparkConf

要在本地/集群上运行 Spark 应用程序,您需要设置一些配置和参数,SparkConf 的作用正是如此。它提供用于运行 Spark 应用程序的配置。以下代码块包含 PySpark 中 SparkConf 类的详细信息。

class pyspark.SparkConf (
   loadDefaults = True,
   _jvm = None,
   _jconf = None
)

最初,我们将使用 SparkConf() 创建一个 SparkConf 对象,它也将加载 spark. * Java 系统属性的值。现在,您可以使用 SparkConf 对象设置不同的参数,而它们的优先级将高于系统属性。

在 SparkConf 类中,有一些支持链接的 setter 方法。例如,您可以编写 conf.setAppName(“PySpark App”).setMaster(“local”) 。一旦我们将一个 SparkConf 对象传递给 Apache Spark,任何用户都无法修改它。

以下是 SparkConf 最常用的部分属性 −

  1. set(key, value) − 用于设置配置属性。

  2. setMaster(value) − 用于设置主 URL。

  3. setAppName(value) − 用于设置应用程序名称。

  4. get(key, defaultValue=None) − 获取某个键的配置值。

  5. setSparkHome(value) − 在工作程序节点上设置 Spark 安装路径。

我们考虑在 PySpark 程序中使用 SparkConf 的以下示例。在此示例中,我们设置 Spark 应用程序的名称为 PySpark App ,并将 Spark 应用程序的主 URL 设置为 → spark://master:7077

以下代码块在 Python 文件中添加时,会为运行 PySpark 应用程序设置基本配置。

---------------------------------------------------------------------------------------
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")
sc = SparkContext(conf=conf)
---------------------------------------------------------------------------------------

PySpark - SparkFiles

在 Apache Spark 中,可以使用 sc.addFile 上传文件(sc 是您的默认 SparkContext)并使用 SparkFiles.get 获取工作程序上的路径。因此,SparkFiles 会解析通过 SparkContext.addFile() 添加的文件的路径。

SparkFiles 包含以下类方法−

  1. get(filename)

  2. getrootdirectory()

我们详细了解它们。

get(filename)

它指定通过 SparkContext.addFile() 添加的文件的路径。

getrootdirectory()

它指定包含通过 SparkContext.addFile() 添加的文件的根目录的路径。

----------------------------------------sparkfile.py------------------------------------
from pyspark import SparkContext
from pyspark import SparkFiles
finddistance = "/home/hadoop/examples_pyspark/finddistance.R"
finddistancename = "finddistance.R"
sc = SparkContext("local", "SparkFile App")
sc.addFile(finddistance)
print "Absolute Path -> %s" % SparkFiles.get(finddistancename)
----------------------------------------sparkfile.py------------------------------------

Command − 命令如下所示−

$SPARK_HOME/bin/spark-submit sparkfiles.py

Output − 以上命令的输出为−

Absolute Path ->
   /tmp/spark-f1170149-af01-4620-9805-f61c85fecee4/userFiles-641dfd0f-240b-4264-a650-4e06e7a57839/finddistance.R

PySpark - StorageLevel

StorageLevel 决定如何存储 RDD。在 Apache Spark 中,StorageLevel 决定 RDD 是应该存储在内存中,还是应该存储在磁盘上,或者同时存储在内存和磁盘上。它还决定是否对 RDD 进行序列化以及是否对 RDD 分区进行复制。

以下代码块包含 StorageLevel 的类定义 −

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)

现在,要决定 RDD 的存储位置,有不同的存储级别,如下所示 −

  1. DISK_ONLY = StorageLevel(True, False, False, False, 1)

  2. DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)

  3. MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)

  4. MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)

  5. MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)

  6. MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)

  7. MEMORY_ONLY = StorageLevel(False, True, False, False, 1)

  8. MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)

  9. MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)

  10. MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)

  11. OFF_HEAP = StorageLevel(True, True, True, False, 1)

我们考虑以下 StorageLevel 示例,其中我们使用存储级别 MEMORY_AND_DISK_2, ,这意味着 RDD 分区将具有 2 的副本。

------------------------------------storagelevel.py-------------------------------------
from pyspark import SparkContext
import pyspark
sc = SparkContext (
   "local",
   "storagelevel app"
)
rdd1 = sc.parallelize([1,2])
rdd1.persist( pyspark.StorageLevel.MEMORY_AND_DISK_2 )
rdd1.getStorageLevel()
print(rdd1.getStorageLevel())
------------------------------------storagelevel.py-------------------------------------

Command − 命令如下所示−

$SPARK_HOME/bin/spark-submit storagelevel.py

Output − 以上命令的输出如下 −

Disk Memory Serialized 2x Replicated

PySpark - MLlib

Apache Spark 提供了一个称为 MLlib 的机器学习 API。PySpark 在 Python 中也具有该机器学习 API。它支持不同种类的算法,如下所述 −

  1. mllib.classificationspark.mllib 包支持二元分类、多类分类和回归分析的各种方法。分类中一些最流行的算法包括 Random Forest, Naive Bayes, Decision Tree 等。

  2. mllib.clustering − 聚类是一个无监督的学习问题,你旨在根据相似性概念将实体子集彼此分组。

  3. mllib.fpm − 频繁模式匹配是挖掘频繁项、项集、子序列或其他子结构,它们通常是大规模数据集分析的第一步。多年来,这一直是数据挖掘中的一个活跃的研究课题。

  4. mllib.linalg − MLlib 线性代数实用程序。

  5. mllib.recommendation − 协同过滤通常用于推荐系统。这些技术旨在填补用户项关联矩阵的缺失项。

  6. spark.mllib − 它目前支持基于模型的协同过滤,其中用户和产品由一小组潜在因子描述,这些因子可用于预测缺失项。spark.mllib 使用交替最小二乘 (ALS) 算法来学习这些潜在因子。

  7. mllib.regression − 线性回归属于回归算法家族。回归的目标是找出变量之间的关系和依赖性。用于处理线性回归模型和模型摘要的接口类似于逻辑回归案例。

mllib 包中还有一些其他算法、类和函数。目前,让我们了解一下 pyspark.mllib 的演示。

以下示例使用 ALS 算法进行协同过滤,以构建推荐模型并根据训练数据对其进行评估。

Dataset used − test.data

1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0
--------------------------------------recommend.py----------------------------------------
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
if __name__ == "__main__":
   sc = SparkContext(appName="Pspark mllib Example")
   data = sc.textFile("test.data")
   ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

   # Build the recommendation model using Alternating Least Squares
   rank = 10
   numIterations = 10
   model = ALS.train(ratings, rank, numIterations)

   # Evaluate the model on training data
   testdata = ratings.map(lambda p: (p[0], p[1]))
   predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
   ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
   MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
   print("Mean Squared Error = " + str(MSE))

   # Save and load model
   model.save(sc, "target/tmp/myCollaborativeFilter")
   sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
--------------------------------------recommend.py----------------------------------------

Command − 命令如下 −

$SPARK_HOME/bin/spark-submit recommend.py

Output − 以上命令的输出为 −

Mean Squared Error = 1.20536041839e-05

PySpark - Serializers

序列化用于在 Apache Spark 上进行性能优化。所有通过网络发送、写入磁盘或持久存储在内存中的数据应进行序列化。序列化在代价高昂的操作中起着重要作用。

PySpark 支持在性能优化中使用自定义序列化程序。PySpark 支持以下两种序列化程序:

MarshalSerializer

使用 Python 的 Marshal 序列化程序序列化对象。该序列化程序比 PickleSerializer 速度快,但支持的数据类型较少。

class pyspark.MarshalSerializer

PickleSerializer

使用 Python 的 Pickle 序列化程序序列化对象。该序列化程序支持几乎任何 Python 对象,但可能没有其他专门序列化程序那么快。

class pyspark.PickleSerializer

让我们看看 PySpark 序列化的示例。在此,我们使用 MarshalSerializer 序列化数据。

--------------------------------------serializing.py-------------------------------------
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "serialization app", serializer = MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))
sc.stop()
--------------------------------------serializing.py-------------------------------------

Command − 命令如下所示−

$SPARK_HOME/bin/spark-submit serializing.py

Output − 上述命令的输出为 −

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]