Asynchronous @KafkaListener Return Types

从 3.2 版开始,可以使用异步返回类型指定 @KafkaListener(和 @KafkaHandler)方法,允许以异步方式发送回复。返回类型包括 CompletableFuture<?>Mono<?> 和 Kotlin suspend 函数。

@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
    ...
    CompletableFuture<String> future = new CompletableFuture<>();
    future.complete("done");
    return future;
}
@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
    ...
    return Mono.empty();
}

检测到异步返回类型时,将自动为 AckMode 设置 MANUAL 并启用无序提交;异步操作完成时,异步完成将确认。如果异步结果完成时出现错误,则消息是否恢复取决于容器错误处理程序。如果在侦听器方法中发生某些异常,导致无法创建异步结果对象,则必须捕获该异常并返回适当的返回对象,它将导致该消息确认或恢复。

如果在采用异步返回类型的侦听器(包括 Kotlin 挂起函数时)配置了 KafkaListenerErrorHandler,则会在失败后调用错误处理程序。请参阅 Handling Exceptions 了解更多有关此错误处理程序及其用途的信息。