Message Listeners

当你使用 message listener container 时,你必须提供一个侦听器来接收数据。当前有八个受支持的邮件侦听器接口。如下清单所示这些接口:

When you use a message listener container, you must provide a listener to receive data. There are currently eight supported interfaces for message listeners. The following listing shows these interfaces:

public interface MessageListener<K, V> { 1

    void onMessage(ConsumerRecord<K, V> data);

}

public interface AcknowledgingMessageListener<K, V> { 2

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);

}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 3

    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);

}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 4

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

public interface BatchMessageListener<K, V> { 5

    void onMessage(List<ConsumerRecord<K, V>> data);

}

public interface BatchAcknowledgingMessageListener<K, V> { 6

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);

}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 7

    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);

}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 8

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}
1 Use this interface for processing individual ConsumerRecord instances received from the Kafka consumer poll() operation when using auto-commit or one of the container-managed commit methods.
2 Use this interface for processing individual ConsumerRecord instances received from the Kafka consumer poll() operation when using one of the manual commit methods.
3 Use this interface for processing individual ConsumerRecord instances received from the Kafka consumer poll() operation when using auto-commit or one of the container-managed commit methods. Access to the Consumer object is provided.
4 Use this interface for processing individual ConsumerRecord instances received from the Kafka consumer poll() operation when using one of the manual commit methods. Access to the Consumer object is provided.
5 Use this interface for processing all ConsumerRecord instances received from the Kafka consumer poll() operation when using auto-commit or one of the container-managed commit methods. AckMode.RECORD is not supported when you use this interface, since the listener is given the complete batch.
6 Use this interface for processing all ConsumerRecord instances received from the Kafka consumer poll() operation when using one of the manual commit methods.
7 Use this interface for processing all ConsumerRecord instances received from the Kafka consumer poll() operation when using auto-commit or one of the container-managed commit methods. AckMode.RECORD is not supported when you use this interface, since the listener is given the complete batch. Access to the Consumer object is provided.
8 Use this interface for processing all ConsumerRecord instances received from the Kafka consumer poll() operation when using one of the manual commit methods. Access to the Consumer object is provided.

Consumer 对象不是线程安全的。您必须仅在其调用侦听器的线程中调用其方法。

The Consumer object is not thread-safe. You must only invoke its methods on the thread that calls the listener.

您不应在侦听器中执行任何影响消费者位置或已提交偏移量的 Consumer<?, ?> 方法;该容器需要管理此类信息。

You should not execute any Consumer<?, ?> methods that affect the consumer’s positions or committed offsets in your listener; the container needs to manage such information.