A Concise Apache Kafka Tutorial

Apache Kafka - Consumer Group Example

A consumer group enables multi-threaded or multi-machine consumption from Kafka topics.

Consumer Group

  1. Consumers join a group by using the same group.id.

  2. The maximum parallelism of a group equals the number of partitions: a group is fully used when the number of consumers in the group ≤ the number of partitions, and any additional consumers stay idle.

  3. Kafka assigns the partitions of a topic to the consumers in a group so that each partition is consumed by exactly one consumer in the group.

  4. Kafka guarantees that a message is only ever read by a single consumer in the group.

  5. Consumers see messages in the order they were stored in the log. This ordering is guaranteed within a partition, not across the partitions of a topic.
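To make rules 2 to 4 concrete, here is a minimal, self-contained sketch of how a group might split a topic's partitions among its members. It is a simplified range-style assignment written for illustration only, not Kafka's actual RangeAssignor code, and the names `AssignmentSketch` and `assign` are hypothetical. It shows that every partition gets exactly one owner and that consumers beyond the partition count receive nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {

   // Hypothetical helper: splits partitions 0..numPartitions-1 among the consumers in
   // contiguous ranges, similar in spirit to (but not the same code as) Kafka's RangeAssignor.
   static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
      Map<String, List<Integer>> assignment = new LinkedHashMap<>();
      for (String c : consumers) {
         assignment.put(c, new ArrayList<Integer>());
      }
      int n = consumers.size();
      if (n == 0) {
         return assignment;                  // no members, nothing to assign
      }
      int perConsumer = numPartitions / n;   // base share for every consumer
      int extra = numPartitions % n;         // the first 'extra' consumers get one more
      int next = 0;
      for (int i = 0; i < n; i++) {
         int count = perConsumer + (i < extra ? 1 : 0);
         for (int j = 0; j < count; j++) {
            assignment.get(consumers.get(i)).add(next++);
         }
      }
      return assignment;
   }

   public static void main(String[] args) {
      // Two consumers, three partitions: every partition has exactly one owner
      System.out.println(assign(Arrays.asList("c1", "c2"), 3));
      // Four consumers, three partitions: c4 stays idle, so parallelism is capped at 3
      System.out.println(assign(Arrays.asList("c1", "c2", "c3", "c4"), 3));
   }
}
```

Running it prints `{c1=[0, 1], c2=[2]}` and then `{c1=[0], c2=[1], c3=[2], c4=[]}`: adding a fourth consumer to a three-partition topic adds no parallelism.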

Rebalancing a Consumer Group

Adding more processes or threads to a group causes Kafka to rebalance it. A rebalance is also triggered when a consumer fails to send heartbeats within its session timeout (session.timeout.ms). The older ZooKeeper-based consumer relied on ZooKeeper for this, while the KafkaConsumer used below sends heartbeats to a broker acting as the group coordinator. During a rebalance, Kafka reassigns the available partitions to the remaining consumers, possibly moving a partition to another process.
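The rebalance rule can be sketched the same way. The minimal simulation below assumes a hypothetical coordinator that records each consumer's last heartbeat time: any consumer silent for longer than the session timeout (30000 ms, the session.timeout.ms value used in the example program) is dropped, and the surviving consumers split the partitions again. The names `RebalanceSketch`, `liveMembers` and `rebalance` are hypothetical, the round-robin split is a simplification, and none of this is Kafka's coordinator code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RebalanceSketch {

   // Hypothetical: keep only consumers whose last heartbeat is within the session timeout
   static List<String> liveMembers(Map<String, Long> lastHeartbeatMs, long nowMs, long sessionTimeoutMs) {
      List<String> live = new ArrayList<>();
      for (Map.Entry<String, Long> e : lastHeartbeatMs.entrySet()) {
         if (nowMs - e.getValue() <= sessionTimeoutMs) {
            live.add(e.getKey());
         }
      }
      return live;
   }

   // Simplified round-robin split of partitions 0..numPartitions-1 over the live members
   static Map<Integer, String> rebalance(List<String> members, int numPartitions) {
      Map<Integer, String> owner = new TreeMap<>();
      if (members.isEmpty()) {
         return owner;   // no live consumers: every partition is unowned
      }
      for (int p = 0; p < numPartitions; p++) {
         owner.put(p, members.get(p % members.size()));
      }
      return owner;
   }

   public static void main(String[] args) {
      long sessionTimeoutMs = 30000;
      Map<String, Long> heartbeats = new LinkedHashMap<>();
      heartbeats.put("consumer-1", 0L);
      heartbeats.put("consumer-2", 0L);

      // Both consumers are alive: the two partitions are split between them
      System.out.println(rebalance(liveMembers(heartbeats, 1000, sessionTimeoutMs), 2));

      // consumer-1 keeps heartbeating, consumer-2 stays silent for more than 30 s
      heartbeats.put("consumer-1", 40000L);
      System.out.println(rebalance(liveMembers(heartbeats, 41000, sessionTimeoutMs), 2));
   }
}
```

Running it prints `{0=consumer-1, 1=consumer-2}` and then `{0=consumer-1, 1=consumer-1}`: once consumer-2 times out, partition 1 moves to consumer-1.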

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if (args.length < 2) {
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }

      String topic = args[0];
      String group = args[1];

      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      // Consumers that share this group.id split the topic's partitions between them
      props.put("group.id", group);
      // Commit offsets automatically every second
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      // A consumer that misses heartbeats for this long is removed and the group rebalances
      props.put("session.timeout.ms", "30000");
      // Keys and values are Strings, so both need a StringDeserializer
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);

      try {
         while (true) {
            // poll(long) matches the 0.9 client used here; it is deprecated since Kafka 2.0 in favor of poll(Duration)
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
               System.out.printf("offset = %d, key = %s, value = %s%n",
                  record.offset(), record.key(), record.value());
            }
         }
      } finally {
         // Leave the group cleanly so its partitions are reassigned promptly
         consumer.close();
      }
   }
}

Compilation

javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

Execution

>>java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group
>>java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group

Here we have created a sample group named my-group with two consumers, one per command above, each run in its own terminal. In the same way, you can create your own group and choose the number of consumers in it.

Input

Open the producer CLI and send some messages, for example:

Test consumer group 01
Test consumer group 02

Output of the First Process

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

Output of the Second Process

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

You should now understand SimpleConsumer and ConsumerGroup from the Java client demos, and have an idea of how to send and receive messages using a Java client. Let us continue with Kafka integration with big data technologies in the next chapter.