Apache Spark - Deployment
Spark applications are deployed using spark-submit, a shell command that launches a Spark application on a cluster. It works with all of the supported cluster managers through a uniform interface, so you do not have to configure your application separately for each one.
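Because spark-submit supplies the master URL and other settings at launch time, an application does not have to hard-code them. The sketch below (not part of the original example) shows the SparkConf style of building a context that leaves the master to spark-submit; the local[*] fallback is only an assumption for testing outside spark-submit.
import org.apache.spark.{SparkConf, SparkContext}

object SubmitFriendlyApp {
   def main(args: Array[String]): Unit = {
      // No master is hard-coded here; spark-submit --master <url> supplies it.
      // setIfMissing gives a fallback only when the app is run outside spark-submit.
      val conf = new SparkConf()
         .setAppName("Submit Friendly App")
         .setIfMissing("spark.master", "local[*]")
      val sc = new SparkContext(conf)

      // ... RDD operations go here ...

      sc.stop()
   }
}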
Example
Let us take the same word-count example we used earlier with shell commands, this time written as a Spark application.
Sample Input
The following text is the input data, and the file is named in.txt.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Look at the following program −
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkWordCount {
   def main(args: Array[String]) {
      val sc = new SparkContext("local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
      /* local = master URL; Word Count = application name; */
      /* /usr/local/spark = Spark home; Nil = jars; Map() = environment */
      /* Map() = variables passed to worker nodes */

      /* Create an input RDD by reading the text file (in.txt) through the Spark context */
      val input = sc.textFile("in.txt")

      /* Transform the input RDD into the count RDD */
      val count = input.flatMap(line => line.split(" "))
         .map(word => (word, 1))
         .reduceByKey(_ + _)

      /* saveAsTextFile is an action: it triggers the computation and writes the RDD to disk */
      count.saveAsTextFile("outfile")
      System.out.println("OK")
   }
}
Save the above program in a file named SparkWordCount.scala and place it in a user-defined directory named spark-application.
Note − While transforming the input RDD into the count RDD, we use flatMap() to tokenize the lines of the text file into words, map() to turn each word into a (word, 1) pair, and reduceByKey() to add up the counts for each word.
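The same pipeline can be traced with ordinary Scala collections, which makes it easy to see what each step produces. The snippet below is only an illustration of the logic, using two sample lines from in.txt; it does not use Spark at all.
object WordCountLogic {
   def main(args: Array[String]): Unit = {
      // Plain collections stand in for RDDs in this illustration.
      val lines = Seq(
         "people are not as beautiful as they look,",
         "as they walk or as they talk."
      )
      val words = lines.flatMap(_.split(" "))           // flatMap(): line -> words
      val pairs = words.map(word => (word, 1))          // map(): word -> (word, 1)
      val counts = pairs.groupBy(_._1).map {            // reduceByKey(): sum per word
         case (word, ps) => (word, ps.map(_._2).sum)
      }
      counts.foreach(println)
   }
}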
Use the following steps to submit this application. Execute all the steps from the spark-application directory through the terminal.
Step 1: Download Spark JAR
The Spark core jar is required for compilation. Download spark-core_2.10-1.3.0.jar (the Spark core jar) and move it from the download directory to the spark-application directory.
Step 2: Compile program
Compile the above program using the command given below. This command should be executed from the spark-application directory. Here, /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar is the Hadoop support jar taken from the Spark installation's lib directory.
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkWordCount.scala
Step 3: Create a JAR
Create a jar file of the Spark application using the following command. Here, wordcount is the file name of the jar. Only the compiled classes need to be packaged, since spark-submit provides the Spark classes on the classpath at run time.
jar -cvf wordcount.jar SparkWordCount*.class
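If you want to confirm that the compiled classes were packaged, the jar tool can list the archive contents (this check is optional and not part of the original steps):
$ jar -tf wordcount.jar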
Step 4: Submit spark application
Submit the Spark application using the following command −
spark-submit --class SparkWordCount --master local wordcount.jar
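The same jar can be submitted to other cluster managers simply by changing the --master value. Note, however, that this example hard-codes "local" as the master inside the SparkContext constructor; that value would need to be removed (for instance, by switching to the SparkConf style shown earlier) before the flag can take effect. The host name and deploy mode below are placeholders, not values from this example.
spark-submit --class SparkWordCount --master spark://<master-host>:7077 wordcount.jar
spark-submit --class SparkWordCount --master yarn --deploy-mode cluster wordcount.jar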
If it executes successfully, you will find the output given below. The OK in this output is printed by the last line of the program (System.out.println("OK")) and marks that the application ran to completion. If you read the output carefully, you will find messages such as −
- Successfully started service 'sparkDriver' on port 42954
- MemoryStore started with capacity 267.3 MB
- Started SparkUI at http://192.168.1.217:4040
- Added JAR file:/home/hadoop/piapplication/count.jar
- ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
- Stopped Spark web UI at http://192.168.1.217:4040
- MemoryStore cleared
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Step 5: Checking output
After successful execution of the program, you will find a directory named outfile in the spark-application directory.
The following commands are used to open and check the list of files in the outfile directory.
$ cd outfile
$ ls
part-00000 part-00001 _SUCCESS
The command to check the output in the part-00000 file is −
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they,7)
(look,1)
The command to check the output in the part-00001 file is −
$ cat part-00001
(walk,1)
(or,1)
(talk,1)
(only,1)
(love,1)
(care,1)
(share,1)
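The output is split across two part files because the result RDD has two partitions, and saveAsTextFile writes one file per partition. If a single output file is preferred, the RDD can be coalesced to one partition before saving; the line below is a sketch of that change to SparkWordCount.scala, not part of the original program.
count.coalesce(1).saveAsTextFile("outfile")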
Go through the following section to learn more about the 'spark-submit' command.