Pyspark 简明教程

PySpark - SparkContext

SparkContext 是进入任何 spark 功能的入口点。当我们运行任何 Spark 应用程序时,将启动一个驱动程序,它有 main 函数,并且 SparkContext 在此初始化。然后,驱动程序在工作节点上的 executor 中运行操作。

SparkContext is the entry point to any spark functionality. When we run any Spark application, a driver program starts, which has the main function and your SparkContext gets initiated here. The driver program then runs the operations inside the executors on worker nodes.

SparkContext 使用 Py4J 启动 JVM 并创建一个 JavaSparkContext 。默认情况下,PySpark 将 SparkContext 设置为 ‘sc’ ,因此创建新的 SparkContext 将不起作用。

SparkContext uses Py4J to launch a JVM and creates a JavaSparkContext. By default, PySpark has SparkContext available as ‘sc’, so creating a new SparkContext won’t work.

sparkcontext

以下代码块包含 PySpark 类和其他参数的详细信息,SparkContext 可以采用。

The following code block has the details of a PySpark class and the parameters, which a SparkContext can take.

class pyspark.SparkContext (
   master = None,
   appName = None,
   sparkHome = None,
   pyFiles = None,
   environment = None,
   batchSize = 0,
   serializer = PickleSerializer(),
   conf = None,
   gateway = None,
   jsc = None,
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

Parameters

以下是 SparkContext 的参数。

Following are the parameters of a SparkContext.

  1. Master − It is the URL of the cluster it connects to.

  2. appName − Name of your job.

  3. sparkHome − Spark installation directory.

  4. pyFiles − The .zip or .py files to send to the cluster and add to the PYTHONPATH.

  5. Environment − Worker nodes environment variables.

  6. batchSize − The number of Python objects represented as a single Java object. Set 1 to disable batching, 0 to automatically choose the batch size based on object sizes, or -1 to use an unlimited batch size.

  7. Serializer − RDD serializer.

  8. Conf − An object of L{SparkConf} to set all the Spark properties.

  9. Gateway − Use an existing gateway and JVM, otherwise initializing a new JVM.

  10. JSC − The JavaSparkContext instance.

  11. profiler_cls − A class of custom Profiler used to do profiling (the default is pyspark.profiler.BasicProfiler).

在上述参数中, masterappname 使用最为频繁。PySpark 程序的前两行看起来如下所示 −

Among the above parameters, master and appname are mostly used. The first two lines of any PySpark program looks as shown below −

from pyspark import SparkContext
sc = SparkContext("local", "First App")

SparkContext Example – PySpark Shell

现在,您对 SparkContext 已有了足够的了解,让我们在 PySpark Shell 中运行一个简单的示例。在此示例中,我们将统计 README.md 文件中包含字符“a”或“b”的行数。因此,假设一个文件中共有 5 行,其中 3 行包含字符“a”,则输出将为 → Line with a: 3 。针对字符“b”也将执行相同操作。

Now that you know enough about SparkContext, let us run a simple example on PySpark shell. In this example, we will be counting the number of lines with character 'a' or 'b' in the README.md file. So, let us say if there are 5 lines in a file and 3 lines have the character 'a', then the output will be → Line with a: 3. Same will be done for character ‘b’.

Note − 我们不会在以下示例中创建任何 SparkContext 对象,因为默认情况下,PySpark shell 启动时,Spark 将自动创建一个名为 sc 的 SparkContext 对象。如果您尝试创建另一个 SparkContext 对象,您将收到以下错误 – "ValueError: Cannot run multiple SparkContexts at once".

Note − We are not creating any SparkContext object in the following example because by default, Spark automatically creates the SparkContext object named sc, when PySpark shell starts. In case you try to create another SparkContext object, you will get the following error – "ValueError: Cannot run multiple SparkContexts at once".

pyspark shell
<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
<<< logData = sc.textFile(logFile).cache()
<<< numAs = logData.filter(lambda s: 'a' in s).count()
<<< numBs = logData.filter(lambda s: 'b' in s).count()
<<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30

SparkContext Example - Python Program

让我们使用 Python 程序运行相同的示例。创建一个名为 firstapp.py 的 Python 文件,然后在该文件中输入以下代码。

Let us run the same example using a Python program. Create a Python file called firstapp.py and enter the following code in that file.

----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------------firstapp.py---------------------------------------

然后,我们在终端中执行以下命令以运行该 Python 文件。我们将得到与上面相同的输出。

Then we will execute the following command in the terminal to run this Python file. We will get the same output as above.

$SPARK_HOME/bin/spark-submit firstapp.py
Output: Lines with a: 62, lines with b: 30