Apache Kafka Tutorial

Apache Kafka - Basic Operations

We start with a single node-single broker configuration and then migrate the setup to a single node-multiple brokers configuration.

By now you should have installed Java, ZooKeeper and Kafka on your machine. Before setting up the Kafka cluster, you need to start ZooKeeper, because the Kafka cluster depends on it.

Start ZooKeeper

Open a new terminal and type the following command −

bin/zookeeper-server-start.sh config/zookeeper.properties

To start the Kafka broker, open another terminal and type the following command −

bin/kafka-server-start.sh config/server.properties

After starting the Kafka broker, type the command jps in a new terminal and you should see a response similar to the following −

821 QuorumPeerMain
928 Kafka
931 Jps

You can now see two daemons running: QuorumPeerMain is the ZooKeeper daemon and Kafka is the Kafka broker daemon (Jps is the jps tool itself).
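
If you want to script this check instead of reading the jps output by eye, the following is a minimal bash sketch. The function name check_daemons is hypothetical (it is not part of Kafka), and it assumes jps prints one "<pid> <MainClassName>" pair per line, as in the output above.

```shell
#!/usr/bin/env bash
# check_daemons is a hypothetical helper, not part of Kafka.
# It reads `jps` output on stdin and verifies that both the ZooKeeper daemon
# (QuorumPeerMain) and the Kafka broker daemon (Kafka) are listed.
check_daemons() {
  local output daemon missing=0
  output=$(cat)
  for daemon in QuorumPeerMain Kafka; do
    # jps prints "<pid> <MainClassName>", so compare the second column exactly.
    if ! awk -v d="$daemon" '$2 == d { found = 1 } END { exit !found }' <<< "$output"; then
      echo "missing: $daemon"
      missing=1
    fi
  done
  return "$missing"
}
```

For example, jps | check_daemons && echo "ZooKeeper and Kafka are running" prints the message only when both daemons are up; otherwise it names the missing one.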

Single Node-Single Broker Configuration

In this configuration you have a single ZooKeeper instance and a single broker instance (one broker id). Following are the steps to configure it −

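Creating a Kafka Topic − Kafka provides a command line utility named kafka-topics.sh to create topics on the server. Open a new terminal and type the example below. (The commands in this tutorial use the --zookeeper option of older Kafka releases; kafka-topics.sh accepts --bootstrap-server localhost:9092 from Kafka 2.2 onward, and Kafka 3.0 removed --zookeeper.)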

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
  --partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
  --partitions 1 --topic Hello-Kafka

We have just created a topic named Hello-Kafka with a single partition and a replication factor of one. The command prints output similar to the following −

Output − Created topic Hello-Kafka

Once the topic has been created, you can see a notification in the Kafka broker terminal window, and the log for the new topic is created under /tmp/kafka-logs/, the directory specified by log.dirs in the config/server.properties file.
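
Topic names are restricted: Kafka accepts at most 249 characters drawn from letters, digits, '.', '_' and '-', and rejects the names "." and "..". A pre-flight check in bash could look like the sketch below; valid_topic_name is a hypothetical helper, not a Kafka tool.

```shell
#!/usr/bin/env bash
# valid_topic_name is a hypothetical helper, not part of Kafka. It applies
# Kafka's topic-name rules before calling kafka-topics.sh --create.
valid_topic_name() {
  local name=$1
  # Must be non-empty and at most 249 characters long.
  [[ -n $name && ${#name} -le 249 ]] || return 1
  # "." and ".." are reserved.
  [[ $name != "." && $name != ".." ]] || return 1
  # Only letters, digits, '.', '_' and '-' are allowed.
  [[ $name =~ ^[A-Za-z0-9._-]+$ ]]
}
```

For example, valid_topic_name Hello-Kafka succeeds, while valid_topic_name "Hello Kafka" fails because of the space.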

List of Topics

To get a list of the topics on the Kafka server, you can use the following command −

Syntax

bin/kafka-topics.sh --list --zookeeper localhost:2181

Output

Hello-Kafka

Since we have created only one topic, the list contains only Hello-Kafka. If you create more than one topic, all of their names appear in the output.
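
Because the list is printed one topic per line, a script can test for a topic with an exact whole-line match, so that Hello-Kafka does not also match a longer name such as Hello-Kafka-2. A minimal sketch; topic_exists is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# topic_exists is a hypothetical helper, not part of Kafka. It reads the output
# of `kafka-topics.sh --list` on stdin and succeeds only if a line equals $1.
topic_exists() {
  # -x: whole-line match, -F: fixed string (no regex), -q: quiet.
  grep -qxF -- "$1"
}
```

For example: bin/kafka-topics.sh --list --zookeeper localhost:2181 | topic_exists Hello-Kafka && echo "Hello-Kafka exists"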

Start Producer to Send Messages

Syntax

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

From the above syntax, two main parameters are required for the producer command line client −

Broker-list − The list of brokers to send the messages to. In this case we have only one broker. The config/server.properties file contains the broker's port; since we know our broker listens on port 9092, we can specify it directly.

Topic name − The name of the topic the messages are published to, as in the following example.

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

The producer waits for input from stdin and publishes it to the Kafka cluster. By default, every new line is published as a new message. The default producer properties are specified in the config/producer.properties file. Now you can type a few lines of messages in the terminal as shown below.

Output

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
[2016-01-16 13:50:45,931] WARN property topic is not valid (kafka.utils.VerifiableProperties)
Hello
My first message
My second message
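
Because the console producer reads stdin and sends each line as a separate message, it can also be fed non-interactively. A minimal sketch, assuming it is run from the Kafka installation directory like the commands above; send_lines is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# send_lines is a hypothetical helper, not part of Kafka.
# Usage: send_lines <topic> <message>...
# Each message becomes one line on the producer's stdin, i.e. one Kafka message.
send_lines() {
  local topic=$1
  shift
  (( $# > 0 )) || return 1   # nothing to send
  printf '%s\n' "$@" |
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic "$topic"
}
```

For example, send_lines Hello-Kafka "Hello" "My first message" "My second message" publishes the same three messages typed above.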

Start Consumer to Receive Messages

As with the producer, the default consumer properties are specified in the config/consumer.properties file. Open a new terminal and type the following syntax to consume messages.

Syntax

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic-name \
  --from-beginning

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Hello-Kafka \
  --from-beginning

Output

Hello
My first message
My second message

Finally, you can enter messages in the producer's terminal and see them appear in the consumer's terminal. You now have a good understanding of a single node cluster with a single broker. Let us move on to the multiple brokers configuration.

Single Node-Multiple Brokers Configuration

Before moving on to the multiple brokers cluster setup, first start your ZooKeeper server.

Create Multiple Kafka Brokers − We already have one Kafka broker instance configured in config/server.properties. Now we need multiple broker instances, so copy the existing server.properties file into two new config files named server-one.properties and server-two.properties. Then edit both new files and make the following changes −

config/server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config/server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2
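
The copy-and-edit step can also be scripted. The sketch below assumes the base file uses plain key=value lines; make_broker_config is a hypothetical helper, and it appends any of the three keys that the base file does not define (for example, when the port line is commented out).

```shell
#!/usr/bin/env bash
# make_broker_config is a hypothetical helper, not part of Kafka.
# Usage: make_broker_config <base-file> <new-file> <broker-id> <port> <log-dir>
# Copies the base broker config and overrides the three per-broker settings.
make_broker_config() {
  local base=$1 out=$2 id=$3 port=$4 logdir=$5 kv
  # Rewrite existing (uncommented) lines for the three keys.
  sed -e "s|^broker\.id=.*|broker.id=${id}|" \
      -e "s|^port=.*|port=${port}|" \
      -e "s|^log\.dirs=.*|log.dirs=${logdir}|" \
      "$base" > "$out" || return 1
  # Append any key that the base file did not define.
  for kv in "broker.id=${id}" "port=${port}" "log.dirs=${logdir}"; do
    grep -q "^${kv%%=*}=" "$out" || printf '%s\n' "$kv" >> "$out"
  done
}
```

For example, make_broker_config config/server.properties config/server-one.properties 1 9093 /tmp/kafka-logs-1 produces the server-one.properties values shown above. Newer Kafka releases configure the listening port through the listeners setting (for example listeners=PLAINTEXT://:9093) rather than port, so on those versions that is the key to override.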

Start Multiple Brokers − Once the changes have been made to the configuration files, open three new terminals and start each broker one by one.

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

Now we have three different brokers running on the machine. Try it yourself: type jps in a new terminal to check all the daemons, and you should see the response.

Creating a Topic

Let us assign a replication factor of three to this topic, because we have three different brokers running. If you have two brokers, assign a replication factor of two, because the replication factor cannot exceed the number of available brokers.
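
Kafka rejects a topic whose replication factor is larger than the number of available brokers, so a script can cap the value at the broker count. A minimal sketch; count_brokers and pick_replication_factor are hypothetical helpers, and counting brokers from jps output only works when all brokers run on this one machine, as in this single-node setup.

```shell
#!/usr/bin/env bash
# Hypothetical helpers, not part of Kafka.

# count_brokers: read `jps` output on stdin and count the Kafka broker processes.
count_brokers() {
  awk '$2 == "Kafka" { n++ } END { print n + 0 }'
}

# pick_replication_factor <desired> <brokers>: the smaller of the two values,
# because the replication factor cannot exceed the number of brokers.
pick_replication_factor() {
  local desired=$1 brokers=$2
  if (( brokers < desired )); then
    echo "$brokers"
  else
    echo "$desired"
  fi
}
```

For example, rf=$(pick_replication_factor 3 "$(jps | count_brokers)") yields 3 with all three brokers running and 2 if only two are up.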

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 \
  --partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 \
  --partitions 1 --topic Multibrokerapplication

Output

Created topic "Multibrokerapplication"

The --describe option shows which brokers hold the newly created topic's partition and which of them is the leader, as shown below −

bin/kafka-topics.sh --describe --zookeeper localhost:2181 \
  --topic Multibrokerapplication

Output

bin/kafka-topics.sh --describe --zookeeper localhost:2181 \
  --topic Multibrokerapplication

Topic:Multibrokerapplication    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic:Multibrokerapplication    Partition:0    Leader:0    Replicas:0,2,1    Isr:0,2,1

In the above output, the first line gives a summary of all the partitions: the topic name, the partition count and the replication factor we chose. Each following line describes one partition; since this topic has a single partition, there is only one such line. The leader is the broker that handles all reads and writes for that partition, and each broker will be the leader for a randomly selected portion of the partitions.

In our case, the first broker (with broker.id 0) is the leader. Replicas:0,2,1 means that all three brokers replicate the topic. Finally, Isr is the set of in-sync replicas: the subset of the replicas that are currently alive and caught up to the leader.
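
To spot partitions whose in-sync replica set has shrunk, you can compare the Replicas and Isr columns of the describe output. A minimal bash/awk sketch; check_isr is a hypothetical helper, and it assumes the key:value layout shown above (it also tolerates a space after each colon, as some Kafka versions print).

```shell
#!/usr/bin/env bash
# check_isr is a hypothetical helper, not part of Kafka.
# Reads `kafka-topics.sh --describe` output on stdin and, for every partition
# line, reports the leader and whether all replicas are in the ISR.
check_isr() {
  # Normalize "Key: value" to "Key:value", then parse whitespace-separated fields.
  sed 's/: */:/g' | awk '
    /Partition:/ {
      for (i = 1; i <= NF; i++) {
        split($i, kv, ":")
        f[kv[1]] = kv[2]
      }
      nr = split(f["Replicas"], r, ",")
      ni = split(f["Isr"], s, ",")
      status = (ni < nr) ? "UNDER-REPLICATED" : "in sync"
      printf "partition %s leader %s replicas %d isr %d %s\n", f["Partition"], f["Leader"], nr, ni, status
    }'
}
```

For the output above, bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerapplication | check_isr reports partition 0 as in sync, since all three replicas appear in the ISR.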

Start Producer to Send Messages

This procedure remains the same as in the single broker setup.

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 \
  --topic Multibrokerapplication

Output

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
This is single node-multi broker demo
This is the second message
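
The --broker-list option takes a comma-separated list of host:port pairs. The producer uses it to make its initial connection to the cluster, so listing all three brokers lets it start even if the broker on port 9092 is down. A small sketch that builds such a list; broker_list is a hypothetical helper:

```shell
#!/usr/bin/env bash
# broker_list is a hypothetical helper, not part of Kafka.
# Usage: broker_list <port>...  ->  localhost:<port>,localhost:<port>,...
broker_list() {
  local IFS=, port
  local -a hosts=()
  for port in "$@"; do
    hosts+=("localhost:${port}")
  done
  # "${hosts[*]}" joins the array elements with the first character of IFS.
  echo "${hosts[*]}"
}
```

For example: bin/kafka-console-producer.sh --broker-list "$(broker_list 9092 9093 9094)" --topic Multibrokerapplication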

Start Consumer to Receive Messages

This procedure remains the same as shown in the single broker setup.

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
  --topic Multibrokerapplication --from-beginning

Output

bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
  --topic Multibrokerapplication --from-beginning
This is single node-multi broker demo
This is the second message

Basic Topic Operations

In this section we discuss the basic topic operations.

Modifying a Topic

Now that you know how to create a topic in a Kafka cluster, let us modify an existing topic using the following command −

Syntax

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_name \
  --partitions count

Example

We have already created a topic "Hello-Kafka" with a single partition and a replication factor of one. Now we use the --alter option to change its partition count to two.

bin/kafka-topics.sh --zookeeper localhost:2181 \
  --alter --topic Hello-Kafka --partitions 2

Output

WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
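
The warning concerns keyed messages. Kafka's default partitioner chooses the partition for a keyed message by hashing the key (with murmur2) and taking the result modulo the number of partitions, so changing the partition count can send the same key to a different partition. The sketch below illustrates only the modulo step; it uses cksum as a stand-in hash, so its partition numbers will not match Kafka's, and partition_for_key is a hypothetical helper.

```shell
#!/usr/bin/env bash
# partition_for_key is a hypothetical helper, not part of Kafka.
# Usage: partition_for_key <key> <partition-count>
# NOTE: Kafka's default partitioner uses murmur2; cksum (CRC) is only a stand-in
# here, so the numbers differ from Kafka's, but the modulo step is the same idea.
partition_for_key() {
  local key=$1 count=$2 hash
  hash=$(printf '%s' "$key" | cksum | cut -d ' ' -f 1)
  echo $(( hash % count ))
}
```

Comparing partition_for_key user-42 1 with partition_for_key user-42 2 shows the effect: with one partition every key lands on partition 0, while with two partitions the same key may move to partition 1.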

Deleting a Topic

To delete a topic, you can use the following syntax.

Syntax

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

Example

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-Kafka

Output

> Topic Hello-Kafka marked for deletion

Note − This will have no impact if delete.topic.enable is not set to true in the broker configuration.
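
To check a broker's setting before deleting a topic, you can look up delete.topic.enable in its properties file. A minimal sketch; delete_topic_setting is a hypothetical helper that prints the last uncommented value it finds, or unset if the key is absent. An absent key means the broker uses its built-in default, which changed from false to true in Kafka 1.0, so check the documentation for your version.

```shell
#!/usr/bin/env bash
# delete_topic_setting is a hypothetical helper, not part of Kafka.
# Usage: delete_topic_setting <server.properties>
# Prints the value of delete.topic.enable (the last uncommented assignment wins,
# as with Java properties files), or "unset" when the key is not present.
# Assumes simple key=value lines.
delete_topic_setting() {
  local value
  value=$(sed -n 's/^[[:space:]]*delete\.topic\.enable[[:space:]]*=[[:space:]]*\([^[:space:]]*\).*/\1/p' "$1" | tail -n 1)
  echo "${value:-unset}"
}
```

For example, delete_topic_setting config/server.properties prints true, false or unset for the broker started from that file.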