Apache Kafka 简明教程
Apache Kafka - Introduction
大数据中使用大量数据。关于数据,我们有两个主要挑战。第一个挑战是如何收集大量数据,第二个挑战是如何分析收集到的数据。为了应对这些挑战,你肯定需要一个消息传递系统。
In Big Data, an enormous volume of data is used. Regarding data, we have two main challenges. The first challenge is how to collect the large volume of data and the second challenge is to analyze the collected data. To overcome those challenges, you need a messaging system.
Kafka 是专为分布式高吞吐量系统而设计的。Kafka 作为更传统的消息代理的替代品往往能很好地工作。与其他消息传递系统相比,Kafka 具有更好的吞吐量、内置分区、复制和固有的容错性,这使其非常适合大规模消息处理应用程序。
Kafka is designed for distributed high throughput systems. Kafka tends to work very well as a replacement for a more traditional message broker. In comparison to other messaging systems, Kafka has better throughput, built-in partitioning, replication and inherent fault-tolerance, which makes it a good fit for large-scale message processing applications.
What is a Messaging System?
消息传递系统负责在应用程序之间传输数据,因此应用程序可以专注于数据,而不用担心如何共享数据。分布式消息传递基于可靠消息排队的概念。消息在客户端应用程序和消息传递系统之间异步排队。有两种消息传递模式可用 - 一种是点对点,另一种是发布订阅 (pub-sub) 消息传递系统。大多数消息传递模式遵循 pub-sub 。
A Messaging System is responsible for transferring data from one application to another, so the applications can focus on the data and not worry about how to share it. Distributed messaging is based on the concept of reliable message queuing. Messages are queued asynchronously between client applications and the messaging system. Two types of messaging patterns are available − one is point-to-point and the other is the publish-subscribe (pub-sub) messaging system. Most of the messaging patterns follow pub-sub.
Point to Point Messaging System
在点对点系统中,消息保存在队列中。一个或多个消费者可以消费队列中的消息,但特定消息只能被最多一个消费者消费。一旦消费者读取了队列中的消息,它就会从该队列中消失。此系统的典型示例是订单处理系统,其中每个订单将由一个订单处理程序处理,但多个订单处理程序也可以同时工作。下图描绘了结构。
In a point-to-point system, messages are persisted in a queue. One or more consumers can consume the messages in the queue, but a particular message can be consumed by a maximum of one consumer only. Once a consumer reads a message in the queue, it disappears from that queue. The typical example of this system is an Order Processing System, where each order will be processed by one Order Processor, but multiple Order Processors can also work at the same time. The following diagram depicts the structure.
Publish-Subscribe Messaging System
在发布订阅系统中,消息保存在主题中。与点对点系统不同,消费者可以订阅一个或多个主题并消费该主题中的所有消息。在发布订阅系统中,消息生产者称为发布者,消息消费者称为订阅者。一个现实生活中的例子是 Dish TV,它发布不同的频道,如体育、电影、音乐等,任何人都可以订阅他们自己的一组频道,并在他们订阅的频道可用时获取它们。
In the publish-subscribe system, messages are persisted in a topic. Unlike the point-to-point system, consumers can subscribe to one or more topics and consume all the messages in those topics. In the publish-subscribe system, message producers are called publishers and message consumers are called subscribers. A real-life example is Dish TV, which publishes different channels like sports, movies, music, etc., and anyone can subscribe to their own set of channels and get them whenever their subscribed channels are available.
What is Kafka?
Apache Kafka 是一个分布式发布订阅消息传递系统,也是一个可以处理大量数据的健壮队列,它使你可以将消息从一个端点传递到另一个端点。Kafka 适用于离线和在线消息消费。Kafka 消息保存在磁盘上并在集群内复制以防止数据丢失。Kafka 构建在 ZooKeeper 同步服务的基础之上。它与 Apache Storm 和 Spark 集成得很好,可用于实时流数据分析。
Apache Kafka is a distributed publish-subscribe messaging system and a robust queue that can handle a high volume of data and enables you to pass messages from one end-point to another. Kafka is suitable for both offline and online message consumption. Kafka messages are persisted on the disk and replicated within the cluster to prevent data loss. Kafka is built on top of the ZooKeeper synchronization service. It integrates very well with Apache Storm and Spark for real-time streaming data analysis.
Benefits
以下是 Kafka 的一些优点 −
Following are a few benefits of Kafka −
- Reliability − Kafka is distributed, partitioned, replicated and fault tolerant.
- Scalability − The Kafka messaging system scales easily without downtime.
- Durability − Kafka uses a distributed commit log, which means messages are persisted on disk as fast as possible; hence it is durable.
- Performance − Kafka has high throughput for both publishing and subscribing messages. It maintains stable performance even when many TB of messages are stored.
Kafka 非常快,并且保证零停机时间和零数据丢失。
Kafka is very fast and guarantees zero downtime and zero data loss.
Use Cases
Kafka 可以用于许多用例。其中一些如下所示 −
Kafka can be used in many Use Cases. Some of them are listed below −
- Metrics − Kafka is often used for operational monitoring data. This involves aggregating statistics from distributed applications to produce centralized feeds of operational data.
- Log Aggregation Solution − Kafka can be used across an organization to collect logs from multiple services and make them available in a standard format to multiple consumers.
- Stream Processing − Popular frameworks such as Storm and Spark Streaming read data from a topic, process it, and write the processed data to a new topic where it becomes available for users and applications. Kafka’s strong durability is also very useful in the context of stream processing.
Need for Kafka
Apache Kafka 是一个用于处理所有实时数据馈送的统一平台。Kafka 支持低延迟的消息传递,并且在发生机器故障时可保证故障容错。它能够处理大量不同的消费者。Kafka 非常快,每秒可执行 200 万次写入。Kafka 将所有数据保存到磁盘,这意味着实际上所有写入都进入操作系统的页面缓存(RAM)。这使得将数据从页面缓存传输到网络套接字非常高效。
Kafka is a unified platform for handling all the real-time data feeds. Kafka supports low-latency message delivery and guarantees fault tolerance in the presence of machine failures. It has the ability to handle a large number of diverse consumers. Kafka is very fast and can perform 2 million writes per second. Kafka persists all data to the disk, which essentially means that all the writes go to the page cache of the OS (RAM). This makes it very efficient to transfer data from the page cache to a network socket.
Apache Kafka - Fundamentals
在深入了解 Kafka 之前,你必须了解主题、代理、生产者和消费者等主要术语。以下示意图说明了主要术语,表格详细描述了示意图组件。
Before moving deep into Kafka, you must be aware of the main terminologies such as topics, brokers, producers and consumers. The following diagram illustrates the main terminologies and the table describes the diagram components in detail.
在上面的示意图中,一个主题被配置为三个分区。分区 1 具有两个偏移量因子 0 和 1。分区 2 具有四个偏移量因子 0、1、2 和 3。分区 3 具有一个偏移量因子 0。副本的 ID 与托管它的服务器的 ID 相同。
In the above diagram, a topic is configured into three partitions. Partition 1 has two offset factors 0 and 1. Partition 2 has four offset factors 0, 1, 2, and 3. Partition 3 has one offset factor 0. The id of the replica is the same as the id of the server that hosts it.
假设主题的复制因子被设置为 3,那么 Kafka 将为每个分区创建 3 个相同的副本并将它们放在群集内,以用于所有操作。为了平衡集群中的负载,每个代理会存储一个或多个分区。多个生产者和消费者可以同时发布和检索消息。
Assume, if the replication factor of the topic is set to 3, then Kafka will create 3 identical replicas of each partition and place them in the cluster to make them available for all its operations. To balance the load in the cluster, each broker stores one or more of those partitions. Multiple producers and consumers can publish and retrieve messages at the same time.
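For example, once a cluster like the one described in the later installation chapters is running, a topic with three partitions and a replication factor of 3 could be created with the kafka-topics.sh utility. This is a minimal sketch; the topic name my-replicated-topic and the local ZooKeeper address are illustrative assumptions, not part of the original example.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-replicated-topic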
| S.No | Components and Description |
| --- | --- |
| 1 | Topics − A stream of messages belonging to a particular category is called a topic. Data is stored in topics. Topics are split into partitions. For each topic, Kafka keeps a minimum of one partition. Each such partition contains messages in an immutable ordered sequence. A partition is implemented as a set of segment files of equal sizes. |
| 2 | Partition − Topics may have many partitions, so a topic can handle an arbitrary amount of data. |
| 3 | Partition offset − Each partitioned message has a unique sequence id called an offset. |
| 4 | Replicas of partition − Replicas are nothing but backups of a partition. Replicas never read or write data. They are used to prevent data loss. |
| 5 | Brokers − Brokers are simple systems responsible for maintaining the published data. Each broker may have zero or more partitions per topic. Assume there are N partitions in a topic and N brokers: each broker will have one partition. Assume there are N partitions in a topic and more than N brokers (N + M): the first N brokers will have one partition each and the next M brokers will not have any partition for that particular topic. Assume there are N partitions in a topic and fewer than N brokers (N - M): each broker will have one or more partitions shared among them. This scenario is not recommended due to unequal load distribution among the brokers. |
| 6 | Kafka Cluster − Kafka with more than one broker is called a Kafka cluster. A Kafka cluster can be expanded without downtime. These clusters are used to manage the persistence and replication of message data. |
| 7 | Producers − Producers are the publishers of messages to one or more Kafka topics. Producers send data to Kafka brokers. Every time a producer publishes a message to a broker, the broker simply appends the message to the last segment file. Actually, the message will be appended to a partition. Producers can also send messages to a partition of their choice. |
| 8 | Consumers − Consumers read data from brokers. Consumers subscribe to one or more topics and consume published messages by pulling data from the brokers. |
| 9 | Leader − The leader is the node responsible for all reads and writes for a given partition. Every partition has one server acting as the leader. |
| 10 | Follower − A node which follows the leader's instructions is called a follower. If the leader fails, one of the followers will automatically become the new leader. A follower acts as a normal consumer: it pulls messages and updates its own data store. |
Apache Kafka - Cluster Architecture
查看以下示意图。它显示了 Kafka 的集群图。
Take a look at the following illustration. It shows the cluster diagram of Kafka.
下表描述了上面示意图中显示的每个组件。
The following table describes each of the components shown in the above diagram.
| S.No | Components and Description |
| --- | --- |
| 1 | Broker − The Kafka cluster typically consists of multiple brokers to maintain load balance. Kafka brokers are stateless, so they use ZooKeeper for maintaining their cluster state. One Kafka broker instance can handle hundreds of thousands of reads and writes per second, and each broker can handle TBs of messages without performance impact. Kafka broker leader election can be done by ZooKeeper. |
| 2 | ZooKeeper − ZooKeeper is used for managing and coordinating the Kafka brokers. The ZooKeeper service is mainly used to notify producers and consumers about the presence of any new broker in the Kafka system or the failure of a broker in the Kafka system. Based on the notification received from ZooKeeper regarding the presence or failure of a broker, producers and consumers take a decision and start coordinating their tasks with some other broker. |
| 3 | Producers − Producers push data to brokers. When a new broker is started, all the producers search for it and automatically send a message to that new broker. A Kafka producer doesn’t wait for acknowledgements from the broker and sends messages as fast as the broker can handle. |
| 4 | Consumers − Since Kafka brokers are stateless, the consumer has to maintain how many messages have been consumed by using the partition offset. If the consumer acknowledges a particular message offset, it implies that the consumer has consumed all prior messages. The consumer issues an asynchronous pull request to the broker to have a buffer of bytes ready to consume. Consumers can rewind or skip to any point in a partition simply by supplying an offset value. The consumer offset value is notified by ZooKeeper. |
Apache Kafka - WorkFlow
目前,我们已讨论了 Kafka 的核心概念。现在让我们重点介绍一下 Kafka 的工作流。
So far, we have discussed the core concepts of Kafka. Let us now throw some light on the workflow of Kafka.
Kafka 仅仅是由一个或多个分区分割而成的主题集合。Kafka 分区是有序的消息序列,其中每条消息都由其索引(称为偏移量)标识。Kafka 集群中的所有数据都是分区的离散并集。传入消息被写入分区的末尾,消息被消费者依次读取。通过将消息复制到不同的代理来提供持久性。
Kafka is simply a collection of topics split into one or more partitions. A Kafka partition is a linearly ordered sequence of messages, where each message is identified by its index (called the offset). All the data in a Kafka cluster is the disjoint union of these partitions. Incoming messages are written at the end of a partition and messages are sequentially read by consumers. Durability is provided by replicating messages to different brokers.
Kafka 采用快速、可靠、持久、容错和零停机方式提供发布-订阅和基于队列的消息系统。在这两种情况下,生产者只需将消息发送到主题,消费者即可根据需要选择任一类型的消息系统。让我们按照下一部分中的步骤来了解消费者如何选择其所选择的消息系统。
Kafka provides both a pub-sub and a queue-based messaging system in a fast, reliable, persisted, fault-tolerant and zero-downtime manner. In both cases, producers simply send the message to a topic and the consumer can choose any one type of messaging system depending on their need. Let us follow the steps in the next section to understand how the consumer can choose the messaging system of their choice.
Workflow of Pub-Sub Messaging
以下是发布-订阅消息的逐步工作流 −
Following is the step wise workflow of the Pub-Sub Messaging −
- Producers send messages to a topic at regular intervals.
- The Kafka broker stores all messages in the partitions configured for that particular topic. It ensures the messages are equally shared between partitions. If the producer sends two messages and there are two partitions, Kafka will store one message in the first partition and the second message in the second partition.
- The consumer subscribes to a specific topic.
- Once the consumer subscribes to a topic, Kafka will provide the current offset of the topic to the consumer and also saves the offset in the ZooKeeper ensemble.
- The consumer will request Kafka at a regular interval (like 100 ms) for new messages.
- Once Kafka receives the messages from producers, it forwards these messages to the consumers.
- The consumer will receive the message and process it.
- Once the messages are processed, the consumer will send an acknowledgement to the Kafka broker.
- Once Kafka receives an acknowledgement, it changes the offset to the new value and updates it in ZooKeeper. Since offsets are maintained in ZooKeeper, the consumer can read the next message correctly even during server outages.
- This flow will repeat until the consumer stops the request.
- The consumer has the option to rewind/skip to the desired offset of a topic at any time and read all the subsequent messages.
Workflow of Queue Messaging / Consumer Group
在队列消息系统中,一群具有相同组 ID 的消费者会订阅主题,而不是单个消费者。简单来说,订阅具有相同组 ID 的主题的消费者被视为一个组,消息在它们之间共享。让我们查看此系统的实际工作流。
In a queue messaging system instead of a single consumer, a group of consumers having the same Group ID will subscribe to a topic. In simple terms, consumers subscribing to a topic with same Group ID are considered as a single group and the messages are shared among them. Let us check the actual workflow of this system.
- Producers send messages to a topic at regular intervals.
- Kafka stores all messages in the partitions configured for that particular topic, similar to the earlier scenario.
- A single consumer subscribes to a specific topic, assume Topic-01, with Group ID Group-1.
- Kafka interacts with the consumer in the same way as in Pub-Sub Messaging until a new consumer subscribes to the same topic, Topic-01, with the same Group ID, Group-1.
- Once the new consumer arrives, Kafka switches its operation to share mode and shares the data between the two consumers. This sharing will go on until the number of consumers reaches the number of partitions configured for that particular topic.
- Once the number of consumers exceeds the number of partitions, the new consumer will not receive any further messages until one of the existing consumers unsubscribes. This scenario arises because each consumer in Kafka will be assigned a minimum of one partition, and once all the partitions are assigned to the existing consumers, the new consumers will have to wait.
- This feature is also called Consumer Group. In this way, Kafka provides the best of both systems in a very simple and efficient manner.
Role of ZooKeeper
Apache Kafka 的一个关键依赖是 Apache Zookeeper,这是一项分布式配置与同步服务。Zookeeper 是 Kafka 代理与使用者之间的协调界面。Kafka 服务器通过 Zookeeper 丛集共享资讯。Kafka 会在 Zookeeper 中存储基本元资料,例如关于主题、代理、使用者偏移(队列阅读器)等資訊。
A critical dependency of Apache Kafka is Apache Zookeeper, which is a distributed configuration and synchronization service. Zookeeper serves as the coordination interface between the Kafka brokers and consumers. The Kafka servers share information via a Zookeeper cluster. Kafka stores basic metadata in Zookeeper such as information about topics, brokers, consumer offsets (queue readers) and so on.
由于所有关键资讯都被存储在 Zookeeper 中,而且它通常会跨其整体复制该资料,因此 Kafka 代理/Zookeeper 的故障不会影响 Kafka 丛集的状态。一旦 Zookeeper 重新启动,Kafka 将还原状态。这为 Kafka 带来了零停机时间。在领导者发生故障时,Kafka 代理之间的领导者选举也是通过使用 Zookeeper 进行的。
Since all the critical information is stored in ZooKeeper and it normally replicates this data across its ensemble, the failure of a Kafka broker or ZooKeeper node does not affect the state of the Kafka cluster. Kafka will restore the state once ZooKeeper restarts. This gives zero downtime for Kafka. Leader election among the Kafka brokers is also done by using ZooKeeper in the event of leader failure.
要深入了解 Zookeeper,请参阅 zookeeper 。
To learn more about ZooKeeper, please refer to the ZooKeeper documentation.
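As a quick way to see this metadata, the ZooKeeper CLI shipped with ZooKeeper can list the znodes that Kafka registers. This is a hedged sketch assuming a local ZooKeeper on port 2181 and a running broker; the exact znode contents depend on your setup.
$ bin/zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[zk: localhost:2181(CONNECTED) 1] ls /brokers/topics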
在下一章节中,让我们继续深入了解如何在您的机器上安装 Java、ZooKeeper 和 Kafka。
Let us continue further on how to install Java, ZooKeeper, and Kafka on your machine in the next chapter.
Apache Kafka - Installation Steps
以下是在您的机器上安装 Java 的步骤。
Following are the steps for installing Java on your machine.
Step 1 - Verifying Java Installation
希望您现在已经安装 Java,只需使用以下命令进行验证。
Hopefully you have already installed Java on your machine, so you just need to verify it using the following command.
$ java -version
如果 Java 在您的设备上安装成功,您将看到所安装 Java 的版本。
If java is successfully installed on your machine, you could see the version of the installed Java.
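For example, for the JDK 8u60 build used in this tutorial, the output will look roughly like the following; the exact build numbers depend on your installation.
$ java -version
java version "1.8.0_60"
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)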
Step 1.1 - Download JDK
如果没有下载 Java,请访问以下链接下载最新版本的 JDK,然后下载最新版本。
If Java is not installed, please download the latest version of the JDK by visiting the following link.
现在最新版本是 JDK 8u 60,文件是“jdk-8u60-linux-x64.tar.gz”。请在您的机器上下载此文件。
Now the latest version is JDK 8u60 and the file is “jdk-8u60-linux-x64.tar.gz”. Please download the file on your machine.
Step 1.2 - Extract Files
一般来说,下载的文件都存储在下载文件夹中,验证并使用以下命令提取 tar 设置。
Generally, downloaded files are stored in the Downloads folder; verify it and extract the tar archive using the following commands.
$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.tar.gz
Step 1.3 - Move to Opt Directory
为了让所有用户都能使用 Java,将提取的 Java 内容移动到 /opt/jdk/ 文件夹中。
To make Java available to all users, move the extracted Java content to the /opt/jdk/ folder.
$ su
password: (type password of root user)
$ mkdir /opt/jdk
$ mv jdk1.8.0_60 /opt/jdk/
Step 1.4 - Set path
若要设置 path 及 JAVA_HOME 变量,请将以下命令添加到 ~/.bashrc 文件中。
To set path and JAVA_HOME variables, add the following commands to ~/.bashrc file.
export JAVA_HOME=/opt/jdk/jdk1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin
现在,将所有变更应用到当前运行系统中。
Now apply all the changes into current running system.
$ source ~/.bashrc
Step 1.5 - Java Alternatives
使用以下命令更改 Java 备用项。
Use the following command to change Java Alternatives.
update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100
Step 1.6 − 现在使用步骤 1 中说明的验证命令 (java -version) 验证 java。
Step 1.6 − Now verify java using verification command (java -version) explained in Step 1.
Step 2 - ZooKeeper Framework Installation
Step 2.1 - Download ZooKeeper
要将 ZooKeeper 框架安装到您的计算机上,请访问以下链接并下载 ZooKeeper 的最新版本。
To install ZooKeeper framework on your machine, visit the following link and download the latest version of ZooKeeper.
截至目前,ZooKeeper 的最新版本是 3.4.6 (ZooKeeper-3.4.6.tar.gz)。
As of now, latest version of ZooKeeper is 3.4.6 (ZooKeeper-3.4.6.tar.gz).
Step 2.2 - Extract tar file
使用以下命令解压 tar 文件
Extract tar file using the following command
$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data
Step 2.3 - Create Configuration File
使用命令 vi "conf/zoo.cfg" 打开名为 conf/zoo.cfg 的配置文件,并将所有以下参数设置为起点。
Open the configuration file named conf/zoo.cfg using the command vi conf/zoo.cfg and set all the following parameters as a starting point.
$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2
配置文件成功保存并返回到终端后,您就可以启动 zookeeper 服务器。
Once the configuration file has been saved successfully, return to the terminal again. You can now start the ZooKeeper server.
Step 2.4 - Start ZooKeeper Server
$ bin/zkServer.sh start
执行此命令后,您将得到以下所示的响应 −
After executing this command, you will get a response as shown below −
$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED
Step 2.5 - Start CLI
$ bin/zkCli.sh
输入上述命令之后,您将连接到 zookeeper 服务器,并将得到以下响应。
After typing the above command, you will be connected to the zookeeper server and will get the below response.
Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]
Step 2.6 - Stop Zookeeper Server
连接服务器并执行所有操作之后,您可以使用以下命令停止 zookeeper 服务器 −
After connecting the server and performing all the operations, you can stop the zookeeper server with the following command −
$ bin/zkServer.sh stop
现在您已成功在自己的计算机上安装 Java 和 ZooKeeper。让我们看看安装 Apache Kafka 的步骤。
Now you have successfully installed Java and ZooKeeper on your machine. Let us see the steps to install Apache Kafka.
Step 3 - Apache Kafka Installation
让我们继续执行以下步骤以在您的计算机上安装 Kafka。
Let us continue with the following steps to install Kafka on your machine.
Step 3.1 - Download Kafka
要将 Kafka 服务器安装到计算机上,请点击以下链接 −
To install Kafka on your machine, click on the below link −
现在,最新版本,即 kafka_2.11-0.9.0.0.tgz 将下载到您的计算机上。
Now the latest version, i.e., kafka_2.11-0.9.0.0.tgz, will be downloaded onto your machine.
Step 3.2 - Extract the tar file
使用以下命令解压 tar 文件 −
Extract the tar file using the following command −
$ cd opt/
$ tar -zxf kafka_2.11-0.9.0.0.tgz
$ cd kafka_2.11-0.9.0.0
现在您已在自己的计算机上下载了 Kafka 的最新版本。
Now you have downloaded the latest version of Kafka on your machine.
Step 3.3 - Start Server
您可以使用以下命令启动服务器 −
You can start the server by giving the following command −
$ bin/kafka-server-start.sh config/server.properties
服务器启动后,您将在屏幕上看到以下响应 −
After the server starts, you would see the below response on your screen −
$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
…………………………………………….
…………………………………………….
Step 4 - Stop the Server
执行所有操作之后,您可以使用以下命令停止服务器 −
After performing all the operations, you can stop the server using the following command −
$ bin/kafka-server-stop.sh config/server.properties
我们已经讨论了 Kafka 安装,在下一章中,我们可以了解如何对 Kafka 执行基本操作。
Now that we have already discussed the Kafka installation, we can learn how to perform basic operations on Kafka in the next chapter.
Apache Kafka - Basic Operations
首先让我们开始实现单节点单代理配置,然后我们将设置迁移到单节点多代理配置。
First let us start implementing single node-single broker configuration and we will then migrate our setup to single node-multiple brokers configuration.
希望你现在已经安装了 Java、ZooKeeper 和 Kafka。在转到 Kafka 集群设置之前,首先需要启动 ZooKeeper,因为 Kafka 集群使用 ZooKeeper。
Hopefully you would have installed Java, ZooKeeper and Kafka on your machine by now. Before moving to the Kafka Cluster Setup, first you would need to start your ZooKeeper because Kafka Cluster uses ZooKeeper.
Start ZooKeeper
打开一个新的终端并输入以下命令 -
Open a new terminal and type the following command −
bin/zookeeper-server-start.sh config/zookeeper.properties
要启动 Kafka 代理,请键入以下命令 -
To start Kafka Broker, type the following command −
bin/kafka-server-start.sh config/server.properties
启动 Kafka 代理后,在 ZooKeeper 终端上输入命令 jps,您将看到以下响应 -
After starting Kafka Broker, type the command jps on ZooKeeper terminal and you would see the following response −
821 QuorumPeerMain
928 Kafka
931 Jps
现在您可以在终端上看到两个正在运行的守护程序,其中 QuorumPeerMain 是 ZooKeeper 守护程序,另一个是 Kafka 守护程序。
Now you could see two daemons running on the terminal where QuorumPeerMain is ZooKeeper daemon and another one is Kafka daemon.
Single Node-Single Broker Configuration
在此配置中,您有一个 ZooKeeper 和经纪人 ID 实例。以下是如何配置它的步骤 -
In this configuration you have a single ZooKeeper instance and a single broker instance. Following are the steps to configure it −
Creating a Kafka Topic - Kafka 提供了一个名为 kafka-topics.sh 的命令行实用程序,用于在服务器上创建主题。打开一个新的终端并输入以下示例。
Creating a Kafka Topic − Kafka provides a command line utility named kafka-topics.sh to create topics on the server. Open new terminal and type the below example.
Syntax
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic Hello-Kafka
我们刚刚创建了一个名为 Hello-Kafka 的主题,其中只有一个分区和一个副本因子。上述创建的输出将类似于以下输出 -
We just created a topic named Hello-Kafka with a single partition and one replica factor. The output of the above command will be similar to the following −
Output - 创建了主题 Hello-Kafka
Output − Created topic Hello-Kafka
在创建主题后,您可以在 Kafka 经纪人终端窗口中获取通知,并在 config/server.properties 文件中指定在“/tmp/kafka-logs/”中创建的主题的日志。
Once the topic has been created, you can see the notification in the Kafka broker terminal window, and the log for the created topic appears under “/tmp/kafka-logs/“ as specified in the config/server.properties file.
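You can verify this by listing the log directory; a minimal sketch assuming the default log.dirs of /tmp/kafka-logs. The single partition of Hello-Kafka appears as a folder named Hello-Kafka-0.
$ ls /tmp/kafka-logs/
Hello-Kafka-0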
List of Topics
要获取 Kafka 服务器中的主题列表,可以使用以下命令 -
To get a list of topics in Kafka server, you can use the following command −
Syntax
Syntax
bin/kafka-topics.sh --list --zookeeper localhost:2181
Output
Hello-Kafka
由于我们已创建了一个主题,它将仅列出 Hello-Kafka。假设,如果你创建多个主题,你会在输出中获得主题名称。
Since we have created a topic, it will list out Hello-Kafka only. Suppose, if you create more than one topic, you will get all the topic names in the output.
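You can also inspect the details of a single topic with the --describe option of the same tool; a minimal sketch for the topic created above:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Hello-Kafka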
Start Producer to Send Messages
Syntax
Syntax
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
从上述语法中,生产者命令行客户端需要两个主要参数 −
From the above syntax, two main parameters are required for the producer command line client −
Broker-list − 我们希望向其发送消息的代理列表。在此情况下,我们只有一个代理。Config/server.properties 文件包含代理端口 ID,因为我们知道我们的代理在端口 9092 上侦听,因此你可以直接指定它。
Broker-list − The list of brokers that we want to send the messages to. In this case we only have one broker. The config/server.properties file contains the broker port id; since we know our broker is listening on port 9092, you can specify it directly.
主题名称 − 以下是主题名称的示例。
Topic name − Here is an example for the topic name.
Example
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
生产者将等待来自标准输入的输入,并将其发布到 Kafka 集群。默认情况下,每行都将发布为一条新消息,然后在 config/producer.properties 文件中指定默认生产者属性。现在,你可以在终端中键入几行消息,如下所示。
The producer will wait for input from stdin and publish it to the Kafka cluster. By default, every new line is published as a new message, and the default producer properties are specified in the config/producer.properties file. Now you can type a few lines of messages in the terminal as shown below.
Output
$ bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Hello-Kafka[2016-01-16 13:50:45,931]
WARN property topic is not valid (kafka.utils.VerifiableProperties)
Hello
My first message
My second message
Start Consumer to Receive Messages
与生产者类似,默认消费者属性在 config/consumer.properties 文件中指定。打开一个新终端,并键入以下语法以使用消息。
Similar to the producer, the default consumer properties are specified in the config/consumer.properties file. Open a new terminal and type the below syntax for consuming messages.
Syntax
Syntax
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic-name
--from-beginning
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Hello-Kafka
--from-beginning
Output
Hello
My first message
My second message
最后,你可以从生产者的终端输入消息,并看到它们出现在消费者的终端中。截至目前,你对具有单个代理的单节点集群有很好的理解。现在让我们转向多代理配置。
Finally, you are able to enter messages from the producer’s terminal and see them appearing in the consumer’s terminal. As of now, you have a very good understanding of a single-node cluster with a single broker. Let us now move on to the multiple-broker configuration.
Single Node-Multiple Brokers Configuration
在继续进行多代理集群设置之前,首先启动 ZooKeeper 服务器。
Before moving on to the multiple brokers cluster setup, first start your ZooKeeper server.
Create Multiple Kafka Brokers − 我们已经在 config/server.properties 中拥有一个 Kafka 代理实例。现在我们需要多个代理实例,因此将现有 server.properties 文件复制到两个新配置文件中,并将其分别重命名为 server-one.properties 和 server-two.properties。然后编辑这两个新文件,并进行以下更改 −
Create Multiple Kafka Brokers − We have one Kafka broker instance already in config/server.properties. Now we need multiple broker instances, so copy the existing server.properties file into two new config files and rename them as server-one.properties and server-two.properties. Then edit both new files and assign the following changes −
config/server-one.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
config/server-two.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2
Start Multiple Brokers − 在三台服务器上进行所有更改后,打开三个新终端来逐个启动每个代理。
Start Multiple Brokers − After all the changes have been made to the three server property files, open three new terminals to start each broker one by one.
Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties
现在,我们有三个不同的代理在机器上运行。尝试自己通过在 ZooKeeper 终端上键入 jps 来检查所有守护程序,然后你将看到响应。
Now we have three different brokers running on the machine. Try it yourself: check all the daemons by typing jps on the ZooKeeper terminal, and you will see the response.
Creating a Topic
让我们为此主题分配 3 的复制因子值,因为我们有三个不同的代理在运行。如果你有两个代理,那么指定的副本值将为 2。
Let us assign the replication factor value as three for this topic because we have three different brokers running. If you have two brokers, then the assigned replica value will be two.
Syntax
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
--partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
--partitions 1 --topic Multibrokerapplication
Output
created topic “Multibrokerapplication”
Describe 命令用于检查哪个代理正在侦听当前创建的主题,如下所示 −
The Describe command is used to check which broker is listening on the current created topic as shown below −
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerapplication
Output
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerapplication
Topic:Multibrokerapplication    PartitionCount:1    ReplicationFactor:3    Configs:
Topic:Multibrokerapplication    Partition:0    Leader:0    Replicas:0,2,1    Isr:0,2,1
从上述输出中,我们可以得出结论,第一行概述了所有分区,显示了主题名称、分区数量以及我们已经选择的复制因子。在第二行中,每个节点将成为随机选择的分区部分的领导者。
From the above output, we can conclude that the first line gives a summary of all the partitions, showing the topic name, partition count and the replication factor that we have chosen already. In the second line, each node will be the leader for a randomly selected portion of the partitions.
在我们的案例中,我们看到我们的第一个代理(代理 ID 为 0)是领导者。然后 Replicas:0,2,1 表示所有代理最终会复制该主题。Isr 是同步副本的集合。那么,这是当前处于正常状态并且被领导者赶上的副本的子集。
In our case, we see that our first broker (with broker.id 0) is the leader. Then Replicas:0,2,1 means that all the brokers replicate the topic. Finally, Isr is the set of in-sync replicas: the subset of replicas that are currently alive and caught up with the leader.
Start Producer to Send Messages
此过程与单代理设置中的过程相同。
This procedure remains the same as in the single broker setup.
Example
bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Multibrokerapplication
Output
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
This is single node-multi broker demo
This is the second message
Start Consumer to Receive Messages
此过程与在单代理设置中显示的过程相同。
This procedure remains the same as shown in the single broker setup.
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181
--topic Multibrokerapplication --from-beginning
Output
bin/kafka-console-consumer.sh --zookeeper localhost:2181
--topic Multibrokerapplication --from-beginning
This is single node-multi broker demo
This is the second message
Basic Topic Operations
在本节中,我们将讨论各个基本主题操作。
In this chapter we will discuss the various basic topic operations.
Modifying a Topic
正如你已经了解如何在 Kafka 集群中创建主题。现在让我们使用以下命令修改已创建的主题
You have already understood how to create a topic in the Kafka cluster. Now let us modify a created topic using the following command.
Syntax
Syntax
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_name
--partitions count
Example
We have already created a topic “Hello-Kafka” with a single partition and one replica factor.
Now, using the “alter” command, we change the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181
--alter --topic Hello-Kafka --partitions 2
Output
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
Deleting a Topic
要删除主题,可以使用以下语法。
To delete a topic, you can use the following syntax.
Syntax
Syntax
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
Example
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-Kafka
Output
> Topic Hello-Kafka marked for deletion
Note − 如果 delete.topic.enable 未设置为 true,此操作将不起作用。
Note − This will have no impact if delete.topic.enable is not set to true.
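To actually enable topic deletion, the broker configuration must contain delete.topic.enable=true before the broker is started; a minimal sketch of the relevant line in config/server.properties:
# config/server.properties
delete.topic.enable=true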
Apache Kafka - Simple Producer Example
让我们使用 Java 客户端创建用于发布和消费消息的应用程序。Kafka 制作者客户端由以下 API 组成。
Let us create an application for publishing and consuming messages using a Java client. The Kafka producer client consists of the following APIs.
KafkaProducer API
让我们在本节中了解最重要的 Kafka 制作者 API 的集合。KafkaProducer API 的主要部分是 KafkaProducer 类。KafkaProducer 类提供了一个选项,可以使用以下方法在其构造函数中连接 Kafka 代理。
Let us understand the most important set of Kafka producer APIs in this section. The central part of the KafkaProducer API is the KafkaProducer class. The KafkaProducer class provides an option to connect to a Kafka broker in its constructor and provides the following methods.
- The KafkaProducer class provides a send method to send messages asynchronously to a topic. The signature of send() is as follows
producer.send(new ProducerRecord<byte[],byte[]>(topic,
partition, key1, value1) , callback);
- ProducerRecord − The producer manages a buffer of records waiting to be sent.
- Callback − A user-supplied callback to execute when the record has been acknowledged by the server (null indicates no callback).
- The KafkaProducer class provides a flush method to ensure all previously sent messages have actually completed. The syntax of the flush method is as follows −
public void flush()
- The KafkaProducer class provides the partitionsFor method, which helps in getting the partition metadata for a given topic. This can be used for custom partitioning.
- The KafkaProducer class provides a metrics method. Its signature is as follows −
public Map metrics()
它返回制作者维护的内部度量指标映射。
It returns the map of internal metrics maintained by the producer.
- public void close() − The KafkaProducer class provides a close method that blocks until all previously sent requests are completed.
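To make the send()/Callback description above concrete, the following is a minimal hedged sketch of an asynchronous send with a user-supplied callback. The broker address localhost:9092 and the topic name my-topic are assumptions for illustration; the full producer example later in this chapter does not use a callback.
import java.util.Properties;
import org.apache.kafka.clients.producer.*;

public class CallbackSendSketch {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      Producer<String, String> producer = new KafkaProducer<String, String>(props);

      // send() returns immediately; the callback runs once the broker has acknowledged the record
      producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"),
         new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
               if (exception != null)
                  exception.printStackTrace();   // the send failed
               else
                  System.out.println("Stored at offset " + metadata.offset());
            }
         });

      producer.flush();   // block until all buffered records have actually been sent
      producer.close();   // release the producer resources
   }
}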
Producer API
制作者 API 的核心部分是 制作者 类。制作者 类提供了一个选项,可以通过以下方法在其构造函数中连接 Kafka 代理。
The central part of the Producer API is the Producer class. The Producer class provides an option to connect to the Kafka broker in its constructor by the following methods.
The Producer Class
制作者 类提供 send 方法,以使用以下签名将 send 消息发送到单个或多个主题。
The Producer class provides a send method to send messages to either single or multiple topics using the following signatures.
public void send(KeyedMessage<k,v> message)
- sends the data to a single topic, partitioned by key, using either the sync or async producer.
public void send(List<KeyedMessage<k,v>> messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put("producer.type", "async");
ProducerConfig config = new ProducerConfig(prop);
有两种类型的制作者 - Sync 和 Async 。
There are two types of producers – Sync and Async.
相同的 API 配置也适用于同步制作者。它们之间的区别在于,同步制作者直接发送消息,但在后台发送消息。当您需要更高的吞吐量时,首选异步制作者。在 0.8 等早期版本中,异步制作者没有用于注册错误处理程序的 send() 回调。它仅在当前的 0.9 版中可用。
The same API configuration applies to the sync producer as well. The difference between them is that a sync producer sends messages directly, while an async producer sends messages in the background. An async producer is preferred when you want higher throughput. In previous releases like 0.8, an async producer did not have a callback for send() to register error handlers. This is available only in the current release of 0.9.
Configuration Settings
制作者 API 的主要配置设置列在以下表格中,以便更好地理解 -
The Producer API’s main configuration settings are listed in the following table for better understanding −
| S.No | Configuration Settings and Description |
| --- | --- |
| 1 | client.id − Identifies the producer application. |
| 2 | producer.type − Either sync or async. |
| 3 | acks − The acks config controls the criteria under which producer requests are considered complete. |
| 4 | retries − If the producer request fails, then automatically retry the specified number of times. |
| 5 | bootstrap.servers − Bootstrapping list of brokers. |
| 6 | linger.ms − If you want to reduce the number of requests, you can set linger.ms to something greater than some value. |
| 7 | key.serializer − Key for the serializer interface. |
| 8 | value.serializer − Value for the serializer interface. |
| 9 | batch.size − Buffer size. |
| 10 | buffer.memory − Controls the total amount of memory available to the producer for buffering. |
ProducerRecord API
ProducerRecord 是一对键/值,发送到 Kafka 群集。ProducerRecord 类构造函数使用以下签名,根据分区、键和值对创建记录。
ProducerRecord is a key/value pair that is sent to the Kafka cluster. The ProducerRecord class constructor creates a record with partition, key and value pairs using the following signature.
public ProducerRecord (string topic, int partition, k key, v value)
- Topic − user-defined topic name that will be appended to the record.
- Partition − partition number for the record.
- Key − The key that will be included in the record.
- Value − Record contents.
public ProducerRecord (string topic, k key, v value)
ProducerRecord 类构造函数用于创建具有键、值对但不具有分区的记录。
This ProducerRecord class constructor is used to create a record with key and value pairs, but without a partition.
- Topic − Create a topic to assign the record.
- Key − Key for the record.
- Value − Record contents.
public ProducerRecord (string topic, v value)
ProducerRecord 类创建一个没有分区和密钥的记录。
ProducerRecord class creates a record without partition and key.
- Topic − Create a topic.
- Value − Record contents.
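A short hedged sketch of the three constructor forms described above, using an assumed topic name my-topic; which partition a record lands in depends on the arguments supplied.
// record with an explicit partition (0), a key and a value
ProducerRecord<String, String> r1 = new ProducerRecord<String, String>("my-topic", 0, "key1", "value1");
// record with a key and a value; the partition is derived from the key
ProducerRecord<String, String> r2 = new ProducerRecord<String, String>("my-topic", "key2", "value2");
// record with only a value; no key and no explicit partition
ProducerRecord<String, String> r3 = new ProducerRecord<String, String>("my-topic", "value3");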
ProducerRecord 类方法在以下表格中列出:
The ProducerRecord class methods are listed in the following table −
| S.No | Class Methods and Description |
| --- | --- |
| 1 | public string topic() − The topic that will be appended to the record. |
| 2 | public K key() − The key that will be included in the record. If there is no such key, null will be returned here. |
| 3 | public V value() − Record contents. |
| 4 | partition() − The partition for the record. |
SimpleProducer application
在创建应用程序之前,首先启动 ZooKeeper 和 Kafka 代理,然后使用 create topic 命令在 Kafka 代理中创建自己的主题。然后创建一个名为 SimpleProducer.java 的 Java 类,并输入以下代码。
Before creating the application, first start ZooKeeper and the Kafka broker, then create your own topic in the Kafka broker using the create topic command. After that, create a Java class named SimpleProducer.java and type in the following code.
//import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;
//Create java class named “SimpleProducer”
public class SimpleProducer {
public static void main(String[] args) throws Exception{
// Check arguments length value
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Assign topicName to string variable
String topicName = args[0].toString();
// create instance for properties to access producer configs
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", “localhost:9092");
//Set acknowledgements for producer requests.
props.put("acks", “all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Wait up to 1 ms so that requests can be batched together
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
System.out.println("Message sent successfully");
producer.close();
}
}
Compilation −可以使用以下命令编译应用程序。
Compilation − The application can be compiled using the following command.
javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" *.java
Execution −可以使用以下命令执行应用程序。
Execution − The application can be executed using the following command.
java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. SimpleProducer <topic-name>
Output
Message sent successfully
To check the above output, open a new terminal and type the consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic-name> --from-beginning
0
1
2
3
4
5
6
7
8
9
Simple Consumer Example
截至目前,我们已经创建了一个生产者来向 Kafka 群集发送消息。现在让我们创建一个消费者,以便从 Kafka 群集中获取消息。KafkaConsumer API 用于获取 Kafka 群集中的消息。下面定义了 KafkaConsumer 类构造函数。
As of now we have created a producer to send messages to the Kafka cluster. Now let us create a consumer to consume messages from the Kafka cluster. The KafkaConsumer API is used to consume messages from the Kafka cluster. The KafkaConsumer class constructor is defined below.
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
configs − 消费者配置映射。
configs − A map of consumer configs.
KafkaConsumer 类有下面表格中列出的几个重要的方法。
KafkaConsumer class has the following significant methods that are listed in the table below.
| S.No | Method and Description |
| --- | --- |
| 1 | public java.util.Set<TopicPartition> assignment() − Get the set of partitions currently assigned to the consumer. |
| 2 | public string subscription() − Get the consumer’s current topic subscription. |
| 3 | public void subscribe(java.util.List<java.lang.String> topics, ConsumerRebalanceListener listener) − Subscribe to the given list of topics to get dynamically assigned partitions. |
| 4 | public void unsubscribe() − Unsubscribe the topics from the given list of partitions. |
| 5 | public void subscribe(java.util.List<java.lang.String> topics) − Subscribe to the given list of topics to get dynamically assigned partitions. If the given list of topics is empty, it is treated the same as unsubscribe(). |
| 6 | public void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener listener) − The argument pattern refers to the subscribing pattern in the format of a regular expression, and the listener argument gets notifications from the subscribing pattern. |
| 7 | public void assign(java.util.List<TopicPartition> partitions) − Manually assign a list of partitions to the consumer. |
| 8 | poll() − Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. This will return an error if the topics were not subscribed before polling for data. |
| 9 | public void commitSync() − Commit the offsets returned by the last poll() for the subscribed list of topics and partitions. The same operation is applied to commitAsync(). |
| 10 | public void seek(TopicPartition partition, long offset) − Override the offset value that the consumer will use on the next poll() method. |
| 11 | public void resume() − Resume the paused partitions. |
| 12 | public void wakeup() − Wakeup the consumer. |
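The assign() and seek() methods from the table above let a consumer bypass group management and read from an explicit position. The following is a hedged sketch, assuming a broker on localhost:9092 and a topic my-topic with at least one partition; the property values mirror the SimpleConsumer example later in this chapter.
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignSeekSketch {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

      // manually take ownership of partition 0 instead of subscribing to the topic
      TopicPartition partition = new TopicPartition("my-topic", 0);
      consumer.assign(Arrays.asList(partition));

      // position the consumer at offset 5; the next poll() starts reading from there
      consumer.seek(partition, 5);

      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
         System.out.printf("offset = %d, value = %s\n", record.offset(), record.value());
      consumer.close();
   }
}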
ConsumerRecord API
ConsumerRecord API 用于从 Kafka 集群接收记录。此 API 包含一个主题名称、分区号(记录从中接收)和指向 Kafka 分区中记录的偏移量。ConsumerRecord 类用于使用特定的主题名称、分区计数和 <key, value> 对创建消费者记录。它具有以下特征。
The ConsumerRecord API is used to receive records from the Kafka cluster. This API consists of a topic name and a partition number, from which the record is being received, and an offset that points to the record in a Kafka partition. The ConsumerRecord class is used to create a consumer record with a specific topic name, partition count and <key, value> pairs. It has the following signature.
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
- Topic − The topic name for the consumer record received from the Kafka cluster.
- Partition − Partition for the topic.
- Key − The key of the record; if no key exists, null will be returned.
- Value − Record contents.
ConsumerRecords API
ConsumerRecords API 充当 ConsumerRecord 的容器。此 API 用于为特定主题中的每个分区保留 ConsumerRecord 列表。其构造函数定义如下。
ConsumerRecords API acts as a container for ConsumerRecord. This API is used to keep the list of ConsumerRecord per partition for a particular topic. Its Constructor is defined below.
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
- TopicPartition − Return a map of partitions for a particular topic.
- Records − Return a list of ConsumerRecord.
ConsumerRecords 类定义了以下方法。
ConsumerRecords class has the following methods defined.
| S.No | Methods and Description |
| --- | --- |
| 1 | public int count() − The number of records for all the topics. |
| 2 | public Set partitions() − The set of partitions with data in this record set (if no data was returned, then the set is empty). |
| 3 | public Iterator iterator() − The iterator enables you to cycle through a collection, obtaining or removing elements. |
| 4 | public List records() − Get the list of records for the given partition. |
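A short hedged sketch showing how the methods above can be combined to walk a fetched record set partition by partition; the record set is assumed to come from a poll() call like the one in the SimpleConsumer example below.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class RecordSetSketch {
   // walk a fetched record set partition by partition
   static void dump(ConsumerRecords<String, String> records) {
      System.out.println("Fetched " + records.count() + " records");
      for (TopicPartition partition : records.partitions()) {
         for (ConsumerRecord<String, String> record : records.records(partition)) {
            System.out.printf("partition = %d, offset = %d, value = %s\n",
               partition.partition(), record.offset(), record.value());
         }
      }
   }
}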
Configuration Settings
下面列出消费者客户端 API 主要配置设置的配置设置 −
The main configuration settings for the Consumer client API are listed below −
| S.No | Settings and Description |
| --- | --- |
| 1 | bootstrap.servers − Bootstrapping list of brokers. |
| 2 | group.id − Assigns an individual consumer to a group. |
| 3 | enable.auto.commit − Enable auto commit for offsets if the value is true, otherwise offsets are not committed. |
| 4 | auto.commit.interval.ms − How often updated consumed offsets are written to ZooKeeper. |
| 5 | session.timeout.ms − Indicates how many milliseconds Kafka will wait for ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages. |
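When enable.auto.commit is set to false, offsets have to be committed explicitly. The following hedged fragment shows a poll loop with a manual commitSync() call; the consumer is assumed to have been created as in the SimpleConsumer example below, but with enable.auto.commit set to "false".
// fragment: manual offset commit after processing each batch
static void consumeWithManualCommit(KafkaConsumer<String, String> consumer) {
   while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
         System.out.printf("offset = %d, value = %s\n", record.offset(), record.value());
      consumer.commitSync();   // commit the offsets returned by the last poll()
   }
}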
SimpleConsumer Application
此处的生产者应用程序步骤保持不变。首先,启动 ZooKeeper 和 Kafka 代理。然后使用名为 SimpleConsumer.java 的 java 类创建一个 SimpleConsumer 应用程序,并键入以下代码。
The producer application steps remain the same here. First, start your ZooKeeper and Kafka broker. Then create a SimpleConsumer application with a Java class named SimpleConsumer.java and type the following code.
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Kafka consumer configuration settings
String topicName = args[0].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName));
//print the topic name
System.out.println("Subscribed to topic " + topicName);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = con-sumer.poll(100);
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
Compilation −可以使用以下命令编译应用程序。
Compilation − The application can be compiled using the following command.
javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" *.java
Execution − 可以使用以下命令执行应用程序
Execution − The application can be executed using the following command
java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. SimpleConsumer <topic-name>
Input − 打开生产者 CLI 并向主题发送一些消息。您可以输入示例消息,例如 “Hello Consumer”。
Input − Open the producer CLI and send some messages to the topic. You can put in a sample input such as ‘Hello Consumer’.
Output − 以下是输出。
Output − Following will be the output.
Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer
Apache Kafka - Consumer Group Example
消费者组是从 Kafka 主题进行多线程或多机器消费。
A consumer group is a multi-threaded or multi-machine consumption from Kafka topics.
Consumer Group
- Consumers can join a group by using the same group.id.
- The maximum parallelism of a group is bounded by the number of partitions; the number of consumers in the group should be less than or equal to the number of partitions.
- Kafka assigns the partitions of a topic to the consumers in a group, so that each partition is consumed by exactly one consumer in the group.
- Kafka guarantees that a message is only ever read by a single consumer in the group.
- Consumers can see the messages in the order they were stored in the log.
Re-balancing of a Consumer
添加更多进程/线程会导致 Kafka 重新平衡。如果任何消费者或代理未发送心跳至 ZooKeeper,则可通过 Kafka 集群重新配置它。在此重新平衡期间,Kafka 将可用的分区分配到可用的线程,可能会将某个分区移动到另一个进程。
Adding more processes/threads will cause Kafka to re-balance. If any consumer or broker fails to send a heartbeat to ZooKeeper, then it can be re-configured via the Kafka cluster. During this re-balance, Kafka will assign the available partitions to the available threads, possibly moving a partition to another process.
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
public static void main(String[] args) throws Exception {
if(args.length < 2){
System.out.println("Usage: consumer <topic> <groupname>");
return;
}
String topic = args[0].toString();
String group = args[1].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
Execution
>>java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
此处我们创建名为 my-group 的示例组名并配有两个消费者。同样,你可以在该组中创建你的组并设置消费者的数量。
Here we have created a sample group name as my-group with two consumers. Similarly, you can create your group and number of consumers in the group.
Input
打开生产者 CLI 并发送一些消息,例如 −
Open producer CLI and send some messages like −
Test consumer group 01
Test consumer group 02
Output of the First Process
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01
Output of the Second Process
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02
现在,希望使用 Java 客户端演示理解 SimpleConsumer 和 ConsumeGroup。现在,你了解如何使用 Java 客户端发送和接收消息。让我们在下一章继续 Kafka 集成(大数据技术)。
Now hopefully you would have understood SimpleConsumer and ConsumerGroup by using the Java client demo. Now you have an idea about how to send and receive messages using a Java client. Let us continue with Kafka integration with big data technologies in the next chapter.
Apache Kafka - Integration With Storm
在本章中,我们将学习如何将 Kafka 与 Apache Storm 集成。
In this chapter, we will learn how to integrate Kafka with Apache Storm.
About Storm
Storm 最初由 Nathan Marz 和 BackType 团队创建。在很短的时间内,Apache Storm 已成为分布式实时处理系统的标准,允许你处理海量数据。Storm 非常快,基准测试证明每个节点每秒可处理超过一百万个元组。Apache Storm 连续运行,从配置的源(注水口)消耗数据,并将数据传递到处理管线(螺栓)。结合注水口和螺栓形成拓扑。
Storm was originally created by Nathan Marz and the team at BackType. In a short time, Apache Storm became a standard for distributed real-time processing systems that allows you to process a huge volume of data. Storm is very fast and a benchmark clocked it at over a million tuples processed per second per node. Apache Storm runs continuously, consuming data from the configured sources (Spouts) and passing the data down the processing pipeline (Bolts). Combined, Spouts and Bolts make a Topology.
Integration with Storm
Kafka 和 Storm 会自然地相互补充,而且它们功能强大的合作可为快速移动的大数据启用实时流分析。Kafka 和 Storm 集成更便于开发人员从 Storm 拓扑中提取和发布数据流。
Kafka and Storm naturally complement each other, and their powerful cooperation enables real-time streaming analytics for fast-moving big data. The Kafka and Storm integration makes it easier for developers to ingest and publish data streams from Storm topologies.
Conceptual flow
注水口是流的来源。例如,注水口可能会从 Kafka 主题读取元组并以流形式发出它们。螺栓会消耗输入流、处理并可能发出新的流。螺栓的操作从运行函数、过滤元组到执行流聚合、流联接、与数据库通信等可谓无所不包。Storm 拓扑中的每个节点都会并行执行。拓扑会在终止前无限期运行。Storm 会自动重新分配任何失败的任务。此外,Storm 能确保即使机器宕机且消息丢失也不会造成数据丢失。
A spout is a source of streams. For example, a spout may read tuples off a Kafka topic and emit them as a stream. A bolt consumes input streams, processes them and possibly emits new streams. Bolts can do anything from running functions and filtering tuples to performing streaming aggregations, streaming joins, talking to databases, and more. Each node in a Storm topology executes in parallel. A topology runs indefinitely until you terminate it. Storm will automatically reassign any failed tasks. Additionally, Storm guarantees that there will be no data loss, even if the machines go down and messages are dropped.
让我们详细了解 Kafka-Storm 集成 API。有三个主要类可以将 Kafka 集成到 Storm 中。它们如下所示 −
Let us go through the Kafka-Storm integration API’s in detail. There are three main classes to integrate Kafka with Storm. They are as follows −
BrokerHosts - ZkHosts & StaticHosts
BrokerHosts 是一个接口,ZkHosts 和 StaticHosts 是其两个主要实现。ZkHosts 通过维护 ZooKeeper 中的详细信息来动态跟踪 Kafka 代理,而 StaticHosts 则用于手动/静态设置 Kafka 代理及其详细信息。ZkHosts 是访问 Kafka 代理的简单快捷方式。
BrokerHosts is an interface and ZkHosts and StaticHosts are its two main implementations. ZkHosts is used to track the Kafka brokers dynamically by maintaining the details in ZooKeeper, while StaticHosts is used to manually / statically set the Kafka brokers and its details. ZkHosts is the simple and fast way to access the Kafka broker.
ZkHosts 的签名如下 −
The signature of ZkHosts is as follows −
public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)
其中,brokerZkStr 是 ZooKeeper 主机,brokerZkPath 是 ZooKeeper 路径,用于维护 Kafka 代理详细信息。
Where brokerZkStr is ZooKeeper host and brokerZkPath is the ZooKeeper path to maintain the Kafka broker details.
KafkaConfig API
此 API 用于定义 Kafka 集群的配置设置。Kafka 配置的签名定义如下
This API is used to define configuration settings for the Kafka cluster. The signature of KafkaConfig is defined as follows
public KafkaConfig(BrokerHosts hosts, string topic)
SpoutConfig API
Spoutconfig 是 KafkaConfig 的扩展,支持附加的 ZooKeeper 信息。
Spoutconfig is an extension of KafkaConfig that supports additional ZooKeeper information.
public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
- Hosts − The BrokerHosts can be any implementation of the BrokerHosts interface.
- Topic − Topic name.
- zkRoot − ZooKeeper root path.
- id − The spout stores the state of the offsets it has consumed in ZooKeeper. The id should uniquely identify your spout.
SchemeAsMultiScheme
SchemeAsMultiScheme 是一个接口,它规定从 Kafka 消费的 ByteBuffer 转换为 storm 元组的方式。它源自 MultiScheme 并接受 Scheme 类的实现。Scheme 类的实现有很多,其中一个实现是 StringScheme,它将字节解析为简单的字符串。它还控制着输出字段的命名。签名定义如下所示。
SchemeAsMultiScheme is an interface that dictates how the ByteBuffer consumed from Kafka gets transformed into a Storm tuple. It is derived from MultiScheme and accepts an implementation of the Scheme class. There are many implementations of the Scheme class; one such implementation is StringScheme, which parses the bytes as a simple string. It also controls the naming of your output field. The signature is defined as follows.
public SchemeAsMultiScheme(Scheme scheme)
-
scheme − the Scheme implementation used to deserialize the byte buffer consumed from Kafka.
KafkaSpout API
KafkaSpout 是我们的流嘴实现,它将与 Storm 集成。它从 kafka 主题提取消息,并将其作为元组发送到 Storm 生态系统中。KafkaSpout 从 SpoutConfig 获取其配置详细信息。
KafkaSpout is our spout implementation, which will integrate with Storm. It fetches messages from the Kafka topic and emits them into the Storm ecosystem as tuples. KafkaSpout gets its configuration details from SpoutConfig.
下面是创建简单 Kafka 流嘴的示例代码。
Below is a sample code to create a simple Kafka spout.
// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);
//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts,
topicName, "/" + topicName UUID.randomUUID().toString());
//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Bolt Creation
Bolt 是一个将元组作为输入,处理元组并生成新元组作为输出的组件。Bolt 将实现 IRichBolt 接口。在此程序中,使用 SplitBolt 和 CountBolt 两个 Bolt 类来执行操作。
Bolt is a component that takes tuples as input, processes the tuples, and produces new tuples as output. Bolts implement the IRichBolt interface. In this program, two bolt classes, SplitBolt and CountBolt, are used to perform the operations.
IRichBolt 接口具有以下方法:
IRichBolt interface has the following methods −
-
Prepare − Provides the bolt with an environment to execute. The executors will run this method to initialize the bolt.
-
Execute − Process a single tuple of input.
-
Cleanup − Called when a bolt is going to shut down.
-
declareOutputFields − Declares the output schema of the tuple.
让我们创建 SplitBolt.java,它实现将句子拆分为单词的逻辑和 CountBolt.java,它实现分离唯一单词并统计其出现的逻辑。
Let us create SplitBolt.java, which implements the logic to split a sentence into words, and CountBolt.java, which implements the logic to separate unique words and count their occurrences.
SplitBolt.java
import java.util.Map;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class SplitBolt implements IRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word: words) {
word = word.trim();
if(!word.isEmpty()) {
word = word.toLowerCase();
collector.emit(new Values(word));
}
}
collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public void cleanup() {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
CountBolt.java
import java.util.Map;
import java.util.HashMap;
import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class CountBolt implements IRichBolt{
Map<String, Integer> counters;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String str = input.getString(0);
if(!counters.containsKey(str)){
counters.put(str, 1);
}else {
Integer c = counters.get(str) +1;
counters.put(str, c);
}
collector.ack(input);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counters.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Submitting to Topology
Storm 拓扑基本上是一个 Thrift 结构。TopologyBuilder 类提供了简单易用的方法来创建复杂的拓扑。TopologyBuilder 类具有设置分流 (setSpout) 和设置 Bolt (setBolt) 的方法。最后,TopologyBuilder 具有 createTopology 来创建拓扑。shuffleGrouping 和 fieldsGrouping 方法有助于设置流嘴和 Bolt 的流分组。
The Storm topology is basically a Thrift structure. The TopologyBuilder class provides simple and easy methods to create complex topologies. The TopologyBuilder class has methods to set a spout (setSpout) and to set a bolt (setBolt). Finally, TopologyBuilder has createTopology to create the topology. The shuffleGrouping and fieldsGrouping methods help to set stream groupings for spouts and bolts, as shown in the sketch below.
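The full sample that follows wires both bolts with shuffleGrouping. As a hedged variation, the sketch below (reusing the SplitBolt and CountBolt classes above and an already built spoutConfig) runs two CountBolt instances and uses fieldsGrouping on the "word" field, so identical words always reach the same counter instance.
// Hypothetical wiring with parallelism and fieldsGrouping
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig));
builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");
builder.setBolt("word-counter", new CountBolt(), 2)
   .fieldsGrouping("word-spitter", new Fields("word"));   // backtype.storm.tuple.Fields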
Local Cluster − 出于开发目的,我们可以使用 LocalCluster 对象创建一个本地集群,然后使用 LocalCluster 类的方法 submitTopology 提交拓扑。
Local Cluster − For development purposes, we can create a local cluster using a LocalCluster object and then submit the topology using the submitTopology method of the LocalCluster class.
KafkaStormSample.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;
public class KafkaStormSample {
public static void main(String[] args) throws Exception{
Config config = new Config();
config.setDebug(true);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
String zkConnString = "localhost:2181";
String topic = "my-first-topic";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,
UUID.randomUUID().toString());
kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.forceFromStart = true;
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());
Thread.sleep(10000);
cluster.shutdown();
}
}
在进行编译之前,Kafka-Storm 集成需要 Curator ZooKeeper client java 库。Curator 版本 2.9.1 支持 Apache Storm 版本 0.9.5(本教程中使用)。下载下面指定的 jar 文件并将其放在 java 类路径中。
Before moving on to compilation, the Kafka-Storm integration needs the Curator ZooKeeper client Java library. Curator version 2.9.1 supports Apache Storm version 0.9.5 (which we use in this tutorial). Download the jar files specified below and place them in the Java class path.
-
curator-client-2.9.1.jar
-
curator-framework-2.9.1.jar
包括依赖文件后,使用以下命令编译程序,
After including the dependency files, compile the program using the following command −
javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java
Execution
启动 Kafka Producer CLI(在上章中有说明),创建一个名为 my-first-topic 的新主题,并提供一些示例消息,如下所示:
Start Kafka Producer CLI (explained in previous chapter), create a new topic called my-first-topic and provide some sample messages as shown below −
hello
kafka
storm
spark
test message
another test message
现在使用以下命令执行应用程序:
Now execute the application using the following command −
java -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*":. KafkaStormSample
此应用程序的示例输出如下所示:
The sample output of this application is specified below −
storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2
Apache Kafka - Integration With Spark
在本章中,我们将讨论如何将 Apache Kafka 与 Spark Streaming API 集成。
In this chapter, we will discuss how to integrate Apache Kafka with the Spark Streaming API.
About Spark
Spark Streaming API 能够对实时数据流进行可扩展、高吞吐量、容错的流处理。可以从多个源(如 Kafka、Flume、Twitter 等等)获取数据,并且可以使用诸如 map、reduce、join 和 window 等高级函数之类的复杂算法对数据进行处理。最后,可以将处理过的数据输出到文件系统、数据库和实时仪表板。弹性分布式数据集 (RDD) 是 Spark 的基本数据结构。它是对象的不可变分布式集合。RDD 中的每个数据集都分成逻辑分区,这些分区可以在集群的不同节点上计算。
The Spark Streaming API enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Twitter, etc., and can be processed using complex algorithms such as high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. Resilient Distributed Datasets (RDD) is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in an RDD is divided into logical partitions, which may be computed on different nodes of the cluster.
Integration with Spark
Kafka 是 Spark 流处理的潜在消息和集成平台。Kafka 充当实时数据流的中央枢纽,并在 Spark Streaming 中使用复杂算法对这些数据流进行处理。一旦对数据进行了处理,Spark Streaming 便可以将结果发布到另一个 Kafka 主题中,或者将其存储在 HDFS、数据库或仪表板中。下图描绘了这个概念性的流程。
Kafka is a potential messaging and integration platform for Spark streaming. Kafka acts as the central hub for real-time streams of data, which are processed using complex algorithms in Spark Streaming. Once the data is processed, Spark Streaming can publish the results into yet another Kafka topic or store them in HDFS, databases or dashboards. The following diagram depicts the conceptual flow.
现在,让我们详细了解一下 Kafka-Spark API。
Now, let us go through the Kafka-Spark APIs in detail.
SparkConf API
它表示针对 Spark 应用程序的配置。用于以键值对形式设置各种 Spark 参数。
It represents the configuration for a Spark application and is used to set various Spark parameters as key-value pairs.
SparkConf 类有以下方法:
SparkConf class has the following methods −
-
set(string key, string value) − set configuration variable.
-
remove(string key) − remove key from the configuration.
-
setAppName(string name) − set application name for your application.
-
get(string key) − get the value of a configuration key.
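As a minimal sketch (written against the Java API for consistency with the earlier chapters; the master URL and property values are placeholders):
import org.apache.spark.SparkConf;

// Build a configuration for a local test run
SparkConf conf = new SparkConf()
   .setAppName("KafkaWordCount")
   .setMaster("local[2]")
   .set("spark.ui.showConsoleProgress", "false");
String appName = conf.get("spark.app.name");   // read a configuration key back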
StreamingContext API
这是 Spark 功能的主要入口点。SparkContext 表示与 Spark 集群的连接,并且可以用它在集群上创建 RDD、累加器和广播变量。签名定义如下所示。
This is the main entry point for Spark Streaming functionality. A StreamingContext represents the connection to a Spark cluster and is used to create DStreams; its underlying SparkContext can be used to create RDDs, accumulators and broadcast variables on the cluster. The signature is defined as shown below.
public StreamingContext(String master, String appName, Duration batchDuration,
String sparkHome, scala.collection.Seq<String> jars,
scala.collection.Map<String,String> environment)
-
master − cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
-
appName − a name for your job, to display on the cluster web UI
-
batchDuration − the time interval at which streaming data will be divided into batches
public StreamingContext(SparkConf conf, Duration batchDuration)
通过为新的 SparkContext 提供必要的配置来创建一个 StreamingContext。
Create a StreamingContext by providing the configuration necessary for a new SparkContext.
-
conf − Spark parameters
-
batchDuration − the time interval at which streaming data will be divided into batches
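A rough Java-API equivalent of the Scala call new StreamingContext(sparkConf, Seconds(2)) used later in this chapter, assuming a local master; micro-batches are created every 2 seconds.
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));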
KafkaUtils API
KafkaUtils API 用于将 Kafka 集群连接到 Spark 流处理。此 API 有重要的 createStream 签名方法,其定义如下。
The KafkaUtils API is used to connect the Kafka cluster to Spark streaming. This API has a significant method, createStream, whose signature is defined below.
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
StreamingContext ssc, String zkQuorum, String groupId,
scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
上述方法用于创建一个输入流,从 Kafka 代理中提取消息。
The method shown above is used to create an input stream that pulls messages from Kafka brokers.
-
ssc − StreamingContext object.
-
zkQuorum − Zookeeper quorum.
-
groupId − The group id for this consumer.
-
topics − the map of topics to consume (topic name to number of consumer threads).
-
storageLevel − Storage level to use for storing the received objects.
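A hedged Java-API sketch of the receiver-based createStream, reusing the jssc context from the previous sketch; the ZooKeeper address, group id and topic name are placeholders.
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// One consumer thread for "my-first-topic"; offsets are tracked in ZooKeeper
// under the consumer group "spark-group".
Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put("my-first-topic", 1);
JavaPairReceiverInputDStream<String, String> stream =
   KafkaUtils.createStream(jssc, "localhost:2181", "spark-group", topics);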
KafkaUtils API 具有另一个方法 createDirectStream,该方法用于创建输入流,直接从 Kafka 代理中提取消息,而不使用任何接收器。此流可以保证每个来自 Kafka 的消息都恰好包含在一次转换中。
KafkaUtils API has another method createDirectStream, which is used to create an input stream that directly pulls messages from Kafka Brokers without using any receiver. This stream can guarantee that each message from Kafka is included in transformations exactly once.
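For comparison, a hedged Java-API sketch of the direct (receiver-less) approach; the broker address and topic name are placeholders.
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// Spark tracks the Kafka partition offsets itself, so each message is
// included in the transformations exactly once.
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
Set<String> topics = Collections.singleton("my-first-topic");
JavaPairInputDStream<String, String> directStream = KafkaUtils.createDirectStream(
   jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
   kafkaParams, topics);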
示例应用程序以 Scala 编写。要编译应用程序,请下载并安装 sbt(类似于 maven)scala 构建工具。主要应用程序代码如下所示。
The sample application is written in Scala. To compile the application, please download and install sbt, the Scala build tool (similar to Maven). The main application code is presented below.
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
Build Script
spark-kafka 集成依赖于 spark、spark streaming 和 spark Kafka 集成 jar。创建一个新文件 build.sbt 并指定应用程序详细信息及其依赖项。sbt 将在编译和打包应用程序时下载必要的 jar。
The Spark-Kafka integration depends on the Spark core, Spark Streaming and Spark-Kafka integration jars. Create a new file build.sbt and specify the application details and its dependencies. sbt will download the necessary jars while compiling and packaging the application.
name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
Compilation / Packaging
运行以下命令来编译和打包应用程序的 jar 文件。我们需要将 jar 文件提交到 spark 控制台中以运行应用程序。
Run the following command to compile and package the jar file of the application. We need to submit the jar file into the spark console to run the application.
sbt package
Submitting to Spark
启动 Kafka Producer CLI(在上一章中说明),创建一个名为 my-first-topic 的新主题并提供一些示例消息,如下所示。
Start Kafka Producer CLI (explained in the previous chapter), create a new topic called my-first-topic and provide some sample messages as shown below.
Another spark test message
运行以下命令将应用程序提交到 spark 控制台。
Run the following command to submit the application to spark console.
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 \
--class "KafkaWordCount" --master local[4] \
target/scala-2.10/spark-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
该应用程序的示例输出如下所示。
The sample output of this application is shown below.
spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..
Real Time Application (Twitter)
我们分析一下一个获取最新 Twitter 提要及其主题标签的实时应用程序。之前,我们已经看到过 Storm、Spark 与 Kafka 的集成。在这两种情况下,我们都创建了一个 Kafka 生产者(使用 CLI)以向 Kafka 生态系统发送消息。然后,Storm 和 Spark 集成分别使用 Kafka 消费者读取消息并将其注入 Storm 和 Spark 生态系统。所以,我们在实际中需要创建 Kafka 生产者,该生产者应:
Let us analyze a real-time application that gets the latest Twitter feeds and their hashtags. Earlier, we have seen the integration of Storm and Spark with Kafka. In both scenarios, we created a Kafka Producer (using the CLI) to send messages to the Kafka ecosystem. Then, the Storm and Spark integrations read the messages using the Kafka consumer and inject them into the Storm and Spark ecosystems respectively. So, practically, we need to create a Kafka Producer, which should −
-
Read the twitter feeds using “Twitter Streaming API”,
-
Process the feeds,
-
Extract the HashTags and
-
Send it to Kafka.
一旦 Kafka 接收了 HashTag,Storm/Spark 集成便会收到信息并将其发送至 Storm/Spark 生态系统。
Once the hashtags are received by Kafka, the Storm / Spark integration receives the information and sends it to the Storm / Spark ecosystem.
Twitter Streaming API
可以用任何编程语言访问“Twitter 流式传输 API”。“twitter4j”是一个开源的非官方 Java 库,它提供一个基于 Java 的模块来方便地访问“Twitter 流式传输 API”。“twitter4j”提供基于侦听器的框架来访问推文。要访问“Twitter 流式传输 API”,我们需要注册 Twitter 开发者帐户并获取以下 OAuth 认证详情。
The “Twitter Streaming API” can be accessed in any programming language. “twitter4j” is an open-source, unofficial Java library, which provides a Java-based module to easily access the “Twitter Streaming API”. “twitter4j” provides a listener-based framework to access the tweets. To access the “Twitter Streaming API”, we need to sign up for a Twitter developer account and obtain the following OAuth authentication details.
-
ConsumerKey
-
ConsumerSecret
-
AccessToken
-
AccessTokenSecret
创建开发人员帐户后,下载“twitter4j”JAR 文件并将其放入 Java 类路径中。
Once the developer account is created, download the “twitter4j” jar files and place them in the Java class path.
完整的 Twitter Kafka 生产者编码(KafkaTwitterProducer.java)如下:
The complete Twitter Kafka producer code (KafkaTwitterProducer.java) is listed below −
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import twitter4j.*;
import twitter4j.conf.*;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaTwitterProducer {
public static void main(String[] args) throws Exception {
final LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Status>(1000);
if(args.length < 5){
System.out.println(
"Usage: KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret> <twitter-access-token>
<twitter-access-token-secret>
<topic-name> <twitter-search-keywords>");
return;
}
String consumerKey = args[0].toString();
String consumerSecret = args[1].toString();
String accessToken = args[2].toString();
String accessTokenSecret = args[3].toString();
String topicName = args[4].toString();
String[] arguments = args.clone();
String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true)
.setOAuthConsumerKey(consumerKey)
.setOAuthConsumerSecret(consumerSecret)
.setOAuthAccessToken(accessToken)
.setOAuthAccessTokenSecret(accessTokenSecret);
TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
StatusListener listener = new StatusListener() {
@Override
public void onStatus(Status status) {
queue.offer(status);
// System.out.println("@" + status.getUser().getScreenName()
+ " - " + status.getText());
// System.out.println("@" + status.getUser().getScreen-Name());
/*for(URLEntity urle : status.getURLEntities()) {
System.out.println(urle.getDisplayURL());
}*/
/*for(HashtagEntity hashtage : status.getHashtagEntities()) {
System.out.println(hashtage.getText());
}*/
}
@Override
public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
// System.out.println("Got a status deletion notice id:" + statusDeletionNotice.getStatusId());
}
@Override
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
// System.out.println("Got track limitation notice:" +
num-berOfLimitedStatuses);
}
@Override
public void onScrubGeo(long userId, long upToStatusId) {
// System.out.println("Got scrub_geo event userId:" + userId +
"upToStatusId:" + upToStatusId);
}
@Override
public void onStallWarning(StallWarning warning) {
// System.out.println("Got stall warning:" + warning);
}
@Override
public void onException(Exception ex) {
ex.printStackTrace();
}
};
twitterStream.addListener(listener);
FilterQuery query = new FilterQuery().track(keyWords);
twitterStream.filter(query);
Thread.sleep(5000);
//Add Kafka producer config settings
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
int i = 0;
int j = 0;
while(i < 10) {
Status ret = queue.poll();
if (ret == null) {
Thread.sleep(100);
i++;
}else {
for(HashtagEntity hashtage : ret.getHashtagEntities()) {
System.out.println("Hashtag: " + hashtage.getText());
producer.send(new ProducerRecord<String, String>(
topicName, Integer.toString(j++), hashtage.getText()));
}
}
}
producer.close();
Thread.sleep(5000);
twitterStream.shutdown();
}
}
Compilation
使用以下命令编译应用程序:
Compile the application using the following command −
javac -cp "/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":. KafkaTwitterProducer.java
Execution
打开两个控制台。在一个控制台中运行上述已编译应用程序,如下所示。
Open two consoles. Run the above compiled application as shown below in one console.
java -cp "/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":. KafkaTwitterProducer \
<twitter-consumer-key> <twitter-consumer-secret> <twitter-access-token> <twitter-access-token-secret> \
my-first-topic food
在另一个窗口中运行上一章中解释的任何 Spark/Storm 应用程序。主要的注意事项是,在两种情况下使用的主题应相同。此处,我们将“my-first-topic”用作主题名称。
Run any one of the Spark / Storm applications explained in the previous chapters in another window. The main point to note is that the topic used should be the same in both cases. Here, we have used “my-first-topic” as the topic name.
Apache Kafka - Tools
Kafka 工具打包在 “org.apache.kafka.tools.*” 下。工具分为系统工具和复制工具。
Kafka tools are packaged under “org.apache.kafka.tools.*”. Tools are categorized into system tools and replication tools.
System Tools
系统工具可以使用运行类脚本从命令行运行。语法如下 −
System tools can be run from the command line using the run class script. The syntax is as follows −
bin/kafka-run-class.sh package.class --options
下面提到了一些系统工具 −
Some of the system tools are mentioned below −
-
Kafka Migration Tool − This tool is used to migrate a broker from one version to another.
-
Mirror Maker − This tool is used to provide mirroring of one Kafka cluster to another.
-
Consumer Offset Checker − This tool displays the Consumer Group, Topic, Partitions, Offset, logSize, and Owner for the specified set of topics and consumer group.
Replication Tool
Kafka 复制是一种高级设计工具。添加复制工具的目的是增强持久性和提高可用性。下面提到了一些复制工具 −
Kafka replication is a high-level design feature. The purpose of the replication tools is to provide stronger durability and higher availability. Some of the replication tools are mentioned below −
-
Create Topic Tool − This creates a topic with a default number of partitions, replication factor and uses Kafka’s default scheme to do replica assignment.
-
List Topic Tool − This tool lists the information for a given list of topics. If no topics are provided in the command line, the tool queries Zookeeper to get all the topics and lists the information for them. The fields that the tool displays are topic name, partition, leader, replicas, isr.
-
Add Partition Tool − At the time of topic creation, the number of partitions for the topic has to be specified. Later on, more partitions may be needed for the topic when its volume increases. This tool helps to add more partitions for a specific topic and also allows manual replica assignment of the added partitions.
Apache Kafka - Applications
Kafka 支持当今许多最佳工业应用程序。本章中,我们会对 Kafka 的一些最显着应用程序提供非常简要的概述。
Kafka supports many of today’s best industrial applications. We will provide a very brief overview of some of the most notable applications of Kafka in this chapter.
Twitter
Twitter 是一个在线社交网络服务,提供一个平台来发送和接收用户推文。已注册的用户可以阅读并发布推文,但未注册的用户只能阅读推文。Twitter 将 Storm-Kafka 用作其流处理基础设施的一部分。
Twitter is an online social networking service that provides a platform to send and receive user tweets. Registered users can read and post tweets, but unregistered users can only read tweets. Twitter uses Storm-Kafka as a part of their stream processing infrastructure.
LinkedIn
Apache Kafka 在 LinkedIn 用作活动流数据和运营指标。Kafka 消息系统可帮助 LinkedIn 处理各种产品,比如 LinkedIn 新闻源、LinkedIn 今日在线消息消费以及 Hadoop 等离线分析系统。Kafka 的强大持久性也是连接到 LinkedIn 的关键因素之一。
Apache Kafka is used at LinkedIn for activity stream data and operational metrics. The Kafka messaging system helps LinkedIn with various products like LinkedIn Newsfeed and LinkedIn Today for online message consumption, in addition to offline analytics systems like Hadoop. Kafka’s strong durability is also one of the key factors in connection with LinkedIn.
Netflix
Netflix 是一家美国跨国按需互联网流媒体服务提供商。Netflix 使用 Kafka 进行实时监控和事件处理。
Netflix is an American multinational provider of on-demand Internet streaming media. Netflix uses Kafka for real-time monitoring and event processing.
Mozilla
Mozilla 是一个自由软件社区,由 Netscape 成员于 1998 年创立。Kafka 很快就会替换 Mozilla 当前的部分生产系统,以便为遥测、试点等项目收集终端用户浏览器的性能和使用数据。
Mozilla is a free-software community, created in 1998 by members of Netscape. Kafka will soon be replacing a part of Mozilla's current production system to collect performance and usage data from the end-user’s browser for projects like Telemetry, Test Pilot, etc.
Oracle
Oracle 从其名为 OSB(Oracle Service Bus)的企业服务总线产品提供对 Kafka 的本机连接,该连接允许开发人员利用 OSB 内置的中介功能来实现分阶段数据管道。
Oracle provides native connectivity to Kafka from its Enterprise Service Bus product called OSB (Oracle Service Bus), which allows developers to leverage OSB's built-in mediation capabilities to implement staged data pipelines.