Apache Kafka 简明教程

Apache Kafka - Quick Guide

Apache Kafka - Introduction

大数据中使用大量数据。关于数据,我们有两个主要挑战。第一个挑战是如何收集大量数据,第二个挑战是如何分析收集到的数据。为了应对这些挑战,你肯定需要一个消息传递系统。

Kafka 是专为分布式高吞吐量系统而设计的。Kafka 作为更传统的消息代理的替代品往往能很好地工作。与其他消息传递系统相比,Kafka 具有更好的吞吐量、内置分区、复制和固有的容错性,这使其非常适合大规模消息处理应用程序。

What is a Messaging System?

消息传递系统负责在应用程序之间传输数据,因此应用程序可以专注于数据,而不用担心如何共享数据。分布式消息传递基于可靠消息排队的概念。消息在客户端应用程序和消息传递系统之间异步排队。有两种消息传递模式可用 - 一种是点对点,另一种是发布订阅 (pub-sub) 消息传递系统。大多数消息传递模式遵循 pub-sub

Point to Point Messaging System

在点对点系统中,消息保存在队列中。一个或多个消费者可以消费队列中的消息,但特定消息只能被最多一个消费者消费。一旦消费者读取了队列中的消息,它就会从该队列中消失。此系统的典型示例是订单处理系统,其中每个订单将由一个订单处理程序处理,但多个订单处理程序也可以同时工作。下图描绘了结构。

point to point messaging system

Publish-Subscribe Messaging System

在发布订阅系统中,消息保存在主题中。与点对点系统不同,消费者可以订阅一个或多个主题并消费该主题中的所有消息。在发布订阅系统中,消息生产者称为发布者,消息消费者称为订阅者。一个现实生活中的例子是 Dish TV,它发布不同的频道,如体育、电影、音乐等,任何人都可以订阅他们自己的一组频道,并在他们订阅的频道可用时获取它们。

publish subscribe messaging system

What is Kafka?

Apache Kafka 是一个分布式发布订阅消息传递系统,也是一个可以处理大量数据的健壮队列,它使你可以将消息从一个端点传递到另一个端点。Kafka 适用于离线和在线消息消费。Kafka 消息保存在磁盘上并在集群内复制以防止数据丢失。Kafka 构建在 ZooKeeper 同步服务的基础之上。它与 Apache Storm 和 Spark 集成得很好,可用于实时流数据分析。

Benefits

以下是 Kafka 的一些优点 −

  1. Reliability - Kafka 是分布式的、分区的、复制的和容错的。

  2. Scalability - Kafka 消息传递系统无需停机即可轻松扩展。

  3. Durability - Kafka 使用分布式提交日志,这意味着消息会尽可能快地保存在磁盘上,因此它具有持久性。

  4. Performance - Kafka 对发布和订阅消息都具有很高的吞吐量。即使存储了大量 TB 的消息,它也能保持稳定的性能。

Kafka 非常快,并且保证零停机时间和零数据丢失。

Use Cases

Kafka 可以用于许多用例。其中一些如下所示 −

  1. Metrics - Kafka 通常用于运营监控数据。这涉及汇总分布式应用程序的统计信息以生成运营数据的集中馈送。

  2. Log Aggregation Solution - Kafka 可以跨整个组织使用,收集来自多个服务的日志,并以标准格式向多个消费者提供它们。

  3. Stream Processing - Storm 和 Spark Streaming 等流行框架从主题中读取数据,对其进行处理,并将处理后的数据写入新主题,供用户和应用程序使用。Kafka 的强大持久性在流处理的情况下也非常有用。

Need for Kafka

Apache Kafka 是一个用于处理所有实时数据馈送的统一平台。Kafka 支持低延迟的消息传递,并且在发生机器故障时可保证故障容错。它能够处理大量不同的消费者。Kafka 非常快,每秒可执行 200 万次写入。Kafka 将所有数据保存到磁盘,这意味着实际上所有写入都进入操作系统的页面缓存(RAM)。这使得将数据从页面缓存传输到网络套接字非常高效。

Apache Kafka - Fundamentals

在深入了解 Kafka 之前,你必须了解主题、代理、生产者和消费者等主要术语。以下示意图说明了主要术语,表格详细描述了示意图组件。

fundamentals

在上面的示意图中,一个主题被配置为三个分区。分区 1 具有两个偏移量因子 0 和 1。分区 2 具有四个偏移量因子 0、1、2 和 3。分区 3 具有一个偏移量因子 0。副本的 ID 与托管它的服务器的 ID 相同。

假设主题的复制因子被设置为 3,那么 Kafka 将为每个分区创建 3 个相同的副本并将它们放在群集内,以用于所有操作。为了平衡集群中的负载,每个代理会存储一个或多个分区。多个生产者和消费者可以同时发布和检索消息。

S.No

Components and Description

1

Topics 属于特定类别的消息流被称为主题。数据存储在主题中,主题被分成分区。对于每个主题,Kafka 最少保留一个分区。每个此类分区包含按不可变顺序排列的消息。分区被实现为一组大小相等的分段文件。

2

Partition 主题可能有许多分区,因此它可以处理任意数量的数据。

3

Partition offset 每条分区分段消息都有一个唯一的序列 ID,称为偏移量。

4

Replicas of partition 副本只不过是分区的备份。副本永远不会读写数据。它们用于防止数据丢失。

5

Brokers 代理是负责维护已公布数据的简单系统。每个代理对于每个主题可能拥有零个或多个分区。假设一个主题有 N 个分区,并且有 N 个代理,那么每个代理将具有一个分区。假设一个主题上有 N 个分区,代理数目超过 N(n + m),那么前 N 个代理将有一个分区,而下一个 M 个代理对于该特定主题没有任何分区。假设一个主题上有 N 个分区,代理数目少于 N(n-m),那么每个代理将拥有一个或多个分区在它们之间共享。由于代理之间负载分布不均,不建议采用此方案。

6

Kafka Cluster 拥有多个代理的 Kafka 被称为 Kafka 集群。Kafka 集群可以在没有停机的情况下进行扩展。这些集群用于管理消息数据的持久性并复制消息数据。

7

Producers 生产者是消息发布者,它向一个或多个 Kafka 主题发布消息。生产者向 Kafka 代理发送数据。每次生产者向代理发布消息时,代理会将消息追加到最后一个分段文件中。实际上,消息将被追加到一个分区。生产者还可以向他们自己选择的某个分区发送消息。

8

Consumers 消费者从代理读取数据。消费者订阅一个或多个主题,并通过从代理拉取数据来消费已发布的消息。

9

Leader 领导者是负责给定分区的所有读取和写入操作节点。每个分区有一个充当领导者的服务器。

10

Follower 遵循领导者指令的节点称为跟随者。如果领导者失败,一个跟随者将自动成为新的领导者。跟随者充当正常消费者,拉取消息并更新其自身的数据存储。

Apache Kafka - Cluster Architecture

查看以下示意图。它显示了 Kafka 的集群图。

cluster architecture

下表描述了上面示意图中显示的每个组件。

S.No

Components and Description

1

Broker Kafka 集群通常包含多个代理,以维护负载平衡。Kafka 代理是无状态的,因此它们使用 ZooKeeper 维护其集群状态。一个 Kafka 代理实例每秒可以处理数十万次读取和写入,并且每个代理可以处理 TB 级消息,而不会对性能产生影响。Kafka 代理领导者选举可以通过 ZooKeeper 完成。

2

ZooKeeper ZooKeeper 用于管理和协调 Kafka 代理。ZooKeeper 服务主要用于通知生产者和消费者 Kafka 系统中任何新的代理出现或 Kafka 系统中代理发生故障。根据 Zookeeper 接收的关于代理出现或故障的通知,生产者和消费者做出决策,并开始与其他代理协调其任务。

3

Producers 生产者将数据推送到代理。当启动新的代理时,所有生产者都搜索代理,并自动向该代理发送消息。Kafka 生产者不需要等到代理的确认,并尽可能快地发送消息以供代理处理。

4

Consumers 由于 Kafka 代理是无状态的,这意味着消费者必须使用分区偏移量维护已消耗的消息数量。如果消费者确认特定的消息偏移量,则意味着消费者已消耗所有先前消息。消费者向代理发出异步拉取请求,以便准备一个可以消费的字节缓冲区。消费者可以通过提供偏移量值很简单地回退或跳至分区中的任意位置。消费者偏移量值由 ZooKeeper 通知。

Apache Kafka - WorkFlow

目前,我们已讨论了 Kafka 的核心概念。现在让我们重点介绍一下 Kafka 的工作流。

Kafka 仅仅是由一个或多个分区分割而成的主题集合。Kafka 分区是有序的消息序列,其中每条消息都由其索引(称为偏移量)标识。Kafka 集群中的所有数据都是分区的离散并集。传入消息被写入分区的末尾,消息被消费者依次读取。通过将消息复制到不同的代理来提供持久性。

Kafka 采用快速、可靠、持久、容错和零停机方式提供发布-订阅和基于队列的消息系统。在这两种情况下,生产者只需将消息发送到主题,消费者即可根据需要选择任一类型的消息系统。让我们按照下一部分中的步骤来了解消费者如何选择其所选择的消息系统。

Workflow of Pub-Sub Messaging

以下是发布-订阅消息的逐步工作流 −

  1. 生产者定期向某个主题发送消息。

  2. Kafka 代理将所有消息存储在为该特定主题配置的分区中。它确保消息在分区中得到均衡共享。如果生产者发送两条消息,并且有两个分区,则 Kafka 会将一条消息存储在第一个分区中,第二条消息存储在第二个分区中。

  3. 消费者订阅特定主题。

  4. 一旦消费者订阅某个主题,Kafka 就会向消费者提供该主题的当前偏移量,并将偏移量保存在 Zookeeper 集群中。

  5. 消费者会定期(例如每 100 毫秒)向 Kafka 请求新消息。

  6. 一旦 Kafka 从生产者处接收到消息,它就会将这些消息转发给消费者。

  7. 消费者会接收到消息并对其进行处理。

  8. 一旦消息处理完毕,消费者就会向 Kafka 代理发送确认。

  9. 一旦 Kafka 接收到确认,它就会将偏移量更改为新值并在 Zookeeper 中更新。由于偏移量维护在 Zookeeper 中,因此即使在服务器中断期间,消费者也能正确读取下一条消息。

  10. 上述流程将重复执行,直到消费者停止请求。

  11. 消费者可以选择随时回退或跳至主题的所需偏移量,并读取所有后续消息。

Workflow of Queue Messaging / Consumer Group

在队列消息系统中,一群具有相同组 ID 的消费者会订阅主题,而不是单个消费者。简单来说,订阅具有相同组 ID 的主题的消费者被视为一个组,消息在它们之间共享。让我们查看此系统的实际工作流。

  1. 生产者定期向某个主题发送消息。

  2. 与早先情景类似,Kafka 将所有消息存储在那特定主题配置的分区中。

  3. 单一使用者订阅特定主题,假设主题 ID 为“主题-01”,组 ID 为“组-1”。

  4. 在有新的使用者订阅同一主题(主题-01)并具有与组-1 相同的组 ID 之前,Kafka 与使用者的互动方式与 Pub-Sub Messaging 相同。

  5. 一旦新使用者出现,Kafka 会切换其操作模式为共享模式并共享这两位使用者之间的资料。此共享将一直持续到使用者数量达到为那特定主题配置的分区数量。

  6. 一旦使用者数量超出分区数量,新使用者不会收到任何进一步的消息,直到任何一位现有使用者退订。此情景会出现的原因是,Kafka 中的每位使用者都会被分配至少一个分区,一旦所有分区都分配给现有使用者,新使用者将不得不等待。

  7. 此功能也被称为“使用者群体”。依同样的方式,Kafka 将以非常简单且有效的方式提供上述两个系统最优秀的特点。

Role of ZooKeeper

Apache Kafka 的一个关键依赖是 Apache Zookeeper,这是一项分布式配置与同步服务。Zookeeper 是 Kafka 代理与使用者之间的协调界面。Kafka 服务器通过 Zookeeper 丛集共享资讯。Kafka 会在 Zookeeper 中存储基本元资料,例如关于主题、代理、使用者偏移(队列阅读器)等資訊。

由于所有关键资讯都被存储在 Zookeeper 中,而且它通常会跨其整体复制该资料,因此 Kafka 代理/Zookeeper 的故障不会影响 Kafka 丛集的状态。一旦 Zookeeper 重新启动,Kafka 将还原状态。这为 Kafka 带来了零停机时间。在领导者发生故障时,Kafka 代理之间的领导者选举也是通过使用 Zookeeper 进行的。

要深入了解 Zookeeper,请参阅 zookeeper

在下一章节中,让我们继续深入了解如何在您的机器上安装 Java、ZooKeeper 和 Kafka。

Apache Kafka - Installation Steps

以下是在您的机器上安装 Java 的步骤。

Step 1 - Verifying Java Installation

希望您现在已经安装 Java,只需使用以下命令进行验证。

$ java -version

如果 Java 在您的设备上安装成功,您将看到所安装 Java 的版本。

Step 1.1 - Download JDK

如果没有下载 Java,请访问以下链接下载最新版本的 JDK,然后下载最新版本。

现在最新版本是 JDK 8u 60,文件是“jdk-8u60-linux-x64.tar.gz”。请在您的机器上下载此文件。

Step 1.2 - Extract Files

一般来说,下载的文件都存储在下载文件夹中,验证并使用以下命令提取 tar 设置。

$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

Step 1.3 - Move to Opt Directory

为了让所有使用者都能使用 java,将提取的 java 内容移动到 usr/local/java/ 文件夹中。

$ su
password: (type password of root user)
$ mkdir /opt/jdk
$ mv jdk-1.8.0_60 /opt/jdk/

Step 1.4 - Set path

若要设置 path 及 JAVA_HOME 变量,请将以下命令添加到 ~/.bashrc 文件中。

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

现在,将所有变更应用到当前运行系统中。

$ source ~/.bashrc

Step 1.5 - Java Alternatives

使用以下命令更改 Java 备用项。

update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100

Step 1.6 − 现在使用步骤 1 中说明的验证命令 (java -version) 验证 java。

Step 2 - ZooKeeper Framework Installation

Step 2.1 - Download ZooKeeper

要将 ZooKeeper 框架安装到您的计算机上,请访问以下链接并下载 ZooKeeper 的最新版本。

截至目前,ZooKeeper 的最新版本是 3.4.6 (ZooKeeper-3.4.6.tar.gz)。

Step 2.2 - Extract tar file

使用以下命令解压 tar 文件

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

Step 2.3 - Create Configuration File

使用命令 vi "conf/zoo.cfg" 打开名为 conf/zoo.cfg 的配置文件,并将所有以下参数设置为起点。

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

配置文件成功保存并返回到终端后,您就可以启动 zookeeper 服务器。

Step 2.4 - Start ZooKeeper Server

$ bin/zkServer.sh start

执行此命令后,您将得到以下所示的响应 −

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

Step 2.5 - Start CLI

$ bin/zkCli.sh

输入上述命令之后,您将连接到 zookeeper 服务器,并将得到以下响应。

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

Step 2.6 - Stop Zookeeper Server

连接服务器并执行所有操作之后,您可以使用以下命令停止 zookeeper 服务器 −

$ bin/zkServer.sh stop

现在您已成功在自己的计算机上安装 Java 和 ZooKeeper。让我们看看安装 Apache Kafka 的步骤。

Step 3 - Apache Kafka Installation

让我们继续执行以下步骤以在您的计算机上安装 Kafka。

Step 3.1 - Download Kafka

要将 Kafka 服务器安装到计算机上,请点击以下链接 −

现在,最新版本,即 – kafka_2.11_0.9.0.0.tgz 将下载到您的计算机上。

Step 3.2 - Extract the tar file

使用以下命令解压 tar 文件 −

$ cd opt/
$ tar -zxf kafka_2.11.0.9.0.0 tar.gz
$ cd kafka_2.11.0.9.0.0

现在您已在自己的计算机上下载了 Kafka 的最新版本。

Step 3.3 - Start Server

您可以使用以下命令启动服务器 −

$ bin/kafka-server-start.sh config/server.properties

服务器启动后,您将在屏幕上看到以下响应 −

$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
…………………………………………….
…………………………………………….

Step 4 - Stop the Server

执行所有操作之后,您可以使用以下命令停止服务器 −

$ bin/kafka-server-stop.sh config/server.properties

我们已经讨论了 Kafka 安装,在下一章中,我们可以了解如何对 Kafka 执行基本操作。

Apache Kafka - Basic Operations

首先让我们开始实现单节点单代理配置,然后我们将设置迁移到单节点多代理配置。

希望你现在已经安装了 Java、ZooKeeper 和 Kafka。在转到 Kafka 集群设置之前,首先需要启动 ZooKeeper,因为 Kafka 集群使用 ZooKeeper。

Start ZooKeeper

打开一个新的终端并输入以下命令 -

bin/zookeeper-server-start.sh config/zookeeper.properties

要启动 Kafka 代理,请键入以下命令 -

bin/kafka-server-start.sh config/server.properties

启动 Kafka 代理后,在 ZooKeeper 终端上输入命令 jps,您将看到以下响应 -

821 QuorumPeerMain
928 Kafka
931 Jps

现在您可以在终端上看到两个正在运行的守护程序,其中 QuorumPeerMain 是 ZooKeeper 守护程序,另一个是 Kafka 守护程序。

Single Node-Single Broker Configuration

在此配置中,您有一个 ZooKeeper 和经纪人 ID 实例。以下是如何配置它的步骤 -

Creating a Kafka Topic - Kafka 提供了一个名为 kafka-topics.sh 的命令行实用程序,用于在服务器上创建主题。打开一个新的终端并输入以下示例。

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic Hello-Kafka

我们刚刚创建了一个名为 Hello-Kafka 的主题,其中只有一个分区和一个副本因子。上述创建的输出将类似于以下输出 -

Output - 创建了主题 Hello-Kafka

在创建主题后,您可以在 Kafka 经纪人终端窗口中获取通知,并在 config/server.properties 文件中指定在“/tmp/kafka-logs/”中创建的主题的日志。

List of Topics

要获取 Kafka 服务器中的主题列表,可以使用以下命令 -

Syntax

bin/kafka-topics.sh --list --zookeeper localhost:2181

Output

Hello-Kafka

由于我们已创建了一个主题,它将仅列出 Hello-Kafka。假设,如果你创建多个主题,你会在输出中获得主题名称。

Start Producer to Send Messages

Syntax

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

从上述语法中,生产者命令行客户端需要两个主要参数 −

Broker-list − 我们希望向其发送消息的代理列表。在此情况下,我们只有一个代理。Config/server.properties 文件包含代理端口 ID,因为我们知道我们的代理在端口 9092 上侦听,因此你可以直接指定它。

主题名称 − 以下是主题名称的示例。

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

生产者将等待来自标准输入的输入,并将其发布到 Kafka 集群。默认情况下,每行都将发布为一条新消息,然后在 config/producer.properties 文件中指定默认生产者属性。现在,你可以在终端中键入几行消息,如下所示。

Output

$ bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Hello-Kafka[2016-01-16 13:50:45,931]
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

Start Consumer to Receive Messages

与生产者类似,默认消费者属性在 config/consumer.proper-ties 文件中指定。打开一个新终端,并键入以下语法以使用消息。

Syntax

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name
--from-beginning

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka
--from-beginning

Output

Hello
My first message
My second message

最后,你可以从生产者的终端输入消息,并看到它们出现在消费者的终端中。截至目前,你对具有单个代理的单节点集群有很好的理解。现在让我们转向多代理配置。

Single Node-Multiple Brokers Configuration

在继续进行多代理集群设置之前,首先启动 ZooKeeper 服务器。

Create Multiple Kafka Brokers − 我们已经在 con-fig/server.properties 中拥有一个 Kafka 代理实例。现在我们需要多个代理实例,因此将现有 server.prop-erties 文件复制到两个新配置文件中,并将其重命名为 server-one.properties 和 server-two.prop-erties。然后编辑这两个新文件,并分配以下更改 −

config/server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config/server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

Start Multiple Brokers − 在三台服务器上进行所有更改后,打开三个新终端来逐个启动每个代理。

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

现在,我们有三个不同的代理在机器上运行。尝试自己通过在 ZooKeeper 终端上键入 jps 来检查所有守护程序,然后你将看到响应。

Creating a Topic

让我们为此主题分配 3 的复制因子值,因为我们有三个不同的代理在运行。如果你有两个代理,那么指定的副本值将为 2。

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic Multibrokerapplication

Output

created topic “Multibrokerapplication”

Describe 命令用于检查哪个代理正在侦听当前创建的主题,如下所示 −

bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation

Output

bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1
ReplicationFactor:3 Configs:

Topic:Multibrokerapplication Partition:0 Leader:0
Replicas:0,2,1 Isr:0,2,1

从上述输出中,我们可以得出结论,第一行概述了所有分区,显示了主题名称、分区数量以及我们已经选择的复制因子。在第二行中,每个节点将成为随机选择的分区部分的领导者。

在我们的案例中,我们看到我们的第一个代理(代理 ID 为 0)是领导者。然后 Replicas:0,2,1 表示所有代理最终会复制该主题。Isr 是同步副本的集合。那么,这是当前处于正常状态并且被领导者赶上的副本的子集。

Start Producer to Send Messages

此过程与单代理设置中的过程相同。

Example

bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Multibrokerapplication

Output

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

Start Consumer to Receive Messages

此过程与在单代理设置中显示的过程相同。

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion --from-beginning

Output

bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

Basic Topic Operations

在本节中,我们将讨论各个基本主题操作。

Modifying a Topic

正如你已经了解如何在 Kafka 集群中创建主题。现在让我们使用以下命令修改已创建的主题

Syntax

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name
--parti-tions count

Example

We have already created a topic “Hello-Kafka” with single partition count and one replica factor.
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181
--alter --topic Hello-kafka --parti-tions 2

Output

WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

Deleting a Topic

要删除主题,可以使用以下语法。

Syntax

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

Example

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

Output

> Topic Hello-kafka marked for deletion

Note −*This will have no impact if *delete.topic.enable 未设置为 true

Apache Kafka - Simple Producer Example

让我们使用 Java 客户端创建用于发布和消费消息的应用程序。Kafka 制作者客户端由以下 API 组成。

KafkaProducer API

让我们在本节中了解最重要的 Kafka 制作者 API 的集合。KafkaProducer API 的主要部分是 KafkaProducer 类。KafkaProducer 类提供了一个选项,可以使用以下方法在其构造函数中连接 Kafka 代理。

  1. KafkaProducer 类提供 send 方法,以异步方式向主题发送消息。send() 的签名如下:

producer.send(new ProducerRecord<byte[],byte[]>(topic,
partition, key1, value1) , callback);
  1. ProducerRecord − 制作者管理着等待发送的记录缓冲区。

  2. Callback − 在记录被服务器确认时执行的用户提供的回调(null 表示没有回调)。

  3. KafkaProducer 类提供一个 flush 方法以确保所有先前发送的消息都已实际完成。flush 方法的语法如下:

public void flush()
  1. KafkaProducer 类提供 partitionFor 方法,该方法有助于获取给定主题的分区元数据。这可用于自定义分区。此方法的签名如下:

public Map metrics()

它返回 制作者维护的内部度量指标映射。

  1. public void close() − KafkaProducer 类提供 close 方法,它会阻塞直到所有先前发送的请求都完成。

Producer API

制作者 API 的核心部分是 制作者 类。制作者 类提供了一个选项,可以通过以下方法在其构造函数中连接 Kafka 代理。

The Producer Class

制作者 类提供 send 方法,以使用以下签名将 send 消息发送到单个或多个主题。

public void send(KeyedMessaget<k,v> message)
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

有两种类型的制作者 - SyncAsync

相同的 API 配置也适用于同步制作者。它们之间的区别在于,同步制作者直接发送消息,但在后台发送消息。当您需要更高的吞吐量时,首选异步制作者。在 0.8 等早期版本中,异步制作者没有用于注册错误处理程序的 send() 回调。它仅在当前的 0.9 版中可用。

public void close()

制作者 类提供 close 方法以关闭制作者池与所有 Kafka 代理的连接。

Configuration Settings

制作者 API 的主要配置设置列在以下表格中,以便更好地理解 -

S.No

Configuration Settings and Description

1

client.id identifies producer application

2

producer.type either sync or async

3

acks acks 配置控制制作者请求在何种条件下被认为是完整的。

4

retries 如果制作者请求失败,则使用特定的值自动重试。

5

bootstrap.servers bootstrapping list of brokers.

6

linger.ms 如果您想减少请求的数量,您可以将 linger.ms 设置为大于某些值。

7

key.serializer 序列化接口的密钥。

8

value.serializer 序列化接口的值。

9

batch.size Buffer size.

10

buffer.memory 控制可用于缓冲区的生产者的总内存量。

ProducerRecord API

ProducerRecord 是一对键/值,发送到 Kafka 群集。ProducerRecord 类构造函数使用以下签名,根据分区、键和值对创建记录。

public ProducerRecord (string topic, int partition, k key, v value)
  1. Topic −用户的自定义主题名称,它将追加到记录中。

  2. Partition − partition count

  3. Key −将包含在记录中的密钥。

  4. Value − Record contents

public ProducerRecord (string topic, k key, v value)

ProducerRecord 类构造函数用于创建具有键、值对但不具有分区的记录。

  1. Topic −创建分配记录的主题。

  2. Key −记录的密钥。

  3. Value − record contents.

public ProducerRecord (string topic, v value)

ProducerRecord 类创建一个没有分区和密钥的记录。

  1. Topic −创建一个主题。

  2. Value − record contents.

ProducerRecord 类方法在以下表格中列出:

S.No

Class Methods and Description

1

public string topic() 主题将追加到记录。

2

public K key() 将包含在记录中的密钥。如果没有此密钥,则在此处返回空值。

3

public V value() Record contents.

4

partition() 记录分区计数

SimpleProducer application

在创建应用程序之前,首先启动 ZooKeeper 和 Kafka 代理,然后使用 create topic 命令在 Kafka 代理中创建自己的主题。然后创建一个名为 SimpleProducer.java 的 Java 类,并输入以下代码。

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {

   public static void main(String[] args) throws Exception{

      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }

      //Assign topicName to string variable
      String topicName = args[0].toString();

      // create instance for properties to access producer configs
      Properties props = new Properties();

      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");

      //Set acknowledgements for producer requests.
      props.put("acks", “all");

      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);

      //Specify buffer size in config
      props.put("batch.size", 16384);

      //Reduce the no of requests less than 0
      props.put("linger.ms", 1);

      //The buffer.memory controls the total amount of memory available to the producer for buffering.
      props.put("buffer.memory", 33554432);

      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);

      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName,
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Compilation −可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution −可以使用以下命令执行应用程序。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

Simple Consumer Example

截至目前,我们已经创建了一个生产者来向 Kafka 群集发送消息。现在让我们创建一个消费者,以便从 Kafka 群集中获取消息。KafkaConsumer API 用于获取 Kafka 群集中的消息。下面定义了 KafkaConsumer 类构造函数。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs − 返回消费者配置映射。

KafkaConsumer 类有下面表格中列出的几个重要的方法。

S.No

Method and Description

1

public java.util.Set&lt;TopicPar-tition&gt; assignment() 获取消费者当前分配的分区集合。

2

public string subscription() 订阅给定主题列表以动态分配分区。

3

public void sub-scribe(java.util.List&lt;java.lang.String&gt; topics, ConsumerRe-balanceListener listener) 订阅给定主题列表以动态分配分区。

4

public void unsubscribe() 从给定分区列表中取消订阅主题。

5

public void sub-scribe(java.util.List&lt;java.lang.String&gt; topics) 订阅给定主题列表以动态分配分区。如果给定的主题列表为空,则将其视为 unsubscribe() 相同。

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener) 参数模式指格式为正则表达式的订阅模式,并且监听器参数从订阅模式中获取通知。

7

public void as-sign(java.util.List&lt;TopicParti-tion&gt; partitions) 手动将分区列表分配给客户。

8

poll() 抓取使用某个订阅/分配 API 指定的主题或分区的数据。如果在轮询数据之前未订阅主题,这将返回错误。

9

public void commitSync() 提交在上次 poll() 中返回的所有订阅的主题和分区列表上的偏移量。相同操作应用于 commitAsyn()。

10

public void seek(TopicPartition partition, long offset) 抓取消费者将在下一个 poll() 方法上使用的当前偏移量值。

11

public void resume() 恢复已暂停的分区。

12

public void wakeup() 唤醒消费者。

ConsumerRecord API

ConsumerRecord API 用于从 Kafka 集群接收记录。此 API 包含一个主题名称、分区号(记录从中接收)和指向 Kafka 分区中记录的偏移量。ConsumerRecord 类用于使用特定的主题名称、分区计数和 <key, value> 对创建消费者记录。它具有以下特征。

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  1. Topic − 从 Kafka 集群接收的消费者记录的主题名称。

  2. Partition − 主题的分区。

  3. Key − 记录的键,如果不存在键,将返回 null。

  4. Value − Record contents.

ConsumerRecords API

ConsumerRecords API 充当 ConsumerRecord 的容器。此 API 用于为特定主题中的每个分区保留 ConsumerRecord 列表。其构造函数定义如下。

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  1. TopicPartition − 返回特定主题的分区映射。

  2. Records − 返回 ConsumerRecord 列表。

ConsumerRecords 类定义了以下方法。

S.No

Methods and Description

1

public int count() 所有主题的记录数量。

2

public Set partitions() 此记录集中有数据的分区集合(如果没有数据返回,则集合为空)。

3

public Iterator iterator() 使用迭代器可以在集合中循环,获取或删除元素。

4

public List records() 获取给定分区记录的列表。

Configuration Settings

下面列出消费者客户端 API 主要配置设置的配置设置 −

S.No

Settings and Description

1

bootstrap.servers Bootstrapping list of brokers.

2

group.id 将单独的消费者分配到组中。

3

enable.auto.commit 如果值为 true 则为偏移量启用自动提交,否则不进行提交。

4

auto.commit.interval.ms 返回更新的已消费偏移量写入到 ZooKeeper 的频率。

5

session.timeout.ms 指示 Kafka 将等待 ZooKeeper 的时间(以毫秒为单位)以响应请求(读取或写入),直到放弃并继续消费消息。

SimpleConsumer Application

此处的生产者应用程序步骤保持不变。首先,启动 ZooKeeper 和 Kafka 代理。然后使用名为 SimpleConsumer.java 的 java 类创建一个 SimpleConsumer 应用程序,并键入以下代码。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();

      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);

      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))

      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;

      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)

         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n",
            record.offset(), record.key(), record.value());
      }
   }
}

Compilation −可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

*执行 − *可以使用以下命令执行应用程序

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

Input − 打开生产者 CLI 并向主题发送一些消息。您可以将 Smple 输入作为“Hello Consumer”。

Output − 以下是输出。

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer

Apache Kafka - Consumer Group Example

消费者组是从 Kafka 主题进行多线程或多机器消费。

Consumer Group

  1. 消费者可以通过使用相同的组 ID 来加入组。

  2. 组的最大并行度是,组中的消费者数量 ← 分区数量。

  3. Kafka 将主题的分区分配给组中的消费者,以便该组中的一个消费者切实消耗每个分区。

  4. Kafka 保证消息仅由组中的一个消费者读取。

  5. 消费者可以看到消息与其存储在日志中的顺序。

Re-balancing of a Consumer

添加更多进程/线程会导致 Kafka 重新平衡。如果任何消费者或代理未发送心跳至 ZooKeeper,则可通过 Kafka 集群重新配置它。在此重新平衡期间,Kafka 将可用的分区分配到可用的线程,可能会将某个分区移动到另一个进程。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }

      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;

      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n",
               record.offset(), record.key(), record.value());
      }
   }
}

Compilation

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

Execution

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group

此处我们创建名为 my-group 的示例组名并配有两个消费者。同样,你可以在该组中创建你的组并设置消费者的数量。

Input

打开生产者 CLI 并发送一些消息,例如 −

Test consumer group 01
Test consumer group 02

Output of the First Process

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

Output of the Second Process

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

现在,希望使用 Java 客户端演示理解 SimpleConsumer 和 ConsumeGroup。现在,你了解如何使用 Java 客户端发送和接收消息。让我们在下一章继续 Kafka 集成(大数据技术)。

Apache Kafka - Integration With Storm

在本章中,我们将学习如何集 Kafka 与 Apache Storm。

About Storm

Storm 最初由 Nathan Marz 和 BackType 团队创建。在很短的时间内,Apache Storm 已成为分布式实时处理系统的标准,允许你处理海量数据。Storm 非常快,基准测试证明每个节点每秒可处理超过一百万个元组。Apache Storm 连续运行,从配置的源(注水口)消耗数据,并将数据传递到处理管线(螺栓)。结合注水口和螺栓形成拓扑。

Integration with Storm

Kafka 和 Storm 会自然地相互补充,而且它们功能强大的合作可为快速移动的大数据启用实时流分析。Kafka 和 Storm 集成更便于开发人员从 Storm 拓扑中提取和发布数据流。

Conceptual flow

注水口是流的来源。例如,注水口可能会从 Kafka 主题读取元组并以流形式发出它们。螺栓会消耗输入流、处理并可能发出新的流。螺栓的操作从运行函数、过滤元组到执行流聚合、流联接、与数据库通信等可谓无所不包。Storm 拓扑中的每个节点都会并行执行。拓扑会在终止前无限期运行。Storm 会自动重新分配任何失败的任务。此外,Storm 能确保即使机器宕机且消息丢失也不会造成数据丢失。

让我们详细了解 Kafka-Storm 集成 API。有三个主要类可以将 Kafka 集成到 Storm 中。它们如下所示 −

BrokerHosts - ZkHosts & StaticHosts

BrokerHosts 是一个接口,ZkHosts 和 StaticHosts 是其两个主要实现。ZkHosts 通过维护 ZooKeeper 中的详细信息来动态跟踪 Kafka 代理,而 StaticHosts 则用于手动/静态设置 Kafka 代理及其详细信息。ZkHosts 是访问 Kafka 代理的简单快捷方式。

ZkHosts 的签名如下 −

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

其中,brokerZkStr 是 ZooKeeper 主机,brokerZkPath 是 ZooKeeper 路径,用于维护 Kafka 代理详细信息。

KafkaConfig API

此 API 用于定义 Kafka 集群的配置设置。Kafka 配置的签名定义如下

public KafkaConfig(BrokerHosts hosts, string topic)

SpoutConfig API

Spoutconfig 是 KafkaConfig 的扩展,支持附加的 ZooKeeper 信息。

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
  1. Hosts − BrokerHosts 可以是 BrokerHosts 接口的任何实现

  2. Topic − topic name.

  3. zkRoot − ZooKeeper 根路径。

  4. id − 分流将它消耗的偏移量状态存储在 Zookeeper 中。id 应唯一标识你的分流。

SchemeAsMultiScheme

SchemeAsMultiScheme 是一个接口,它规定从 Kafka 消费的 ByteBuffer 转换为 storm 元组的方式。它源自 MultiScheme 并接受 Scheme 类的实现。Scheme 类的实现有很多,其中一个实现是 StringScheme,它将字节解析为简单的字符串。它还控制着输出字段的命名。签名定义如下所示。

public SchemeAsMultiScheme(Scheme scheme)
  1. Scheme − 从 kafka 消费的字节缓冲区。

KafkaSpout API

KafkaSpout 是我们的流嘴实现,它将与 Storm 集成。它从 kafka 主题提取消息,并将其作为元组发送到 Storm 生态系统中。KafkaSpout 从 SpoutConfig 获取其配置详细信息。

下面是创建简单 Kafka 流嘴的示例代码。

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts,
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Bolt Creation

Bolt 是一个将元组作为输入,处理元组并生成新元组作为输出的组件。Bolt 将实现 IRichBolt 接口。在此程序中,Bolt 类 WordSplitter-Bolt 和 WordCounterBolt 用于执行操作。

IRichBolt 接口具有以下方法:

  1. Prepare − 为 Bolt 提供执行环境。执行器将运行此方法来初始化流嘴。

  2. Execute − 处理单个输入元组。

  3. Cleanup − 在 bolt 即将关闭时调用。

  4. declareOutputFields − 声明元组的输出模式。

让我们创建 SplitBolt.java,它实现将句子拆分为单词的逻辑和 CountBolt.java,它实现分离唯一单词并统计其出现的逻辑。

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");

      for(String word: words) {
         word = word.trim();

         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }

      }

      collector.ack(input);
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class CountBolt implements IRichBolt{
   Map<String, Integer> counters;
   private OutputCollector collector;

   @Override
   public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);

      if(!counters.containsKey(str)){
         counters.put(str, 1);
      }else {
         Integer c = counters.get(str) +1;
         counters.put(str, c);
      }

      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counters.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {

   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Submitting to Topology

Storm 拓扑基本上是一个 Thrift 结构。TopologyBuilder 类提供了简单易用的方法来创建复杂的拓扑。TopologyBuilder 类具有设置分流 (setSpout) 和设置 Bolt (setBolt) 的方法。最后,TopologyBuilder 具有 createTopology 来创建拓扑。shuffleGrouping 和 fieldsGrouping 方法有助于设置流嘴和 Bolt 的流分组。

Local Cluster − 出于开发目的,我们可以使用 LocalCluster 对象创建一个本地集群,然后使用 LocalCluster 类的方法 submitTopology 提交拓扑。

KafkaStormSample.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;

public class KafkaStormSample {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);

      SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,
         UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
      builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());

      Thread.sleep(10000);

      cluster.shutdown();
   }
}

在进行编译之前,Kakfa-Storm 集成需要 curator ZooKeeper client java 库。Curator 版本 2.9.1 支持 Apache Storm 版本 0.9.5(本教程中使用)。下载下面指定的 jar 文件并将其放在 java 类路径中。

  1. curator-client-2.9.1.jar

  2. curator-framework-2.9.1.jar

包括依赖文件后,使用以下命令编译程序,

javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java

Execution

启动 Kafka Producer CLI(在上章中有说明),创建一个名为 my-first-topic 的新主题,并提供一些示例消息,如下所示:

hello
kafka
storm
spark
test message
another test message

现在使用以下命令执行应用程序:

java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample

此应用程序的示例输出如下所示:

storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2

Apache Kafka - Integration With Spark

在本章中,我们将讨论如何将 Apache Kafka 与 Spark Streaming API 集成。

About Spark

Spark Streaming API 能够对实时数据流进行可扩展、高吞吐量、容错的流处理。可以从多个源(如 Kafka、Flume、Twitter 等等)获取数据,并且可以使用诸如 map、reduce、join 和 window 等高级函数之类的复杂算法对数据进行处理。最后,可以将处理过的数据输出到文件系统、数据库和实时仪表板。弹性分布式数据集 (RDD) 是 Spark 的基本数据结构。它是对象的可变分布式集合。RDD 中的每个数据集都分成逻辑分区,这些分区可以在集群的不同节点上计算。

Integration with Spark

Kafka 是 Spark 流处理的潜在消息和集成平台。Kafka 充当实时数据流的中央枢纽,并在 Spark Streaming 中使用复杂算法对这些数据流进行处理。一旦对数据进行了处理,Spark Streaming 便可以将结果发布到另一个 Kafka 主题中,或者将其存储在 HDFS、数据库或仪表板中。下图描绘了这个概念性的流程。

integration spark

现在,让我们详细了解一下 Kafka-Spark API。

SparkConf API

它表示针对 Spark 应用程序的配置。用于以键值对形式设置各种 Spark 参数。

SparkConf 类有以下方法:

  1. set(string key, string value) - 设置配置变量。

  2. remove(string key) - 从配置中移除键。

  3. setAppName(string name) - 为您的应用程序设置应用程序名称。

  4. get(string key) - 获取键

StreamingContext API

这是 Spark 功能的主要入口点。SparkContext 表示与 Spark 集群的连接,并且可以用它在集群上创建 RDD、累加器和广播变量。签名定义如下所示。

public StreamingContext(String master, String appName, Duration batchDuration,
   String sparkHome, scala.collection.Seq<String> jars,
   scala.collection.Map<String,String> environment)
  1. master - 要连接到的集群 URL(例如 mesos://host:port、spark://host:port、local[4])。

  2. appName - 为您的作业取一个名称,以在集群 Web UI 上显示

  3. batchDuration - 将流数据分成批次的时间间隔

public StreamingContext(SparkConf conf, Duration batchDuration)

通过为新的 SparkContext 提供必要的配置来创建一个 StreamingContext。

  1. conf − Spark parameters

  2. batchDuration - 将流数据分成批次的时间间隔

KafkaUtils API

KafkaUtils API 用于将 Kafka 集群连接到 Spark 流处理。此 API 有重要的 createStream 签名方法,其定义如下。

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

上述方法用于创建一个输入流,从 Kafka 代理中提取消息。

  1. ssc − StreamingContext object.

  2. zkQuorum − Zookeeper quorum.

  3. groupId - 此消费者的组 ID。

  4. topics − 返回一个要消费的主题映射。

  5. storageLevel − 用于存储接收到的对象的存储级别。

KafkaUtils API 具有另一个方法 createDirectStream,该方法用于创建输入流,直接从 Kafka 代理中提取消息,而不使用任何接收器。此流可以保证每个来自 Kafka 的消息都恰好包含在一次转换中。

示例应用程序以 Scala 编写。要编译应用程序,请下载并安装 sbt(类似于 maven)scala 构建工具。主要应用程序代码如下所示。

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

Build Script

spark-kafka 集成依赖于 spark、spark streaming 和 spark Kafka 集成 jar。创建一个新文件 build.sbt 并指定应用程序详细信息及其依赖项。sbt 将在编译和打包应用程序时下载必要的 jar。

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

Compilation / Packaging

运行以下命令来编译和打包应用程序的 jar 文件。我们需要将 jar 文件提交到 spark 控制台中以运行应用程序。

sbt package

Submiting to Spark

启动 Kafka Producer CLI(在上一章中说明),创建一个名为 my-first-topic 的新主题并提供一些示例消息,如下所示。

Another spark test message

运行以下命令将应用程序提交到 spark 控制台。

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

该应用程序的示例输出如下所示。

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..

Real Time Application(Twitter)

我们分析一下一个获取最新 Twitter 提要及其主题标签的实时应用程序。之前,我们已经看到过 Storm、Spark 与 Kafka 的集成。在这两种情况下,我们都创建了一个 Kafka 生产者(使用 CLI)以向 Kafka 生态系统发送消息。然后,Storm 和 Spark 集成分别使用 Kafka 消费者读取消息并将其注入 Storm 和 Spark 生态系统。所以,我们在实际中需要创建 Kafka 生产者,该生产者应:

  1. 使用“Twitter 流式传输 API”读取 Twitter 提要,

  2. Process the feeds,

  3. Extract the HashTags and

  4. Send it to Kafka.

一旦 Kafka 接收了 HashTag,Storm/Spark 集成便会收到信息并将其发送至 Storm/Spark 生态系统。

Twitter Streaming API

可以用任何编程语言访问“Twitter 流式传输 API”。“twitter4j”是一个开源的非官方 Java 库,它提供一个基于 Java 的模块来方便地访问“Twitter 流式传输 API”。“twitter4j”提供基于侦听器的框架来访问推文。要访问“Twitter 流式传输 API”,我们需要注册 Twitter 开发者帐户并获取以下 OAuth 认证详情。

  1. Customerkey

  2. CustomerSecret

  3. AccessToken

  4. AccessTookenSecret

创建开发人员帐户后,下载“twitter4j”JAR 文件并将其放入 Java 类路径中。

完整的 Twitter Kafka 生产者编码(KafkaTwitterProducer.java)如下:

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);

      if(args.length < 5){
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key>
            <twitter-consumer-secret> <twitter-access-token>
            <twitter-access-token-secret>
            <topic-name> <twitter-search-keywords>");
         return;
      }

      String consumerKey = args[0].toString();
      String consumerSecret = args[1].toString();
      String accessToken = args[2].toString();
      String accessTokenSecret = args[3].toString();
      String topicName = args[4].toString();
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
      StatusListener listener = new StatusListener() {

         @Override
         public void onStatus(Status status) {
            queue.offer(status);

            // System.out.println("@" + status.getUser().getScreenName()
               + " - " + status.getText());
            // System.out.println("@" + status.getUser().getScreen-Name());

            /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }

         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
            // System.out.println("Got a status deletion notice id:"
               + statusDeletionNotice.getStatusId());
         }

         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // System.out.println("Got track limitation notice:" +
               num-berOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
            // System.out.println("Got scrub_geo event userId:" + userId +
            "upToStatusId:" + upToStatusId);
         }

         @Override
         public void onStallWarning(StallWarning warning) {
            // System.out.println("Got stall warning:" + warning);
         }

         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };
      twitterStream.addListener(listener);

      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      Thread.sleep(5000);

      //Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);

      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      int i = 0;
      int j = 0;

      while(i < 10) {
         Status ret = queue.poll();

         if (ret == null) {
            Thread.sleep(100);
            i++;
         }else {
            for(HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " + hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  top-icName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }
      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}

Compilation

使用以下命令编译应用程序:

javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java

Execution

打开两个控制台。在一个控制台中运行上述已编译应用程序,如下所示。

java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food

在另一个窗口中运行上一章中解释的任何 Spark/Storm 应用程序。主要的注意事项是,在两种情况下使用的主题应相同。此处,我们将“my-first-topic”用作主题名称。

Output

该应用程序的输出将取决于关键字和 Twitter 的当前提要。样本输出如下所示(Storm 集成)。

. . .
food : 1
foodie : 2
burger : 1
. . .

Apache Kafka - Tools

Kafka 工具打包在 “org.apache.kafka.tools.* 下”。工具分为系统工具和复制工具。

System Tools

系统工具可以使用运行类脚本从命令行运行。语法如下 −

bin/kafka-run-class.sh package.class - - options

下面提到了一些系统工具 −

  1. Kafka Migration Tool − 此工具用于将代理从一个版本迁移到另一个版本。

  2. Mirror Maker − 此工具用于为一个 Kafka 集群提供镜像另一个 Kafka 集群的功能。

  3. Consumer Offset Checker − 此工具显示针对指定的一组主题和消费者组的消费者组、主题、分区、偏移量、logSize、所有者。

Replication Tool

Kafka 复制是一种高级设计工具。添加复制工具的目的是增强持久性和提高可用性。下面提到了一些复制工具 −

  1. Create Topic Tool − 此工具创建了一个具有默认分区数、复制因子的主题并使用 Kafka 的默认方案进行副本分配。

  2. List Topic Tool − 此工具列出给定主题列表的信息。如果命令行中未提供任何主题,则此工具将查询 Zookeeper 以获取所有主题并列出其信息。此工具显示的字段包括主题名称、分区、领导者、副本、isr。

  3. Add Partition Tool − 创建一个主题,必须指定主题的分区数。稍后,当主题的卷增大时,主题可能需要更多分区。此工具有助于为特定主题添加更多分区,还允许手动分配已添加分区的副本。

Apache Kafka - Applications

Kafka 支持当今许多最佳工业应用程序。本章中,我们会对 Kafka 的一些最显着应用程序提供非常简要的概述。

Twitter

Twitter 是一个在线社交网络服务,提供一个平台来发送和接收用户推文。已注册的用户可以阅读并发布推文,但未注册的用户只能阅读推文。Twitter 将 Storm-Kafka 用作其流处理基础设施的一部分。

LinkedIn

Apache Kafka 在 LinkedIn 用作活动流数据和运营指标。Kafka 消息系统可帮助 LinkedIn 处理各种产品,比如 LinkedIn 新闻源、LinkedIn 今日在线消息消费以及 Hadoop 等离线分析系统。Kafka 的强大持久性也是连接到 LinkedIn 的关键因素之一。

Netflix

Netflix 是一家美国跨国按需互联网流媒体服务提供商。Netflix 使用 Kafka 进行实时监控和事件处理。

Mozilla

Mozilla 是一个自由软件社区,由 Netscape 成员于 1998 年创立。Kafka 很快就会替换 Mozilla 当前的部分生产系统,以便为遥测、试点等项目收集终端用户浏览器的性能和使用数据。

Oracle

Oracle 从其名为 OSB(Oracle Service Bus)的企业服务总线产品提供对 Kafka 的本机连接,该连接允许开发人员利用 OSB 内置的中介功能来实现分阶段数据管道。