Apache Kafka: A Concise Tutorial
Apache Kafka - Consumer Group Example
A consumer group lets multiple threads or multiple machines consume from Kafka topics together.
Consumer Group
- Consumers join a group by using the same group.id.
- The maximum parallelism of a group is reached when the number of consumers in the group equals the number of partitions (consumers <= number of partitions). Any additional consumers stay idle.
- Kafka assigns the partitions of a topic to the consumers in a group, so that each partition is consumed by exactly one consumer in the group.
- Kafka guarantees that a message is only ever read by a single consumer in the group.
- Consumers see messages in the order in which they were stored in the log of each partition.
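The rules above can be illustrated with a small simulation. This is only a sketch: the class name PartitionAssignmentSketch, the method assign, and the round-robin strategy are assumptions made for illustration and are not Kafka's actual assignment code. It shows that each partition ends up with exactly one owner and that a consumer beyond the partition count receives nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative simulation only (hypothetical names); this is NOT Kafka's assignor.
class PartitionAssignmentSketch {

    // Spread partitions 0..numPartitions-1 over the consumers round-robin,
    // so that every partition has exactly one owner within the group.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment; // nobody to assign partitions to
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three consumers, two partitions: the third consumer stays idle.
        System.out.println(assign(Arrays.asList("c1", "c2", "c3"), 2)); // {c1=[0], c2=[1], c3=[]}
        // Two consumers, four partitions: each consumer owns two partitions.
        System.out.println(assign(Arrays.asList("c1", "c2"), 4));       // {c1=[0, 2], c2=[1, 3]}
    }
}
```

Adding a fourth consumer to a two-partition topic would not increase throughput in this model, which is why the partition count caps the useful size of a group.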
Re-balancing of a Consumer
Adding more processes or threads causes Kafka to re-balance the group. A re-balance also happens when a consumer or broker stops sending heartbeats: Kafka treats it as failed and the cluster reassigns its work. (The older consumer coordinated through ZooKeeper; the KafkaConsumer client used below sends its heartbeats to a group coordinator broker instead.) During a re-balance, Kafka assigns the available partitions to the available threads, possibly moving a partition to another process.
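To make the re-balance idea concrete, here is a minimal simulation of the behaviour described above. It is a sketch under stated assumptions: the class RebalanceSketch and its methods liveConsumers and rebalance are hypothetical names, the round-robin reassignment is chosen for simplicity, and none of this is Kafka's internal logic. The 30000 ms timeout mirrors the session.timeout.ms value used in the example program.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative simulation only (hypothetical names); not Kafka internals.
class RebalanceSketch {

    // A consumer counts as alive if its last heartbeat is within the session timeout.
    static List<String> liveConsumers(Map<String, Long> lastHeartbeatMs,
                                      long nowMs, long sessionTimeoutMs) {
        List<String> live = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeatMs.entrySet()) {
            if (nowMs - entry.getValue() <= sessionTimeoutMs) {
                live.add(entry.getKey());
            }
        }
        return live;
    }

    // Re-balance: give every partition exactly one owner among the live consumers.
    static Map<Integer, String> rebalance(List<String> live, int numPartitions) {
        Map<Integer, String> owners = new LinkedHashMap<>();
        for (int p = 0; p < numPartitions && !live.isEmpty(); p++) {
            owners.put(p, live.get(p % live.size()));
        }
        return owners;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 30_000L; // same value as session.timeout.ms in the example
        Map<String, Long> heartbeats = new LinkedHashMap<>();
        heartbeats.put("c1", 95_000L);
        heartbeats.put("c2", 95_000L);
        System.out.println(rebalance(liveConsumers(heartbeats, 100_000L, sessionTimeoutMs), 2)); // {0=c1, 1=c2}

        // c2 goes silent: its last heartbeat is now 40 s old, so partition 1 moves to c1.
        heartbeats.put("c2", 60_000L);
        System.out.println(rebalance(liveConsumers(heartbeats, 100_000L, sessionTimeoutMs), 2)); // {0=c1, 1=c1}
    }
}
```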
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerGroup {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Usage: consumer <topic> <groupname>");
            return;
        }

        String topic = args[0];
        String group = args[1];

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Consumers that share the same group.id form one consumer group.
        props.put("group.id", group);
        // Commit consumed offsets automatically once per second.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // A consumer that sends no heartbeat for this long is considered failed.
        props.put("session.timeout.ms", "30000");
        // Keys and values are read as strings, matching KafkaConsumer<String, String>.
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        System.out.println("Subscribed to topic " + topic);

        while (true) {
            // Wait up to 100 ms for new records (poll(long) as in the 0.9.0.0 client).
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n",
                    record.offset(), record.key(), record.value());
            }
        }
    }
}
Execution
>>java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group
Here we have created a sample group named my-group with two consumers, one per command. Similarly, you can create your own group and choose the number of consumers in it.
Input
Open the producer CLI and send some messages, for example:
Test consumer group 01
Test consumer group 02
Output of the First Process
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01
Output of the Second Process
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02
Hopefully you now understand SimpleConsumer and ConsumerGroup from the Java client demos, and have an idea of how to send and receive messages using a Java client. The next chapter continues with Kafka integration with big data technologies.