Pyspark 简明教程
PySpark - SparkContext
SparkContext 是进入任何 spark 功能的入口点。当我们运行任何 Spark 应用程序时,将启动一个驱动程序,它有 main 函数,并且 SparkContext 在此初始化。然后,驱动程序在工作节点上的 executor 中运行操作。
SparkContext 使用 Py4J 启动 JVM 并创建一个 JavaSparkContext 。默认情况下,PySpark 将 SparkContext 设置为 ‘sc’ ,因此创建新的 SparkContext 将不起作用。
以下代码块包含 PySpark 类和其他参数的详细信息,SparkContext 可以采用。
class pyspark.SparkContext (
master = None,
appName = None,
sparkHome = None,
pyFiles = None,
environment = None,
batchSize = 0,
serializer = PickleSerializer(),
conf = None,
gateway = None,
jsc = None,
profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)
Parameters
以下是 SparkContext 的参数。
-
Master − 这是其连接到的集群的 URL。
-
appName − 工作的名称。
-
sparkHome − Spark 安装目录。
-
pyFiles − 发送到集群并添加到 PYTHONPATH 中的 .zip 或 .py 文件。
-
Environment − 工作器节点的环境变量。
-
batchSize − 表示为单个 Java 对象的 Python 对象数。设置为 1 以禁用批处理,设置为 0 以根据对象大小自动选择批量大小,或设置为 -1 以使用无限批量大小。
-
Serializer − RDD serializer.
-
Conf − L{SparkConf} 的一个对象,以设置所有 Spark 属性。
-
Gateway − 使用现有的网关和 JVM,否则初始化新的 JVM。
-
JSC − JavaSparkContext 实例。
-
profiler_cls − 用于进行分析的自定义 Profiler 类(默认值为 pyspark.profiler.BasicProfiler)。
在上述参数中, master 和 appname 使用最为频繁。PySpark 程序的前两行看起来如下所示 −
from pyspark import SparkContext
sc = SparkContext("local", "First App")
SparkContext Example – PySpark Shell
现在,您对 SparkContext 已有了足够的了解,让我们在 PySpark Shell 中运行一个简单的示例。在此示例中,我们将统计 README.md 文件中包含字符“a”或“b”的行数。因此,假设一个文件中共有 5 行,其中 3 行包含字符“a”,则输出将为 → Line with a: 3 。针对字符“b”也将执行相同操作。
Note − 我们不会在以下示例中创建任何 SparkContext 对象,因为默认情况下,PySpark shell 启动时,Spark 将自动创建一个名为 sc 的 SparkContext 对象。如果您尝试创建另一个 SparkContext 对象,您将收到以下错误 – "ValueError: Cannot run multiple SparkContexts at once".
<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
<<< logData = sc.textFile(logFile).cache()
<<< numAs = logData.filter(lambda s: 'a' in s).count()
<<< numBs = logData.filter(lambda s: 'b' in s).count()
<<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30
SparkContext Example - Python Program
让我们使用 Python 程序运行相同的示例。创建一个名为 firstapp.py 的 Python 文件,然后在该文件中输入以下代码。
----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------------firstapp.py---------------------------------------
然后,我们在终端中执行以下命令以运行该 Python 文件。我们将得到与上面相同的输出。
$SPARK_HOME/bin/spark-submit firstapp.py
Output: Lines with a: 62, lines with b: 30