Apache Kafka Tutorial

Apache Kafka - Simple Producer Example

Let us create an application for publishing and consuming messages using a Java client. The Kafka producer client consists of the following APIs.

KafkaProducer API

Let us understand the most important set of Kafka producer APIs in this section. The central part of the KafkaProducer API is the KafkaProducer class. The KafkaProducer class provides an option to connect to a Kafka broker in its constructor and exposes the following methods.

  1. KafkaProducer class provides a send method to send messages asynchronously to a topic. The signature of send() is as follows −

producer.send(new ProducerRecord<byte[],byte[]>(topic,
partition, key1, value1), callback);

     ProducerRecord − The producer manages a buffer of records waiting to be sent.

     Callback − A user-supplied callback to execute when the record has been acknowledged by the server (null indicates no callback).

  2. KafkaProducer class provides a flush method to ensure all previously sent messages have actually completed. The syntax of the flush method is as follows −

public void flush()

  3. KafkaProducer class provides a partitionsFor method, which helps in getting the partition metadata for a given topic. This can be used for custom partitioning. The signature of this method is as follows −

public List<PartitionInfo> partitionsFor(String topic)

  4. KafkaProducer class provides a metrics method, which returns the map of internal metrics maintained by the producer. Its signature is as follows −

public Map<MetricName,? extends Metric> metrics()

  5. public void close() − KafkaProducer class provides a close method that blocks until all previously sent requests are completed.
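A minimal sketch showing how these methods fit together, assuming a broker at localhost:9092 and an already-created topic named my-topic (both are placeholders, not part of the original example):

//A hedged sketch − broker address and topic name are placeholders
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerMethodsSketch {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      KafkaProducer<String, String> producer =
         new KafkaProducer<String, String>(props);

      //send() is asynchronous; the callback fires once the server acknowledges
      producer.send(new ProducerRecord<String, String>("my-topic", "key1", "value1"),
         new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
               if(e != null)
                  e.printStackTrace();
               else
                  System.out.println("Acknowledged at offset " + metadata.offset());
            }
         });

      //partitionsFor() returns the partition metadata for the topic
      System.out.println("Partitions: " + producer.partitionsFor("my-topic"));

      producer.flush(); //block until all buffered records have been sent
      producer.close(); //blocks until all previously sent requests complete
   }
}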

Producer API

The central part of the Producer API is the Producer class. The Producer class provides an option to connect to a Kafka broker, in its constructor, by the following methods.

The Producer Class

The Producer class provides a send method to send messages to either single or multiple topics using the following signatures.

public void send(KeyedMessage<k,v> message)
- sends the data to a single topic, partitioned by key, using either a sync or an async producer.
public void send(List<KeyedMessage<k,v>> messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put("producer.type", "async");
ProducerConfig config = new ProducerConfig(prop);

There are two types of producers − Sync and Async.

The same API configuration applies to the Sync producer as well. The difference between them is that a sync producer sends messages directly, while an async producer sends messages in the background. The async producer is preferred when you want higher throughput. In previous releases like 0.8, an async producer did not have a callback for send() to register error handlers. This is available only from the 0.9 release.

public void close()

The Producer class provides a close method to close the producer pool connections to all Kafka brokers.
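Putting the legacy Producer API together, here is a minimal sketch; the broker address, topic name and message are placeholders, and the property keys shown (metadata.broker.list, serializer.class) are the ones this older 0.8-style API expects:

//A hedged sketch of the legacy (0.8-style) Producer API
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class LegacyProducerSketch {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("metadata.broker.list", "localhost:9092"); //placeholder broker
      props.put("serializer.class", "kafka.serializer.StringEncoder");
      props.put("producer.type", "async"); //or "sync"

      ProducerConfig config = new ProducerConfig(props);
      Producer<String, String> producer = new Producer<String, String>(config);

      //send a single keyed message; the key decides the partition
      producer.send(new KeyedMessage<String, String>("my-topic", "key1", "Hello"));

      //close the producer pool connections to all Kafka brokers
      producer.close();
   }
}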

Configuration Settings

The Producer API's main configuration settings are listed below for better understanding −

  1. client.id − Identifies the producer application.

  2. producer.type − Either sync or async.

  3. acks − The acks config controls the criteria under which producer requests are considered complete.

  4. retries − If the producer request fails, then automatically retry the specified number of times.

  5. bootstrap.servers − Bootstrapping list of brokers.

  6. linger.ms − If you want to reduce the number of requests, you can set linger.ms to something greater than 0 so that the producer waits and batches records together.

  7. key.serializer − Serializer class for the record key.

  8. value.serializer − Serializer class for the record value.

  9. batch.size − Buffer size for a batch of records, in bytes.

  10. buffer.memory − Controls the total amount of memory available to the producer for buffering.
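Note that producer.type belongs to the legacy Producer API, while the other keys belong to the new KafkaProducer. A short sketch of the new-producer settings as Properties (the values are illustrative, matching the SimpleProducer application below, not tuned recommendations):

//Illustrative values only − tune for your own workload
Properties props = new Properties();
props.put("client.id", "demo-producer"); //placeholder application id
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("linger.ms", 1);
props.put("batch.size", 16384);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
   "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
   "org.apache.kafka.common.serialization.StringSerializer");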

ProducerRecord API

A ProducerRecord is a key/value pair that is sent to the Kafka cluster. The ProducerRecord class constructor creates a record with partition, key and value pairs using the following signature.

public ProducerRecord(String topic, Integer partition, K key, V value)
  1. Topic − user defined topic name that will be appended to the record.

  2. Partition − the partition number to which the record should be sent.

  3. Key − the key that will be included in the record.

  4. Value − the record contents.

public ProducerRecord(String topic, K key, V value)

This ProducerRecord class constructor is used to create a record with key and value pairs and without a partition.

  1. Topic − a topic to assign the record to.

  2. Key − the key for the record.

  3. Value − the record contents.

public ProducerRecord(String topic, V value)

This ProducerRecord class constructor creates a record without a partition and key.

  1. Topic − a topic for the record.

  2. Value − the record contents.

The ProducerRecord class methods are listed below −

  1. public String topic() − The topic that will be appended to the record.

  2. public K key() − The key that will be included in the record. If there is no such key, null will be returned here.

  3. public V value() − The record contents.

  4. public Integer partition() − The partition number for the record (null if no partition was specified).
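A small sketch exercising the three constructors and the accessors above (the topic name, key and value are placeholders):

//Placeholders throughout: my-topic, key1, value1
import org.apache.kafka.clients.producer.ProducerRecord;

public class RecordSketch {
   public static void main(String[] args) {
      //topic, partition, key, value
      ProducerRecord<String, String> r1 =
         new ProducerRecord<String, String>("my-topic", 0, "key1", "value1");

      //topic, key, value − the partitioner derives the partition from the key
      ProducerRecord<String, String> r2 =
         new ProducerRecord<String, String>("my-topic", "key1", "value1");

      //topic, value − no key and no partition
      ProducerRecord<String, String> r3 =
         new ProducerRecord<String, String>("my-topic", "value1");

      System.out.println(r1.topic());     //my-topic
      System.out.println(r2.key());       //key1
      System.out.println(r3.value());     //value1
      System.out.println(r1.partition()); //0
   }
}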

SimpleProducer Application

Before creating the application, first start ZooKeeper and the Kafka broker, then create your own topic in the Kafka broker using the create topic command. After that, create a Java class named SimpleProducer.java and type in the following code.

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named "SimpleProducer"
public class SimpleProducer {

   public static void main(String[] args) throws Exception{

      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }

      //Assign topicName to string variable
      String topicName = args[0].toString();

      // create instance for properties to access producer configs
      Properties props = new Properties();

      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");

      //Set acknowledgements for producer requests.
      props.put("acks", “all");

      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);

      //Specify buffer size in config
      props.put("batch.size", 16384);

      //Wait up to linger.ms so that requests can be batched, reducing their number
      props.put("linger.ms", 1);

      //The buffer.memory controls the total amount of memory available to the producer for buffering.
      props.put("buffer.memory", 33554432);

      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);

      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName,
            Integer.toString(i), Integer.toString(i)));

      System.out.println("Message sent successfully");
      producer.close();
   }
}

Compilation − The application can be compiled using the following command.

javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java

Execution − The application can be executed using the following command.

java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output, open a new terminal and type the Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic-name> --from-beginning
0
1
2
3
4
5
6
7
8
9

Simple Consumer Example

So far we have created a producer to send messages to the Kafka cluster. Now let us create a consumer to consume messages from the Kafka cluster. The KafkaConsumer API is used to consume messages from the Kafka cluster. The KafkaConsumer class constructor is defined below.

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs − a map of consumer configs.
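A minimal sketch of this Map-based constructor; the broker address and group id are placeholders, and the keys mirror the Properties used in the SimpleConsumer application later in this chapter:

//A hedged sketch − configuration values are placeholders
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerConstructorSketch {
   public static void main(String[] args) {
      Map<String, Object> configs = new HashMap<String, Object>();
      configs.put("bootstrap.servers", "localhost:9092"); //placeholder broker
      configs.put("group.id", "test");                    //placeholder group id
      configs.put("key.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      configs.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");

      //construct the consumer directly from the config map
      KafkaConsumer<String, String> consumer =
         new KafkaConsumer<String, String>(configs);
      consumer.close();
   }
}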

The KafkaConsumer class has the following significant methods, listed below.

  1. public java.util.Set<TopicPartition> assignment() − Get the set of partitions currently assigned to the consumer.

  2. public java.util.Set<String> subscription() − Get the set of topics the consumer is currently subscribed to.

  3. public void subscribe(java.util.List<java.lang.String> topics, ConsumerRebalanceListener listener) − Subscribe to the given list of topics to get dynamically assigned partitions.

  4. public void unsubscribe() − Unsubscribe from the currently subscribed topics.

  5. public void subscribe(java.util.List<java.lang.String> topics) − Subscribe to the given list of topics to get dynamically assigned partitions. If the given list of topics is empty, it is treated the same as unsubscribe().

  6. public void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener listener) − The pattern argument refers to the subscribing pattern in the format of a regular expression, and the listener argument gets notifications from the subscribing pattern.

  7. public void assign(java.util.List<TopicPartition> partitions) − Manually assign a list of partitions to the consumer.

  8. public ConsumerRecords<K,V> poll(long timeout) − Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. This will return an error if topics are not subscribed before polling for data.

  9. public void commitSync() − Commit the offsets returned on the last poll() for all the subscribed topics and partitions. The same operation applies to commitAsync().

  10. public void seek(TopicPartition partition, long offset) − Override the offset that the consumer will use on the next poll().

  11. public void resume(TopicPartition... partitions) − Resume the specified paused partitions.

  12. public void wakeup() − Wake up the consumer.
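A hedged sketch of the assign/seek flow from the methods above, for a placeholder topic and partition 0; the configuration matches the SimpleConsumer application below:

//A hedged sketch − broker, group id and topic are placeholders
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignSeekSketch {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");

      KafkaConsumer<String, String> consumer =
         new KafkaConsumer<String, String>(props);

      //manually assign partition 0 of the topic (no group rebalancing)
      TopicPartition partition = new TopicPartition("my-topic", 0);
      consumer.assign(Arrays.asList(partition));

      //override the fetch position: the next poll() starts at offset 0
      consumer.seek(partition, 0L);

      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
         System.out.println(record.offset() + ": " + record.value());

      consumer.close();
   }
}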

ConsumerRecord API

The ConsumerRecord API is used to receive records from the Kafka cluster. This API consists of a topic name, a partition number from which the record is being received, and an offset that points to the record in a Kafka partition. The ConsumerRecord class is used to create a consumer record with a specific topic name, partition number and <key, value> pair. It has the following signature.

public ConsumerRecord(String topic, int partition, long offset, K key, V value)
  1. Topic − The topic name for the consumer record received from the Kafka cluster.

  2. Partition − The partition number for the topic.

  3. Key − The key of the record; if no key exists, null will be returned.

  4. Value − The record contents.

ConsumerRecords API

The ConsumerRecords API acts as a container for ConsumerRecord. This API is used to keep the list of ConsumerRecord per partition for a particular topic. Its constructor is defined below.

public ConsumerRecords(java.util.Map<TopicPartition, java.util.List<ConsumerRecord<K, V>>> records)
  1. TopicPartition − A map key identifying the partition of a particular topic.

  2. Records − The list of ConsumerRecord for that partition.

The ConsumerRecords class defines the following methods.

  1. public int count() − The number of records for all the topics.

  2. public Set<TopicPartition> partitions() − The set of partitions with data in this record set (if no data was returned, then the set is empty).

  3. public Iterator<ConsumerRecord<K, V>> iterator() − An iterator enables you to cycle through a collection, obtaining or removing elements.

  4. public List<ConsumerRecord<K, V>> records(TopicPartition partition) − Get the list of records for the given partition.
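A short sketch of iterating a poll result per partition with the methods above; it assumes an already-configured consumer, such as the one built in the SimpleConsumer application below:

//A hedged sketch − assumes a consumer already subscribed to a topic
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RecordsSketch {
   static void dump(KafkaConsumer<String, String> consumer) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      System.out.println("Fetched " + records.count() + " records");

      //walk the result partition by partition
      for (TopicPartition partition : records.partitions()) {
         List<ConsumerRecord<String, String>> partitionRecords =
            records.records(partition);
         System.out.println(partition + " -> " + partitionRecords.size());
      }
   }
}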

Configuration Settings

The main configuration settings for the Consumer client API are listed below −

  1. bootstrap.servers − Bootstrapping list of brokers.

  2. group.id − Assigns an individual consumer to a group.

  3. enable.auto.commit − Enable auto commit for offsets if the value is true, otherwise offsets are not committed.

  4. auto.commit.interval.ms − How often, in milliseconds, consumed offsets are auto-committed.

  5. session.timeout.ms − The timeout used to detect consumer failures: if no heartbeat is received from the consumer within this interval, the consumer is removed from the group and its partitions are rebalanced.

SimpleConsumer Application

The producer application steps remain the same here. First, start your ZooKeeper and Kafka broker. Then create a SimpleConsumer application with the Java class named SimpleConsumer.java and type the following code.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();

      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);

      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName));

      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)

         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n",
            record.offset(), record.key(), record.value());
      }
   }
}

Compilation − The application can be compiled using the following command.

javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java

Execution − The application can be executed using the following command.

java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleConsumer <topic-name>

Input − Open the producer CLI and send some messages to the topic. You can put the sample input as 'Hello Consumer'.

Output − Following will be the output.

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer