Apache Kafka 简明教程
Apache Kafka - Integration With Spark
在本章中,我们将讨论如何将 Apache Kafka 与 Spark Streaming API 集成。
About Spark
Spark Streaming API 能够对实时数据流进行可扩展、高吞吐量、容错的流处理。可以从多个源(如 Kafka、Flume、Twitter 等等)获取数据,并且可以使用诸如 map、reduce、join 和 window 等高级函数之类的复杂算法对数据进行处理。最后,可以将处理过的数据输出到文件系统、数据库和实时仪表板。弹性分布式数据集 (RDD) 是 Spark 的基本数据结构。它是对象的可变分布式集合。RDD 中的每个数据集都分成逻辑分区,这些分区可以在集群的不同节点上计算。
Integration with Spark
Kafka 是 Spark 流处理的潜在消息和集成平台。Kafka 充当实时数据流的中央枢纽,并在 Spark Streaming 中使用复杂算法对这些数据流进行处理。一旦对数据进行了处理,Spark Streaming 便可以将结果发布到另一个 Kafka 主题中,或者将其存储在 HDFS、数据库或仪表板中。下图描绘了这个概念性的流程。
现在,让我们详细了解一下 Kafka-Spark API。
SparkConf API
它表示针对 Spark 应用程序的配置。用于以键值对形式设置各种 Spark 参数。
SparkConf 类有以下方法:
-
set(string key, string value) - 设置配置变量。
-
remove(string key) - 从配置中移除键。
-
setAppName(string name) - 为您的应用程序设置应用程序名称。
-
get(string key) - 获取键
StreamingContext API
这是 Spark 功能的主要入口点。SparkContext 表示与 Spark 集群的连接,并且可以用它在集群上创建 RDD、累加器和广播变量。签名定义如下所示。
public StreamingContext(String master, String appName, Duration batchDuration,
String sparkHome, scala.collection.Seq<String> jars,
scala.collection.Map<String,String> environment)
-
master - 要连接到的集群 URL(例如 mesos://host:port、spark://host:port、local[4])。
-
appName - 为您的作业取一个名称,以在集群 Web UI 上显示
-
batchDuration - 将流数据分成批次的时间间隔
public StreamingContext(SparkConf conf, Duration batchDuration)
通过为新的 SparkContext 提供必要的配置来创建一个 StreamingContext。
-
conf − Spark parameters
-
batchDuration - 将流数据分成批次的时间间隔
KafkaUtils API
KafkaUtils API 用于将 Kafka 集群连接到 Spark 流处理。此 API 有重要的 createStream 签名方法,其定义如下。
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
StreamingContext ssc, String zkQuorum, String groupId,
scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
上述方法用于创建一个输入流,从 Kafka 代理中提取消息。
-
ssc − StreamingContext object.
-
zkQuorum − Zookeeper quorum.
-
groupId - 此消费者的组 ID。
-
topics − 返回一个要消费的主题映射。
-
storageLevel − 用于存储接收到的对象的存储级别。
KafkaUtils API 具有另一个方法 createDirectStream,该方法用于创建输入流,直接从 Kafka 代理中提取消息,而不使用任何接收器。此流可以保证每个来自 Kafka 的消息都恰好包含在一次转换中。
示例应用程序以 Scala 编写。要编译应用程序,请下载并安装 sbt(类似于 maven)scala 构建工具。主要应用程序代码如下所示。
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
Build Script
spark-kafka 集成依赖于 spark、spark streaming 和 spark Kafka 集成 jar。创建一个新文件 build.sbt 并指定应用程序详细信息及其依赖项。sbt 将在编译和打包应用程序时下载必要的 jar。
name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
Submiting to Spark
启动 Kafka Producer CLI(在上一章中说明),创建一个名为 my-first-topic 的新主题并提供一些示例消息,如下所示。
Another spark test message
运行以下命令将应用程序提交到 spark 控制台。
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
该应用程序的示例输出如下所示。
spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..