Message Listeners
当你使用 message listener container 时,你必须提供一个侦听器来接收数据。当前有八个受支持的邮件侦听器接口。如下清单所示这些接口:
When you use a message listener container, you must provide a listener to receive data. There are currently eight supported interfaces for message listeners. The following listing shows these interfaces:
public interface MessageListener<K, V> { 1
void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> { 2
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 3
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 4
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
public interface BatchMessageListener<K, V> { 5
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> { 6
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 7
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 8
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
1 | Use this interface for processing individual ConsumerRecord instances received from the Kafka consumer poll() operation when using auto-commit or one of the container-managed commit methods. |
2 | Use this interface for processing individual ConsumerRecord instances received from the Kafka consumer poll() operation when using one of the manual commit methods. |
3 | Use this interface for processing individual ConsumerRecord instances received from the Kafka consumer poll() operation when using auto-commit or one of the container-managed commit methods.
Access to the Consumer object is provided. |
4 | Use this interface for processing individual ConsumerRecord instances received from the Kafka consumer poll() operation when using one of the manual commit methods.
Access to the Consumer object is provided. |
5 | Use this interface for processing all ConsumerRecord instances received from the Kafka consumer poll() operation when using auto-commit or one of the container-managed commit methods.
AckMode.RECORD is not supported when you use this interface, since the listener is given the complete batch. |
6 | Use this interface for processing all ConsumerRecord instances received from the Kafka consumer poll() operation when using one of the manual commit methods. |
7 | Use this interface for processing all ConsumerRecord instances received from the Kafka consumer poll() operation when using auto-commit or one of the container-managed commit methods.
AckMode.RECORD is not supported when you use this interface, since the listener is given the complete batch.
Access to the Consumer object is provided. |
8 | Use this interface for processing all ConsumerRecord instances received from the Kafka consumer poll() operation when using one of the manual commit methods.
Access to the Consumer object is provided. |
Consumer
对象不是线程安全的。您必须仅在其调用侦听器的线程中调用其方法。
The Consumer
object is not thread-safe.
You must only invoke its methods on the thread that calls the listener.
您不应在侦听器中执行任何影响消费者位置或已提交偏移量的 Consumer<?, ?>
方法;该容器需要管理此类信息。
You should not execute any Consumer<?, ?>
methods that affect the consumer’s positions or committed offsets in your listener; the container needs to manage such information.