Apache Kafka Tutorial
Apache Kafka - Integration With Spark
In this chapter, we will discuss how to integrate Apache Kafka with the Spark Streaming API.
About Spark
The Spark Streaming API enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources such as Kafka, Flume, and Twitter, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join, and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. Resilient Distributed Datasets (RDDs) are the fundamental data structure of Spark. An RDD is an immutable distributed collection of objects. Each RDD is divided into logical partitions, which may be computed on different nodes of the cluster.
Integration with Spark
Kafka is a potential messaging and integration platform for Spark Streaming. Kafka acts as the central hub for real-time streams of data, which are processed in Spark Streaming using complex algorithms. Once the data is processed, Spark Streaming can publish the results to yet another Kafka topic or store them in HDFS, databases, or dashboards. The following diagram depicts the conceptual flow.
Now, let us go through the Kafka-Spark APIs in detail.
SparkConf API
SparkConf represents the configuration for a Spark application. It is used to set various Spark parameters as key-value pairs.
The SparkConf class has the following methods (a short usage sketch follows the list) −
- set(string key, string value) − set a configuration variable.
- remove(string key) − remove the key from the configuration.
- setAppName(string name) − set the application name for your application.
- get(string key) − get the value for a key.
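A minimal usage sketch of these methods is shown below; the master URL and the spark.ui.port parameter are illustrative values, not requirements of this chapter's example.

import org.apache.spark.SparkConf

// Build a configuration; setAppName, setMaster and set(...) all return the SparkConf
// itself, so calls can be chained.
val sparkConf = new SparkConf()
   .setAppName("KafkaWordCount")
   .setMaster("local[4]")           // illustrative master URL for local testing
   .set("spark.ui.port", "4040")    // illustrative key-value parameter

// get(key) returns the value set above; remove(key) deletes it from the configuration.
println(sparkConf.get("spark.ui.port"))
sparkConf.remove("spark.ui.port")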
StreamingContext API
StreamingContext is the main entry point for Spark Streaming functionality. It is created on top of a SparkContext, which represents the connection to a Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on the cluster. The constructor signature is defined as shown below.
public StreamingContext(String master, String appName, Duration batchDuration,
   String sparkHome, scala.collection.Seq<String> jars,
   scala.collection.Map<String,String> environment)
- master − cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- appName − a name for your job, to display on the cluster web UI.
- batchDuration − the time interval at which streaming data will be divided into batches.
public StreamingContext(SparkConf conf, Duration batchDuration)
Create a StreamingContext by providing the configuration necessary for a new SparkContext (a usage sketch follows the parameter list).
- conf − Spark parameters.
- batchDuration − the time interval at which streaming data will be divided into batches.
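A minimal sketch of this second constructor, using the same application name and 2-second batch interval as the sample application later in this chapter:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The SparkConf carries the application settings; the batch interval tells
// Spark Streaming to group incoming data into 2-second micro-batches.
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))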
KafkaUtils API
The KafkaUtils API is used to connect the Kafka cluster to Spark Streaming. Its most significant method is createStream, whose signature is defined below.
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
The method shown above is used to create an input stream that pulls messages from Kafka brokers.
- ssc − StreamingContext object.
- zkQuorum − Zookeeper quorum.
- groupId − The group id for this consumer.
- topics − The map of topics to consume, mapping each topic name to the number of threads used to consume it.
- storageLevel − Storage level to use for storing the received objects.
The KafkaUtils API has another method, createDirectStream, which is used to create an input stream that pulls messages directly from Kafka brokers without using any receiver. This stream can guarantee that each message from Kafka is included in transformations exactly once.
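The sample application below uses the receiver-based createStream; the following is only a minimal sketch of the direct approach, assuming a broker reachable at localhost:9092 and an existing StreamingContext ssc (the broker address is an illustrative value).

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct stream: Spark tracks Kafka offsets itself instead of using a
// Zookeeper-based receiver, which is what enables the exactly-once guarantee.
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")   // illustrative broker list
val topics = Set("my-first-topic")

val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
   ssc, kafkaParams, topics)

// Each record is a (key, value) pair; keep only the message value.
val lines = directStream.map(_._2)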
The sample application is written in Scala. To compile the application, please download and install sbt, the Scala build tool (similar to Maven). The main application code is presented below.
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      // Consume the comma-separated list of topics, using numThreads threads per topic.
      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

      // Split each message into words and count them over a 10-minute sliding window.
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}
Build Script
The Spark-Kafka integration depends on the Spark core, Spark Streaming, and Spark-Kafka integration jars. Create a new file build.sbt and specify the application details and its dependencies. sbt will download the necessary jars while compiling and packaging the application.
name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
Compilation / Packaging
Run the following command to compile and package the jar file of the application. We need to submit the jar file to the Spark console to run the application.
sbt package
Submitting to Spark
Start the Kafka Producer CLI (explained in the previous chapter), create a new topic called my-first-topic, and provide some sample messages as shown below.
Another spark test message
Run the following command to submit the application to the Spark console.
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 \
   --class "KafkaWordCount" --master local[4] \
   target/scala-2.10/spark-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
The sample output of this application is shown below.
spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..