Apache Kafka 简明教程
Apache Kafka - Simple Producer Example
让我们使用 Java 客户端创建用于发布和消费消息的应用程序。Kafka 制作者客户端由以下 API 组成。
KafkaProducer API
让我们在本节中了解最重要的 Kafka 制作者 API 的集合。KafkaProducer API 的主要部分是 KafkaProducer 类。KafkaProducer 类提供了一个选项,可以使用以下方法在其构造函数中连接 Kafka 代理。
-
KafkaProducer 类提供 send 方法,以异步方式向主题发送消息。send() 的签名如下:
producer.send(new ProducerRecord<byte[],byte[]>(topic,
partition, key1, value1) , callback);
-
ProducerRecord − 制作者管理着等待发送的记录缓冲区。
-
Callback − 在记录被服务器确认时执行的用户提供的回调(null 表示没有回调)。
-
KafkaProducer 类提供一个 flush 方法以确保所有先前发送的消息都已实际完成。flush 方法的语法如下:
public void flush()
-
KafkaProducer 类提供 partitionFor 方法,该方法有助于获取给定主题的分区元数据。这可用于自定义分区。此方法的签名如下:
public Map metrics()
它返回 制作者维护的内部度量指标映射。
-
public void close() − KafkaProducer 类提供 close 方法,它会阻塞直到所有先前发送的请求都完成。
Producer API
制作者 API 的核心部分是 制作者 类。制作者 类提供了一个选项,可以通过以下方法在其构造函数中连接 Kafka 代理。
The Producer Class
制作者 类提供 send 方法,以使用以下签名将 send 消息发送到单个或多个主题。
public void send(KeyedMessaget<k,v> message)
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);
有两种类型的制作者 - Sync 和 Async 。
相同的 API 配置也适用于同步制作者。它们之间的区别在于,同步制作者直接发送消息,但在后台发送消息。当您需要更高的吞吐量时,首选异步制作者。在 0.8 等早期版本中,异步制作者没有用于注册错误处理程序的 send() 回调。它仅在当前的 0.9 版中可用。
Configuration Settings
制作者 API 的主要配置设置列在以下表格中,以便更好地理解 -
S.No |
Configuration Settings and Description |
1 |
client.id identifies producer application |
2 |
producer.type either sync or async |
3 |
acks acks 配置控制制作者请求在何种条件下被认为是完整的。 |
4 |
retries 如果制作者请求失败,则使用特定的值自动重试。 |
5 |
bootstrap.servers bootstrapping list of brokers. |
6 |
linger.ms 如果您想减少请求的数量,您可以将 linger.ms 设置为大于某些值。 |
7 |
key.serializer 序列化接口的密钥。 |
8 |
value.serializer 序列化接口的值。 |
9 |
batch.size Buffer size. |
10 |
buffer.memory 控制可用于缓冲区的生产者的总内存量。 |
ProducerRecord API
ProducerRecord 是一对键/值,发送到 Kafka 群集。ProducerRecord 类构造函数使用以下签名,根据分区、键和值对创建记录。
public ProducerRecord (string topic, int partition, k key, v value)
-
Topic −用户的自定义主题名称,它将追加到记录中。
-
Partition − partition count
-
Key −将包含在记录中的密钥。
-
Value − Record contents
public ProducerRecord (string topic, k key, v value)
ProducerRecord 类构造函数用于创建具有键、值对但不具有分区的记录。
-
Topic −创建分配记录的主题。
-
Key −记录的密钥。
-
Value − record contents.
public ProducerRecord (string topic, v value)
ProducerRecord 类创建一个没有分区和密钥的记录。
-
Topic −创建一个主题。
-
Value − record contents.
ProducerRecord 类方法在以下表格中列出:
S.No |
Class Methods and Description |
1 |
public string topic() 主题将追加到记录。 |
2 |
public K key() 将包含在记录中的密钥。如果没有此密钥,则在此处返回空值。 |
3 |
public V value() Record contents. |
4 |
partition() 记录分区计数 |
SimpleProducer application
在创建应用程序之前,首先启动 ZooKeeper 和 Kafka 代理,然后使用 create topic 命令在 Kafka 代理中创建自己的主题。然后创建一个名为 SimpleProducer.java 的 Java 类,并输入以下代码。
//import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;
//Create java class named “SimpleProducer”
public class SimpleProducer {
public static void main(String[] args) throws Exception{
// Check arguments length value
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Assign topicName to string variable
String topicName = args[0].toString();
// create instance for properties to access producer configs
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", “localhost:9092");
//Set acknowledgements for producer requests.
props.put("acks", “all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Reduce the no of requests less than 0
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
System.out.println(“Message sent successfully”);
producer.close();
}
}
Compilation −可以使用以下命令编译应用程序。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
Execution −可以使用以下命令执行应用程序。
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>
Output
Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10
Simple Consumer Example
截至目前,我们已经创建了一个生产者来向 Kafka 群集发送消息。现在让我们创建一个消费者,以便从 Kafka 群集中获取消息。KafkaConsumer API 用于获取 Kafka 群集中的消息。下面定义了 KafkaConsumer 类构造函数。
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
configs − 返回消费者配置映射。
KafkaConsumer 类有下面表格中列出的几个重要的方法。
S.No |
Method and Description |
1 |
public java.util.Set<TopicPar-tition> assignment() 获取消费者当前分配的分区集合。 |
2 |
public string subscription() 订阅给定主题列表以动态分配分区。 |
3 |
public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener) 订阅给定主题列表以动态分配分区。 |
4 |
public void unsubscribe() 从给定分区列表中取消订阅主题。 |
5 |
public void sub-scribe(java.util.List<java.lang.String> topics) 订阅给定主题列表以动态分配分区。如果给定的主题列表为空,则将其视为 unsubscribe() 相同。 |
6 |
public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener) 参数模式指格式为正则表达式的订阅模式,并且监听器参数从订阅模式中获取通知。 |
7 |
public void as-sign(java.util.List<TopicParti-tion> partitions) 手动将分区列表分配给客户。 |
8 |
poll() 抓取使用某个订阅/分配 API 指定的主题或分区的数据。如果在轮询数据之前未订阅主题,这将返回错误。 |
9 |
public void commitSync() 提交在上次 poll() 中返回的所有订阅的主题和分区列表上的偏移量。相同操作应用于 commitAsyn()。 |
10 |
public void seek(TopicPartition partition, long offset) 抓取消费者将在下一个 poll() 方法上使用的当前偏移量值。 |
11 |
public void resume() 恢复已暂停的分区。 |
12 |
public void wakeup() 唤醒消费者。 |
ConsumerRecord API
ConsumerRecord API 用于从 Kafka 集群接收记录。此 API 包含一个主题名称、分区号(记录从中接收)和指向 Kafka 分区中记录的偏移量。ConsumerRecord 类用于使用特定的主题名称、分区计数和 <key, value> 对创建消费者记录。它具有以下特征。
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
-
Topic − 从 Kafka 集群接收的消费者记录的主题名称。
-
Partition − 主题的分区。
-
Key − 记录的键,如果不存在键,将返回 null。
-
Value − Record contents.
ConsumerRecords API
ConsumerRecords API 充当 ConsumerRecord 的容器。此 API 用于为特定主题中的每个分区保留 ConsumerRecord 列表。其构造函数定义如下。
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
-
TopicPartition − 返回特定主题的分区映射。
-
Records − 返回 ConsumerRecord 列表。
ConsumerRecords 类定义了以下方法。
S.No |
Methods and Description |
1 |
public int count() 所有主题的记录数量。 |
2 |
public Set partitions() 此记录集中有数据的分区集合(如果没有数据返回,则集合为空)。 |
3 |
public Iterator iterator() 使用迭代器可以在集合中循环,获取或删除元素。 |
4 |
public List records() 获取给定分区记录的列表。 |
Configuration Settings
下面列出消费者客户端 API 主要配置设置的配置设置 −
S.No |
Settings and Description |
1 |
bootstrap.servers Bootstrapping list of brokers. |
2 |
group.id 将单独的消费者分配到组中。 |
3 |
enable.auto.commit 如果值为 true 则为偏移量启用自动提交,否则不进行提交。 |
4 |
auto.commit.interval.ms 返回更新的已消费偏移量写入到 ZooKeeper 的频率。 |
5 |
session.timeout.ms 指示 Kafka 将等待 ZooKeeper 的时间(以毫秒为单位)以响应请求(读取或写入),直到放弃并继续消费消息。 |
SimpleConsumer Application
此处的生产者应用程序步骤保持不变。首先,启动 ZooKeeper 和 Kafka 代理。然后使用名为 SimpleConsumer.java 的 java 类创建一个 SimpleConsumer 应用程序,并键入以下代码。
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Kafka consumer configuration settings
String topicName = args[0].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName))
//print the topic name
System.out.println("Subscribed to topic " + topicName);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = con-sumer.poll(100);
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
Compilation −可以使用以下命令编译应用程序。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
*执行 − *可以使用以下命令执行应用程序
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>
Input − 打开生产者 CLI 并向主题发送一些消息。您可以将 Smple 输入作为“Hello Consumer”。
Output − 以下是输出。
Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer