Apache Kafka 简明教程

Apache Kafka - Simple Producer Example

让我们使用 Java 客户端创建用于发布和消费消息的应用程序。Kafka 制作者客户端由以下 API 组成。

KafkaProducer API

让我们在本节中了解最重要的 Kafka 制作者 API 的集合。KafkaProducer API 的主要部分是 KafkaProducer 类。KafkaProducer 类提供了一个选项,可以使用以下方法在其构造函数中连接 Kafka 代理。

  1. KafkaProducer 类提供 send 方法,以异步方式向主题发送消息。send() 的签名如下:

producer.send(new ProducerRecord<byte[],byte[]>(topic,
partition, key1, value1) , callback);
  1. ProducerRecord − 制作者管理着等待发送的记录缓冲区。

  2. Callback − 在记录被服务器确认时执行的用户提供的回调(null 表示没有回调)。

  3. KafkaProducer 类提供一个 flush 方法以确保所有先前发送的消息都已实际完成。flush 方法的语法如下:

public void flush()
  1. KafkaProducer 类提供 partitionFor 方法,该方法有助于获取给定主题的分区元数据。这可用于自定义分区。此方法的签名如下:

public Map metrics()

它返回 制作者维护的内部度量指标映射。

  1. public void close() − KafkaProducer 类提供 close 方法,它会阻塞直到所有先前发送的请求都完成。

Producer API

制作者 API 的核心部分是 制作者 类。制作者 类提供了一个选项,可以通过以下方法在其构造函数中连接 Kafka 代理。

The Producer Class

制作者 类提供 send 方法,以使用以下签名将 send 消息发送到单个或多个主题。

public void send(KeyedMessaget<k,v> message)
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

有两种类型的制作者 - SyncAsync

相同的 API 配置也适用于同步制作者。它们之间的区别在于,同步制作者直接发送消息,但在后台发送消息。当您需要更高的吞吐量时,首选异步制作者。在 0.8 等早期版本中,异步制作者没有用于注册错误处理程序的 send() 回调。它仅在当前的 0.9 版中可用。

public void close()

制作者 类提供 close 方法以关闭制作者池与所有 Kafka 代理的连接。

Configuration Settings

制作者 API 的主要配置设置列在以下表格中,以便更好地理解 -

S.No

Configuration Settings and Description

1

client.id identifies producer application

2

producer.type either sync or async

3

acks acks 配置控制制作者请求在何种条件下被认为是完整的。

4

retries 如果制作者请求失败,则使用特定的值自动重试。

5

bootstrap.servers bootstrapping list of brokers.

6

linger.ms 如果您想减少请求的数量,您可以将 linger.ms 设置为大于某些值。

7

key.serializer 序列化接口的密钥。

8

value.serializer 序列化接口的值。

9

batch.size Buffer size.

10

buffer.memory 控制可用于缓冲区的生产者的总内存量。

ProducerRecord API

ProducerRecord 是一对键/值,发送到 Kafka 群集。ProducerRecord 类构造函数使用以下签名,根据分区、键和值对创建记录。

public ProducerRecord (string topic, int partition, k key, v value)
  1. Topic −用户的自定义主题名称,它将追加到记录中。

  2. Partition − partition count

  3. Key −将包含在记录中的密钥。

  4. Value − Record contents

public ProducerRecord (string topic, k key, v value)

ProducerRecord 类构造函数用于创建具有键、值对但不具有分区的记录。

  1. Topic −创建分配记录的主题。

  2. Key −记录的密钥。

  3. Value − record contents.

public ProducerRecord (string topic, v value)

ProducerRecord 类创建一个没有分区和密钥的记录。

  1. Topic −创建一个主题。

  2. Value − record contents.

ProducerRecord 类方法在以下表格中列出:

S.No

Class Methods and Description

1

public string topic() 主题将追加到记录。

2

public K key() 将包含在记录中的密钥。如果没有此密钥,则在此处返回空值。

3

public V value() Record contents.

4

partition() 记录分区计数

SimpleProducer application

在创建应用程序之前,首先启动 ZooKeeper 和 Kafka 代理,然后使用 create topic 命令在 Kafka 代理中创建自己的主题。然后创建一个名为 SimpleProducer.java 的 Java 类,并输入以下代码。

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {

   public static void main(String[] args) throws Exception{

      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }

      //Assign topicName to string variable
      String topicName = args[0].toString();

      // create instance for properties to access producer configs
      Properties props = new Properties();

      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");

      //Set acknowledgements for producer requests.
      props.put("acks", “all");

      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);

      //Specify buffer size in config
      props.put("batch.size", 16384);

      //Reduce the no of requests less than 0
      props.put("linger.ms", 1);

      //The buffer.memory controls the total amount of memory available to the producer for buffering.
      props.put("buffer.memory", 33554432);

      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);

      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName,
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Compilation −可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution −可以使用以下命令执行应用程序。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

Simple Consumer Example

截至目前,我们已经创建了一个生产者来向 Kafka 群集发送消息。现在让我们创建一个消费者,以便从 Kafka 群集中获取消息。KafkaConsumer API 用于获取 Kafka 群集中的消息。下面定义了 KafkaConsumer 类构造函数。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs − 返回消费者配置映射。

KafkaConsumer 类有下面表格中列出的几个重要的方法。

S.No

Method and Description

1

public java.util.Set&lt;TopicPar-tition&gt; assignment() 获取消费者当前分配的分区集合。

2

public string subscription() 订阅给定主题列表以动态分配分区。

3

public void sub-scribe(java.util.List&lt;java.lang.String&gt; topics, ConsumerRe-balanceListener listener) 订阅给定主题列表以动态分配分区。

4

public void unsubscribe() 从给定分区列表中取消订阅主题。

5

public void sub-scribe(java.util.List&lt;java.lang.String&gt; topics) 订阅给定主题列表以动态分配分区。如果给定的主题列表为空,则将其视为 unsubscribe() 相同。

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener) 参数模式指格式为正则表达式的订阅模式,并且监听器参数从订阅模式中获取通知。

7

public void as-sign(java.util.List&lt;TopicParti-tion&gt; partitions) 手动将分区列表分配给客户。

8

poll() 抓取使用某个订阅/分配 API 指定的主题或分区的数据。如果在轮询数据之前未订阅主题,这将返回错误。

9

public void commitSync() 提交在上次 poll() 中返回的所有订阅的主题和分区列表上的偏移量。相同操作应用于 commitAsyn()。

10

public void seek(TopicPartition partition, long offset) 抓取消费者将在下一个 poll() 方法上使用的当前偏移量值。

11

public void resume() 恢复已暂停的分区。

12

public void wakeup() 唤醒消费者。

ConsumerRecord API

ConsumerRecord API 用于从 Kafka 集群接收记录。此 API 包含一个主题名称、分区号(记录从中接收)和指向 Kafka 分区中记录的偏移量。ConsumerRecord 类用于使用特定的主题名称、分区计数和 <key, value> 对创建消费者记录。它具有以下特征。

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  1. Topic − 从 Kafka 集群接收的消费者记录的主题名称。

  2. Partition − 主题的分区。

  3. Key − 记录的键,如果不存在键,将返回 null。

  4. Value − Record contents.

ConsumerRecords API

ConsumerRecords API 充当 ConsumerRecord 的容器。此 API 用于为特定主题中的每个分区保留 ConsumerRecord 列表。其构造函数定义如下。

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  1. TopicPartition − 返回特定主题的分区映射。

  2. Records − 返回 ConsumerRecord 列表。

ConsumerRecords 类定义了以下方法。

S.No

Methods and Description

1

public int count() 所有主题的记录数量。

2

public Set partitions() 此记录集中有数据的分区集合(如果没有数据返回,则集合为空)。

3

public Iterator iterator() 使用迭代器可以在集合中循环,获取或删除元素。

4

public List records() 获取给定分区记录的列表。

Configuration Settings

下面列出消费者客户端 API 主要配置设置的配置设置 −

S.No

Settings and Description

1

bootstrap.servers Bootstrapping list of brokers.

2

group.id 将单独的消费者分配到组中。

3

enable.auto.commit 如果值为 true 则为偏移量启用自动提交,否则不进行提交。

4

auto.commit.interval.ms 返回更新的已消费偏移量写入到 ZooKeeper 的频率。

5

session.timeout.ms 指示 Kafka 将等待 ZooKeeper 的时间(以毫秒为单位)以响应请求(读取或写入),直到放弃并继续消费消息。

SimpleConsumer Application

此处的生产者应用程序步骤保持不变。首先,启动 ZooKeeper 和 Kafka 代理。然后使用名为 SimpleConsumer.java 的 java 类创建一个 SimpleConsumer 应用程序,并键入以下代码。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();

      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);

      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))

      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;

      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)

         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n",
            record.offset(), record.key(), record.value());
      }
   }
}

Compilation −可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

*执行 − *可以使用以下命令执行应用程序

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

Input − 打开生产者 CLI 并向主题发送一些消息。您可以将 Smple 输入作为“Hello Consumer”。

Output − 以下是输出。

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer