Apache Spark Tutorial

Apache Spark - Deployment

A Spark application is deployed on a cluster with spark-submit, a shell command. It works with all of Spark's cluster managers through a single, uniform interface, so you do not have to configure your application separately for each manager.
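For instance, once the word-count jar from the example below has been built, the same submission can target different cluster managers simply by changing the --master URL; the host name here is a placeholder, not part of the walkthrough −

spark-submit --class SparkWordCount --master local[4] wordcount.jar
spark-submit --class SparkWordCount --master spark://master-host:7077 wordcount.jar
spark-submit --class SparkWordCount --master yarn-cluster wordcount.jar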

Example

Let us take the same word count example used earlier and, this time, run it through shell commands as a stand-alone Spark application.

Sample Input

The following text is the input data, and the file is named in.txt.

people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful  as they love,
as they care as they share.

Look at the following program −

SparkWordCount.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkWordCount {
   def main(args: Array[String]) {

      val sc = new SparkContext("local", "Word Count", "/usr/local/spark", Nil, Map(), Map())

      /* local = master URL; Word Count = application name;              */
      /* /usr/local/spark = Spark home; Nil = jars to distribute;        */
      /* the two Maps = environment variables for the worker nodes and   */
      /* preferred node location data (both left empty here)             */

      /* Create an input RDD by reading the text file (in.txt) through the Spark context */
      val input = sc.textFile("in.txt")

      /* Transform the inputRDD into the countRDD */
      val count = input.flatMap(line => line.split(" "))
                       .map(word => (word, 1))
                       .reduceByKey(_ + _)

      /* saveAsTextFile is an action; it triggers the computation and writes the RDD to disk */
      count.saveAsTextFile("outfile")
      println("OK")
   }
}
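The multi-argument SparkContext constructor shown above is the older Spark API style. For reference, a minimal sketch of the same setup using SparkConf, which newer Spark versions prefer, would look like this; only the context creation changes, the rest of the program stays the same −

import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
   def main(args: Array[String]) {
      // Same master ("local") and application name ("Word Count") as above
      val conf = new SparkConf().setAppName("Word Count").setMaster("local")
      val sc = new SparkContext(conf)
      // ... transformations and saveAsTextFile as in SparkWordCount.scala ...
   }
}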

Save the above program in a file named SparkWordCount.scala and place it in a user-defined directory named spark-application.

Note − While transforming the inputRDD into the countRDD, flatMap() tokenizes each line of the text file into words, map() turns every word into a (word, 1) pair, and reduceByKey() sums those pairs to count how many times each word occurs.
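If you want to watch these transformations one at a time, the same pipeline can be pasted into spark-shell, which already provides a SparkContext as sc; a quick sketch, assuming in.txt is in the directory where the shell was started −

scala> val input = sc.textFile("in.txt")
scala> val words = input.flatMap(line => line.split(" "))
scala> val pairs = words.map(word => (word, 1))
scala> val counts = pairs.reduceByKey(_ + _)
scala> counts.collect().foreach(println)   // prints the (word,count) pairs to the console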

Use the following steps to submit this application. Execute all the steps from the spark-application directory in a terminal.

Step 1: Download Spark JAR

Compilation requires the Spark core jar, so download spark-core_2.10-1.3.0.jar (Spark core jar) and move the jar file from the download directory to the spark-application directory.

Step 2: Compile program

Compile the above program using the command given below. This command should be executed from the spark-application directory. Here, /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar is the Hadoop-support jar taken from the Spark installation's lib directory.

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkWordCount.scala
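As an aside, if you would rather use sbt than invoke scalac by hand, a minimal build.sbt along the following lines would declare the same dependency; the versions are chosen to match the jars used in this chapter, and this file is not part of the original walkthrough −

name := "SparkWordCount"

version := "1.0"

scalaVersion := "2.10.4"

// "provided" because the Spark installation supplies these classes at run time
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"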

Step 3: Create a JAR

Create a jar file of the Spark application using the following command. Here, wordcount is the file name of the jar.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
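To confirm that the compiled classes actually ended up in the archive, you can list its contents with the standard jar tool −

$ jar -tvf wordcount.jar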

Step 4: Submit the Spark application

Submit the Spark application using the following command −

spark-submit --class SparkWordCount --master local wordcount.jar

If it executes successfully, you will find the output given below. The OK in this output is there for user identification; it is printed by the last line of the program. If you read the output carefully, you will find details such as −

  1. successfully started service 'sparkDriver' on port 42954

  2. MemoryStore started with capacity 267.3 MB

  3. Started SparkUI at http://192.168.1.217:4040

  4. Added JAR file:/home/hadoop/piapplication/count.jar

  5. ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s

  6. Stopped Spark web UI at http://192.168.1.217:4040

  7. MemoryStore cleared

15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3-b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
15/07/08 13:56:12 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2-823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3-b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Step 5: Checking output

After successful execution of the program, you will find a directory named outfile in the spark-application directory.

The following commands are used for opening and checking the list of files in the outfile directory.

$ cd outfile
$ ls
part-00000 part-00001 _SUCCESS

The following command is used to check the output in the part-00000 file −

$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they,7)
(look,1)

The following command is used to check the output in the part-00001 file −

$ cat part-00001
(walk,1)
(or,1)
(talk,1)
(only,1)
(love,1)
(care,1)
(share,1)
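Alternatively, since saveAsTextFile wrote each (word, count) pair as one line of text, the whole result can be read back and printed from spark-shell −

scala> sc.textFile("outfile").collect().foreach(println)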

Go through the following section to know more about the 'spark-submit' command.

Spark-submit Syntax

spark-submit [options] <app jar | python file> [app arguments]
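For instance, a submission to a standalone cluster might combine several of these options; the master host name and resource sizes below are placeholders −

spark-submit \
  --class SparkWordCount \
  --master spark://master-host:7077 \
  --deploy-mode cluster \
  --executor-memory 2G \
  --total-executor-cores 4 \
  wordcount.jar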

Options