Asynchronous @KafkaListener
Return Types
从 3.2 版开始,可以使用异步返回类型指定 @KafkaListener
(和 @KafkaHandler
)方法,允许以异步方式发送回复。返回类型包括 CompletableFuture<?>
、Mono<?>
和 Kotlin suspend
函数。
@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
...
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("done");
return future;
}
@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
...
return Mono.empty();
}
检测到异步返回类型时,将自动为 AckMode
设置 MANUAL
并启用无序提交;异步操作完成时,异步完成将确认。如果异步结果完成时出现错误,则消息是否恢复取决于容器错误处理程序。如果在侦听器方法中发生某些异常,导致无法创建异步结果对象,则必须捕获该异常并返回适当的返回对象,它将导致该消息确认或恢复。
如果在采用异步返回类型的侦听器(包括 Kotlin 挂起函数时)配置了 KafkaListenerErrorHandler
,则会在失败后调用错误处理程序。请参阅 Handling Exceptions 了解更多有关此错误处理程序及其用途的信息。