Asynchronous @KafkaListener Return Types

从 3.2 版开始,可以使用异步返回类型指定 @KafkaListener(和 @KafkaHandler)方法,允许以异步方式发送回复。返回类型包括 CompletableFuture<?>Mono<?> 和 Kotlin suspend 函数。

Starting with version 3.2, @KafkaListener (and @KafkaHandler) methods can be specified with asynchronous return types, letting the reply be sent asynchronously. return types include CompletableFuture<?>, Mono<?> and Kotlin suspend functions.

@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
    ...
    CompletableFuture<String> future = new CompletableFuture<>();
    future.complete("done");
    return future;
}
@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
    ...
    return Mono.empty();
}

检测到异步返回类型时,将自动为 AckMode 设置 MANUAL 并启用无序提交;异步操作完成时,异步完成将确认。如果异步结果完成时出现错误,则消息是否恢复取决于容器错误处理程序。如果在侦听器方法中发生某些异常,导致无法创建异步结果对象,则必须捕获该异常并返回适当的返回对象,它将导致该消息确认或恢复。

The AckMode will be automatically set the MANUAL and enable out-of-order commits when async return types are detected; instead, the asynchronous completion will ack when the async operation completes. When the async result is completed with an error, whether the message is recover or not depends on the container error handler. If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be ack or recover.

如果在采用异步返回类型的侦听器(包括 Kotlin 挂起函数时)配置了 KafkaListenerErrorHandler,则会在失败后调用错误处理程序。请参阅 Handling Exceptions 了解更多有关此错误处理程序及其用途的信息。

If a KafkaListenerErrorHandler is configured on a listener with an async return type (including Kotlin suspend functions), the error handler is invoked after a failure. See Handling Exceptions for more information about this error handler and its purpose.